<aside> 1️⃣ KafkaTemplate 코드 확인

</aside>

  1. KafkaTemplate<K,V> 메서드로 들어와서 send()부분만 확인했다.
package org.springframework.kafka.core;

public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
		ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {

		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
				@Nullable V data) {
		
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
			Assert.notNull(record, "'record' cannot be null");
			return observeSend(record);
		}
		
		@SuppressWarnings("unchecked")
		@Override
		public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
			ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
			if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
				byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
				if (correlationId != null) {
					producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
				}
			}
			return observeSend((ProducerRecord<K, V>) producerRecord);
		}

}

<aside> 2️⃣

GPT에게 질문

</aside>

CompletableFuture<SendResult<K, V>> future1 = kafkaTemplate.send(topic1, key1, value1);
CompletableFuture<SendResult<K, V>> future2 = kafkaTemplate.send(topic2, key2, value2);

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);

// 두 작업이 모두 완료되면 실행될 로직
combinedFuture.thenRun(() -> {
    // Do something
});

<aside> 3️⃣

ListenableFuture vs CompletableFuture

</aside>

1. ListenableFuture (2022년 기준)

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "key", "value");

// 비동기 콜백 등록
future.addCallback(
  result -> {
    // 성공한 경우
    System.out.println("Sent message: " + result);
  },
  ex -> {
    // 실패한 경우
    System.err.println("Failed to send message: " + ex.getMessage());
  }
);

2. CompletableFuture (2023년 기준)

CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "key", "value");

// 비동기 콜백 등록
future.thenAccept(result -> {
    System.out.println("Sent message: " + result);
}).exceptionally(ex -> {
    System.err.println("Failed to send message: " + ex.getMessage());
    return null;
});

3. 장점 비교