Java Reactor onErrorContinue ์ด๋ชจ์ ๋ชจ
Java Reactor onErrorContinue ์ด๋ชจ์ ๋ชจ ๊ด๋ จ
๋ค์ด๊ฐ๋ฉฐ
Reactive streams ๊ตฌํ์ฒด์ธ Pivotal์ Java Reactor์๋ onErrorContinue
๋ผ๋ ํน์ํ ์ฐ์ฐ์๊ฐ ์กด์ฌํฉ๋๋ค. onErrorContinue
๋ ์ด๋ฆ์์ ์ ์ถํ ์ ์๋ฏ์ด ํด๋น ์คํธ๋ฆผ์ ์๋ฌ๊ฐ ๋ฐ์ํ๋๋ผ๋ ์ด๋ฅผ ๋ฌด์ํ๊ณ ๊ณ์ ๋ก์ง์ ์งํํ ์ ์๊ฒ ํด ์ฃผ๋ ์ฐ์ฐ์์
๋๋ค. ๊ทธ๋ฐ๋ฐ onErrorContinue
๋ flatMap
, doOnError
, onErrorResume
๊ณผ ๊ฐ์ ๋ค๋ฅธ ์ฐ์ฐ์๋ค๊ณผ ๋ฌ๋ฆฌ ๋์ ๋ฐฉ์์ด ์กฐ๊ธ ํน๋ณํฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ๊ทธ ํน์ฑ์ผ๋ก ์ธํด ์ฌ์ฉ ์์ ์ฃผ์๊ฐ ํ์ํ๋ฉฐ ๋จ์ฉํด์๋ ์ ๋ฉ๋๋ค. onErrorContinue
๋ ์ธ์ ์จ๋ ์ข์ ๊ฐํธํ ์ฐ์ฐ์๊ฐ ์๋๋ผ ์ตํ์ ์๋จ์
๋๋ค.
๊ณผ๊ฑฐ์ onErrorContinue
์ ํน์ฑ์ผ๋ก ์ธํด ์น๋ช
์ ์ธ ๋ฒ๊ทธ๊ฐ ๋ฐ์ํ ์ ์ด ์์ต๋๋ค. ํ์ง๋ง ์ง๊ธ์ ์์ ๋ ๋ฒ๊ทธ์ด๋ ์ต์ ๋ฒ์ (spring-boot-starter-webflux 2.7.6 ๋ฒ์ ์ด์ ๋๋ reactor-core 3.4.25 ์ด์)์ Reactor๋ฅผ ์ฌ์ฉํ๊ณ ์์ผ์๋ค๋ฉด ์์ฌํ์๊ณ ์ฌ๋ฏธ๋ก ์ฝ์ด ์ฃผ์ธ์!
onErrorContinue
Reactor์์ ์๋ฌ๋,
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
๊ธฐ๋ณธ์ ์ผ๋ก Reactor์์ ์๋ฌ๋ ํฐ๋ฏธ๋ ์ด๋ฒคํธ์ ๋๋ค. ์๋ฌ๊ฐ ๋ฐ์ํ๋ค๋ ๊ฒ์ ํด๋น Reactive ์คํธ๋ฆผ์ด ์ข ๋ฃ๋๋ค๋ ์๋ฏธ์ด์ฃ .
๊ทธ๋ฐ๋ฐ ์ด๋ฌํ ์ ์๋ฅผ ๋ฌด์ํ๋ ํ ์ฐ์ฐ์๊ฐ ์กด์ฌํ๋๋ฐ, ๋ฐ๋ก onErrorContinue
์
๋๋ค.
onErrorContinue
๋ Mono, Flux์ ๊ฐ์ด Publisher์ ๊ตฌํ์ฒด์์ ์ ๊ณตํ๋ ์ฐ์ฐ์์ด๋ฉฐ, ์๋์ ๊ฐ์ด ์ฌ์ฉํ ์ ์์ต๋๋ค.
public Flux<String> doSomething() {
return someClient.do()
.flatMap(this::something)
.onErrorContinue((e, obj) -> log.error("Failed to do something.", e));
}
onErrorContinue
๋ฅผ ์ฌ์ฉํจ์ผ๋ก์จ someClient.do()
๋ฉ์๋๋ flatMap(this::something)
์์ ์๋ฌ๋ฅผ ๋ฐํํ๋๋ผ๋ ํด๋น ์คํธ๋ฆผ์ ์ข
๋ฃ๋์ง ์์ต๋๋ค. ์ถ๊ฐ์ ์ผ๋ก onErrorContinue
์ ์ฝ๋ฐฑ ํจ์๋ฅผ ๋ฑ๋กํ์ฌ ์๋ฌ ๋ฐ์ ์ ํธ์ถ๋ ํจ์๋ฅผ ๋ฑ๋กํ ์๋ ์์ต๋๋ค.
์ ์์ ์ฝ๋๋ง ๋ณด๋ฉด onErrorContiue
๋ ๋ค๋ฅธ ์ฐ์ฐ์๋ค๊ณผ ๋ณ๋ค๋ฅธ ์ฐจ์ด๊ฐ ์์ด ๋ณด์
๋๋ค. ๊ทธ๋ฐ๋ฐ onErrorContinue
๋ map, flatMap ๋ฑ๊ณผ ๋งค์ฐ ํฐ ์ฐจ์ด์ ์ด ํ๋ ์กด์ฌํ๋๋ฐ, ๋ฐ๋ก onErrorContinue
๋ ์
์คํธ๋ฆผ์ ์ํฅ์ ์ค๋ค๋ ๊ฒ์
๋๋ค.
๊ณต์ ๋ฌธ์์์ ํ์ธํ ์ ์๋ฏ onErrorContinue
๋ ์
์คํธ๋ฆผ์ ์ํฅ์ ์ฃผ๋ ์ฐ์ฐ์์
๋๋ค.
์ง์ ์์ ๋ก ์ดํด๋ณด๊ฒ ์ต๋๋ค.
Flux.just("hello", "world", "error", "foo", "bar")
.flatMap(value -> value.equals("error") ? Flux.error(new Exception("Error occurred.")) : Flux.just(value))
.doOnError(e -> log.error("On error.", e))
.onErrorContinue((e, value) -> log.error("On error continue.", e))
.subscribe(
value -> log.info("On success subscribed. : value={}", value),
e -> log.error("On error subscribed.", e)
);
์์ ๊ฐ์ ์ฝ๋๊ฐ ์๋ค๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค. ์ Flux๋ ๊ทธ์ ๋จ์ํ ๋ฉ์์ง๋ฅผ ๋ค์ด์คํธ๋ฆผ์ผ๋ก ๋ด๋ณด๋ด๋ฉฐ, subscriber๋ ๊ทธ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํฉ๋๋ค. ๋จ ๋ฉ์์ง๊ฐ error
๋ผ๋ฉด ์๋ฌ๋ฅผ ๋ฐ์์ํต๋๋ค.
์ Flux ์คํธ๋ฆผ์ ์ค๊ฐ์ onErrorContinue
๋ฅผ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ์ค๊ฐ์ error
๋ฉ์์ง ๊ฐ์ผ๋ก ์ธํด์ ์๋ฌ๊ฐ ๋ฐ์ํ๋๋ผ๋ Flux๋ ์ค๋จ๋์ง ์์ผ๋ฉฐ ์ ์์ ์ผ๋ก hello
, world
, foo
, bar
๋ฅผ ์ถ๋ ฅํฉ๋๋ค. ๊ทธ๋ฐ๋ฐ ์ฌ๊ธฐ์ ์ ์ํด์ผ ํ ์ ์ subscribe
์ ์๋ฌ ํจ๋ค๋ฌ๋ฟ๋ง์ด ์๋๋ผ onErrorContinue
์์ ์กด์ฌํ๋ doOnError
๋ ํธ์ถ๋์ง ์๋๋ค๋ ๊ฒ์
๋๋ค. onErrorContinue
๊ฐ ์ฌ์ฉ๋๋ ์๊ฐ, ๋์์ ์์ธกํ๊ธฐ ๋งค์ฐ ํ๋ค์ด์ง๋๋ค. ๋ค๋ฅธ ์ฐ์ฐ์์๋ ๋ค๋ฅด๊ฒ ์
์คํธ๋ฆผ์ ์ํฅ์ ์ฃผ๋ฉฐ ๋ชจ๋ ์
์คํธ๋ฆผ ์๋ฌ๋ฅผ ๋ฌด์ํ๊ธฐ ๋๋ฌธ์ ์๋ชป ์ฌ์ฉํ๋ฉด ์น๋ช
์ ์ธ ๋ฒ๊ทธ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
์ด์ฒ๋ผ onErrorContinue
๋ ๋ค์ด์คํธ๋ฆผ๋ฟ๋ง์ด ์๋๋ผ ์
์คํธ๋ฆผ์๋ ์ํฅ์ ์ฃผ๋ฉฐ ์ค์ง์ ์ผ๋ก ํด๋น ์คํธ๋ฆผ์ ์กด์ฌํ๋ ๋ชจ๋ ์๋ฌ๋ฅผ ๋ฌด์ํ๊ฒ ๋ง๋ญ๋๋ค. ์ด๋ฌํ ํน์ฑ์ผ๋ก ์ธํด์ ๊ณผ๊ฑฐ ์ด์ํ ๋ฒ๊ทธ๊ฐ ์กด์ฌํ๋ ์ ์ด ์์ต๋๋ค.
RetryWhen
+ onErrorContinue
์์์ ์ธ๊ธํ๋ฏ์ด reactor-core 3.4.25, spring-boot-starter-webflux 2.7.6 ๋ฒ์ ์์ ํด๊ฒฐ๋ ๋ฒ๊ทธ์
๋๋ค. ์ฐธ๊ณ ๋ก onErrorContinue
์ ๋์ ๋ฐฉ์์ด ๋ณ๊ฒฝ๋๋ฉด์ ๋ํ๋ ๋ฒ๊ทธ์ด๊ธฐ ๋๋ฌธ์ ๋งค์ฐ ์๋ ๋ฒ์ ์์๋ ๋ฒ๊ทธ๊ฐ ์์ ์ ์์ต๋๋ค.
retryWhen
์ฐ์ฐ์๋ ์๋ฌ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ ํน์ ์กฐ๊ฑด์ ์ฐธ๊ณ ํ์ฌ ์ฌ์๋ํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ ์ฐ์ฐ์์ด๋ฉฐ, onErrorContinue
๋ ์
์คํธ๋ฆผ์ ์๋ฌ๋ฅผ ๋ฌด์ํ๋๋ก ๋ง๋๋ ์ฐ์ฐ์์
๋๋ค.
๊ทธ๋ ๋ค๋ฉด ์ด ๋ ๊ฐ๋ฅผ ํจ๊ป ์ฌ์ฉํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น์? ๊ณผ๊ฑฐ ์ด์ ๊ด๋ จ๋ Reactor ๋ฒ๊ทธ๊ฐ ์กด์ฌํ๊ธฐ๋ ํ์์ต๋๋ค. ์๋๋ ํด๋น ๋ฒ๊ทธ๋ฅผ ์ฌํํ๋ ์ฝ๋์ ๋๋ค.
@Test
void test() {
Flux<String> fluxStream = Flux.range(1, 1000)
.flatMap(i -> {
log.info("i={}", i);
return Flux.<String>error(new RuntimeException("Exception occurred."))
.retryWhen(Retry.max(5));
}, 10)
.doOnTerminate(() -> log.info("terminated"))
.onErrorContinue((ex, obj) -> log.error("The onErrorContinue called. : obj={}", obj));
StepVerifier.create(fluxStream)
.verifyComplete();
}
์์ง ๋ฒ๊ทธ๊ฐ ์กด์ฌํ๋ 2.7.6 ๋ฏธ๋ง์์ ์ ํ
์คํธ ์ฝ๋๋ฅผ ์คํํด ๋ณด๋ฉด ํ
์คํธ๊ฐ ๋๋์ง ์๋ ๊ฒ์ ๋ณผ ์ ์์ ๊ฒ๋๋ค. ์ ์ํด์ ๋ด์ผ ํ ์ ์ด๋ผ๋ฉด ์ฝ์์์ i=10
์ถ๋ ฅ์ ๋ง์ง๋ง์ผ๋ก ๋ ์ด์ ์๋ฌด๋ฐ ์ผ๋ ํ์ง ์๋๋ค๋ ๊ฒ ์
๋๋ค. 10์ด๋ผ๋ ์ซ์๋ ์ ์ฝ๋์์ flatMap์ concurrency๋ก ์ง์ ํ ๊ฐ์
๋๋ค. flatMap์ concurrency ์(๊ธฐ๋ณธ๊ฐ: 256)๋งํผ ๋์์ ์งํํฉ๋๋ค. ์ฆ, 10๊ฐ์ ์คํธ๋ฆผ์ด ๋์์ ์งํ๋๊ณ ์์ง๋ง ๋๋์ง ์๊ณ ์๋ค๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค.
์ด๋ retryWhen
๊ณผ onErrorContinue
๊ฐ ๊ฐ์ด ์ฌ์ฉ๋ ๊ฒฝ์ฐ ๋ฐ์ํ๋ ํ์์
๋๋ค. ๋
ผ๋ฆฌ์ ์ผ๋ก ์๊ฐํด ๋ด๋ ์ด์ํ๊ธฐ๋ ํฉ๋๋ค.
retryWhen
์ ์๋ฌ๋ฅผ ๊ฐ์งํ๊ณ ์ฌ์๋๋ฅผ ํ๋๋ฐ onErrorContinue
์ ์ํด์ ๋ชจ๋ ์๋ฌ๊ฐ ๋ฌด์๋๊ธฐ ๋๋ฌธ์
๋๋ค. ์ฆ, Flux.error
๋๋ฌธ์ retryWhen์์ ์ฌ์๋๋ฅผ ์ค๋นํ๊ณ ์๋๋ฐ ๋ฐ์ํ ์๋ฌ(Exception)๋ฅผ ์ ๋ฌ ๋ฐ์ง ๋ชปํด ๊ณ์ ๊ธฐ๋ค๋ฆฌ๊ณ ์๋ ๊ฒ์
๋๋ค.
retryWhen๊ณผ onErrorContinue
์ ๋์ ๋ฐฉ์
์ ์ Reactive-stream์ ๋ํด์ ์ ๋ชจ๋ฅด๋ ๋ถ์ ์ํด์ ์กฐ๊ธ๋ง ์ค๋ช
ํ๊ณ ๋์ด๊ฐ๊ฒ ์ต๋๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก Reactive-stream์ Publisher์ Subscriber๊ฐ ์กด์ฌํ๋ ๋ฐํ/๊ตฌ๋
๋ชจ๋ธ ๊ตฌ์กฐ์
๋๋ค. flatMap
, map
๋ฑ์ ํตํด์ ์ฝ๋๋ฅผ ์ฐ๊ฒฐํ๋๋ผ๋ ๋ด๋ถ์ ์ผ๋ก๋ ๋ชจ๋ ๋ฐํ ๊ตฌ๋
์ ํตํด์ ๋ก์ง์ด ์ํ๋ฉ๋๋ค.
upstream.flatMap(A)
.map(B)
.flatMap(C)
.subscribe()
์๋ฅผ ๋ค์ด ์์ ๊ฐ์ ์ฝ๋๊ฐ ์๋ค๊ณ ํ ๋ flatMap(A)
๋ upstream
์, map(B)
์ flatMap(A)
์, flatMap(C)
์ ๊ตฌ๋
ํฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ๋ฐ๋๋ก upstream
์ flatMap(A)
์๊ฒ, flatMap(A)
๋ map(B)
์๊ฒ, map(B)
๋ flatMap(C)
์๊ฒ ๋ฉ์์ง๋ฅผ ๋ฐํํฉ๋๋ค. ์ด ํ๋ฆ์ ์กฐ๊ธ ๋ ์์ธํ๊ฒ ์ดํด๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค(Subscription์ ์๋ต).
Reactor๋ Reactive-stream์ ๊ตฌํ์ฒด์ด๋ฉฐ, Reactor๊ฐ ์ ๊ณตํ๋ ๋๋ถ๋ถ์ ๊ธฐ๋ฅ๋ค์ ์ ํ๋ฆ์ ๋ฒ์ด๋์ง ์์ต๋๋ค. ์ด๋ Retry ๊ธฐ๋ฅ๋ ๋ง์ฐฌ๊ฐ์ง์ ๋๋ค.
retryWhen
retryWhen
์ FluxRetryWhen
ํด๋์ค์ Subscriber์ Publisher๊ฐ ๊ตฌํ๋์ด ์์ต๋๋ค. FluxRetryWhen์ ํน์ดํ๊ฒ ๋ฉ์ธ(main) Subscriber์ ๋๋ฐ(companion) Subscriber๋ก ๋๋์ด์ง๋๋ค. ๋ฉ์ธ Subscriber๋ ์
์คํธ๋ฆผ์ผ๋ก๋ถํฐ ๋ฐ์ ๊ฐ์ ๊ทธ์ ๋ค์ด์คํธ๋ฆผ์ผ๋ก ์ ๋ฌํฉ๋๋ค. ํ์ง๋ง ์
์คํธ๋ฆผ์ผ๋ก๋ถํฐ ์๋ฌ๋ฅผ ๋ฐ์ผ๋ฉด ๋ฉ์ธ Subscriber๋ ๋๋ฐ Subscriber์ ์๊ทธ๋์ ๋ณด๋ด์ ์ฌ์๋ํ ์ง ์ฌ๋ถ๋ฅผ ํ์ธํ๋ฉฐ, ๋ง์ฝ ์ฌ์๋ํ๊ธฐ๋ก ๊ฒฐ์ ๋ ๊ฒฝ์ฐ ๋ฉ์ธ Subscriber๋ ์
์คํธ๋ฆผ์ ์ฌ๊ตฌ๋
(resubscribe)ํฉ๋๋ค. ๊ฐ๊ฐ RetryWhenMainSubscriber
ํด๋์ค์ RetryWhenOtherSubscriber
ํด๋์ค๋ฅผ ํตํด ๊ตฌํ๋๋ฉฐ ๋๋ฝ์ ์ธ ํ๋ฆ์ ์๋์ ๊ฐ์ต๋๋ค.
retryWhen operator๋ฅผ ์ฌ์ฉํ๋ฉด ํ๋ผ๋ฏธํฐ๋ก RetrySpec์ ์ ๋ฌํ๋๋ก ๋์ด ์๋๋ฐ ์ด RetrySpec๋ค์ ๊ตฌ์ฑํ์ฌ ๋๋ฐ Subscriber์ ์คํธ๋ฆผ์ด ๊ตฌ์ฑ๋ฉ๋๋ค. ๋๋ต์ ์ผ๋ก ์๋์ ๊ฐ์ ํํ๋ผ๊ณ ํ ์ ์์ต๋๋ค.
(๋ฌผ๋ก ์๋ ์ฝ๋๋ ์ค๋ช
์ ์ํด ๋ง์ ๋ถ๋ถ์ด ์๋ต๋ ๋๋ฝ์ ์ธ ์ฝ๋์
๋๋ค. ์ค์ ์ฝ๋๋ ๋ ๋ง๊ณ ๋ณต์กํฉ๋๋ค.)
retrySignalFlux.concatMap(retrySpec)
.subscribe(retryWhenOtherSubscriber);
๋๋ฐ Subscriber(retryWhenOtherSubscriber
)๋ retrySpec์ผ๋ก๋ถํฐ ๋ฐํ ๋ฐ์ ๊ฒฐ๊ณผ(onNext
, onError
๋ฑ)๋ฅผ ๋ฐํ์ผ๋ก ์ฌ์๋ํ ์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ๊ณ ์คํํฉ๋๋ค. ์ฐธ๊ณ ๋ก retrySignalFlux
๋ retryWhenOtherSubscriber
์ ๋ฉค๋ฒ ํ๋๋ก retryWhenOtherSubscriber๋ฅผ
๊ตฌ๋
ํ๋ฉด ๋ด๋ถ์ ์ผ๋ก retrySignalFlux
๋ฅผ ๊ตฌ๋
ํฉ๋๋ค. ์ฌ์ค์ retryWhenOtherSubscriber
๊ฐ ๋ฐํ๋ ํ๊ณ ๊ตฌ๋
๋ ํ๋ Processor๋ผ๊ณ ๋ด๋ ๋ฌด๋ฐฉํฉ๋๋ค. ์ค์ํ ๊ฒ์ ์ฌ์๋ ์ํ ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ๋ ๋ก์ง๋ Reactor publisher, subscriber์ ์ํด์ ๊ฒฐ์ ๋๋ฉฐ concatMap์ ํตํด์ ์ฐ๊ฒฐํ๊ณ ์๋ค๋ ๊ฒ์
๋๋ค. ์ด ์ฐ์ฐ์๋ onErrorContinue
์ ์ํด์ ์ํฅ์ ๋ฐ๋ ์ฐ์ฐ์์
๋๋ค. ๋ ์ด์ผ๊ธฐํ๊ธฐ ์ ์ onErrorContinue
์ ๋ํด์ ์ข ๋ ์์ธํ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
onErrorContinue
์์์ ์ธ๊ธํ ๊ฒ์ฒ๋ผ onErrorContinue
๋ ๋ค์ด์คํธ๋ฆผ๋ฟ๋ง์ด ์๋๋ผ ์
์คํธ๋ฆผ์๋ ์ํฅ์ ์ฃผ๋ ์ฐ์ฐ์์
๋๋ค. ๋๋ฌธ์ map
, flatMap
, filter
๊ฐ์ ๋ค๋ฅธ ์ผ๋ฐ์ ์ธ ์ฐ์ฐ์์๋ ๊ตฌํ ๋ฐฉ์์ด ์กฐ๊ธ ์ฐจ์ด๊ฐ ์์ต๋๋ค. ๋ฌด์๋ณด๋ค onErrorContinue
๋ ๊ตฌํ์ฒด๊ฐ ์์ต๋๋ค. ์ฆ, FluxMap ํด๋์ค, FluxRetryWhen
ํด๋์ค ๊ฐ์ Publisher/Subscriber ๊ตฌ์กฐ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์๋๋ผ Reactor Context๋ฅผ ์ง์ ์ฌ์ฉํฉ๋๋ค.
Context.of(
OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY,
OnNextFailureStrategy.resume(genericConsumer)
);
onErrorContinue
์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ ์๊ฐ ์์ ๊ฐ์ Context key/value๊ฐ ์ธํ
๋ฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ onErrorContinue ์ฐ์ฐ์๊ฐ ์ฌ์ฉ๋ ๊ณณ์ ์
์คํธ๋ฆผ์์ Context์ OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY
๊ฐ์ ๊บผ๋ด ์ฌ์ฉํ๋ ๊ฒ์ด์ฃ . Reactor๋ ๋ด๋ถ์ ์ผ๋ก ์ด๋ฐ ์ฐ์ฐ์๋ฅผ ๊ตฌํํ ๋ ์ฌ์ฉํ๋ ์ ํธ๋ฆฌํฐ ์ฑ๊ฒฉ์ ํด๋์ค์ธ Operators ํด๋์ค๊ฐ ์กด์ฌํฉ๋๋ค. Operators์์ ์ ๊ณตํ๋ ๋ช๋ช ๊ธฐ๋ฅ๋ค์ ์ Context์ ํ๋๊ทธ๋ฅผ ์ฌ์ฉํ์ฌ ์ ์ ํ ๋ก์ง์ ์ํํฉ๋๋ค.
์๋ฅผ ๋ค์ด, onNext ์ค ๋ฐ์ํ ์๋ฌ๋ฅผ FailureStrategy๋ฅผ ์ฐธ๊ณ ํ์ฌ ํ์ํ ์์
์ ์ํ ๋ฐ ์ถ์ถํ๋ onNextError
๋ผ๋ ๊ธฐ๋ฅ๋ ์ ๊ณตํ๊ณ ์์ต๋๋ค. onErrorContinue ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ค๋ฉด onNextError
๋ ์ OnNextFailureStrategy.resume(consumer)
๋ฅผ ์ฌ์ฉํ์ฌ ์๋ฌ๋ ๊ด๋ จ๋ ํ์ํ ๋ก์ง์ ์ํํ๊ณ null
์ ๋ฐํํ์ฃ .
๊ทธ๋ฆฌ๊ณ flatMap, concatMap ๋ฑ์์๋ ์๋ฌ ์์(null)์ด๋ผ๊ณ ํ๋จํ๊ณ ๋ง์ ์๋ ๋ก์ง์ ์ํํฉ๋๋ค.
retryWhen + onErrorContinue๋ก ์ธํ ์ค๋จ ๋ฒ๊ทธ
์์ธ
@Test
void test() {
Flux<String> fluxStream = Flux.range(1, 1000)
.flatMap(i -> {
log.info("i={}", i);
return Flux.<String>error(new RuntimeException("asd"))
.retryWhen(Retry.max(5));
}, 10)
.doOnTerminate(() -> log.info("terminated"))
.onErrorContinue((ex, obj) -> log.error("The onErrorContinue called. : obj={}", obj));
StepVerifier.create(fluxStream)
.verifyComplete();
}
์์์ ์ดํด๋ดค๋ ๋ฒ๊ทธ๋ฅผ ์ฌํํ๋ ์ฝ๋์ ๋๋ค. ์ ํํ 10๋ฒ ์๋ฌ๊ฐ ๋ฐ์ํ๊ณ ๋์ ์ Flux ์คํธ๋ฆผ์ ๋จนํต์ด ๋์ด ๋ฒ๋ฆฝ๋๋ค.
retryWhen
์ ๋๋ฐ subscriber์ ์ํด์ RetrySpec์ ๋ก์ง์ด concatMap
์ ํตํด์ ์ฐ๊ฒฐ๋ฉ๋๋ค. ๊ทธ๋ฆฌ๊ณ ์๋ฌ ์๊ทธ๋์ ๋ฐ์ผ๋ฉด ์ค์ ํ๋ RetrySpec์ ํตํด์ ์ฌ์๋ ๋๋ ์ค์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํฉ๋๋ค. ์ด๋ ์ค์ํ ์ ์ RetrySpec ๋ก์ง์ ๋ ์ด์ ์ฌ์๋๋ฅผ ํ๋ฉด ์ ๋๋ ๊ฒฝ์ฐ, ์๋ฅผ ๋ค์ด, ์ต๋ ์ฌ์๋ ํ์๋ฅผ ์ด๊ณผํ ๊ฒฝ์ฐ ์๋ฌ๋ฅผ ๋ฐํํฉ๋๋ค. ๊ทธ๋ผ ๋๋ฐ subscriber๋ ์ด๋ฅผ ํ์ธํ๊ณ ๋ฉ์ธ subscriber๋ฅผ ์ค์ง์ํต๋๋ค. ๊ทธ๋ฐ๋ฐ ๋ง์ฝ ์ด๋์ ๊ฐ onErrorContinue
๊ฐ ์ฌ์ฉ๋์์ ๊ฒฝ์ฐ RetrySpec์ ์ฐ๋ํ ๋ ์ฌ์ฉ๋์๋ concatMap
์ด RetrySpec
์์ ๋ฐํํ ์๋ฌ๋ฅผ ๋ฌด์ํ๊ฒ ๋ฉ๋๋ค.
// concatMap ์ฐ์ฐ์์ ์๋ฌ ์ฒ๋ฆฌ ๋ก์ง ์ค ์ผ๋ถ
catch (Throwable e) {
Throwable e_ = Operators.onNextError(v, e, this.ctx, s);
if (e_ != null) {
actual.onError(Operators.onOperatorError(s, e, v, this.ctx));
return;
}
else {
continue;
}
}
// ........
์ ์ฝ๋๋ concatMap
์ฐ์ฐ์์ ๊ตฌํ ์ค ์๋ฌ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ถ๋ถ ์ค ์ผ๋ถ์
๋๋ค. onErrorContinue
๊ฐ ์ฌ์ฉ๋์๋ค๋ฉด Operators.onNextError
๋ null์ ๋ฐํํ๋ฉฐ ๋ก์ง์ continue
๊ฐ ์ํ๋๊ณ ํด๋น ์๋ฌ๋ ์๋ฌด๋ฐ ์กฐ์น ์์ด ๋ค์ ์์
์ผ๋ก ๋์ด๊ฐ๊ฒ ๋ฉ๋๋ค. retryWhen ์ฐ์ฐ์์ ๋๋ฐ subscriber๊ฐ ์๋ฌ๋ฅผ ๋ฐ์ง ๋ชปํด ๊ณ์ ์ฌ์๋๋ฅผ ํ ์ง, ์ด๋ง ์ฌ์๋๋ฅผ ์ค์งํ ์ง ๊ฒฐ์ ํ์ง ๋ชปํ๋ ๊ฒ์ด์ฃ ...
์ด๋ก ์ธํด์ retryWhen
์ด ์ฌ์ฉ๋ ์คํธ๋ฆผ์ด ์๋ฃ๋์ง ๋ชปํ๊ณ ๊ณ์ ๋๊ธฐํ๋ฉด์ flatMap
์ concurrency๋ฅผ ๊ณ์ ์ ์ ํ๊ณ , ๊ฒฐ๊ตญ ๋ชจ๋ max concurrency์ ๋๋ฌํ์ฌ ์ดํ ์์
๋ค์ด ์ฒ๋ฆฌ๋์ง ๋ชปํ๋ ๊ฒ์
๋๋ค.
ํด๊ฒฐ
์์์ ์ธ๊ธ๋๋ฆฐ ๊ฒ์ฒ๋ผ ์ด ๋ฒ๊ทธ๋ ํ์ฌ ํด๊ฒฐ๋ ์ํ์ ๋๋ค.์ด๋ป๊ฒ ํด๊ฒฐํ์๊น์? ์๊ฐ๋ณด๋ค ๊ฐ๋จํ ๋ฐฉ๋ฒ์ผ๋ก ํด๊ฒฐํ์ต๋๋ค.
๋ค์์ ๋ฒ๊ทธ๋ฅผ ์์ ํ ๋ฒ์ ์ฝ๋์ ๋๋ต์ ์ธ ์ฝ๋์ ๋๋ค.
retrySignalFlux.deferContextual(cv ->
retrySignalFlux.contextWrite(cv) // context ๋ณต๊ตฌ
.concatMap(retrySpec)
.contextWrite(c -> Context.empty()) // context ๋น์ฐ๊ธฐ
)
.subscribe(retryWhenOtherSubscriber);
retrySpec
์ด ์ํ๋ ์์ญ์์๋ง context๋ฅผ ๋น์ ๋ฒ๋ฆฌ๋ ๊ฒ์
๋๋ค.
๋ฑ 2์ค ์์ ํด์ ๋ฒ๊ทธ๋ฅผ ์์ ํ ๊ฒ์ด์ฃ . ๋ญ๊ฐ ์ฐ์ฐํ๊ธฐ๋ ํ์ง๋ง ๋ค๋ฅธ ๊ณณ์ ์ํฅ ์์ด retrySpec ๋ถ๋ถ๋ง ์์ ํด์ ๊ฐ๋จํ๊ฒ(?) ํด๊ฒฐํ ๊ฒ ๊ฐ์ต๋๋ค.
๋๊ฐ๋ฉฐ
์์์ ์ธ๊ธํ ๋ฒ๊ทธ๋ ํ์ฌ ์์ ๋์์ง๋ง onErrorContinue
๋ ์ฌ์ ํ ์ฌ์ฉ์ ์ฃผ์ํด์ผ ํ๋ ์ฐ์ฐ์์
๋๋ค. ๊ณต์ ๋ฌธ์์์๋ ๊ฐ๋ฅํ๋ฉด onErrorContinue
๋ณด๋ค๋ onErrorResume
๊ฐ์ ๊ฒ์ ์์ฉํด์ ํด๊ฒฐํ๋ ๊ฒ์ ๊ถ์ฅํ๊ณ ์์ต๋๋ค. ์ฌ์ง์ด Reactor์ ํ ๊ฐ๋ฐ์๋ onErrorContinue
API ๋์์ธ์ ํฐ ์ค์๋ผ๊ณ ์ธ๊ธํ ์ ๋์์ฃ .
์์ง ๋ง์ธ์! onErrorContinue
๋ ํญ์ ์ฌ์ฉ์ ์ฃผ์ํด์ผ ํ๋ฉฐ onErrorContinue
๋ฅผ ์ฌ์ฉํ๊ธฐ ์ ์ ๋ค๋ฅธ ๋์์ด ์์์ง ๋จผ์ ๊ณ ๋ฏผํด ๋ณด์ธ์.