개발/Java

Socket(Selector)으로 Blocking, NonBlocking 구현

승리승리 2022. 2. 7. 02:22

Socket

저번에는 [WebFlux로 Blocking, NonBlocking 구현]을 작성 했었다.

 

순서가 꼬인거 같지만 이번에는 통신의 가장 기본이 되는 Socket통신으로 구현하려 한다.

 

Socket통신TCP/IP 프로토콜을 사용해서 통신을 수행하는 방법이다.

 

컴퓨터들이 서로 정보를 주고 받는 네트워크는 처리 역할에 따라 계층이 구분되는데 이를 OSI 7계층이라 한다.

 

이를 실제 업무에서 적용한 표준이 TCP/IP 4계층이다.

 

TCP/IP 4계층 역할 데이타 단위 전송 주소 예시 장비
응용 계층(Application) 응용프로그램 간의 데이타 송수신 Data/Message - 파일 전송, 이메일, FTP, HTTP, SSH, Telnet, DNS, SMTP 등 -
전송 계층(Transport) 호스트 간의 자료 송수신 Segment Port TCP, UDP, RTP, RTCP 등 게이트웨이
인터넷 계층(Internet) 데이타 전송을 위한 논리적 주소 지정 및 경로 지정 Packet IP IP, ARP, ICMP, RARP, OSPF 라우터
네트워크 연결 계층(Network Access) 실제 데이타인 프레임을 송수신 Frame MAC Ethernet, PPP, Token Ring 등 브리지, 스위치

Socket통신은 여기서 3계층과 2계층을 이용한 것이다. (링크)

 

Java는 Socket통신을 NonBlocking방식으로 사용하기 위해 nio(New I/O) 패키지를 제공한다.

  I/O NI/O
데이터 흐름 단방향
InputStream/OutputStream으로 구분해서 input/output 사용
양방향
SocketChannel/FileChannel 등 객체 하나로 input/output 사용
데이터 단위 Byte or Byte[] ByteBuffer

아래에 작성하는 코드들은 아래의 Git에서 확인할 수 있다.

정리가 안된 코드라 많이 난잡하다.

Socket Test까지만 작성하고 한번 정리 해놔야겠다. -> 정리 후 아래 코드와 내용이 조금 다를 수 있지만 기본 로직은 같다.

https://github.com/victory0602/NonBlockingTest.git

 

GitHub - victory0602/NonBlockingTest

Contribute to victory0602/NonBlockingTest development by creating an account on GitHub.

github.com

 


Blocking(I/O)

먼저 통신을 받아줄 Server(이하 ResponseServer)에서는 아래와 같이 SocketServer를 구성하였다.

 

단순하게 통신을 받고 출력해주고 2초 지연시키고 ResponseServer로 응답을 보낸다.

try {
	// ServerSocketChannel을 생성하고, 몇 가지 설정을 한다.
	ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
	serverSocketChannel.configureBlocking(true);
	serverSocketChannel.bind(new InetSocketAddress(28080));
	while (true) {
		// 블로킹 방식으로 설정했기 때문에, accept() 메서드에서 연결을 받을 때까지 블락된다.
		SocketChannel socketChannel = serverSocketChannel.accept();
		// 연결된 클라이언트와 입/출력하기.
		Charset charset = Charset.forName("UTF-8");
		ByteBuffer byteBuffer = ByteBuffer.allocate(128);
		socketChannel.read(byteBuffer);
		byteBuffer.flip();
		log.info("Received Data : " + charset.decode(byteBuffer).toString());
		byteBuffer = charset.encode("http://localhost:8080 - Socket Call Test.");
		Thread.sleep(2000);
		socketChannel.write(byteBuffer);
	}
} catch (IOException | InterruptedException e) {
	e.printStackTrace();
}

다른 Server와의 Port충돌을 피하고자 28080으로 설정하였다.


ResponseServer를 호출할 RequestServer에서 아래와 같이 Blocking 호출을 작성하였다.

 

