빅데이터

데이터 수집 – flume [2/2]

CyberI 2017. 6. 9. 15:21

Apache Flume

<그림 출처 : https://flume.apache.org/FlumeUserGuide.html>

저번 글에서는 간단하게 flume이 무엇인지 그리고 어떻게 사용되는지에 대해 알아보았습니다. 이번에는 이미 정해진 동작을 수행하는 flume이 아닌 사용자가 원하는 대로 구성을 할 수 있는 custom 형태의 source, sink를 구성 해보겠습니다.



위와 같은 형태로 구성을 해보고 실제 동작시켜 보는 것까지 하려고 합니다.

예제의 데이터 흐름은 아래와 같습니다.

<!--[if !supportLists]-->1.     <!--[endif]-->Source는 클라이언트가 UDP서버로 동작하는 custom source에 키보드로 입력한 데이터를 받게 됩니다.

<!--[if !supportLists]-->2.     <!--[endif]-->Channelsource로부터의 데이터를 memory channel에 담게 됩니다.

<!--[if !supportLists]-->3.     <!--[endif]-->Sink는 그리고 Channel에 담겨진 데이터를 Custom sink에서 읽어와 화면에 출력해줍니다.



소스코드

flume 공식사이트인 http://flume.apache.org/FlumeDeveloperGuide.html 에서 custom 작성과 관련된 정보를 얻을 수 있습니다.

클라이언트는 go로 작성하였고 나머지는 flume에서 쓰기 위해 java로 작성하였습니다.


<Client.go>

package main
 
import (
"fmt"
"net"
"time"
"os"    
"bufio"
)
 
func CheckError(err error) {
    if err  != nil {
        fmt.Println("Error: " , err)
    }
}
 
func main() {
    ServerAddr,err := net.ResolveUDPAddr("udp","127.0.0.1:10001")
    CheckError(err)
 
    LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
    CheckError(err)
 
    Conn, err := net.DialUDP("udp", LocalAddr, ServerAddr)
    CheckError(err)
 
    defer Conn.Close()
    for {
        temp := bufio.NewReader(os.Stdin)
        msg,_ := temp.ReadString('\n')
        buf := []byte(msg)
        _,err := Conn.Write(buf[:len(buf)-1])
        if err != nil {
            fmt.Println(msg, err)
        }
        time.Sleep(time.Second * 1)
    }
}


<UDPSource.java>

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;


public class UDPSource extends AbstractSource implements Configurable, PollableSource {

  private String servPort;
  private DatagramSocket socket;
  private Channel memCh;

  @Override
  public void configure(Context context) {
    String optPort = context.getString("port", "10001");
    this.servPort = optPort;
  }

  @Override
  public void start() {
    try {
    System.out.println(Integer.parseInt(servPort));
    socket = new DatagramSocket(Integer.parseInt(servPort));
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void stop () {
  }

  @Override
  public Status process() throws EventDeliveryException {

    Status status = Status.READY;
    Event event = new SimpleEvent();
    Map<String, String> header = new HashMap<>();
    header.put("recv","TEST");

    try {
      while(true) {
        try {
          byte[] msg = new byte[1024];
          DatagramPacket packet = new DatagramPacket(msg, msg.length);
          socket.receive(packet);
          event.setHeaders(header);
          event.setBody(packet.getData());
          getChannelProcessor().processEvent(event);
        } catch (Exception e) {
          e.printStackTrace();
 status = Status.BACKOFF;
          break;
        }
      }
    } catch (Throwable t) {
       status = Status.BACKOFF;
       if (t instanceof Error) {
       throw (Error)t;
       }
    }
    return status;
  }
  public long getMaxBackOffSleepInterval() {
    return 1;
  }
  public long getBackOffSleepIncrement() {
    return 1;
  }
}



<PrintSink.java>

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class PrintSink extends AbstractSink implements Configurable {
  @Override
  public void configure(Context context) {
  }

  @Override
  public void start() {
   super.start(); 
  }

  @Override
  public void stop () {
   super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
 Event event = ch.take();
      if(event != null) {
        String msg = new String(event.getBody());
        System.out.println(msg);
      } else {
        status = Status.BACKOFF;
      }
      txn.commit();
    } catch (Exception e) {
      if (txn != null) {
        txn.commit();
      }
      e.printStackTrace();
    } finally {
      if (txn != null) {
        txn.close();
      }
    }
    return status;
  }
}




컴파일

각 java 파일을 컴파일을 하여 class파일을 만든 후 jar로 만들면 됩니다.

먼저 컴파일을 수행할 때 classpath에 flume의 lib폴더를 지정해주어야 정상적으로 컴파일이 됩니다.

#javac -cp "../../lib/*" PrintSink.java

그 다음 만들어진 class file을 jar로 만들어 줍니다.

jar -cvf PrintSink.jar PrintSink.class

jar파일이 만들어지면 flume의 lib 폴더에 이동시켜 줍니다.

mv PrintSink.jar ../../lib

각 source, sink에 대해서 위 작업을 똑같이 해줍니다.



실행 및 결과


실행시킬 agent의 설정을 아래와 같이 구성해 줍니다.

<flume.conf>


그러면 이제 flume을 실행시켜 결과를 확인해 보도록 하겠습니다.


flume에서 코드에서 정의된 대로 열려진 포트를 출력하고 클라이언트에서 입력한 hello와 world가 각각 정상적으로 화면에 출력되는 것을 볼 수 있습니다.

이렇게 사용법이 고정되어있지 않고 사용자 입맛에 맞춰 기능을 정의할 수 있도록하는 기능을 통한 유연성 제공이 flume의 큰 장점이지 않을까 생각됩니다.