이전에 Pulsar 설치와 실행에 대하여 포스팅을 하였는데, 이번에는 Pulsar에서 제공하고 있는 Pulsar White Paper를 간략하게 정리해보도록 하겠다. 원문은 아래 링크에서 확인 가능하다. 

Pulsar White Paper 원문 보러 가기


White Paper

1. Introduction

User behavior event는 user-agent 또는 IP주소와 같은 많은 정보들을 가지고 있다. eBay에서는 다량의 event들을 처리하고 있는데, BOT이 생성한 event들을 걸러내어 처리할 수 있다.

이 문서에서는 Pulsar의 data pipeline에 대한 디자인을 제안한다. 계산 대부분을 CEP 엔진을 사용하여 고가용성의 분산처리 시스템을 구축하였다. 우리는 이벤트 스트림을 데이터 베이스 테이블처럼 사용하는 것을 제안한다. Pulsar는 SQL 쿼리가 데이터 베이스 테이블을 사용하는 것처럼 실시간 스트림에대해 집합을 생성하는 것을 가능하게 한다. 이 문서에서는 Pulsar에서 이벤트 스트림을 처리할 때 이벤트를 보완하게 하고, 필터하고, 변화시키는 방식에 대해서 설명하였다.

2. DATA AND PROCESSING MODEL 

2.1 User Behavior Data 

사용자 행동 데이터의 예측이 불가능한 패턴이 초당 100만 개가 넘게 생산된다. 이러한 데이터를 빠르게 처리하기 위해 in-memory 처리 방식을 해야 한다.

2.2 Data Enrichment    

BOT 데이터들은 필터로 거른다. 필터링을 거친 데이터를 지역, 기기, 인구 통계 등의 데이터로 좀 더 풍부하게 만든다.

2.3 Sessionization

관련된 이벤트를 공통된 키로 구분하여 그룹핑해주는 상태 관리 프로세스이다.

2.4 User defined partial views of original streams

event들의 dimension이 아주 많지만 실제 사용자들은 원하는 특정 demention만 원하기 때문에  사용자들이 설정할 수 있게 하였다.

2.5 Computing aggregates in real-time for groups of multiple dimensions

다양한 dimension을 그룹화 하기 위해서 실시간으로 summary data를 산출 한다.

2.6 Computing Top N, Percentiles. and Distinct Count Metrics in real time

distinct count와 percentile을 구하기 위하여 HyperLogLog와 TDigest라는 알고리즘을 사용하여 공간과 시간을 절약한다. 이 알고리즘들은 근사 알고리즘으로써, 1%대의 에러를 가지는데, 이는 통계학적으로 문제없음이라고 판단한다.  

2.7 Dealing with out of order and delayed events

이벤트가 비정상이거나 지연되는경우, 롤업기능으로 처리 가능하다.

3. ARCHITECTURE

pipeline

<Fig. 1. Real-time Data Pipeline>

3.1 Complex Event Processing Framework

Jetstream 은 Spring으로 작성한 CEP 엔진으로써, 모든 pulsar의 real-time pipeline은 Jetstream으로 작성하였다. Jetstream은 다음과 같은 기능이 탑재되어있다.

1. 처리 로직을 서술적 SQL로 정의할 수 있다.
2. 어플리케이션 재시작이 필요없는 Hot deploy SQL.
3. Annotation 플러그인 프레임워크를 사용하여 SQL 기능성을 증대시킨다.
4. SQL을 사용하여 pipeline을 타고 전달한다.
5. SQL을 사용하여 stream affinity를 동적으로  생성할 수 있다.
6. Spring IOC를 사용한 Declarative pipeline stitching은 런타임에 동적 토폴로지 변경을 가능하게 한다.
7. Esper의 통합기능을 통한 CEP 기능.
8. elastic scaling이 되는 Clustering.
9. Cloud deployment가 가능한다.
10. push pull 모델을 사용한 pub-sub messaging기능을 가지고 있다.

3.2 Messaging 

3.2.1 Ingest Messaging 

REST interface를 사용하여 이벤트를 받아서 처리한다.

3.2.2 Push vs. Pull Messaging

push messaging과 Pull messaging의 장단점이 있기 때문에 어떤것을 선택해야 할 지에 대해 고민하였다.

3.2.3 Messaging Options

Messaging은 하이브리드로 구현하기로 하였다. Jetstream은 kafka를 사용하여 pub-sub cluster messaging을 할 수 있게 하였다.

4. REAL-TIME PIPELINE

4.1 Collector

Real-time pipeline의 첫 번째 단계. Jetstream CEP엔진이 있는 곳이다. 이벤트를 REST interface로 받아서 처리한다. CEP엔진은 BOT 이벤트들을 필터링해준다.

4.1.1 Geo and Device Classification Enrichment

BOT 이벤트들이 걸러내진 이벤트들에 지역 정보를 추가한다.

4.2 Bot Detector

