최근 회사에서 storm kafka에 대해 공부하면서 작은 미니 프로젝트를 했는데 공부했던 경험을 정리도 할겸 포스팅을 한다.

먼저 storm과 kafka에 대해 간단히 알아보도록 하자. 


storm 


storm은 실시간 분산 처리 시스템이고, 방대한 양에 데이터 스트림을 안정적으로 처리한다. storm은 실시간 분석, 머신러닝 등에 사용된다. storm 클러스터는 Hadoop 클러스터와 표면적으로 유사한데 Hadoop에서  "MapReduce job"을 실행하는 반면에, storm은 "topology"를 실행시킨다.  "jobs" 와 "topology"는 매우 다른데 한가지 핵심적으로 다른 점은 MapReduce job 은 결국 끝나게 되지만 토폴로지는 kill하지 않는 이상 계속 message를 처리한다. 

storm에 대한 좀 더 자세한 설명은 링크링크2 를 확인해 보기 바란다.








kafka


kafka는 LinkedIn에서 자신들의 내부 데이터 처리를 위해 개발한 Distriubted Processing Message System이다. 전통적인 메세지큐시스템과는 다르게 Apache Kafka는 Broker Cluster를 여러 대의 Machine으로 구성하여 분산처리가 가능하다는 장점을 가지고 있다. Big Data시장의 발전과 함께 가장 주목받고 있는 Queue System이기도 하다. 

Kafka에 대한 좀 더 자세한 설명은 링크를 확인해 보기 바란다.  

아래의 사이트도 설명이 잘 되어있다.

http://epicdevs.com/17 

http://kafka.apache.org/documentation.html#introduction




storm kafka에 대한 간단한 소개를 마쳤으니, 이제 내가 했던 작은 프로젝트에 대해 간단히 설명하고 프로젝트에 대한 step 하나 하나 설명해 보도록 하겠다. 


storm kafka 미니 프로젝트 


producer에서 로그 파일을 읽어  message를 broker(연습 용이므로 broker는 하나만 사용한다)로 publish하고  storm spout에서 consume하여 bolt에서 처리하도록 한는것이 프로젝트 목표이다.




로그 파일의 log는 다음과 같은 형태로 되어 있다고 가정한다.

@timestamp : 2015-11-10T15:32:06.046+09:00; doctype : sns; key : 974cfc83-99e0-420e-bfd1-2262e4e82dbd; appid : com.facebook.katana; appversion : 48


개발환경은 ubuntu 14.04를 기반으로 한다. 


step 1. kafka 준비 (kafka 다운로드 페이지)


다운 받은 kafka 앞축을 풀고 설치된 kafka디렉토리로 이동한다. 


(1)zookeeper 서버 실행 (zookeepr에 대한 자세한 설명은 여기 링크를 참조 하기 바란다.  )

$ bin/zookeeper-server-start.sh config/zookeeper.properties


(2)kafka서버 실행 

$ bin/kafka-server-start.sh config/server.properties


(3)topic 만들기 

kafka의 broker는 topic이라는 기준으로 메시지를 관리한다. producer에서 특정 topic의 메시지를 생성 한 후 broker에 전달하면 broker는 전달받은 메시지를 topic별로 분류하여 쌓아놓는다.  "onlytest"라는 이름의 topic을 생성하도록 하겠다. 

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic onlytest



step 2. producer 준비 



log 파일을 읽어 message를 send하는 간단한 코드이다. 추가 properties 설정에 대한 자세한 사항은 이 링크에서 확인해 보기 바란다.


import java.io.File;

import java.util.Properties;

import org.apache.commons.io.input.Tailer;

import org.apache.commons.io.input.TailerListenerAdapter;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


public class TestProducer {

private static final int SLEEP = 500;

public kafka.javaapi.producer.Producer<String,String> producer;

public void setConfig(){

Properties properties = new Properties();

       properties.put("metadata.broker.list","localhost:9092"); // broker list 필수!

       properties.put("serializer.class","kafka.serializer.StringEncoder"); //메시지를 serialize할때 사용하는 인코더

       ProducerConfig producerConfig = new ProducerConfig(properties);

       producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);

}

public static void main(String[] args) throws InterruptedException{

      

       TestProducer testProducer = new TestProducer();

       testProducer.setConfig();

       testProducer.run();

   }

private void run() throws InterruptedException {

OnlyLogListenter onlyLogListenter = new OnlyLogListenter(producer);

       Tailer tailer = Tailer.create(new File("your file path"), onlyLogListenter,SLEEP);

       while(true){

        Thread.sleep(SLEEP);

       }

   }

public class OnlyLogListenter extends TailerListenerAdapter{

kafka.javaapi.producer.Producer<String,String> producer;

public OnlyLogListenter(kafka.javaapi.producer.Producer<String,String> producer){

this.producer = producer;

}

@Override

public void handle(String line){

System.err.println(line);

KeyedMessage<String, String> message =new KeyedMessage<String, String>("onlytest",line);

    producer.send(message);

}

}

}

 

step 3. storm 준비 


(1) storm 다운받기 

storm은 요기 링크에서 다운로드 받으면 된다.(source code가 아닌 release 버전으로 다운로드 한다.) 나는 0.9.5버전을 다운로드 받았다.압축을 풀고 해당 디렉토리로 이동한다. 

나의 경우는 local에서 production cluster 모드로 테스트 하려고 하기 때문에  conf 디렉토리에 storm.yaml을 수정하도록 하겠다.

$ vi ~스톰 디렉토리/conf/storm.yaml


