Skip to main content

Java Reactor onErrorContinue 이모저모

About 3 minJavaSpringSpring BootArticle(s)blogmeetup.nhncloud.comjavaspringspringbootspring-bootjava-reactor

Java Reactor onErrorContinue 이모저모 관련

Spring > Article(s)

Article(s)

Java Reactor onErrorContinue 이모저모 | NHN Cloud Meetup
Java Reactor onErrorContinue 이모저모

들어가며

Reactive streams 구현체인 Pivotal의 Java Reactor에는 onErrorContinue라는 특수한 연산자가 존재합니다. onErrorContinue는 이름에서 유추할 수 있듯이 해당 스트림에 에러가 발생하더라도 이를 무시하고 계속 로직을 진행할 수 있게 해 주는 연산자입니다. 그런데 onErrorContinueflatMap, doOnError, onErrorResume과 같은 다른 연산자들과 달리 동작 방식이 조금 특별합니다. 그리고 그 특성으로 인해 사용 시에 주의가 필요하며 남용해서는 안 됩니다. onErrorContinue는 언제 써도 좋은 간편한 연산자가 아니라 최후의 수단입니다.

과거에 onErrorContinue의 특성으로 인해 치명적인 버그가 발생한 적이 있습니다. 하지만 지금은 수정된 버그이니 최신 버전(spring-boot-starter-webflux 2.7.6 버전 이상 또는 reactor-core 3.4.25 이상)의 Reactor를 사용하고 있으시다면 안심하시고 재미로 읽어 주세요!


onErrorContinue

Reactor에서 에러란,

Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.

기본적으로 Reactor에서 에러는 터미널 이벤트입니다. 에러가 발생했다는 것은 해당 Reactive 스트림이 종료된다는 의미이죠.

그런데 이러한 정의를 무시하는 한 연산자가 존재하는데, 바로 onErrorContinue입니다.

onErrorContinue는 Mono, Flux와 같이 Publisher의 구현체에서 제공하는 연산자이며, 아래와 같이 사용할 수 있습니다.

public Flux<String> doSomething() {
    return someClient.do()
        .flatMap(this::something)
        .onErrorContinue((e, obj) -> log.error("Failed to do something.", e));
}

onErrorContinue를 사용함으로써 someClient.do() 메서드나 flatMap(this::something)에서 에러를 반환하더라도 해당 스트림은 종료되지 않습니다. 추가적으로 onErrorContinue에 콜백 함수를 등록하여 에러 발생 시 호출될 함수를 등록할 수도 있습니다.

위 예제 코드만 보면 onErrorContiue는 다른 연산자들과 별다른 차이가 없어 보입니다. 그런데 onErrorContinue는 map, flatMap 등과 매우 큰 차이점이 하나 존재하는데, 바로 onErrorContinue는 업스트림에 영향을 준다는 것입니다.

