본문 바로가기
DB/Redis

[Redis] Spring Boot - Redis Pub, Sub 구현&응용

by lucas_owner 2024. 3. 29.

 

목차

  1. Spring Boot - Redis 환경 구성
  2. 테스트

 

개요

서버간 데이터를 동기화 하기 위해, 방법을 찾다가 Redis Pub/Sub 을 사용하는것이 현상황에서 최선이라는 판단을 내리고, 알아보게 되었다. (A - B 각서버간 방화벽 오픈 X, 통신불가 , 하지만 Redis 가 존재하는 C 서버에는 A, B 서버 모두가 바라보고 있음.)

Redis를 채택한 또하나의 이유중 하나는, Message를 저장하지 않는 이유때문도 존재한다(Kafka는 메세지내용도 저장함)

 

 

메시지 브로커를 구현하는 것이기 때문에, 구현자가 원하는 비지니스 로직을 덧붙힌다면 채팅, 알림, 작업 큐, 동기화 등등 다양한 작업을 할 수 있을것이다. 

 

흐름 

Client Message 요청 -> Controller -> Service -> Pub - Redis -> MessageListener -> Sub Logic

 

 

Redis Pub/Sub 의 개념에 대해서는 아래 글을 참고.

https://lucas-owner.tistory.com/60

 

[Redis] Redis - pub/sub 이란?

1. Message Queue pub/sub 은 Message Queue 라는 통신 방법중 하나이다. MSA(Micro Service Architecture) 의 구조에서는 모듈A 에서 모듈 B로 API 데이터를 넘겨야 하는 일들이 발생한다. 그런 상황에서 데이터 교환

lucas-owner.tistory.com

 

 

환경 및 설정

  • Spring Boot - 2.7.10
  • Redis - 7.0.10
  • Docker

Redis 서버는 띄워져있다는 가정하에 작성됨.

 

 

1. Spring Boot - Redis  환경 구성

우선 Redis Client 로 Lettuce 를 사용 할것이기 때문에, 의존성을 추가 해야한다.

 

* Gradle

// Gradle
implementation 'org.springframework.boot:spring-boot-starter-data-redis' //redis

 

* Maven

<dependency>    
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

 

 

의존성 추가 이후에는, 당연하게도 설정을 통해 Redis Connection 설정, pub/sub 을 위한 RedisMessageListener 설정이 필요하다.

본 예제에서 다루는 설정은 아래와 같이 이루어진다.

  1. application.yml
  2. RedisConfig
  3. RedisPublisher(메세지 발송 클래스)
  4. RedisSubscribeListener(레디스 메세지 리스너)

1-1. application.yml - 상황에 따라 profile 설정.

# Redis Config
spring:

  config:
    activate:
      on-profile: mariadb

  redis:
    host: localhost
    port: 6379

 


1-2. RedisConfig

@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableRedisRepositories
public class RedisConfig {

    private final RedisProperties redisProperties;


    // lettuce
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(redisProperties.getHost(), redisProperties.getPort());
    }

    // redisTemplate: default Value Type = JSON
    // If you want to use String Type, you can change ValueSerializer to StringRedisSerializer or Use StringRedisTemplate
    @Bean
    public RedisTemplate<?, ?> redisTemplate() {
        RedisTemplate<?, ?> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());   //connection
        redisTemplate.setKeySerializer(new StringRedisSerializer());    // key
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class)); //Java Obj <-> JSON -> String Value
        return redisTemplate;
    }

    /**
     * Redis pub/sub 메시지 처리 Listener
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory());
        return container;
    }
}

 

- Spring <-> Redis 와의 연결 설정을 위한 가장 중요한 설정이다.

- RedisTemplate 설정을 보면, Serializer 가 존재하는데, 해당 부분을 유의해야 한다.

  • Redis 는 default 로 Key - Value 를 String(Hash)를 저장한다. 
  • Java 객체(etc. DTO) 를 Redis 에 전송하고 사용할것이기 때문에, Value 설정 필요.

- RedisMessageListenerContainer 는 서버와 연결된(Redis ConnectionFactory)의 Redis 에서 

메세지 리스너에 대한 비동기 동작을 제공하는 컨테이너이다. 깊은 수준의 설정 X 

 

MessageListenerContainer 에 대한 설명은 공식 docs를 한번은 보기를 권장한다.

https://docs.spring.io/spring-data/data-redis/docs/current/api/org/springframework/data/redis/listener/RedisMessageListenerContainer.html

 

 

1-3. Redis Pub

- 해당 클래스는, 메세지 전송 요청을 Redis 에 전송하는 역할을 한다. 

메세지를 전송할 채널, Java 객체(DTO) or String 메시지를 인자로 Redis Channel 에 pub 할 계획이다.

우선, 객체 타입 전송을 위해 DTO 를 하나 생성해주도록 하자.

 

1-3-1. MessageDTO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDto implements Serializable {

    private static final long serialVersionUID = 1L;

    private String message; // 전송할 메세지 내용
    private String sender; // 메세지 발신자
    private String roomId; // 메세지 방 번호 || 타겟 Channel
}

1-3-2. RedisPublisher

@Service
public class RedisPublisher {

    private final RedisTemplate<String, Object> template;

    public RedisPublisher(RedisTemplate<String, Object> template) {
        this.template = template;
    }


    /**
     * Object publish
     */
    public void publish(ChannelTopic topic, MessageDto dto) {
        template.convertAndSend(topic.getTopic(), dto);
    }

    /**
     * String publish
     */
    public void publish(ChannelTopic topic ,String data) {
        template.convertAndSend(topic.getTopic(), data);
    }
}

