Storm-ESPER는 Storm과 ESPER를 이용한 실시간 데이터 처리 엔진이다. 

Storm-ESPER에 대해 궁금하다면, 엠비안에서 진행했던 Strom-ESPER 프로젝트의 Prototype과 테스트에 대한 블로그에서 참조 할 수 있다. (Storm & Esper Prototype 및 Test)

Storm-ESPER 프로젝트를 받았을 때, 프로젝트에 대한 프로토타입 테스트 결과문서, 설치문서, 설치 패키지가 존재하였다. 하지만 패키지와 문서가 CentOS를 위한 것이었기 때문에, 내 작업환경인 Ubuntu에서 실행되지 않았다. 그리하여 Storm-ESPER실행을 위한 각 모듈을 따로 컴파일하여 실행시켜줘야하는 번거로움이 있었다. 그뿐만 아니라 실행하는 과정에서 컴파일러 버전이 맞지 않거나 실행에 필요한 jar파일들이 없어서 고생하기도 했다. 

이러한 실행의 어려움을 해소하기 위해, Storm-ESPER 실행방법을 자세히 설명해 보도록 하겠다. 

모든 설치 및 실행방법은 Ubuntu시스템 기준으로 설명한다.


Storm-ESPER 실행전 필요조건 1 - 필요한 프로그램 설치

1. zookeeper-3.4.6

압축파일을 미러에서 받은 후 압축을 푼다.

$ wget http://mirrors.ukfast.co.uk/sites/ftp.apache.org/zookeeper/stable/zookeeper-3.4.6.tar.gz  
$ tar -xvf zookeeper-3.4.6.tar.gz  

config 파일을 생성한다. 

$ vi conf/zoo.cfg

# The number of milliseconds of each tick  
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

Zookeeper를 실행한다.

$ ~/zookeeper-3.4.6/bin/zkServer.sh start

2. RabbitMQ Server

RabbitMQ는 repository에서 설치할 수 있다.

$ apt-get install rabbitmq-server

 WebUI Admin plugin을 설치하여 모니터링을 할 수 있게 한다. 플러그인 설치 후 리스타트를 해준다.

$ rabbitmq-­plugins enable rabbitmq_management 
service rabbitmq­-server restart

3. Apache Storm 0.9.4

압축파일을 미러에서 받은 후 압축을 푼다. (이번 테스트에서 사용한 Storm버전은 0.9.4이다.)

$ wget http://mirror.apache-kr.org/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz 
$ tar -xvf apache-storm-0.9.4.tar.gz

conf 파일을 변경해준다.

 $ vi ~/apache-storm-0.9.4/conf/storm.yaml

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
    - zookeeper가 설치되어있는 호스트 ip
#     - "localhost"

 nimbus.host: nimbus가 설치되어있는 호스트 ip


Storm-ESPER 실행전 필요조건 2 - 파일수정 및 Storm 실행

1. conf 파일 수정 및 라이브러리 파일 추가

먼저 Storm-ESPER를 컴파일하기 전에  conf파일을 수정해야한다. "storm-esper"프로젝트의 conf폴더에 있는 yaml파일들을 조건에 맞게 수정해 준다. 아래의 예시를 따라서 본인의 ip를 사용하면 된다.

~/storm-esper/trunk/conf/deploy.yaml

자세히 보기

~/storm-esper/trunk/conf/nimbus_conf.yaml

자세히 보기


~/storm-esper/trunk/conf/supervisor_conf.yaml

자세히 보기

~/storm-esper/trunk/conf/esper_supervisor_conf.yaml

자세히 보기


EPL명령어 제어를 위한 "storm-esper-client" 프로젝트의 conf폴더의 conf.yaml파일을 수정해 준다.

~/storm-esper-client/trunk/conf/conf.yaml

rabbitmq.server.queue.name: "cmd_queue"
rabbitmq.server.queue.ip: "192.168.0.28"
rabbitmq.server.username: "guest"
rabbitmq.server.userpass: "guest"

2. pom.xml 파일 수정 

"storm-esper" 프로젝트의 pom.xml을 다음과 같이 수정해준다.

기존 코드:
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.2</version>
    </dependency>

수정 코드:
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.4</version>
    </dependency>


3. jar파일 추가

Storm-ESPER엔진을 실행하기 전에 custom scheduler로 EsperBolt를 특정 supervisor(esper-supervisor)에 할당하기 위하여 Nimbus용 jar파일을 생성하여 Storm 라이브러리에 위치시켜야 한다. 또한, Storm-ESPER연동을 위하여 Esper와 RabbitMQ  라이브러리를 Storm 라이브러리에 복사하여야 한다.

