MLOps/Kafka

Kafka-python을 이용한 실시간 비트코인 데이터 받기

manfromearth1 2025. 5. 16. 00:51
[Binance WebSocket]
       ↓
[Kafka Producer (Python)]
       ↓
[Kafka Broker on EC2]
       ↓
[Kafka Consumer (Python)]

 

위와 같은 플로우를 바탕으로 실시간 비트코인 데이터를 받아보자.


1. AWS EC2 실행 & Kafka 설치

우선 EC2를 실행해보자. 아래의 사진과 같이 인스턴스를 시작하자.

인스턴스 시작 클릭
인스턴스 유형: t3.small, 키 페어가 없으면 새로 생성하자.
다음과 같은 설정을 해주자. Zookeeper와 Kafka를 위한 포트!
프리 티어에 맞게 30GB 스토리지 사용하자!

이제 터미널을 열고 EC2에 접속해주자.

ssh -i "PEM KEY DIRECTORY"  ec2-user@<PUBLIC IP>

 

이제는 Kafka를 설치할 차례다. 아래의 코드를 실행해주자.

# Java JDK 설치
wget https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.tar.gz
tar -xvf jdk-21_linux-x64_bin.tar.gz

`ls` 명령어를 통해서 jdk 버전을 확인해주고, `vim ~/.bashrc`를 실행하자. 실행하고 나서 아래의 수정사항을 반영하자.

# INSERT 눌러서 편집 모드, export PATH= 찾기
export PATH=/home/ec2-user/jdk-21.0.7/bin:$PATH
# ESC + :wq 입력하면 파일 저장 및 나가기

source ~/.bashrc # 변경사항 반영

java -version
# 아래는 명령어의 결과. 다음과 같이 뜨면 성공!
java version "21.0.7" 2025-04-15 LTS
Java(TM) SE Runtime Environment (build 21.0.7+8-LTS-245)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.7+8-LTS-245, mixed mode, sharing)

 

이제 Kafka를 설치하도록 하자.

# 설치 후 압축 풀기
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
tar -xzf kafka_2.12-3.8.0.tgz

# kafka 설치 폴더로 이동
cd kafka_2.12-3.8.0

# 1번째 터미널에 zookeeper 실행
bin/zookeeper-server-start.sh config/zookeeper.properties

# 2번째 터미널에 broker 실행
# 그치만 해야 할 것이 있다~
cd kafka_2.12-3.8.0
vim config/server.properties
advertised.listeners=PLAINTEXT://EC2-PUBLIC-IP:9092
:wq

# broker 실행
bin/kafka-server-start.sh config/server.properties

이렇게 broker, zookeeper을 실행하면 일단은 끝이다!

 

2. Binance Websocket을 이용한 pub sub 코드

# producer.py
import asyncio
import json
from kafka import KafkaProducer
import websockets

BINANCE_WS = "wss://stream.binance.com:9443/ws/btcusdt@trade"

producer = KafkaProducer(
    bootstrap_servers=["<EC2_PUBLIC_IP>:9092"],  # EC2 퍼블릭 IP
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

async def binance_to_kafka():
    async with websockets.connect(BINANCE_WS) as websocket:
        while True:
            message = await websocket.recv()
            data = json.loads(message)
            producer.send("binance-trades", data)
            print("Sent:", data)

asyncio.run(binance_to_kafka())
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "binance-trades",
    bootstrap_servers=["<EC2_PUBLIC_IP>:9092"],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='latest',
    enable_auto_commit=True
)

for message in consumer:
    print("Received:", message.value)

본인 EC2 퍼블릭 IP를 작성해주도록 하자.

이제 다음과 같은 절차를 거치면 된다.

# Kafka 토픽 생성
bin/kafka-topics.sh --create \
  --topic binance-trades \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1


# EC2에서 consumer.py 실행
python consumer.py
# 혹은 python3 consumer.py

# 다른 터미널 / 로컬에서 producer.py 실행
python producer.py

이렇게 consumer.py를 실행한 터미널에서 데이터가 엄청 많이 오는 것을 확인할 수 있다!

생각보다 어렵지 않은 코드다. MLOps를 할 때 가장 기본이 되는 데이터 스트리밍을 하는 법을 배웠고, 이 다음에는 데이터를 받아오면서 전처리와 S3에 담는 과정을 구현해보면 될 것 같다.

항상 종료는 꼭 해주자!

'MLOps > Kafka' 카테고리의 다른 글

Kafka Streams, KStreams  (0) 2025.05.13
Kafka 개념 이해하기  (0) 2025.05.13