원활한 Block Check를 위해 PostMan으로 해당 API를 호출하며 테스트하고자 API호출을 통해 실행될 수 있도록 하였다.

// 초기화
public void BlockingSocketInit(String ip, int port) {
    log.info("Call BlockingSocketInit.");
    try {
        blockingSocketChannel = SocketChannel.open();
        blockingSocketChannel.configureBlocking(true);
        blockingSocketChannel.connect(new InetSocketAddress(ip, port));
    } catch (IOException e) {
        errorMsg = e.getLocalizedMessage();
        errorCode = ErrorCode.CONNECTION_ERROR;
        log.error("[{}] {}",errorCode, errorMsg);
    }
}

// 쓰기와 읽기
public ResponseData BlockingSocket(String body) {
    log.info("Call BlockingSocket.");
    BlockingSocketInit("localhost", 28080);
    try {
        // Response Server 호출
        ByteBuffer byteBuffer = ByteBuffer.wrap(body.getBytes());
        blockingSocketChannel.write(byteBuffer);
        BlockingSocketRead();
        blockingSocketChannel.close();
    } catch (IOException e) {
        errorMsg = e.getLocalizedMessage();
        errorCode = ErrorCode.CONNECTION_ERROR;
        log.error("[{}] {}",errorCode, errorMsg);
    }
    responseData.setErrorCode(errorCode);
    responseData.setErrorMsg(errorMsg);
    responseData.setBody(body);
    return responseData;
}

