본문 바로가기
DB/Redis

[Redis] (STOMP + pub/sub) 채팅 기능(서버 다중화)

by lucas_owner 2025. 3. 4.

Redis - STOMP + Redis Pub/Sub 채팅 기능(서버 다중화)

채팅 기능에서 많이 사용하는 프로토콜인 STOMP 와 Redis 의 pub/sub 기능을 활용해 보는 포스팅입니다.

평소에 채팅 기능에 대한 많은 호기심은 가지고 있지만, 실제 애플리케이션에서 사용할만한 채팅 기능은 고도화를 해본적이 없었다.

그러던 와중 우아한기술블로그 포스팅을 접하게 되었고, 평소 궁금했던 영역의 비지니스에 대해서 생각해보면서 구현하게 되었다

(해당 블로그는 2021년 작성이다,, 꽤나 늦게 봤다)

 

https://lucas-owner.tistory.com/68

 

[Redis] Spring Boot - Redis Pub, Sub 구현&응용

목차 Spring Boot - Redis 환경 구성 테스트 개요 서버간 데이터를 동기화 하기 위해, 방법을 찾다가 Redis Pub/Sub 을 사용하는것이 현상황에서 최선이라는 판단을 내리고, 알아보게 되었다. (A - B 각서버

lucas-owner.tistory.com

기본적인 Redis 연동과, Pub/Sub 의 개념은 위 글에서 확인 할 수 있다.

 


1. STOMP ?

STOMP 는 Simple || Stream Text Oriented Message Protocol 의 약자이며 텍스트 기반의 메시지 프로토콜이다

클라이언트 - 서버 간 통신할때 필요한 내용, 형식, 유형과 같은 규칙 기반으로 TCP, WebSocket 양방향 네트워크 프로토콜 기반으로 동작한다.

HTTP 프로토콜과 같이 Request - Response 패턴이 아닌, Broker 에 연결된 Client 들 간 메시지를 교환하는 방식이다.

즉 Pub/Sub 구조이며, 메시지 공급주체와, 소비하는 주체를 분리/사용 하는 패턴이다.

 

메세지 브로커를 사용하기에, 메시징을 중계하는 서버를 따로 두어 안정성과 확장성을 추가로 고려 할 수 있으며,

메시지를 수신/발신 처리 하는 부분이 확실하게 정의 되어있기에 개발자 입장에서 명확하게 인지할 수 있다.

또한 STOMP 의 `destination` 헤더를 사용하여 Spring Security 를 사용할 수 있다(메시지 보안)

 

Message Queue Tool 들과 연동하여 사용,관리 하기 좋다 (Kafka, Redis, RabbitMQ) 필요한 사항에 맞춰 MQ 를 선택하여 사용하면 좋을것 같다.

 

Flow

Clients <-> Server 간 큰 흐름을 보자면 아래와 같다.

RDB 를 우선 배제하고 구현할 예정이다.

1. Server 기동시 Redis 에서 Hash 조회 `CHAT_ROOMS` -> Redis 에 개설되어 있는 채팅방 조회 및 Memory 저장
2. 새로운 채팅방 개설은 `POST /chat/room` 로 요청
3. STOMP 프로토콜을 사용하여, Connect -> Subscribe -> Send 순서로 진행 필요함
   1. Connect: `ws://localhost:8080/ws-stomp` - Connect Type `STOMP`
   2. Subscribe: `/sub/chat/room/{roomId}` 
   3. Send Destination: `/pub/chat/message`
   4. Send Body: `{"roomId": "{roomId}", "message": "{message}", "sender": "{sender}"}`

 

각 채팅방은 Redis 에 CHAT_ROOMS 라는 Hash 로 저장될 예정이다, 또한 애플리케이션 내부에서 Map 객체로 각 채팅방에 대한

정보를 담아두고 모든 구간에서 해당 데이터를 가져와 사용할 예정이다(불필요한 네트워크 자원 소모 방지)

위와 같다면이때 Redis 에는 데이터가 있지만, Server 가 다중화 되어있거나, 재기동을 하는 상황이라면 Map 에는 데이터가 없기에 Exception 이 발생하고 이를 방지하기 위해서다.

이후 부터는 STOMP 를 이용하여 Client - server 간 Connection - Subscribe(구독) 절차를 진행한다.

여기까지가 Client - Server 간 Flow 이다.

 

Server 내부에서는 각 채팅방에 대한 Redis Listener 를 통해 새로운 메시지 발행시, 해당 Listener 즉 채팅방을 구독하고 있는 사용자들에 메시지를 전송하는 역할을 수행해야 한다.

 

모든 절차가 완료되면 Client 는 해당 채팅방에 메세지 전송/수신 등의 동작을 수행 할 수 있게된다.

 

 

채팅방 관리: ChatRoom

메시지 수/발신: Chat

 

위의 다이어 그램은 비지니스 로직에 대해서만 다루게 될 Class 들이다.

이외의 설정 Class 들은 Redis, STOMP 관련 만 추가로 다루게 된다.

 

 

STOMP + Redis 채팅 구현

implementation 'org.springframework.boot:spring-boot-starter-websocket' // websocket(STOMP)

STOMP 관련 Dependecy 를 추가해주도록 하자

 

Redis 관련은 이전글에서 다루었기에 해당 글에서는 다루지 않는다.

Redis 관련 추가되는 설정이나 내용에 대해서만 다룰 예정이다.

 

1. Redis Subscriber Listener 

이전 시간에서 RedisListener 를 생성해주었었고, 따로 Subscriber 들에게 전송하지는 않았었다.

혼동을 줄이기 위해 새로운 Listener 를 생성하여 STOMP Broker 에 메시지를 전송하도록 하겠다.

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisSubMsgListener implements MessageListener {

    private final RedisTemplate<String, Object> template;
    private final ObjectMapper objectMapper;
    private final SimpMessagingTemplate messagingTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String publishMessage = template
                    .getStringSerializer().deserialize(message.getBody());

            MessageDto messageDto = objectMapper.readValue(publishMessage, MessageDto.class);

            // To Clients
            messagingTemplate.convertAndSend("/sub/chat/room/" + messageDto.getRoomId(), messageDto);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }
}

 