- 전송할 데이터의 타입은 오버로딩을 통해 구현해두었다. 

- publish는 convertAndSend() 라는 메서드로 실행되게 되며, topic == Channel 이라고 생각하면된다. 

  -> 즉, 특정 Channel 에 메세지를 전송.

 


1-4. Redis Sub (Listener)

- Pub을 통해 메세지를 전송했다면, 전송된 메세지를 받아야한다. 

해당 클래스에서는, pub 된 메세지가 존재할때, converting, 로깅, 비지니스로직(DB 저장, 등등) 을 수행하기 위해 설정한다.

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisSubscribeListener implements MessageListener {

    private final RedisTemplate<String, Object> template;
    private final ObjectMapper objectMapper;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String publishMessage = template
                    .getStringSerializer().deserialize(message.getBody());

            MessageDto messageDto = objectMapper.readValue(publishMessage, MessageDto.class);

            log.info("Redis Subscribe Channel : " + messageDto.getRoomId());
            log.info("Redis SUB Message : {}", publishMessage);

            // Return || Another Method Call(etc.save to DB)
            // TODO
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }
}

- Value 를 JSON 형태로 직렬화 해서 pub 했기 때문에, 받아올때는 역직렬화를 통해 사용해주도록 하자.

- 사용자의 요구사항, 요건에 맞게 수정하여 사용하면 될것 같다. 

 


1-5.  RedisPubSubService() , Controller 

- 각 Channel별 pub,sub 요청과, 핸들링을 처리하는 각 클래스를 생성하자

 

* Controller 

@RestController
@RequiredArgsConstructor
@Slf4j
@RequestMapping("/redis/pubsub")
public class RedisPubSubController {

    private final RedisPubService redisSubscribeService;


    @PostMapping("/send")
    public void sendMessage(@RequestParam(required = true) String channel, @RequestBody MessageDto message) {
        log.info("Redis Pub MSG Channel = {}", channel);
        redisSubscribeService.pubMsgChannel(channel, message);
    }

    @PostMapping("/cancle")
    public void cancelSubChannel(@RequestParam String channel) {
        redisSubscribeService.cancelSubChannel(channel);
    }

 

 

* Service

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisPubService {

    private final RedisMessageListenerContainer redisMessageListenerContainer;
    private final RedisPublisher redisPublisher;

    // 각 Channel 별 Listener
    private final RedisSubscribeListener redisSubscribeListener;


    /**
     * Channel 별 Message 전송
     * @param
     */
    public void pubMsgChannel(String channel ,MessageDto message) {

		//1. 요청한 Channel 을 구독.
        redisMessageListenerContainer.addMessageListener(redisSubscribeListener, new ChannelTopic(channel));

        //2. Message 전송
        redisPublisher.publish(new ChannelTopic(channel), message);
    }

    /**
     * Channel 구독 취소
     * @param channel
     */
    public void cancelSubChannel(String channel) {
        redisMessageListenerContainer.removeMessageListener(redisSubscribeListener);
    }

}

 

- 해당 예제에서는, 간단하게 각 Channel 별로 메세지 전송 + SUB 만을 테스트하기에 코드를 최소화 시켰다.

- 앞서 설정했던 클래스들을 기반으로 pub, sub 을 수행한다.

- addMessageListener() 는 설정한 채널을 구독하여, pub 이벤트가 발생하면 sub 설정대로 동작하게 된다.

* 각 로직이 다르고, 복잡성을 고려해야 한다면, Listener를 분리하는것도 좋은 방법이 될것 같다.

 

 

2. 테스트

  1. Swagger 를 이용하여 pub 요청 
  2. 각 채널 (one, two) 별 pub 전송
  3. 각 채널별 sub 확인. 

 

2-1. Swagger 를 이용하여 pub 요청 

 

 

 

- Client 측에서의 요청은 완료 되었다. 로그와 Redis 를 확인해보자. 

 

Listener

 

redis-cli

- Redis-cli) subscribe {채널명} 을 통해 구독 할 수 있으며, 메세지를 볼 수있다. 

- Redis 에는 설정한대로 직렬화 되어 값이 저장된것을 확인 할 수 있으며, Sub Channel 에서도 메세지를 정상적으로 확인할 수 있다.

 


Spring Boot와 Redis 를 사용해, 메세지 이벤트 기반 기본 틀에 대해 구조를 만들어보았고 간단한 테스트까지 진행해보았다.

테스트를 진행하며, 어떤상황에서 어떤구조를 만들어야 하는지, 더 좋은 객체화 방법이나 구조에대한 아이디어를 적용해봐야 할것같다 ,,

 

반응형

댓글