@KafkaListener
애노테이션을 사용하면 Spring Kafka가 내부적으로 메시지를 자동으로 poll
한다. 이 과정은 다음과 같다.
동작 과정
@KafkaListener
가 붙은 메서드를 찾아 Kafka Consumer를 초기화한다.subscribe
메서드를 호출하여 지정된 토픽에 대한 구독을 시작한다.poll
하는 작업을 수행한다.poll
로 가져온 메시지를 @KafkaListener
가 붙은 메서드에 전달하여 비즈니스 로직을 수행한다.예시 코드
@KafkaListener(topics = "my_topic", groupId = "my_group")
public void consumeMessage(String message) {
// 메시지 처리 로직
System.out.println("Received Message: " + message);
}
@KafkaListener
애노테이션은 my_topic
토픽의 메시지를 수신하고, consumeMessage
메서드를 호출하여 메시지를 처리합니다. groupId
는 Consumer Group을 지정하는데 사용됩니다.@KafkaListener
를 사용하면 내부적으로 subscribe
가 처리되기 때문에 별도로 subscribe
메서드를 호출할 필요가 없습니다.@KafkaListener
는 별도의 스레드에서 동작하므로, 멀티스레딩 환경에서의 동기화와 관련된 주의사항이 적용됩니다.@KafkaListener
를 사용하면 복잡한 Kafka Consumer 로직을 간단하게 처리할 수 있으며, Spring Boot 애플리케이션과 자연스럽게 통합된다.<aside>
💡 @KafkaListener
어노테이션 동작 원리
</aside>
@KafkaListener
어노테이션이 붙은 메서드를 찾기 위해 KafkaListenerAnnotationBeanPostProcessor
가 스프링 빈을 순회한다.// KafkaListenerAnnotationBeanPostProcessor.java
public Object postProcessAfterInitialization(Object bean, String beanName) {
// @KafkaListener 어노테이션이 붙은 메서드를 찾고 처리
}
KafkaMessageListenerContainer
에 등록한다.// KafkaMessageListenerContainer.java
public void setupMessageListener(Object messageListener) {
// 메시지 리스너 설정
}
KafkaMessageListenerContainer
는 Kafka 토픽에서 메시지를 수신합니다.// KafkaMessageListenerContainer.java
public void doPoll() {
// Kafka 토픽에서 메시지를 폴링
}
@KafkaListener
어노테이션이 붙은 메서드에 전달한다.// 내부 로직 예시
public void invokeListenerMethod(String methodName, Object... args) {
// 리스너 메서드 호출
}
@KafkaListener
어노테이션이 동작하게 된다. 이것은 대략적인 설명이며, 실제 코드는 더 복잡할 수 있다. 이해하기 쉽게 설명하기 위해 간단하게 요약했다.