해당 Class 에서 추가된 내용은 messagingTemplate 에 관한 내용이다. 채팅방에 발행된 메시지를 Listener 가 읽게되면

해당 내용을 Clients 에게 전송하는 내용이다.

 

`/sub/chat/room ~` 해당 부분은 STOMP 설정중 하나로 추가된 내용인데

prefix 가 어떤게 붙는지에 따라 메세지를 받을지, 전송할지 정의한 내용이다, 즉 /sub/~ 이면 해당 경로를 구독한 사람들에게 메시지를 전송하게 되는것이다.

 

참고로 MessageDto 객체는 아래와 같다.

Client 에게 받을 필수적인 필드들이다.

 

 

 

2. STOMP(WebSocket) Config

@Configuration
@EnableWebSocket // webSocket
@EnableWebSocketMessageBroker // STOMP
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/sub"); //msg 받을경로(구독)
        registry.setApplicationDestinationPrefixes("/pub"); //msg 보낼경로(발행)
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-stomp").setAllowedOrigins("*").withSockJS();
        registry.addEndpoint("/ws-stomp").setAllowedOrigins("*");
    }
}

위에서 설명했던, prefix 를 설정하여 MessageBroker 가 어떤 path 를 사용하여 보내고, 받을지에 대한 설정과

어떤 end-point 로 Connection 을 맺을지에 대한 설정이다.

withSockJS() 의 경우 Web Socket 을 지원하지 않는 구형 브라우저, 네트워크 환경일 경우 SocketJS 를 사용하여 대체옵션을 사용할 수 있도록 하게 해준다.

이때 Client 의 지원 여부에 따라, Long-polling 혹은 Polling 으로 통신하게 된다.

 

 

3. 채팅방 객체(ChatRoom)

@Getter
@Setter
@ToString
public class ChatRoom implements Serializable {