Apache Storm 0.9.4가 설치되어있는 폴더 내 lib 폴더에 라이브러리 파일을 추가하는 법은 아래와 같다. 

1. "storm-esper-scheduler"프로젝트를 컴파일 하여 jar파일 생성 및 라이브러리 파일 복사

$ cd ~/storm-esper-scheduler/trunk
$ mvn clean dependency:copy-dependencies install
$ cd target
$ cp storm-esper-scheduler-0.0.1.jar ~/apache-storm-0.9.4/

2. "storm_esper" 프로젝트를 컴파일 하여 dependency jar파일 생성 및 라이브러리 파일 복사

$ cd ~/storm_esper/trunk
$ mvn clean dependency:copy-dependencies install
$ cd target/dependency
$ cp amqp-client-3.1.4.jar cglib-nodep-2.2.jar antlr-runtime-3.2.jar esper-4.10.0.jar ~/apache-storm-0.9.4/lib/

4. Storm실행

이제 Storm을 실행할 준비가 되었다. 아래의 명령어로 Storm을 실행한다. 테스트용 실행방법이므로 Nimbus, Supervisor, UI를 전부 한대의 머신에서 실행해 보겠다. 

$ ~/apache-storm-0.9.4/bin/storm nimbus &
$ ~/apache-storm-0.9.4/bin/storm supervisor &
$ ~/apache-storm-0.9.4/bin/storm ui &


Strom-ESPER 실행



<그림 1. Storm-ESPER 동작 구성도>


1. DataThrower

테스트용 더미 데이터를 DataThrower를 사용하여 RabbitMQ에 전송해야 한다. 더미 데이터는 DataThrower프로젝트를 컴파일하여 생성된 jar파일을 실행하여 "in_queue"에 전송한다. 이때 throw.conf파일의 "inqueueName":"in_queue"로 수정해준다.

$ java -cp ~/maven_data/trunk/target/maven_data-0.0.1-SNAPSHOT.jar com.embian.maven_data.tester.DataThrower ~/maven_data/conf/throw.conf >> ~/maven_data/logs/throw.log

2. submit topology

"storm-esper"프로젝트를 컴파일하여 생성된 토폴로지를 스톰에 올려주는 스크립트를 실행한다.

$ sh ~/storm_esper/trunk/etc/submit_topology.sh

3. EPL command 실행

"storm-esper-client"프로젝트를 다음의 명령어로 컴파일한다.

$ cd ~/storm-esper-client/trunk
$ mvn clean dependency:copy-dependencies install -Dmaven.test.skip=true

첨부된 스크립트를 통하여 사용할 EPL을 적용하면 된다.

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"EPL command", "params":{"bolt_id":"볼트 명", "statement":{"name":"커맨드 이름", "body":"EPL 쿼리", "state":"STARTED", "do_output": true/false}}}'


4. DataReceiver 

Storm-ESPER가 처리한 데이터는 "out_queue"에 누적된다. DataReceiver는 단순하게 "out_queue"의 데이터를 읽어와서 파일로 저장한다. DataReceiver 프로젝트를 컴파일하여 생성된 jar파일을 실행하여 파일로 저장한다.

$ java -cp ~/maven_data/trunk/target/maven_data-0.0.1-SNAPSHOT.jar com.embian.maven_data.tester.DataReceiver ~/maven_data/conf/receive.conf >> ~/maven_data/logs/receive.log


예제 실행

간단한 예제를 통하여 실행이 잘 되는지 확인해 보자.

시나리오

1. nginx의 access log를 JSON으로 만든 데이터 스트림에서 5분동안 응답시간이 1초가 넘는 Request의 통계를 출력

2. 5분동안 Page A -> Page B -> Page C의 경로로 움직인 IP에 대해서 출력

nginx의  access log를 JSON으로 만든 데이터 스트림 샘플

{"status": "200", "body_bytes_sent": "9784", "remote_user": " ", "request_time": "0.000", "http_referer": "http://www.mmm.com/bbs/board.php?page=A&wr_id=1", "remote_addr": "192.168.1.1", "request": "GET /skin/board/aa.aaa/aa.js/aaa.js?time=1434393200 HTTP/1.1", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", "upstream_response_time": "-", "time_local": "16/Jun/2015:03:33:20 +0900"}

사용한 command를 초기화 하려면 다음을 실행한다.

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"clean", "params":{}}'


0. LogEvent스키마 등록

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"CreateLogEventType", "body":"create schema LogEvent(status string, body_bytes_sent string, remote_user string, request_time string, http_referer string, remote_addr string, request string, http_user_agent string, upstream_response_time string, time_local string)", "state":"STARTED", "do_output":false}}}'

