Socket
저번에는 [WebFlux로 Blocking, NonBlocking 구현]을 작성 했었다.
순서가 꼬인거 같지만 이번에는 통신의 가장 기본이 되는 Socket통신으로 구현하려 한다.
Socket통신은 TCP/IP 프로토콜을 사용해서 통신을 수행하는 방법이다.
컴퓨터들이 서로 정보를 주고 받는 네트워크는 처리 역할에 따라 계층이 구분되는데 이를 OSI 7계층이라 한다.
이를 실제 업무에서 적용한 표준이 TCP/IP 4계층이다.
TCP/IP 4계층 | 역할 | 데이타 단위 | 전송 주소 | 예시 | 장비 |
응용 계층(Application) | 응용프로그램 간의 데이타 송수신 | Data/Message | - | 파일 전송, 이메일, FTP, HTTP, SSH, Telnet, DNS, SMTP 등 | - |
전송 계층(Transport) | 호스트 간의 자료 송수신 | Segment | Port | TCP, UDP, RTP, RTCP 등 | 게이트웨이 |
인터넷 계층(Internet) | 데이타 전송을 위한 논리적 주소 지정 및 경로 지정 | Packet | IP | IP, ARP, ICMP, RARP, OSPF | 라우터 |
네트워크 연결 계층(Network Access) | 실제 데이타인 프레임을 송수신 | Frame | MAC | Ethernet, PPP, Token Ring 등 | 브리지, 스위치 |
Socket통신은 여기서 3계층과 2계층을 이용한 것이다. (링크)
Java는 Socket통신을 NonBlocking방식으로 사용하기 위해 nio(New I/O) 패키지를 제공한다.
I/O | NI/O | |
데이터 흐름 | 단방향 InputStream/OutputStream으로 구분해서 input/output 사용 |
양방향 SocketChannel/FileChannel 등 객체 하나로 input/output 사용 |
데이터 단위 | Byte or Byte[] | ByteBuffer |
아래에 작성하는 코드들은 아래의 Git에서 확인할 수 있다.
정리가 안된 코드라 많이 난잡하다.
Socket Test까지만 작성하고 한번 정리 해놔야겠다. -> 정리 후 아래 코드와 내용이 조금 다를 수 있지만 기본 로직은 같다.
https://github.com/victory0602/NonBlockingTest.git
Blocking(I/O)
먼저 통신을 받아줄 Server(이하 ResponseServer)에서는 아래와 같이 SocketServer를 구성하였다.
단순하게 통신을 받고 출력해주고 2초 지연시키고 ResponseServer로 응답을 보낸다.
try {
// ServerSocketChannel을 생성하고, 몇 가지 설정을 한다.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
serverSocketChannel.bind(new InetSocketAddress(28080));
while (true) {
// 블로킹 방식으로 설정했기 때문에, accept() 메서드에서 연결을 받을 때까지 블락된다.
SocketChannel socketChannel = serverSocketChannel.accept();
// 연결된 클라이언트와 입/출력하기.
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
socketChannel.read(byteBuffer);
byteBuffer.flip();
log.info("Received Data : " + charset.decode(byteBuffer).toString());
byteBuffer = charset.encode("http://localhost:8080 - Socket Call Test.");
Thread.sleep(2000);
socketChannel.write(byteBuffer);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
다른 Server와의 Port충돌을 피하고자 28080으로 설정하였다.
ResponseServer를 호출할 RequestServer에서 아래와 같이 Blocking 호출을 작성하였다.
원활한 Block Check를 위해 PostMan으로 해당 API를 호출하며 테스트하고자 API호출을 통해 실행될 수 있도록 하였다.
// 초기화
public void BlockingSocketInit(String ip, int port) {
log.info("Call BlockingSocketInit.");
try {
blockingSocketChannel = SocketChannel.open();
blockingSocketChannel.configureBlocking(true);
blockingSocketChannel.connect(new InetSocketAddress(ip, port));
} catch (IOException e) {
errorMsg = e.getLocalizedMessage();
errorCode = ErrorCode.CONNECTION_ERROR;
log.error("[{}] {}",errorCode, errorMsg);
}
}
// 쓰기와 읽기
public ResponseData BlockingSocket(String body) {
log.info("Call BlockingSocket.");
BlockingSocketInit("localhost", 28080);
try {
// Response Server 호출
ByteBuffer byteBuffer = ByteBuffer.wrap(body.getBytes());
blockingSocketChannel.write(byteBuffer);
BlockingSocketRead();
blockingSocketChannel.close();
} catch (IOException e) {
errorMsg = e.getLocalizedMessage();
errorCode = ErrorCode.CONNECTION_ERROR;
log.error("[{}] {}",errorCode, errorMsg);
}
responseData.setErrorCode(errorCode);
responseData.setErrorMsg(errorMsg);
responseData.setBody(body);
return responseData;
}
// 읽기
public void BlockingSocketRead() {
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int readInt = 0;
try {
// 여기서 Block
readInt = blockingSocketChannel.read(byteBuffer);
if (-1 != readInt) {
Charset charset = Charset.forName("UTF-8");
byteBuffer.flip();
String body = charset.decode(byteBuffer).toString();
log.info("body : {}", body);
}
} catch (IOException e) {
e.printStackTrace();
}
}
A. PostMan을 이용하여 RequestServer로 SocketBlocking API를 호출
B. 그러면 위와 같이 RequestServer의 SocketBlocking API가 호출된다.
for Loop를 이용해 ResponseServer를 3번 호출하였고 ResponseServer에서 전달해준 응답 값을 출력해주는 Log와 총 소요시간을 확인할 수 있다.
C. ResponseServer에서는 Log를 통해 RequestServer의 SocketBlocking API를 통해 호출되었음을 알 수 있다.
두 Server의 Log를 통해 호출 순서를 알 수 있다.
B : RequestServerLog : 00:22:28.776
C : ResponseServerLog : 00:22:30.790
B : RequestServerLog : 00:22:30.791
C : ResponseServerLog : 00:22:32.800
B : RequestServerLog : 00:22:32.802
C : ResponseServerLog : 00:22:34.810
RequestServerLog : 00:22:34.810 -> 전체 소요시간 출력
위 Log를 통해 호출 순서가 아래와 같음을 알 수 있다.
ResponseServer에서 RequestServer로의 호출 B가 이루어지고 ResponseServer는 Blocking이 걸려 아무런 작업을 할 수 없는 상태가 된다.
이후 에서 응답이 오는 C 이후
log.info("body : {}", body);
를 호출하게 된다.
위 사진을 보면 호출 후 응답이 올 때까지 Blocking이 걸렸기에 바로 이어서 호출할 수 없음을 확인할 수 있다.
총 소요 시간은 약 6초가 출력되었다.
NonBlocking(NI/O)
Blocking 코드에서는 read시 Block이 걸리는걸 확인할 수 있었다.
우선 요청을 전부 보내고나서 응답이 왔을 때 이에 반응할 수 있도록 해야한다.
이에 Blocking 코드에서 사용한 Channel을 Selector라는 이벤트 리스너에 등록하고
-> Selector 사용 없이 무한 루프를 이용할 수 있지만 그럴경우 CPU사용량이 많아지고 의미가 없기에 다루지 않는다.
ResponseServer의 응답 또는 요청이 있을 때 Selector에게 알려주는 방식으로 동작하게 한다.
요청을 받는 ResponseServer는 아래와 같이 변형된다.
ServerSocketChannel 생성 후 요청에 따른 행동을 수행하는 메인 로직
try {
// ServerSocketChannel을 생성하고, 몇 가지 설정을 한다.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(28080));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, null);
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if(selectionKey.isAcceptable()) {
// 접속
accept(selector, selectionKey);
} else if(selectionKey.isReadable()) {
// 수신
receive(selector, selectionKey);
} else if(selectionKey.isWritable()) {
// 발신
send(selector, selectionKey);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
접속을 확인하는 accept함수
public static void accept(Selector selector, SelectionKey selectionKey) {
try {
log.info("accept Start");
// 키 채널 가져오기
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
// 채널 가져오기
SocketChannel socketChannel = serverSocketChannel.accept();
// NonBlocking 설정
socketChannel.configureBlocking(false);
// 접속 Socket 단위로 사용되는 Buffer
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("NonBlocking Socket Server. call accept");
// Socekt채널을 channel에 송신 등록
socketChannel.register(selector, SelectionKey.OP_WRITE, stringBuffer);
log.info("accept End");
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
데이터 수신 시 호출되는 receive함수
// 수신
public static void receive(Selector selector, SelectionKey selectionKey) {
try {
log.info("receive Start");
// 키 채널 가져오기
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// NonBlocking 설정
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 데이터 수신
int readSize = socketChannel.read(byteBuffer);
if(readSize == -1) {
socketChannel.close();
selectionKey.cancel();
return;
}
byte[] data = new byte[readSize];
System.arraycopy(byteBuffer.array(), 0, data, 0, readSize);
StringBuffer stringBuffer = (StringBuffer) selectionKey.attachment();
stringBuffer.append(new String(data));
log.info("receive stringBuffer : {}", stringBuffer.toString());
socketChannel.register(selector, SelectionKey.OP_WRITE, stringBuffer);
log.info("receive End");
} catch (IOException e) {
e.printStackTrace();
}
}
데이터 발신 시 호출되는 send함수
// 발신
public static void send(Selector selector, SelectionKey selectionKey) {
try {
log.info("send Start");
// 키 채널 가져오기
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// NonBlocking 설정
socketChannel.configureBlocking(false);
StringBuffer stringBuffer = (StringBuffer) selectionKey.attachment();
String data = "NonBlocking Socket Server. call send";
stringBuffer.setLength(0);
ByteBuffer byteBuffer = ByteBuffer.wrap(data.getBytes());
Thread.sleep(2000);
socketChannel.write(byteBuffer);
socketChannel.register(selector, SelectionKey.OP_READ, stringBuffer);
log.info("send End");
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
작성할 때에 이전에 작성한 WebFlux예시인 Http Server처럼
어차피 ResponseServer는 받고 2초 후 응답 보내주면 끝이니 따로 수정하지 말자
라고 생각했었다.
하지만 Socket은 단방향이 아닌 양방향 통신이므로 ResponseServer와 RequestServer 둘 다 NonBlcoking으로 작성되어야 한다.
ResponseServer를 수정하지 않고 Blocking 코드를 유지하면 read작업에서 Block이 걸리면서 한번의 통신 받아 수행하게 된다.
RequestServer는 아래와 같다.
초기화 함수
public void NonBlockingSocketInit(String ip, int port) {
log.info("Call NonBlockingSocketInit Method.");
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(ip, port));
socketChannel.configureBlocking(false);
// Channel에 Selector 등록
socketChannel.register(selector, SelectionKey.OP_READ, new StringBuffer());
} catch (IOException e) {
e.printStackTrace();
}
}
Blocking 설정을 false로 해서 NonBlocking을 명시했고
Channel에 selector를 등록하고 통신이 들어올 때 읽을 수 있도록 OP_READ로 초기화했다.
ResponseServer로 쓰기를 수행할 함수
public void NonBlockingSocketWrite(String body) {
log.info("Call NonBlockingSocketWrite Method.");
try {
ByteBuffer byteBuffer = ByteBuffer.wrap(body.getBytes());
socketChannel.write(byteBuffer);
} catch (IOException e) {
errorMsg = e.getLocalizedMessage();
errorCode = ErrorCode.CONNECTION_ERROR;
log.error("[{}] {}",errorCode, errorMsg);
}
}
요청을 보내고 ResponseServer에서 응답이 오는 경우를 캐치해서 read(읽기)를 수행해줄 놈이 필요하다.
이를 Runnable로 구현해서 메인 스레드가 아닌 다른 스레드에서 수행하도록 한다.
수행 내용은 Selector에 등록된 key를 꺼내서 상태를 계속해서 체크해주는 것이다.
public static class Receive implements Runnable {
public void run() {
try {
while(true) {
selector.select();
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
if(key.isReadable()) {
read(key);
}
iterator.remove();
}
}
}
catch (Exception ex) {
ex.printStackTrace();
}
}
private String read(SelectionKey key) {
log.info("Call read method.");
String body = "";
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
int readInt = 0;
try {
readInt = ((SocketChannel)key.channel()).read(byteBuffer);
if (-1 != readInt) {
Charset charset = Charset.forName("UTF-8");
byteBuffer.flip();
body = charset.decode(byteBuffer).toString();
log.info("body : {}", body);
}
} catch (IOException e) {
e.printStackTrace();
}
return body;
}
}
이제 원활한 테스트를 위해 API를 통해 호출될 수 있도록 하였고
새로운 스레드에서 수신 체크를 해주기 위해 바로 위에 작성한 Receive를 실행해준다.
@PostMapping("/nonBlockingSocket")
public ResponseEntity<ResponseData> NonBlockingSocket() {
log.info("Call nonBlockingSocket API.");
// 시간 측정을 위해 선언
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
ResponseEntity responseEntity = null;
responseData.setErrorMsg("http://localhost:18080 - Call nonBlockingSocket.");
socketService.NonBlockingSocketInit("localhost", 28080);
SocketService.Receive receive = new SocketService.Receive();
new Thread(receive).start();
for (int i = 0; i < 3; i++) {
socketService.NonBlockingSocketWrite("http://localhost:18080 - Call nonBlockingSocket.");
}
stopWatch.stop();
log.info("Total Second : {}", stopWatch.getTotalTimeSeconds());
return responseEntity.status(HttpStatus.OK).body(responseData);
}
위를 수행하면 Log가 아래와 같이 출력된다.
우선 RequestServer
ResponseServer
ReqeustServer에서는 3번 연속 호출하고
총 소요 시간 출력하고
ResponseServer의 응답을 읽는다.
ResponseServer에서는 receive stringBuffer ...에서 RequestServer의 요청을 읽어주는데
두번만 호출되고 짤린걸 볼 수 있다. 연속 호출이라 처음 receive에 겹친거 같다.
여기서는 send호출만 봐주면 된다.
순서를 정리하면 아래와 같다.
RequestServer에서 ResponseServer로 write 호출
B : RequestServerLog : 02:00:56.996
B : RequestServerLog : 02:00:56.996
B : RequestServerLog : 02:00:56.997
총 소요 시간 출력
B : RequestServerLog : 02:00:56.997
ResponseServer에서 RequestServer로 write 호출
C : ResponseServerLog : 02:00:59.005
C : ResponseServerLog : 02:01:01.009
C : ResponseServerLog : 02:01:03.015
RequestServer의 호출이 모두 이루어지고 ResponseServer에서 비즈니스 로직(2초 지연)이 수행되었음을 확인할 수 있다.
RequestServer에서는 우선 요청을 전부 보내고나서 ResponseServer에서 응답이 오는 것을
SocketService.Receive receive = new SocketService.Receive();
new Thread(receive).start();
위 코드를 통해 확인하기에 메인 스레드는 다른 로직을 수행할 수 있게 된다.
총 소요시간은 0.0045777
또한 사진과 같이 ResponseServer에 BreakPoint를 걸어도 PostMan에서 계속해서 요청을 보낼 수 있음을 확인할 수 있다. -> NonBlocking
Socket통신은 많이 다뤄보지 않아서 그런지 Test를 작성하는데에 시간이 더 오래걸렸다.
그래도 옛날부터 작성 되어온 좋은 예시가 많아서 공부할 거리는 많아서 좋았다.
채팅 말고 어디에 사용해야할지는 아직 감이 오질 않는다.
이제 이전에 했던 WebFlux도 Service로 빼고 코드 정리하고...
Future에 대해 작성해야겠다.
- 참고
https://pangtrue.tistory.com/217
https://nowonbun.tistory.com/687
'개발 > Java' 카테고리의 다른 글
Java Future, ExecutorService 로 NonBlocking 구현 (0) | 2022.03.04 |
---|---|
WebFlux로 Blocking, NonBlocking 구현 (0) | 2022.01.28 |
동기와 비동기, Blocking과 Non Blocking (0) | 2022.01.22 |
Reactive란 (0) | 2022.01.21 |
[java] dual pivot quick sort/정렬 (0) | 2021.08.11 |