(2) storm.yaml 파일 수정하기 

아래와 같은 항목을 storm.yaml 파일에 추가하도록 하자  nimbus라는 것을 발견할 수 있는데 이에 대한 설명은 다음 링크에서 확인해 볼수 있다.

#zookeeper 서버 설정. local에서 production cluster모드로 테스트 해볼 것이니 실제 아이피 주소로 셋팅하자.

storm.zookeeper.servers:

      - 192.168.0.11


#nimbus host 및 seed 설정. local에서 production cluster모드로 테스트 해볼 것이니 실제 아이피 주소로 셋팅하자.

nimbus.host: 192.168.0.11

nimbus.seed: "192.168.0.11"


#storm local dir 설정 

storm.local.dir: "storm local dir path "


#storm ui  

ui.port : 8087


위와 같이 파일을 수정하고 저장한다. 


step 4. storm topology 만들기  


kafka broker에 전달된 메세지를 spout에 consume하여 bolt에서 처리 하도록 하는 코드이다. 




(1) zookeeper url을 설정한다. 

storm을 local cluster 모드로 테스트 해보자고 한다면 zookeeper 는 "localhost:2181"이 될것이다. 만약에 production cluster모드로 local에서 테스트 해보자고 한다면 "127.0.0.1:2181" 이 아닌 자신의 "실제  ip 주소 : 2181"로 셋팅하면 된다. 

String zkUrl = "zookeeper url:2181";


(2)kafkaspout설정을 해준다. 

아래 코드에 대해 좀더 자세히 알아 보고 싶으면 다음 링크에서 확인해 볼수 있다.

ZkHosts hosts = new ZkHosts(zkUrl);

SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


(3)spout과 bolt설정을 해준다. 

shuffleGrouping,fieldsGrouping 이라고 보이는데 이것을 stream grouping이라하고 토폴로지에  두개의 컴포넌트사이에서 어떻게 튜플을 send할지 알려주는 것을 말한다. 여기서 쓴 shuffleGrouping은 튜플을 무작위로 동일한 비율로 나눠서 볼트에 task를 할당하는 것이고, fieldsgrouping은 튜플에 있는 필드 값을 기준으로 파티셔닝되어 각 볼트 task에 튜플을 할당한다.  그 외 다른 stream grouping을 살펴보고 싶다면 다음 링크에서 확인해 볼 수 있다. 

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", kafkaSpout, 1);

builder.setBolt("cutbolt", new CutLogBolt(), 8).shuffleGrouping("spout");

builder.setBolt("classifybolt", new ClassifyKeyBolt(), 8).fieldsGrouping("cutbolt",new Fields("key","doctype"));

builder.setBolt("docbolt", new DoctypeCountBolt(), 8).fieldsGrouping("classifybolt",new Fields("subdoctype"));


(4) nimbus host,storm local dir 등등을 설정해준다.

local mode와 cluster mode를 제량에 따라 설정할 수 있는데 나는 local에서 production cluster mode로 실행 하려고 했기 때문에 아래와 같이 설정했다. 

//=============================

// local mode

//=============================

// LocalCluster cluster = new LocalCluster();

// cluster.submitTopology("log-stat", conf, builder.createTopology());

// Thread.sleep(10000);

// cluster.shutdown();

//=============================

// cluster mode

//=============================

conf.put(Config.NIMBUS_HOST, "nimbus url");

conf.put(Config.STORM_LOCAL_DIR,"your storm local dir");

conf.put(Config.NIMBUS_THRIFT_PORT,6627);

conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[] {"zookeeper url"}));

// conf.setNumWorkers(20);

// conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());


아래는 최종 topology 코드이다. 

import java.util.ArrayList;

import java.util.Arrays;

import java.util.List;

import java.util.UUID;


import soeun.storm.kafka.bolt.ClassifyKeyBolt;

import soeun.storm.kafka.bolt.CutLogBolt;

import soeun.storm.kafka.bolt.DoctypeCountBolt;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;


public class StormKafakaSimpleTopology {

  

   public static void main(String[] args) throws Exception {


       String zkUrl = "zookeeper url:2181";        // zookeeper url

       String brokerUrl = "localhost:9092";


       if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) {

           System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");

           System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]");

           System.exit(1);

       } else if (args.length == 1) {

           zkUrl = args[0];

       } else if (args.length == 2) {

           zkUrl = args[0];

           brokerUrl = args[1];

       }


       System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);


       ZkHosts hosts = new ZkHosts(zkUrl);

       SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());

       spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

       KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

       TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", kafkaSpout, 1);

builder.setBolt("cutbolt", new CutLogBolt(), 8).shuffleGrouping("spout");

builder.setBolt("classifybolt", new ClassifyKeyBolt(), 8).fieldsGrouping("cutbolt",new Fields("key","doctype"));

builder.setBolt("docbolt", new DoctypeCountBolt(), 8).fieldsGrouping("classifybolt",new Fields("subdoctype"));

Config conf = new Config();

conf.setDebug(true);

List<String> nimbus_seeds = new ArrayList<String>();

nimbus_seeds.add("nimbus url");


if (args != null && args.length > 0) {

conf.setNumWorkers(3);


StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

}

else {


//=============================

// local mode

//=============================

// LocalCluster cluster = new LocalCluster();

// cluster.submitTopology("log-stat", conf, builder.createTopology());

// Thread.sleep(10000);

// cluster.shutdown();

//=============================

// cluster mode

//=============================

conf.put(Config.NIMBUS_HOST, "nimbus url");

conf.put(Config.STORM_LOCAL_DIR,"your storm local dir");

conf.put(Config.NIMBUS_THRIFT_PORT,6627);

conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[] {"zookeeper url"}));

