Programming/Development

[SpringBoot] 외부 오픈API Websocket서버 네트워크 연결 장애 문제 해결 [업비트 Websocket API]

Ho-home 2024. 8. 26. 02:02

서론


 

프로젝트 진행 도중 SpringBoot에서 실시간 데이터를 받아오기 위해 외부 API에 웹소켓으로 연동을 하였습니다.

외부 api에 소켓 연결을 하여 실시간 데이터를 받아오는 로직은, 기본적으로 마켓 데이터를 실시간으로 제공하는 서비스 특성 상 핵심 기능이었습니다.

따라서, 네트워크 연결 실패, 혹은 서버 문제로 인하여 외부 서버에 소켓 연결이 끊길 경우 실시간 데이터를 받아오지 못하는 치명적인 결함이 발견되었습니다.

이 문제를 해결하기 위해 연결이 끊길경우 재접속 연결요청을 주기적으로 실행하는 로직이 필요했습니다.

 

 

문제 코드


public class UpbitWebsocketClient extends WebSocketClient {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final WebSocketHandler webSocketHandler;
    private final Service service;
    
    public UpbitWebsocketClient(String serverUri, WebSocketHandler webSocketHandler, Service service) throws URISyntaxException {
        super(new URI(serverUri));
        this.webSocketHandler = webSocketHandler;
        this.service = service;
    }
    
    @Override
    public void onOpen(ServerHandshake handshakedata) {
        log.info("업비트 웹소켓 연결 성공");
        try {
            List<String> codes = service.getUpbitMarketData().getMarketList();
            TicketMessage ticketMessage = new TicketMessage("test");
            TradeSubscribe tradeSubscribe = new TradeSubscribe("ticker", codes, false, true);
            String message = objectMapper.writeValueAsString(Arrays.asList(ticketMessage, tradeSubscribe));
            this.send(message);
        } catch (Exception e) {
            log.error("업비트 웹소켓 메시지 송신 실패", e);
        }
    }
    
    @Override
    public void onMessage(ByteBuffer message) {
        String receivedString = StandardCharsets.UTF_8.decode(message).toString();
        try {
            UpbitReceiveDto upbitReceiveDto = objectMapper.readValue(receivedString, UpbitReceiveDto.class);
            SimpleUpbitDto simpleUpbitDto = new SimpleUpbitDto(upbitReceiveDto.getCode(), upbitReceiveDto.getTradeVolume(), rate.multiply(new BigDecimal("100")), upbitReceiveDto.getHighest52WeekPrice(), upbitReceiveDto.getLowest52WeekPrice(), upbitReceiveDto.getOpeningPrice(), upbitReceiveDto.getTradePrice(), upbitReceiveDto.getChange());
            SimpleUpbitDto simpleUpbitDto = new SimpleUpbitDto(upbitReceiveDto.getCode(), upbitReceiveDto.getTradeVolume(), upbitReceiveDto.getSignedChangeRate(), upbitReceiveDto.getHighest52WeekPrice(), upbitReceiveDto.getLowest52WeekPrice(), upbitReceiveDto.getOpeningPrice(), upbitReceiveDto.getTradePrice(), upbitReceiveDto.getChange(), upbitReceiveDto.getAccTradePrice24h());
            String dtoJson = objectMapper.writeValueAsString(simpleUpbitDto);
            webSocketHandler.sendMessageToAll(dtoJson);

        } catch (Exception e) {
            log.error("메세지 DTO 전환 실패", e);
        }
    }
    
    @Override
    public void onMessage(String message) {
    }
    
    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("업비트 웹소켓 연결해제 성공");
    }
    
    @Override
    public void onError(Exception ex) {
        log.error("업비트 웹소켓 연결 에러 : " + ex.getMessage(), ex);
    }
}

 

기본적으로, Spring서버가 클라이언트 역할을 하여, 외부 서버에 Websocket 요청을 보내는 경우, WebsocketClient를 사용합니다.

