Spring/Spring

SubscribeOn , PublishOn

100win10 2021. 12. 23. 10:28

 

1. SubscribeOn 와 PublishOn이 혼용된 코드


1.1. Code

1
2
3
4
5
6
Flux.range(1,5)
        .publishOn(Schedulers.newSingle("pub1"))
        .map(i -> i * 10)
        .subscribeOn(Schedulers.newSingle("sub1"))
        .log()
        .subscribe();
cs

 

  •  subscribeOn 이후에 log() 를 봐보자. 어떤 쓰레드에서 log() 가 실행될까? 
  • 리엑터 공식 문서를 보면 subscribeOn을 호출한 객체를 구독할 때는 해당 스트림 전체가 해당 스케쥴러로 다 바뀐다고 하였다. 따라서 "sub1" 이라는 쓰레드에서 실행이 될까?
  • 아니면 그 위에 publishOn에 의해 "pub1" 이라는 쓰레드에서 실행 될까?

 

1.2. 결과

1
2
3
4
5
6
7
...
09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(10)
09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(20)
09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(30)
09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(40)
09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(50)
09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onComplete()
cs
  • 결과를 보면  "pub1"이라는 쓰레드에서 로그가 찍히게 된다.
  • 결과가 잘 이해가 가지 않거나  publishOn 의 흐름과 subscribeOn에 흐름을 단순히 암기했다면 헷갈릴 확률이 높을 수 있다.
  • 개발 패턴에 앞서 데이터 흐름을 유의 깊게 보면서 확인해보자.

 

 

2. Reactive Stream API를 통해 직접 publishOn, subscribeOn 구현해보기


 

2.1. 표준 API

  • 해당 Reactive Stream의 표준 API 4가지를 사용해서 직접 앞에 코드를 재현해보자. 
  • 그리고 어떤 순서로 데이터에 흐름이 실행되는지 알아보자.

 

2.2 마블 다이어그램

    • publishOn
    • 마블 다이어그램을 보고 publishOn을 API 표준을 지키면서 만들어보자. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static Publisher<Integer> pubOnPub(Publisher<Integer> iterPub) {
    return new Publisher<Integer>() {
        @Override
        public void subscribe(Subscriber<super Integer> sub) {
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                @Override
                public String getThreadNamePrefix() {
                    return "pub";
                }
            });
 
            iterPub.subscribe(new Subscriber<Integer>() {
 
                @Override
                public void onSubscribe(Subscription s) {
                    sub.onSubscribe(s);
                }
 
                @Override
                public void onNext(Integer integer) {
                    es.execute(() -> sub.onNext(integer));
                }
 
                @Override
                public void onError(Throwable t) {
                    es.execute(() -> sub.onError(t));
                }
 
                @Override
                public void onComplete() {
                    es.execute(() -> sub.onComplete());
                }
            });
        }
    };
}
cs
  • onNext 시에 데이터가 DownStream으로 흐르기 때문에  스케쥴러를 만들고 onNext 시에 별도의 스레드로 처리해주면 된다.

 

 

 

 

    • subscribeOn 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static Publisher<Integer> subOnPub(Publisher<Integer> mapPub) {
    return new Publisher<Integer>() {
        ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
            @Override
            public String getThreadNamePrefix() {
                return "sub";
            }
        });
 
        @Override
        public void subscribe(Subscriber<super Integer> sub) {
            es.execute(() -> mapPub.subscribe(sub));
        }
    };
}
cs
  •  subscribe 시에 Upstream으로 흐르기 때문에 위에 Publisher 가 subscribe 할 때  별도의 스레드로 처리하도록 해주자.

 

 

 

 

 

 

 