// conf.setNumWorkers(20);

// conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());


}

}

}


CutLogBolt.java

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class CutLogBolt extends BaseBasicBolt{

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String[] splitArray = input.getString(0).split(";");

String key = "";

String doctype = "";

for(int i = 0; i < splitArray.length; i++){

if(splitArray[i].contains("key"))

key  = splitArray[i];

if(splitArray[i].contains("doctype"))

doctype = splitArray[i];

}

collector.emit(new Values(key,doctype));

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("key","doctype"));

}


}


ClassifyKeyBolt.java

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class ClassifyKeyBolt extends BaseBasicBolt{


@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String[] splitdoctype = input.getStringByField("doctype").split(":");

String[] splitkey = input.getStringByField("key").split(":");

if(splitkey.length == 2 && splitdoctype.length == 2){

String doctype  = splitdoctype[1].trim();

String key  = splitkey[1].trim();

// System.err.println(key + ":" + doctype);

collector.emit(new Values(key + ":" + doctype));

}

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("subdoctype"));

}

}


DoctypeCountBolt.java 

import java.util.HashMap;

import java.util.Map;


import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class DoctypeCountBolt extends BaseBasicBolt {

Map<String,Integer> docMap = new HashMap<String,Integer>();

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String doctype = input.getStringByField("subdoctype");

Integer count = docMap.get(doctype);

if(count == null)

count = 0;

count++;

docMap.put(doctype, count);

System.out.println(docMap);

collector.emit(new Values(docMap));

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("docmap"));

}

}


(5) pom.xml 


pom.xml 설정할 때 주의 할 것은 storm과 kafka의 버전의 따른 dependency 를 꼭 확인해 보고 pom.xml에 추가해야 한다는 것이다.

현재 나는 storm 버전이 0.9.5이기 때문에 

storm-core : 0.9.5

storm-kafka : 0.9.5

앞서 설치한 kafka는  kafka_2.10_0.8.1 이기 때문에 버전에 맞게 설정했다. 버전에 맞지 않으면 엄청난 삽질을 하게 된다. 

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

 <modelVersion>4.0.0</modelVersion>

 <groupId>com.soeun.storm</groupId>

 <artifactId>ministorm</artifactId>

 <version>0.0.1-SNAPSHOT</version>

 <packaging>jar</packaging>

 <name>ministorm</name>

 <url>http://maven.apache.org</url>

 <properties>

   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

 </properties>

<dependencies>

 <dependency>

   <groupId>junit</groupId>

   <artifactId>junit</artifactId>

   <version>4.11</version>

   <scope>test</scope>

 </dependency>

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-core</artifactId>

<version>0.9.5</version>

<scope>provided</scope>

</dependency>

 <dependency>

     <groupId>org.apache.storm</groupId>

     <artifactId>storm-kafka</artifactId>

     <version>0.9.5</version>

   </dependency>

   <dependency>

<groupId>org.testng</groupId>

<artifactId>testng</artifactId>

<version>6.8</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.mockito</groupId>

<artifactId>mockito-all</artifactId>

<version>1.9.0</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.1</version>

<exclusions>

       <exclusion>

         <groupId>org.apache.zookeeper</groupId>

         <artifactId>zookeeper</artifactId>

       </exclusion>

       <exclusion>

               <groupId>com.sun.jmx</groupId>

               <artifactId>jmxri</artifactId>

           </exclusion>

           <exclusion>

               <groupId>com.sun.jdmk</groupId>

               <artifactId>jmxtools</artifactId>

           </exclusion>

           <exclusion>

               <groupId>javax.jms</groupId>

               <artifactId>jms</artifactId>

           </exclusion>

     </exclusions>

</dependency>

</dependencies>

<build>

 <plugins>

   <plugin>

     <artifactId>maven-assembly-plugin</artifactId>

     <version>2.2.1</version>

     <configuration>

       <descriptorRefs>

         <descriptorRef>jar-with-dependencies</descriptorRef>

       </descriptorRefs>

       <archive>

         <manifest>

           <mainClass />

         </manifest>

       </archive>

     </configuration>

     <executions>

       <execution>

         <id>make-assembly</id>

         <phase>package</phase>

         <goals>

           <goal>single</goal>

         </goals>

       </execution>

     </executions>

   </plugin>

 </plugins>

</build>

</project>


(6) maven install 


앞에 코드가 작성되어있는 프로젝트 디렉토리에 이동한후 maven clean install해준다.  

$ mvn clean install


target폴더에  ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar 파일이 생긴 것을 확인 할 수 있을 것이다. 


step 5. storm nimbus,supervisor,ui 실행 


스톰이 설치 되어 있는 디렉토리로 이동하여 nimbus,supervisor,ui를 실행 시킨다. 

$ bin/storm nimbus


$ bin/storm supervisor


$ bin/storm ui


ui를 실행시키면 http://localhost:8087로 접근하여 topology 상황을 웹으로 확인해 볼 수 있다. 



step 6.  ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar summit 


ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar 를 아래와 같이 실행 시킨다 


$ bin/storm jar ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar {main class path} 


ex)

bin/storm jar ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar soeun.storm.kafka.topology.StormKafakaSimpleTopology 


정상적으로 submit 하면 다음과 같은 메세지가 출력된다.

..............


