Reference

1. Storm-starter : https://github.com/apache/storm/tree/master/examples/storm-starter

2. Storm : https://github.com/apache/storm

3. Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm : http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/


소개

회사 업무로 Apache Storm(이하 Storm)과 관련된 프로젝트를 완료하고 휴식중에, Storm 기초를 설명할 수 있을만한 예제를 만들어보기 위해서 Storm Starter를 참조하여 간단한 프로젝트를 만들었다.

이 프로젝트는 Twitter Sample Public Status API(https://dev.twitter.com/streaming/reference/get/statuses/sample)를 사용하여 Twitter realtime stream data의 일부를 Input으로 하고, HashTag 정보를 추출한 후 일정 시간 간격(emit frequency)으로 일정 시간 동안(window length)의 일정 갯수(TOP_N)의 Top HashTag를 생성하여 출력하는 프로젝트이다.

Storm-starter project에서 많은 소스코드를 가져 왔으며 Twitter Library는 Twitter4J를 사용한다.

Project source : https://github.com/forcemax/storm_twitter_hashtag


실행하기

0. Prerequisites

Java 1.7 이상, Storm 0.9.5, Maven, Git

(Twitter API를 사용하기 위한 consumerKey, consumerSecret, accessToken, accessTokenSecret을 변경하지 않으면 실행이 안된다.)

1. 소스 가져오기

git clone https://github.com/forcemax/storm_twitter_hashtag

2. 소스 빌드하기

$ mvn clean package

3. Storm Cluster에 Topology submit

$ storm jar StormTwitterHashtag-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.embian.forcemax.twitter.StormTwitterHashtagTopologyRunner server

4. Storm UI에서 확인하기



그림. topology 목록



그림. topology 상세 정보


설명

- Rolling Count Algorithm은 Reference 3번 사이트를 참조

- Topology는 다음과 같이 구성되어 있다. Twitter API를 사용하기 위해서는 Twitter에 App 등록을 해야하며, App 등록을 하면 consumerKey, consumerSecret, accessToken, accessTokenSecret 값을 얻을 수 있다. 다음 코드에 해당 값을 넣어서 사용한다.



그림. Topology 구성도

- TwitterSpout은 Twitter4J Library를 사용하며, LinkedBlockingQueue를 사용해서 새로운 Public Status가 있을때 Queue에 저장한다. nextTuple() 호출시에 Queue에서 꺼내서 ExtractHashTagBolt에 넘긴다.

- ExtractHashTagBolt는 받은 Public Status에서 HashTag만 뽑아내서, RollingCountBolt로 넘긴다. HashTag의 갯수 만큼 emit이 발생한다.

- RollingCountBolt는 생성할 때 인자로 받은 window length와 emit frequency 값을 바탕으로, emit frequency마다 window length에 속하는 데이터에서 word별 count를 계산해서 IntermediateRankingsBolt로 넘긴다. 이때, emit frequency마다 emit을 하기 위해서 TickTuple을 사용하는데, TickTuple은 Storm 0.8에 새로 들어간 기능이며 Component(Spout, Bolt)내에서 일정 주기 마다 Tuple을 발생시키는 기능이다.

IntermediateRankingsBolt와 TotalRankingsBolt는 생성할때 인자로 topN, emit frequency를 받으며, 입력된 word별 count를 바탕으로 상위 topN개의 word와 count를 뽑아내고 emit frequency 마다 emit한다. IntermediateRankingsBolt는 parallelism hint를 크게 주어 map-reduce 구조에서 map의 역할을 하고, TotalRankingsBolt는 parallelism hint를 1로 주고 reduce의 역할을 한다. emit frequency마다 emit을 하기 위해서 RollingCountBolt와 마찬가지로 TickTuple을 사용한다.

- 마지막으로 PrinterBolt는 TotalRankingsBolt에서 emit한 Tuple을 출력하기 위해서 사용하며 특별한 기능은 없다.


이 프로젝트는 Storm의 Spout(nextTuple과 open), Bolt(execute, prepare)와 Topology wiring에 대한 이해만 있다면 코드를 보는데 아무 무리가 없을 정도로 간단한 예제이다. 그러나 외부 서비스(Twitter)과의 연계를 통한 Spout 구성, Atomic하게 역할을 분리한 Bolt, parallelism hint를 조절하여 성능을 향상시키는 방법을 확인해 보기에 알맞은 예제이다.

Twitter Sample Public Status API는 10분에 20000 statuses 정도의 데이터밖에 제공하지 않으므로, 한대의 Storm에서 처리하기에 충분하다.

초보자가 알아보기 쉽게 코드가 구성되어 있으니, Storm을 이용하여 realtime CEP 엔진을 공부하려는 분들에게 많은 도움이 되었으면 한다.


posted by forcemax

Posted by 비회원

eBay Pulsar를 분석하면서 받은 느낌은 다음과 같다.

"eBay Pulsar를 바로 써먹는 것은 불가능해보인다. Jetstream을 이용하면 CEP 시스템을 쉽게 구축할 수 있을 것 같고 eBay Pulsar는 Jetstream을 활용하기 위해서 참고하는 소스 정도로 사용하면 될 것 같다."

문제는 Jetstream의 문서를 봐도 이해가 잘 되도록 되어있지는 않다는 것이다.


Jetstream을 이용해서 간단한 Pipeline을 구성하면서 Jetstream에 대해서 이해할 수 있도록 하자.


Jetstream App 만들기

maven이 설치되어있다면 다음과 같은 명령으로 Jetstream App을 만들 수 있다.

mvn archetype:generate -DarchetypeGroupId=com.ebay.jetstream.archetype -DarchetypeArtifactId=simpleapp -DarchetypeVersion=4.0.2

위의 명령을 실행시키면 groupId, artifactId 등을 물어본다. 적당히 입력하면 Jetstream Sample App이 만들어진다.

$ mvn archetype:generate -DarchetypeGroupId=com.ebay.jetstream.archetype -DarchetypeArtifactId=simpleapp -DarchetypeVersion=4.0.2

....

Define value for property 'groupId': : com.embian.test

Define value for property 'artifactId': : pulsar-test

Define value for property 'version': 1.0-SNAPSHOT:

Define value for property 'package': com.embian.test:

Confirm properties configuration:

groupId: com.embian.test

artifactId: pulsar-test

version: 1.0-SNAPSHOT

package: com.embian.test

...

mvn을 돌린 위치에 artifactId로 입력했던 pulsar-test라는 디렉토리가 생성된 것을 확인할 수 있다.

pulsar-test는 자신이 사용하는 IDE에 맞게 설정해서 Import하면 된다. 


XML파일 이해하기

Jetstream에서 사용하는 주요한 XML은 다음과 같다.

buildsrc/JetstreamConf/appwiring.xml

InboundChannel과 OutboundChannel에 대한 정의를 가지고 있다.

기본으로 생성된 App의 설정파일은 다음과 같다. XML을 한번에 보면 머리아프기 때문에 역할별로 떼어서 보도록 하겠다.

XML의 제일 상단 부분에 위치한 내용이다. 따로 설명이 필요 없으니 Pass.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://www.springframework.org/schema/beans

    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"

    default-lazy-init="false">

그 후에 나오는 부분은 다음과 같다.

   <bean id="InboundMessageBinder"

        class="com.ebay.jetstream.event.support.channel.ChannelBinding"

        depends-on="MessageService">

        <property name="channel" ref="InboundMessages"/>

    </bean>

    

    <bean id="InboundMessages"

        class="com.ebay.jetstream.event.channel.messaging.InboundMessagingChannel">

        <property name="address" ref="InboundChannelAddress" />

        <property name="waitTimeBeforeShutdown" value="15000"/>

        <property name="eventSinks">

            <list>

                <ref bean="SampleProcessor" />

            </list>

        </property>

    </bean>


    <bean id="InboundChannelAddress"

        class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">

        <property name="channelTopics">

            <list>

                <value>Jetstream.sample/SampleEvent</value>

            </list>

        </property>

    </bean>

    

    <bean id="SampleProcessor" class="com.embian.test.processor.SampleProcessor">

        <property name="eventSinks">

            <list>

                <ref bean="OutboundMessages" />

            </list>

        </property>

    </bean>

Inbound에 대한 설정이다. 

InboundMessageBinder(첫번째 Bean)에서는 Message 전송 방식을 어떤식으로 쓸 것인지를 정의한다. depends-on="MessageService" 부분이 이 부분이다. MessageService에 대해서는 appwiring.xml에 나와있지 않다. 해당 정보는 나중에 볼 messagecontext.xml에서 확인할 수 있다.

아무튼.. MessageService라는 방식으로 Message를 전송하면서 Channel은 InboundMessages라는 Bean을 사용한다는 것이다. 

두번째 Bean인 InboundMessages를 보면 InboundChannelAddress에 주소가 정의되어있고 eventSinks로 SampleProcessor를 추가한 것을 확인할 수 있다.

그 밑에는 각각 InboundChannelAddress와 SampleProcessor에 대해서 정의되어있다.

InboundChannelAddress는 topic으로 Jetstream.sample/SampleEvent를 가지는 채널을 의미하고 SampleProcessor는 com.embian.test.processor.SampleProcessor 클래스를 나타내고 있다. 


종합해서 보면 Inbound는 Jetstream.sample/SampleEvent라는 토픽을 가지고 있는 통로를 통해서 메시지를 받아오고 받아온 메시지는 SampleProcessor에서 처리한다는 것이다.

SampleProcessor부분에 보면 eventSinks로 OutboundMessages가 정의되어있다. SampleProcessor에서 처리를 끝낸 것은 OutboundMessages를 통해서 전달된다는 것을 의미한다.


여기까지의 내용을 그림으로 표시하면 다음과 같다.


<그림 1. pulsar-test의 Inbound 설정 내용>

Inbound에 대한 설정 바로 밑에는 Outbound에 대한 설정이 있다.

<bean id="OutboundMessageChanneBinder" class="com.ebay.jetstream.event.support.channel.ChannelBinding"

        depends-on="MessageService">

        <property name="channel" ref="OutboundMessages" />

    </bean>

    

    <bean id="OutboundMessages"

        class="com.ebay.jetstream.event.channel.messaging.OutboundMessagingChannel"

        depends-on="MessageService">

        <property name="address" ref="OutboundMessageChannelAddress" />

    </bean>


    <bean id="OutboundMessageChannelAddress"

        class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">

        <property name="channelTopics">

            <list>

                <value>Jetstream.sample2/SampleEvent</value>

            </list>

        </property>

    </bean>

OutboundMessageChannelBinder에서 OutboundMessages를 채널로 정의하고 있고 OutboundMessages의 주소는 OutboundMessageChannelAddress에서 정의하고 있다. OutbountMessageChannelBinder와 OutboundMessages에 대한 정의는 MessageService를 참고하고 있다는 것을 알 수 있다.

요약하자면.. OutboundMessage는 Jetstream.sample2/SampleEvent를 토픽으로 가지고 있는 통로를 사용한다는 것이다.

Outbound까지의 내용을 <그림 1>에 추가하면 다음과 같다.


여기까지 해서 Pipeline 구성에 대한 가장 기본적인 wiring이 끝난 것이다.

outbound에 대한 설정 밑에는 Shutdown에 대한 설정이 들어가있다.

<bean id="ShutDownOrchestrator" class="com.ebay.jetstream.event.support.ShutDownOrchestrator"

        lazy-init="false">

        <property name="shutDownComponent">

            <list>

                <ref bean="InboundMessages" />

                <ref bean="SampleProcessor" />

                <ref bean="OutboundMessages" />

            </list>

        </property>

    </bean>

오류가 발생한 경우 Shutdown을 시키는 processor에 대해서 정의한 것이다.

Shutdown이 발생하면 채널과 processor를 닫는 방법에 대해서 추가할 수 있도록 한 것이다.

그리고 appwiring.xml은 이해하기 쉽게 이름을 변경하는 것도 가능하다.


buildsrc/JetstreamConf/messagecontext.xml

appwiring.xml은 Processor와 Channel간의 연결에 대해서 설정하는 반면에 messagecontext.xml은 실제 메시지 전달을 위해서 어떤 방식으로 전달할 것인가를 나타내는 설정이다.

appwiring.xml에서 언급했던 MessageService에 대해서 찾아보면 다음과 같이 되어있다.

<bean id="MessageService"

        class="com.ebay.jetstream.messaging.config.MessageServiceConfiguration"

        depends-on="SystemPropertiesConfiguration">

        <property name="messageServiceProperties" ref="MessageServiceProperties" />

    </bean>

MessageServiceProperties를 확인해보면 Jetstream.sample과 Jetstream.sample2에 대해서 정의되어있는 것을 확인할 수 있다.

<bean id="MessageServiceProperties"

        class="com.ebay.jetstream.messaging.config.MessageServiceProperties">

        <property name="nicUsage" ref="NICUsage" />

        <property name="transports">

            <list>

                <ref bean="zookeeper" />


                <bean

                    class="com.ebay.jetstream.messaging.transport.netty.config.NettyTransportConfig">

                    <property name="transportClass"

                        value="com.ebay.jetstream.messaging.transport.netty.NettyTransport" />

                    <property name="transportName" value="netty" />

                    <property name="protocol" value="tcp" />

                    <property name="contextList">

                        <list>

                            <bean

                                class="com.ebay.jetstream.messaging.transport.netty.config.NettyContextConfig">

                                <!--  TODO: Change to your context name -->

                                <property name="contextname" value="Jetstream.sample" />

                                <property name="port" value="15590" />

                                <property name="scheduler" ref="consistenthashingaffinityscheduler"/>

                            </bean>

                        </list>

                    </property>

                    <property name="sendbuffersize" value="1048576" />

                    <property name="receivebuffersize" value="1048576" />

                    <property name="downstreamDispatchQueueSize" value="262144" />

                    <property name="connectionTimeoutInSecs" value="10" />

                    <property name="asyncConnect" value="true" />

                    <property name="numAcceptorIoProcessors" value="1" />

                    <property name="numConnectorIoProcessors" value="1" />

                    <property name="requireDNS" value="false" />

                    <property name="netmask" value="#{systemProperties['jetstream.runtime.netmask'] ?: '127.0.0.1/8'}" />

                    <property name="connectionPoolSz" value="1" />

                    <property name="maxNettyBackLog" value="20000" />

                    <property name="idleTimeoutInSecs" value="8640000"/>

                    <property name="enableCompression" value="false" />

                    <property name="tcpKeepAlive" value="true"/>

                </bean>


                <bean

                    class="com.ebay.jetstream.messaging.transport.netty.config.NettyTransportConfig">

                    <property name="transportClass"

                        value="com.ebay.jetstream.messaging.transport.netty.NettyTransport" />

                    <property name="transportName" value="netty" />

                    <property name="protocol" value="tcp" />

                    <property name="contextList">

                        <list>

                            <bean

                                class="com.ebay.jetstream.messaging.transport.netty.config.NettyContextConfig">

                                <!--  TODO: Change to your context name -->

                                <property name="contextname" value="Jetstream.sample2" />

                                <property name="port" value="15591" />

                                <property name="scheduler" ref="consistenthashingaffinityscheduler"/>

                            </bean>

                        </list>

                    </property>

                    <property name="sendbuffersize" value="1048576" />

                    <property name="receivebuffersize" value="1048576" />

                    <property name="downstreamDispatchQueueSize" value="262144" />

                    <property name="connectionTimeoutInSecs" value="10" />

                    <property name="asyncConnect" value="true" />

                    <property name="numAcceptorIoProcessors" value="1" />

                    <property name="numConnectorIoProcessors" value="1" />

                    <property name="requireDNS" value="false" />

                    <property name="netmask" value="#{systemProperties['jetstream.runtime.netmask'] ?: '127.0.0.1/8'}" />

                    <property name="connectionPoolSz" value="1" />

                    <property name="maxNettyBackLog" value="20000" />

                    <property name="idleTimeoutInSecs" value="8640000"/>

                    <property name="enableCompression" value="false" />

                    <property name="tcpKeepAlive" value="true"/>

                </bean>

            </list>

        </property>

        <property name="upstreamDispatchQueueSize" value="300000" />

        <property name="upstreamDispatchThreadPoolSize" value="2" />

    </bean>


Jetstream.sample은 15590 포트를, Jetstream.sample2는 15591 포트를 통해서 통신하도록 정의되어있다.


Processor 만들기

XML 파일을 통해서 Inbound와 Outbound에 대한 연결 설정이 끝나면 해당 App이 Inbound를 통해서 들어온 Message를 어떻게 처리한 후 Outbound로 보낼지에 대해서 구성해야 한다. 

Inbound의 설정 중 eventSinks에서 정의한 Processor Class에서 Data의 처리를 수행하게 된다. 아래의 XML은 appwiring.xml에서 Processor에 대한 정의 부분이다. 이미 앞에서 알아봤던 XML에 기록되어있는 것을 다시한번 확인해보면 다음과 같다.

<bean id="InboundMessages"

        class="com.ebay.jetstream.event.channel.messaging.InboundMessagingChannel">

        <property name="address" ref="InboundChannelAddress" />

        <property name="waitTimeBeforeShutdown" value="15000"/>

        <property name="eventSinks">

            <list>

                <ref bean="SampleProcessor" />

            </list>

        </property>

    </bean>


<bean id="SampleProcessor" class="com.embian.test.processor.SampleProcessor">

        <property name="eventSinks">

            <list>

                <ref bean="OutboundMessages" />

            </list>

        </property>

    </bean>

위의 XML에서는 Inbound로 들어오는 Message를 SampleProcessor로 전달한다는 것을 의미한다. 그리고 SampleProcessor는 결과를 OutboundMessages로 보내도록 정의되어있다.


SampleProcessor는 com.embian.test.processor.SampleProcessor라는 Custom Processor를 의미한다. Processor에 EsperProcessor를 붙여서 바로 EPL처리를 하는 것도 가능하다. 먼저 Custom Processor에 대해서 알아본 후 EsperProcessor에 대해서 알아보도록 하겠다.


- CustomProcessor

CustomProcessor는 Jetstream의 AbstractEventProcessor를 상속받아서 구현할 수 있다. SampleProcessor의 내용을 보면 다음과 같다.

package com.embian.test.processor;


import org.springframework.context.ApplicationEvent;

import org.springframework.jmx.export.annotation.ManagedResource;



import com.ebay.jetstream.event.EventException;

import com.ebay.jetstream.event.JetstreamEvent;

import com.ebay.jetstream.event.support.AbstractEventProcessor;

import com.ebay.jetstream.management.Management;


@ManagedResource(objectName = "Event/Processor", description = "SampleProcessor")

public class SampleProcessor extends AbstractEventProcessor {

    private JetstreamEvent lastEvent;

    

    @Override

    public void afterPropertiesSet() throws Exception {

        Management.addBean(getBeanName(), this);

    }


    public JetstreamEvent getLastEvent() {

        return lastEvent;

    }


    @Override

    public int getPendingEvents() {

        return 0;

    }


    @Override

    public void pause() {

        

    }


    @Override

    protected void processApplicationEvent(ApplicationEvent event) {

        

    }


    @Override

    public void resume() {

        

    }


    @Override

    public void sendEvent(JetstreamEvent event) throws EventException {

        lastEvent = event;

        super.incrementEventRecievedCounter();

        super.fireSendEvent(event);

        super.incrementEventSentCounter();

    }


    public void setLastEvent(JetstreamEvent lastEvent) {

        this.lastEvent = lastEvent;

    }


    @Override

    public void shutDown() {

        

    }


}


위의 코드는 Jetstream App을 만들면 기본적으로 생성되는 코드이다. 각 method들이 유용하게 사용될 수 있지만 여기서는 가장 기본적인 method에 대해서만 알아보도록 하겠다.


afterPropertiesSet method

afterPropertiesSet method는 이름 그대로 Processor가 properties를 설정한 직후 호출되는 method이다. 초기에 설정 등이 필요한 부분은 여기에서 처리하면 된다. 

sendEvent method

sendEvent method는 XML에서 정의된 eventSinks로 message를 보낼 때 사용된다. 코드 내에서  fireSendEvent method를 사용하는 것을 볼 수 있다. 이 method가 eventSinks로 message를 보내는 역할을 한다. 만약 Pulsar의 예제처럼 IP를 통해서 국가 코드를 얻고자 하는 경우 fireSendEvent를 하기 전에 필요한 정보를 추가하면 된다.

event에서 정보를 읽거나 쓸때는 event.put, event.get method를 사용할 수 있다.


- EsperProcessor

만약 위의 예제에서 SampleProcessor를 CustomProcessor가 아닌 EsperProcessor를 사용하도록 하기 위해서는 appwiring.xml을 다음과 같이 수정하면 된다.

<bean id="SampleProcessor"

        class="com.ebay.jetstream.event.processor.esper.EsperProcessor">

        <property name="esperEventListener" ref="EsperEventListener" />

        <property name="configuration" ref="EsperConfiguration" />

        <property name="epl" ref="EPL" />

        <property name="eventSinks">

            <list>

                <ref bean="OutboundMessages" />

            </list>

        </property>

        <property name="esperExceptionHandler">

            <bean id="esperExceptionHandler"

                class="com.ebay.jetstream.event.processor.esper.EsperExceptionHandler"></bean>

        </property>

    </bean>

property 중 esperEventListener와 configuration, epl은 bean으로 등록되어있어야 한다.

이 bean들을 등록하기 위한 XML은 다음과 같다.

EsperSetup.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--

Pulsar

Copyright (C) 2013-2015 eBay Software Foundation

Licensed under the GPL v2 license.  See LICENSE for full terms.

  -->

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"

    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"

    default-lazy-init="true">


    <bean id="EsperEventListener"

        class="com.ebay.jetstream.event.processor.esper.EsperEventListener">

    </bean>


      <bean id="EsperConfiguration"

        class="com.ebay.jetstream.event.processor.esper.EsperConfiguration">

        <property name="internalTimerEnabled" value="true" />

        <property name="msecResolution" value="1" />

        <property name="timeSourceNano" value="false" />

        <property name="declaredEvents" ref="EventDefinition" />

        <property name="listenerDispatchTimeout" value="1000" />

        <property name="listenerDispatchPreserveOrder" value="false" />

        <property name="insertIntoDispatchTimeout" value="100" />

        <property name="insertIntoDispatchPreserveOrder" value="false" />

        <property name="threadPoolSize" value="2" />

        <property name="queueSizeLimit" value="30000" />

        <property name="executionLogging" value="true" />

        <property name="timerLogging" value="true" />

        <property name="exceptionHandlerFactoryClass"

            value="com.ebay.jetstream.event.processor.esper.JetstreamExceptionHandlerFactory" />

        <property name="autoImport">

            <list>

            </list>

        </property>

    </bean>


    <bean id="EventDefinition"

        class="com.ebay.jetstream.event.processor.esper.EsperDeclaredEvents">

        <property name="eventTypes">

            <list>

                <bean

                    class="com.ebay.jetstream.event.processor.esper.MapEventType">

                    <property name="eventAlias" value="SampleEvent" />

                    <property name="eventFields">

                        <map>

                            <entry key="data" value="java.lang.String" />

                        </map>

                    </property>

                </bean>

            </list>

        </property>

    </bean>

</beans>


EsperEventListener는 Jetstream의 Esper Listener 설정을 나타내고 EsperConfiguration은 기본적인 Esper 설정을 나타낸다. EsperSetup.xml에서 관심있게 봐야 하는 부분은 EventDefinition이라는 bean이다.

EPL을 사용하기 위해서 Event Type을 정의하는 부분이다. eventTypes라는 property에는 EPL에서 사용할 Event의 Type이 정의된다. 여기서는 string값을 갖는 data라는 이름의 항목만 있는 SampleEvent라는 이벤트를 정의했다.


EPL.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--

Pulsar

Copyright (C) 2013-2015 eBay Software Foundation

Licensed under the GPL v2 license.  See LICENSE for full terms.

  -->

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://www.springframework.org/schema/beans

    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"

    default-lazy-init="true">


    <bean id="EPL" class="com.ebay.jetstream.event.processor.esper.EPL">

        <property name="statementBlock">

            <value>

        <![CDATA[

               

        

            @OutputTo("OutboundMessages")

            select data, count(*) as cnt from SampleEvent.win:time(10)  group by data;

        ]]>

            </value>

        </property>

    </bean>

</beans>

EPL은 위와 같이 설정된다. @OutputTo라는 Annotation을 통해서 해당 EPL의 결과를 eventSinks에서 정의된 목록 중 어디로 보낼지를 결정할 수 있다. 위의 예제는 10초동안 data가 종류별로 몇개가 들어왔는지를 세어보는 EPL이다.


이와 같이 설정되어있을 경우.. Inbound를 통해서 들어온 이벤트는 다음과 같이 정의가 되어있다는 것을 알 수 있다.

{

     "data" : 문자열

}

이 이벤트가 EPL을 통해서 OutboundMessage로 전달될 때는 다음과 같은 형태가 된다.

{

"data": 문자열1,

"cnt": 문자열개수

}

결과 이벤트는 위의 내용 말고도 어떤 이벤트 타입인지가 js_ev_type이라는 이름 등을 통해서 추가된다.


Custom Channel 만들기

이벤트가 생성되는 시점이나 이벤트를 저장소 등에 저장하고 싶을 때 Custom Channel을 통해서 처리할 수 있다. Jetstream에서는 AbstractInboundChannel을 상속받아서 Custom Channel을 만드는 것이 가능하다. 


여기서는 0.5초마다 단어를 선택해서 outbound로 던지는 예제를 보면서 Custom Channel을 만들어보도록 하겠다.

package com.embian.channel;


import java.util.Timer;

import java.util.TimerTask;


import org.springframework.context.ApplicationEvent;

import org.springframework.jmx.export.annotation.ManagedOperation;


import com.ebay.jetstream.event.EventException;

import com.ebay.jetstream.event.JetstreamEvent;

import com.ebay.jetstream.event.channel.AbstractInboundChannel;

import com.ebay.jetstream.event.channel.ChannelAddress;

import com.ebay.jetstream.management.Management;


public class WordCountChannel extends AbstractInboundChannel {

private Timer timer = new Timer("Timer");

private TimerTask timerTask;

private Boolean channelOpened;

private class TimingTask extends TimerTask {

public void run() {

System.err.println("!!!!!!!!!!! task run");

generate();

}

}

    @Override

    public void afterPropertiesSet() throws Exception {

     Management.removeBeanOrFolder(getBeanName(), this);

Management.addBean(getBeanName(), this);

    timerTask = new TimingTask();

timer.schedule(timerTask, 30 * 1000);

        init();

    }

    

    private void init() {

    channelOpened = false;

    }


    @ManagedOperation

    public void generate() {

// TODO Auto-generated method stub

    String[] wordArray = {"WORD1", "WORD2", "WORD3", "WORD4"};

    int idx= 0;

    while(true) {

    if (channelOpened) {

    JetstreamEvent event = new JetstreamEvent();

       event.setEventType("SampleEvent");

       event.put("data", a[idx]);

       fireSendEvent(event);

       idx= (idx+ 1) % wordArray.length;

       

    }

    try {

Thread.sleep(1 * 500);

} catch (InterruptedException e) {

e.printStackTrace();

}

    }

}


@Override

    public void close() throws EventException {

        super.close();

        shutDown();

    }


    @Override

    public void flush() throws EventException {

        

    }


    @Override

    public ChannelAddress getAddress() {

        return null;

    }


    

    @Override

    public int getPendingEvents() {

        return 0;

    }


    @Override

    public void open() throws EventException {

        super.open();

        channelOpened = true;         

    }


    @Override

    public void pause() {

        close();

    }


    @Override

    protected void processApplicationEvent(ApplicationEvent event) {

        

    }


    @Override

    public void resume() {

        open();

    }


    @Override

    public void shutDown() {

       

    }

}


위의 코드에서 관심있게 봐야 할 부분은 afterPropertiesSet method와 open method이다.

afterPropertiesSet method

afterPropertiesSet method는 이름 그대로 Processor가 properties를 설정한 직후 호출되는 method이다. 초기에 설정 등이 필요한 부분은 여기에서 처리하면 된다. 

open method

Channel이 열릴때 호출되는 method이다.


매우 간단한 코드이기 때문에 약간의 시간만 투자하면 파악할 수 있다. afterPropertiesSet method에 보면 Management.removeBeanOrFolder를 한 후 다시 Management.addBean을 하는 것을 볼 수 있다. 이건 Pulsar 예제에서 이렇게 사용하고 있어서 똑같이 넣어준 것이다. (왜 remove한 후에 add한 것인지 이유는 모름 -_-)



실행하기

Jetstream을 실행하기 위해서는 먼저 Zookeeper가 동작중이어야 한다. 그리고 실행할 때 설정해줘야 하는 정보는 다음과 같다.


- Main Class : com.ebay.jetstream.application.JetstreamApplication

- arguments : -p 8009 -n appname -cv 1.0

-p : 모니터링 포트

-n : App 이름

-cv : Config 버전, Config를 사용하지 않을때는 그냥 1.0 정도로 넣으면 된다.

- VM arguments : -Djetstream.runtime.zkserver.host=127.0.0.1 -Djetstream.runtime.zkserver.port=2181 -Djetstream.runtime.netmask=0.0.0.0/0

jetstream.runtime.zkserver.host : Zookeeper Host

jetstream.runtime.zkserver.port : Zookeeper Port

jetstream.runtime.netmask : Listen할 netmask

- 시스템 환경변수 

JETSTREAM_HOME : appwiring.xml이 위치하는 buildsrc 디렉토리의 경로

MONGO_HOME : Mongodb의 접속 주소, Config를 사용하지 않을때는 설정하지 않아도 된다.



Posted by micacute

Storm-ESPER는 Storm과 ESPER를 이용한 실시간 데이터 처리 엔진이다. 

Storm-ESPER에 대해 궁금하다면, 엠비안에서 진행했던 Strom-ESPER 프로젝트의 Prototype과 테스트에 대한 블로그에서 참조 할 수 있다. (Storm & Esper Prototype 및 Test)

Storm-ESPER 프로젝트를 받았을 때, 프로젝트에 대한 프로토타입 테스트 결과문서, 설치문서, 설치 패키지가 존재하였다. 하지만 패키지와 문서가 CentOS를 위한 것이었기 때문에, 내 작업환경인 Ubuntu에서 실행되지 않았다. 그리하여 Storm-ESPER실행을 위한 각 모듈을 따로 컴파일하여 실행시켜줘야하는 번거로움이 있었다. 그뿐만 아니라 실행하는 과정에서 컴파일러 버전이 맞지 않거나 실행에 필요한 jar파일들이 없어서 고생하기도 했다. 

이러한 실행의 어려움을 해소하기 위해, Storm-ESPER 실행방법을 자세히 설명해 보도록 하겠다. 

모든 설치 및 실행방법은 Ubuntu시스템 기준으로 설명한다.


Storm-ESPER 실행전 필요조건 1 - 필요한 프로그램 설치

1. zookeeper-3.4.6

압축파일을 미러에서 받은 후 압축을 푼다.

$ wget http://mirrors.ukfast.co.uk/sites/ftp.apache.org/zookeeper/stable/zookeeper-3.4.6.tar.gz  
$ tar -xvf zookeeper-3.4.6.tar.gz  

config 파일을 생성한다. 

$ vi conf/zoo.cfg

# The number of milliseconds of each tick  
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

Zookeeper를 실행한다.

$ ~/zookeeper-3.4.6/bin/zkServer.sh start

2. RabbitMQ Server

RabbitMQ는 repository에서 설치할 수 있다.

$ apt-get install rabbitmq-server

 WebUI Admin plugin을 설치하여 모니터링을 할 수 있게 한다. 플러그인 설치 후 리스타트를 해준다.

$ rabbitmq-­plugins enable rabbitmq_management 
service rabbitmq­-server restart

3. Apache Storm 0.9.4

압축파일을 미러에서 받은 후 압축을 푼다. (이번 테스트에서 사용한 Storm버전은 0.9.4이다.)

$ wget http://mirror.apache-kr.org/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz 
$ tar -xvf apache-storm-0.9.4.tar.gz

conf 파일을 변경해준다.

 $ vi ~/apache-storm-0.9.4/conf/storm.yaml

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
    - zookeeper가 설치되어있는 호스트 ip
#     - "localhost"

 nimbus.host: nimbus가 설치되어있는 호스트 ip


Storm-ESPER 실행전 필요조건 2 - 파일수정 및 Storm 실행

1. conf 파일 수정 및 라이브러리 파일 추가

먼저 Storm-ESPER를 컴파일하기 전에  conf파일을 수정해야한다. "storm-esper"프로젝트의 conf폴더에 있는 yaml파일들을 조건에 맞게 수정해 준다. 아래의 예시를 따라서 본인의 ip를 사용하면 된다.

~/storm-esper/trunk/conf/deploy.yaml

자세히 보기

~/storm-esper/trunk/conf/nimbus_conf.yaml

자세히 보기


~/storm-esper/trunk/conf/supervisor_conf.yaml

자세히 보기

~/storm-esper/trunk/conf/esper_supervisor_conf.yaml

자세히 보기


EPL명령어 제어를 위한 "storm-esper-client" 프로젝트의 conf폴더의 conf.yaml파일을 수정해 준다.

~/storm-esper-client/trunk/conf/conf.yaml

rabbitmq.server.queue.name: "cmd_queue"
rabbitmq.server.queue.ip: "192.168.0.28"
rabbitmq.server.username: "guest"
rabbitmq.server.userpass: "guest"

2. pom.xml 파일 수정 

"storm-esper" 프로젝트의 pom.xml을 다음과 같이 수정해준다.

기존 코드:
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.2</version>
    </dependency>

수정 코드:
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.4</version>
    </dependency>


3. jar파일 추가

Storm-ESPER엔진을 실행하기 전에 custom scheduler로 EsperBolt를 특정 supervisor(esper-supervisor)에 할당하기 위하여 Nimbus용 jar파일을 생성하여 Storm 라이브러리에 위치시켜야 한다. 또한, Storm-ESPER연동을 위하여 Esper와 RabbitMQ  라이브러리를 Storm 라이브러리에 복사하여야 한다.

Apache Storm 0.9.4가 설치되어있는 폴더 내 lib 폴더에 라이브러리 파일을 추가하는 법은 아래와 같다. 

1. "storm-esper-scheduler"프로젝트를 컴파일 하여 jar파일 생성 및 라이브러리 파일 복사

$ cd ~/storm-esper-scheduler/trunk
$ mvn clean dependency:copy-dependencies install
$ cd target
$ cp storm-esper-scheduler-0.0.1.jar ~/apache-storm-0.9.4/

2. "storm_esper" 프로젝트를 컴파일 하여 dependency jar파일 생성 및 라이브러리 파일 복사

$ cd ~/storm_esper/trunk
$ mvn clean dependency:copy-dependencies install
$ cd target/dependency
$ cp amqp-client-3.1.4.jar cglib-nodep-2.2.jar antlr-runtime-3.2.jar esper-4.10.0.jar ~/apache-storm-0.9.4/lib/

4. Storm실행

이제 Storm을 실행할 준비가 되었다. 아래의 명령어로 Storm을 실행한다. 테스트용 실행방법이므로 Nimbus, Supervisor, UI를 전부 한대의 머신에서 실행해 보겠다. 

$ ~/apache-storm-0.9.4/bin/storm nimbus &
$ ~/apache-storm-0.9.4/bin/storm supervisor &
$ ~/apache-storm-0.9.4/bin/storm ui &


Strom-ESPER 실행



<그림 1. Storm-ESPER 동작 구성도>


1. DataThrower

테스트용 더미 데이터를 DataThrower를 사용하여 RabbitMQ에 전송해야 한다. 더미 데이터는 DataThrower프로젝트를 컴파일하여 생성된 jar파일을 실행하여 "in_queue"에 전송한다. 이때 throw.conf파일의 "inqueueName":"in_queue"로 수정해준다.

$ java -cp ~/maven_data/trunk/target/maven_data-0.0.1-SNAPSHOT.jar com.embian.maven_data.tester.DataThrower ~/maven_data/conf/throw.conf >> ~/maven_data/logs/throw.log

2. submit topology

"storm-esper"프로젝트를 컴파일하여 생성된 토폴로지를 스톰에 올려주는 스크립트를 실행한다.

$ sh ~/storm_esper/trunk/etc/submit_topology.sh

3. EPL command 실행

"storm-esper-client"프로젝트를 다음의 명령어로 컴파일한다.

$ cd ~/storm-esper-client/trunk
$ mvn clean dependency:copy-dependencies install -Dmaven.test.skip=true

첨부된 스크립트를 통하여 사용할 EPL을 적용하면 된다.

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"EPL command", "params":{"bolt_id":"볼트 명", "statement":{"name":"커맨드 이름", "body":"EPL 쿼리", "state":"STARTED", "do_output": true/false}}}'


