준호씨의 블로그

Spring - @KafkaListener topic 여러개 사용하기. 설정에서 여러개 불러 오려면? 본문

개발이야기

Spring - @KafkaListener topic 여러개 사용하기. 설정에서 여러개 불러 오려면?

준호씨 2020. 11. 5. 00:10
반응형

Spring프로젝트에서 간단하게 KafkaListener를 구현한 예제입니다.

application.yaml에 토픽 이름을 설정해 두었습니다.

kafka:
  topic:
    member: member-topic

설정에 있는 토픽명을 사용하기 위해 topics = "${kafka.topic.member}"를 입력했습니다.

    @KafkaListener(id = "myGroupId", topics = "${kafka.topic.member}")
    public void kafkaListener(MessageDto messageDto) {
        switch (messageDto.getType()) {
            case REGISTER: {
                memberService.register(messageDto);
                break;
            }
            case WITHDRAWAL: {
                memberService.withdrawal(messageDto.getId());
                break;
            }
            default: {
                throw new RuntimeException("wrong type error. type=" + messageDto.getType());
            }
        }
    }

그런데 뭔가 작업을 하다 보면 토픽을 분리해야 되는 경우가 생기기도 합니다. 토픽 이름을 바꾸려고 하는데 그냥 바꿔 버리면 일시적으로 메시지를 못 받다가 유실해 버릴 수도 있습니다. 그래서 일단 기존 토픽도 사용하고 새 토픽도 사용하는 상태로 만들어야 될 수 있습니다.

application.yaml에 새로운 토픽 이름을 추가합니다.

kafka:
  topic:
    member-old: member-topic
    member: member-topic-sandbox

 

아래처럼 토픽을 두 개 적어주면 됩니다.

    @KafkaListener(id = "myGroupId", topics = {"${kafka.topic.member-old}", "${kafka.topic.member}"})
    public void kafkaListener(MessageDto messageDto) {
        switch (messageDto.getType()) {
            case REGISTER: {
                memberService.register(messageDto);
                break;
            }
            case WITHDRAWAL: {
                memberService.withdrawal(messageDto.getId());
                break;
            }
            default: {
                throw new RuntimeException("wrong type error. type=" + messageDto.getType());
            }
        }
    }

 

그런데 개발 환경에 따라 특정 phase에서는 토픽을 나눌 필요가 없을 수도 있고, 어떤 경우에는 토픽의 개수를 가변적으로 바꿔야 되는 상황이 될 수도 있습니다.

application.yaml 설정에서 쉼표(,)로 토픽 리스트를 만들었습니다.

kafka:
  topic:
    member: member-topic,member-topic-sandbox

이러한 설정을 가져다 사용하려고 할 때 SpEL (Spring Expression Language)를 사용할 수 있습니다.

    @KafkaListener(id = "myGroupId", topics = "#{'${kafka.topic.member}'.split(',')}")
    public void kafkaListener(MessageDto messageDto) {
        switch (messageDto.getType()) {
            case REGISTER: {
                memberService.register(messageDto);
                break;
            }
            case WITHDRAWAL: {
                memberService.withdrawal(messageDto.getId());
                break;
            }
            default: {
                throw new RuntimeException("wrong type error. type=" + messageDto.getType());
            }
        }
    }

kafka.topic.member의 설정을 가져온 다음 쉼표(,)로 split 해서 리스트로 만든다는 의미입니다.

SpEL을 이용해서 설정은 하나인데 여러 개의 토픽을 쉽게 사용할 수 있습니다.

반응형
Comments