[main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: onlytest


http://localhost:8087로 접근하면 아래와 같은 화면이 나올것이다 Topology summary에 "onlytest"라는 이름이 있으면 성공한 것이다.




step 7.  test

 

이제 위에서 작성하였던 TestProducer를 실행시켜 "onlytest"라는 topic의 메세지가 broker에 전송하여 storm worker 로그에 잘 찍히는지 확인해 보자 log는 storm디렉토리/logs/worker-.. .log에서 확인해 볼수 있다. 

$ tail -f worker-{}.log 

 또한 storm ui 에 Topology Visualization에서도 확인해 볼 수 있다. 




예제 소스는 다음 링크에 존재한다. 


-끝-


p.s storm kafka에 대한 좀더 자세한 사항을  알아보고자 한다면 다음 레퍼런스를 참고하기 바란다. 


참고 레퍼런스:

http://storm.apache.org/index.html

https://storm.apache.org/documentation/Setting-up-development-environment.html

http://bcho.tistory.com/995

https://github.com/apache/storm/blob/master/docs/documentation/Tutorial.md

https://github.com/apache/storm/tree/master/examples/storm-starter

https://storm.apache.org/documentation/Tutorial.html

Posted by 알 수 없는 사용자
,

Reference

1. Storm-starter : https://github.com/apache/storm/tree/master/examples/storm-starter

2. Storm : https://github.com/apache/storm

3. Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm : http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/


소개

회사 업무로 Apache Storm(이하 Storm)과 관련된 프로젝트를 완료하고 휴식중에, Storm 기초를 설명할 수 있을만한 예제를 만들어보기 위해서 Storm Starter를 참조하여 간단한 프로젝트를 만들었다.

이 프로젝트는 Twitter Sample Public Status API(https://dev.twitter.com/streaming/reference/get/statuses/sample)를 사용하여 Twitter realtime stream data의 일부를 Input으로 하고, HashTag 정보를 추출한 후 일정 시간 간격(emit frequency)으로 일정 시간 동안(window length)의 일정 갯수(TOP_N)의 Top HashTag를 생성하여 출력하는 프로젝트이다.

Storm-starter project에서 많은 소스코드를 가져 왔으며 Twitter Library는 Twitter4J를 사용한다.

Project source : https://github.com/forcemax/storm_twitter_hashtag


실행하기

0. Prerequisites

Java 1.7 이상, Storm 0.9.5, Maven, Git

(Twitter API를 사용하기 위한 consumerKey, consumerSecret, accessToken, accessTokenSecret을 변경하지 않으면 실행이 안된다.)

1. 소스 가져오기

git clone https://github.com/forcemax/storm_twitter_hashtag

2. 소스 빌드하기

$ mvn clean package

3. Storm Cluster에 Topology submit

$ storm jar StormTwitterHashtag-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.embian.forcemax.twitter.StormTwitterHashtagTopologyRunner server

4. Storm UI에서 확인하기



그림. topology 목록



그림. topology 상세 정보


설명

- Rolling Count Algorithm은 Reference 3번 사이트를 참조

- Topology는 다음과 같이 구성되어 있다. Twitter API를 사용하기 위해서는 Twitter에 App 등록을 해야하며, App 등록을 하면 consumerKey, consumerSecret, accessToken, accessTokenSecret 값을 얻을 수 있다. 다음 코드에 해당 값을 넣어서 사용한다.



그림. Topology 구성도

- TwitterSpout은 Twitter4J Library를 사용하며, LinkedBlockingQueue를 사용해서 새로운 Public Status가 있을때 Queue에 저장한다. nextTuple() 호출시에 Queue에서 꺼내서 ExtractHashTagBolt에 넘긴다.

- ExtractHashTagBolt는 받은 Public Status에서 HashTag만 뽑아내서, RollingCountBolt로 넘긴다. HashTag의 갯수 만큼 emit이 발생한다.

- RollingCountBolt는 생성할 때 인자로 받은 window length와 emit frequency 값을 바탕으로, emit frequency마다 window length에 속하는 데이터에서 word별 count를 계산해서 IntermediateRankingsBolt로 넘긴다. 이때, emit frequency마다 emit을 하기 위해서 TickTuple을 사용하는데, TickTuple은 Storm 0.8에 새로 들어간 기능이며 Component(Spout, Bolt)내에서 일정 주기 마다 Tuple을 발생시키는 기능이다.

IntermediateRankingsBolt와 TotalRankingsBolt는 생성할때 인자로 topN, emit frequency를 받으며, 입력된 word별 count를 바탕으로 상위 topN개의 word와 count를 뽑아내고 emit frequency 마다 emit한다. IntermediateRankingsBolt는 parallelism hint를 크게 주어 map-reduce 구조에서 map의 역할을 하고, TotalRankingsBolt는 parallelism hint를 1로 주고 reduce의 역할을 한다. emit frequency마다 emit을 하기 위해서 RollingCountBolt와 마찬가지로 TickTuple을 사용한다.

- 마지막으로 PrinterBolt는 TotalRankingsBolt에서 emit한 Tuple을 출력하기 위해서 사용하며 특별한 기능은 없다.


이 프로젝트는 Storm의 Spout(nextTuple과 open), Bolt(execute, prepare)와 Topology wiring에 대한 이해만 있다면 코드를 보는데 아무 무리가 없을 정도로 간단한 예제이다. 그러나 외부 서비스(Twitter)과의 연계를 통한 Spout 구성, Atomic하게 역할을 분리한 Bolt, parallelism hint를 조절하여 성능을 향상시키는 방법을 확인해 보기에 알맞은 예제이다.

Twitter Sample Public Status API는 10분에 20000 statuses 정도의 데이터밖에 제공하지 않으므로, 한대의 Storm에서 처리하기에 충분하다.

초보자가 알아보기 쉽게 코드가 구성되어 있으니, Storm을 이용하여 realtime CEP 엔진을 공부하려는 분들에게 많은 도움이 되었으면 한다.


posted by forcemax

Posted by 알 수 없는 사용자
,

Storm-ESPER는 Storm과 ESPER를 이용한 실시간 데이터 처리 엔진이다. 

Storm-ESPER에 대해 궁금하다면, 엠비안에서 진행했던 Strom-ESPER 프로젝트의 Prototype과 테스트에 대한 블로그에서 참조 할 수 있다. (Storm & Esper Prototype 및 Test)

Storm-ESPER 프로젝트를 받았을 때, 프로젝트에 대한 프로토타입 테스트 결과문서, 설치문서, 설치 패키지가 존재하였다. 하지만 패키지와 문서가 CentOS를 위한 것이었기 때문에, 내 작업환경인 Ubuntu에서 실행되지 않았다. 그리하여 Storm-ESPER실행을 위한 각 모듈을 따로 컴파일하여 실행시켜줘야하는 번거로움이 있었다. 그뿐만 아니라 실행하는 과정에서 컴파일러 버전이 맞지 않거나 실행에 필요한 jar파일들이 없어서 고생하기도 했다. 

이러한 실행의 어려움을 해소하기 위해, Storm-ESPER 실행방법을 자세히 설명해 보도록 하겠다. 

모든 설치 및 실행방법은 Ubuntu시스템 기준으로 설명한다.


Storm-ESPER 실행전 필요조건 1 - 필요한 프로그램 설치

1. zookeeper-3.4.6

압축파일을 미러에서 받은 후 압축을 푼다.

$ wget http://mirrors.ukfast.co.uk/sites/ftp.apache.org/zookeeper/stable/zookeeper-3.4.6.tar.gz  
$ tar -xvf zookeeper-3.4.6.tar.gz  

config 파일을 생성한다. 

$ vi conf/zoo.cfg

# The number of milliseconds of each tick  
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

Zookeeper를 실행한다.

$ ~/zookeeper-3.4.6/bin/zkServer.sh start

2. RabbitMQ Server

RabbitMQ는 repository에서 설치할 수 있다.

$ apt-get install rabbitmq-server

 WebUI Admin plugin을 설치하여 모니터링을 할 수 있게 한다. 플러그인 설치 후 리스타트를 해준다.

$ rabbitmq-­plugins enable rabbitmq_management 
service rabbitmq­-server restart

3. Apache Storm 0.9.4

압축파일을 미러에서 받은 후 압축을 푼다. (이번 테스트에서 사용한 Storm버전은 0.9.4이다.)

$ wget http://mirror.apache-kr.org/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz 
$ tar -xvf apache-storm-0.9.4.tar.gz

conf 파일을 변경해준다.

 $ vi ~/apache-storm-0.9.4/conf/storm.yaml

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
    - zookeeper가 설치되어있는 호스트 ip
#     - "localhost"

 nimbus.host: nimbus가 설치되어있는 호스트 ip


Storm-ESPER 실행전 필요조건 2 - 파일수정 및 Storm 실행

1. conf 파일 수정 및 라이브러리 파일 추가

먼저 Storm-ESPER를 컴파일하기 전에  conf파일을 수정해야한다. "storm-esper"프로젝트의 conf폴더에 있는 yaml파일들을 조건에 맞게 수정해 준다. 아래의 예시를 따라서 본인의 ip를 사용하면 된다.

~/storm-esper/trunk/conf/deploy.yaml

~/storm-esper/trunk/conf/nimbus_conf.yaml


~/storm-esper/trunk/conf/supervisor_conf.yaml

~/storm-esper/trunk/conf/esper_supervisor_conf.yaml


EPL명령어 제어를 위한 "storm-esper-client" 프로젝트의 conf폴더의 conf.yaml파일을 수정해 준다.

~/storm-esper-client/trunk/conf/conf.yaml

rabbitmq.server.queue.name: "cmd_queue"
rabbitmq.server.queue.ip: "192.168.0.28"
rabbitmq.server.username: "guest"
rabbitmq.server.userpass: "guest"

2. pom.xml 파일 수정 

"storm-esper" 프로젝트의 pom.xml을 다음과 같이 수정해준다.

기존 코드:
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.2</version>
    </dependency>

수정 코드:
    <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.4</version>
    </dependency>


3. jar파일 추가

Storm-ESPER엔진을 실행하기 전에 custom scheduler로 EsperBolt를 특정 supervisor(esper-supervisor)에 할당하기 위하여 Nimbus용 jar파일을 생성하여 Storm 라이브러리에 위치시켜야 한다. 또한, Storm-ESPER연동을 위하여 Esper와 RabbitMQ  라이브러리를 Storm 라이브러리에 복사하여야 한다.

Apache Storm 0.9.4가 설치되어있는 폴더 내 lib 폴더에 라이브러리 파일을 추가하는 법은 아래와 같다. 

1. "storm-esper-scheduler"프로젝트를 컴파일 하여 jar파일 생성 및 라이브러리 파일 복사

$ cd ~/storm-esper-scheduler/trunk
$ mvn clean dependency:copy-dependencies install
$ cd target
$ cp storm-esper-scheduler-0.0.1.jar ~/apache-storm-0.9.4/

2. "storm_esper" 프로젝트를 컴파일 하여 dependency jar파일 생성 및 라이브러리 파일 복사

$ cd ~/storm_esper/trunk
$ mvn clean dependency:copy-dependencies install
$ cd target/dependency
$ cp amqp-client-3.1.4.jar cglib-nodep-2.2.jar antlr-runtime-3.2.jar esper-4.10.0.jar ~/apache-storm-0.9.4/lib/

4. Storm실행

이제 Storm을 실행할 준비가 되었다. 아래의 명령어로 Storm을 실행한다. 테스트용 실행방법이므로 Nimbus, Supervisor, UI를 전부 한대의 머신에서 실행해 보겠다. 

$ ~/apache-storm-0.9.4/bin/storm nimbus &
$ ~/apache-storm-0.9.4/bin/storm supervisor &
$ ~/apache-storm-0.9.4/bin/storm ui &


Strom-ESPER 실행



<그림 1. Storm-ESPER 동작 구성도>


1. DataThrower

테스트용 더미 데이터를 DataThrower를 사용하여 RabbitMQ에 전송해야 한다. 더미 데이터는 DataThrower프로젝트를 컴파일하여 생성된 jar파일을 실행하여 "in_queue"에 전송한다. 이때 throw.conf파일의 "inqueueName":"in_queue"로 수정해준다.

$ java -cp ~/maven_data/trunk/target/maven_data-0.0.1-SNAPSHOT.jar com.embian.maven_data.tester.DataThrower ~/maven_data/conf/throw.conf >> ~/maven_data/logs/throw.log

2. submit topology

"storm-esper"프로젝트를 컴파일하여 생성된 토폴로지를 스톰에 올려주는 스크립트를 실행한다.

$ sh ~/storm_esper/trunk/etc/submit_topology.sh

3. EPL command 실행

"storm-esper-client"프로젝트를 다음의 명령어로 컴파일한다.

$ cd ~/storm-esper-client/trunk
$ mvn clean dependency:copy-dependencies install -Dmaven.test.skip=true

첨부된 스크립트를 통하여 사용할 EPL을 적용하면 된다.

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"EPL command", "params":{"bolt_id":"볼트 명", "statement":{"name":"커맨드 이름", "body":"EPL 쿼리", "state":"STARTED", "do_output": true/false}}}'


4. DataReceiver 

Storm-ESPER가 처리한 데이터는 "out_queue"에 누적된다. DataReceiver는 단순하게 "out_queue"의 데이터를 읽어와서 파일로 저장한다. DataReceiver 프로젝트를 컴파일하여 생성된 jar파일을 실행하여 파일로 저장한다.

$ java -cp ~/maven_data/trunk/target/maven_data-0.0.1-SNAPSHOT.jar com.embian.maven_data.tester.DataReceiver ~/maven_data/conf/receive.conf >> ~/maven_data/logs/receive.log


예제 실행

간단한 예제를 통하여 실행이 잘 되는지 확인해 보자.

시나리오

1. nginx의 access log를 JSON으로 만든 데이터 스트림에서 5분동안 응답시간이 1초가 넘는 Request의 통계를 출력

2. 5분동안 Page A -> Page B -> Page C의 경로로 움직인 IP에 대해서 출력

nginx의  access log를 JSON으로 만든 데이터 스트림 샘플

{"status": "200", "body_bytes_sent": "9784", "remote_user": " ", "request_time": "0.000", "http_referer": "http://www.mmm.com/bbs/board.php?page=A&wr_id=1", "remote_addr": "192.168.1.1", "request": "GET /skin/board/aa.aaa/aa.js/aaa.js?time=1434393200 HTTP/1.1", "http_user_agent": "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", "upstream_response_time": "-", "time_local": "16/Jun/2015:03:33:20 +0900"}

사용한 command를 초기화 하려면 다음을 실행한다.

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"clean", "params":{}}'