    private static final long serialVersionUID = 111111L;

    private String roomId;
    private String roomName;

    public static ChatRoom createRoom(String name) {
        ChatRoom chatRoom = new ChatRoom();
        chatRoom.roomId = UUID.randomUUID().toString();
        chatRoom.roomName = name;
        return chatRoom;
    }
}

 

각 채팅방에 대한 정보를 redis 에 적재하고, message 수/발신 시 사용해야 하기에 생성해주었다

또한 Redis 에 적재시 Serialize 가 가능해야 하기에 Serializable 를 참조하도록 하였다.

 

 

4. ChatRoomService

@Service
@RequiredArgsConstructor
@Slf4j
public class ChatRoomService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ChatService chatService;

    private static final String CHAT_ROOMS_KEY = "CHAT_ROOMS"; // Redis Hash Key

    // 채팅방 생성
    public ChatRoom createRoom(String name) {
        ChatRoom chatRoom = ChatRoom.createRoom(name);
        redisTemplate.opsForHash().put(CHAT_ROOMS_KEY, chatRoom.getRoomId(), chatRoom);
        chatService.genNewRoom(chatRoom.getRoomId()); // Mem 적재
        return chatRoom;
    }

    // 채팅방 리스트 조회
    public List<ChatRoom> findAllRooms() {
        return redisTemplate.opsForHash().values(CHAT_ROOMS_KEY)
                .stream()
                .map(obj -> (ChatRoom) obj)
                .collect(Collectors.toList());
    }

    // 특정 채팅방 찾기
    public ChatRoom findRoomById(String roomId) {
        return (ChatRoom) redisTemplate.opsForHash().get(CHAT_ROOMS_KEY, roomId);
    }
}

 

우선적으로 채팅방에 대한 생성이나, 조회 할 수 있는 Service Class 이다. 

유의할 점으로는 채팅방 개설시 Redis 에 Hash 로 정보를 저장한다는 점이다.

 

hkeys {Key} 혹은, hgetall 로 조회 가능하며(redis)

hget 커맨드로 더 자세한 내용을 확인 할 수 있다

 

CHAT_ROOMS
  ├── e7e36454-2236-4507-9f13-2828750e6a87: {chatRoom 객체}
  ├── anotherRoomId: {anotherChatRoom 객체}
  └── ...

hash key: CHAT_ROOMS

field: chatRoom 객체

value: 채팅방에 대한 객체

 

 

5. ChatService(채팅 기능 서비스)

@Service
@RequiredArgsConstructor
@Slf4j
public class ChatService {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisMessageListenerContainer redisMessageListenerContainer;
    private final RedisPublisher redisPublisher; // Publisher
    private final RedisSubMsgListener redisSubscribeListener;

    private Map<String, ChannelTopic> topics = new ConcurrentHashMap<>(); // Topic 객체 list

    @PostConstruct
    public void initTopicsFromRedis() {
        Map<Object, Object> chatRooms = redisTemplate.opsForHash().entries("CHAT_ROOMS");

        for (Object key : chatRooms.keySet()) {
            String roomId = (String) key;
            ChannelTopic topic = new ChannelTopic(roomId);
            topics.put(roomId, topic);
            redisMessageListenerContainer.addMessageListener(redisSubscribeListener, topic);
            log.info("🔄 Redis에서 기존 채팅방 복구: {}", roomId);
        }
    }


    // generate New Room(In-Memory)
    public void genNewRoom(String roomId) {
        ChannelTopic channelTopic = topics.get(roomId);
        if(channelTopic == null) {
            channelTopic = new ChannelTopic(roomId);
            redisMessageListenerContainer.addMessageListener(redisSubscribeListener, channelTopic);
            topics.put(roomId, channelTopic);
        }
    }

    // send
	public void msgSend(MessageDto dto) {
        if(topics.get(dto.getRoomId()) != null) {
            redisPublisher.publish(topics.get(dto.getRoomId()), dto);
        }else {
            log.error("채팅방이 존재하지 않습니다.");
        }
    }
    
    //TODO: unsubscribe(exit)
}

 

