https://kroki.io/blockdiag/svg/eNpLyslPzk7JTExXqOZSUMgvykzNK0ksyczPU7BVKMgvKilKzCyxBsp4J6ZlJ8b7phYXJ6anKkTnJCal5tgqgUUVXq-c8qZ7zpvlDUo6Csn5OflFtko5mekZJUk5palKsWDdqZVwPW-blqCpSy9KTc2DKAxLBOqBK321YRJupagu0rUDWYJVHGymNVctACfGUpM=
// Kafka Producer 예시 코드
import org.apache.kafka.clients.producer.*;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092","localhost:9093","localhost:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test", "key", "value");
// 메시지 전송
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("메시지가 성공적으로 전송되었습니다.");
}
}
});
producer.close();
}
}
https://kroki.io/blockdiag/svg/eNpLyslPzk7JTExXqOZSUMgvykzNK0ksyczPU7BVKMgvKilKzCyxBsp4J6ZlJ8b7phYXJ6anKkTnJCal5tgqgUUVXq-c8qZ7zpvlDUo6Csn5OflFtko5mekZJUk5palKsWDdqZVwPW-blmgA-ZpoitOLUlPzIKrDEoEa4epfbZikARbBo8M_La04tQSu5c2MJW-ntLxp7daASKDrrEzNyckvh2gNycxNLS5JzC1AOLC54c28ljddS942rwEaowFXgc8Yj9TElNSiYoQh05e87p2iARXGpxM1ZHXtQIGFVRwcCFhlIL7EKgV3PFZZqPOsuWoB0WC7eQ==
Kafka 메시지: Kafka에서 주고받는 정보의 '상자'이다.
상세한 설명
코드 예시 (심화)
// Kafka Producer with Headers and Timestamp 예시 코드
import org.apache.kafka.clients.producer.*;
import java.util.Arrays;
public class AdvancedKafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 헤더 추가
Headers headers = new RecordHeaders();
headers.add("event-type", "user-login".getBytes());
// 타임스탬프 추가 (현재 시간)
long timestamp = System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, timestamp, "key", "value", headers);
// 메시지 전송
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("메시지가 성공적으로 전송되었습니다.");
}
}
});
producer.close();
}
}