현재, 위의 코드상태는 onOpen을 통해 API서버와 handshake가 성립하면, 이 서버에 요청 query를 날려 해당 데이터에 관한 데이터를 실시간으로 onMessage(ByteBuffer message)메소드를 통하여 받아오는 형태입니다.

 

업비트 Websocket API는 기본적으로, plain text형태로 데이터가 제공되는것이 아닌, ByteBuffer형태로 데이터가 송신되므로, 이 점 유의하여 UTF8 디코더를 사용하여 디코딩을 하는 과정이 필요합니다.

 

위 로직은 현재, 네트워크 문제가 생기거나, 서버 상 문제가 생겨 Socket 연결이 종료된다면, 서버 상 문제가 해결이 된다고 해도 재연결을 하는 로직이 없습니다.

따라서 한 번 서버에 문제가 생기면 실시간 데이터는 서버를 다시 재 가동 시키지 않는 한 받아볼 수 없는 문제가 있습니다.

 

서비스 특성 상, 인위적으로 API서버와의 연결을 해제할 일이 없으므로, onClose() 메서드가 호출되거나, WebsocketClient에 문제가 생겨 onError() 메서드가 호출이 된다면 재접속을 시켜주는 로직을 구현하여 문제를 해결 하여야 합니다.

 

 

문제 해결 - reconnectBlocking() 메소드


물론, Upbit Websocket에서 제공하는 소켓확인 구문인 "PING" text를 보냈을 시에 대해 답변이 오면 connect를 그대로 두고, 답변이 오지않으면 reconnect를 요청하는 방법도 있습니다.

하지만, 이러한 로직을 사용하기위해서는, 스케쥴러의 주기를 짧게가져가야 하는 점과, 이로 인한 오버헤드가 발생하는 점, 또한 서버측에서 소켓 연결이 끊김을 인지하고 즉각적으로 대처를 할 수 없다는점을 토대로, 저는 서버가 소켓연결이 끊김을 인지하고 즉각적으로 연결을 재요청하는 로직을 원했기 때문에 OnError, OnClose메서드에 로직을 추가하는 방향으로 설계하였습니다.

 

우선,  문제를 해결하기위하여, WebsocketClient 클래스의 reconnectBlocking() 메소드를 사용하였습니다.

여기서, reconnect() 메소드를 사용하지 않는 이유는, reconnectBlocking()의 경우 reconnect()와는 달리, 연결이 성공하거나 예외가 발생하기 전까지 return이 되지 않기때문에 (즉, 스레드 블로킹이 되기 때문에) 로직이 간결해지는 효과를 보기 위함입니다.

 

하지만, reconnectBlocking() 메소드를 사용하는 경우에는, 말 그대로 특정 기능을 수행하기 전까지는 스레드를 blocking상태에서 풀어주지 않기때문에 주 스레드가 blocking상태로 있게 될 경우, 심각한 성능 저하가 발생할 수 있기 때문에 추가적으로 스레드를 만들어 준 후, reconnectBlocking()메소드를 실행하는 로직에 배치하여야합니다.

 

여기서, 추가적으로 스레드를 만들어 줄 때 매우 주의하여야 할 사항이 있는데, 바로 synchronized (동기화)입니다. 

 

문제 해결 코드를 보기 전, 우선 이 '스레드'와, '동기화'에 대한 개념을 익히는게 우선이라고 생각하여 개념설명을 진행하겠습니다.

그만큼 자바에서 멀티 스레드를 사용할 때 로직에서 매우 중요하게 고려하여야 할 점이기에, 잘 따라와주시길 바랍니다.

 

 

스레드 동기화


 

우선, 스레드 동기화를 이해하기 전에 '스레드'와, '동기화'에 대한 개념을 간략하게라도 짚을 필요가 있습니다.

 

스레드란, 간략히 이야기하자면 프로세스에서 일을 처리하는 단위, 즉 '일꾼'이라고 보시면 될것같습니다.

