어느덧 11월의 마지막이 다가왔다.

코딩 공부를 시작한게 8월인데 벌써 4개월이 지났다니
믿기지 않는다.. 그만큼 많이 발전도 했지만
아직도 많이 부족하다고 느끼기에 시간이 더욱더 간절하게 느껴진다.

오늘은 어제배운 Reactive 프로그램의 스트림즈의 구현체인
Project Reactor에 대해 공부해보는 시간이다.


Project Reactor

Project Reactor 란?

Project Reactor 줄여서 Reactor라고 부른다. 리액티브 스트림즈의 구현체 중 하나로써
Reactive 기반 Spring Web Application을 만들때 사용하는 핵심기술이다.

리액티브 프로그래밍은 Non-Blocking통신을 지원하며, Non-Blocking이 핵심적인 특징이다.
간단하게 얘기하지면 요청 쓰레드가 차단이 되지 않는다 정도로 알면될 것 같다.

Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입을 제공한다.


Marble Diagram

1). Mono

image

Marble Diagram으로 동그란것 하나를 데이터라 생각하면 쉬울 것 같다.

흐름 순서는 위에 다이어그램을 보면 이해가 쉬울 것 같다.
Mono 시퀀스가 시작되고 데이터를 emit한다. Mono는 Mono[0|1] 즉, 0건 또는 1의
데이터만 emit 하는 Reactor 타입이기 때문에 위와 같이 표현되었다.

그리고 Mono 시퀀스가 종료되고, Operator를 통해 데이터가 가공된다.
그리고 Downstream의 타임라인으로 전달되어
정의되어있는 해당 시퀀스를 실행하고 종료하게 되어진다.

코드로 보면 이렇게 볼 수 있을 것 같다.

public class ReactiveExample {
    public static void main(String[] args) {
        Mono
                .just("Hello, Reactive")
                .map(s -> s.toUpperCase())
                .subscribe(
                        message -> System.out.println(message),
                        error -> System.out.println(error.getMessage()),
                        () -> System.out.println("Complete")
                );
    }
}
// 출력
HELLO, REACTIVE
Complete

위에 코드를 보면 .map();메서드가 Operator의 역할이다.
just();에서 데이터를 emit한 후 .UpperCase(); 대문자로 변환시키고 있다.
그리고 최종적으로 Downstream쪽에서 데이터를 전달해
.subscribe(); 메서드로 전달받은 데이터를 처리해준 모습을 볼 수 있다.
해당 상황은 우선 Error가 발생하지 않은 경우를 테스트 해보았다.

이제 만약 Downstream에서 Error가 발생했을 경우
비정상적인 종료를 한번 확인해보자

public class ReactiveExample {
    public static void main(String[] args) {
        Mono
                .just("Hello, Reactive")
                .map(s -> s.split(" "))
                .subscribe(
                        message -> System.out.println(message[3]),
                        error -> System.out.println("! Error : "+ error.getMessage()),
                        () -> System.out.println("Complete")
                );
    }
}
// 출력
! Error : Index 3 out of bounds for length 2
Complete

위에 예제 코드를 보면 단순히 String을 배열로 쪼개서
Downstream으로 넘겨주는 코드이다.
받았을때 공백기준으로 나눴기때문에 배열방에는 2개만 존재하고
범위를 벗어나는 데이터를 조회하려고 명령을 내렸을 경우
error를 처리해주는 메서드를 만들어 표현할 수 있다.


2). Flux

image

기본 적인 흐름은 Mono와 전부 동일하다.
다른 점은 처리해야할 데이터가 Flux[N]개로
여러개의 데이터를 emit할 수 있는 모습을 다이어그램에서 볼 수 있다.


Scheduler

Reactor에서 Scheduler는 쓰레드를 관리하는 관리자 역할이다.
즉, Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 쓰레드에서
동작하도록 별도의 쓰레드를 제공해준다고 생각하면 쉬울 것 같다.

Reactor는 기본적으로 Non-Blocking 통신을 위한 비동프로그래밍을 위해
탄생했기 때문에 여러 쓰레드를 손쉽게 관리해주는 Scheduler의 역할이 중요하다 할 수 있다.

간단한 코드로 비교예제를 살펴보자 먼저 Scheduler를 추가하지 않았을 경우를 살펴보자

// Scheduler를 추가하지 않았을 경우
@Slf4j
public class SchedulersExample {
    public static void main(String[] args) {
        Flux
            .range(1, 10)
            .filter(n -> n % 2 == 0)
            .map(n -> n * 2)
            .subscribe(data -> log.info("# onNext: {}", data));
    }
}
20:51:36.461 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:51:36.481 [main] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 4
20:51:36.483 [main] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 8
20:51:36.483 [main] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 12
20:51:36.483 [main] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 16
20:51:36.483 [main] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 20

해당 프로그램을 실행시켰을때 콘솔에서 확인할 수 있는 로그이다.
Scheduler를 추가하지 않고 기존과 동일하게 사용하였고
여기서 알 수 있는 점은 [main]쓰레드를 사용하고 있다는 점이다.


Scheduler를 적용한 코드를 확인해보자

