ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • SubscribeOn , PublishOn
    Spring/Spring 2021. 12. 23. 10:28

     

    1. SubscribeOn 와 PublishOn이 혼용된 코드


    1.1. Code

    1
    2
    3
    4
    5
    6
    Flux.range(1,5)
            .publishOn(Schedulers.newSingle("pub1"))
            .map(i -> i * 10)
            .subscribeOn(Schedulers.newSingle("sub1"))
            .log()
            .subscribe();
    cs

     

    •  subscribeOn 이후에 log() 를 봐보자. 어떤 쓰레드에서 log() 가 실행될까? 
    • 리엑터 공식 문서를 보면 subscribeOn을 호출한 객체를 구독할 때는 해당 스트림 전체가 해당 스케쥴러로 다 바뀐다고 하였다. 따라서 "sub1" 이라는 쓰레드에서 실행이 될까?
    • 아니면 그 위에 publishOn에 의해 "pub1" 이라는 쓰레드에서 실행 될까?

     

    1.2. 결과

    1
    2
    3
    4
    5
    6
    7
    ...
    09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(10)
    09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(20)
    09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(30)
    09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(40)
    09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onNext(50)
    09:14:39.104 [pub1-2] INFO reactor.Flux.SubscribeOn.1 - onComplete()
    cs
    • 결과를 보면  "pub1"이라는 쓰레드에서 로그가 찍히게 된다.
    • 결과가 잘 이해가 가지 않거나  publishOn 의 흐름과 subscribeOn에 흐름을 단순히 암기했다면 헷갈릴 확률이 높을 수 있다.
    • 개발 패턴에 앞서 데이터 흐름을 유의 깊게 보면서 확인해보자.

     

     

    2. Reactive Stream API를 통해 직접 publishOn, subscribeOn 구현해보기


     

    2.1. 표준 API

    • 해당 Reactive Stream의 표준 API 4가지를 사용해서 직접 앞에 코드를 재현해보자. 
    • 그리고 어떤 순서로 데이터에 흐름이 실행되는지 알아보자.

     

    2.2 마블 다이어그램

      • publishOn
      • 마블 다이어그램을 보고 publishOn을 API 표준을 지키면서 만들어보자. 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    private static Publisher<Integer> pubOnPub(Publisher<Integer> iterPub) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<super Integer> sub) {
                ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                    @Override
                    public String getThreadNamePrefix() {
                        return "pub";
                    }
                });
     
                iterPub.subscribe(new Subscriber<Integer>() {
     
                    @Override
                    public void onSubscribe(Subscription s) {
                        sub.onSubscribe(s);
                    }
     
                    @Override
                    public void onNext(Integer integer) {
                        es.execute(() -> sub.onNext(integer));
                    }
     
                    @Override
                    public void onError(Throwable t) {
                        es.execute(() -> sub.onError(t));
                    }
     
                    @Override
                    public void onComplete() {
                        es.execute(() -> sub.onComplete());
                    }
                });
            }
        };
    }
    cs
    • onNext 시에 데이터가 DownStream으로 흐르기 때문에  스케쥴러를 만들고 onNext 시에 별도의 스레드로 처리해주면 된다.

     

     

     

     

      • subscribeOn 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private static Publisher<Integer> subOnPub(Publisher<Integer> mapPub) {
        return new Publisher<Integer>() {
            ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                @Override
                public String getThreadNamePrefix() {
                    return "sub";
                }
            });
     
            @Override
            public void subscribe(Subscriber<super Integer> sub) {
                es.execute(() -> mapPub.subscribe(sub));
            }
        };
    }
    cs
    •  subscribe 시에 Upstream으로 흐르기 때문에 위에 Publisher 가 subscribe 할 때  별도의 스레드로 처리하도록 해주자.

     

     

     

     

     

     

     

    2.3 표준 API 코드

    • 그럼 이제 앞서 보았던 코드를 Reactive Stream API 표준을 사용하여 풀어서 데이터의 흐름을 보자.
       
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      public static void main(String[] args) {
       
          Publisher<Integer> iterPub = iterPub();
          Publisher<Integer> pubOnPub = pubOnPub(iterPub);
          Publisher<Integer> mapPub = mapPub(pubOnPub,i -> i * 10);
          Publisher<Integer> subOnPub = subOnPub(mapPub);
          Subscriber<Integer> subscriber = logSub();
       
          subOnPub.subscribe(subscriber); // (1)
      }
       
      private static Publisher<Integer> iterPub() {
          return new Publisher<Integer>() {
              @Override
              public void subscribe(Subscriber<super Integer> sub) {
                  sub.onSubscribe(new Subscription() {   // (5)
                      @Override
                      public void request(long n) { // (9)
                          sub.onNext(1); // (10)
                          sub.onNext(2);
                          sub.onNext(3);
                          sub.onNext(4);
                          sub.onNext(5);
                          sub.onComplete();
                      }
       
                      @Override
                      public void cancel() {
                      }
                  });
              }
          };
      }
       
      private static Publisher<Integer> pubOnPub(Publisher<Integer> iterPub) {
          return new Publisher<Integer>() {
              @Override
              public void subscribe(Subscriber<super Integer> sub) {
                  ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                      @Override
                      public String getThreadNamePrefix() {
                          return "pub";
                      }
                  });
       
                  iterPub.subscribe(new Subscriber<Integer>() {  // (4)
       
                      @Override
                      public void onSubscribe(Subscription s) {
                          sub.onSubscribe(s); // (6)
                      }
       
                      @Override
                      public void onNext(Integer integer) {
                          es.execute(() -> sub.onNext(integer)); // (11)
                      }
       
                      @Override
                      public void onError(Throwable t) {
                          es.execute(() -> sub.onError(t));
                      }
       
                      @Override
                      public void onComplete() {
                          es.execute(() -> sub.onComplete());
                      }
                  });
              }
          };
      }
      private static Publisher<Integer> mapPub(Publisher<Integer> pubOnPub, UnaryOperator<Integer> f) {
          return new Publisher<Integer>() {
              @Override
              public void subscribe(Subscriber<super Integer> sub) {
                  Function<Integer, Integer> f = (i) -> i * 10;
       
                  pubOnPub.subscribe(new Subscriber<Integer>() {    // (3)
                      @Override
                      public void onSubscribe(Subscription s) {
                          sub.onSubscribe(s); // (7)
                      }
       
                      @Override
                      public void onNext(Integer integer) {
                          sub.onNext(f.apply(integer)); // (12)
                      }
       
                      @Override
                      public void onError(Throwable t) {
                          sub.onError(t);
                      }
       
                      @Override
                      public void onComplete() {
                          sub.onComplete();
                      }
                  });
              }
          };
      }
       
      private static Publisher<Integer> subOnPub(Publisher<Integer> mapPub) {
          return new Publisher<Integer>() {
              ExecutorService es = Executors.newSingleThreadExecutor(new CustomizableThreadFactory() {
                  @Override
                  public String getThreadNamePrefix() {
                      return "sub";
                  }
              });
       
              @Override
              public void subscribe(Subscriber<super Integer> sub) {
                  es.execute(() -> mapPub.subscribe(sub));  // (2)
              }
          };
      }
       
       
      private static Subscriber<Integer> logSub() {
          return new Subscriber<>() {
              @Override
              public void onSubscribe(Subscription s) {
                  log.info("onSubscribe"); 
                  s.request(Long.MAX_VALUE); // (8)
              }
       
              @Override
              public void onNext(Integer i) {
                  log.info("onNext: " + i);  // (13)
              }
       
              @Override
              public void onError(Throwable t) {
                  log.info("onError");
              }
       
              @Override
              public void onComplete() {
                  log.info("onComplete");
              }
          };
      }
      cs
       
       

    2-4  구독 흐름

    • (1) :  
    • 처음 구독이 흐르는 시점은 subscribe()를 통해 구독자를 호출하는 시점이다.   - main Thread 
    • (2) :  
      subOnPub에 subscribe 메서드가 실행이 되는데 이때 별도의 쓰레드에서  mapPub에 subscribe가 실행되게 해준다.  - sub Thread
    • (3) :  
      mapPub에 subscribe는 pubOnPub에 subscribe를 호출한다.  - sub Thread

    • (4) : 
      pubOnPub에 subscribe는 iterPub에 subscribe를 호출한다.  - sub Thread

    • (5) : 
      여기서의 sub 파라미터는 pubOnPub에 Subscriber 이다.  
      구독이 시작되면 몇 개의 데이터를 요청할지 ( request )를  전달해야 한다.   sub.onSubscribe()에 의해 pubOnPub에 있는 onSubscribe가 호출이 될 것이다.  - sub Thread

    • (6) : 
      해당 sub ( Subscriber ) 파라미터는 mapPub에 Subscriber 이다. mapPub에 onSubscribe가 호출이 될 것이다. - sub Thread

    • (7) :  
      여기서의 sub 파라미터는 logSub이 된다. 이제 logSub에 onSubscribe가 호출이 될 것이다. 
      subOnPub은 (2)번에서 별도의 쓰레드에서 mapPub이 실행되게 해주는 역할이다.  - sub Thread

    • (8) : 
      요청을 제한 없이 하기 위해 Long.MAX_VALUE 로 request 값을 전달한다. - sub Thread

    • (9) : 
      request 를 호출했으니 이제 데이터가 흐르게 된다. - sub Thread

    • (10) :
      sub.onNext(1)이 먼저 호출이 될 것이다.  여기서의 sub 파라미터는 pubOnPub에서 구현했던 Subscriber이다. - sub Thread

    • (11) :  
      다음 mapPub에서 구현한 Subscriber가 onNext()를 실행할텐데 이때 pub1에 쓰레드로 흐름이 바뀐다. - pub Thread


    • (12):
      mapPub은 정의한 Function에 의해 곱하기 10을 해서 다음 sub에 onNext로 흐르게 된다.  
      여기서의 Subscriber는 logSub에서 구현한 Subscriber이고  logSub에 onNext()로 가게 된다. - pub Thread


    • (13) :
      이제 log.info로 데이터를 찍을때 해당 pub이라는 Thread에서 흐르는 것을 알 수 있다. onNext(2) ~ onNext(5) 도 마찬가지로 진행이 된다. - pub Thread

    2.5. 결과

     

     

Designed by Tistory.