0. LogEvent스키마 등록

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"CreateLogEventType", "body":"create schema LogEvent(status string, body_bytes_sent string, remote_user string, request_time string, http_referer string, remote_addr string, request string, http_user_agent string, upstream_response_time string, time_local string)", "state":"STARTED", "do_output":false}}}'

1. nginx의 access log를 JSON으로 만든 데이터 스트림에서 5분동안 응답시간이 1초가 넘는 Request의 통계를 출력

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"Senario1-1", "body":"SELECT count(remote_addr) FROM LogEvent.win:time(5 min) WHERE cast(request_time,float) >= 1.000 ", "state":"STARTED", "do_output":true}}}'

2. 5분동안 Page A-> Page B-> Page C의 경로로 움직인 IP에대해서 출력

$ sh ~/storm-esper-client/trunk/etc/run_tester.sh '{"cmd":"add_statement", "params":{"bolt_id":"esper-bolt-1", "statement":{"name":"Senario1-2", "body":"SELECT target3.remote_addr, target3.request from  pattern [  every target=LogEvent(request like \"%page=A%\") -> (target2=LogEvent(request like \"%page=B%\" and target.remote_addr = target2.remote_addr ) -> target3=LogEvent(http_referer like \"%page=C%\" and target.remote_addr = target2.remote_addr and target2.remote_addr=target3.remote_addr))  ].win:time(5 min)", "state":"STARTED", "do_output":true}}}'


