본문 바로가기

빗썸 테크코스 아카데미/1주차(Event Driven)

3일차 리액티브 프로그래밍 오퍼레이션

Sequence 생성

Just

시퀀스를 생성하는 가장 쉬운 방법은 Flux.just()를 사용하는 것이다. just() 메서드는 시퀀스로 사용할 데이터가 이미 존재할 때 사용한다. 다음은 사용 예이다.

내부 구조를 살펴보면 다음과 같다. 전달받은 값이 빈값이라면 빈플럭스를 반환하고 그게 아니라면 FluxArray를 반환한다.

 

위에도 확인할수 있는 사실은 3가지가 존재한다.

1. Flux, Mono는 Publisher를 구현한 클래스들이고 Sequence(데이터 흐름)를 생성한다.

2. Publisher와 Subscriber구조에서 볼수 있듯이, Sequence는 subscriber에 의해 subscribe되어 데이터가 처리 된다.

3. 이후 subscriber은 request를 통해 전달받은 데이터의 수를 제한하여 전달 받을 수 있고, onNext()를 통해 다음 데이터를 전달받아 처리하며, 작업 완료시 onComplete()를 호출한다.

 

StepVerifier

 

  • Reactor-test 프로젝트에 속한다
  • Publisher 를 구독하면서 예상값과 순서를 검증(Assert)할 수 있다
  • 검증이 실패할 경우, AssertionError 를 발생시킨다
  • create메서드로 인스턴스를 생성한다
  • 반드시 verify() 메서드(=StepVerifier.LastStep)를 호출해야 한다
  • 보통 다음과 같은 형태이다
StepVerifier.create(T<Publisher>)
.{expectations...}
.verify()

실 예제를 통해 한번 확인해보자

다음 테스트를 통해 우리는 2가지 사실 알수 있다.

1. expectNext는 예측한 값인 파라미터를 Stream으로 변환하고 Subscribe되는 sequence data와 비교 한다.

이후 List<Event<T>>를 생성하고 sequence를 반환한다.

아래를 보자

 

2. Verify() Method는 전달받은 sequence값들을 예측한 값들과 Validate()라는 Method를 사용하여 비교하는 Method이다. 

verify는 시간을 전달받아 시간을 조절할수 있으며, test를 유발하기 위해서는 항상 불러야 하는 Method이다.

Verify()를 호출하기 위한 2가지 방법

ExcpectError() + verify()

verifiyComplete()

위에서 보았듯이 Subscriber은 sequence의 데이터를 다 처리할시 onComplete()를 호출한다. 아니면 다 처리되지 않은 상태에서 error가 발생해 마무리 된다면 onError()를 호출한다. 

따라서 결국에는 Sequence의 마지막 call Method가 onError()인지 onComplete()인지 확인하기 위해서 두가지중 한가지의 방법을 사용하여야 한다. 

 

Scheduler

subscribeOn()

위의 코드들을 돌려보면 발견 할수 있는 사실은 대부분이 "main" 스레드에서 출력되었다는 점인데, 이는 subscribe() 메소드의 호출자가 main 스레드라는 의미이다. 이를 통해 중요한 사실을 알 수 있게 된다: Reactor는 스레드를 지독히도 절약하는데, 이는 가능한 최대의 성능을 낼 수 있도록 보장하기 위해서이다. 사실 이 말은 지난 5년간, 서비스의 성능을 조금이라도 더 쥐어짜기 위해 스레드와 스레드풀, 그리고 비동기 처리에 대해 논쟁한 적이 있는 사람에게는 매우 이상하게 들릴 것이다. 하지만 이것은 사실이다: 아무리 JVM이 여러 개의 스레드에 대한 동시 처리를 최적화할 수 있다 하더라도, 스레드들 간의 명시적인 스위칭없이, 단일 스레드 내에서 처리하는 것이 언제나 더 빠르다. Reactor는 모든 비동기 처리를 제어하는 열쇠를 사용자에게 건내주고, 사용자가 무엇을 해야할지 알고 있다고 가정한다.

Flux는 스레드 바운더리를 제어할 수 있는 몇 개의 설정 메소드들을 제공한다. 예를 들어, Flux.subscribeOn() 메소드를 이용하여 구독이 백그라운드 스레드에서 처리되도록 설정할 수 있다:

subscription on background thread

이 코드의 실행 결과는 다음과 같다.

output for parallel subscribing
 

이 코드를 직접 작성했든 아니면 Copy and Paste를 했든, JVM이 종료되기 전에 이 처리가 끝날 때 까지 기다려야(wait) 한다.

로그에서 구독 뿐만 아니라 각각의 처리들이 하나의 백그라운드 스레드(parallel-1-1)에서 발생했음을 확인할 수 있다 - 이는 우리가 코드에서 Flux에 대해 백그라운드로 구독할 것을 요청했기 때문이다. 각 아이템에 대한 처리가 CPU 집약적인 작업이라면 이렇게 해도 무방하다.(하지만 백그라운드 스레드를 사용했다고 해도, 컨텍스트 스위치에 대한 비용을 지불했지만 처리가 빨라지지는 않기 때문에 무의미한 일이라고 할 수 있다.) 또, I/O 집약적인, 그리고 아마도 블록킹 I/O를 사용하는 아이템 처리를 하고 싶을 수도 있을 것이다. 그리고 이 경우, 호출자가 대기하지 않도록 최대한 빨리 처리하고 싶을 것이다. 이 경우에도 우리의 친구 스레드풀을 이용할 수 있는데, 이 때에도 Schedulers.parallel() 메소드를 호출하면 된다. 각각의 아이템들의 처리를 (스레드풀의 제한 범위 내에서) 별도의 스레드가 수행하도록 변경하려면, 하나의 Flux를 별도의 분리된 배포자로 나누어야 하고, 각각의 배포자들이 각각의 백그라운드 스레드에서 수행되도록 설정해야 한다. 이를 위해 flatMap() 이라는 오퍼레이터를 이용할 수 있는데, 이것은 각 아이템들을 하나의 (다른 타입일 수도 있는) Publisher로 변경하고, 그 새로운 타입의 시퀀스로 돌아간다:

 

publishOn()

Flux에는 또, publishOn() 이라는 메소드가 있는데, 이는 구독자 자신을 위한 것이 아니라 리스너를 위한 메소드이다. (즉, onNext() 혹은 소비자를 위한 콜백이다.

callback for listeners

이 결과는 다음과 같다.

result of publishOn

여기서 소비자의 콜백("Consumed:..." 로그)가 pub-1-1이라는 구독자용 스레드에서 실행된 것을 확인할 수 있다. 만약 이 코드에서 subscribeOn()을 제거한다면 데이터의 두 번째 덩어리(chunk)들도 pub-1-1 스레드에서 처리되는 것을 확인할 수 있을 것이다. 다시 한 번 얘기하지만, Reactor는 스레드를 절약하는 경향이 있기 때문에, 명시적으로 스레드를 스위치해 달라는 요청이 없다면 현재 사용하고 있는 스레드를 지속적으로 사용하게 된다.