Kafka

Stream Processing With Kafka #3

PON_Z 2023. 5. 7. 00:08

- 이제 Kafka를 사용하여 직접 프로그래밍을 해볼 것이다. 우선 링크에서 kafka를 다운로드하자.

(kafka 3.4.0, Scala 2.13버전을 사용)

(만약 jdk가 설치되어 있지 않으면 설치 후 환경변수 설정까지 해야함)

 

- 압축을 풀고 "~/user" 디렉토리에 옮긴다. 이후 Kafka 설정 파일을 수정한다(./config/server.properties)

1. log.dirs를 원하는 디렉토리로 수정
2. host와 port를 설정

 

- 이제 Zookeeper를 실행하고 (port:2181)

(kafka_test) C:\Users\subo0\kafka_2.13-3.4.0\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties

- Kafka도 실행해 준다. (port:9092)

(kafka_test) C:\Users\subo0\kafka_2.13-3.4.0\bin\windows>kafka-server-start.bat ../../config/server.properties

(이럴 때마다 맥북을 지르고 싶다...)

 

 

-  그리고 kakfa_test라는 이름으로 가상환경 만들고, vscode의 인터프리터를 연결 후 pip로 kafka를 설치해주자. (사실 kafka 설치 전에 해도 상관 없다)

conda create -n kafka_test
conda activate kafka_test
pip install kafka-python

 

- 이제 간단하게 producer.py를 만들어서 로그 메시지를 생성해 볼 것이다. 아래 코드를 python producer.py로 실행하면

from kafka import KafkaProducer
# producer 인스턴스화
# kafka는 클러스터 환경에서 동작하기 때문에 여러개의 서버로 데이터를 보낼 수 있다.
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'])

# 메시지는 byte형태로 보내기
producer.send('first-topic', b'hello world test 1')
# Remove Buffer
producer.flush()

 

- 아래와 같이 지정한 디렉토리에 지정된 이름으로 토픽이 생성되고

 

- 로그를 열어보면 아래와 같이 메시지가 정상적으로 저장됨을 알 수 있다.

 

- 이제 consumer를 생성해 볼 것이다. 아래 코드를 consumer.py에 작성하고 python consumer.py를 통해 consumer를 실행해준다.

from kafka import KafkaConsumer

consumer = KafkaConsumer('first-topic', bootstrap_servers = ['localhost:9092'])

for messages in consumer:
    print(messages)

 

- consumer를 실행했다면 다른 cmd를 열고 다시 producer.py를 실행해준다. 

 

- consumer를 실행한 cmd를 확인하면 아래와 같이 'first-topic' 토픽의 메시지를 받은 것을 확인할 수 있다.

728x90

'Kafka' 카테고리의 다른 글

Stream Processing With Kafka #5  (0) 2023.05.07
Stream Processing With Kafka #4  (0) 2023.05.07
Stream Processing With Kafka #2  (0) 2023.05.05
Stream Processing With Kafka #1  (0) 2023.05.05