개발/Java

WebFlux로 Blocking, NonBlocking 구현

승리승리 2022. 1. 28. 01:58

WebFlux

먼저 WebFlux란 Spring 5에 새로게 추가된 모듈이다.

 

Reactive한 개발을 도와주는 모듈NonBlocking에 ReactiveStream을 지원한다.

 

이전에도 비동기/NonBlocking을 구현하는 방법들은 존재했지만 무언가 부족했나보다.

 

구현 기술이 어려워서였을까?

 

이 부분에 대해서는 다른 NonBlocking 구현을 수행하면서 그 차이점을 한번에 정리하고자 한다.

 

Spring MVC는 아래와 같은 흐름을 가진다.

request - Dispatcher Servlet - Handler Mapper - Controller - B/L - Controller - ViewResolver ...

 

Spring WebFlux는 아래와 같은 흐름을 가진다.

request - HttpHandler - WebHandler - Handler Mapper / Handler Adapter - Controller - B/L - Controller - ViewResolver ...

 

WebFluxMVCDispatcherHttp와 Web Handler로 나누었다는 차이가 있다.

 

WebFlux는 Single Thread를 기반으로한 Event Loop방식을 이용해서 Non-Blocking을 구현한다.

(나중에 작성할 vert.x와 동일하다.)

https://luminousmen.com/post/asynchronous-programming-blocking-and-non-blocking

Client로 부터 온 각 요청은 Event Loop에 등록되고

 

Event Loop를 계속해서 탐색하면서 등록된 요청을 호출한다.

 

이때 호출 된 요청의 응답을 기다리지 않고 다음 요청을 처리하고

 

응답이 오면 이를 Client에게 반환한다.

 

 

먼저 API 통신을 Blocking으로 구성하고 테스트를 수행하고자 한다.


아래에 작성하는 코드들은 아래의 Git에서 확인할 수 있다.

정리가 안된 코드라 많이 난잡하다.

Socket Test까지만 작성하고 한번 정리 해놔야겠다.

https://github.com/victory0602/NonBlockingTest.git

 

GitHub - victory0602/NonBlockingTest

Contribute to victory0602/NonBlockingTest development by creating an account on GitHub.

github.com

 

 

통신을 받아줄 Server(이하 ResponseServer)에서는 아래의 API를 정의해주었다.