flow 에서 설명했듯 Server 기동시 Redis 에 존재하는 채팅방들을 가져와 Redis 의 Listener 로 등록해준다.

또한 새로운 채팅방을 생성할때 마찬가지로 Map 객체 내부에 존재할 수 있도록 Listener 등록을 추가해줬다.

 

메세지 전송의 경우 간단하게 Redis Publisher 를 사용하여 타겟이 되는 채팅방 ID 와, 메세지 객체를 전송해주었다.

 

* Redis Publisher Class

 

 

 

6. Controller 

이제 Clients 가 메세지 전송을 하기위한 Controller 를 만들어준다

@Controller
@RequiredArgsConstructor
@Slf4j
public class ChatController {

    private final ChatService chatService;

    @MessageMapping("/chat/message")
    public void send(MessageDto message) {
        chatService.msgSend(message);
    }
}

 

Client 는 "/pub/chat/message" path 로 전송시 `@MessageMapping` 에 의하여 메세지 라우팅을 하게 되면서 메세지 전송이 가능하게 된다. 

 

이때 왜 @RestController 가 아닐까?

일반적으로 @RestController 도 가능은 하다. 다만 STOMP 에서는 JSON 응답을 직접 반환하지 않기 때문에 잘 사용하지 않는다. 

-> HTTP Response 를 직접 반환하지 않기 때문에 의미가 없다.

 

그래서 STOMP 기반 메시지 처리 컨트롤러는 @Controller 를 사용하는것이 일반적이다.

또한 @MessageMapping 의 경우 @Service, @Component 에서는 동작하지 않는다. 필요시 별도의 Handler 를 구현해야한다.

 

@MessageMapping 은 Spring 의 STOMP 메시지 처리 컨트롤러에서만 동작하고, Spring 의 @Controller 처럼 동작한다.

Spring Boot 에서 WebSocket 을 활성화 하면 내부적으로 SimpAnnotationMethodMessageHandler 가 동작하고

해당 Handler 는 @MessageMapping 을 찾아서 WebSocket 메시지와 매핑한다. 

이때 Handler 는 @RequestMapping - HTTP 요청을 처리하는 것과 유사항 방식으로 동작한다.

 

 

7. Test

STOMP 테스트의 경우 apic.app 을 활용하는것 같은데, 현재 시점에서는 접속이 불가하였다.

https://jiangxy.github.io/websocket-debug-tool/ 

해당 사이트를 통해서 apic 과 동일하게 STOMP 테스트를 진행할 수 있다. 

 

flow 에서 설명했던것 처럼 STOMP 는 다음과 같은 순서를 따른다

1. Connection
2. Subscribe
3. send

 

해당 사이트에서 너무도 친절하게 top-down 방식으로 진행 할 수 있도록 버튼 layout 이 구현되어 있다.

 

이때 STOMP 의 Connection 은 특정 규약을 따르게 되는데 

 

ws://{IP}/{STOMP Endpoint} 이다.

여기서 Endpoint 는 WebSocketConfig.class 에서의 End-Point 를 의미하는것이다.

Connect Type 은 STOMP 선택.

 

그 외에 Subscribe Destination 과 send Destination 또한 우리가 정의한 규칙에 맞춰 

/pub/~, /sub/~ 로 시작하여 Path 를 일치 시켜주자.

 

테스트를 위해 브라우저 2개를 사용하여 Connection - Subscribe 까지 진행 후 각각의 브라우저에서 메시지를 전송해보면

수/발신이 정상적으로 이루어지는것을 확인할 수 있다.

 

 

또한 Redis 를 사용하여 Pub/Sub 을 진행하기 때문에 서버가 다중화 되어있어도 같은 Redis 를 보고 있다면 Client 가 어떤 서버에 연결되더라도 상관없이 실시간 채팅 기능을 사용할 수 있다(서버와 상관없이).

 

 

 

이후에는 Disconnection 에 대한 부분과, 채팅방 삭제와 관련된 기능들을 추가를 해보면 좋을것 같다.

반응형

댓글