version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka1:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9095
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9095
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
- "9095:9095"
kafka2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://:9093,EXTERNAL://:9096
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9093,EXTERNAL://localhost:9096
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9093:9093"
- "9096:9096"
kafka3:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9097
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9094,EXTERNAL://localhost:9097
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9094:9094"
- "9097:9097"
zipkin:
image: openzipkin/zipkin
ports:
- "9411:9411"
sudo docker-compose up -d
실행이 완료되면 아래와 같이 보인다.
SpringBoot로 가서 설정하자
spring:
kafka:
bootstrap-servers: localhost:9095
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: recipia
zipkin:
base-url: <http://localhost:9411/>
sleuth:
enabled: true
sampler:
probability: 1.0
server:
port: 8081
---
/**
* Kafka의 topic을 관리하는 클래스
*/
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Kafka 관리자 설정을 Bean으로 등록한다. KafkaAdmin은 Spring Kafka 라이브러리에서 제공하는 클래스로,
* Kafka의 AdminClient를 Spring 컨테이너에 쉽게 등록하고 관리할 수 있도록 도와준다.
*/
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
/**
* Kafka AdminClient를 Bean으로 등록한다. AdminClient는 Kafka 클러스터를 프로그래밍 방식으로 관리할 수 있게 해주는 라이브러리이다.
* 토픽 생성, 수정, 삭제 등의 작업을 수행할 수 있다.
*/
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfigurationProperties());
}
/**
* "member-updated" 라는 이름의 새 토픽을 생성한다. topic1() 메서드는 Spring 애플리케이션을 시작할 때 자동으로 호출된다.(@Bean 이라서)
* 따라서 수동으로 호출할 필요는 없으며, "member-updated" 토픽은 애플리케이션 시작 시에 자동으로 생성된다.
*/
@Bean
public NewTopic topic1() {
return new NewTopic("member-updated", 1, (short) 1);
}
/**
* 토픽을 더 추가하려면 이렇게 Bean으로 등록한다.
* 예시로 주석 처리되어 있다.
*/
// @Bean
// public NewTopic topic2() {
// return new NewTopic("another-topic", 1, (short) 1);
// }
}
/**
* Kafka 토픽을 나열하는 클래스이다.
*/
@Slf4j
@RequiredArgsConstructor
@Component
public class TopicLister {
// Kafka AdminClient, Spring의 DI를 통해 주입받는다.
private final AdminClient adminClient;
/**
* 현재 Kafka에 존재하는 모든 토픽을 나열하는 메서드이다.
*/
public void listTopics() {
// AdminClient를 사용해 모든 토픽 정보를 가져온다.
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
// 토픽 이름만 추출하기 위해 Future 객체를 동기적으로 해결한다.(get()으로 바로 받는다.)
Set<String> topicNames = listTopicsResult.names().get();
// 각 토픽 이름을 로그로 출력한다.
for (String topicName : topicNames) {
log.info("발견된 토픽: {}", topicName);
}
} catch (InterruptedException | ExecutionException e) {
// 예외 상황에 대한 로깅을 한다.
log.error("토픽 목록을 가져오는 도중 오류가 발생했습니다.", e);
}
}
}
@SpringBootApplication
public class MemberApplication {
public static void main(String[] args) {
SpringApplication.run(MemberApplication.class, args);
}
/**
* 애플리케이션 시작 시에 TopicLister의 listTopics() 메서드가 실행되어 Kafka 토픽 목록을 출력한다.
*/
@Bean
public CommandLineRunner run(TopicLister topicLister) {
return args -> {
topicLister.listTopics();
};
}
}
발견된 토픽: receive-response
발견된 토픽: send-username
발견된 토픽: member-updated