개발/Java

Java Future, ExecutorService 로 NonBlocking 구현

승리승리 2022. 3. 4. 00:59

Future, ExecutorService로 NonBlocking을 구현하고자 한다.

 

이전 내용들과 동일하게 API통신을 예시로 들려한다.

 

Future예시

@PostMapping("/future01")
public ResponseEntity<ResponseData> future01() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    log.trace("Call future01.");
    restTemplate = restTemplateConfig.restTemplate(
            restTemplateConfig.factory(
                    restTemplateConfig.httpClient()
            )
    );
    // 시간 측정을 위해 선언
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    ResponseEntity responseEntity = null;
    Future<String> future = null;
    String body = "http://localhost:18080/future01 - Call future01.";
    for(int i = 0; i < 3; i++) {
        // Response Server API 호출
        future = (Future<String>) executorService.submit(() -> {
            log.info("executorService.submit() Call");
            restTemplateService.write(restTemplate, url, HttpMethod.POST, body).getBody().toString();
        });
    }
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    try {
        // false
        if(future.isDone()) {
            // Blocking
            log.info("future.get() : {}", future.get(5000L, TimeUnit.MILLISECONDS));
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
    return responseEntity.status(HttpStatus.OK).body(responseData);
}

되게 간단하다.

 

future = (Future<String>) executorService.submit(() -> {
    log.info("executorService.submit() Call");
    restTemplateService.write(restTemplate, url, HttpMethod.POST, body).getBody().toString();
});

Loop돌면서 ExecutorService의 submit으로 API통신 로직을 전달하였다.

 

전체 동작 시간은 3.578E-4가 나왔고 로그는 아래와 같다.

 

호출한 Server 로그

22:36:39.466  ...APIController   : Total Second : 3.578E-4
22:36:39.469  ...APIController   : executorService.submit() Call
22:36:41.492  ...APIController   : executorService.submit() Call
22:36:43.514  ...APIController   : executorService.submit() Call

 

호출된 Server 로그

22:36:39.477  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future01.
22:36:41.496  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future01.
22:36:43.517  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future01.

 

호출한 Server 로그를 보면 알 수 있듯이 전체 수행 시간을 출력한 뒤 API호출을 수행했다.

 

그 후 API호출을 딜레이 없이 연달아 호출하였고

 

호출된 Server 로그를 통해 정상적으로 2초 간격을 두고 수행됨을 확인할 수 있다.

 

모든 동작이 끝난 후 결과가 저장된 future객체의 get()를 이용해 결과를 꺼내올 수 있는데 이는 Blocking작업이다.

 

이를 Callback처리하는 예시를 위해 FutureTask<T>를 이용해서도 구현해봤다.

 

@PostMapping("/future02")
public ResponseEntity<ResponseData> future02() throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    log.trace("Call future02.");
    restTemplate = restTemplateConfig.restTemplate(
            restTemplateConfig.factory(
                    restTemplateConfig.httpClient()
            )
    );
    // 시간 측정을 위해 선언
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    ResponseEntity responseEntity = null;
    FutureTask<String> futureTask = null;
    String body = "http://localhost:18080/future01 - Call future02.";
    futureTask = new FutureTask<>(() ->
            restTemplateService.write(restTemplate, url, HttpMethod.POST, body).getBody().toString()){
        @Override
        protected void done() {
            log.info("done call*************");
        }
    };
    for(int i = 0; i < 3; i++) {
        // Response Server API 호출
        executorService.execute(futureTask);
    }
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    return responseEntity.status(HttpStatus.OK).body(responseData);
}

API호출을 Callable로 정의해서 FutureTask객체에 정의에 담아줬다.

 

이때 done함수를 오버라이드 하면서 완료 시 호출할 작업을 명시해줬다.

 

이제 API호출을 수행해보면 아래와 같이

 

호출한 Server 로그

23:44:47.608  ...APIController   : Total Second : 0.0044465
23:44:49.641  ...APIController   : done call*************

 

호출된 Server 로그

23:44:47.627  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future02.

한번 호출하고 말아버린다.

 

?!

 

이에 호출부를 Loop문 밖에서 세번 연속 호출해보았다.

executorService.execute(futureTask);
executorService.execute(futureTask);
executorService.execute(futureTask);

 

하지만 여전히 한번 호출하고 말아버린다.

 

그럼 FutureTask가 문제겠지? 하고는 아래와 같이 FutureTask를 매번 새롭게 만들어 전달했다.

executorService.execute(new FutureTask<>(() ->
        restTemplateService.write(restTemplate, url, HttpMethod.POST, body).getBody().toString()){
    @Override
    protected void done() {
        log.info("done call*************");
    }
});

호출한 Server 로그

 

00:47:39.082  ...APIController   : Total Second : 7.885E-4
00:47:41.101  ...APIController   : done call*************
00:47:41.103  ...APIController   : done call*************
00:47:41.164  ...APIController   : done call*************

호출된 Server 로그

00:47:39.096  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future02.[0]
00:47:39.096  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future02.[2]
00:47:39.101  ...JsonContoller      : errorMsg : http://localhost:18080/future01 - Call future02.[1]

 

전체 수행 시간을 출력한 뒤에 API통신 완료 후 done()함수를 호출한 것을 알 수 있다.

 


 

비즈니스 로직에 NonBlocking 적용은 얼추 끝난거 같으니 이제 Event-Driven을 해야겠다.

 

예전 회사에서는 vert.x를 적용해서 Event-Driven을 사용하였다.

 

각 Service를 JobList에 등록하고 이를 꺼내어 처리하는 방식이었는데

 

WebFlux에서는 이를 어떻게 사용하는지 찾아보고 실습해야겠다.

'개발 > Java' 카테고리의 다른 글

Socket(Selector)으로 Blocking, NonBlocking 구현  (0) 2022.02.07
WebFlux로 Blocking, NonBlocking 구현  (0) 2022.01.28
동기와 비동기, Blocking과 Non Blocking  (0) 2022.01.22
Reactive란  (0) 2022.01.21
[java] dual pivot quick sort/정렬  (0) 2021.08.11