command 실행 결과보기

1. nginx의 access log를 JSON으로 만든 데이터 스트림에서 5분동안 응답시간이 1초가 넘는 Request의 통계를 출력

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"Senario1-1", "body":"SELECT count(remote_addr) FROM LogEvent.win:time(5 min) WHERE cast(request_time,float) >= 1.000 ", "state":"STARTED", "do_output":true}}}'

command 실행 결과보기

2. 5분동안 Page A-> Page B-> Page C의 경로로 움직인 IP에대해서 출력

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"Senario1-2", "body":"SELECT target3.remote_addr, target3.request from  pattern [  every target=LogEvent(request like \"%page=A%\") -> (target2=LogEvent(request like \"%page=B%\" and target.remote_addr = target2.remote_addr ) -> target3=LogEvent(http_referer like \"%page=C%\" and target.remote_addr = target2.remote_addr and target2.remote_addr=target3.remote_addr))  ].win:time(5 min)", "state":"STARTED", "do_output":true}}}'

command 실행 결과보기


결과물은 DataReceiver를 통하여 파일로 저장된 내역에서 확인이 가능하다.

시나리오 1. 결과물

--queueServerAddr  :  192.168.0.28
--queueName        :  out_queue

{"newEvents":[{    "count(remote_addr)": 1
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 2
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 3
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 4
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 5
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 6
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 7
}],"oldEvents":[]}

시나리오 2. 결과물