결과물은 DataReceiver를 통하여 파일로 저장된 내역에서 확인이 가능하다.

시나리오 1. 결과물

--queueServerAddr  :  192.168.0.28
--queueName        :  out_queue

{"newEvents":[{    "count(remote_addr)": 1
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 2
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 3
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 4
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 5
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 6
}],"oldEvents":[]}
{"newEvents":[{    "count(remote_addr)": 7
}],"oldEvents":[]}

시나리오 2. 결과물

{"newEvents":[{    "target2.remote_addr": "173.254.216.67",
    "target2.request": "GET page=C HTTP/1.0"
}{    "target2.remote_addr": "173.254.216.67",
    "target2.request": "GET page=C HTTP/1.0"
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "203.133.168.168",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "203.133.168.168",
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "211.53.23.131",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "211.53.23.131",
    "target2.request": "GET /page=C HTTP/1.1"
}{    "target2.remote_addr": "211.53.23.131",
}],"oldEvents":[]}
{"newEvents":[{    "target2.remote_addr": "66.249.71.111",
    "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "66.249.71.111",
      "target2.request": "GET page=C HTTP/1.1"
}{    "target2.remote_addr": "66.249.71.111",}],"oldEvents":[]}


간편한 실행이 어려웠고, 새로운 시스템을 학습해가며 접했기 때문에, 꽤 애를 먹은 프로젝트 실행을 거쳤다. 스크립트가 있어서 그나마 편하게 실행을 할 수 있었던 것 같다. 위의 설치 및 실행기는 단독 머신에서 실행을 기준으로 설명되어있지만, 여러대의 머신에서도 설정파일만 적용해주면 동일하게 실행할 수 있다. 