4. DataReceiver 

Storm-ESPER가 처리한 데이터는 "out_queue"에 누적된다. DataReceiver는 단순하게 "out_queue"의 데이터를 읽어와서 파일로 저장한다. DataReceiver 프로젝트를 컴파일하여 생성된 jar파일을 실행하여 파일로 저장한다.

$ java -cp ~/maven_data/trunk/target/maven_data-0.0.1-SNAPSHOT.jar com.embian.maven_data.tester.DataReceiver ~/maven_data/conf/receive.conf >> ~/maven_data/logs/receive.log


예제 실행

간단한 예제를 통하여 실행이 잘 되는지 확인해 보자.

시나리오

1. nginx의 access log를 JSON으로 만든 데이터 스트림에서 5분동안 응답시간이 1초가 넘는 Request의 통계를 출력

2. 5분동안 Page A -> Page B -> Page C의 경로로 움직인 IP에 대해서 출력

nginx의  access log를 JSON으로 만든 데이터 스트림 샘플

{"status": "200", "body_bytes_sent": "9784", "remote_user": " ", "request_time": "0.000", "http_referer": "http://www.mmm.com/bbs/board.php?page=A&wr_id=1", "remote_addr": "192.168.1.1", "request": "GET /skin/board/aa.aaa/aa.js/aaa.js?time=1434393200 HTTP/1.1", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", "upstream_response_time": "-", "time_local": "16/Jun/2015:03:33:20 +0900"}

사용한 command를 초기화 하려면 다음을 실행한다.

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"clean", "params":{}}'


