Storm + Kafka 연동하기

 실시간 처리를 위하여 Storm을 분석하고 있습니다.  

 Spark가 배치성 처리를 위한다면, Storm은 더욱 실시간성 처리가 가능할 것 같아서 분석중입니다. 실시간 처리를 하려면 일단 Kafka와 연결을 해야하는데, Storm의 홈페이지에는 별로 자세한 내용이 없고, 구글검색을 해도 정확하게 찾는 내용이 없는 것 같아 직접 글을 써볼까 합니다.


 저의 개발환경은 Spark와 마찬가지로 Intellij의 Maven을 활용합니다. 


새로운 java 프로젝트를 만듭니다. 저는 1.8 버전으로 만들었습니다. 그리고 pom.xml에 Dependency를 추가합니다 .

Storm으로 개발 할 것이기 때문에, storm-core를 넣고, 지난 spark때 사용했던 maven-shade-plugin도 추가합니다. 


<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*.*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META_INF/*.DSA</exclude>
<exclude>META_INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>${project.artifactId}-cyberx-${project.version}</finalName>
</configuration>
</plugin>
</plugins>


 그리고 이제부터가 헤맨 부분입니다.  

Kafka를 연동해야 하는데, 어떤 Dependency를 추가해야 할지 모르겠습니다. 

Storm의 문서를 찾아보니 http://storm.apache.org/releases/1.1.0/storm-kafka.html  여기에 Kafka 내용이 있습니다. 

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


이렇게 추가를 하면 된다는군요. 

저는 consumer를 사용할 것 이기 때문에 http://storm.apache.org/releases/1.1.0/storm-kafka-client.html 이 페이지의 내용도 추가하겠습니다. 

  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>


 그러면 이제 Kafka 개발할 준비가 되었습니다. 이제 Kafka가 실제로 잘 돌아가는지에 대한 예제 코드를 만들어보겠습니다. 실제 작업을 처리할 Bolt들 부터 만들겠습니다. 예제는 역시 WordCount가 만만하기 때문에 WordCount로 선택했습니다. 

(예제 소스는 https://www.tutorialspoint.com/apache_kafka/apache_kafka_integration_storm.htm 이곳을 참조 하였습니다)


 먼저 데이터를 스페이스로 구분하여 Split 하는 Bolt를 만듭니다. IRichBolt를 implements하고, 함수들을 선언하여야 합니다. 

import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

public class SplitBolt implements IRichBolt{
private OutputCollector collector;


public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}

public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");

for(String word: words) {
word = word.trim();

if(word.length() != 0) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}

collector.ack(input);
}

public void cleanup() {

}

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}

public Map<String, Object> getComponentConfiguration() {
return null;
}
}

 실제 구현 부분은 execute 함수에 있고, declareOutputFields 함수에 "word"라는 필드로 선언하였습니다.  

 다음으로 Split되어 이제 한단어로 표시된 word들을 Count하는 Bolt를 만듭니다. 


package com.cyber.kafkaStorm;

import org.apache.storm.tuple.Tuple;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.task.TopologyContext;

import java.util.Map;
import java.util.HashMap;


public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;


public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.counters = new HashMap<String, Integer>();
this.collector = outputCollector;
}

public void execute(Tuple input) {
String str = input.getString(0);

if(!counters.containsKey(str)) {
counters.put(str, 1);
} else {
Integer c = counters.get(str) + 1;
counters.put(str, c);
}

collector.ack(input);
}

public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()) {

    System.out.println(entry.getKey() + " : " + entry.getValue());
    }

}

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}

public Map<String, Object> getComponentConfiguration() {
return null;
}
}

 counter를 Map<String, Integer> 형식으로 만들었고, 데이터가 없으면 단어와 카운트1을 저장하고, 데이터가 이미 존재하면 카운트만 1 증가시킵니다. 역시 execute 함수에 정의합니다. 

cleanup 함수는 작업이 최종 종료될 때 실행되는 함수 입니다. 여기서는 Map의 내용을 프린트 합니다. 



이제 Bolt들은 준비가 되었고, Spout과 Topology만 완성하면 됩니다. Spout은 Kafka가 되기 때문에 Topology만 만들겠습니다. 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.Broker;
import org.apache.storm.kafka.StaticHosts;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.KafkaConfig;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.StringScheme;


/**
* Created by hsbjjang on 2017-07-19.
*/
public class KafkaStormTopology {
public static void main(String[] args){
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

String zkConnString = "주키퍼주소";
String topic = "토픽명";
BrokerHosts hosts = new ZkHosts(zkConnString);

SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");

///* 배포 방식 바꾸기
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSampleCyber", config, builder.createTopology());
try { Thread.sleep(10000); } catch (InterruptedException e) { } // waiting 10s
cluster.killTopology("KafkaStormSampleCyber");
cluster.shutdown();



}
}


중간에 "주키퍼주소", "토픽명" 부분은 자기 환경에 맞춰서 넣어주면 됩니다. 

kafka config를 만들어서 작성하고, Spout은 Kafka로, 그리고 Bolt들을 순서대로 연결해주었습니다. 

그리고 10초간 기다리다가 10초가 지나면 Topology를 종료합니다. 


Kafka Topic을 만들어서 아무 데이터나 넣고, Storm을 실행합니다. 저는 jar파일로 묶어서, 리눅스 서버에서 실행하였습니다.

storm jar jar파일명 패키지명.mainClass명

 

그러면 실행결과 가 나옵니다.

 

 

 

이상 Storm과 kafka 연동하여 WordCount 예제였습니다. 

 


 



 








'빅데이터' 카테고리의 다른 글

도커 공인 ip 설정  (0) 2017.08.04
스마트의 시작, Ontology_4  (0) 2017.07.26
Storm + Kafka 연동하기  (0) 2017.07.21
docker 이해  (0) 2017.07.07
스마트의 시작, Ontology_3  (0) 2017.06.30
Spark 개발환경 구축 - Zeppelin Spark Interpreter에 HBase 연결하기  (0) 2017.06.23

New Multi-Channel Dynamic CMS