파이썬으로 Kafka 예제
이번 글에서는 파이썬을 이용하여 간단한 Kafka 예제를 진행하려고 합니다. 해당 예제는 로컬 환경에서 Docker Desktop을 이용하여 진행할 것이며, 간단한 예제이므로 1개의 브로커와 주키퍼를 생성할 것입니다.
Docker Desktop이란
Windows 또는 macOS 운영체제에서 Docker를 사용할 수 있게하는 도구로, 로컬 환경에서 도커를 쉽게 설치하고 실행하기 위해 사용됩니다. (윈도우에서 Docker Desktop을 사용하기 위해서는 WSL2(Windows Subsystem For Linux 2) 관련 환경 설정 작업이 필요합니다.)
Docker Compose 작성 및 실행
① docker-compose.yml 작성
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1 # 주키퍼를 식별하는 아이디로 유일한 값, 1개의 주키퍼를 사용할 예정이라 없어도 문제 없음
ZOOKEEPER_CLIENT_PORT: 2181 # 주키퍼 포트, 기본 포트로 2181 사용
ZOOKEEPER_TICK_TIME: 2000 # 클러스터를 구성할 때 동기화를 위한 기본 틱 타임
broker:
image: confluentinc/cp-kafka:7.0.0
container_name: broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1 # 카프카의 브로커 아이디로 유일한 값, 1개의 브로커를 사용할 예정이라 없어도 문제 없음
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 주키퍼에 연결하기 위한 대상 지정 [서비스이름:컨테이너내부포트]
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT # 보안을 위한 프로토콜 매핑. PLAINTEXT는 암호화하지 않은 일반 평문
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 # 외부 클라이언트에 알려주는 리스너 주소
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # 토픽 복제에 대한 설정 값
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 # 트랜잭션 최소 ISR(InSyncReplicas 설정) 수
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # 트랜잭션 상태에서 복제 수
② docker-compose 실행
docker-compose up -d
③ docker 컨테이너 목록 조회
docker ps -a
kafka-python 라이브러리 설치
파이썬으로 kafka를 호출하는 방법은 대표적으로 2가지(confluent-kafka, kafka-python)가 존재하고 있으며, 이중 많이 사용하고 있는 kafka-python을 설치합니다.
pip install kafka-python
Producer
메시지를 전송할 producer.py를 작성하고 실행해줍니다. json 형태의 데이터를 1000개 생성하여 topic1이라는 topic으로 메시지를 전송하는 코드입니다.
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0, # 메시지 전송 완료에 대한 체크
compression_type='gzip', # 메시지 전달할 때 압축(None, gzip, snappy, lz4 등)
bootstrap_servers=['localhost:9092'], # 전달하고자 하는 카프카 브로커의 주소 리스트
value_serializer=lambda x:dumps(x).encode('utf-8') # 메시지의 값 직렬화
)
start = time.time()
for i in range(1000):
data = {'str' : 'result'+str(i)}
producer.send('topic1', value=data)
producer.flush() #
print('[Done]:', time.time() - start)
Consumer
메시지를 수신하는 consumer.py를 작성하고 실행해줍니다.
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'topic1', # 토픽명
bootstrap_servers=['localhost:9092'], # 카프카 브로커 주소 리스트
auto_offset_reset='earliest', # 오프셋 위치(earliest:가장 처음, latest: 가장 최근)
enable_auto_commit=True, # 오프셋 자동 커밋 여부
group_id='test-group', # 컨슈머 그룹 식별자
value_deserializer=lambda x: loads(x.decode('utf-8')), # 메시지의 값 역직렬화
consumer_timeout_ms=1000 # 데이터를 기다리는 최대 시간
)
print('[Start] get consumer')
for message in consumer:
print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
print('[End] get consumer')
기타
토픽 리스트 조회
# docker-compose exec [Service Name] kafka-topics --list --bootstrap-server [Service Name]:[Port]
docker-compose exec broker kafka-topics --list --bootstrap-server broker:9092
__consumer_offsets은 컨슈머 그룹의 오프셋 정보를 저장하는 특별한 내부 토픽으로 각 컨슈머가 읽은 메시지의 오프셋을 추적하고 관리하는 역할을 합니다.
토픽 정보 확인
# docker-compose exec [Service Name] kafka-topics --describe --topic [Topic Name] --bootstrap-server [Service Name]:[Port]
docker-compose exec broker kafka-topics --describe --topic topic1 --bootstrap-server broker:9092
참고
Apache Kafka Installation and Setup FAQs
How do I download, install, and run Kafka? Frequently asked questions and answers on how to get started using Kafka.
developer.confluent.io
함께 보면 좋은 글
Apache Kafka(카프카) 기본개념
Apache Kafka는 실시간 데이터 처리와 스트리밍에 필요한 강력한 도구입니다. 대용량 데이터를 빠르게 이동하고 처리하는 데 적합하며, 로그 처리, 데이터 스트리밍 등 다양한 분야에서 활용됩니다
dev-records.tistory.com
'Big Data' 카테고리의 다른 글
Apache Airflow란? (0) | 2023.07.20 |
---|---|
Data Lake와 Data Warehouse, Data Mart 개념과 비교 (0) | 2023.06.16 |
Apache Kafka(카프카) 기본개념 (2) | 2023.06.09 |
Colab + Spark 활용한 간단한 예제 (0) | 2023.06.05 |
Apache Spark란? (0) | 2023.06.02 |
댓글