{"newEvents":[{    "target2.remote_addr": "173.254.216.67",
    "target2.request": "GET page=C HTTP/1.0"
}{    "target2.remote_addr": "173.254.216.67",
    "target2.request": "GET page=C HTTP/1.0"
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "203.133.168.168",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "203.133.168.168",
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "211.53.23.131",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "211.53.23.131",
    "target2.request": "GET /page=C HTTP/1.1"
}{    "target2.remote_addr": "211.53.23.131",
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "66.249.71.111",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "66.249.71.111",
      "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "66.249.71.111",}],"oldEvents":[]}


간편한 실행이 어려웠고, 새로운 시스템을 학습해가며 접했기 때문에, 꽤 애를 먹은 프로젝트 실행을 거쳤다. 스크립트가 있어서 그나마 편하게 실행을 할 수 있었던 것 같다. 위의 설치 및 실행기는 단독 머신에서 실행을 기준으로 설명되어있지만, 여러대의 머신에서도 설정파일만 적용해주면 동일하게 실행할 수 있다. 

위의 실행방법을 하나씩 따라해보면 실제로는 실행하기 어렵지 않을 것으로 보인다. 이상으로 Storm-ESPER의 실행기를 마친다.


Posted by jjangAh



이전에 Pulsar 설치와 실행에 대하여 포스팅을 하였는데, 이번에는 Pulsar에서 제공하고 있는 Pulsar White Paper를 간략하게 정리해보도록 하겠다. 원문은 아래 링크에서 확인 가능하다. 

Pulsar White Paper 원문 보러 가기


White Paper

1. Introduction

User behavior event는 user-agent 또는 IP주소와 같은 많은 정보들을 가지고 있다. eBay에서는 다량의 event들을 처리하고 있는데, BOT이 생성한 event들을 걸러내어 처리할 수 있다.

이 문서에서는 Pulsar의 data pipeline에 대한 디자인을 제안한다. 계산 대부분을 CEP 엔진을 사용하여 고가용성의 분산처리 시스템을 구축하였다. 우리는 이벤트 스트림을 데이터 베이스 테이블처럼 사용하는 것을 제안한다. Pulsar는 SQL 쿼리가 데이터 베이스 테이블을 사용하는 것처럼 실시간 스트림에대해 집합을 생성하는 것을 가능하게 한다. 이 문서에서는 Pulsar에서 이벤트 스트림을 처리할 때 이벤트를 보완하게 하고, 필터하고, 변화시키는 방식에 대해서 설명하였다.

2. DATA AND PROCESSING MODEL 

2.1 User Behavior Data 

사용자 행동 데이터의 예측이 불가능한 패턴이 초당 100만 개가 넘게 생산된다. 이러한 데이터를 빠르게 처리하기 위해 in-memory 처리 방식을 해야 한다.

2.2 Data Enrichment    

BOT 데이터들은 필터로 거른다. 필터링을 거친 데이터를 지역, 기기, 인구 통계 등의 데이터로 좀 더 풍부하게 만든다.

2.3 Sessionization

관련된 이벤트를 공통된 키로 구분하여 그룹핑해주는 상태 관리 프로세스이다.

2.4 User defined partial views of original streams

event들의 dimension이 아주 많지만 실제 사용자들은 원하는 특정 demention만 원하기 때문에  사용자들이 설정할 수 있게 하였다.

2.5 Computing aggregates in real-time for groups of multiple dimensions

다양한 dimension을 그룹화 하기 위해서 실시간으로 summary data를 산출 한다.

2.6 Computing Top N, Percentiles. and Distinct Count Metrics in real time

distinct count와 percentile을 구하기 위하여 HyperLogLog와 TDigest라는 알고리즘을 사용하여 공간과 시간을 절약한다. 이 알고리즘들은 근사 알고리즘으로써, 1%대의 에러를 가지는데, 이는 통계학적으로 문제없음이라고 판단한다.  

2.7 Dealing with out of order and delayed events

이벤트가 비정상이거나 지연되는경우, 롤업기능으로 처리 가능하다.

3. ARCHITECTURE

pipeline

<Fig. 1. Real-time Data Pipeline>

3.1 Complex Event Processing Framework

Jetstream 은 Spring으로 작성한 CEP 엔진으로써, 모든 pulsar의 real-time pipeline은 Jetstream으로 작성하였다. Jetstream은 다음과 같은 기능이 탑재되어있다.

1. 처리 로직을 서술적 SQL로 정의할 수 있다.
2. 어플리케이션 재시작이 필요없는 Hot deploy SQL.
3. Annotation 플러그인 프레임워크를 사용하여 SQL 기능성을 증대시킨다.
4. SQL을 사용하여 pipeline을 타고 전달한다.
5. SQL을 사용하여 stream affinity를 동적으로  생성할 수 있다.
6. Spring IOC를 사용한 Declarative pipeline stitching은 런타임에 동적 토폴로지 변경을 가능하게 한다.
7. Esper의 통합기능을 통한 CEP 기능.
8. elastic scaling이 되는 Clustering.
9. Cloud deployment가 가능한다.
10. push pull 모델을 사용한 pub-sub messaging기능을 가지고 있다.

3.2 Messaging 

3.2.1 Ingest Messaging 

REST interface를 사용하여 이벤트를 받아서 처리한다.

3.2.2 Push vs. Pull Messaging

push messaging과 Pull messaging의 장단점이 있기 때문에 어떤것을 선택해야 할 지에 대해 고민하였다.

3.2.3 Messaging Options

Messaging은 하이브리드로 구현하기로 하였다. Jetstream은 kafka를 사용하여 pub-sub cluster messaging을 할 수 있게 하였다.

4. REAL-TIME PIPELINE

4.1 Collector

Real-time pipeline의 첫 번째 단계. Jetstream CEP엔진이 있는 곳이다. 이벤트를 REST interface로 받아서 처리한다. CEP엔진은 BOT 이벤트들을 필터링해준다.

4.1.1 Geo and Device Classification Enrichment

BOT 이벤트들이 걸러내진 이벤트들에 지역 정보를 추가한다.

4.2 Bot Detector

BOT signature cache에 등록된 패턴을 걸러내도 그 외의 패턴들은 걸러내기 쉽지 않다. 하지만 CEP엔진으로 BOT의 특징을 잡아내어 BOT signature cache에 등록하여 Collector가 BOT을 걸러낼 수 있게 한다. 

4.3 Sessionizer

Real-time pipeline의 두 번째 단계이다. 이 단계의 주된 기능은 tenancy based sessionization 을 지원하는 것이다. Sessionization 은 session 기간에 특정 식별자를 포함하는 이벤트의 임시 그룹핑 과정이다. 유니크한 식별자의 이벤트가 처음 탐지되면 각 window는 시작된다. 이 window는 이벤트가 더 들어오지 않으면 끝나게 된다.

4.3.1 Session Meta Data, Counters and State

이벤트가 도달하면, 세션당 메타데이터, 빈도수 계산 및 상태를 관리한다. 모든 처리 로직은 SQL로 작성되었다.

4.3.2 Session Store

Backing store에 back up 되는 local off-heap cache에 세션 레코드를 저장하였다. Sessionizer 에서는 cache entry에 TTL을 세팅 가능하고 해당 entry가 만기 되었을 때 정확한 알림을 받을 수 있어야 하는 것이 필요하였다. 그래서 특별한 off-heap cache를 개발하였다. 

4.3.3 Session Backing Store

Session Backing Store를 위해, 다음의 사항들을 고려하였다.

a. local read, write, delete 지원 
b. local cross data center replication 지원
c. eventual consistency 지원 (고가용성을 달성하기 위해 분산 컴퓨팅에 사용되는 일관성 모델이다. 데이터에 새로운 업데이트가 이루어지지 않은 경우, 마지막으로 업데이트된 값을 반환하는 것을 비공식적으로 보장한다.
d. 저장 항목에 대한 life cycle을 관리한다. (TTL 지원)
e. 쓰기와 읽기의 비율을 10:4로 지원한다.
f. 초당 읽기 쓰기를 1M까지 지원한다.(Scale to)
g. 변수 크기는 200bytes에서 50Kbytes까지 지원한다. (Scale well)
h. Cloud에서 배포를 선호한다.
i. 주어진 클라이언트 노드에서 삽입된 키의 범위를 조회하기 위하여 보조 인덱스를 만든다.

위의 사항을 충족시키는 적합한 Session store를 찾기 위하여 Cassandra, Couchbase, 자체 개발database를 비교했다. Cassandra와 Couchbase는 Disk기반 솔루션이었기 때문에, in-memory방식이 필요한 우리는 자체 개발 database를 Session store로 선택하였다.

4.3.4 SQL extensions

Jetstream은 Esper의 SQL 언어를 확장하기 위하여 사용자들이 원하는 annotation을 작성할 수  있게 annotation 플러그인 프레임워크를 제공한다.

4.4 Event Distributor

pipeline의 세 번째 단계이다. 이 단계의 주 기능은 pipeline subscriber의 커스텀 뷰를 생성하게 하는 것이다.

4.4.1 Event filtering, Mutation and Routing

이벤트들은 SQL 문법으로 변화시키고 필터링하고 전달할 수 있다. 또한, annotaion을 사용하여 이벤트가 지나갈 루트를 지정해줄 수 있다.

4.5 Metrics calculator for Multi-dimensional OLAP

Metrics Calculator는 다양한 dimension과 time series data를 생산하는 사용자 정의의 metrics를 계산하는 실시간 metric computation engine이다. Metrics Calculator는, multiple dimension을 그룹핑하여 추출할 수 있는 SQL쿼리를 실행 할 수 있게 인터페이스를 제공해준다. 이 단계에서는 이벤트를 SQL로 컨트롤 가능하다.

4.5.1 Harvesting Metrics

이벤트들은 random scheduling이나 affinity based scheduling을 통해 Metrics Calulator application cluster에 스케쥴 될 수 있다. Metrics는 short tumbling wondows(10초동안) live stream에서 추출 된다. Window가 진행되고 있을때 unique dimension grouping을 위해 metric event가 생산된다.

4.5.2 Aggregating across the Metrics Calculator cluster

SQL문과 annotation을 사용하여 여러노드간의 aggregation이 가능하다.

4.5.3 Creating rollups in time series database

time series data를 생성하여 DB에 저장하거나 실시간 dashboard에서 widget으로 보여준다.

4.5.4 Metric Store

Metric Store를 하기 위하여 아래의 DB들을 비교해 보았다.

Open TSDB vs. Cassandra vs. DRUID 

 

Open TSDB 

Cassandra 

장점

빠른 데이터 수집
몇몇 자체 집합 함수 사용 가능

른 데이터 수집
Rollup 가능

단점

Rollup생성 불가
기존에 있는 집합으로
새로운 집합 생성을 지원하지 않음

group by 와 집합함수를 지원하지 않음 

Pulsar는 필요조건을 충족시키기 위하여 DRUID를 사용하였다.

5. CONCLUDING REMARKS

이 문서에서는 Pulsar를 구현할 때 고려했던 부분들의 일부이다. 실시간으로 사용자 행동분석과 관련된 문제들을 위하여 데이터와 프로세싱모델에 관해 설명하고 있다. Pulsar는 eBay cloud에서 1년간 사용되었다. 0.01% 미만의 안정적인 유실로 초당 10만 개의 events들을 처리했다. 95퍼센타일로 pipeline의 end to end 지연은 100ms 미만이다. 99.99%의 가용성으로 파이프라인을 성공적으로 구동시켰다. eBay의 몇몇 팀은 성공적으로 우리의 플랫폼을 사용하여 in-session 개인화, 광고, 인터넷 마케팅, 빌링, 비지니스 모니터링 등 많은 문제에 대한 해결책을 찾았다.

Pulsar의 주요 사용처는 사용자 행동 분석에 포커스를 맞추고 있지만, Pulsar를 실시간 처리가 요구되는 많은 다른 사용 경우에 대해서 쓰이기를 원한다.


White Paper에서는 실제로 eBay에서 Pulsar를 사용하여 사용자 행동분석 패턴을 분석하여 여러 가지 문제들을 해결하였다고 서술하고 있다. 또, Pulsar는 빠르고 신뢰도 높은 데이터 처리를 할 뿐만 아니라 실시간으로 데이터 분석이 가능함을 강조하였다. 그 기반은 Jetstram이라는 독자적인 플랫폼이고, 익숙한 SQL문과 annotation을 사용하여 데이터 분석이 가능함을 설명하고 있다.

White Paper가 아주 상세한 부분까지는 서술하고 있지 않지만, 어떤 데이터들을 어떻게, 무엇을 사용하여 처리하는가에 대한 궁금증을 풀 수 있기에는 충분한 것 같다. 자세한 쿼리 예제나 구조가 궁금하면 Pulsar 공식홈페이지에서 확인할 수 있다.

http://gopulsar.io/


마치며...

번역과 이해를 같이 해야하여 어려웠지만, 실시간 데이터 분석 툴의 구조를 나름은 자세히 알 수 있게 되어 의미있는 시간이었음을 상기하며, 이번 포스팅은 여기에서 마친다.



참고자료: http://gopulsar.io/docs/Whitepaper_Pulsar_Real-timeAnalyticsatScale.pdf


Posted by jjangAh



eBay에서 Pulsar라는 실시간 데이터 분석 툴을 오픈소스로 공개하였다. 

공식 문서에 따르면, Pulsar는 Scalability, Availability, Flexibility를 강조한 실시간 데이터 분석 및 모니터링 툴이다. 

Pulsar의 key point와 정리는 다음의 포스트를 참고하기 바란다.


eBay Pulsar: real-time analytics platform


위의 포스팅에서 간략하게 정리가 되어있기 때문에, 이번 포스팅에서는 Pulsar의 구성에 관해 설명하고 설치 방법에 대하여 번역하였다. 해당 원문은 다음 링크에서 확인 가능하다.


https://github.com/pulsarIO/realtime-analytics/wiki/Get-Started


Pulsar는...


스트리밍 SQL을 사용한 인터넷 스케일 실시간 분석 툴이다.


Pipeline Components


Pulsar pipeline은 Jetstream framework의 상단에 내장되어 있다. 이 pipeline은 아래와 같은 구성으로 되어있다.

1. Collector : 이벤트를 지역 정보 분류와 기기 타입을 탐지하여 REST endpoint를 통해 수집한다. 2. Sessionizer : 이벤트를 세션화 한 후, 그 세션 상태를 관리하고 마커 이벤트를 만들어 낸다. 3. Distributor : 이벤트 라우터처럼 다른 consumer들에 이벤트를 필터하고 변화시킨다. 4. Metrics Calculator : 다양한 방면에 대한 측정 기준을 계산하고 metrics store에 지속시킨다. 5. Replay : 이벤트들을 downstream 애플리케이션으로 진행하게 하지 못했을 때, Pipeline 애플리케이션들은 문제가 발생한 이벤트를 kafka에 지속시킨다. Replay는 이벤트들을 kafka에서 가져오고, pipeline에서 100%의 신뢰도를 주기 위해 원래의 downstream 애플리케이션으로 진행하게 하는 역할을 한다. 6. ConfigApp - 모든 pipeline을 위한 Dynamic provision configuration들은 Jetstream framework에 built-in app에서 설정 할 수 있다.


Dependencies


pipeline을 실행하려면 다음의 사항들이 요구된다.

1. Zookeeper 2. Mongo 3. Kafka 4. Cassandra

* Docker에 올려져 있는 이미지를 사용 하면 설치하기 쉽다.


Single node Deployment


한 개의 docker 호스트 안에서, 각 docker container instance들은 virtual network를 사용하여 통신한다.

Docker를 지원하는 node를 준비한다.  (* Docker 이미지로 설치를 쉽게 할 수 있다. 이번 포스팅에서는 Docker로 설치하는 법만 포스팅하겠다.)

1. Docker를 지원하는 리눅스 instance를 준비한다. 이때 Docker 버전은 1.3.0 이상이어야 한다. 2. instance의 최소 사양은 8GB RAM | 2 VCPU | 30.0GB Disk


설치하기


스크립트를 실행하여 docker에서 dependencies 항목에 있는 이미지와 pulsar, 그리고 데모까지 한번에 가져올 수 있다. (데모는 바로 실행 가능)

다운로드 링크 또는 


run sh <(curl -shttps://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/Demo/rundemo.sh)


또는 아래 스크립트를 실행하면 된다.

#!/bin/sh

echo '>>> Start external dependency'
sudo docker run -d --name zkserver -t "jetstream/zookeeper"
sleep 5

sudo docker run -d --name mongoserver -t "mongo"
sleep 5

sudo docker run -d --name kafkaserver -t "jetstream/kafka"
sleep 5

sudo docker run -d --name cassandraserver -t "poklet/cassandra"

echo '>>> Sleep 30 seconds'
sleep 30

mkdir /tmp/pulsarcql

wget https://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/metriccalculator/pulsar.cql -O /tmp/pulsarcql/pulsar.cql

sudo docker run -it --rm --link cassandraserver:cass1 -v /tmp/pulsarcql:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'

echo '>>> Sleep 10 seconds'
sleep 10

echo '>>> Start Pulsar pipeline'

sudo docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"
sleep 5

sudo docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"
sleep 5

sudo docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"
sleep 5

sudo docker run -d -p 0.0.0.0:8004:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"
sleep 5

sudo docker run -d -p 0.0.0.0:8005:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver  --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"
sleep 5

sudo docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"
sleep 5

echo '>>> Start demo'

sudo docker run -d -p 0.0.0.0:8006:9999 -p 0.0.0.0:8083:8083 --name metricservice --link zkserver:zkserver --link mongoserver:mongoserver --link cassandraserver:cassandraserver -t "pulsar/metricservice"
sleep 5

sudo docker run -d -p 0.0.0.0:8007:9999 -p 0.0.0.0:8088:8088 --link zkserver:zkserver --link mongoserver:mongoserver --link metricservice:metricservice --link metriccalculator:metriccalculator -t "pulsar/metricui"

# The twitter sample use the twitter4j api, to run the app, it requires twitter OAUTH token (https://dev.twitter.com/oauth/overview/application-owner-access-tokens)

#sleep 5

#sudo docker run -d -p 0.0.0.0:8008:9999 --link zkserver:zkserver --link mongoserver:mongoserver -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET=  -e TWITTER4J_OAUTH_ACCESSTOKEN=  -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample

#sleep 5



실행하기


dependency docker container 실행하기


1. zookeeper 실행하기

sudo docker run -d --name zkserver -t "jetstream/zookeeper"


2. mongo 실행하기

sudo docker run -d --name mongoserver -t "mongo"


3. kafka 실행하기

sudo docker run -d --name kafkaserver -t "jetstream/kafka"


4. Cassandra 실행하기

sudo docker run -d --name cassandraserver -t "poklet/cassandra"


Cassandra Tables 와 keyspace 생성하기


1. table을 설치하기 위하여 CQL Client를 실행하기

sudo docker run -it --rm --link cassandraserver:cass1 -v $PATH_CQL:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'


2. Pulsar init CQL파일 : pulsar.cql을 적용한다.


Config app과 replay app 실행하기


1. Config 서버 실행하기

sudo docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"


2. Replay app 실행하기

sudo docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"


Pipeline app 실행하기


1. Collector 실행하기

sudo docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"


2. Sessionizer 실행하기

sudo docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"


3. Distributor 실행하기

sudo docker run -d -p 0.0.0.0:8005:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"


4. Metric Calculator 실행하기

sudo docker run -d -p 0.0.0.0:8006:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"


Demo app 실행하기


1. MetricService 실행하기

sudo docker run -d -p 0.0.0.0:8006:9999 -p 0.0.0.0:8083:8083 --name metricservice --link zkserver:zkserver --link mongoserver:mongoserver --link cassandraserver:cassandraserver -t "pulsar/metricservice"


2. MetricUI 실행하기

sudo docker run -d -p 0.0.0.0:8007:9999 -p 0.0.0.0:8088:8088 --link zkserver:zkserver --link mongoserver:mongoserver --link metricservice:metricservice --link metriccalculator:metriccalculator -t "pulsar/metricui"


3. twittersample 실행하기

Twittersample은 twitter oauth token이 필요하다. 아래의 링크에서 token값을 얻어서 적용하면 된다.

https://dev.twitter.com/oauth/overview/application-owner-access-tokens

sudo docker run -d -p 0.0.0.0:8008:9999 --link zkserver:zkserver --link mongoserver:mongoserver -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET= -e TWITTER4J_OAUTH_ACCESSTOKEN= -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample Twittersample requires twitter oauth token, seehttps://dev.twitter.com/oauth/overview/application-owner-access-tokens


Pipeline app 모니터링 하기


각 앱은 http port 로 모니터링 가능하다.

1. Replay - http://<hostname>:8001

2. Collector - http://<hostname>:8002, Collector Rest end point -  http://<hostname>:8080

3. Sessionizer - http://<hostname>:8003

4. Distributor - http://<hostname>:8004

5. MetricCalculator - http://<hostname>:8005

6. Configuration - http://<hostname>:8000, Configuration UI  - http://<hostname>:8081

7. Query data via CQL Client: sudo docker run -it --rm --link cassandraserver:cass "poklet/cassandra" cqlsh cass향향

8. MetricService - http://<hostname>:8006

9. MetricUI - http://<hostname>:8007, Dashboard port: http://<hostname>:8088

10. TwitterSample - http://<hostname>:8008



Distributed deployment


Pulsar app이 실행되고 있는 분산환경에서는 app container를 실행시킬 경우 zkserver와 mongoserver에 파라미터처럼 전달해야 한다. Pulsar app이 container host network에 직접 바인딩 할 수 있게 해야함과 동시에 각 app들은 다른 host에 전달해야 한다.


Dependency docker container 실행하기


1. zkServer 실행하기

sudo docker run -d -p 0.0.0.0:2181:2181 -t "jetstream/zookeeper"


2. mongo 실행하기

sudo docker run -d -p 0.0.0.0:27017:27017 -t "mongo"


3. kafka 실행하기

sudo docker run -d -p 0.0.0.0:9092:9092 -p 0.0.0.0:2181:2181 -t "jetstream/kafka"


4. cassandra 실행하기

sudo docker run -d -p 0.0.0.0:9106:9160 -p 0.0.0.0:22:22 -p 0.0.0.0:61621:61621 -p 0.0.0.0:7000:7000 -p 0.0.0.0:7001:7001 -p 0.0.0.0:7199:7199 -p 0.0.0.0:8012:8012 -p 0.0.0.0:9042:9042 -t "poklet/cassandra"


Config app과 Replay app 실행하기


1. Config server 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -t "jetstream/config"


2. Replay app 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_ZK="kafkazkconn" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/replay"


Pipeline app 실행하기


1. Collector 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/collector"


2. Sessionizer 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/sessionizer"


3. Distributor 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/distributor"


4. Metriccalculator 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -e PULSAR_CASSANDRA="cassandraseeds" -t "pulsar/metriccalculator"


Demo app 실행하기


1. MetricService 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_CASSANDRA="cassandraserverip" -t "pulsar/metricservice"


2. MetricUI 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e METRIC_SERVER_HOST="metricserverip" -e METRIC_CALCULATOR_HOST="metriccalculatorserverip" -t "pulsar/metricui"


3. twittersample 실행하기

Twittersample은 twitter oauth token이 필요하다. 아래의 링크에서 token값을 얻어서 적용하면 된다.

https://dev.twitter.com/oauth/overview/application-owner-access-tokens

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET= -e TWITTER4J_OAUTH_ACCESSTOKEN= -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample 


EC2 deplyment


EC2에 적용하는 법은 https://docs.docker.com/installation/amazon/ 에서 docker 설치 방법을 참고한 후, 위의 스텝을 실행해보면 된다.



이번 포스팅에서는 Pulsar 설치와 실행을 정리해 보았다. 

Pulsar는 모든 app을 Docker 이미지로 만들어서 배포하고 있기때문에 매우 간단하게 설치 및 실행이 가능하다. 또한 클라우드 배포도 지원하기 때문에 Docker를 사용할 수 있는 instance에서는 쉽게 설치 및 사용할 수 있다고 한다. 


다음 포스팅에서는 Pulsar에서 제공하는 White Paper를 정리해보기로 하면서 이번 포스팅을 마친다.


참고자료 : https://github.com/pulsarIO/realtime-analytics/wiki/Get-Started



Posted by jjangAh

eBay Pulsar

Pulsar – an open-source, real-time analytics platform and stream processing framework. Pulsar can be used to collect and process user and business events in real time, providing key insights and enabling systems to react to user activities within seconds. In addition to real-time sessionization and multi-dimensional metrics aggregation over time windows, Pulsar uses a SQL-like event processing language to offer custom stream creation through data enrichment, mutation, and filtering. Pulsar scales to a million events per second with high availability. It can be easily integrated with metrics stores like Cassandra and Druid.





SlideShare : http://www.slideshare.net/kyoungmoyang/ebay-pulsar


References: 

http://www.ebaytechblog.com/2015/02/23/announcing-pulsar-real-time-analytics-at-scale/#.VQIuqBCsVW2

http://gopulsar.io/

http://gopulsar.io/html/docs.html

https://github.com/pulsarIO/realtime-analytics/wiki

https://spark.apache.org/

https://storm.apache.org/

http://www.espertech.com/

Posted by 양경모