// 읽기
public void BlockingSocketRead() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(128);
    int readInt = 0;
    try {
        // 여기서 Block
        readInt = blockingSocketChannel.read(byteBuffer);
        if (-1 != readInt) {
            Charset charset = Charset.forName("UTF-8");
            byteBuffer.flip();
            String body = charset.decode(byteBuffer).toString();
            log.info("body : {}", body);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

 

 

A. PostMan을 이용하여 RequestServer로 SocketBlocking API를 호출

 

B. 그러면 위와 같이 RequestServerSocketBlocking API가 호출된다.

 

for Loop를 이용해 ResponseServer를 3번 호출하였고 ResponseServer에서 전달해준 응답 값을 출력해주는 Log와 총 소요시간을 확인할 수 있다.

C. ResponseServer에서는 Log를 통해 RequestServerSocketBlocking API를 통해 호출되었음을 알 수 있다.

 

두 Server의 Log를 통해 호출 순서를 알 수 있다.

B : RequestServerLog : 00:22:28.776

C : ResponseServerLog : 00:22:30.790

 

B : RequestServerLog : 00:22:30.791

C : ResponseServerLog : 00:22:32.800

 

B : RequestServerLog : 00:22:32.802

C : ResponseServerLog : 00:22:34.810

 

RequestServerLog : 00:22:34.810 -> 전체 소요시간 출력

 

위 Log를 통해 호출 순서가 아래와 같음을 알 수 있다.

 

 

ResponseServer에서 RequestServer로의 호출 B가 이루어지고 ResponseServer는 Blocking이 걸려 아무런 작업을 할 수 없는 상태가 된다.

 

이후 에서 응답이 오는 C 이후 

log.info("body : {}", body);

를 호출하게 된다.

 

위 사진을 보면 호출 후 응답이 올 때까지 Blocking이 걸렸기에 바로 이어서 호출할 수 없음을 확인할 수 있다.

 

 

총 소요 시간은 약 6초가 출력되었다.


NonBlocking(NI/O)

Blocking 코드에서는 read시 Block이 걸리는걸 확인할 수 있었다.

 

우선 요청을 전부 보내고나서 응답이 왔을 때 이에 반응할 수 있도록 해야한다.

 

이에 Blocking 코드에서 사용한 Channel을 Selector라는 이벤트 리스너에 등록하고

-> Selector 사용 없이 무한 루프를 이용할 수 있지만 그럴경우 CPU사용량이 많아지고 의미가 없기에 다루지 않는다.

 

ResponseServer의 응답 또는 요청이 있을 때 Selector에게 알려주는 방식으로 동작하게 한다.

 

요청을 받는 ResponseServer는 아래와 같이 변형된다.

ServerSocketChannel 생성 후 요청에 따른 행동을 수행하는 메인 로직

try {
	// ServerSocketChannel을 생성하고, 몇 가지 설정을 한다.
	ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
	serverSocketChannel.configureBlocking(false);
	serverSocketChannel.bind(new InetSocketAddress(28080));
	Selector selector = Selector.open();
	serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, null);
	while (selector.select() > 0) {
		Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
		while (iterator.hasNext()) {
			SelectionKey selectionKey = iterator.next();
			iterator.remove();
			if(selectionKey.isAcceptable()) {
				// 접속
				accept(selector, selectionKey);
			} else if(selectionKey.isReadable()) {
				// 수신
				receive(selector, selectionKey);
			} else if(selectionKey.isWritable()) {
				// 발신
				send(selector, selectionKey);
			}
		}
	}
} catch (IOException e) {
	e.printStackTrace();
}

 

접속을 확인하는 accept함수

public static void accept(Selector selector, SelectionKey selectionKey) {
	try {
		log.info("accept Start");
		// 키 채널 가져오기
		ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
		// 채널 가져오기
		SocketChannel socketChannel = serverSocketChannel.accept();
		// NonBlocking 설정
		socketChannel.configureBlocking(false);
		// 접속 Socket 단위로 사용되는 Buffer
		StringBuffer stringBuffer = new StringBuffer();
		stringBuffer.append("NonBlocking Socket Server. call accept");
		// Socekt채널을 channel에 송신 등록
		socketChannel.register(selector, SelectionKey.OP_WRITE, stringBuffer);
		log.info("accept End");
	} catch (IllegalArgumentException e) {
		e.printStackTrace();
	} catch (IOException e) {
		e.printStackTrace();
	}
}

데이터 수신 시 호출되는 receive함수

// 수신
public static void receive(Selector selector, SelectionKey selectionKey) {
	try {
		log.info("receive Start");
		// 키 채널 가져오기
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		// NonBlocking 설정
		socketChannel.configureBlocking(false);
		ByteBuffer byteBuffer = ByteBuffer.allocate(128);
		// 데이터 수신
		int readSize = socketChannel.read(byteBuffer);
		if(readSize == -1) {
			socketChannel.close();
			selectionKey.cancel();
			return;
		}
		byte[] data = new byte[readSize];
		System.arraycopy(byteBuffer.array(), 0, data, 0, readSize);
		StringBuffer stringBuffer = (StringBuffer) selectionKey.attachment();
		stringBuffer.append(new String(data));
		log.info("receive stringBuffer : {}", stringBuffer.toString());
		socketChannel.register(selector, SelectionKey.OP_WRITE, stringBuffer);
		log.info("receive End");
	} catch (IOException e) {
		e.printStackTrace();
	}
}

 

데이터 발신 시 호출되는 send함수

// 발신
public static void send(Selector selector, SelectionKey selectionKey) {
	try {
		log.info("send Start");
		// 키 채널 가져오기
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		// NonBlocking 설정
		socketChannel.configureBlocking(false);
		StringBuffer stringBuffer = (StringBuffer) selectionKey.attachment();
		String data = "NonBlocking Socket Server. call send";
		stringBuffer.setLength(0);
		ByteBuffer byteBuffer = ByteBuffer.wrap(data.getBytes());
		Thread.sleep(2000);
		socketChannel.write(byteBuffer);
		socketChannel.register(selector, SelectionKey.OP_READ, stringBuffer);
		log.info("send End");
	} catch (IOException | InterruptedException e) {
		e.printStackTrace();
	}
}

작성할 때에 이전에 작성한 WebFlux예시인 Http Server처럼

어차피 ResponseServer는 받고 2초 후 응답 보내주면 끝이니 따로 수정하지 말자

라고 생각했었다.

 

하지만 Socket은 단방향이 아닌 양방향 통신이므로 ResponseServerRequestServer 둘 다 NonBlcoking으로 작성되어야 한다.

 

ResponseServer를 수정하지 않고 Blocking 코드를 유지하면 read작업에서 Block이 걸리면서 한번의 통신 받아 수행하게 된다.


RequestServer는 아래와 같다.

 

초기화 함수

public void NonBlockingSocketInit(String ip, int port) {
    log.info("Call NonBlockingSocketInit Method.");
    try {
        selector = Selector.open();
        socketChannel = SocketChannel.open(new InetSocketAddress(ip, port));
        socketChannel.configureBlocking(false);
        // Channel에 Selector 등록
        socketChannel.register(selector, SelectionKey.OP_READ, new StringBuffer());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Blocking 설정을 false로 해서 NonBlocking을 명시했고

 

Channel에 selector를 등록하고 통신이 들어올 때 읽을 수 있도록 OP_READ로 초기화했다.

 

ResponseServer로 쓰기를 수행할 함수

public void NonBlockingSocketWrite(String body) {
    log.info("Call NonBlockingSocketWrite Method.");
    try {
        ByteBuffer byteBuffer = ByteBuffer.wrap(body.getBytes());
        socketChannel.write(byteBuffer);
    } catch (IOException e) {
        errorMsg = e.getLocalizedMessage();
        errorCode = ErrorCode.CONNECTION_ERROR;
        log.error("[{}] {}",errorCode, errorMsg);
    }
}

 

요청을 보내고  ResponseServer에서 응답이 오는 경우를 캐치해서 read(읽기)를 수행해줄 놈이 필요하다.

 

이를 Runnable로 구현해서 메인 스레드가 아닌 다른 스레드에서 수행하도록 한다.

 

수행 내용은 Selector에 등록된 key를 꺼내서 상태를 계속해서 체크해주는 것이다.

 

public static class Receive implements Runnable {
    public void run() {
        try {
            while(true) {
                selector.select();
                Iterator iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = (SelectionKey) iterator.next();
                    if(key.isReadable()) {
                        read(key);
                    }
                    iterator.remove();
                }
            }
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    private String read(SelectionKey key) {
        log.info("Call read method.");
        String body = "";
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
        int readInt = 0;
        try {
            readInt = ((SocketChannel)key.channel()).read(byteBuffer);
            if (-1 != readInt) {
                Charset charset = Charset.forName("UTF-8");
                byteBuffer.flip();
                body = charset.decode(byteBuffer).toString();
                log.info("body : {}", body);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return body;
    }
}

이제 원활한 테스트를 위해 API를 통해 호출될 수 있도록 하였고

 

새로운 스레드에서 수신 체크를 해주기 위해 바로 위에 작성한 Receive를 실행해준다.

@PostMapping("/nonBlockingSocket")
public ResponseEntity<ResponseData> NonBlockingSocket() {
    log.info("Call nonBlockingSocket API.");
    // 시간 측정을 위해 선언
    final StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    ResponseEntity responseEntity = null;
    responseData.setErrorMsg("http://localhost:18080 - Call nonBlockingSocket.");
    socketService.NonBlockingSocketInit("localhost", 28080);
    SocketService.Receive receive = new SocketService.Receive();
    new Thread(receive).start();
    for (int i = 0; i < 3; i++) {
        socketService.NonBlockingSocketWrite("http://localhost:18080 - Call nonBlockingSocket.");
    }
    stopWatch.stop();
    log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
    return responseEntity.status(HttpStatus.OK).body(responseData);
}

위를 수행하면 Log가 아래와 같이 출력된다.

 

우선 RequestServer

 

ResponseServer

 

ReqeustServer에서는 3번 연속 호출하고

 

총 소요 시간 출력하고

 

ResponseServer의 응답을 읽는다.

 

ResponseServer에서는 receive stringBuffer ...에서 RequestServer의 요청을 읽어주는데

 

두번만 호출되고 짤린걸 볼 수 있다. 연속 호출이라 처음 receive에 겹친거 같다.

 

여기서는 send호출만 봐주면 된다.

 

순서를 정리하면 아래와 같다.

 

RequestServer에서 ResponseServerwrite 호출

B : RequestServerLog : 02:00:56.996
B : RequestServerLog : 02:00:56.996
B : RequestServerLog : 02:00:56.997

 

총 소요 시간 출력
B : RequestServerLog : 02:00:56.997

 

ResponseServer에서 RequestServer로 write 호출

C : ResponseServerLog : 02:00:59.005
C : ResponseServerLog : 02:01:01.009
C : ResponseServerLog : 02:01:03.015

 

RequestServer의 호출이 모두 이루어지고 ResponseServer에서 비즈니스 로직(2초 지연)이 수행되었음을 확인할 수 있다.

 

RequestServer에서는 우선 요청을 전부 보내고나서 ResponseServer에서 응답이 오는 것을

SocketService.Receive receive = new SocketService.Receive();
new Thread(receive).start();

위 코드를 통해 확인하기에 메인 스레드는 다른 로직을 수행할 수 있게 된다.

 

총 소요시간은 0.0045777

 

 

또한 사진과 같이 ResponseServerBreakPoint를 걸어도 PostMan에서 계속해서 요청을 보낼 수 있음을 확인할 수 있다. -> NonBlocking


Socket통신은 많이 다뤄보지 않아서 그런지 Test를 작성하는데에 시간이 더 오래걸렸다.

 

그래도 옛날부터 작성 되어온 좋은 예시가 많아서 공부할 거리는 많아서 좋았다.

 

채팅 말고 어디에 사용해야할지는 아직 감이 오질 않는다.

 

이제 이전에 했던 WebFlux도 Service로 빼고 코드 정리하고...

 

Future에 대해 작성해야겠다.

 

- 참고

https://pangtrue.tistory.com/217

 

[Java] NIO

java.nio 패키지 참고로 nio는 New I/O의 약자. ( non-blocking의 약자인줄 알았는데 아님. ) 기존의 java.io 패키지를 대체하기 위해 새롭게 나온 패키지다. 과거 Java I/O는 C/C++과 비교해 매우 많이 느리다는

pangtrue.tistory.com

https://nowonbun.tistory.com/687

 

[Java] NIO(Non-Blocking IO) Socket 통신

안녕하세요. 명월입니다. 이 글은 Java에서 NIO Socket 통신에 대한 글입니다. 예전에 Java환경에서 소켓 통신을 하는 방법에 대한 글을 작성한 적이 있습니다. 링크 - [Java강좌 - 23] 소켓 통신 (Socket)

nowonbun.tistory.com

https://javacan.tistory.com/entry/87

 

자바 1.4의 새로운 입출력, NIO API 3부 - 논블럭킹 I/O와 Selector

논블럭킹 IO에 대해서 살펴보고, Selector를 이용한 논블럭킹 IO의 사용법을 알아본다. 블럭킹 IO와 논블럭킹 IO 자바 1.5가 조만간 정식 버전이 출시될 것으로 기대되고 있기는 하지만, 필자가 앞서

javacan.tistory.com

https://jungwoon.github.io/java/2019/01/15/NIO-Network.html

 

NIO 네트워크 정리 | Jungwoon Blog

이번에 업무에서 NIO를 사용할 일이 있어서 공부하면서 내용을 정리해보고자 합니다. 해당 포스팅은 palpit Vlog님의 블로그의 내용을 보면서 정리한 내용입니다. NIO 네트워크? NIO를 이용하여 TCP 관

jungwoon.github.io

'개발 > Java' 카테고리의 다른 글

Java Future, ExecutorService 로 NonBlocking 구현  (0) 2022.03.04
WebFlux로 Blocking, NonBlocking 구현  (0) 2022.01.28
동기와 비동기, Blocking과 Non Blocking  (0) 2022.01.22
Reactive란  (0) 2022.01.21
[java] dual pivot quick sort/정렬  (0) 2021.08.11