- 이제 클러스터에서 토픽을 생성해보자. KAFKA_ADVERTISED_LISTENERS에 post를 19091로 설정했기 때문에 bootstrap-server를 localhost:19091로 설정하고, "first-cluster-topic"이라는 이름으로 토픽을 생성했다. 파티션은 3개를 생성했고, replication-factor는 1로 설정했다.
docker exec -it [컨테이너ID] kafka-topics --bootstrap-server=localhost:19091 --create --topic first-cluster-topic --partitions 3 --replication-factor 1
- kafdrop에서 확인해 보면, 아래와 같이 토픽이 잘 생성된 것을 볼 수 있다.
- CLI에서 뿐만 아니라 kafdrop의 UI를 통해서도 토픽을 생성 및 삭제할 수 있다. 다음은 아래의 NEW 버튼을 통해 추가한 새 토픽이다.
- 이제 클러스터에서 사용할 수 있는 producer를 생성할 것이다.
# cluster_producer.py
from kafka import KafkaProducer
brokers = ["localhost:9091", "localhost:9092", "localhost:9093"]
topicName = "first-cluster-topic"
producer = KafkaProducer(bootstrap_servers = brokers)
producer.send(topicName, b"Kafka Cluster Test1 !!!")
producer.flush()
- python cluster_producer.py를 통해 위의 코드를 실행해주면 된다. kafdrop을 사용하면 consumer가 없어도 메시지를 확인해 볼 수 있다.
- 이전과 동일한 방식으로 consumer도 작성하고
# cluster_consumer.py
from kafka import KafkaConsumer
brokers = ["localhost:9091", "localhost:9092", "localhost:9093"]
topicName = "first-cluster-topic"
consumer = KafkaConsumer(topicName, bootstrap_servers = brokers)
for message in consumer:
print(message)
- CLI에서 consumer를 실행시킨 후, 다른 CLI에서 producer를 실행하면 아래와 같은 결과를 얻을 수 있다.
728x90
'Kafka' 카테고리의 다른 글
Stream Processing With Kafka #4 (0) | 2023.05.07 |
---|---|
Stream Processing With Kafka #3 (0) | 2023.05.07 |
Stream Processing With Kafka #2 (0) | 2023.05.05 |
Stream Processing With Kafka #1 (0) | 2023.05.05 |