Kafka

Stream Processing With Kafka #5

PON_Z 2023. 5. 7. 22:38

- 이제 클러스터에서 토픽을 생성해보자. KAFKA_ADVERTISED_LISTENERS에 post를 19091로 설정했기 때문에 bootstrap-server를 localhost:19091로 설정하고, "first-cluster-topic"이라는 이름으로 토픽을 생성했다. 파티션은 3개를 생성했고, replication-factor는 1로 설정했다.

docker exec -it [컨테이너ID] kafka-topics --bootstrap-server=localhost:19091 --create --topic first-cluster-topic --partitions 3 --replication-factor 1

 

- kafdrop에서 확인해 보면, 아래와 같이 토픽이 잘 생성된 것을 볼 수 있다. 

 

- CLI에서 뿐만 아니라 kafdrop의 UI를 통해서도 토픽을 생성 및 삭제할 수 있다. 다음은 아래의 NEW 버튼을 통해 추가한 새 토픽이다.

 

 

- 이제 클러스터에서 사용할 수 있는 producer를 생성할 것이다.

# cluster_producer.py
from kafka import KafkaProducer

brokers = ["localhost:9091", "localhost:9092", "localhost:9093"]
topicName = "first-cluster-topic"

producer = KafkaProducer(bootstrap_servers = brokers)

producer.send(topicName, b"Kafka Cluster Test1 !!!")
producer.flush()

- python cluster_producer.py를 통해 위의 코드를 실행해주면 된다. kafdrop을 사용하면 consumer가 없어도 메시지를 확인해 볼 수 있다.

- 이전과 동일한 방식으로 consumer도 작성하고

# cluster_consumer.py
from kafka import KafkaConsumer

brokers = ["localhost:9091", "localhost:9092", "localhost:9093"]

topicName = "first-cluster-topic"
consumer = KafkaConsumer(topicName, bootstrap_servers = brokers)

for message in consumer:
    print(message)

 

- CLI에서 consumer를 실행시킨 후, 다른 CLI에서 producer를 실행하면 아래와 같은 결과를 얻을 수 있다.

728x90

'Kafka' 카테고리의 다른 글

Stream Processing With Kafka #4  (0) 2023.05.07
Stream Processing With Kafka #3  (0) 2023.05.07
Stream Processing With Kafka #2  (0) 2023.05.05
Stream Processing With Kafka #1  (0) 2023.05.05