![](https://image.toast.com/aaaadh/real/2023/techblog/javareactormain.png)
Java Reactor onErrorContinue 이모저모
Java Reactor onErrorContinue 이모저모 관련
![](https://image.toast.com/aaaadh/real/2023/techblog/javareactormain.png)
들어가며
Reactive streams 구현체인 Pivotal의 Java Reactor에는 onErrorContinue
라는 특수한 연산자가 존재합니다. onErrorContinue
는 이름에서 유추할 수 있듯이 해당 스트림에 에러가 발생하더라도 이를 무시하고 계속 로직을 진행할 수 있게 해 주는 연산자입니다. 그런데 onErrorContinue
는 flatMap
, doOnError
, onErrorResume
과 같은 다른 연산자들과 달리 동작 방식이 조금 특별합니다. 그리고 그 특성으로 인해 사용 시에 주의가 필요하며 남용해서는 안 됩니다. onErrorContinue
는 언제 써도 좋은 간편한 연산자가 아니라 최후의 수단입니다.
과거에 onErrorContinue
의 특성으로 인해 치명적인 버그가 발생한 적이 있습니다. 하지만 지금은 수정된 버그이니 최신 버전(spring-boot-starter-webflux 2.7.6 버전 이상 또는 reactor-core 3.4.25 이상)의 Reactor를 사용하고 있으시다면 안심하시고 재미로 읽어 주세요!
onErrorContinue
Reactor에서 에러란,
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
기본적으로 Reactor에서 에러는 터미널 이벤트입니다. 에러가 발생했다는 것은 해당 Reactive 스트림이 종료된다는 의미이죠.
그런데 이러한 정의를 무시하는 한 연산자가 존재하는데, 바로 onErrorContinue
입니다.
onErrorContinue
는 Mono, Flux와 같이 Publisher의 구현체에서 제공하는 연산자이며, 아래와 같이 사용할 수 있습니다.
public Flux<String> doSomething() {
return someClient.do()
.flatMap(this::something)
.onErrorContinue((e, obj) -> log.error("Failed to do something.", e));
}
onErrorContinue
를 사용함으로써 someClient.do()
메서드나 flatMap(this::something)
에서 에러를 반환하더라도 해당 스트림은 종료되지 않습니다. 추가적으로 onErrorContinue
에 콜백 함수를 등록하여 에러 발생 시 호출될 함수를 등록할 수도 있습니다.
위 예제 코드만 보면 onErrorContiue
는 다른 연산자들과 별다른 차이가 없어 보입니다. 그런데 onErrorContinue
는 map, flatMap 등과 매우 큰 차이점이 하나 존재하는데, 바로 onErrorContinue
는 업스트림에 영향을 준다는 것입니다.
![(출처: <FontIcon icon="fas fa-globe"/>Reactor Flux 공식 문서](https://image.toast.com/aaaadh/real/2023/techblog/1(2).png)
공식 문서에서 확인할 수 있듯 onErrorContinue
는 업스트림에 영향을 주는 연산자입니다.
직접 예제로 살펴보겠습니다.
Flux.just("hello", "world", "error", "foo", "bar")
.flatMap(value -> value.equals("error") ? Flux.error(new Exception("Error occurred.")) : Flux.just(value))
.doOnError(e -> log.error("On error.", e))
.onErrorContinue((e, value) -> log.error("On error continue.", e))
.subscribe(
value -> log.info("On success subscribed. : value={}", value),
e -> log.error("On error subscribed.", e)
);
위와 같은 코드가 있다고 가정하겠습니다. 위 Flux는 그저 단순히 메시지를 다운스트림으로 내보내며, subscriber는 그 메시지를 출력합니다. 단 메시지가 error
라면 에러를 발생시킵니다.
위 Flux 스트림은 중간에 onErrorContinue
를 사용하기 때문에 중간에 error
메시지 값으로 인해서 에러가 발생하더라도 Flux는 중단되지 않으며 정상적으로 hello
, world
, foo
, bar
를 출력합니다. 그런데 여기서 유의해야 할 점은 subscribe
의 에러 헨들러뿐만이 아니라 onErrorContinue
위에 존재하는 doOnError
도 호출되지 않는다는 것입니다. onErrorContinue
가 사용되는 순간, 동작을 예측하기 매우 힘들어집니다. 다른 연산자와는 다르게 업스트림에 영향을 주며 모든 업스트림 에러를 무시하기 때문에 잘못 사용하면 치명적인 버그가 발생할 수 있습니다.
이처럼 onErrorContinue
는 다운스트림뿐만이 아니라 업스트림에도 영향을 주며 실질적으로 해당 스트림에 존재하는 모든 에러를 무시하게 만듭니다. 이러한 특성으로 인해서 과거 이상한 버그가 존재했던 적이 있습니다.
RetryWhen
+ onErrorContinue
앞에서 언급했듯이 reactor-core 3.4.25, spring-boot-starter-webflux 2.7.6 버전에서 해결된 버그입니다. 참고로 onErrorContinue
의 동작 방식이 변경되면서 나타난 버그이기 때문에 매우 옛날 버전에서도 버그가 없을 수 있습니다.
retryWhen
연산자는 에러가 발생한 경우 특정 조건을 참고하여 재시도하는 기능을 제공하는 연산자이며, onErrorContinue
는 업스트림의 에러를 무시하도록 만드는 연산자입니다.
그렇다면 이 두 개를 함께 사용하면 어떻게 될까요? 과거 이와 관련된 Reactor 버그가 존재하기도 하였습니다. 아래는 해당 버그를 재현하는 코드입니다.
@Test
void test() {
Flux<String> fluxStream = Flux.range(1, 1000)
.flatMap(i -> {
log.info("i={}", i);
return Flux.<String>error(new RuntimeException("Exception occurred."))
.retryWhen(Retry.max(5));
}, 10)
.doOnTerminate(() -> log.info("terminated"))
.onErrorContinue((ex, obj) -> log.error("The onErrorContinue called. : obj={}", obj));
StepVerifier.create(fluxStream)
.verifyComplete();
}
아직 버그가 존재하던 2.7.6 미만에서 위 테스트 코드를 실행해 보면 테스트가 끝나지 않는 것을 볼 수 있을 겁니다. 유의해서 봐야 할 점이라면 콘솔에서 i=10
출력을 마지막으로 더 이상 아무런 일도 하지 않는다는 것 입니다. 10이라는 숫자는 위 코드에서 flatMap에 concurrency로 지정한 값입니다. flatMap은 concurrency 수(기본값: 256)만큼 동시에 진행합니다. 즉, 10개의 스트림이 동시에 진행되고 있지만 끝나지 않고 있다는 것을 의미합니다.
이는 retryWhen
과 onErrorContinue
가 같이 사용될 경우 발생하는 현상입니다. 논리적으로 생각해 봐도 이상하기는 합니다.
retryWhen
은 에러를 감지하고 재시도를 하는데 onErrorContinue
에 의해서 모든 에러가 무시되기 때문입니다. 즉, Flux.error
때문에 retryWhen에서 재시도를 준비하고 있는데 발생한 에러(Exception)를 전달 받지 못해 계속 기다리고 있는 것입니다.
retryWhen과 onErrorContinue
의 동작 방식
잠시 Reactive-stream에 대해서 잘 모르는 분을 위해서 조금만 설명하고 넘어가겠습니다. 기본적으로 Reactive-stream은 Publisher와 Subscriber가 존재하는 발행/구독 모델 구조입니다. flatMap
, map
등을 통해서 코드를 연결하더라도 내부적으로는 모두 발행 구독을 통해서 로직이 수행됩니다.
upstream.flatMap(A)
.map(B)
.flatMap(C)
.subscribe()
예를 들어 위와 같은 코드가 있다고 할 때 flatMap(A)
는 upstream
을, map(B)
은 flatMap(A)
을, flatMap(C)
을 구독합니다. 그리고 반대로 upstream
은 flatMap(A)
에게, flatMap(A)
는 map(B)
에게, map(B)
는 flatMap(C)
에게 메시지를 발행합니다. 이 흐름을 조금 더 상세하게 살펴보면 다음과 같습니다(Subscription은 생략).
![](https://image.toast.com/aaaadh/real/2023/techblog/02uB300uC9C0 1(5).png)
Reactor는 Reactive-stream의 구현체이며, Reactor가 제공하는 대부분의 기능들은 위 흐름을 벗어나지 않습니다. 이는 Retry 기능도 마찬가지입니다.
retryWhen
retryWhen
은 FluxRetryWhen
클래스에 Subscriber와 Publisher가 구현되어 있습니다. FluxRetryWhen은 특이하게 메인(main) Subscriber와 동반(companion) Subscriber로 나누어집니다. 메인 Subscriber는 업스트림으로부터 받은 값은 그저 다운스트림으로 전달합니다. 하지만 업스트림으로부터 에러를 받으면 메인 Subscriber는 동반 Subscriber에 시그널을 보내서 재시도할지 여부를 확인하며, 만약 재시도하기로 결정될 경우 메인 Subscriber는 업스트림을 재구독(resubscribe)합니다. 각각 RetryWhenMainSubscriber
클래스와 RetryWhenOtherSubscriber
클래스를 통해 구현되며 대락적인 흐름은 아래와 같습니다.
![](https://image.toast.com/aaaadh/real/2023/techblog/03uB300uC9C0 1(3).png)
![](https://image.toast.com/aaaadh/real/2023/techblog/04uB300uC9C0 1(2).png)
retryWhen operator를 사용하면 파라미터로 RetrySpec을 전달하도록 되어 있는데 이 RetrySpec들을 구성하여 동반 Subscriber의 스트림이 구성됩니다. 대략적으로 아래와 같은 형태라고 할 수 있습니다.
(물론 아래 코드는 설명을 위해 많은 부분이 생략된 대락적인 코드입니다. 실제 코드는 더 많고 복잡합니다.)
retrySignalFlux.concatMap(retrySpec)
.subscribe(retryWhenOtherSubscriber);
동반 Subscriber(retryWhenOtherSubscriber
)는 retrySpec으로부터 발행 받은 결과(onNext
, onError
등)를 바탕으로 재시도할 지 여부를 결정하고 실행합니다. 참고로 retrySignalFlux
는 retryWhenOtherSubscriber
의 멤버 필드로 retryWhenOtherSubscriber를
구독하면 내부적으로 retrySignalFlux
를 구독합니다. 사실상 retryWhenOtherSubscriber
가 발행도 하고 구독도 하는 Processor라고 봐도 무방합니다. 중요한 것은 재시도 수행 여부를 결정하는 로직도 Reactor publisher, subscriber에 의해서 결정되며 concatMap을 통해서 연결하고 있다는 것입니다. 이 연산자는 onErrorContinue
에 의해서 영향을 받는 연산자입니다. 더 이야기하기 전에 onErrorContinue
에 대해서 좀 더 자세히 살펴보겠습니다.
onErrorContinue
앞에서 언급한 것처럼 onErrorContinue
는 다운스트림뿐만이 아니라 업스트림에도 영향을 주는 연산자입니다. 때문에 map
, flatMap
, filter
같은 다른 일반적인 연산자와는 구현 방식이 조금 차이가 있습니다. 무엇보다 onErrorContinue
는 구현체가 없습니다. 즉, FluxMap 클래스, FluxRetryWhen
클래스 같은 Publisher/Subscriber 구조를 사용하는 것이 아니라 Reactor Context를 직접 사용합니다.
Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resume(genericConsumer)
);
onErrorContinue
연산자를 사용하는 순간 위와 같은 Context key/value가 세팅됩니다. 그리고 onErrorContinue 연산자가 사용된 곳의 업스트림에서 Context의 OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY
값을 꺼내 사용하는 것이죠. Reactor는 내부적으로 이런 연산자를 구현할 때 사용하는 유틸리티 성격의 클래스인 Operators 클래스가 존재합니다. Operators에서 제공하는 몇몇 기능들을 위 Context의 플래그를 사용하여 적절한 로직을 수행합니다.
예를 들어, onNext 중 발생한 에러를 FailureStrategy를 참고하여 필요한 작업을 수행 및 추출하는 onNextError
라는 기능도 제공하고 있습니다. onErrorContinue 연산자를 사용했다면 onNextError
는 위 OnNextFailureStrategy.resume(consumer)
를 사용하여 에러랑 관련된 필요한 로직을 수행하고 null
을 반환하죠.
그리고 flatMap, concatMap 등에서는 에러 없음(null)이라고 판단하고 마저 원래 로직을 수행합니다.
retryWhen + onErrorContinue로 인한 중단 버그
원인
@Test
void test() {
Flux<String> fluxStream = Flux.range(1, 1000)
.flatMap(i -> {
log.info("i={}", i);
return Flux.<String>error(new RuntimeException("asd"))
.retryWhen(Retry.max(5));
}, 10)
.doOnTerminate(() -> log.info("terminated"))
.onErrorContinue((ex, obj) -> log.error("The onErrorContinue called. : obj={}", obj));
StepVerifier.create(fluxStream)
.verifyComplete();
}
앞에서 살펴봤던 버그를 재현하는 코드입니다. 정확히 10번 에러가 발생하고 나서 위 Flux 스트림은 먹통이 되어 버립니다.
retryWhen
은 동반 subscriber에 의해서 RetrySpec의 로직이 concatMap
을 통해서 연결됩니다. 그리고 에러 시그널을 받으면 설정했던 RetrySpec을 통해서 재시도 또는 중지 여부를 결정합니다. 이때 중요한 점은 RetrySpec 로직은 더 이상 재시도를 하면 안 되는 경우, 예를 들어, 최대 재시도 횟수를 초과한 경우 에러를 반환합니다. 그럼 동반 subscriber는 이를 확인하고 메인 subscriber를 중지시킵니다. 그런데 만약 어디선가 onErrorContinue
가 사용되었을 경우 RetrySpec을 연동할 때 사용되었던 concatMap
이 RetrySpec
에서 반환한 에러를 무시하게 됩니다.
// concatMap 연산자의 에러 처리 로직 중 일부
catch (Throwable e) {
Throwable e_ = Operators.onNextError(v, e, this.ctx, s);
if (e_ != null) {
actual.onError(Operators.onOperatorError(s, e, v, this.ctx));
return;
}
else {
continue;
}
}
// ........
위 코드는 concatMap
연산자의 구현 중 에러를 처리하는 부분 중 일부입니다. onErrorContinue
가 사용되었다면 Operators.onNextError
는 null을 반환하며 로직상 continue
가 수행되고 해당 에러는 아무런 조치 없이 다음 작업으로 넘어가게 됩니다. retryWhen 연산자의 동반 subscriber가 에러를 받지 못해 계속 재시도를 할지, 이만 재시도를 중지할지 결정하지 못하는 것이죠...
이로 인해서 retryWhen
이 사용된 스트림이 완료되지 못하고 계속 대기하면서 flatMap
의 concurrency를 계속 점유하고, 결국 모든 max concurrency에 도달하여 이후 작업들이 처리되지 못하는 것입니다.
해결
앞에서 언급드린 것처럼 이 버그는 현재 해결된 상태입니다.어떻게 해결했을까요? 생각보다 간단한 방법으로 해결했습니다.
다음은 버그를 수정한 버전 코드의 대략적인 코드입니다.
retrySignalFlux.deferContextual(cv ->
retrySignalFlux.contextWrite(cv) // context 복구
.concatMap(retrySpec)
.contextWrite(c -> Context.empty()) // context 비우기
)
.subscribe(retryWhenOtherSubscriber);
retrySpec
이 수행될 영역에서만 context를 비워 버리는 것입니다.
딱 2줄 수정해서 버그를 수정한 것이죠. 뭔가 찝찝하기는 하지만 다른 곳에 영향 없이 retrySpec 부분만 수정해서 간단하게(?) 해결한 것 같습니다.
나가며
앞에서 언급한 버그는 현재 수정되었지만 onErrorContinue
는 여전히 사용에 주의해야 하는 연산자입니다. 공식 문서에서도 가능하면 onErrorContinue
보다는 onErrorResume
같은 것을 응용해서 해결하는 것을 권장하고 있습니다. 심지어 Reactor의 한 개발자는 onErrorContinue
API 디자인은 큰 실수라고 언급할 정도였죠.
잊지 마세요! onErrorContinue
는 항상 사용에 주의해야 하며 onErrorContinue
를 사용하기 전에 다른 대안이 없을지 먼저 고민해 보세요.
참고 문헌
- https://projectreactor.io/docs/core/release/reference/
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
- https://github.com/reactor/reactor-core/issues/2184
- https://github.com/reactor/reactor-core/pull/3262