- 이제 Kafka를 사용하여 직접 프로그래밍을 해볼 것이다. 우선 링크에서 kafka를 다운로드하자.
(kafka 3.4.0, Scala 2.13버전을 사용)
(만약 jdk가 설치되어 있지 않으면 설치 후 환경변수 설정까지 해야함)
- 압축을 풀고 "~/user" 디렉토리에 옮긴다. 이후 Kafka 설정 파일을 수정한다(./config/server.properties)
1. log.dirs를 원하는 디렉토리로 수정
2. host와 port를 설정
- 이제 Zookeeper를 실행하고 (port:2181)
(kafka_test) C:\Users\subo0\kafka_2.13-3.4.0\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties
- Kafka도 실행해 준다. (port:9092)
(kafka_test) C:\Users\subo0\kafka_2.13-3.4.0\bin\windows>kafka-server-start.bat ../../config/server.properties
(이럴 때마다 맥북을 지르고 싶다...)
- 그리고 kakfa_test라는 이름으로 가상환경 만들고, vscode의 인터프리터를 연결 후 pip로 kafka를 설치해주자. (사실 kafka 설치 전에 해도 상관 없다)
conda create -n kafka_test
conda activate kafka_test
pip install kafka-python
- 이제 간단하게 producer.py를 만들어서 로그 메시지를 생성해 볼 것이다. 아래 코드를 python producer.py로 실행하면
from kafka import KafkaProducer
# producer 인스턴스화
# kafka는 클러스터 환경에서 동작하기 때문에 여러개의 서버로 데이터를 보낼 수 있다.
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'])
# 메시지는 byte형태로 보내기
producer.send('first-topic', b'hello world test 1')
# Remove Buffer
producer.flush()
- 아래와 같이 지정한 디렉토리에 지정된 이름으로 토픽이 생성되고
- 로그를 열어보면 아래와 같이 메시지가 정상적으로 저장됨을 알 수 있다.
- 이제 consumer를 생성해 볼 것이다. 아래 코드를 consumer.py에 작성하고 python consumer.py를 통해 consumer를 실행해준다.
from kafka import KafkaConsumer
consumer = KafkaConsumer('first-topic', bootstrap_servers = ['localhost:9092'])
for messages in consumer:
print(messages)
- consumer를 실행했다면 다른 cmd를 열고 다시 producer.py를 실행해준다.
- consumer를 실행한 cmd를 확인하면 아래와 같이 'first-topic' 토픽의 메시지를 받은 것을 확인할 수 있다.
'Kafka' 카테고리의 다른 글
Stream Processing With Kafka #5 (0) | 2023.05.07 |
---|---|
Stream Processing With Kafka #4 (0) | 2023.05.07 |
Stream Processing With Kafka #2 (0) | 2023.05.05 |
Stream Processing With Kafka #1 (0) | 2023.05.05 |