BOT signature cache에 등록된 패턴을 걸러내도 그 외의 패턴들은 걸러내기 쉽지 않다. 하지만 CEP엔진으로 BOT의 특징을 잡아내어 BOT signature cache에 등록하여 Collector가 BOT을 걸러낼 수 있게 한다. 

4.3 Sessionizer

Real-time pipeline의 두 번째 단계이다. 이 단계의 주된 기능은 tenancy based sessionization 을 지원하는 것이다. Sessionization 은 session 기간에 특정 식별자를 포함하는 이벤트의 임시 그룹핑 과정이다. 유니크한 식별자의 이벤트가 처음 탐지되면 각 window는 시작된다. 이 window는 이벤트가 더 들어오지 않으면 끝나게 된다.

4.3.1 Session Meta Data, Counters and State

이벤트가 도달하면, 세션당 메타데이터, 빈도수 계산 및 상태를 관리한다. 모든 처리 로직은 SQL로 작성되었다.

4.3.2 Session Store

Backing store에 back up 되는 local off-heap cache에 세션 레코드를 저장하였다. Sessionizer 에서는 cache entry에 TTL을 세팅 가능하고 해당 entry가 만기 되었을 때 정확한 알림을 받을 수 있어야 하는 것이 필요하였다. 그래서 특별한 off-heap cache를 개발하였다. 

4.3.3 Session Backing Store

Session Backing Store를 위해, 다음의 사항들을 고려하였다.

