KStreams
스트리밍 처리에서 자주 쓰이는 Kafka Streams는 실시간 분석 + 이벤트 기반 처리를 코드로 구현할 수 있는 강력한 라이브러리이다. 구성 요소로는 KStream, KTable, Window, Join, StateStore, Transformer 등이 있다. 각 개념들을 한번 살펴보고 어떻게 사용되는지 살펴보자.
KStream
KStream은 Kafka Topic에서 들어오는 데이터를 실시간으로 읽어들이는 스트림 이다.
- Append-only log: 시간 흐름에 따라 순차적으로 도착하는 데이터
- Stateful, stateless 데이터를 처리할 수 있음
.map(), .filter(),.groupByKey(),.join()등의 다양한 연산 제공
예시 코드를 보자.
KStream<String, String> stream = builder.stream("user_events");
KStream<String, String> clicks = stream
.filter((key, value) -> value.contains("click")) // "click" 이벤트만 필터링
.mapValues(v -> v.toUpperCase()); // 대문자로 변환
clicks.to("click_events"); // 결과를 새로운 토픽으로 보냄자주 사용하는 KStream 연산자는 다음과 같다:
| 연산자 | 설명 | 예시 |
|---|---|---|
map() |
Key, Value 둘 다 변경 | map((k, v) → KeyValue(k, v+1)) |
mapValues() |
Value만 변경 | 로그 메시지를 가공 |
filter() |
조건에 맞는 메시지만 통과 | 클릭 이벤트만 필터링 |
flatMap() |
하나의 입력 → 여러 출력 | 문장 → 단어 분리 |
groupByKey() |
Key를 기준으로 그룹화 (집계용) | 사용자별 집계 전 단계 |
to() |
Kafka Topic으로 전송 | .to("processed_topic") |
KStream은 그때그때 발생한 이벤트를 덮어쓰지 않고 순서대로 처리한다는 특징이 있다. 이러한 특징이 KTable와 다른데, 어떻게 다른지 살펴보도록 하자.
KTable
KTable은 Kafka Topic에 들어오는 데이터를 Key 기준으로 최신 상태만 유지하는 데이터 테이블 구조다. 즉 Key가 같으면 새로운 값이 이전 값을 덮어쓴다. 정확한 비교를 위해 표를 참고해보자.
| 항목 | KStream |
KTable |
|---|---|---|
| 의미 | 이벤트의 연속적인 흐름 | Key 기반의 최신 상태 (스냅샷) |
| Key 중복 처리 | 중복 허용 (모든 이벤트 처리) | 마지막 값만 유지 (덮어씀) |
| 예시 | 로그인 로그, 클릭 로그 | 유저별 마지막 접속 위치 |
| 조인 방식 | 실시간 Join | 상태 기반 Join |
StreamsBuilder builder = new StreamsBuilder();
// Kafka Topic "user_profiles" → KTable 생성
KTable<String, String> userTable = builder.table("user_profiles");
// 특정 값 출력
userTable.toStream().foreach((key, value) -> {
System.out.println("User ID: " + key + ", Latest Profile: " + value);
});이렇게 되면, 예를 들어 user123이라는 키를 가진 사용자의 value 값이 다음과 같이 KStream으로 들어온다고 해보자.
key: "user123", value: "Korea"
key: "user123", value: "USA"
key: "user123", value: "Japan"결과적으로는, KTable에서 user123의 value로는 "Japan"만 남게 되는 것이다. 이렇게 현재 상태를 추적하는 것에 기능을 담고 있는데, 거꾸로 KTable에서 상태 변환을 하는 경우 KStream으로 상태 변화를 했음을 전달할 수도 있겠다.
주요 연산자는 다음과 같다.
| 연산 | 설명 |
|---|---|
.filter() |
특정 조건의 상태만 유지 |
.mapValues() |
상태 값을 변경 |
.toStream() |
다시 KStream으로 변환 가능 |
.join(...) |
다른 KTable이나 KStream과 조인 가능 |
GlobalKTable
KStream - KTable 간 join을 하기 위해서는 Key alingment가 필요하다. 그런데 KTable이 파티셔닝 되어있으면, 조인하려는 Key가 다른 인스턴스에 있을 수 있다는 문제점이 존재한다. 이럴 때에는 Kafka Streams가 Key를 리디렉션하거나 리파티셔닝해야 하는데, 이 경우 복잡하고 성능도 좋지 않다고 한다.
따라서 GlobalKTable을 사용하면, 모든 인스턴스가 필요한 테이블 전체를 가지고 있기 때문에 로컬에서 바로 join할 수 있다는 장점이 있는 것이다.
| 항목 | KTable |
GlobalKTable |
|---|---|---|
| 파티셔닝 | ✅ Kafka 파티션 기준 분산 저장 | ❌ 모든 인스턴스에 전체 복제 |
| 조인 키 제한 | KStream.key == KTable.key 이어야만 조인 가능 |
KStream의 value에서 원하는 key 추출 가능 |
| 조인 성능 | 파티셔닝 맞아야 효율적 | 항상 로컬 조회이므로 빠름 |
| 용도 | 상태 기반 연산 | 작은 참조 테이블, 코드 맵, 설정값 등 |
| 예시 | 유저 점수 | 국가 코드 맵, 상품 카테고리 맵 |
Window
| 윈도우 종류 | 설명 | 예시 |
|---|---|---|
| Tumbling Window | 일정 시간 단위로 잘라서 서로 겹치지 않게 처리 | 0–10초, 10–20초, ... |
| Hopping Window | 일정 간격으로 중첩되게 슬라이딩 | 0–10초, 5–15초, 10–20초, ... |
| Session Window | 이벤트 간 간격이 짧으면 묶고, 일정 시간 넘으면 끊음 | 사용자 세션 처리 |
| Sliding Window (Kafka 3.1+) | 두 이벤트 간의 **쌍(pair)**을 기반으로 처리 | Fraud 감지 등에서 사용 |
Kafka Streams는 기본적으로 무한한 데이터 스트림을 처리한다. 그렇기 때문에 데이터를 계속 처리만 하면 끝이 없다는 문제점이 존재해서, 시간 단위로 끊어서 집계를 해야 할 필요가 있다.
Join
Join은 실시간으로 들어오는 두 데이터 스트림(또는 테이블)을 합쳐서 정보를 만드는 연산이다. 다음과 같은 예시를 보자.
| 조합 유형 | 설명 | 예시 |
|---|---|---|
KStream ↔ KStream |
실시간 이벤트끼리 조인 (시간 기반 필요) | 주문 이벤트 ↔ 결제 이벤트 |
KStream ↔ KTable |
실시간 이벤트 + 최신 상태 조인 | 로그인 이벤트 ↔ 유저 프로필 |
KTable ↔ KTable |
두 상태 정보를 조인 (Key 기준 최신 상태만 유지) | 유저 정보 ↔ 사용자 설정 |
KStream ↔ GlobalKTable |
이벤트 + 참조 테이블 (key alignment 필요 없음) | 클릭 이벤트 ↔ 국가 코드 매핑 |
사용자 로그인 + 변경된 사용자 정보를 활용하여 스트리밍을 받아야 하는 경우를 생각해보자.
KStream<String, String> logins = builder.stream("login_events"); // key: user_id
KTable<String, String> profiles = builder.table("user_profiles"); // key: user_id
KStream<String, String> enriched = logins.join(
profiles,
(login, profile) -> login + " from " + profile
);
enriched.to("enriched_logins");이렇게 하면, enriched_logins 토픽에 사용자 정보 변경을 반영한 로그인 기록을 볼 수 있을 것이다.
StateStore
StateStore는 Kafka Streams 내부에서 중간 계산 값, 집계 결과, 세션 상태 등을 저장하는 로컬 저장소이다. 이러한 기능은 단순히 데이터 스트리밍을 받는 것에 그치지 않고 변화 추적이나 지난 상태와 비교 같이 복잡한 처리를 해야할 때 필요하다.
KStream/KTable로 실시간 처리하는데, 굳이 StateStore를 직접 써야 할 때가 언제인가? 한 번 표로 살펴보자.
| 처리 방식 | 설명 | StateStore 필요 여부 |
|---|---|---|
| KStream / KTable DSL | .map(), .filter(), .count() 등 → 간단한 집계/조인/변환 |
❌ (Kafka가 자동으로 StateStore 생성) |
| Custom State 처리 필요 | 과거 상태와 비교, 외부 조건 저장, 누적 타이밍 감지 등 | ✅ 수동으로 StateStore 사용해야 함 |
Transformer
스트리밍 데이터의 간단한 처리(reduce, count)의 경우가 아닌, 조금 더 복잡한 처리 로직을 작성할 수 있는 API이다. 예시를 통해서 보면 편할 것 같다.
마지막 10초 동안 클릭이 없다가, 1초 안에 클릭 10건이 발생하면 경고 메시지를 주는 기능을 포함해보자.
public class ClickGapDetector implements Transformer<String, String, KeyValue<String, String>> {
private KeyValueStore<String, Long> lastClickStore;
private KeyValueStore<String, List<Long>> recentClicksStore;
@Override
public void init(ProcessorContext context) {
this.lastClickStore = (KeyValueStore<String, Long>) context.getStateStore("last-click-store");
this.recentClicksStore = (KeyValueStore<String, List<Long>>) context.getStateStore("recent-clicks-store");
}
@Override
public KeyValue<String, String> transform(String userId, String value) {
long now = System.currentTimeMillis();
Long lastClick = lastClickStore.get(userId);
List<Long> recentClicks = recentClicksStore.get(userId);
if (recentClicks == null) {
recentClicks = new ArrayList<>();
}
// ① 최근 클릭 기록에 현재 시각 추가
recentClicks.add(now);
// ② 1초보다 오래된 클릭은 제거
recentClicks.removeIf(ts -> now - ts > 1000);
// ③ 클릭이 없던 시간 체크
boolean wasIdleFor10Sec = lastClick != null && now - lastClick >= 10_000;
// ④ 빠른 클릭이 있었는지 판단
boolean fastBurst = recentClicks.size() >= 10;
// ⑤ 조건 만족 시 경고
if (wasIdleFor10Sec && fastBurst) {
recentClicks.clear(); // 중복 방지
lastClickStore.put(userId, now);
recentClicksStore.put(userId, recentClicks);
return new KeyValue<>(userId, "🚨 10초 대기 후 1초간 10회 이상 클릭 감지!");
}
// 업데이트
lastClickStore.put(userId, now);
recentClicksStore.put(userId, recentClicks);
return null;
}
@Override
public void close() {}
}'MLOps > Kafka' 카테고리의 다른 글
| Kafka-python을 이용한 실시간 비트코인 데이터 받기 (0) | 2025.05.16 |
|---|---|
| Kafka 개념 이해하기 (0) | 2025.05.13 |