max-poll-records
참조: https://devoong2.tistory.com/entry/Kafka-컨슈머의-Poll-동작과정-및-maxpollrecords-에-대한-오해
단건의 메시지를 처리하기 위해서 enable.auto.commit을 max-poll-records와 함께 사용해야하는가?
kafkaListenerContainerFactory
의 동시성 설정을 1로 유지하여 메시지 처리를 단일 스레드로 제한합니다. 이를 통해 메시지가 순서대로 처리되도록 합니다.@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(3000);
factory.setConcurrency(1); // 단일 스레드로 처리
return factory;
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class KafkaMessageListener {
@KafkaListener(topics = "your-topic-name", containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message, Acknowledgment acknowledgment) {
try {
// 메시지 처리 로직
System.out.println("Processing message: " + message);
// 메시지 처리 완료 후 offset 커밋
acknowledgment.acknowledge();
} catch (Exception e) {
// 예외 처리 로직
System.err.println("Error processing message: " + message);
// 예외 발생 시 커밋하지 않음
}
}
}
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: your-group-id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1
enable-auto-commit: false
max.poll.records=1
).@KafkaListener
메서드에서 처리됩니다.acknowledgment.acknowledge()
를 호출하여 커밋합니다.만약 파티션이 3개인데 setConcurrency(1)
로 설정하면 어떻게 되는지 설명드리겠습니다.