위의 실행방법을 하나씩 따라해보면 실제로는 실행하기 어렵지 않을 것으로 보인다. 이상으로 Storm-ESPER의 실행기를 마친다.


Posted by jjangAh
,

시스템 에러 탐지 및 금융사기 탐지, 자동화 시스템의 상태 탐지 등의 분야에서 실시간 데이터 분석은 필수적이나, 데이터량의 폭증에 따라 기존의 데이터 분석 시스템은 그 한계를 드러내고 있다.

이러한 한계를 극복하기 위하여 Big Data를 활용한 준 실시간 분석 시스템이 적용되고 있으나, 이 역시 Batch 형식의 작동방식에 따른 근본적인 문제점을 가지고 있어, 진정한 실시간 분석으로 보기는 어렵다.

따라서 Time Critical한 분야에서의 대용량 데이터에 대한 실시간 스트리밍 분석은 향후 시장에서의 주요한 기술로서 가치가 있다.

 

엠비안에서는 Distributed Realtime Computation System Storm과 대표적인 CEP엔진인 Esper를 사용하여 Big Stream을 실시간 분석하는 기술에 대해서 연구를 하였다. 연구의 주요 목표는 Storm의 비즈니스 환경에서 유연하지 못한 단점과 Esper Scale out의 한계를 서로 보완할 수 있는 Architecture를 만드는 것이다.

 

여기서는 엠비안에서 진행했던 Storm Esper의 결합에 대한 연구 과정 및 결과에 대해서 간단히 알아보도록 하자.



사용자 설정 기반의 실시간 Big Data 분석 시스템 Prototype Architecture

 

ü  분산 처리 환경을 통하여 초당 10,000건의 대용량 Log 를 처리

ü  유입된 Log Data Batch 형태가 아닌 실시간 Streaming 방식으로 처리

ü  실시간 Streaming 처리의 진행 상황을 API로 제공

ü  이용자가 지정한 규칙(Rule)에 따라 유입된 로그에 대한 분석

4가지 목표로 모든 데이터(Event , 모니터링, 설정 데이터)MessageQueue를 통하여 전달되는 Prototype Architecture를 마련하였으며 테스트 진행하였다.



테스트는 Prototype Architecture Engine 부분에 변화(Esper, Storm&Esper)를 주며 진행하였다.

  

첫번째 테스트는 Engine 부분을 Esper 만 사용하여 테스트를 진행하였으며 구성은 다음과 같다.

  


1.     Esper : EPL이 다양한 시나리오를 수용할 수 있는지 확인하기 위한 테스트

 우선 3가지의 테스트 시나리오를 마련하여 EPL statement로 작성하였다. EPL statement로 처리한 결과인 Output Data Input Data Hit ratio와 대조하는 방법을 통해 EPL의 정합성을 확인하고자 하였다.

 테스트 결과 3가지 시나리오 모두 예상 결과와 일치하였다. 따라서 다양한 시나리오가 EPL statement로 표현이 충분히 가능할 것으로 보인다.

  

2.     Esper : 시스템 unit 고안하기 위한 테스트 

시스템 resource에 영향을 줄 수 있는 요인

ü  Input Data

ü  Hit Ratio

ü  EPL 종류

ü  EPL 개수

ü  Window Time

ü  Heap Memory

에 변화를 주어 resource의 변화(CPU를 사용률, Memory 사용량)를 측정하였다.

 

테스트 결과는 다음과 같다.

ü  Input Data의 증가량에 비례하여 CPU 사용률과 Memory 사용량이 증가한다.

ü  Hit Ratio CPU, Memory 변화량에 크게 영향을 주지 않는 것으로 보인다.

ü  Input Data에 영향을 미치는 조건 (예를 들면 window time, new DataEvent)을 배재한 EPL CPU, Memory 변화량에 크게 영향을 주지 않는 것으로 보인다.

ü  동일한 EventData를 사용할 경우 EPL의 갯수와 상관없이 EPL   max(window time)과 비례한다.

ü  Window time이 증가할수록 CPU사용율, Memory 사용량이 증가한다.

ü  Heap Memory 량은 Memory 사용량에는 영향을 주지 않는 것으로 보인다.

 