// 아래 API는 다른 Server에 추가하여 API Test 통신을 받아준다.
@PostMapping("/apiTest")
// 평소 Server 구현 시 ResponseEntity에 ResponseData라는 객체를 담아 처리하기에 이를 그대로 가져왔다.
public ResponseEntity<ResponseData> apiTest(@RequestBody HashMap<String, Object> paramMap) throws InterruptedException {
    log.trace("Call apiTest.");
    // ResponseServer에서는 전달받은 Data에서 errorMsg를 추출해서 로그로 뿌려주면서 호출 Server를 확인해주고
    String errorMsg = paramMap.get("errorMsg").toString();
    log.info("errorMsg : {}", errorMsg);
    
    // 응답으로 전달할 메시지를 저장한뒤
    responseData.setErrorMsg("http://localhost:8080/apiTest - Call apiTest.");
    // 2초의 간격을 주었다.
    Thread.sleep(2000);
    return ResponseEntity.status(HttpStatus.OK).body(responseData);

Blocking(RestTemplate)

이제 ResponseServer를 호출할 Request Server에서 아래와 같은 Blocking 호출을 작성하였다.

import org.springframework.web.client.RestTemplate;

/*
* Blocking 통신 구현을 위한 RestTemplate 선언
* HTTP Server와의 통신을 단순화하고 RESTful 원칙을 지킨다.
*/
ApplicationContext restTemplateAC = new AnnotationConfigApplicationContext(RestTemplate.class);
RestTemplate restTemplate = restTemplateAC.getBean(RestTemplate.class);

@PostMapping("/blocking")
public ResponseEntity<ResponseData> Blocking() {
    log.trace("Call blocking.");
    // 시간 측정을 위해 선언
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    // Set Header
    HttpHeaders headers = new HttpHeaders();
    headers.add("Content-Type", "application/json");
    responseData.setErrorMsg("http://localhost:18080/blocking - Call blocking.");
    HttpEntity<ResponseData> responseDataHttpEntity = new HttpEntity<>(responseData, headers);
    ResponseEntity responseEntity = null;
    for(int i = 0; i < 3; i++) {
        // Response Server API 호출
        responseEntity = restTemplate.exchange(url, HttpMethod.POST, responseDataHttpEntity, ResponseData.class);
        String body = responseEntity.getBody().toString();
        log.info("body : {}", body);
    }
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    return responseEntity.status(HttpStatus.OK).body(responseData);
}

내가 RestTemplate도 이번에 처음 사용해봐서 import문을 작성해두었다.

 

RestTemplate는 Blocking I/O 기반의 동기식 API이며 보통 build()함수를 이용해서 갖가지 설정을 수행하고

생성에 소요되는 시간을 줄이기 위해 Factory방식으로 사용한다.

 

여기서는 단순 Blocking Test이므로 Factory를 이용한 생성은 생략하고 등록된 Bean을 꺼내와 사용하였다.

 

A. 먼저 Post Man을 이용하여 RequestServer로 Blocking API를 호출했다.

 

B. 그러면 위와 같이 RequestServer의 Blocking API가 호출된다.

 

for Loop를 이용해 3번 호출하였고 ResponseServer에서 전달해준 응답 값을 출력해주는 Log와 총 소요시간을 확인할 수 있다.

 

C. ResponseServer에서는 Log를 통해 RequestServer의 Blocking API를 통해 호출되었음을 알 수 있다.

 

두 Server의 Log를 통해 호출 순서를 알 수 있다.

 

B : ResponseServerLog : 18:33:38.034

C : RequestServerLog : 18:33:40.050

 

B : ResponseServerLog : 18:33:40.084

C : RequestServerLog : 18:33:42.097

 

B : ResponseServerLog : 18:33:42.127

C : RequestServerLog : 18:33:44.137

 

ResponseServerLog : 18:33:44.138 -> 전체 소요시간 출력

 

위 Log를 통해 호출 순서가 아래와 같음을 알 수 있다.

ResponseServer에서 RequestServer로의 호출 B가 이루어지고 ResponseServer는 Blocking이 걸려 아무런 작업을 할 수 없는 상태가 된다.

 

이후 에서 응답이 오는 C 이후 

log.info("body : {}", body);

를 호출하게 된다.

 

Blocking의 증거는 Post Man에서도 확인할 수 있다.

위 사진을 보면 호출 후 응답이 올 때까지 Blocking이 걸렸기에 바로 이어서 호출할 수 없음을 확인할 수 있다.

 

 

총 소요 시간은 약 6초가 출력되었다.


Blocking이 들어간 NonBlocking(WebClient) -> 결국 Blocking

WebClient를 이용한 NonBlocking작성 시 Blocking이 섞여있다면 Blocking이나 다름 없다.

 

@PostMapping("/nonBlocking/webcline01")
public Mono<ResponseEntity<ResponseData>> NonBlockingWebClient01() {
    log.trace("Call nonBlocking webcline01.");
    WebClient webClient = WebClient.create("http://localhost:8080");
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    responseData.setErrorMsg("http://localhost:18080/nonBlocking - Call nonBlocking webcline01.");
    Mono<ResponseData> responseEntityMono = null;
    for(int i = 0; i < 3; i++) {
        responseEntityMono = webClient.post()
                .uri("/apiTest")
                .bodyValue(responseData)
                .retrieve()
                .bodyToMono(ResponseData.class);
        // block()메소드를 이용하여 결과 값을 가져오는데 여기서 Block이 걸린다.
        ResponseData body = responseEntityMono.block();
        log.info("body : {}", body.toString());
    }
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    return Mono.fromSupplier(() -> ResponseEntity.status(HttpStatus.OK).body(responseData));
}

WebClient를 create 메소드를 이용해서 생성하였다.

 

RestTemplate처럼 Factory방식으로 사용하지만 Test를 위해 다음에 해당 내용을 작성하도록 한다.

 

retrieve() 메소드나 exchange() 메소드를 호출해서 Body나 Status, Header 등을 가져올 수 있는데 나는 Body값을 가져오기 위해 retrieve()를 사용했다.

// block()메소드를 이용하여 결과 값을 가져오는데 여기서 Block이 걸린다.
ResponseData body = responseEntityMono.block();
log.info("body : {}", body.toString());

주석의 내용처럼 block()메소드를 호출하면서 응답 값을 받아오고자 한다.

 

이때 Blocking이 걸리면서 Request Server는 응답 값을 기다리면서 다른 작업을 수행하지 못하게 된다.

 

 

로그부분만 캡쳐한 사진이다.

 

호출 순서가 Blocking(RestTemplate)와 동일하다.

 

또한 이전 Test와 동일하게 Post Man의 연속 호출도 불가함으로 Blocking상태를 확인할 수 있다.

 

소요 시간도 약 6초임을 확인할 수 있다.


NonBlocking(WebClient)_바로 출력

위 내용에서 block()메소드를 제거하고 subscribe를 이용하여 NonBlocking으로 작성했다.

@PostMapping("/nonBlocking/webcline02")
public Mono<ResponseEntity<ResponseData>> NonBlockingWebClient02() {
    log.trace("Call nonBlocking webcline02.");
    WebClient webClient = WebClient.create("http://localhost:8080");
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    responseData.setErrorMsg("http://localhost:18080/nonBlocking - Call nonBlocking webcline02.");
    Mono<ResponseData> responseEntityMono = null;
    for(int i = 0; i < 3; i++) {
        webClient.post()
                .uri("/apiTest")
                .bodyValue(responseData)
                .retrieve()
                .bodyToMono(ResponseData.class)
                // 호출 순서를 확인하기 위한 응답 값 출력
                .subscribe(response ->
                        log.info("body : {}", response));
    }
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    return Mono.fromSupplier(() -> ResponseEntity.status(HttpStatus.OK).body(responseData));
}

 

API호출 후 RequestServer와 ResponseServer의 Log를 확인해보면 아래와 같다.

 

RequestServerLog : 01:03:30.966 -> 전체 소요시간 출력

 

B : ResponseServerLog : 01:03:30.996

B : ResponseServerLog : 01:03:30.997

B : ResponseServerLog : 01:03:31.022

C : RequestServerLog : 01:03:33.023

C : RequestServerLog : 01:03:33.030

C : RequestServerLog : 01:03:33.047

 

RequestServer API를 호출하고 곧바로 전체 소요 시간을 출력했다.

 

이후 ResponseServer에서 작업(Log출력)이 수행되고 나서

 

RequestServer의 작업이 수행되었다.

 

위 사진과 같이 ResponseServer에서 BreakPoint를 걸었어도 Post Man에서는 계속해서 호출할 수 있는 상태이며 이를 통해 NonBlocking상태임을 확인할 수 있다.

 

수행시간은 Blocking작업과 다르게 0.0029867초가 출력되었다.


NonBlocking(WebClient)_응답 값 사용

아래는 응답 값을 받아 처리하도록 한 NonBlocking이다.

@PostMapping("/nonBlocking/webcline03")
public Mono<ResponseEntity<ResponseData>> NonBlockingWebClient03() {
    log.trace("Call nonBlocking webcline03.");
    WebClient webClient = WebClient.create("http://localhost:8080");
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    Mono<ResponseEntity<ResponseData>> responseEntityMono = null;
    for(int i = 0; i < 3; i++) {
        responseData.setErrorMsg("http://localhost:18080/nonBlocking - Call nonBlocking webcline03(" + i + ")");
        responseEntityMono = webClient.post()
                .uri("/apiTest")
                .bodyValue(responseData)
                .retrieve()
                .toEntity(ResponseData.class);
    }
    /*
    * 비즈니스 로직 수행 ...
    * */
    responseEntityMono.subscribe(response ->
            log.info("body : {}", response));
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    return Mono.fromSupplier(() -> ResponseEntity.status(HttpStatus.OK).body(responseData));
}

응답 값을 toEntity(...)메소드를 이용하여 반환 받고

 

추후 필요할 때 꺼내어 사용하도록 하였다.

 

사용 전까지 비즈니스 로직을 작성하여 ResponseServer API호출이라는 작업을 신경쓰지 않고 비즈니스 작업을 수행할 수 있다.

 

위 코드는 하나의 객체에 API호출의 반환을 전부 담았기에 마지막 API 호출에 대한 반환만을 가지고 있게 된다.


 

위 내용들을 이전에 작업했던 내용들에 어떻게 적용해야할지 생각해봤는데

 

생각나는 모든 로직이 이전 작업의 결과물을 토대로 작업이 수행되는 Blocking작업 이기에 당장 어떻게 활용해야할지 

 

떠오르질 않는다.

 

NonBlocking에 대항 다양한 예시를 조금 더 찾아봐야겠다.

 

이후 다른 방식으로 NonBlocking을 작성하고

 

WebClient에 대한 내용을 작성해야겠다.

 

- 참고

https://binux.tistory.com/56

 

[Spring Reactive] WebClient

Web Reactive Stack 공식문서의 WebClient 부분을 읽고 해석하며 작성했습니다. Web on Reactive Stack The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for th..

binux.tistory.com

https://akageun.github.io/2019/06/23/spring-webflux-4-webclient.html

 

[Spring Webflux] 4. WebClient 사용해보기

언제나 개발하기를 즐기는 개발자 입니다.

akageun.github.io

https://happycloud-lee.tistory.com/220

 

Spring WebClient 쉽게 이해하기

1. Spring WebClient 이해 1) WHY ? 우리가 개발하는 어플리케이션들을 크게 2개로 나눠보면 요청자와 제공자라고 할 수 있습니다. 요청자를 consumer 또는 subscriber라고 하고, 제공자를 producer 또는 provider..

happycloud-lee.tistory.com

https://heeyeah.github.io/spring/2020-02-29-web-flux/

 

[Spring] WebFlux란?

WebFlux? Spring Framwork5에서 새롭게 추가된 모듈이다. web-flux는 client, server에서 reactive 스타일의 어플리케이션 개발을 도와주는 모듈이라고 한다. 스프링 공식문서에 있는, Spring WebFlux에 대한 소개와

heeyeah.github.io

https://alwayspr.tistory.com/44

 

Spring WebFlux는 어떻게 적은 리소스로 많은 트래픽을 감당할까?

위 그림은 DZone 게시글 중 하나인 Spring WebFlux를 이용한 Boot2와 Spring MVC를 이용한 Boot1을 비교한 그래프이다. 해당 그래프에서는 두 가지 특징을 볼 수 있다. 첫 번째로는 유저가 적을 때에는 성능

alwayspr.tistory.com