-
SubscribeOn , PublishOnSpring/Spring 2021. 12. 23. 10:28
1. SubscribeOn 와 PublishOn이 혼용된 코드
1.1. Code
123456Flux.range(1,5).publishOn(Schedulers.newSingle("pub1")).map(i -> i * 10).subscribeOn(Schedulers.newSingle("sub1")).log().subscribe();cs - subscribeOn 이후에 log() 를 봐보자. 어떤 쓰레드에서 log() 가 실행될까?
- 리엑터 공식 문서를 보면 subscribeOn을 호출한 객체를 구독할 때는 해당 스트림 전체가 해당 스케쥴러로 다 바뀐다고 하였다. 따라서 "sub1" 이라는 쓰레드에서 실행이 될까?
- 아니면 그 위에 publishOn에 의해 "pub1" 이라는 쓰레드에서 실행 될까?
1.2. 결과
1234567...09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(10)09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(20)09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(30)09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(40)09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(50)09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onComplete()cs - 결과를 보면 "pub1"이라는 쓰레드에서 로그가 찍히게 된다.
- 결과가 잘 이해가 가지 않거나 publishOn 의 흐름과 subscribeOn에 흐름을 단순히 암기했다면 헷갈릴 확률이 높을 수 있다.
- 개발 패턴에 앞서 데이터 흐름을 유의 깊게 보면서 확인해보자.
2. Reactive Stream API를 통해 직접 publishOn, subscribeOn 구현해보기
2.1. 표준 API
- 해당 Reactive Stream의 표준 API 4가지를 사용해서 직접 앞에 코드를 재현해보자.
- 그리고 어떤 순서로 데이터에 흐름이 실행되는지 알아보자.
2.2 마블 다이어그램
- publishOn
- 마블 다이어그램을 보고 publishOn을 API 표준을 지키면서 만들어보자.
123456789101112131415161718192021222324252627282930313233343536private static Publisher<Integer> pubOnPub(Publisher<Integer> iterPub) {return new Publisher<Integer>() {@Overridepublic void subscribe(Subscriber<? super Integer> sub) {ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {@Overridepublic String getThreadNamePrefix() {return "pub";}});iterPub.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {sub.onSubscribe(s);}@Overridepublic void onNext(Integer integer) {es.execute(() -> sub.onNext(integer));}@Overridepublic void onError(Throwable t) {es.execute(() -> sub.onError(t));}@Overridepublic void onComplete() {es.execute(() -> sub.onComplete());}});}};}cs - onNext 시에 데이터가 DownStream으로 흐르기 때문에 스케쥴러를 만들고 onNext 시에 별도의 스레드로 처리해주면 된다.
- subscribeOn
123456789101112131415private static Publisher<Integer> subOnPub(Publisher<Integer> mapPub) {return new Publisher<Integer>() {ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {@Overridepublic String getThreadNamePrefix() {return "sub";}});@Overridepublic void subscribe(Subscriber<? super Integer> sub) {es.execute(() -> mapPub.subscribe(sub));}};}cs - subscribe 시에 Upstream으로 흐르기 때문에 위에 Publisher 가 subscribe 할 때 별도의 스레드로 처리하도록 해주자.
2.3 표준 API 코드
- 그럼 이제 앞서 보았던 코드를 Reactive Stream API 표준을 사용하여 풀어서 데이터의 흐름을 보자.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142public static void main(String[] args) {Publisher<Integer> iterPub = iterPub();Publisher<Integer> pubOnPub = pubOnPub(iterPub);Publisher<Integer> mapPub = mapPub(pubOnPub,i -> i * 10);Publisher<Integer> subOnPub = subOnPub(mapPub);Subscriber<Integer> subscriber = logSub();subOnPub.subscribe(subscriber); // (1)}private static Publisher<Integer> iterPub() {return new Publisher<Integer>() {@Overridepublic void subscribe(Subscriber<? super Integer> sub) {sub.onSubscribe(new Subscription() { // (5)@Overridepublic void request(long n) { // (9)sub.onNext(1); // (10)sub.onNext(2);sub.onNext(3);sub.onNext(4);sub.onNext(5);sub.onComplete();}@Overridepublic void cancel() {}});}};}private static Publisher<Integer> pubOnPub(Publisher<Integer> iterPub) {return new Publisher<Integer>() {@Overridepublic void subscribe(Subscriber<? super Integer> sub) {ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {@Overridepublic String getThreadNamePrefix() {return "pub";}});iterPub.subscribe(new Subscriber<Integer>() { // (4)@Overridepublic void onSubscribe(Subscription s) {sub.onSubscribe(s); // (6)}@Overridepublic void onNext(Integer integer) {es.execute(() -> sub.onNext(integer)); // (11)}@Overridepublic void onError(Throwable t) {es.execute(() -> sub.onError(t));}@Overridepublic void onComplete() {es.execute(() -> sub.onComplete());}});}};}private static Publisher<Integer> mapPub(Publisher<Integer> pubOnPub, UnaryOperator<Integer> f) {return new Publisher<Integer>() {@Overridepublic void subscribe(Subscriber<? super Integer> sub) {Function<Integer, Integer> f = (i) -> i * 10;pubOnPub.subscribe(new Subscriber<Integer>() { // (3)@Overridepublic void onSubscribe(Subscription s) {sub.onSubscribe(s); // (7)}@Overridepublic void onNext(Integer integer) {sub.onNext(f.apply(integer)); // (12)}@Overridepublic void onError(Throwable t) {sub.onError(t);}@Overridepublic void onComplete() {sub.onComplete();}});}};}private static Publisher<Integer> subOnPub(Publisher<Integer> mapPub) {return new Publisher<Integer>() {ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {@Overridepublic String getThreadNamePrefix() {return "sub";}});@Overridepublic void subscribe(Subscriber<? super Integer> sub) {es.execute(() -> mapPub.subscribe(sub)); // (2)}};}private static Subscriber<Integer> logSub() {return new Subscriber<>() {@Overridepublic void onSubscribe(Subscription s) {log.info("onSubscribe");s.request(Long.MAX_VALUE); // (8)}@Overridepublic void onNext(Integer i) {log.info("onNext: " + i); // (13)}@Overridepublic void onError(Throwable t) {log.info("onError");}@Overridepublic void onComplete() {log.info("onComplete");}};}cs
2-4 구독 흐름
- (1) :
- 처음 구독이 흐르는 시점은 subscribe()를 통해 구독자를 호출하는 시점이다. - main Thread
- (2) :
subOnPub에 subscribe 메서드가 실행이 되는데 이때 별도의 쓰레드에서 mapPub에 subscribe가 실행되게 해준다. - sub Thread
- (3) :
mapPub에 subscribe는 pubOnPub에 subscribe를 호출한다. - sub Thread - (4) :
pubOnPub에 subscribe는 iterPub에 subscribe를 호출한다. - sub Thread - (5) :
여기서의 sub 파라미터는 pubOnPub에 Subscriber 이다.
구독이 시작되면 몇 개의 데이터를 요청할지 ( request )를 전달해야 한다. sub.onSubscribe()에 의해 pubOnPub에 있는 onSubscribe가 호출이 될 것이다. - sub Thread - (6) :
해당 sub ( Subscriber ) 파라미터는 mapPub에 Subscriber 이다. mapPub에 onSubscribe가 호출이 될 것이다. - sub Thread - (7) :
여기서의 sub 파라미터는 logSub이 된다. 이제 logSub에 onSubscribe가 호출이 될 것이다.
subOnPub은 (2)번에서 별도의 쓰레드에서 mapPub이 실행되게 해주는 역할이다. - sub Thread
- (8) :
요청을 제한 없이 하기 위해 Long.MAX_VALUE 로 request 값을 전달한다. - sub Thread - (9) :
request 를 호출했으니 이제 데이터가 흐르게 된다. - sub Thread - (10) :
sub.onNext(1)이 먼저 호출이 될 것이다. 여기서의 sub 파라미터는 pubOnPub에서 구현했던 Subscriber이다. - sub Thread - (11) :
다음 mapPub에서 구현한 Subscriber가 onNext()를 실행할텐데 이때 pub1에 쓰레드로 흐름이 바뀐다. - pub Thread - (12):
mapPub은 정의한 Function에 의해 곱하기 10을 해서 다음 sub에 onNext로 흐르게 된다.
여기서의 Subscriber는 logSub에서 구현한 Subscriber이고 logSub에 onNext()로 가게 된다. - pub Thread - (13) :
이제 log.info로 데이터를 찍을때 해당 pub이라는 Thread에서 흐르는 것을 알 수 있다. onNext(2) ~ onNext(5) 도 마찬가지로 진행이 된다. - pub Thread
2.5. 결과
'Spring > Spring' 카테고리의 다른 글
Spring Batch에서 Job Completed 이후 종료되지 않을때 (0) 2023.09.09 Spring Batch 3.0 이상에서 JPA 사용시 PlatformTransactionManager 설정 (0) 2023.06.01 Reactor List가 비어있을 때의 switchIfEmpty와 defaultIfEmpty (0) 2021.10.14 스프링 @Transactional과 잠금 (0) 2021.08.11 Mono.just()와 Mono.defer()에 대한 이해 (0) 2021.07.30