따라서, 시스템 resource에 영향을 주는 요인은 Input Data량의 변화이며, Input Data의 증가량에 비례하여 resource 사용률이 증가함을 확인하였다.

 

위 결과를 바탕으로 “10,000/sec 데이타를 window time 1sec 가지고 있는 1개의 statement”1ss로 정의하는 시스템 unit을 고안해 낼 수 있었다.

 1ss의 경험은 현 시스템에서 어느 정도의 데이타 처리가 가능한지 예측 가능하게 한다.

예를 들어, 테스트 시스템(4Core, Mem 16GB, EventData 10,000/sec) 1ss의 메모리 사용량을 계산하면 (참고로 0ss일때 Memory 사용량은 약 50M) 16M이므로 이 시스템은 약 1000ss 시스템으로 예측 할 수 있다.

 

 

두번째 테스트는 Engine 부분을 Storm&Esper 사용하여 테스트를 진행하였으며 구성은 다음과 같다.

 


1.     Storm&Esper : Scale out 가능한 Topology 구성 테스트

Storm&Esper 를 사용했을 때 아래 6가지 테스트를 통해 Scale out 가능한 Topology 구성을 알아내고자 하였다.

 

ü  Pre Test : Storm Single Test



Noop”(초당 3 tuple을 보내는 Bolt) CPU Memory 사용률을 측정하여 Storm&Esper를 사용함으로써 증가되는 CPU Memory 사용률 확인하였다.

 

ü  Set0 : 1up Test (1 Unit Processing)



1대의 서버에서 1개의 Esper Bolt3가지 테스트 시나리오의 EPL statement를 구동하여 “Esper Bolt” CPU, Memory 사용률을 측정했다.

이를 통해 Esper 단독 테스트의 동일한 조건(모든 EPL 구동)의 결과와 비교하여 1up

resource 수치를 확보하였다.

 

ü  Set1 : Rule Per Bolt Test (Rule 분산 Test)



     여러개의 EPL Statement를 여러개의 Esper Bolt에 분산하여 처리하는 것이 가능하며

     resource 사용에는 효율적인지 알아보기 위한 테스트를 진행하였다.

 

그 결과 분산 처리는 가능하나 resource 사용에는 효율적으로 보이지 않았다.

 

ü  Set2 : Rule Partition Test



     1개의 rule “context EPL statement”를 통해 병렬 처리 가능한지 알아보려고 하였으나

     다음의 2가지의 문제로 진행하지 못하였다.

     - Context granularity option “Esper Document”에서 명시한 대로 동작하지 않았다.

     - Context filter option은 적용에 제약 사항이 있을 수 있다.

 

 

Storm Esper의 결합에 대한 연구에 대한 결론

 

Storm Esper의 결합에 대한 연구는 글 초반에 언급한 4가지 목표를 가지고 진행되었다.

이에 위의 3가지 테스트를 통해 다음과 같은 결론이 도출되었으며 그에 따른 Prototype Architecture이 마련되었다.

 

1.     여러 시나리오에 대해 EPL로 표현은 충분히 가능한가?
충분이 가능하며 Context, Enumeration method(lamda expression), Pattern expression, match and recognize 등 다양한 표현 방법과 match 방법 제공을 통해 확장적이 표현 또한 가능하다.
그러나 사용자 별로 EPL의 이해도가 달랐을 때 Performance의 영향을 줄 수 있어 주의가 필요하다.

 

2.     초당 3,000건의 데이타 처리가 가능한가?
Stream Processing
에서는 대량의 데이타가 시스템에 유입 할 때 크게 느는 것은 메모리에 대한 요구사항이다.
이에 1 Second  Statement’(1 SS) 라는 시스템 Unit을 고안하여 시스템 사양에 따른 데이타 처리량을 예측하도록 하였다.
따라서, 앞서 진행한 테스트 시스템(4Core, Mem16GB, Heap Mem 12G)을 기준으로 약4~5분의 window time에서 초당 10,000건의 데이타 처리도 가능하다는 것을 예측할 수 있다.

 

3.     scale-out 가능한 Topology 구성은 무엇인가?
resource
측면에서는 효율적이지는 못했지만 Rule 갯수 만큼 Scale out이 가능함을 확인하였다.
또한 Rule resource에 영향을 주는 "window time"을 기준으로 분산하는 것이 적당하다고 판단된다.

 

4.     Rule의 동적 설정이 가능한가?
Esper Engine
EPL Statement의 동적 설정(추가, 삭제, 상태변경 등)에 필요한 API를 제공한다.
따라서, 아래 그림과 같은 Rule 분산을 통한 Esper Bolt Scale out이 가능하고 storm stream 을 이용하여 EPL statement 동적 설정이 가능한 prototype을 제작하였다.

 


 

이번 연구성과물의 적용범위

 

상기 프로젝트의 결과물을 Big Data Log형 데이타의 실시간 분석에 사용될 수 있으며, 예로는 

ü  Enterprise 환경에서 Application Server들의 통합적인 기능오류 분석

ü  Intrusion Detection

ü  상기 기능들을 포함하고 Rule Life Cycle을 고려한 Commerical Package로 재구성

등에 쓰일 수 있다.

 

연구 결과물은 Commerical Package가 아닌 Prototype인 관계로 실제 사이트 적용시에는  반제품의 형식으로 System Integration이 필요하나, 우리의 결과물이 그러하듯이 기본적인 Scaling Storm단에서 이루어지기 때문에 Storm Spout Bolt Chaining하여 서비스에서 필요한 Scale Out을 쉽게 구현할 수 있을 것이다.

Posted by 알 수 없는 사용자
,