flume系列(四) 有更新!

  |   1 评论   |   1,521 浏览

37套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!

四、自定义拦截器示例

该示例作为一个解析json格式为按\t分隔的行(只解析第一级)。作为拦截器解析消息行。代码如下:

1. 继承Interceptor接口

ParaserInterceptor 

package com.boom.flume.interceptor;

 

 

import java.util.ArrayList;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

 

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

 

import com.google.common.base.Charsets;

import com.google.gson.JsonElement;

import com.google.gson.JsonObject;

import com.google.gson.JsonParser;

import com.google.gson.JsonSyntaxException;

public class ParaserInterceptor implements Interceptor {

 

public void close() {

// TODO Auto-generated method stub

}

 

public void initialize() {

// TODO Auto-generated method stub

}

 

public Event intercept(Event event) {

String message = new String(event.getBody(), Charsets.UTF_8);

JsonParser jsonParser = new JsonParser();

JsonElement element = null;

try {

element = jsonParser.parse(message);

catch (JsonSyntaxException e) {

return event;

}

if (element.isJsonObject()) {

JsonObject jsonObject = element.getAsJsonObject();

Set<Entry<String, JsonElement>> set = jsonObject.entrySet();

List<String> list = new ArrayList<String>();

Map<String, String> dataMap = new HashMap<String, String>();

for ( Entry<String, JsonElement> entry: set) {

list.add(entry.getKey());

dataMap.put(entry.getKey(), entry.getValue().toString());

}

Collections.sort(list);

StringBuffer sg = new StringBuffer();

for (int i = 0; i < list.size(); i++) {

sg.append(dataMap.get(list.get(i)));

if(i!=list.size()-1){

sg.append("\t");

}

}

event.setBody(sg.toString().getBytes( Charsets.UTF_8));

}

return event;

}

 

public List<Event> intercept(List<Event> arg0) {

List<Event> resList = new ArrayList<Event>();

for (Event event : arg0) {

resList.add(intercept(event));

}

return resList;

}

public static class Builder implements Interceptor.Builder{  

        public Interceptor build() {  

            return new ParaserInterceptor();  

        }  

  

 

public void configure(Context arg0) {

}  

    }

public static void main(String[] args) {

String message = "{'003':0.5,'001':0.8,'002':{'00201':12,'00202':34},'006':'带宽信息','004':'flume'}";

JsonParser jsonParser = new JsonParser();

JsonElement element = jsonParser.parse(message);

if (element.isJsonObject()) {

JsonObject jsonObject = element.getAsJsonObject();

Set<Entry<String, JsonElement>> set = jsonObject.entrySet();

List<String> list = new ArrayList<String>();

Map<String, String> dataMap = new HashMap<String, String>();

for ( Entry<String, JsonElement> entry: set) {

list.add(entry.getKey());

dataMap.put(entry.getKey(), entry.getValue().toString());

}

Collections.sort(list);

StringBuffer sg = new StringBuffer();

for (int i = 0; i < list.size(); i++) {

sg.append(dataMap.get(list.get(i)));

if(i!=list.size()-1){

sg.append("\t");

}

}

System.out.println(sg.toString());

}

}

}

 

 

2. pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.boom.flume</groupId>

  <artifactId>paraserInterceptor</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <build/>

  

    <dependencies>

            <dependency>

             <groupId>org.apache.flume</groupId >

            <artifactId>flume-ng-core</artifactId >

             <version>1.6.0</version >

             <scope>provided</scope >

            </dependency>

 </dependencies>

  

</project>

 

3. jar包上传到plugins.d目录下

jar作为flume的插件放到

flume/plugins.d/parser/lib目录下

修改配置文件:

at1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource

at1.sources.kafkaSource.channels = memoryChannel

at1.sources.kafkaSource.zookeeperConnect = slave1:2181,slave2:2181,slave3:2181

at1.sources.kafkaSource.topic = test

at1.sources.kafkaSource.interceptors=i1

at1.sources.kafkaSource.interceptors.i1.type=com.boom.flume.interceptor.ParaserInterceptor$Builder

at1.sources.kafkaSource.groupId = flume

at1.sources.kafkaSource.kafka.consumer.timeout.ms = 100

重新启动flume,拦截器生效。

 

 


 

 

 

评论

发表评论

validate