// Scheduler를 사용하였을 경우
@Slf4j
public class SchedulersExample {
    public static void main(String[] args) throws InterruptedException {
        Flux
                .range(1, 10)
                .subscribeOn(Schedulers.boundedElastic())
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .filter(n -> n % 2 == 0)
                .map(n -> n * 2)
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}
20:53:19.592 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:53:19.623 [main] INFO com.codestates.example.schedulers.SchedulersExample02 - # doOnSubscribe
20:53:19.630 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 4
20:53:19.631 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 8
20:53:19.631 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 12
20:53:19.631 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 16
20:53:19.631 [boundedElastic-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 20

위에 코드는 Scheduler를 사용하였을때의 코드이다.
첫번째 코드와 차이점은 subscribeOn();, doOnSubscribe();
Operator를 추가해서 사용을 하고 있다는 점이다.

subscribeOn(); Operator는 Schedulers.boundedElastic()와 같은
Scheduler를 지정하면 구독 직후에 실행되는 쓰레드가 main 쓰레드에서 해당 쓰레드로 변경된다.

doOnSubscribe();Operator는 구독 발생 직후에 Trigger되는 Operator로
구독 직후에 어떤 동작을 수행하고 싶을때 사용하는 Operator이다.

출력 결과를 보면 알 수 있듯이 [main]쓰레드에서 진행 중이던 흐름이
[boundedElastic-1] 쓰레드로 변경된 모습을 로그로 확인해 볼 수 있다.


마지막으로 예제하나만 더보고 넘어가자

@Slf4j
public class SchedulersExample {
    public static void main(String[] args) throws InterruptedException {
        Flux
            .range(1, 10)
            .subscribeOn(Schedulers.boundedElastic())
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))

            .publishOn(Schedulers.parallel())  // (1)
            .filter(n -> n % 2 == 0)
            .doOnNext(data -> log.info("# filter doOnNext"))  // (2)

            .publishOn(Schedulers.parallel())    // (3)
            .map(n -> n * 2)
            .doOnNext(data -> log.info("# map doOnNext")) // (4)

            .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(100L);
    }
}
21:03:32.441 [main] INFO com.codestates.example.schedulers.SchedulersExample - # doOnSubscribe
21:03:32.448 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample - # filter doOnNext
21:03:32.448 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample - # filter doOnNext
21:03:32.448 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample - # filter doOnNext
21:03:32.448 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # map doOnNext
21:03:32.448 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample - # filter doOnNext
21:03:32.448 [parallel-2] INFO com.codestates.example.schedulers.SchedulersExample - # filter doOnNext
21:03:32.448 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 4
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # map doOnNext
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 8
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # map doOnNext
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 12
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # map doOnNext
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 16
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # map doOnNext
21:03:32.451 [parallel-1] INFO com.codestates.example.schedulers.SchedulersExample - # onNext: 20

해당코드가 이전과 다른점은 publishOn();이라는 Operator가 추가된 예제이다.
publishOn();publishOn(); 기준으로 Downstream쪽 쓰레드가
publishOn();에서 Scheduler로 지정한 쓰레드로 변경이된다. (Schedulers.parallel())

doOnNext(); Operator는 바로앞에 위치한 Operaotr가 실행될 때
Trigger되는 Operator이다. 위에선 filter와 map을 구분짓기위해 사용했다.

콘솔로그에서 볼 수 있듯이 [main]쓰레드에서 [parallel-2]로 쓰레드가 변경되었고
[parallel-1]로 변경된 모습을 볼 수 있다. 내부적으로 [parallel] 쓰레드의 순서는 보장되지 않는다.


Operator

이전부터 계속 Operator를 사용한는 코드들을 보았다.
그만큼 Reactor와 Operator를 뗄래야 뗄 수 없는 관계이다.

Operator 관련해서는 사실 지원하는 종류가 너무 많기 때문에
부가적인 Advance 공부는 추후에 해봐야할 것 같고
목록만 우선 정리 해놓으려고 한다.

그 중에서 자주사용하고 중요한 것 들을 보라색으로 표시해둔다.

새로운 Sequence를 생성하고자 할 경우 (Creating)

  • just()
  • fromStream()
  • fromIterable()
  • fromArray()
  • range()
  • interval()
  • empty()
  • never
  • defer()
  • using()
  • generate()
  • create()

기존 Sequence에서 변환작업이 필요한 경우 (Transforming)

  • map()
  • flatMap()
  • concat()
  • collectList()
  • collectMap()
  • merge()
  • zip()
  • the()
  • switchIfEmpty
  • and()
  • when()

Sequence에서 내부의 동작을 확인하고자 할 경우 (Peeking)

  • doOnSubscribe
  • doOnNext()
  • doOnError()
  • doOnCancel()
  • doFirst()
  • doOnRequest()
  • doOnTerminate()
  • doAfterTerminate()
  • doOnEach()
  • doFinally()
  • log()

Sequence에서 데이터를 걸러야할 경우 (Filtering)

  • filter()
  • ignoreElements()
  • distinct()
  • take()
  • next()
  • skip()
  • sample()
  • single()

에러를 처리하고자할 경우 (Handling errors)

  • error()
  • timeout()
  • onErrorReturn()
  • onErrorResume()
  • onErrorMap()
  • doFinally()
  • retry()



오늘은 어제보다 조금더 Spring WebFlux에 대해 알아보았고
사실 아직 Spring MVC 어떻게 대체해서 사용해야할지 크게
감이 잡히지는 않는다. 뭔가 Stream API와 비슷한 느낌이 들어 친숙하긴하지만
개념들이 많이 다른 것 처럼 느껴져서 어렵고 생소했다.

내일 우리가 구현했던 Spring MVC 대신
Spring WebFlux 기술을 활용해서 한번 적용해보자

오늘 공부는 여기서 끝 !!


오늘의 커피량: ☕️ ☕️
오늘의 점심: 제육볶음, 김치찌개, 밥