0. LogEvent스키마 등록

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"CreateLogEventType", "body":"create schema LogEvent(status string, body_bytes_sent string, remote_user string, request_time string, http_referer string, remote_addr string, request string, http_user_agent string, upstream_response_time string, time_local string)", "state":"STARTED", "do_output":false}}}'

command 실행 결과보기

1. nginx의 access log를 JSON으로 만든 데이터 스트림에서 5분동안 응답시간이 1초가 넘는 Request의 통계를 출력

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"Senario1-1", "body":"SELECT count(remote_addr) FROM LogEvent.win:time(5 min) WHERE cast(request_time,float) >= 1.000 ", "state":"STARTED", "do_output":true}}}'

command 실행 결과보기

2. 5분동안 Page A-> Page B-> Page C의 경로로 움직인 IP에대해서 출력

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"Senario1-2", "body":"SELECT target3.remote_addr, target3.request from  pattern [  every target=LogEvent(request like \"%page=A%\") -> (target2=LogEvent(request like \"%page=B%\" and target.remote_addr = target2.remote_addr ) -> target3=LogEvent(http_referer like \"%page=C%\" and target.remote_addr = target2.remote_addr and target2.remote_addr=target3.remote_addr))  ].win:time(5 min)", "state":"STARTED", "do_output":true}}}'

command 실행 결과보기


결과물은 DataReceiver를 통하여 파일로 저장된 내역에서 확인이 가능하다.

시나리오 1. 결과물

--queueServerAddr  :  192.168.0.28
--queueName        :  out_queue

{"newEvents":[{    "count(remote_addr)": 1
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 2
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 3
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 4
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 5
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 6
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 7
}],"oldEvents":[]}

시나리오 2. 결과물

