<aside> 1️⃣ KafkaTemplate 코드 확인
</aside>
package org.springframework.kafka.core;
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@Override
public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return observeSend(producerRecord);
}
@Override
public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return observeSend(producerRecord);
}
@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return observeSend(producerRecord);
}
@Override
public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
@Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
return observeSend(producerRecord);
}
@Override
public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
Assert.notNull(record, "'record' cannot be null");
return observeSend(record);
}
@SuppressWarnings("unchecked")
@Override
public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
if (correlationId != null) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
return observeSend((ProducerRecord<K, V>) producerRecord);
}
}
<aside> 2️⃣
GPT에게 질문
</aside>
KafkaTemplate
에서 CompletableFuture<SendResult<K, V>>
를 반환하게 변경되었다고 하니, 이는 개발에 있어 더 넓은 선택의 여지를 주는 좋은 변경점이라고 생각합니다.CompletableFuture
는 Java 8 이상에서 사용할 수 있는 강력한 비동기 프로그래밍 도구이므로, 이러한 변경은 분산 시스템에서의 비동기 처리나 다른 서비스와의 통합을 좀 더 쉽고 유연하게 할 수 있게 될 것입니다.CompletableFuture
를 사용하면 다음과 같이 간편하게 여러 비동기 작업을 조합할 수 있습니다:CompletableFuture<SendResult<K, V>> future1 = kafkaTemplate.send(topic1, key1, value1);
CompletableFuture<SendResult<K, V>> future2 = kafkaTemplate.send(topic2, key2, value2);
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
// 두 작업이 모두 완료되면 실행될 로직
combinedFuture.thenRun(() -> {
// Do something
});
KafkaTemplate
을 사용하는 개발자들은 더욱 복잡한 로직을 더 간결하고 효율적인 방법으로 구현할 수 있을 것으로 예상됩니다.<aside> 3️⃣
ListenableFuture vs CompletableFuture
</aside>
CompletableFuture
와 ListenableFuture
모두 비동기 프로그래밍을 위한 좋은 도구이지만, 여기에는 몇 가지 중요한 차이점들이 있다.ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "key", "value");
// 비동기 콜백 등록
future.addCallback(
result -> {
// 성공한 경우
System.out.println("Sent message: " + result);
},
ex -> {
// 실패한 경우
System.err.println("Failed to send message: " + ex.getMessage());
}
);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "key", "value");
// 비동기 콜백 등록
future.thenAccept(result -> {
System.out.println("Sent message: " + result);
}).exceptionally(ex -> {
System.err.println("Failed to send message: " + ex.getMessage());
return null;
});