Apache Flume
<그림 출처 : https://flume.apache.org/FlumeUserGuide.html>
저번 글에서는 간단하게 flume이 무엇인지 그리고 어떻게 사용되는지에 대해 알아보았습니다. 이번에는 이미 정해진 동작을 수행하는 flume이 아닌 사용자가 원하는 대로 구성을 할 수 있는 custom 형태의 source, sink를 구성 해보겠습니다.
위와 같은 형태로 구성을 해보고 실제 동작시켜 보는 것까지 하려고 합니다.
예제의 데이터 흐름은 아래와 같습니다.
<!--[if !supportLists]-->1. <!--[endif]-->Source는 클라이언트가 UDP서버로 동작하는 custom source에 키보드로 입력한 데이터를 받게 됩니다.
<!--[if !supportLists]-->2. <!--[endif]-->Channel은 source로부터의 데이터를 memory channel에 담게 됩니다.
<!--[if !supportLists]-->3. <!--[endif]-->Sink는 그리고 Channel에 담겨진 데이터를 Custom sink에서 읽어와 화면에 출력해줍니다.
소스코드
flume 공식사이트인 http://flume.apache.org/FlumeDeveloperGuide.html 에서 custom 작성과 관련된 정보를 얻을 수 있습니다.
클라이언트는 go로 작성하였고 나머지는 flume에서 쓰기 위해 java로 작성하였습니다.
package main
import (
"fmt"
"net"
"time"
"os"
"bufio"
)
func CheckError(err error) {
if err != nil {
fmt.Println("Error: " , err)
}
}
func main() {
ServerAddr,err := net.ResolveUDPAddr("udp","127.0.0.1:10001")
CheckError(err)
LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
CheckError(err)
Conn, err := net.DialUDP("udp", LocalAddr, ServerAddr)
CheckError(err)
defer Conn.Close()
for {
temp := bufio.NewReader(os.Stdin)
msg,_ := temp.ReadString('\n')
buf := []byte(msg)
_,err := Conn.Write(buf[:len(buf)-1])
if err != nil {
fmt.Println(msg, err)
}
time.Sleep(time.Second * 1)
}
}
<UDPSource.java>
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
public class UDPSource extends AbstractSource implements Configurable, PollableSource {
private String servPort;
private DatagramSocket socket;
private Channel memCh;
@Override
public void configure(Context context) {
String optPort = context.getString("port", "10001");
this.servPort = optPort;
}
@Override
public void start() {
try {
System.out.println(Integer.parseInt(servPort));
socket = new DatagramSocket(Integer.parseInt(servPort));
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void stop () {
}
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Event event = new SimpleEvent();
Map<String, String> header = new HashMap<>();
header.put("recv","TEST");
try {
while(true) {
try {
byte[] msg = new byte[1024];
DatagramPacket packet = new DatagramPacket(msg, msg.length);
socket.receive(packet);
event.setHeaders(header);
event.setBody(packet.getData());
getChannelProcessor().processEvent(event);
} catch (Exception e) {
e.printStackTrace();
status = Status.BACKOFF;
break;
}
}
} catch (Throwable t) {
status = Status.BACKOFF;
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
public long getMaxBackOffSleepInterval() {
return 1;
}
public long getBackOffSleepIncrement() {
return 1;
}
}
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
public class PrintSink extends AbstractSink implements Configurable {
@Override
public void configure(Context context) {
}
@Override
public void start() {
super.start();
}
@Override
public void stop () {
super.stop();
}
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
Event event = ch.take();
if(event != null) {
String msg = new String(event.getBody());
System.out.println(msg);
} else {
status = Status.BACKOFF;
}
txn.commit();
} catch (Exception e) {
if (txn != null) {
txn.commit();
}
e.printStackTrace();
} finally {
if (txn != null) {
txn.close();
}
}
return status;
}
}
컴파일
각 java 파일을 컴파일을 하여 class파일을 만든 후 jar로 만들면 됩니다.
먼저 컴파일을 수행할 때 classpath에 flume의 lib폴더를 지정해주어야 정상적으로 컴파일이 됩니다.
#javac -cp "../../lib/*" PrintSink.java
그 다음 만들어진 class file을 jar로 만들어 줍니다.
jar -cvf PrintSink.jar PrintSink.class
jar파일이 만들어지면 flume의 lib 폴더에 이동시켜 줍니다.
mv PrintSink.jar ../../lib
각 source, sink에 대해서 위 작업을 똑같이 해줍니다.
실행 및 결과
실행시킬 agent의 설정을 아래와 같이 구성해 줍니다.
<flume.conf>
그러면 이제 flume을 실행시켜 결과를 확인해 보도록 하겠습니다.
flume에서 코드에서 정의된 대로 열려진 포트를 출력하고 클라이언트에서 입력한 hello와 world가 각각 정상적으로 화면에 출력되는 것을 볼 수 있습니다.
이렇게 사용법이 고정되어있지 않고 사용자 입맛에 맞춰 기능을 정의할 수 있도록하는 기능을 통한 유연성 제공이 flume의 큰 장점이지 않을까 생각됩니다.