1. docker-compose.yml 작성
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9095
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
      - "9095:9095"

  kafka2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://:9093,EXTERNAL://:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9093,EXTERNAL://localhost:9096
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9093:9093"
      - "9096:9096"

  kafka3:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9097
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9094,EXTERNAL://localhost:9097
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9094:9094"
      - "9097:9097"

  zipkin:
    image: openzipkin/zipkin
    ports:
      - "9411:9411"
  1. docker desktop설치

Untitled

  1. 터미널로 docker-compose.yml이 있는 위치로 가서 아래의 명령어 실행
sudo docker-compose up -d
  1. 실행이 완료되면 아래와 같이 보인다.

    Untitled

  2. SpringBoot로 가서 설정하자

spring:
  kafka:
    bootstrap-servers: localhost:9095
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: recipia

  zipkin:
    base-url: <http://localhost:9411/>

  sleuth:
    enabled: true
    sampler:
      probability: 1.0

server:
  port: 8081
---
  1. SpringBoot에서 topic 생성, 삭제, 수정을 하기위한 클래스를 작성한다.
/**
 * Kafka의 topic을 관리하는 클래스
 */
@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Kafka 관리자 설정을 Bean으로 등록한다. KafkaAdmin은 Spring Kafka 라이브러리에서 제공하는 클래스로,
     * Kafka의 AdminClient를 Spring 컨테이너에 쉽게 등록하고 관리할 수 있도록 도와준다.
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    /**
     * Kafka AdminClient를 Bean으로 등록한다. AdminClient는 Kafka 클러스터를 프로그래밍 방식으로 관리할 수 있게 해주는 라이브러리이다.
     * 토픽 생성, 수정, 삭제 등의 작업을 수행할 수 있다.
     */
    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfigurationProperties());
    }

    /**
     * "member-updated" 라는 이름의 새 토픽을 생성한다. topic1() 메서드는 Spring 애플리케이션을 시작할 때 자동으로 호출된다.(@Bean 이라서)
     * 따라서 수동으로 호출할 필요는 없으며, "member-updated" 토픽은 애플리케이션 시작 시에 자동으로 생성된다.
     */
    @Bean
    public NewTopic topic1() {
        return new NewTopic("member-updated", 1, (short) 1);
    }

    /**
     * 토픽을 더 추가하려면 이렇게 Bean으로 등록한다.
     * 예시로 주석 처리되어 있다.
     */
//    @Bean
//    public NewTopic topic2() {
//        return new NewTopic("another-topic", 1, (short) 1);
//    }

}
  1. 이제 토픽이 생성될텐데 그럼 뭐가 생성되었는지도 SpringBoot실행할때 확인하도록 코드를 추가한다.
/**
 * Kafka 토픽을 나열하는 클래스이다.
 */
@Slf4j
@RequiredArgsConstructor
@Component
public class TopicLister {

    // Kafka AdminClient, Spring의 DI를 통해 주입받는다.
    private final AdminClient adminClient;

    /**
     * 현재 Kafka에 존재하는 모든 토픽을 나열하는 메서드이다.
     */
    public void listTopics() {
        // AdminClient를 사용해 모든 토픽 정보를 가져온다.
        ListTopicsResult listTopicsResult = adminClient.listTopics();

        try {
            // 토픽 이름만 추출하기 위해 Future 객체를 동기적으로 해결한다.(get()으로 바로 받는다.)
            Set<String> topicNames = listTopicsResult.names().get();

            // 각 토픽 이름을 로그로 출력한다.
            for (String topicName : topicNames) {
                log.info("발견된 토픽: {}", topicName);
            }
        } catch (InterruptedException | ExecutionException e) {
            // 예외 상황에 대한 로깅을 한다.
            log.error("토픽 목록을 가져오는 도중 오류가 발생했습니다.", e);
        }
    }

}
  1. 최종적으로 위에서 작성한 코드가 SpringBoot를 시작할때 main클래스랑 같이 동작하도록 코드를 넣어준다.
@SpringBootApplication
public class MemberApplication {

	public static void main(String[] args) {
		SpringApplication.run(MemberApplication.class, args);
	}

	/**
	 * 애플리케이션 시작 시에 TopicLister의 listTopics() 메서드가 실행되어 Kafka 토픽 목록을 출력한다.
	 */
	@Bean
	public CommandLineRunner run(TopicLister topicLister) {
		return args -> {
			topicLister.listTopics();
		};
	}

}
  1. 결과 로그
발견된 토픽: receive-response
발견된 토픽: send-username
발견된 토픽: member-updated
  1. 이제 코드를 짜서 보낸다.