{"newEvents":[{    "target2.remote_addr": "173.254.216.67",
    "target2.request": "GET page=C HTTP/1.0"
}{    "target2.remote_addr": "173.254.216.67",
    "target2.request": "GET page=C HTTP/1.0"
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "203.133.168.168",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "203.133.168.168",
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "211.53.23.131",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "211.53.23.131",
    "target2.request": "GET /page=C HTTP/1.1"
}{    "target2.remote_addr": "211.53.23.131",
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "66.249.71.111",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "66.249.71.111",
      "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "66.249.71.111",}],"oldEvents":[]}


간편한 실행이 어려웠고, 새로운 시스템을 학습해가며 접했기 때문에, 꽤 애를 먹은 프로젝트 실행을 거쳤다. 스크립트가 있어서 그나마 편하게 실행을 할 수 있었던 것 같다. 위의 설치 및 실행기는 단독 머신에서 실행을 기준으로 설명되어있지만, 여러대의 머신에서도 설정파일만 적용해주면 동일하게 실행할 수 있다. 

위의 실행방법을 하나씩 따라해보면 실제로는 실행하기 어렵지 않을 것으로 보인다. 이상으로 Storm-ESPER의 실행기를 마친다.


Posted by jjangAh

Apache Storm과 Spark는 그 동안 많이 사용되고 있는 실시간 데이터 처리 엔진이다. 그런데 eBay가 Pulsar를 발표하면서 실시간 데이터 처리 엔진에 후보가 하나 더 늘게되었다.





Storm-ESPER는 Apache Storm과 EsperTech의 ESPER를 결합한 것으로 엠비안에서 해당 과제에 대해서 연구한 적이 있다. (Storm & Esper Prototype 및 Test

여기서는 엠비안에서 진행했던 Storm-ESPER 엔진과 eBay Pulsar(정확히는 Pulsar를 구성하는 Jetstream)를 비교해보고자 한다. 


비교를 하기 위해서 다음의 상황을 가정한다.

시나리오 1
기존 시스템은 nginx를 통해서 웹서비스를 운영하고 있다. 기존 시스템에 CEP 엔진을 붙여서 다음의 조건을 만족시키도 싶다.

1. Response Time이 3초 이상인 모든 Request는 DB에 저장하고 해당 내용을 관리자에게 메일을 발송한다.
2. 만약 국내 IP일 경우 페이지 A를 방문한 사람들이 어느 페이지로 이동했는지에 대한 통계를 집계한다.


* Cost :  도입을 위해 어느정도의 커스터마이징이 필요한가?

곤충을 머리, 가슴, 배로 나누는 것 처럼 시스템도 입력, 처리, 출력으로 나눌 수 있다. 이 3가지 부분이 커스터마이징이 필요한 부분이다. 먼저 입력 부분에서 해결해야 할 문제는 nginx의 access log를 수집해야 한다는 것이다. 이 문제는 Storm-ESPER, Pulsar 둘 다 가지고 있다. nginx의 access log는 fluentd를 사용해서 수집하여 Json형태로 변형한 후 Message Queue로 던지게 하면 간단히 해결할 수 있다.

nginx에서 Message Queue까지 던지게 하는 부분은 Storm-ESPER나 eBay Pulsar 모두 필요한 단계이다. (물론 다른 방법으로 데이터를 수집할 수 있도록 할 수 있다. 여기서는 fluentd를 통해서 Message Queue로 던지는 방식으로 구현했다고 가정한다.)

이제, Storm-ESPER와 Jetstream(eBay Pulsar)으로 위의 시나리오를 구현할 경우 어떤 차이가 있는지 알아보도록 하자.
    • Storm-ESPER
      Storm-ESPER의 구성은 다음과 같이 될 수 있다.


[ 그림 1. Storm-ESPER의 구성 ]


Storm-ESPER의 기본 구성에서 추가해줘야 하는 작업은 다음과 같다.

1. EPL Bolt에서 GeoIP를 써서 IP의 국가를 추출할 수 있는 클래스 작성
2. 메일발송, 통계 집계 등을 위한 Sinker 작성

이렇게 2가지만 추가로 구현하면 위의 시나리오를 만족할 수 있다. 물론 EPL Bolt에 다음과 같은 EPL을 처리하도록 추가해줘야 하지만 이것은 Storm-ESPER의 기본 구성에 속하는 운영 툴을 통해서 동적으로 추가하는 것이 가능하다.

    • Jetstream (eBay Pulsar)
      Jetstream의 구성은 다음과 같이 될 수 있다.

[그림 2. Jetstream (eBay Pulsar)의 구성]

각 Stage의 이름은 Pulsar의 예제에서 따왔지만 실제 Pulsar의 코드를 바로 사용하는 것은 아니다. 각 Stage별로 추가해야 하는 작업은 다음과 같다.

1. Collector
 - Message Queue로부터 데이터를 읽어와서 GeoIP를 써서 IP의 국가를 추출하는 클래스를 작성
 - Pipeline Wiring을 수정하여 outbound Channel을 Distributor쪽으로 맞춰준다.
2. Distributor
 - 각 EPL을 가지고 있는 Bean을 작성
 - 각 EPL들의 결과를 구분하여 Metric Calculator쪽으로 outbound Channel을 맞춰준다.
3. Metric Calculator
 - 메일발송, 통계 집계 등을 위한 코드 작성

추가해야 하는 작업은 Storm-ESPER에서 했던 작업들에 추가로 Wiring을 위한 설정을 해줘야 한다. Storm-ESPER가 EPL을 동적으로 추가/삭제할 수 있는 운영 툴을 제공해주는 반면에 Jetstream은 Config에서 해당 기능을 할 수 있도록 해준다.


Storm-ESPER와 eBay Pulsar 모두 도입을 위해서는 Customizing이 필요하다. Business Logic에 따라서 그 비용의 차이가 생길 수 있지만 두 시스템 모두 가장 기본이 될 수 있는 부분에 대해서는 준비가 되어있기 때문에 Business Logic을 제외한 나머지 불필요한 코드 작성을 최소화 할 수 있게 해준다.

결론.
Storm-ESPER는 데이터의 입출력에 대해서 미리 정의해놨기 때문에 Integration이 쉽고 Business Logic에 대해서만 집중할 수 있다. 그리고 Bolt간의 통신 등은 Storm에서 이미 정의되어있는대로 하기 때문에 Stage간 연결을 신경쓸 필요가 없다. 단, Topology 구성에 대해서는 고민을 해야 한다.

eBay Pulsar는 입출력 부분에 대해서도 신경써야 하지만 부담될 정도는 아니다. 그리고 Stage간 wiring도 직접 해줘야 하기 때문에 약간의 수고가 필요하다. Stage간 wiring은 XML파일 수정하는 것 정도이다.

Storm-ESPER : ★★★★☆
eBay Pulsar : ★★★☆☆


* Cost : Business Logic이 변경되는 경우 쉽게 대응할 수 있나?

시나리오 1-1
시나리오 1에서 구축한 시스템에 다음의 조건을 추가한다.
국내 IP일 경우 페이지 A를 방문한 후 B 페이지를 방문하는 사용자가 있을 경우 관리자에게 메일을 발송한다.

Business Logic이 변경되는 경우에 대한 대응은 도입하는 것 만큼이나 중요한 문제이다. 시간과 노력을 들여서 도입한 시스템이 Business Logic의 변경을 쫓아가지 못할 경우 전혀 사용하지 못하는 상황까지 발생할 수 있다. 
사실 개발하는 입장에서는 Business Logic이 변경된다 하더라도 그에 맞게 수정할 수 있다. 하지만 문제는 어느정도의 시간과 노력이 필요하느냐이다.

일반적으로 CEP와 관련되어 변경될 수 있는 Business Logic을 예상해보면 다음과 같다.
1. IFTTT와 같이 특정 조건 발생시 특정 행동을 할 수 있도록 하는 로직의 추가
2. Input의 형태 변화 및 다양화

Input의 형태가 변경되거나 새로운 Input이 추가될 경우 Storm-ESPER의 경우 단순히 Spout을 추가하는 것으로 어느정도 처리가 가능하고 Pulsar도 마찬가지로 Collector를 추가하는 것으로 처리가 가능하다.

시나리오 1-1과 같이 특정 조건이 추가되는 경우에는 Storm-ESPER와 Pulsar 모두 EPL에서 처리할 수 있도록 하는 것이 유리하다. EPL은 대충 다음과 같은 느낌으로 나올 수 있다.

SELECT target2.* from  pattern [ every target=LogEvent(request like "%page=A%" and geo="KR") -> (target2=LogEvent(request like "%page=B%" and target.remote_addr = target2.remote_addr and target2.geo="KR" ))];

이제 EPL을 Storm-ESPER와 eBay Pulsar에 추가하기 위해서는 어떤 작업이 필요한지 알아보도록 하자.
    • Storm-ESPER
      엠비안에서 진행한 Storm-ESPER는 EPL을 동적으로 추가/삭제할 수 있는 툴을 제공한다. 이 툴을 사용해서 Event Type을 지정해주고 EPL Bolt에 EPL을 추가시키면 된다.

      그리고 결과의 형태에 따라서 기존의 메일 발송 Sinker를 그대로 사용할 수도 있고 추가로 작성해줘야 할 수도 있다.

    • Jetstream (eBay Pulsar)
      eBay Pulsar는 Config를 통해서 동적으로 EPL을 추가하는 것이 가능하다. Config의 웹 인터페이스로 로그인해서 Bean을 등록해주면 된다.

      그런데 만약 시스템 구축 초기에 Config를 사용하지 않고 각 Stage의 XML에서만 Bean을 관리할 경우 XML파일을 수정해서 EPL을 등록해야 한다.

      Storm-ESPER와 마찬가지로 메일 발송을 그대로 사용할 수도 있고 추가로 작성해줘야 할 수도 있다.

결론.
Storm-ESPER는 기본적인 Data Flow가 잡혀있는 상태이기 때문에 변경사항이 발생할 경우 Data Flow만 유지될 수 있다면 추가적인 작업을 최소화할 수 있다. 반면에 Jetstream (eBay Pulsar)는 초기에 Config 구성을 어떻게 했느냐에 따라서 변경사항에 대해서 단순히 Config에 Bean만 등록하는 것으로 마무리할 수 있고 각 Stage의 XML파일을 수정해야 할 수도 있다. 물론 초기 설정에 대한 문제이기 때문에 Config를 활용하도록 구성한 경우라면 Storm-ESPER만큼이나 쉽게 작업할 수 있다.

Storm-ESPER : ★★★★☆
eBay Pulsar : ★★★


* Stability : Failover가 가능한가?

서비스 운영에서 Failover는 필수요소가 되어가고 있다. Storm-ESPER와 eBay Pulsar도 Failover에 대해서 점검해볼 필요가 있다.

    • Storm-ESPER
      Storm-ESPER의 경우 기본적으로 Storm 위에 Esper를 올려놓은 것이다. 그렇기 때문에 Failover에 대해선는 Storm과 같다고 할 수 있다.

      Storm을 구성하는 각각의 요소들에 문제가 생긴 경우 어떻게 동작하는지 알아보면 다음과 같다.
      1. Bolt : Bolt가 죽는 경우 Supervisor가 해당 Bolt를 restart하게 된다.
      2. Supervisor : Supervisor가 죽는 경우 Nimbus에 의해서 Reassign 및 Restart하게 된다.
      3. Nimbus : Nimbus가 죽는 경우 Topology의 동작에는 문제가 없다. 단, Nimbus가 죽는 경우 Supervisor에 대한 관리를 못하게 된다. Storm의 구성에서는 Nimbus를 Restart하거나 Reassign할 수 있는 시스템이 없다. 

      Nimbus가 SPOF라고 하는 말들이 있긴 하다. 하지만 SPOF라면 말 그대로 Nimbus가 죽었을 경우 전체 Topology가 동작을 못하고 죽어버리든지 그와 비슷한 상황에 빠져야 하는데 실제로는 그렇지 않다.
      그리고 Nimbus가 죽은 경우 다시 실행만 시켜놓음 되기 때문에 번거롭고 신경쓰이는 상황이기는 하지만 SPOF까지는 아니다.

    • Jetstream (eBay Pulsar)
      Pulsar는 Clustering구성을 통해서 Failover를 할 수 있도록 되어있다. 그런데 Pulsar는 Storm과는 다르게 노드가 Down된 경우 restart해주는 기능 등이 없다. 하지만 Nimbus와 같이 단일 노드로 구성되는 요소는 없다.

      그리고 Storm과는 달리 Pulsar는 3개의 이벤트 전달 모델을 가지고 있다.
      1. Push 모델
          Storm과 같이 다음 노드로 Push하는 모델이다. 
      2. Pull 모델
          Message Queue를 통해서 이벤트를 받는쪽에서 Queue의 내용을 가져가는 모델이다.
          Screenshot
      3. Hybrid 모델
          Push 모델과 Pull모델을 합친 것으로 기본적인 경우 Push를 통해서 데이터 전달을 하다가 문제가 생겨서 데이터 처리를 못하게 되는 경우 Message Queue에 넣고 replay 시키는 모델이다.
      Screenshot

      이와 같은 3개의 모델을 활용하여 Storm보다 더 유연한 Topology구성이 가능하다.
결론.

Storm-ESPER는 Nimbus, Supervisor 등에 의해서 문제가 생긴경우 자동으로 복구를 시도하게 된다. 하지만 Storm에서는 Nimbus의 문제에 대해서는 대안이 없는 상황이다.
eBay Pulsar는 3가지 데이터 전달 모델을 활용해서 Cluster구성을 하는 것이 가능하다.

Storm-ESPER : ★★★
eBay Pulsar : ★★★★

* Extensibility : Scale out이 가능한가?

시스템의 규모를 초기에 정확히 예측하는 것은 불가능하다. 그렇기 때문에 시스템 규모를 확장해야 하는 경우에는 단순히 시스템을 추가하는 것으로 서비스 규모를 확장할 수 있도록 하는 것이 중요하다. Storm-ESPER, eBay Pulsar의 Scalability에 대해서 알아보면 다음과 같다.

    • Storm-ESPER
      Storm-ESPER는 앞에서 언급한대로 Storm의 시스템 구성을 가지고 있다. Storm에서는 Scale out이 가능하다. 하지만 Node를 추가한 후에는 Topology를 Restart해야 한다는 문제점이 있다.
      Topology를 Restart하는 동안은 데이터 처리를 못하는 것은 당연하다. 그렇기 때문에 엠비안에서 진행한 Storm-ESPER에서는 Input과 Output부분에 Message Queue를 위치시켜서 Topology가 Restart되는 동안 발생할 수 있는 문제를 최소화 시켰다.

      Message Queue에 의해서 데이터가 유실되는 것이 아니기 때문에 대부분의 경우 Topology가 Restart되는 것에 대해서 문제가 발생하지 않는다. 단, Topology의 Hot Deploy는 Storm-ESPER에서는 불가능하다는 단점이 있다.

    • Jetstream (eBay Pulsar)
      eBay Pulsar는 Pipeline의 설정이 모두 XML에서 정의한 Bean에 들어가있고 Bean은 Config에서 관리하도록 할 수 있기 때문에 Config만 활용한다면 Node가 추가되거나 Topology가 변경되는 상황이라 하더라도 Topology의 중단 없이 서비스가 가능하다.

결론.
Storm-ESPER와 eBay Pulsar 모두 Scale out은 가능하다. 하지만 Storm-ESPER의 경우 Topology를 Restart해야 하는 문제가 있다. Storm-ESPER는 Input, Output의 처리에 Message Queue를 사용하기 때문에 Business 환경에서는 Critical한 문제를 일으키지는 않지만 Topology가 Restart되는 동안에는 데이터 처리를 할 수 없게 된다.

Storm-ESPER : ★★★
eBay Pulsar : ★★★★


비교 결과.

앞에서는 Storm-ESPER와 eBay Pulsar에 대해서 비용, 안정성, 확장성이란 주제로 비교를 해보았다. 비교 항목마다 점수를 준 것을 종합해보면 다음과 같다.

비교항목 

Storm-ESPER 

eBay Pulsar 

비용 

8/10 

6/10 

안정성

3/5 

4/5 

확장성

3/5 

5/5 

합계

14/20 

15/20 

Storm-ESPER는 비용적인 측면에서 좋은 점수를 받았고 eBay Pulsar는 확장성 부분에서 좋은 점수를 받았다. 안정성은 eBay Pulsar가 조금 더 좋게 나왔다.

Storm-ESPER는 기본적인 Data Flow가 정의되어있기 때문에 Business Logic을 EPL을 통해서 적용만 하면 대부분의 환경에서 활용 가능하다는 장점이 있다. 이런 장점 때문에 비용적인 측면에서 좋은 점수를 받은 것으로 생각된다. 하지만 Storm이 가지는 Nimbus에 대한 문제점 등은 어쩔 수 없이 단점으로 가지고 있다.


eBay Pulsar는 Pipeline을 다양하게 구성할 수 있고 Config를 이용한 Hot Deploy가 가능하다는 것이 가장 큰 장점이라 할 수 있다. 하지만 eBay Pulsar는 예제 수준 정도이고 실제 사용하려면 Jetstream을 사용하여 Pipeline을 구성해야 한다. 통신 모델을 정의하고 데이터 형태를 정의하는 등의 작업을 XML 또는 Config에 대햐 한다는 단점이 있다.

그리고 눈에띄게 보이는 차이점은 Storm-ESPER는 EPL의 결과를 Output Queue에 던져서 다음 처리를 할 수 있도록 한 반면에 eBay Pulsar는 EPL의 결과를 outbound channel로 던지기 전에 다른 Processor로 전달해서 부가적인 처리 등을 할 수 있다는 것이다.


쉽고 빠르게 CEP 엔진을 구축하기 위해서는 Storm-ESPER를, 좀 더 유연한 CEP설계를 하고 싶을 때에는 eBay Pulsar를 쓰는 것이 유리하다.



Posted by micacute

이전 포스트에서 eBay Pulsar에 대해서 간단히 알아봤다.


여기서는 Pulsar의 tutorial 및 demo를 통해서 Pulsar에 대해서 좀 더 자세히 알아보도록 한다.



Pulsar의 White Paper를 보면 다음과 같이 간단한 Architecture가 설명되어있다.


<Fig. 1. Real-time Data Pipeline>

Real Time Data Pipeline(이하 Pipeline)은 Collector, Sessionizer, Event Distributor, Metrics Calculator로 구성되어있는 것을 알 수 있다. 각 구성요소에 대해서는 이전 포스트(Pulsar - 오픈소스 실시간 분석 플랫폼 : White Paper)에 간단히 설명되어있다.

Pulsar의 Get Started 페이지에 Demo에 대해서 설명이 되어있다. 설명은 부실할 정보로 매우 간단하게 되어있고 Demo를 실행했을 때의 화면도 보여주고 있다. 

이 내용만으로는 직접 데모를 돌려보지 않고 동작을 이해하기는 어려울 것 같다. (데모 페이지의 Pre-Request에는 깨진 링크도 있다!)


데모를 실행하기 위한 Dependency

데모를 실행해보기 위해서는 먼저 Zookeeper, Mongo, Kafka, Cassandra를 설치해야 한다. 

Pulsar를 운영하기 위해서 위의 4가지 Dependency가 만족해야 하는 것은 아니다. 이건 어디까지나 Pulsar의 Demo를 실행해보기 위해서 필요한 것이고 모든 구성요소가 Dependency를 필요로 하는 것은 아니다.


아무튼.. Zookeeper, Mongo, Kafka, Cassandra를 설치해야 한다. 다행히 4가지 모두 Pulsar 또는 다른 사람들이 dockerize 해놨기 때문에 귀찮은 설치과정 없이 docker를 통해서 설치 및 실행이 가능하다.

각각에 대해서 알아보면 다음과 같다.

1. Zookeeper

zookeeper는 다음과 같이 실행시키도록 되어있다.

docker run -d --name zkserver -t "jetstream/zookeeper"

jetstream/zookeeper docker image는 기존에 있던 zookeeper image에서 base image를 변경한 것이다. 그리고 expose하는 port는 2181만 정의하고 있다.

2. Mongo

MongoDB는 docker hub에 등록되어있는 일반적인 이미지를 실행시키도록 되어있다.

docker run -d --name mongoserver -t "mongo"

3. Kafka

Kafka는 다음과 같이 jetstream에서 새로 만든 docker image를 사용하도록 되어있다.

docker run -d --name kafkaserver -t "jetstream/kafka"

jetstream/kafka는 zookeeper를 포함하는 kafka single node를 구성한 것이다. jetstream/kafka는 zookeeper에 등록되어있는 broker entry의 advertised ip로 host ip를 사용하도록 되어있다.

4. Cassandra

Cassandra는 docker hub에 등록되어있는 일반적인 이미지를 실행시키도록 되어있다.

docker run -d --name cassandraserver -t "poklet/cassandra"

Cassandra는 metriccalculator에서 집계한 결과를 저장하는 용도로 사용된다. 데이터를 저장하기 위한 테이블은 다음과 같이 생성한다.

mkdir /tmp/pulsarcql
wget https://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/metriccalculator/pulsar.cql -O /tmp/pulsarcql/pulsar.cql
docker run -it --rm --link cassandraserver:cass1 -v /tmp/pulsarcql:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'


위에서 설명한 Dependency들은 pipeline의 구성요소들이 사용하게 된다. 하지만 각각의 pipeline 구성요소가 위의 모든 dependency를 필요로 하는 것은 아니다.

각 구성요소별로 어떤 dependency를 필요로 하는가에 대한 것은 Pulsar에서 제공해준 demo 실행 스크립트(https://github.com/pulsarIO/realtime-analytics/blob/master/Demo/rundemo.sh)를 보면 알 수 있다.


Pipeline의 구성요소

demo 실행 스크립트를 확인해보면 다음과 같은 순서대로 pipeline 구성요소를 실행시킨다.

1. config

docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"

--link로 설정된 부분을 보면 config는 Zookeeper, MongoDB를 사용하는 것을 알 수 있다.

그런데 config(ConfigApp)는 위에서 보여준 Architecture에 포함되어있지 않다. config의 역할은 전체 pipeline의 provision configuration을 관리하는 것이다.

config에서 연결하는 시스템을 봤을 때 pipeline의 configuration을 저장하는 용도로 사용되는 것을 알 수 있다.

config는 Jetstream에 포함되어있는 구성요소이다.


2. replay

Replay에 대해서는 Pulsar의 Get Started 페이지에서 다음과 같이 설명하고 있다.

Replay는 Queue Full, 네트워크 부하 등으로 다른 stage에서 처리 실패한 이벤트를 replay시키는 역할을 한다. 

너무 의역한 것이기는 하지만 아무튼 이런 역할을 한다. -_-

replay는 이벤트에 tag를 달아서 해당 이벤트가 replay에 의해서 보내진 것인지를 알 수 있도록 한다.

demo 실행 스크립트에서는 replay를 다음과 같이 실행시킨다.

docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"

replay는 kafka, mongodb, zookeeper에 대한 의존성을 가지고 있다. 


3. Sessionizer

Sessionizer는 이벤트의 session state를 관리하고 session marker event를 생성하는 역할을 한다. Pulsar 홈페이지에서는 Sessionizer는 si를 affinity key로 사용하여 동일한 si event는 동일한 노드로 라우팅된다고 설명하고 있다. 그런데 si가 어떤 것을 의미하는지에 대해서는 설명이 되어있지 않다. 그냥 Session Identifier 정도로 예상하고 있다. 

session state라는 것은 session의 메타데이터(자주 변하지 않는 ip, user agent정보 등)와 variables를 포함한다. sessionizer의 로직은 EPL에 의해저 정의되어있다.

demo 실행 스크립트에서는 다음과 같이 실행시키고 있다.

docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"

replay와 마찬가지로 cassandra를 제외한 나머지 3개 시스템(kafka, mongodb, zookeeper)에 대한 의존성을 가지고 있다. 


4. Distributor

Distributor는 이벤트를 각 목적지에 맞게 전달해주거나 걸러내거나 변형시키는 역할을 한다. 사용자는 EPL을 통해서 설정하는 것이 가능하다.

demo 실행 스크립트에서의 실행은 다음과 같다.

docker run -d -p 0.0.0.0:8004:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"


5. Metrics Calculator

이름 그대로 metric을 계산해서 저장하는 역할을 한다.

demo 실행 스크립트에서의 실행은 다음과 같다.

docker run -d -p 0.0.0.0:8005:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver  --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"

다른 구성요소와는 달리 cassandra와 연결을 가지고 있는 것을 볼 수 있다.

Metrics Calculator는 2개의 stage로 구성되어있다. 첫번째 stage는 짧은 시간동안의 집계를 수행하고 그 후에 두번째 stage를 통해서 긴 시간동안의 집계를 수행하게 된다.


6. Collector

pipeline의 구성요소 중 가장 앞에 들어가는 요소인데 demo 실행 스크립트에서는 제일 마지막에 실행시킨다.

실행은 다음과 같이 되어있다.

docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"
이름 그대로 event를 수집하는 역할을 한다. 


Demo의 실행
Pulsar 홈페이지에서는 다음의 3가지 Demo Applications를 제공해주고 있다.
1. MetricService : MetricUI를 위한 metric을 제공해주는 역할을 한다.
2. MetricUI : Metric Dashboard
3. TwitterSample : 트위터에서 발생하는 이벤트를 pulsar로 넣어주는 샘플

Pulsar의 demo 실행 스크립트에서는 MetricService, MetricUI순으로 실행한 후 제일 마지막에 TwitterSample을 실행시키도록 되어있다.

특징으로 보면.. TwitterSample은 데이터를 입력하는 역할을 하고 MetricService, MetricUI는 pipeline을 지나온 결과를 출력하는 역할을 한다.

그런데 TwitterSample의 코드에서 wiring관련 xml을 보면 다음과 같은 내용이 있다.

<bean id="OutboundMessageChannelAddress"

        class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">

        <property name="channelTopics">

            <list>

                <value>Pulsar.MC/ssnzEvent</value>

            </list>

        </property>

    </bean>


TwitterSample을 데이터를 만들어낸 후 Pulsar.MC/ssnzEvent로 보내는 것을 확인할 수 있다. 

Pulsar의 Architecture 그림을 보면 처음 들어온 이벤트는 Collector에 의해서 수집된 후 Sessionizer로 전달되어야 하는데 Collector의 inbound 설정을 보면 다음과 같이 되어있다.

<bean id="InboundChannelAddress"

        class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">

        <property name="channelTopics">

            <list>

                <value>Pulsar.collector/rawEvent</value>

            </list>

        </property>

    </bean>

위의 내용을 종합해보면 TwitterSample은 Collector로 이벤트를 전달하지 않도록 되어있다. Pulsar.MC/ssnzEvent를 통해서 전달받는 구성요소는 MetricCalculator이다.


Pulsar의 TwitterSample은 자신들이 그려놓은 Architecture와는 맞지 않게 Collector가 아닌 MetricCalculator로 이벤트를 전달하는 것을 알 수 있다.

왜 이런 사태가 발생했는지 간단히 알아보도록 하자.


Pulsar의 정체

Pulsar의 wiki에 보면 Pulsar에 대해서 설명하고 있다. 하지만 천천히 잘 읽어봐도 도대체 Pulsar가 뭔지에 대한 실체를 알기 힘들다. wiki의 Overview에서 설명하는 Pulsar의 특징은 다음과 같다.

1. Pulsar는 Open source realtime analytics platform이다.

2. 사용자가 발생시키는 event(user behavior event)를 실시간으로 수집하고 처리한다.

3. realtime sessionization, multi-dimensional metrics aggregation을 제공한다.

4. SQL-like한 언어를 써서 데이터를 가공하거나 걸러낼 수 있다.

5. 고가용성의 특징을 가지면서 초당 수백만의 이벤트까지 처리할 수 있도록 확장 가능하다.

6. Cassandra나 Druid와 같은 metrics stores와 쉽게 integration이 된다.


위의 특징을 봤을 때 가장 먼저 떠오르는 질문은 다음과 같다.

"user behavior event가 아닌 다른 event를 처리하는 것이 가능한가?"

Collector의 소스를 보면 답을 얻을 수 있다.

Collector에서 연결 관계를 설명하고 있는 appwiring.xml파일을 보면 다음과 같은 내용이 있다.

....생략....

    <bean id="InboundChannelAddress"

        class="com.ebay.jetstream.event.channel.messaging.MessagingChannelAddress">

        <property name="channelTopics">

            <list>

                <value>Pulsar.collector/rawEvent</value>

            </list>

        </property>

    </bean>

....생략.... 

    <bean id="InboundMessages"

        class="com.ebay.jetstream.event.channel.messaging.InboundMessagingChannel">

        <property name="address" ref="InboundChannelAddress" />

        <property name="waitTimeBeforeShutdown" value="5000"/>


        <property name="eventSinks">

            <list>

                <ref bean="EsperProcessor" />

            </list>

        </property>

    </bean>

    

    <bean id="EsperProcessor" class="com.ebay.jetstream.event.processor.esper.EsperProcessor">

        <property name="esperEventListener" ref="EsperEventListener" />

        <property name="configuration" ref="EsperConfiguration" />

        <property name="epl" ref="EPL" />

        <property name="eventSinks">

            <list>

                <ref bean="OutboundMessages" />

            </list>

        </property>

        <property name="adviceListener" ref="CollectorRawEventAdviceProcessor"/>

    </bean>

Pulsar.collector/rawEvent로부터 받은 이벤트를 EPL을 통해서 처리한 후 outBound로 보내는 것을 알 수 있다.

EPL에 대해서는 EPL.xml에 다음과 같이 정의되어있다.

<bean id="EPL" class="com.ebay.jetstream.event.processor.esper.EPL">

         <property name="statementBlock">

            <value>

                <![CDATA[

                

                    update istream PulsarRawEvent set ct = System.currentTimeMillis() 

                    where ct is null;

                    

                    insert into PulsarEvent

                    select 

                        DeviceEnrichmentUtil.getDeviceInfo(ua) as device, 

                        GeoEnrichmentUtil.getGeoInfo(ipv4) as geo, 

                        raw as originEvent 

                    from PulsarRawEvent as raw;


                    @ClusterAffinityTag(colname="si")

                    @OutputTo("OutboundMessages")

                    select device.userAgentVersion as _dd_bv,

                        device.userAgentFamily as _dd_bf,

                        device.userAgentType as _dd_d,

                        device.deviceCategory as _dd_dc,

                        device.osFamily as _dd_os,

                        device.osVersion as _dd_osv,

                        geo.city as _cty,

                        geo.continent as _con,

                        geo.country as _cn,

                        geo.region as _rgn,

                        geo.longitude as _lon,

                        geo.latitude as _lat,

                        geo.countryIsoCode as _tlcn,

                        originEvent.* from PulsarEvent;

                        

                ]]>

            </value>

        </property>

    </bean>

위의 EPL에서 기술하고 있는 ct라든지 ua, ipv4는 어디에서 온 것인지 궁금할 수 있다. Collector의 Simulator의 소스를 보면 Event.txt에서 이벤트의 형태를 가지고 있다는 것을 알 수 있다.

Event.txt는 다음과 같다.

{

    "si": "${siValue}",

    "ac": "727354723",

"tn": "TenantName",

"or": "OrignalName",

"ct": ${ctValue},

    "ipv4": "${ipValue}",

    "ua": "${uaValue}",

    "et": "Mobile",

    "rf": "http://referrer.somewhere.com",

    "url": "http://LandingPage.com",

    "itmT":"${itmTitle}",

    "itmP":${itmPrice},

    "cap":"${campaignName}",

    "capG":${cmapaignGMV},

    "capQ":${cmapaignQuantity}

}

Pulsar의 Collector에서 처리하고 있는 이벤트의 모습을 보면 Pulsar의 Overview를 읽고 생긴 질문에 답을 할 수 있게 된다.

"user behavior event가 아닌 다른 event를 처리하는 것이 가능한가?"

=> 넹~ 하지만 쉽지는 않아요..

다른 이벤트에 대해서 Collector의 xml을 수정해서 처리해야 하는 이벤트에 맞게 바꿔줘야 하고 EPL도 그것에 맞도록 수정해야 한다.

아무튼.. xml을 보고 수정할 수 있고 EPL을 쓸 수 있다면 그럭저럭 Pulsar를 사용해서 이벤트 처리를 할 수 있다.

Pulsar의 Wiki에서는 Pulsar의 각 Stage(Pipeline의 구성요소)는 모두 Distributed CEP Framework를 사용해서 만들었다고 소개하고 있다. Distributed CEP Framework의 이름은 Jetstream이다.

Collector, Sessionizer, Distributor의 코드를 보면 매우 단순하게 되어있다. 모두 Jetstream을 기반으로 했기 때문에 Stage간의 데이터 전달, Esper Integration 등을 신경쓸 필요가 없기 때문이다.

심지어는 TwitterSample도 Jetstream기반으로 만들어져있다!

아무튼.. Pulsar는 Jetstream을 기반으로 한 사용자 행동에 관련한 이벤트를 처리하는 Platform이라고 생각하면 된다.

그렇기 때문에 TwitterSample은 Jetstream 기반의 Component간의 데이터 처리에 대해서 보여준 것이지 Pulsar에 대한 Demo는 아니다.


Pulsar는 Collector, Sessionizer등으로 구성된 Pipeline을 활용하여 실시간 처리를 빠르게 할 수 있다는 장점 말고도 Pulsar를 구현한 Jetstream을 이용하여 상황에 맞는 Pipeline을 쉽게 구축할 수 있다는 장점을 가지고 있다.





Posted by micacute



이전에 Pulsar 설치와 실행에 대하여 포스팅을 하였는데, 이번에는 Pulsar에서 제공하고 있는 Pulsar White Paper를 간략하게 정리해보도록 하겠다. 원문은 아래 링크에서 확인 가능하다. 

Pulsar White Paper 원문 보러 가기


White Paper

1. Introduction

User behavior event는 user-agent 또는 IP주소와 같은 많은 정보들을 가지고 있다. eBay에서는 다량의 event들을 처리하고 있는데, BOT이 생성한 event들을 걸러내어 처리할 수 있다.

이 문서에서는 Pulsar의 data pipeline에 대한 디자인을 제안한다. 계산 대부분을 CEP 엔진을 사용하여 고가용성의 분산처리 시스템을 구축하였다. 우리는 이벤트 스트림을 데이터 베이스 테이블처럼 사용하는 것을 제안한다. Pulsar는 SQL 쿼리가 데이터 베이스 테이블을 사용하는 것처럼 실시간 스트림에대해 집합을 생성하는 것을 가능하게 한다. 이 문서에서는 Pulsar에서 이벤트 스트림을 처리할 때 이벤트를 보완하게 하고, 필터하고, 변화시키는 방식에 대해서 설명하였다.

2. DATA AND PROCESSING MODEL 

2.1 User Behavior Data 

사용자 행동 데이터의 예측이 불가능한 패턴이 초당 100만 개가 넘게 생산된다. 이러한 데이터를 빠르게 처리하기 위해 in-memory 처리 방식을 해야 한다.

2.2 Data Enrichment    

BOT 데이터들은 필터로 거른다. 필터링을 거친 데이터를 지역, 기기, 인구 통계 등의 데이터로 좀 더 풍부하게 만든다.

2.3 Sessionization

관련된 이벤트를 공통된 키로 구분하여 그룹핑해주는 상태 관리 프로세스이다.

2.4 User defined partial views of original streams

event들의 dimension이 아주 많지만 실제 사용자들은 원하는 특정 demention만 원하기 때문에  사용자들이 설정할 수 있게 하였다.

2.5 Computing aggregates in real-time for groups of multiple dimensions

다양한 dimension을 그룹화 하기 위해서 실시간으로 summary data를 산출 한다.

2.6 Computing Top N, Percentiles. and Distinct Count Metrics in real time

distinct count와 percentile을 구하기 위하여 HyperLogLog와 TDigest라는 알고리즘을 사용하여 공간과 시간을 절약한다. 이 알고리즘들은 근사 알고리즘으로써, 1%대의 에러를 가지는데, 이는 통계학적으로 문제없음이라고 판단한다.  

2.7 Dealing with out of order and delayed events

이벤트가 비정상이거나 지연되는경우, 롤업기능으로 처리 가능하다.

3. ARCHITECTURE

pipeline

<Fig. 1. Real-time Data Pipeline>

3.1 Complex Event Processing Framework

Jetstream 은 Spring으로 작성한 CEP 엔진으로써, 모든 pulsar의 real-time pipeline은 Jetstream으로 작성하였다. Jetstream은 다음과 같은 기능이 탑재되어있다.

1. 처리 로직을 서술적 SQL로 정의할 수 있다.
2. 어플리케이션 재시작이 필요없는 Hot deploy SQL.
3. Annotation 플러그인 프레임워크를 사용하여 SQL 기능성을 증대시킨다.
4. SQL을 사용하여 pipeline을 타고 전달한다.
5. SQL을 사용하여 stream affinity를 동적으로  생성할 수 있다.
6. Spring IOC를 사용한 Declarative pipeline stitching은 런타임에 동적 토폴로지 변경을 가능하게 한다.
7. Esper의 통합기능을 통한 CEP 기능.
8. elastic scaling이 되는 Clustering.
9. Cloud deployment가 가능한다.
10. push pull 모델을 사용한 pub-sub messaging기능을 가지고 있다.

3.2 Messaging 

3.2.1 Ingest Messaging 

REST interface를 사용하여 이벤트를 받아서 처리한다.

3.2.2 Push vs. Pull Messaging

push messaging과 Pull messaging의 장단점이 있기 때문에 어떤것을 선택해야 할 지에 대해 고민하였다.

3.2.3 Messaging Options

Messaging은 하이브리드로 구현하기로 하였다. Jetstream은 kafka를 사용하여 pub-sub cluster messaging을 할 수 있게 하였다.

4. REAL-TIME PIPELINE

4.1 Collector

Real-time pipeline의 첫 번째 단계. Jetstream CEP엔진이 있는 곳이다. 이벤트를 REST interface로 받아서 처리한다. CEP엔진은 BOT 이벤트들을 필터링해준다.

4.1.1 Geo and Device Classification Enrichment

BOT 이벤트들이 걸러내진 이벤트들에 지역 정보를 추가한다.

4.2 Bot Detector

BOT signature cache에 등록된 패턴을 걸러내도 그 외의 패턴들은 걸러내기 쉽지 않다. 하지만 CEP엔진으로 BOT의 특징을 잡아내어 BOT signature cache에 등록하여 Collector가 BOT을 걸러낼 수 있게 한다. 

4.3 Sessionizer

Real-time pipeline의 두 번째 단계이다. 이 단계의 주된 기능은 tenancy based sessionization 을 지원하는 것이다. Sessionization 은 session 기간에 특정 식별자를 포함하는 이벤트의 임시 그룹핑 과정이다. 유니크한 식별자의 이벤트가 처음 탐지되면 각 window는 시작된다. 이 window는 이벤트가 더 들어오지 않으면 끝나게 된다.

4.3.1 Session Meta Data, Counters and State

이벤트가 도달하면, 세션당 메타데이터, 빈도수 계산 및 상태를 관리한다. 모든 처리 로직은 SQL로 작성되었다.

4.3.2 Session Store

Backing store에 back up 되는 local off-heap cache에 세션 레코드를 저장하였다. Sessionizer 에서는 cache entry에 TTL을 세팅 가능하고 해당 entry가 만기 되었을 때 정확한 알림을 받을 수 있어야 하는 것이 필요하였다. 그래서 특별한 off-heap cache를 개발하였다. 

4.3.3 Session Backing Store

Session Backing Store를 위해, 다음의 사항들을 고려하였다.

a. local read, write, delete 지원 
b. local cross data center replication 지원
c. eventual consistency 지원 (고가용성을 달성하기 위해 분산 컴퓨팅에 사용되는 일관성 모델이다. 데이터에 새로운 업데이트가 이루어지지 않은 경우, 마지막으로 업데이트된 값을 반환하는 것을 비공식적으로 보장한다.
d. 저장 항목에 대한 life cycle을 관리한다. (TTL 지원)
e. 쓰기와 읽기의 비율을 10:4로 지원한다.
f. 초당 읽기 쓰기를 1M까지 지원한다.(Scale to)
g. 변수 크기는 200bytes에서 50Kbytes까지 지원한다. (Scale well)
h. Cloud에서 배포를 선호한다.
i. 주어진 클라이언트 노드에서 삽입된 키의 범위를 조회하기 위하여 보조 인덱스를 만든다.

위의 사항을 충족시키는 적합한 Session store를 찾기 위하여 Cassandra, Couchbase, 자체 개발database를 비교했다. Cassandra와 Couchbase는 Disk기반 솔루션이었기 때문에, in-memory방식이 필요한 우리는 자체 개발 database를 Session store로 선택하였다.

4.3.4 SQL extensions

Jetstream은 Esper의 SQL 언어를 확장하기 위하여 사용자들이 원하는 annotation을 작성할 수  있게 annotation 플러그인 프레임워크를 제공한다.

4.4 Event Distributor

pipeline의 세 번째 단계이다. 이 단계의 주 기능은 pipeline subscriber의 커스텀 뷰를 생성하게 하는 것이다.

4.4.1 Event filtering, Mutation and Routing

이벤트들은 SQL 문법으로 변화시키고 필터링하고 전달할 수 있다. 또한, annotaion을 사용하여 이벤트가 지나갈 루트를 지정해줄 수 있다.

4.5 Metrics calculator for Multi-dimensional OLAP

Metrics Calculator는 다양한 dimension과 time series data를 생산하는 사용자 정의의 metrics를 계산하는 실시간 metric computation engine이다. Metrics Calculator는, multiple dimension을 그룹핑하여 추출할 수 있는 SQL쿼리를 실행 할 수 있게 인터페이스를 제공해준다. 이 단계에서는 이벤트를 SQL로 컨트롤 가능하다.

4.5.1 Harvesting Metrics

이벤트들은 random scheduling이나 affinity based scheduling을 통해 Metrics Calulator application cluster에 스케쥴 될 수 있다. Metrics는 short tumbling wondows(10초동안) live stream에서 추출 된다. Window가 진행되고 있을때 unique dimension grouping을 위해 metric event가 생산된다.

4.5.2 Aggregating across the Metrics Calculator cluster

SQL문과 annotation을 사용하여 여러노드간의 aggregation이 가능하다.

4.5.3 Creating rollups in time series database

time series data를 생성하여 DB에 저장하거나 실시간 dashboard에서 widget으로 보여준다.

4.5.4 Metric Store

Metric Store를 하기 위하여 아래의 DB들을 비교해 보았다.

Open TSDB vs. Cassandra vs. DRUID 

 

Open TSDB 

Cassandra 

장점

빠른 데이터 수집
몇몇 자체 집합 함수 사용 가능

른 데이터 수집
Rollup 가능

단점

Rollup생성 불가
기존에 있는 집합으로
새로운 집합 생성을 지원하지 않음

group by 와 집합함수를 지원하지 않음 

Pulsar는 필요조건을 충족시키기 위하여 DRUID를 사용하였다.

5. CONCLUDING REMARKS

이 문서에서는 Pulsar를 구현할 때 고려했던 부분들의 일부이다. 실시간으로 사용자 행동분석과 관련된 문제들을 위하여 데이터와 프로세싱모델에 관해 설명하고 있다. Pulsar는 eBay cloud에서 1년간 사용되었다. 0.01% 미만의 안정적인 유실로 초당 10만 개의 events들을 처리했다. 95퍼센타일로 pipeline의 end to end 지연은 100ms 미만이다. 99.99%의 가용성으로 파이프라인을 성공적으로 구동시켰다. eBay의 몇몇 팀은 성공적으로 우리의 플랫폼을 사용하여 in-session 개인화, 광고, 인터넷 마케팅, 빌링, 비지니스 모니터링 등 많은 문제에 대한 해결책을 찾았다.

Pulsar의 주요 사용처는 사용자 행동 분석에 포커스를 맞추고 있지만, Pulsar를 실시간 처리가 요구되는 많은 다른 사용 경우에 대해서 쓰이기를 원한다.


White Paper에서는 실제로 eBay에서 Pulsar를 사용하여 사용자 행동분석 패턴을 분석하여 여러 가지 문제들을 해결하였다고 서술하고 있다. 또, Pulsar는 빠르고 신뢰도 높은 데이터 처리를 할 뿐만 아니라 실시간으로 데이터 분석이 가능함을 강조하였다. 그 기반은 Jetstram이라는 독자적인 플랫폼이고, 익숙한 SQL문과 annotation을 사용하여 데이터 분석이 가능함을 설명하고 있다.

White Paper가 아주 상세한 부분까지는 서술하고 있지 않지만, 어떤 데이터들을 어떻게, 무엇을 사용하여 처리하는가에 대한 궁금증을 풀 수 있기에는 충분한 것 같다. 자세한 쿼리 예제나 구조가 궁금하면 Pulsar 공식홈페이지에서 확인할 수 있다.

http://gopulsar.io/


마치며...

번역과 이해를 같이 해야하여 어려웠지만, 실시간 데이터 분석 툴의 구조를 나름은 자세히 알 수 있게 되어 의미있는 시간이었음을 상기하며, 이번 포스팅은 여기에서 마친다.



참고자료: http://gopulsar.io/docs/Whitepaper_Pulsar_Real-timeAnalyticsatScale.pdf


Posted by jjangAh



eBay에서 Pulsar라는 실시간 데이터 분석 툴을 오픈소스로 공개하였다. 

공식 문서에 따르면, Pulsar는 Scalability, Availability, Flexibility를 강조한 실시간 데이터 분석 및 모니터링 툴이다. 

Pulsar의 key point와 정리는 다음의 포스트를 참고하기 바란다.


eBay Pulsar: real-time analytics platform


위의 포스팅에서 간략하게 정리가 되어있기 때문에, 이번 포스팅에서는 Pulsar의 구성에 관해 설명하고 설치 방법에 대하여 번역하였다. 해당 원문은 다음 링크에서 확인 가능하다.


https://github.com/pulsarIO/realtime-analytics/wiki/Get-Started


Pulsar는...


스트리밍 SQL을 사용한 인터넷 스케일 실시간 분석 툴이다.


Pipeline Components


Pulsar pipeline은 Jetstream framework의 상단에 내장되어 있다. 이 pipeline은 아래와 같은 구성으로 되어있다.

1. Collector : 이벤트를 지역 정보 분류와 기기 타입을 탐지하여 REST endpoint를 통해 수집한다. 2. Sessionizer : 이벤트를 세션화 한 후, 그 세션 상태를 관리하고 마커 이벤트를 만들어 낸다. 3. Distributor : 이벤트 라우터처럼 다른 consumer들에 이벤트를 필터하고 변화시킨다. 4. Metrics Calculator : 다양한 방면에 대한 측정 기준을 계산하고 metrics store에 지속시킨다. 5. Replay : 이벤트들을 downstream 애플리케이션으로 진행하게 하지 못했을 때, Pipeline 애플리케이션들은 문제가 발생한 이벤트를 kafka에 지속시킨다. Replay는 이벤트들을 kafka에서 가져오고, pipeline에서 100%의 신뢰도를 주기 위해 원래의 downstream 애플리케이션으로 진행하게 하는 역할을 한다. 6. ConfigApp - 모든 pipeline을 위한 Dynamic provision configuration들은 Jetstream framework에 built-in app에서 설정 할 수 있다.


Dependencies


pipeline을 실행하려면 다음의 사항들이 요구된다.

1. Zookeeper 2. Mongo 3. Kafka 4. Cassandra

* Docker에 올려져 있는 이미지를 사용 하면 설치하기 쉽다.


Single node Deployment


한 개의 docker 호스트 안에서, 각 docker container instance들은 virtual network를 사용하여 통신한다.

Docker를 지원하는 node를 준비한다.  (* Docker 이미지로 설치를 쉽게 할 수 있다. 이번 포스팅에서는 Docker로 설치하는 법만 포스팅하겠다.)

1. Docker를 지원하는 리눅스 instance를 준비한다. 이때 Docker 버전은 1.3.0 이상이어야 한다. 2. instance의 최소 사양은 8GB RAM | 2 VCPU | 30.0GB Disk


설치하기


스크립트를 실행하여 docker에서 dependencies 항목에 있는 이미지와 pulsar, 그리고 데모까지 한번에 가져올 수 있다. (데모는 바로 실행 가능)

다운로드 링크 또는 


run sh <(curl -shttps://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/Demo/rundemo.sh)


또는 아래 스크립트를 실행하면 된다.

#!/bin/sh

echo '>>> Start external dependency'
sudo docker run -d --name zkserver -t "jetstream/zookeeper"
sleep 5

sudo docker run -d --name mongoserver -t "mongo"
sleep 5

sudo docker run -d --name kafkaserver -t "jetstream/kafka"
sleep 5

sudo docker run -d --name cassandraserver -t "poklet/cassandra"

echo '>>> Sleep 30 seconds'
sleep 30

mkdir /tmp/pulsarcql

wget https://raw.githubusercontent.com/pulsarIO/realtime-analytics/master/metriccalculator/pulsar.cql -O /tmp/pulsarcql/pulsar.cql

sudo docker run -it --rm --link cassandraserver:cass1 -v /tmp/pulsarcql:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'

echo '>>> Sleep 10 seconds'
sleep 10

echo '>>> Start Pulsar pipeline'

sudo docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"
sleep 5

sudo docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"
sleep 5

sudo docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"
sleep 5

sudo docker run -d -p 0.0.0.0:8004:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"
sleep 5

sudo docker run -d -p 0.0.0.0:8005:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver  --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"
sleep 5

sudo docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"
sleep 5

echo '>>> Start demo'

sudo docker run -d -p 0.0.0.0:8006:9999 -p 0.0.0.0:8083:8083 --name metricservice --link zkserver:zkserver --link mongoserver:mongoserver --link cassandraserver:cassandraserver -t "pulsar/metricservice"
sleep 5

sudo docker run -d -p 0.0.0.0:8007:9999 -p 0.0.0.0:8088:8088 --link zkserver:zkserver --link mongoserver:mongoserver --link metricservice:metricservice --link metriccalculator:metriccalculator -t "pulsar/metricui"

# The twitter sample use the twitter4j api, to run the app, it requires twitter OAUTH token (https://dev.twitter.com/oauth/overview/application-owner-access-tokens)

#sleep 5

#sudo docker run -d -p 0.0.0.0:8008:9999 --link zkserver:zkserver --link mongoserver:mongoserver -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET=  -e TWITTER4J_OAUTH_ACCESSTOKEN=  -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample

#sleep 5



실행하기


dependency docker container 실행하기


1. zookeeper 실행하기

sudo docker run -d --name zkserver -t "jetstream/zookeeper"


2. mongo 실행하기

sudo docker run -d --name mongoserver -t "mongo"


3. kafka 실행하기

sudo docker run -d --name kafkaserver -t "jetstream/kafka"


4. Cassandra 실행하기

sudo docker run -d --name cassandraserver -t "poklet/cassandra"


Cassandra Tables 와 keyspace 생성하기


1. table을 설치하기 위하여 CQL Client를 실행하기

sudo docker run -it --rm --link cassandraserver:cass1 -v $PATH_CQL:/data poklet/cassandra bash -c 'cqlsh $CASS1_PORT_9160_TCP_ADDR -f /data/pulsar.cql'


2. Pulsar init CQL파일 : pulsar.cql을 적용한다.


Config app과 replay app 실행하기


1. Config 서버 실행하기

sudo docker run -d -p 0.0.0.0:8000:9999 -p 0.0.0.0:8081:8080 --link zkserver:zkserver --link mongoserver:mongoserver -t "jetstream/config"


2. Replay app 실행하기

sudo docker run -d -p 0.0.0.0:8001:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/replay"


Pipeline app 실행하기


1. Collector 실행하기

sudo docker run -d -p 0.0.0.0:8002:9999 -p 0.0.0.0:8080:8080 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/collector"


2. Sessionizer 실행하기

sudo docker run -d -p 0.0.0.0:8003:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/sessionizer"


3. Distributor 실행하기

sudo docker run -d -p 0.0.0.0:8005:9999 --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver -t "pulsar/distributor"


4. Metric Calculator 실행하기

sudo docker run -d -p 0.0.0.0:8006:9999 --name metriccalculator --link zkserver:zkserver --link mongoserver:mongoserver --link kafkaserver:kafkaserver --link cassandraserver:cassandraserver -t "pulsar/metriccalculator"


Demo app 실행하기


1. MetricService 실행하기

sudo docker run -d -p 0.0.0.0:8006:9999 -p 0.0.0.0:8083:8083 --name metricservice --link zkserver:zkserver --link mongoserver:mongoserver --link cassandraserver:cassandraserver -t "pulsar/metricservice"


2. MetricUI 실행하기

sudo docker run -d -p 0.0.0.0:8007:9999 -p 0.0.0.0:8088:8088 --link zkserver:zkserver --link mongoserver:mongoserver --link metricservice:metricservice --link metriccalculator:metriccalculator -t "pulsar/metricui"


3. twittersample 실행하기

Twittersample은 twitter oauth token이 필요하다. 아래의 링크에서 token값을 얻어서 적용하면 된다.

https://dev.twitter.com/oauth/overview/application-owner-access-tokens

sudo docker run -d -p 0.0.0.0:8008:9999 --link zkserver:zkserver --link mongoserver:mongoserver -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET= -e TWITTER4J_OAUTH_ACCESSTOKEN= -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample Twittersample requires twitter oauth token, seehttps://dev.twitter.com/oauth/overview/application-owner-access-tokens


Pipeline app 모니터링 하기


각 앱은 http port 로 모니터링 가능하다.

1. Replay - http://<hostname>:8001

2. Collector - http://<hostname>:8002, Collector Rest end point -  http://<hostname>:8080

3. Sessionizer - http://<hostname>:8003

4. Distributor - http://<hostname>:8004

5. MetricCalculator - http://<hostname>:8005

6. Configuration - http://<hostname>:8000, Configuration UI  - http://<hostname>:8081

7. Query data via CQL Client: sudo docker run -it --rm --link cassandraserver:cass "poklet/cassandra" cqlsh cass향향

8. MetricService - http://<hostname>:8006

9. MetricUI - http://<hostname>:8007, Dashboard port: http://<hostname>:8088

10. TwitterSample - http://<hostname>:8008



Distributed deployment


Pulsar app이 실행되고 있는 분산환경에서는 app container를 실행시킬 경우 zkserver와 mongoserver에 파라미터처럼 전달해야 한다. Pulsar app이 container host network에 직접 바인딩 할 수 있게 해야함과 동시에 각 app들은 다른 host에 전달해야 한다.


Dependency docker container 실행하기


1. zkServer 실행하기

sudo docker run -d -p 0.0.0.0:2181:2181 -t "jetstream/zookeeper"


2. mongo 실행하기

sudo docker run -d -p 0.0.0.0:27017:27017 -t "mongo"


3. kafka 실행하기

sudo docker run -d -p 0.0.0.0:9092:9092 -p 0.0.0.0:2181:2181 -t "jetstream/kafka"


4. cassandra 실행하기

sudo docker run -d -p 0.0.0.0:9106:9160 -p 0.0.0.0:22:22 -p 0.0.0.0:61621:61621 -p 0.0.0.0:7000:7000 -p 0.0.0.0:7001:7001 -p 0.0.0.0:7199:7199 -p 0.0.0.0:8012:8012 -p 0.0.0.0:9042:9042 -t "poklet/cassandra"


Config app과 Replay app 실행하기


1. Config server 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -t "jetstream/config"


2. Replay app 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_ZK="kafkazkconn" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/replay"


Pipeline app 실행하기


1. Collector 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/collector"


2. Sessionizer 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/sessionizer"


3. Distributor 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -t "pulsar/distributor"


4. Metriccalculator 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_KAFKA_BROKERS="kafkaborkers" -e PULSAR_CASSANDRA="cassandraseeds" -t "pulsar/metriccalculator"


Demo app 실행하기


1. MetricService 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e PULSAR_CASSANDRA="cassandraserverip" -t "pulsar/metricservice"


2. MetricUI 실행하기

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e METRIC_SERVER_HOST="metricserverip" -e METRIC_CALCULATOR_HOST="metriccalculatorserverip" -t "pulsar/metricui"


3. twittersample 실행하기

Twittersample은 twitter oauth token이 필요하다. 아래의 링크에서 token값을 얻어서 적용하면 된다.

https://dev.twitter.com/oauth/overview/application-owner-access-tokens

sudo docker run -d --net=host -e JETSTREAM_ZKSERVER_HOST="zkserverip" -e JETSTREAM_MONGOURL="mongo://mongoserverip:27017/config" -e TWITTER4J_OAUTH_CONSUMERKEY= -e TWITTER4J_OAUTH_CONSUMERSECRET= -e TWITTER4J_OAUTH_ACCESSTOKEN= -e TWITTER4J_OAUTH_ACCESSTOKENSECRET= -t pulsar/twittersample 


EC2 deplyment


EC2에 적용하는 법은 https://docs.docker.com/installation/amazon/ 에서 docker 설치 방법을 참고한 후, 위의 스텝을 실행해보면 된다.



이번 포스팅에서는 Pulsar 설치와 실행을 정리해 보았다. 

Pulsar는 모든 app을 Docker 이미지로 만들어서 배포하고 있기때문에 매우 간단하게 설치 및 실행이 가능하다. 또한 클라우드 배포도 지원하기 때문에 Docker를 사용할 수 있는 instance에서는 쉽게 설치 및 사용할 수 있다고 한다. 


다음 포스팅에서는 Pulsar에서 제공하는 White Paper를 정리해보기로 하면서 이번 포스팅을 마친다.


참고자료 : https://github.com/pulsarIO/realtime-analytics/wiki/Get-Started



Posted by jjangAh

 

 이번 포스팅에서는 실시간 분산 처리 시스템인 Storm과 CEP 처리 시스템인 Esper의 성능 테스트에 대해 적고자 한다. 지난 포스팅 중 Storm-Esper에 관한 연구 결과가 있으니 참조 바란다.

테스트 진행 구조상 어쩔수 없이 성능테스트에 Kafka까지 포함되었는데, Kafka 단일 테스트가 궁금하다면 여기를 참조하길 바란다.

이번 성능 테스트를 통해 이벤트 발생률과 양에 따른 Engine의 성능 변화분산 환경에서 노드 증가에 따른 성능 변화를 알아보고자 한다.


구조


이벤트가 Kafka로 들어가는 순간부터 이벤트가 Kafka를 통해 나오는 순간까지의 구간을 측정하였으며, Kafka로 이벤트를 보내고 받는 부분은 Performance Tester를 구현하여 사용하였다.

    * Kafka 단일 테스트는 여기 참조


테스트 환경


성능 테스트를 진행한 장비의 환경은 다음과 같다.


테스트 방법


먼저 이벤트 발생률과 양에 따른 Engine의 성능 변화를 알아보기 위해, 아래 (1)과 (2) 테스트를 진행하였다.

(1) 이벤트 발생 빈도

- 방법 : 이벤트가 발생하는 빈도를 0.01%, 25%, 50%, 75%, 100%로 증가시키며 TPS 측정

- 가설 : 이벤트 발생 빈도가 증가 할수록 성능 감소

(2) 이벤트 개수

- 방법 : 이벤트 개수를 50만개부터 250만개까지 50만개씩 증가시키며 증가시키며 TPS 측정 

- 가설 : 이벤트 개수가 증가해도 성능 일정


다음으로는 분산환경에서 노드 증가에 따른 성능 변화를 알아보기 위해, 아래 (3) 테스트를 진행하였다. 

(3) Worker 개

- 방법 : Worker 개수를 1부터 4까지 하나씩 증가시키며 TPS 측정

- 가설 : Worker 개수가 증가 할수록 성능 증가


Storm Topology 구성


(1) 이벤트 발생 빈도, (2) 이벤트 개수 테스트의 Topology 구성

(3) Worker 개수 테스트의 Topology 구성


테스트 결과


(1) 이벤트 발생 빈도

  • 고정 변
    1. 이벤트 개수 : 100만개
    2. Worker 개수 : 1개
  • 이벤트 발생 빈도가 증가할수록 성능은 감소한다는 가설과 일치


(2) 이벤트 개수

  • 고정변수
    1. 이벤트 발생 빈도 : 25%
    2. Worker 개수 : 1개
  • 이벤트 개수가 증가해도 성능은 일정하다는 가설과 일치


(3) Worker 개수

  • 고정 변수
    1. 이벤트 발생 빈도 : 50%
    2. 이벤트 개수 : 100만개
  • Worker 개수가 증가 할수록 성능은 증가한다는 가설과 일치한다.
  • 참고로 1-2 구간과 2-4 구간의 Throughput의 증가량이 다른 이유는 테스트에 사용된 장비의 성능이 일정하지 않기 때문이다.


결과 정리


테스트 모두 가설과 일치하는 결과가 나왔으며, 한번 더 간단히 정리하면 아래와 같다.

(1) 이벤트 발생 빈도

  이벤트 발생 빈도가 증가할수록 성능은 감소

(2) 이벤트 개수

  이벤트 개수가 증가해도 성능은 일정

(3) Worker 개수

  Worker 개수가 증가 할수록 성능은 증가

  


Reference 


  Storm    https://storm.apache.org/

  Esper     http://www.espertech.com/




Posted by minji7

eBay Pulsar

Pulsar – an open-source, real-time analytics platform and stream processing framework. Pulsar can be used to collect and process user and business events in real time, providing key insights and enabling systems to react to user activities within seconds. In addition to real-time sessionization and multi-dimensional metrics aggregation over time windows, Pulsar uses a SQL-like event processing language to offer custom stream creation through data enrichment, mutation, and filtering. Pulsar scales to a million events per second with high availability. It can be easily integrated with metrics stores like Cassandra and Druid.





SlideShare : http://www.slideshare.net/kyoungmoyang/ebay-pulsar


References: 

http://www.ebaytechblog.com/2015/02/23/announcing-pulsar-real-time-analytics-at-scale/#.VQIuqBCsVW2

http://gopulsar.io/

http://gopulsar.io/html/docs.html

https://github.com/pulsarIO/realtime-analytics/wiki

https://spark.apache.org/

https://storm.apache.org/

http://www.espertech.com/

Posted by 양경모

얼마전 ELK Stack이 Splunk 대체 Solution으로서 적합한가에 대해 논하고자 글을 적었다. ELK Stack을 사용해보고 Splunk와 비교하기 위해 글을 적으며 ELK Stack의 간단한 소개와 설치, 설정, 사용법에 대해 다룬 적이 있다.

하지만 얼마 지나지않아 Elasticsearch, Logstash, Kibana가 새로운 버전을 선보였다. 이에 이 글에서는 새로워진 ELK Stack을 사용해보고자 설치와 설정 및 사용법에 대해 설명한다.

ELK Stack이 무엇인지 잘 모른다면 이전 글을 참고하길 바란다.


설치 및 설정

아래의 내용은 1대의 Server 환경에서 설치 및 설정한 경우이며, Elasticsearch는 1.0.1, Logstash는 1.4.0 GA, Kibana는 3.0.0 GA를 사용하였다.


Elasticsearch 설치 및 설정

(1) Elasticsearch 홈페이지에서 Elasticsearch의 zip파일을 다운받아 unzip한다.


(2) Elasticsearch 모니터링을 위한 Plugin을 설치 한다.

  $ bin/plugin --install mobz/elasticsearch-head


(3) config/elasticsearch.yml의 node.name을 지정한다. 이는 노드 식별을 위한 이름이므로 유일성과 의미를 가진 이름을 사용한다.


Logstash 설치 및 설정

(1) Elasticsearch 홈페이지에서 Logstash의 zip파일을 다운받아 unzip한다.


(2) Logstash Config File을 생성하여 다음과 같이 저장한다.

  input {

    file {

      path => "/var/log/apache2/access.log"

      type => "apache"

    }

  }

  filter {

    grok {

      type => "apache"

      pattern => "%{COMBINEDAPACHELOG}"

    }

    date {

      type => "apache"

      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]

    }

  }

  output {

    stdout { debug => true debug_format => "json"}

    elasticsearch { host => "localhost" }
  }


Kibana 설치 및 설정

(1) Elasticsearch 홈페이지에서 Kibana의 zip파일을 다운받아 unzip한다.

 


(2) unzip한 파일은 Elasticsearch의 Plugin에 넣어 사용한다. 경로는 다음과 같다.

  path/to/elasticsearch/plugins/kibana/_site


(3) path/to/elasticsearch/plugins/kibana/_site/config.js의 Elasticsearch Server URL을 지정한다. 

     (예: elasticsearch: "http://localhost:9200")


실행

Elasticsearch 실행

unzip한 Elasticsearch 디렉토리안에서 실행한다.

  $ bin/elasticsearch


Logstash 실행

unzip한 Logstash 디렉토리안에서 실행한다.

  $ bin/Logstash -f logstash.conf

logstash.conf는 앞서 생성한 Logstash의 Config File이다.


Kibana 실행

Elasticsearch의 Plugin안에 넣었기 때문에 따로 실행할 필요가 없다. 

http://localhost:9200/_plugin/kibana/#/dashboard에서 Kibana가 잘 뜨는지 확인한다.


사용

Dashboard 생성

(1) http://localhost:9200/_plugin/kibana/#/dashboard에서 Blank Dashboard를 클릭한다.


(2) 설정 아이콘을 클릭한다.


(3) Dashboard의 Title을 입력하고 저장하면 새로운 Dashboard가 생성된다.

Panel 생성

(1) 'ADD A ROW' 버튼을 클릭 한다.


(2) Rows의 Title을 입력하고, 'Create Row' 버튼을 클릭하여 Row를 생성한 후 저장하고 창을 닫는다.


(3) Panel 생성을 위해 'Add panel to empty row' 버튼을 클릭한다.


(4) Panel Type을 선택하고 저장한후 창을 닫는다. 여기서는 'histogram'을 선택하였다.


(5) Panel의 Title 및 사용자가 원하는 설정을 하고난 후 'Add Panel' 버튼을 클릭하여 Panel을 생성하고 창을 닫는다.


(6) 위에서 선택한 Type의 Panel이 생성되었음을 확인할수 있다.


Query

다양한 Query를 이용하여 Search하고 결과를 확인할 수 있다. 

(Query문의 종류 :  http://lucene.apache.org/core/3_5_0/queryparsersyntax.html )



Apache로그를 이용하여 위와 같은 작업을 거쳐 Dashboard를 완성해 보았다.



Reference Sites

(1) Elasticsearch : http://www.elasticsearch.org/

(2) ELK Download : http://www.elasticsearch.org/overview/elkdownloads/

(3) Logstash 1.4.0 Docs : http://logstash.net/docs/1.4.0/tutorials/getting-started-with-logstash

(4) Elasticsearch 1.x version set up : http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/setup.html

(5) Whats cooking kibana : http://www.elasticsearch.org/blog/whats-cooking-kibana/



Posted by minji7