a. local read, write, delete 지원 
b. local cross data center replication 지원
c. eventual consistency 지원 (고가용성을 달성하기 위해 분산 컴퓨팅에 사용되는 일관성 모델이다. 데이터에 새로운 업데이트가 이루어지지 않은 경우, 마지막으로 업데이트된 값을 반환하는 것을 비공식적으로 보장한다.
d. 저장 항목에 대한 life cycle을 관리한다. (TTL 지원)
e. 쓰기와 읽기의 비율을 10:4로 지원한다.
f. 초당 읽기 쓰기를 1M까지 지원한다.(Scale to)
g. 변수 크기는 200bytes에서 50Kbytes까지 지원한다. (Scale well)
h. Cloud에서 배포를 선호한다.
i. 주어진 클라이언트 노드에서 삽입된 키의 범위를 조회하기 위하여 보조 인덱스를 만든다.

위의 사항을 충족시키는 적합한 Session store를 찾기 위하여 Cassandra, Couchbase, 자체 개발database를 비교했다. Cassandra와 Couchbase는 Disk기반 솔루션이었기 때문에, in-memory방식이 필요한 우리는 자체 개발 database를 Session store로 선택하였다.

4.3.4 SQL extensions

Jetstream은 Esper의 SQL 언어를 확장하기 위하여 사용자들이 원하는 annotation을 작성할 수  있게 annotation 플러그인 프레임워크를 제공한다.

4.4 Event Distributor

pipeline의 세 번째 단계이다. 이 단계의 주 기능은 pipeline subscriber의 커스텀 뷰를 생성하게 하는 것이다.

4.4.1 Event filtering, Mutation and Routing

이벤트들은 SQL 문법으로 변화시키고 필터링하고 전달할 수 있다. 또한, annotaion을 사용하여 이벤트가 지나갈 루트를 지정해줄 수 있다.

4.5 Metrics calculator for Multi-dimensional OLAP

Metrics Calculator는 다양한 dimension과 time series data를 생산하는 사용자 정의의 metrics를 계산하는 실시간 metric computation engine이다. Metrics Calculator는, multiple dimension을 그룹핑하여 추출할 수 있는 SQL쿼리를 실행 할 수 있게 인터페이스를 제공해준다. 이 단계에서는 이벤트를 SQL로 컨트롤 가능하다.

4.5.1 Harvesting Metrics

이벤트들은 random scheduling이나 affinity based scheduling을 통해 Metrics Calulator application cluster에 스케쥴 될 수 있다. Metrics는 short tumbling wondows(10초동안) live stream에서 추출 된다. Window가 진행되고 있을때 unique dimension grouping을 위해 metric event가 생산된다.

4.5.2 Aggregating across the Metrics Calculator cluster

SQL문과 annotation을 사용하여 여러노드간의 aggregation이 가능하다.

4.5.3 Creating rollups in time series database

time series data를 생성하여 DB에 저장하거나 실시간 dashboard에서 widget으로 보여준다.

4.5.4 Metric Store

Metric Store를 하기 위하여 아래의 DB들을 비교해 보았다.

Open TSDB vs. Cassandra vs. DRUID 

 

Open TSDB 

Cassandra 

장점

빠른 데이터 수집
몇몇 자체 집합 함수 사용 가능

른 데이터 수집
Rollup 가능

단점

Rollup생성 불가
기존에 있는 집합으로
새로운 집합 생성을 지원하지 않음

group by 와 집합함수를 지원하지 않음 

Pulsar는 필요조건을 충족시키기 위하여 DRUID를 사용하였다.

5. CONCLUDING REMARKS

이 문서에서는 Pulsar를 구현할 때 고려했던 부분들의 일부이다. 실시간으로 사용자 행동분석과 관련된 문제들을 위하여 데이터와 프로세싱모델에 관해 설명하고 있다. Pulsar는 eBay cloud에서 1년간 사용되었다. 0.01% 미만의 안정적인 유실로 초당 10만 개의 events들을 처리했다. 95퍼센타일로 pipeline의 end to end 지연은 100ms 미만이다. 99.99%의 가용성으로 파이프라인을 성공적으로 구동시켰다. eBay의 몇몇 팀은 성공적으로 우리의 플랫폼을 사용하여 in-session 개인화, 광고, 인터넷 마케팅, 빌링, 비지니스 모니터링 등 많은 문제에 대한 해결책을 찾았다.

Pulsar의 주요 사용처는 사용자 행동 분석에 포커스를 맞추고 있지만, Pulsar를 실시간 처리가 요구되는 많은 다른 사용 경우에 대해서 쓰이기를 원한다.


White Paper에서는 실제로 eBay에서 Pulsar를 사용하여 사용자 행동분석 패턴을 분석하여 여러 가지 문제들을 해결하였다고 서술하고 있다. 또, Pulsar는 빠르고 신뢰도 높은 데이터 처리를 할 뿐만 아니라 실시간으로 데이터 분석이 가능함을 강조하였다. 그 기반은 Jetstram이라는 독자적인 플랫폼이고, 익숙한 SQL문과 annotation을 사용하여 데이터 분석이 가능함을 설명하고 있다.

White Paper가 아주 상세한 부분까지는 서술하고 있지 않지만, 어떤 데이터들을 어떻게, 무엇을 사용하여 처리하는가에 대한 궁금증을 풀 수 있기에는 충분한 것 같다. 자세한 쿼리 예제나 구조가 궁금하면 Pulsar 공식홈페이지에서 확인할 수 있다.

http://gopulsar.io/


마치며...

번역과 이해를 같이 해야하여 어려웠지만, 실시간 데이터 분석 툴의 구조를 나름은 자세히 알 수 있게 되어 의미있는 시간이었음을 상기하며, 이번 포스팅은 여기에서 마친다.



참고자료: http://gopulsar.io/docs/Whitepaper_Pulsar_Real-timeAnalyticsatScale.pdf


Posted by 알 수 없는 사용자
,



eBay에서 Pulsar라는 실시간 데이터 분석 툴을 오픈소스로 공개하였다. 

공식 문서에 따르면, Pulsar는 Scalability, Availability, Flexibility를 강조한 실시간 데이터 분석 및 모니터링 툴이다. 

Pulsar의 key point와 정리는 다음의 포스트를 참고하기 바란다.


eBay Pulsar: real-time analytics platform


위의 포스팅에서 간략하게 정리가 되어있기 때문에, 이번 포스팅에서는 Pulsar의 구성에 관해 설명하고 설치 방법에 대하여 번역하였다. 해당 원문은 다음 링크에서 확인 가능하다.


https://github.com/pulsarIO/realtime-analytics/wiki/Get-Started


Pulsar는...


스트리밍 SQL을 사용한 인터넷 스케일 실시간 분석 툴이다.


Pipeline Components


Pulsar pipeline은 Jetstream framework의 상단에 내장되어 있다. 이 pipeline은 아래와 같은 구성으로 되어있다.

1. Collector : 이벤트를 지역 정보 분류와 기기 타입을 탐지하여 REST endpoint를 통해 수집한다. 2. Sessionizer : 이벤트를 세션화 한 후, 그 세션 상태를 관리하고 마커 이벤트를 만들어 낸다. 3. Distributor : 이벤트 라우터처럼 다른 consumer들에 이벤트를 필터하고 변화시킨다. 4. Metrics Calculator : 다양한 방면에 대한 측정 기준을 계산하고 metrics store에 지속시킨다. 5. Replay : 이벤트들을 downstream 애플리케이션으로 진행하게 하지 못했을 때, Pipeline 애플리케이션들은 문제가 발생한 이벤트를 kafka에 지속시킨다. Replay는 이벤트들을 kafka에서 가져오고, pipeline에서 100%의 신뢰도를 주기 위해 원래의 downstream 애플리케이션으로 진행하게 하는 역할을 한다. 6. ConfigApp - 모든 pipeline을 위한 Dynamic provision configuration들은 Jetstream framework에 built-in app에서 설정 할 수 있다.


Dependencies


pipeline을 실행하려면 다음의 사항들이 요구된다.

1. Zookeeper 2. Mongo 3. Kafka 4. Cassandra

* Docker에 올려져 있는 이미지를 사용 하면 설치하기 쉽다.


Single node Deployment


한 개의 docker 호스트 안에서, 각 docker container instance들은 virtual network를 사용하여 통신한다.

Docker를 지원하는 node를 준비한다.  (* Docker 이미지로 설치를 쉽게 할 수 있다. 이번 포스팅에서는 Docker로 설치하는 법만 포스팅하겠다.)

1. Docker를 지원하는 리눅스 instance를 준비한다. 이때 Docker 버전은 1.3.0 이상이어야 한다. 2. instance의 최소 사양은 8GB RAM | 2 VCPU | 30.0GB Disk


설치하기


스크립트를 실행하여 docker에서 dependencies 항목에 있는 이미지와 pulsar, 그리고 데모까지 한번에 가져올 수 있다. (데모는 바로 실행 가능)

다운로드 링크 또는 


run sh <(curl -shttps://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/Demo/rundemo.sh)


또는 아래 스크립트를 실행하면 된다.

#!/bin/sh

echo '>>> Start external dependency'
sudo docker run -d --name zkserver -t "jetstream/zookeeper"
sleep 5

sudo docker run -d --name mongoserver -t "mongo"
sleep 5

sudo docker run -d --name kafkaserver -t "jetstream/kafka"
sleep 5

sudo docker run -d --name cassandraserver -t "poklet/cassandra"

echo '>>> Sleep 30 seconds'
sleep 30

mkdir /tmp/pulsarcql

wget https://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/metriccalculator/pulsar.cql -O /tmp/pulsarcql/pulsar.cql

sudo docker run -it --rm --link cassandraserver:cass1 -v /tmp/pulsarcql:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'

echo '>>> Sleep 10 seconds'
sleep 10

echo '>>> Start Pulsar pipeline'

sudo docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"
sleep 5

sudo docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"
sleep 5

sudo docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"
sleep 5

sudo docker run -d -p 0.0.0.0:8004:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"
sleep 5

sudo docker run -d -p 0.0.0.0:8005:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver  --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"
sleep 5

sudo docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"
sleep 5

echo '>>> Start demo'

sudo docker run -d -p 0.0.0.0:8006:9999 -p 0.0.0.0:8083:8083 --name metricservice --link zkserver:zkserver --link mongoserver:mongoserver --link cassandraserver:cassandraserver -t "pulsar/metricservice"
sleep 5

sudo docker run -d -p 0.0.0.0:8007:9999 -p 0.0.0.0:8088:8088 --link zkserver:zkserver --link mongoserver:mongoserver --link metricservice:metricservice --link metriccalculator:metriccalculator -t "pulsar/metricui"

# The twitter sample use the twitter4j api, to run the app, it requires twitter OAUTH token (https://dev.twitter.com/oauth/overview/application-owner-access-tokens)

#sleep 5

#sudo docker run -d -p 0.0.0.0:8008:9999 --link zkserver:zkserver --link mongoserver:mongoserver -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET=  -e TWITTER4J_OAUTH_ACCESSTOKEN=  -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample

#sleep 5



실행하기


dependency docker container 실행하기


1. zookeeper 실행하기

sudo docker run -d --name zkserver -t "jetstream/zookeeper"


2. mongo 실행하기

sudo docker run -d --name mongoserver -t "mongo"


3. kafka 실행하기

sudo docker run -d --name kafkaserver -t "jetstream/kafka"


4. Cassandra 실행하기

sudo docker run -d --name cassandraserver -t "poklet/cassandra"


Cassandra Tables 와 keyspace 생성하기


1. table을 설치하기 위하여 CQL Client를 실행하기

sudo docker run -it --rm --link cassandraserver:cass1 -v $PATH_CQL:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'


2. Pulsar init CQL파일 : pulsar.cql을 적용한다.


Config app과 replay app 실행하기


1. Config 서버 실행하기

sudo docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"


2. Replay app 실행하기

sudo docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"


Pipeline app 실행하기


1. Collector 실행하기

sudo docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"


2. Sessionizer 실행하기

sudo docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"


3. Distributor 실행하기

sudo docker run -d -p 0.0.0.0:8005:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"


4. Metric Calculator 실행하기

sudo docker run -d -p 0.0.0.0:8006:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"


Demo app 실행하기


1. MetricService 실행하기

sudo docker run -d -p 0.0.0.0:8006:9999 -p 0.0.0.0:8083:8083 --name metricservice --link zkserver:zkserver --link mongoserver:mongoserver --link cassandraserver:cassandraserver -t "pulsar/metricservice"


2. MetricUI 실행하기

sudo docker run -d -p 0.0.0.0:8007:9999 -p 0.0.0.0:8088:8088 --link zkserver:zkserver --link mongoserver:mongoserver --link metricservice:metricservice --link metriccalculator:metriccalculator -t "pulsar/metricui"


3. twittersample 실행하기

Twittersample은 twitter oauth token이 필요하다. 아래의 링크에서 token값을 얻어서 적용하면 된다.

https://dev.twitter.com/oauth/overview/application-owner-access-tokens

sudo docker run -d -p 0.0.0.0:8008:9999 --link zkserver:zkserver --link mongoserver:mongoserver -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET= -e TWITTER4J_OAUTH_ACCESSTOKEN= -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample Twittersample requires twitter oauth token, seehttps://dev.twitter.com/oauth/overview/application-owner-access-tokens


Pipeline app 모니터링 하기


각 앱은 http port 로 모니터링 가능하다.

1. Replay - http://<hostname>:8001

2. Collector - http://<hostname>:8002, Collector Rest end point -  http://<hostname>:8080

3. Sessionizer - http://<hostname>:8003

4. Distributor - http://<hostname>:8004

5. MetricCalculator - http://<hostname>:8005

6. Configuration - http://<hostname>:8000, Configuration UI  - http://<hostname>:8081

7. Query data via CQL Client: sudo docker run -it --rm --link cassandraserver:cass "poklet/cassandra" cqlsh cass향향

8. MetricService - http://<hostname>:8006

9. MetricUI - http://<hostname>:8007, Dashboard port: http://<hostname>:8088

10. TwitterSample - http://<hostname>:8008



Distributed deployment


Pulsar app이 실행되고 있는 분산환경에서는 app container를 실행시킬 경우 zkserver와 mongoserver에 파라미터처럼 전달해야 한다. Pulsar app이 container host network에 직접 바인딩 할 수 있게 해야함과 동시에 각 app들은 다른 host에 전달해야 한다.


Dependency docker container 실행하기


1. zkServer 실행하기

sudo docker run -d -p 0.0.0.0:2181:2181 -t "jetstream/zookeeper"


2. mongo 실행하기

sudo docker run -d -p 0.0.0.0:27017:27017 -t "mongo"


3. kafka 실행하기

sudo docker run -d -p 0.0.0.0:9092:9092 -p 0.0.0.0:2181:2181 -t "jetstream/kafka"


4. cassandra 실행하기

sudo docker run -d -p 0.0.0.0:9106:9160 -p 0.0.0.0:22:22 -p 0.0.0.0:61621:61621 -p 0.0.0.0:7000:7000 -p 0.0.0.0:7001:7001 -p 0.0.0.0:7199:7199 -p 0.0.0.0:8012:8012 -p 0.0.0.0:9042:9042 -t "poklet/cassandra"


Config app과 Replay app 실행하기


1. Config server 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -t "jetstream/config"


2. Replay app 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_ZK="kafkazkconn" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/replay"


Pipeline app 실행하기


1. Collector 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/collector"


2. Sessionizer 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/sessionizer"


3. Distributor 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/distributor"


4. Metriccalculator 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -e PULSAR_CASSANDRA="cassandraseeds" -t "pulsar/metriccalculator"


Demo app 실행하기


1. MetricService 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_CASSANDRA="cassandraserverip" -t "pulsar/metricservice"


2. MetricUI 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e METRIC_SERVER_HOST="metricserverip" -e METRIC_CALCULATOR_HOST="metriccalculatorserverip" -t "pulsar/metricui"


3. twittersample 실행하기

Twittersample은 twitter oauth token이 필요하다. 아래의 링크에서 token값을 얻어서 적용하면 된다.

https://dev.twitter.com/oauth/overview/application-owner-access-tokens

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET= -e TWITTER4J_OAUTH_ACCESSTOKEN= -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample 


EC2 deplyment


EC2에 적용하는 법은 https://docs.docker.com/installation/amazon/ 에서 docker 설치 방법을 참고한 후, 위의 스텝을 실행해보면 된다.



이번 포스팅에서는 Pulsar 설치와 실행을 정리해 보았다. 

Pulsar는 모든 app을 Docker 이미지로 만들어서 배포하고 있기때문에 매우 간단하게 설치 및 실행이 가능하다. 또한 클라우드 배포도 지원하기 때문에 Docker를 사용할 수 있는 instance에서는 쉽게 설치 및 사용할 수 있다고 한다. 


다음 포스팅에서는 Pulsar에서 제공하는 White Paper를 정리해보기로 하면서 이번 포스팅을 마친다.


참고자료 : https://github.com/pulsarIO/realtime-analytics/wiki/Get-Started



Posted by 알 수 없는 사용자
,



Apache Kafka 성능 테스트


조석현(zoe@embian.com)
Business Developer / Consultant at Embian Inc.



Apache KafkaLinkedIn에서 자신들의 내부 데이터 처리를 위해 개발한 DPMS(Distributed Processing Message System)입니다. 전통적 메세지 큐 시스템(Message Queue System)과는 다르게 Apache Kafka는 Broker Cluster를 여러 대의 Machine으로 구성하여 분산처리(Distributed Processing)가 가능하다는 장점을 가지고 있습니다. Big Data시장의 발전과 함께 가장 주목받고 있는 Queue System이기도 합니다.

본 포스팅에서는 Apache Kafka의 주요 특징을 살펴보고 분산 처리 성능을 확인하기 위한 성능 테스트와 결론을 정리해 보았습니다.



Index

1. About Apache Kafka

2. Performance Test

3. Conclusion




1. About Apache Kafka

Apache KafkaLinkedIn의 Legacy System인 ActiveMQSplunk를 대체하기 위하여 개발된 Message Queue System입니다. LinkedIn은 2009년 서비스 가입자가 폭증하며 자신들의 내부 데이터 처리 과정에서 한계를 느끼고, 그들만의 Messaging System을 개발하게 되었습니다. 현재 LinkedIn에서 사용자들에게 잘 알려진 PYMK(People You May Know)등의 서비스가 대표적인 적용사례입니다. 해외에서는 Twitter, Netflix, Tumblr, Foursquare 등의 기업에서 Apache Kafka를 도입하여 사용하고 있습니다.

Kafka는 개발과정에서 Open Source로 관리되었으며 Apache Incubator를 지나 현재는 Apache 정식 프로젝트로 등록되어 관리되고 있습니다. 게시물 작성일 기준으로 0.8.0 버젼이 Stable Release입니다.



1.1. Kafka's Features

Apache Kafka는 LinkedIn에서 설계 시점부터 Big Data를 염두에 두고 Kafka를 설계하였습니다. Kafka 공식 홈페이지에서 자랑하는 Kafka의 주요 특징은 아래와 같습니다.


  • Scala/Java로 작성되어 있다.
  • 분산처리(Distributed Processing)가 가능하다.
  • Log Data를 디스크에 기록한다.
  • Publish-Subscribe구조로 되어 있다.
  • 확장 가능(Scalablility)하다.
  • 처리량이 높다.(High Throughput)
  • 응답속도가 빠르다.(Low Latency)


Apache Kafka의 가장 큰 특징은 확장 가능(Scalablitiy)이라고 할 수 있습니다. 그리고 Log Data를 메모리가 아닌 디스크로 저장함으로서 In-Memory기반의 메세지 큐에 비해 Data의 안정성을 높여주며, Message Queue의 역할과 Log Aggregation의 역할을 함께 할 수 있다는 특징이 있습니다. 



1.2. Kafka Architecture

Apache Kafka는 전통적 메세지 큐 방식인 Publish-Subscribe의 구조로 구성되어 있습니다. Producer는 Message를 Broker로 발행(Publish)하고, Consumer는 Broker에서 메세지를 구독(Consume)하는 형태입니다. 각 Broker는 여러 대의 Broker Cluster로 구성 가능하며, Zookeeper에 의해 각 노드가 모니터링됩니다. 기존 메세지 큐와 가장 큰 차이점은 여러 개의 Partition으로 메세지가 분산처리된다는 점입니다.


Apache Kafka에서 제공하는 Kafka의 기본 컨셉은 아래의 그림과 같습니다.


Apache Kafka의 Basic Concept과 Architecture, 사용법 등의 내용들은 향후 Posting할 예정입니다.




2. Performance Test


2.1. 테스트 배경 및 목적

Kafka를 도입하려는 많은 기업의 경영자와 엔지니어들은 Kafka를 도입하기 위하여 Kafka의 가치를 검토할 필요가 있습니다. End-User에게 보다 좋은 서비스를 제공하기 위하여, Application의 환경 또한 중요한 요소이며, Kafka 도입을 통해 기업이 얻는 Benefits을 판단하는 것은 중요한 Decision Point입니다.

앞서 언급한대로, Kafka는 Scale Out을 통한 분산처리가 핵심이라고 할 수 있습니다. Kafka Broker Cluster를 구성하였을때, Broker 수가 증가함에 따라 Throughput이 얼마나 증가하는지, Kafka Cluster Set의 최대 처리량은 언제 어떤 시점에서 나타나는지, Log를 기록하는 디스크의 성능에 따라 얼마나 성능이 차이나는지 확인할 필요가 있습니다.


"그래서 Apache Kafka는 대체 성능이 얼마나 좋은건데??"


이를 위하여 성능측정의 가이드라인이 필요하다고 판단하였고, 저 역시도 기술스택 도입 이전의 검토단계로서 "Apache Kafka의 성능이 얼마나 되는 것인가?" 라는 막연한 질문에서 출발하여 자료를 찾아보기 시작했습니다. Kafka의 성능 수치에 대한 검색결과는 내부 측정 결과와 다소 차이가 있었고, 환경 구성이나 배경에 대한 설명이 부족하여 검색결과를 신뢰하기 어려워 직접 Performance Test를 수행하게 되었습니다.


Performance Test의 목적은 아래와 같이 정의하였습니다.


 

1. Broker 수 증가에 따른 Kafka의 분산처리 성능 확인

 2. Kafka의 Network 대역폭 한계 Throughput 관찰

 3. SSD vs HDD의 Throughput 비교




2.2. 수행 테스트 목록

수행 테스트는 아래 3가지로 정의하였으며,
Producer - Broker구간으로 한정하여 테스트를 수행하였습니다.



2.3. 테스트 툴 선택 / 평가기준

Kafka에서는 Benchmark Test를 위해 기본적으로 Performance 측정 도구를 제공합니다.

Kafka를 설치하면 {$KAFKA_HOME}/bin에 kafka-producer-perf-test.sh 파일과 kafka-consumer-perf-test.sh 파일을 확인할 수 있는데, 이 파일을 실행하여 Performance Test를 수행할 수 있습니다. 테스트 범위가 Producer-Broker 구간이므로, kafka-producer-perf-test.sh 쉘 스크립트를 이용하여 테스트를 수행하였습니다.


개별 메세지를 전송할 때 평가 기준은 처리량(Throughput)과 응답시간(Latency)을 기준으로 최초 정의하였으나, Kafka에서 제공하는 테스트 도구에서는 응답시간을 지원하지 않고 Producer - Broker구간에서는 응답속도(Latency)가 필요하지 않아 평가 기준에서는 배제하였습니다.


평가기준 : Throughput(초당 처리량)

   - 단위 : MegaByte/second




2.4. 테스트 시나리오 및 수행방법

  Test 1. Broker 수 증가 테스트(SSD)

Producer Machine을 3대로 고정하고, Broker Machine의 대수를 1대부터 3대까지 증가시키며 Throughput의 변화를 확인합니다. 각 Broker의 Storage는 SSD를 장착하였습니다.


  Test 2. Broker 수 증가 테스트(HDD)

Producer Machine을 3대로 고정하고, Broker Machine의 대수를 1대부터 3대까지 증가시키며 Throughput의 변화를 확인합니다. 각 Broker의 Storage를 HDD로 교체하여 장착하였습니다.


  Test 3. 최대 처리량 측정 테스트

Producer Machine과 Broker Machine의 대수를 동시에 1대부터 5대까지 증가시키며 Throughput의 변화를 확인합니다. 각 Broker의 Storage는 최대 성능을 고려하여 SSD로 교체하여 장착하였습니다.

* Producer - Broker 구성이 5대 - 5대 로 구성되어야 하나, 장비 제약으로 인해 4대 - 5대 구성으로 대체하였습니다.



2.5. 테스트 환경구성

테스트를 위한 Network 구성과 Machine 상세는 아래 그림과 같습니다.


* Broker Machine의 Storage는 테스트 방법에 따라 SSD type과 HDD type을 사용하였습니다.



각 Machine에 설치된 OS/Application은 아래와 같습니다.

OS : Ubuntu Desktop 12.04( Precise Pangolin) 64bit

Java : Oracle Java 1.7.0_51-b13

Kafka : Apache Kafka 0.8.0 Release(src)

Network Monitor Support Tool : sar, htop 1.0.1


Zookeeper의 환경설정( {$KAFKA_HOME}/config/zookeeper.properties )과 Broker( {$KAFKA_HOME}/config/server.properties )의 환경설정은 설치시 Default Setting으로 테스트를 수행했습니다.

Kafka Official Site에서는 Zookeeper Cluster를 독립 Machine에서 사용할 것을 권고하고 있으나, 테스트 장비의 한계상 Broker Machine과 함께 구성하였습니다.



2.6. 테스트 제한사항

Broker의 수를 증가시키기 위해 예비 테스트를 수행해 본 결과 간단한 설정값이 Broker 성능에 큰 영향을 미칠 수 있음을 확인했습니다. 객관성있는 테스트를 위해 서버의 설정값과 테스트 변수들은 고정하였습니다.


본 테스트에 사용된 모든 Producer와 Broker Machine에서는 Kafka 0.8.0 Stable Release 버젼을 이용하여 테스트를 수행하였습니다.


Broker의 환경설정 파일은 아래 내용과 같습니다.


[{$KAFKA_HOME}/config/server.properties]

broker.id=1

port=9092

num.network.threads=2

num.io.threads=2

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=1

log.flush.interval.messages=10000

log.flush.interval.ms=1000

log.retention.hours=168

log.segment.bytes=536870912

zookeeper.connection.timeout.ms=1000000

zk.connect=zookeeper:2181



Kafka에서 제공하는 Performance 도구는 아래 명령으로 실행하였습니다.


 

bin/kafka-producer-perf-test.sh --broker-list=embian4:9092 --messages 20000000 --topic test --message-size 800 --threads 24 --batch-size 200 --compression-codec 0 --request-num-acks 0 --message-send-gap-ms 0 --hide-header



여기서 중요한 설정은 producer.type인데 async로 설정하였습니다. 명령어에서는  producer.type에 관한 내용이 생략되어 있으며, 옵션이 없을시 async로 동작합니다.

비동기 모드(async)는 Throughput 측면에서 Kafka의 성능을 최대한 활용할 수 있어 필수적입니다. 이 모드에서 각 Producer는 Broker에게 chunk 단위로 메세지를 보내기 위하여 In-memory queue를 가지며, 미리 설정된 Batch Size나 Time Interval에 도달하면 batch로 메세지를 전송합니다. 이는 압축을 훨씬 더 효율적으로 만들며, 네트워크도 더 효율적으로 사용할 수 있는 장점이 있습니다.


알려진 Use-Case에 따르면 Topic의 수가 증가함에 따라 Throughput의 변화가 없었기 때문에 구성된 Broker Cluster에서 topic은 1개로 지정하였고, Kafka는 Broker Cluster에 Replication 기능을 지원하여 데이터의 안정성과 신뢰성을 보장할 수 있지만, 모든 메세지에 대해 Broker로부터 답신 응답(acknowledgement response)를 기다리게 됩니다. 이에 따라, Throughput의 효율성 측면에서는 처리속도가 느려진다는 단점이 있어 본 테스트에서는 Replication 기능을 사용하지 않았습니다.


본 포스팅에 언급되지 않은 모든 기타 설정들은 Default Setting으로 설정하였습니다.



2.7. 테스트 결과


- Test 1. Broker 수 증가에 따른 Throughput (SSD)


Broker의 수가 증가함에 따라 Throughput이 증가하는 모습을 확인하였습니다.

Broker가 1대에서 2대로 증가시 약 33%의 Throughput이 증가하였고, 2대에서 3대로 증가시 약 28% Throughput이 증가하고 있습니다.



- Test 2. Broker 수 증가에 따른 Throughput(HDD)


SSD를 사용할때보다 Throughput이 확연히 낮아진 모습을 보여주고 있습니다. 하지만, Broker의 수가 증가함에 따라 Throughput이 증가하는 모습을 보여줍니다.

Broker가 1대에서 2대로 증가시 약 75% Throughput이 증가하였고, 2대에서 3대로 증가시 약 26% Throughput이 증가하였습니다.



Kafka Official Site에서는 디스크의 IO성능이 Kafka Broker의 성능에 큰 영향을 미칠 수 있음을 언급하고 있습니다. 앞서 수행한 Test 1과 Test 2를 비교하여 SSD와 HDD를 장착하였을 경우의 성능을 비교해 본 그래프는 아래와 같습니다.


SSD 사용시 HDD 대비 최대 2.7배 Throughput이 향상되는 것을 확인했습니다.



 - Test 3. 최대 처리량 측정 테스트


Producer 4대 / Broker 5대의 구성에서 초당 최대 처리량인 129.7MB/sec를 측정하였습니다.





3. Conclusion


위의 테스트를 통해 얻은 수치에 근거하여 얻은 결론은 아래와 같습니다.


1. Broker 수 증가에 따라 Throughput이 증가한다(1대->2대 : 33%, 2대->3대: 28%, SSD 사용시).

2. SSD를 사용하는 것이 HDD를 사용하는 것보다 우수한 성능을 보인다(최대 2.7배).

3. Producer 4대 / Broker 5대의 구성에서부터 Gigabit 대역폭에 가까운 Throughput을 보인다(129.7MB/sec).



기본적으로 Broker 수 증가에 따른 Scaling이 이루어지는 모습을 확인할 수 있었습니다. 다만, Broker Machine의 수를 증가시키는 비율대로 Throughput이 향상되지는 않고 있어서(1대->2대: 33% 증가, 2대->3대: 28% 증가, SSD 사용시), Broker Machine의 추가에 따른 비용 대비 성능비는 그리 높지 못하다고 보여집니다. 
따라서 Broker Machine의 무조건적인 추가보다는 topic 별로 Broker Machine을 분리하여 운영하는 방식이 더 효율적이라고 생각합니다.


Kafka는 기본적으로 Sequential Access를 사용합니다. 이 때문에, 일반적으로 HDD에 비해 2배정도 Disk IO가 좋은 것으로 알려진 SSD가 최대 2.7배 높은 성능을 보여주고 있습니다. Kafka 도입시 Machine의 Disk의 성능에 대해 충분히 고려해야 합니다.


최대 Throughput 측정에 있어서는 장비 구성의 물리적 제약 때문에 Saturation Point를 측정하지는 못하였습니다.  다만 Producer 4대와 Broker 5를 사용하여 총 9대의 장비를 구성하였을 경우, 129.7MB/sec가 측정되어 Gigabit 대역폭에 가까운 Throughput을 확인하였습니다.




※ Embian은 본 포스팅에 대한 Feedback을 환영합니다. 만약 포스팅 내용에 대한 문의사항이 있으시다면, 블로그 하단 Comment 또는 이메일(zoe@embian.com)을 통해 연락주시면 Feedback을 드리도록 하겠습니다.




[Reference]

1. Apache Kafka Official Site http://kafka.apache.org/

2. Kafka: A Distributed Messaging System for Log Processing, Jay Kreps, Neha Narkhede, Jun Rao from LinkedIn, at NetDB workshop 2011

3. Building LinkedIn’s Real-time Activity Data Pipeline, Ken Goodhope, Joel Koshy, Jay Kreps, Neha Narkhede, Richard Park, Jun Rao, Victor Yang Ye

4. KAFKA 0.8 PRODUCER PERFORMANCEPiotr Kozikowski, Apr 8 2013

5. Performance testingAdded by Jay Kreps, last edited by John Fung on Jan 29, 2013

6. Tom's Hardware - Benchmark Results: Sequential Read/WriteBy Patrick Schmid and Achim Roos

Posted by 알 수 없는 사용자
,