이러한 스레드들은 각자 자신만의 고유한 '스택 공간'을 갖고있는데, 이러한 스택 공간을 통해 자신이 현재 코드 상에서 어느곳에 위치하였는지, 어디로 돌아가야하는지 등을 알 수 있습니다.

헌데, 스레드는 프로세스에서 하나만 존재하는것이 아닌, 여러개 존재할 수 있습니다.

여기서 문제점이 발생하는데, 스레드 간 스택 공간을 제외한 나머지 힙, 데이터, 코드영역을 공유한다는 점입니다.

 

문제상황의 예를 들자면, 자원을 공유함에 따라, A 스레드가 데이터영역의 변수 'number'를 2에서 1로 변경한 후, 자신의 스택에 따라 다음로직을 실행하여 number를 사용합니다.

A스레드가 'number'를 2에서 1로 변경한 후, number를 사용하는 로직을 실행하기 전, B스레드도 'number'라는 변수에 접근하여 이번에는 1에서 3으로 변경합니다.

A스레드는 'number'변수가 1인줄알고 로직을 실행하였는데, 결과를 출력해보니 'number'변수가 3일때의 결과가 나와버렸습니다.

이렇게 여러 스레드가 있을 때 추가적인 조치를 취해주지않는다면 A스레드의 실행결과처럼 다른 스레드가 공유자원에 간섭을 하였을 경우 원래 예측되는 실행결과와는 다른 결과가 도출될 수 있다는 문제점이 있습니다.

 

스레드 동기화

 

이러한 문제를 해결하기 위하여 '동기화'라는 해결책이 제시됩니다.

'동기화'는 하나의 스레드가 로직을 실행하고 있을 때, 다른 스레드가 로직을 실행하고 있는 스레드의 자원에 간섭할 수 없도록 막는 행위입니다.

즉, 위의 문제상황에서 'A'스레드가 실행하고있는 로직에 들어있는 자원에 'B'스레드가 접근할 수 없어 결과가 예측대로 나오도록 할 수 있는 매커니즘입니다.

이렇게 동기화 된 스레드는 'Thread-safe'하다고도 불립니다.

 

이 정도까지 '스레드'와 '동기화'에 대해 아주 간략히 정리해보았습니다.

 

 

문제 해결 


 

