<aside> 1️⃣ KafkaTemplate 코드 확인
</aside>
package org.springframework.kafka.core;
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
		ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
				@Nullable V data) {
		
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
			return observeSend(producerRecord);
		}
		
		@Override
		public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
			Assert.notNull(record, "'record' cannot be null");
			return observeSend(record);
		}
		
		@SuppressWarnings("unchecked")
		@Override
		public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
			ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
			if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
				byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
				if (correlationId != null) {
					producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
				}
			}
			return observeSend((ProducerRecord<K, V>) producerRecord);
		}
}
<aside> 2️⃣
GPT에게 질문
</aside>
KafkaTemplate에서 CompletableFuture<SendResult<K, V>>를 반환하게 변경되었다고 하니, 이는 개발에 있어 더 넓은 선택의 여지를 주는 좋은 변경점이라고 생각합니다.CompletableFuture는 Java 8 이상에서 사용할 수 있는 강력한 비동기 프로그래밍 도구이므로, 이러한 변경은 분산 시스템에서의 비동기 처리나 다른 서비스와의 통합을 좀 더 쉽고 유연하게 할 수 있게 될 것입니다.CompletableFuture를 사용하면 다음과 같이 간편하게 여러 비동기 작업을 조합할 수 있습니다:CompletableFuture<SendResult<K, V>> future1 = kafkaTemplate.send(topic1, key1, value1);
CompletableFuture<SendResult<K, V>> future2 = kafkaTemplate.send(topic2, key2, value2);
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
// 두 작업이 모두 완료되면 실행될 로직
combinedFuture.thenRun(() -> {
    // Do something
});
KafkaTemplate을 사용하는 개발자들은 더욱 복잡한 로직을 더 간결하고 효율적인 방법으로 구현할 수 있을 것으로 예상됩니다.<aside> 3️⃣
ListenableFuture vs CompletableFuture
</aside>
CompletableFuture와 ListenableFuture 모두 비동기 프로그래밍을 위한 좋은 도구이지만, 여기에는 몇 가지 중요한 차이점들이 있다.ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "key", "value");
// 비동기 콜백 등록
future.addCallback(
  result -> {
    // 성공한 경우
    System.out.println("Sent message: " + result);
  },
  ex -> {
    // 실패한 경우
    System.err.println("Failed to send message: " + ex.getMessage());
  }
);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "key", "value");
// 비동기 콜백 등록
future.thenAccept(result -> {
    System.out.println("Sent message: " + result);
}).exceptionally(ex -> {
    System.err.println("Failed to send message: " + ex.getMessage());
    return null;
});