(출처: <FontIcon icon="fas fa-globe"/>Reactor Flux 공식 문서
(출처: Reactor Flux 공식 문서open in new window

공식 문서에서 확인할 수 있듯 onErrorContinue는 업스트림에 영향을 주는 연산자입니다.

직접 예제로 살펴보겠습니다.

Flux.just("hello", "world", "error", "foo", "bar")
    .flatMap(value -> value.equals("error") ? Flux.error(new Exception("Error occurred.")) : Flux.just(value))
    .doOnError(e -> log.error("On error.", e))
    .onErrorContinue((e, value) -> log.error("On error continue.", e))
    .subscribe(
        value -> log.info("On success subscribed. : value={}", value),
        e -> log.error("On error subscribed.", e)
    );

위와 같은 코드가 있다고 가정하겠습니다. 위 Flux는 그저 단순히 메시지를 다운스트림으로 내보내며, subscriber는 그 메시지를 출력합니다. 단 메시지가 error라면 에러를 발생시킵니다.

위 Flux 스트림은 중간에 onErrorContinue를 사용하기 때문에 중간에 error 메시지 값으로 인해서 에러가 발생하더라도 Flux는 중단되지 않으며 정상적으로 hello, world, foo, bar를 출력합니다. 그런데 여기서 유의해야 할 점은 subscribe의 에러 헨들러뿐만이 아니라 onErrorContinue 위에 존재하는 doOnError도 호출되지 않는다는 것입니다. onErrorContinue가 사용되는 순간, 동작을 예측하기 매우 힘들어집니다. 다른 연산자와는 다르게 업스트림에 영향을 주며 모든 업스트림 에러를 무시하기 때문에 잘못 사용하면 치명적인 버그가 발생할 수 있습니다.

이처럼 onErrorContinue는 다운스트림뿐만이 아니라 업스트림에도 영향을 주며 실질적으로 해당 스트림에 존재하는 모든 에러를 무시하게 만듭니다. 이러한 특성으로 인해서 과거 이상한 버그가 존재했던 적이 있습니다.


RetryWhen + onErrorContinue

앞에서 언급했듯이 reactor-core 3.4.25, spring-boot-starter-webflux 2.7.6 버전에서 해결된 버그입니다. 참고로 onErrorContinue의 동작 방식이 변경되면서 나타난 버그이기 때문에 매우 옛날 버전에서도 버그가 없을 수 있습니다.

retryWhen 연산자는 에러가 발생한 경우 특정 조건을 참고하여 재시도하는 기능을 제공하는 연산자이며, onErrorContinue는 업스트림의 에러를 무시하도록 만드는 연산자입니다.

그렇다면 이 두 개를 함께 사용하면 어떻게 될까요? 과거 이와 관련된 Reactor 버그가 존재하기도 하였습니다. 아래는 해당 버그를 재현하는 코드입니다.

    @Test
    void test() {
        Flux<String> fluxStream = Flux.range(1, 1000)
            .flatMap(i -> {
                log.info("i={}", i);
                return Flux.<String>error(new RuntimeException("Exception occurred."))
                    .retryWhen(Retry.max(5));
            }, 10)
            .doOnTerminate(() -> log.info("terminated"))
            .onErrorContinue((ex, obj) -> log.error("The onErrorContinue called. : obj={}", obj));

        StepVerifier.create(fluxStream)
            .verifyComplete();
    }

아직 버그가 존재하던 2.7.6 미만에서 위 테스트 코드를 실행해 보면 테스트가 끝나지 않는 것을 볼 수 있을 겁니다. 유의해서 봐야 할 점이라면 콘솔에서 i=10 출력을 마지막으로 더 이상 아무런 일도 하지 않는다는 것 입니다. 10이라는 숫자는 위 코드에서 flatMap에 concurrency로 지정한 값입니다. flatMap은 concurrency 수(기본값: 256)만큼 동시에 진행합니다. 즉, 10개의 스트림이 동시에 진행되고 있지만 끝나지 않고 있다는 것을 의미합니다.

이는 retryWhenonErrorContinue가 같이 사용될 경우 발생하는 현상입니다. 논리적으로 생각해 봐도 이상하기는 합니다.

retryWhen은 에러를 감지하고 재시도를 하는데 onErrorContinue에 의해서 모든 에러가 무시되기 때문입니다. 즉, Flux.error 때문에 retryWhen에서 재시도를 준비하고 있는데 발생한 에러(Exception)를 전달 받지 못해 계속 기다리고 있는 것입니다.


retryWhen과 onErrorContinue의 동작 방식

잠시 Reactive-stream에 대해서 잘 모르는 분을 위해서 조금만 설명하고 넘어가겠습니다. 기본적으로 Reactive-stream은 Publisher와 Subscriber가 존재하는 발행/구독 모델 구조입니다. flatMap, map 등을 통해서 코드를 연결하더라도 내부적으로는 모두 발행 구독을 통해서 로직이 수행됩니다.

upstream.flatMap(A)
    .map(B)
    .flatMap(C)
    .subscribe()

예를 들어 위와 같은 코드가 있다고 할 때 flatMap(A)upstream을, map(B)flatMap(A)을, flatMap(C)을 구독합니다. 그리고 반대로 upstreamflatMap(A)에게, flatMap(A)map(B)에게, map(B)flatMap(C)에게 메시지를 발행합니다. 이 흐름을 조금 더 상세하게 살펴보면 다음과 같습니다(Subscription은 생략).

Reactor는 Reactive-stream의 구현체이며, Reactor가 제공하는 대부분의 기능들은 위 흐름을 벗어나지 않습니다. 이는 Retry 기능도 마찬가지입니다.

retryWhen

retryWhenFluxRetryWhen 클래스에 Subscriber와 Publisher가 구현되어 있습니다. FluxRetryWhen은 특이하게 메인(main) Subscriber와 동반(companion) Subscriber로 나누어집니다. 메인 Subscriber는 업스트림으로부터 받은 값은 그저 다운스트림으로 전달합니다. 하지만 업스트림으로부터 에러를 받으면 메인 Subscriber는 동반 Subscriber에 시그널을 보내서 재시도할지 여부를 확인하며, 만약 재시도하기로 결정될 경우 메인 Subscriber는 업스트림을 재구독(resubscribe)합니다. 각각 RetryWhenMainSubscriber 클래스와 RetryWhenOtherSubscriber 클래스를 통해 구현되며 대락적인 흐름은 아래와 같습니다.

retryWhen operator를 사용하면 파라미터로 RetrySpec을 전달하도록 되어 있는데 이 RetrySpec들을 구성하여 동반 Subscriber의 스트림이 구성됩니다. 대략적으로 아래와 같은 형태라고 할 수 있습니다.
(물론 아래 코드는 설명을 위해 많은 부분이 생략된 대락적인 코드입니다. 실제 코드는 더 많고 복잡합니다.)

retrySignalFlux.concatMap(retrySpec)
        .subscribe(retryWhenOtherSubscriber);

동반 Subscriber(retryWhenOtherSubscriber)는 retrySpec으로부터 발행 받은 결과(onNext, onError 등)를 바탕으로 재시도할 지 여부를 결정하고 실행합니다. 참고로 retrySignalFluxretryWhenOtherSubscriber의 멤버 필드로 retryWhenOtherSubscriber를 구독하면 내부적으로 retrySignalFlux를 구독합니다. 사실상 retryWhenOtherSubscriber가 발행도 하고 구독도 하는 Processor라고 봐도 무방합니다. 중요한 것은 재시도 수행 여부를 결정하는 로직도 Reactor publisher, subscriber에 의해서 결정되며 concatMap을 통해서 연결하고 있다는 것입니다. 이 연산자는 onErrorContinue에 의해서 영향을 받는 연산자입니다. 더 이야기하기 전에 onErrorContinue에 대해서 좀 더 자세히 살펴보겠습니다.

onErrorContinue

앞에서 언급한 것처럼 onErrorContinue는 다운스트림뿐만이 아니라 업스트림에도 영향을 주는 연산자입니다. 때문에 map, flatMap, filter 같은 다른 일반적인 연산자와는 구현 방식이 조금 차이가 있습니다. 무엇보다 onErrorContinue는 구현체가 없습니다. 즉, FluxMap 클래스, FluxRetryWhen 클래스 같은 Publisher/Subscriber 구조를 사용하는 것이 아니라 Reactor Context를 직접 사용합니다.

Context.of(
        OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
        OnNextFailureStrategy.resume(genericConsumer)
);

onErrorContinue 연산자를 사용하는 순간 위와 같은 Context key/value가 세팅됩니다. 그리고 onErrorContinue 연산자가 사용된 곳의 업스트림에서 Context의 OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY 값을 꺼내 사용하는 것이죠. Reactor는 내부적으로 이런 연산자를 구현할 때 사용하는 유틸리티 성격의 클래스인 Operators 클래스가 존재합니다. Operators에서 제공하는 몇몇 기능들을 위 Context의 플래그를 사용하여 적절한 로직을 수행합니다.

예를 들어, onNext 중 발생한 에러를 FailureStrategy를 참고하여 필요한 작업을 수행 및 추출하는 onNextError라는 기능도 제공하고 있습니다. onErrorContinue 연산자를 사용했다면 onNextError는 위 OnNextFailureStrategy.resume(consumer)를 사용하여 에러랑 관련된 필요한 로직을 수행하고 null을 반환하죠.

그리고 flatMap, concatMap 등에서는 에러 없음(null)이라고 판단하고 마저 원래 로직을 수행합니다.


retryWhen + onErrorContinue로 인한 중단 버그

원인

    @Test
    void test() {
        Flux<String> fluxStream = Flux.range(1, 1000)
            .flatMap(i -> {
                log.info("i={}", i);
                return Flux.<String>error(new RuntimeException("asd"))
                    .retryWhen(Retry.max(5));
            }, 10)
            .doOnTerminate(() -> log.info("terminated"))
            .onErrorContinue((ex, obj) -> log.error("The onErrorContinue called. : obj={}", obj));

        StepVerifier.create(fluxStream)
            .verifyComplete();
    }

앞에서 살펴봤던 버그를 재현하는 코드입니다. 정확히 10번 에러가 발생하고 나서 위 Flux 스트림은 먹통이 되어 버립니다.

retryWhen은 동반 subscriber에 의해서 RetrySpec의 로직이 concatMap을 통해서 연결됩니다. 그리고 에러 시그널을 받으면 설정했던 RetrySpec을 통해서 재시도 또는 중지 여부를 결정합니다. 이때 중요한 점은 RetrySpec 로직은 더 이상 재시도를 하면 안 되는 경우, 예를 들어, 최대 재시도 횟수를 초과한 경우 에러를 반환합니다. 그럼 동반 subscriber는 이를 확인하고 메인 subscriber를 중지시킵니다. 그런데 만약 어디선가 onErrorContinue가 사용되었을 경우 RetrySpec을 연동할 때 사용되었던 concatMapRetrySpec에서 반환한 에러를 무시하게 됩니다.

    // concatMap 연산자의 에러 처리 로직 중 일부            
    catch (Throwable e) {
        Throwable e_ = Operators.onNextError(v, e, this.ctx, s);
        if (e_ != null) {
            actual.onError(Operators.onOperatorError(s, e, v, this.ctx));
            return;
        }
        else {
            continue;
        }
    }
    // ........

위 코드는 concatMap 연산자의 구현 중 에러를 처리하는 부분 중 일부입니다. onErrorContinue가 사용되었다면 Operators.onNextError는 null을 반환하며 로직상 continue가 수행되고 해당 에러는 아무런 조치 없이 다음 작업으로 넘어가게 됩니다. retryWhen 연산자의 동반 subscriber가 에러를 받지 못해 계속 재시도를 할지, 이만 재시도를 중지할지 결정하지 못하는 것이죠...

이로 인해서 retryWhen이 사용된 스트림이 완료되지 못하고 계속 대기하면서 flatMap의 concurrency를 계속 점유하고, 결국 모든 max concurrency에 도달하여 이후 작업들이 처리되지 못하는 것입니다.

해결

앞에서 언급드린 것처럼 이 버그는 현재 해결된 상태입니다.어떻게 해결했을까요? 생각보다 간단한 방법으로 해결했습니다.

다음은 버그를 수정한 버전 코드의 대략적인 코드입니다.

retrySignalFlux.deferContextual(cv -> 
        retrySignalFlux.contextWrite(cv)            // context 복구
            .concatMap(retrySpec)
            .contextWrite(c -> Context.empty())     // context 비우기
    )
    .subscribe(retryWhenOtherSubscriber);

retrySpec이 수행될 영역에서만 context를 비워 버리는 것입니다.

딱 2줄 수정해서 버그를 수정한 것이죠. 뭔가 찝찝하기는 하지만 다른 곳에 영향 없이 retrySpec 부분만 수정해서 간단하게(?) 해결한 것 같습니다.


나가며

앞에서 언급한 버그는 현재 수정되었지만 onErrorContinue는 여전히 사용에 주의해야 하는 연산자입니다. 공식 문서에서도 가능하면 onErrorContinue보다는 onErrorResume 같은 것을 응용해서 해결하는 것을 권장하고 있습니다. 심지어 Reactor의 한 개발자는 onErrorContinue API 디자인은 큰 실수라고 언급할 정도였죠.

잊지 마세요! onErrorContinue는 항상 사용에 주의해야 하며 onErrorContinue를 사용하기 전에 다른 대안이 없을지 먼저 고민해 보세요.

참고 문헌


이찬희 (MarkiiimarK)
Never Stop Learning.