2.3 표준 API 코드

  • 그럼 이제 앞서 보았던 코드를 Reactive Stream API 표준을 사용하여 풀어서 데이터의 흐름을 보자.
     
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    public static void main(String[] args) {
     
        Publisher<Integer> iterPub = iterPub();
        Publisher<Integer> pubOnPub = pubOnPub(iterPub);
        Publisher<Integer> mapPub = mapPub(pubOnPub,i -> i * 10);
        Publisher<Integer> subOnPub = subOnPub(mapPub);
        Subscriber<Integer> subscriber = logSub();
     
        subOnPub.subscribe(subscriber); // (1)
    }
     
    private static Publisher<Integer> iterPub() {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<super Integer> sub) {
                sub.onSubscribe(new Subscription() {   // (5)
                    @Override
                    public void request(long n) { // (9)
                        sub.onNext(1); // (10)
                        sub.onNext(2);
                        sub.onNext(3);
                        sub.onNext(4);
                        sub.onNext(5);
                        sub.onComplete();
                    }
     
                    @Override
                    public void cancel() {
                    }
                });
            }
        };
    }
     
    private static Publisher<Integer> pubOnPub(Publisher<Integer> iterPub) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<super Integer> sub) {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                    @Override
                    public String getThreadNamePrefix() {
                        return "pub";
                    }
                });
     
                iterPub.subscribe(new Subscriber<Integer>() {  // (4)
     
                    @Override
                    public void onSubscribe(Subscription s) {
                        sub.onSubscribe(s); // (6)
                    }
     
                    @Override
                    public void onNext(Integer integer) {
                        es.execute(() -> sub.onNext(integer)); // (11)
                    }
     
                    @Override
                    public void onError(Throwable t) {
                        es.execute(() -> sub.onError(t));
                    }
     
                    @Override
                    public void onComplete() {
                        es.execute(() -> sub.onComplete());
                    }
                });
            }
        };
    }
    private static Publisher<Integer> mapPub(Publisher<Integer> pubOnPub, UnaryOperator<Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<super Integer> sub) {
                Function<Integer, Integer> f = (i) -> i * 10;
     
                pubOnPub.subscribe(new Subscriber<Integer>() {    // (3)
                    @Override
                    public void onSubscribe(Subscription s) {
                        sub.onSubscribe(s); // (7)
                    }
     
                    @Override
                    public void onNext(Integer integer) {
                        sub.onNext(f.apply(integer)); // (12)
                    }
     
                    @Override
                    public void onError(Throwable t) {
                        sub.onError(t);
                    }
     
                    @Override
                    public void onComplete() {
                        sub.onComplete();
                    }
                });
            }
        };
    }
     
    private static Publisher<Integer> subOnPub(Publisher<Integer> mapPub) {
        return new Publisher<Integer>() {
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                @Override
                public String getThreadNamePrefix() {
                    return "sub";
                }
            });
     
            @Override
            public void subscribe(Subscriber<super Integer> sub) {
                es.execute(() -> mapPub.subscribe(sub));  // (2)
            }
        };
    }
     
     
    private static Subscriber<Integer> logSub() {
        return new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription s) {
                log.info("onSubscribe"); 
                s.request(Long.MAX_VALUE); // (8)
            }
     
            @Override
            public void onNext(Integer i) {
                log.info("onNext: " + i);  // (13)
            }
     
            @Override
            public void onError(Throwable t) {
                log.info("onError");
            }
     
            @Override
            public void onComplete() {
                log.info("onComplete");
            }
        };
    }
    cs
     
     

2-4  구독 흐름

  • (1) :  
  • 처음 구독이 흐르는 시점은 subscribe()를 통해 구독자를 호출하는 시점이다.   - main Thread 
  • (2) :  
    subOnPub에 subscribe 메서드가 실행이 되는데 이때 별도의 쓰레드에서  mapPub에 subscribe가 실행되게 해준다.  - sub Thread
  • (3) :  
    mapPub에 subscribe는 pubOnPub에 subscribe를 호출한다.  - sub Thread

  • (4) : 
    pubOnPub에 subscribe는 iterPub에 subscribe를 호출한다.  - sub Thread

  • (5) : 
    여기서의 sub 파라미터는 pubOnPub에 Subscriber 이다.  
    구독이 시작되면 몇 개의 데이터를 요청할지 ( request )를  전달해야 한다.   sub.onSubscribe()에 의해 pubOnPub에 있는 onSubscribe가 호출이 될 것이다.  - sub Thread

  • (6) : 
    해당 sub ( Subscriber ) 파라미터는 mapPub에 Subscriber 이다. mapPub에 onSubscribe가 호출이 될 것이다. - sub Thread

  • (7) :  
    여기서의 sub 파라미터는 logSub이 된다. 이제 logSub에 onSubscribe가 호출이 될 것이다. 
    subOnPub은 (2)번에서 별도의 쓰레드에서 mapPub이 실행되게 해주는 역할이다.  - sub Thread

  • (8) : 
    요청을 제한 없이 하기 위해 Long.MAX_VALUE 로 request 값을 전달한다. - sub Thread

  • (9) : 
    request 를 호출했으니 이제 데이터가 흐르게 된다. - sub Thread

  • (10) :
    sub.onNext(1)이 먼저 호출이 될 것이다.  여기서의 sub 파라미터는 pubOnPub에서 구현했던 Subscriber이다. - sub Thread

  • (11) :  
    다음 mapPub에서 구현한 Subscriber가 onNext()를 실행할텐데 이때 pub1에 쓰레드로 흐름이 바뀐다. - pub Thread


  • (12):
    mapPub은 정의한 Function에 의해 곱하기 10을 해서 다음 sub에 onNext로 흐르게 된다.  
    여기서의 Subscriber는 logSub에서 구현한 Subscriber이고  logSub에 onNext()로 가게 된다. - pub Thread


  • (13) :
    이제 log.info로 데이터를 찍을때 해당 pub이라는 Thread에서 흐르는 것을 알 수 있다. onNext(2) ~ onNext(5) 도 마찬가지로 진행이 된다. - pub Thread

2.5. 결과