문제 해결을 위해 주 스레드를 사용하지않고, 추가적인 스레드를 생성하여 reconnect를 담당하도록 처리하였습니다.

 

	@Override
    public void onOpen(ServerHandshake handshakedata) {
    
    		try {
            isConnected = true;
            
            // socket data 요청 query message 생성하는 부분
            
            this.send(message);

        } catch (Exception e) {
            log.error("업비트 웹소켓 메시지 송신 실패", e);
        }
    }


	@Override	
	public void onMessage(ByteBuffer message) {

        String receivedString = StandardCharsets.UTF_8.decode(message).toString();
        if(!"{\"status\":\"UP\"}".equals(receivedString)) {
            try {
            
				// onMessage 데이터 handling하는 로직

            } catch (Exception e) {
                log.error("메세지 DTO 전환 실패", e);
            }
        }else{ // {"status" : "up"} 응답이 도착하면
            synchronized (this) {
                isConnected = true;
                this.notify(); // 대기중인 스레드를 깨움
            }
        }
    }
    
    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info(">>> 업비트 웹소켓 연결해제 성공");
        isConnected = false;
        attemptReConnect(); // 소켓 재 연결 요청을 하는 메소드
    }

    @Override
    public void onError(Exception ex) {
        log.error("업비트 웹소켓 연결 에러 : " + ex.getMessage(), ex);
        isConnected = false;
        attemptReConnect(); // 소켓 재 연결 요청을 하는 메소드
    }

    /**
     * 별도의 스레드를 추가적으로 생성하여 재연결 로직 실행
     * 만일, 스레드를 생성하지않고 메인스레드를 사용한다면 reconnect가 되거나, 에러가 발생하기 전까지 메인스레드가 blocking상태로 존재할 것. (심각한 성능저하 우려)
     *
     */
    public void attemptReConnect(){
        if(isReconnecting){ // reconnect 로직이 수행중이라면
            return; // 바로 return
        }
        isReconnecting = true; // reconnect 로직이 수행중이 아니라면 스레드 진입 후 isReconnecting에 플래그
        new Thread(() -> { // 새로운 스레드 생성
            try {
                log.info("재연결 시도중...");
                while (!isConnected) { // 소켓 서버에 connect되지 않았다면
                    try {
                        Thread.sleep(5000); // 스레드 5초간 sleep (5초 간격 재요청)
                        log.info("재연결 시도");
                        this.reconnectBlocking(); // 소켓 서버 connect 요청
                        if (this.isOpen() && checkNetworkConnect()) { // WebsocketClient가 열려있고, Network연결이 정상적이라면
                            isConnected = true; // isConnect true 플래그
                            log.info("재연결 성공");
                        }else{ // WebsocketClient가 Open이 아니거나, 네트워크연결이 정상적이지 않다면
                            log.warn("네트워크 상태 불안정. 네트워크 연결 다시 시도");
                            isConnected = false; // isConnected false 플래그 (while문 끊기지 않고, 재 요청을 위함)
                        }
                    } catch (InterruptedException e) {
                        log.error("재 연결 방해됨.", e);
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        log.error("재연결 실패", e);
                    }
                }
            } catch (Exception ex) {
                log.error("재연결 중 문제 발생", ex);
            } finally {
                if(isConnected) {
                    isReconnecting = false; // thread의 모든 로직이 종료되면 reconnect로직이 종료되었음을 표시하기 위해 false처리
                }
            }
        }).start();
    }


    private boolean checkNetworkConnect() {
        try{
            this.send("PING"); // API 서버에 "PING" 요청 (handshake 요청)

            // 3초간 blocking
            // ('pong'을 onMessage() 메소드에서 받고, isConnected를 true로 만드는 시간을 3초 간 기다림)
            synchronized (this){ // 이 객체(WebsocketClient)를 동기화 하여 다른 스레드가 접근하지 못하도록 synchronized
                this.wait(3000); // 3초간 멈춤
            }
            return isConnected; // isConnect flag return (만약, "PONG"메세지를 받았다면 isConnect는 이 객체가 3초간 멈춰있는 동안 true로 변했을것임. 받지 못했다면 false상태.)
        } catch(InterruptedException e){
            log.error("네트워크 상태 확인 중 방해됨", e);
            Thread.currentThread().interrupt();
            return false; // Interrupt 발생 시 false return
        }catch(Exception e){
            log.error("네트워크 상태 확인 중 오류 발생", e);
            return false; // 예외 발생 시 false return
        }
    }

	// Upbit API Server는 120초간 메세지 응답이 없을 시 연결을 종료하므로, 연결 유지를 위한 Scheduling
    @Scheduled(fixedRate = 60*1000)
    public void sendPingMessage(){
        if (isConnected){
            try{
                this.send("PING");
                log.info("업비트 API 서버에 "PING" 송신 완료");
            }catch(Exception e){
                log.error("업비트 API 서버에 "PING" 송신 실패", e);
            }
        }else{
            log.warn("업비트 API 서버에 웹소켓 연결 안되어있음");
        }
    }

 

이 로직의 핵심사항은, 주 스레드를 reconnect하는 데 사용하지 않고, 추가적인 스레드를 생성하여 reconnect로직을 처리하고, 추가적인 스레드를 사용함에 따라 동기화를 처리함에 있습니다.

 

 

이상으로, 블로그 포스팅을 마치겠습니다.

긴 글 읽어주셔서 감사합니다.

 

출처

https://javadoc.io/doc/org.java-websocket/Java-WebSocket/1.3.9/org/java_websocket/client/WebSocketClient.html