Flume Series (4)
- IV. Custom Interceptor Example
- 1. Implement the Interceptor interface
- 2. pom.xml:
- 3. Build the jar and upload it to the plugins.d directory
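IV. Custom Interceptor Example
This example interceptor converts each JSON-formatted message into a single tab-separated line. Only the top level of the JSON object is parsed: nested objects are kept as raw JSON text, and values are written in ascending key order. Messages that are not valid JSON objects pass through unchanged. The code is as follows:
1. Implement the Interceptor interface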
ParaserInterceptor
package com.boom.flume.interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;

public class ParaserInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No setup required.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    // Rewrites the body of a JSON-object event as tab-separated values;
    // any other body is passed through unchanged.
    @Override
    public Event intercept(Event event) {
        String message = new String(event.getBody(), Charsets.UTF_8);
        JsonElement element;
        try {
            element = new JsonParser().parse(message);
        } catch (JsonSyntaxException e) {
            return event; // not valid JSON, leave the event as it is
        }
        if (element.isJsonObject()) {
            String line = toTabSeparated(element.getAsJsonObject());
            event.setBody(line.getBytes(Charsets.UTF_8));
        }
        return event;
    }

    // Applies the single-event interceptor to every event in the batch.
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> result = new ArrayList<Event>(events.size());
        for (Event event : events) {
            result.add(intercept(event));
        }
        return result;
    }

    // Joins the top-level values in ascending key order, separated by tabs.
    // Values keep their JSON text form, so nested objects stay as raw JSON.
    static String toTabSeparated(JsonObject jsonObject) {
        // TreeMap keeps the keys sorted, replacing the separate list + sort.
        Map<String, String> sorted = new TreeMap<String, String>();
        for (Entry<String, JsonElement> entry : jsonObject.entrySet()) {
            sorted.put(entry.getKey(), entry.getValue().toString());
        }
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String value : sorted.values()) {
            if (!first) {
                sb.append("\t");
            }
            sb.append(value);
            first = false;
        }
        return sb.toString();
    }

    // Flume instantiates the interceptor through this builder.
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ParaserInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configuration parameters.
        }
    }

    // Quick local check of the conversion, without running Flume.
    public static void main(String[] args) {
        String message =
            "{'003':0.5,'001':0.8,'002':{'00201':12,'00202':34},'006':'带宽信息','004':'flume'}";
        JsonElement element = new JsonParser().parse(message);
        if (element.isJsonObject()) {
            System.out.println(toTabSeparated(element.getAsJsonObject()));
        }
    }
}
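The core of the interceptor is the step that sorts the top-level keys and joins their values with tabs. The sketch below isolates that step using only the JDK, so it can be run without Flume or Gson on the classpath. The class name `TabJoinSketch`, the method `joinByKeyOrder`, and the hand-built map standing in for a parsed JSON object are assumptions made for illustration; the values are written the way `JsonElement.toString()` is expected to render them (string values keep their quotes).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class TabJoinSketch {

    // Joins the map values in ascending key order, separated by tabs.
    static String joinByKeyOrder(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner("\t");
        // Copying into a TreeMap sorts the entries by key.
        for (String value : new TreeMap<>(fields).values()) {
            joiner.add(value);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a subset of the sample message's
        // top-level fields, inserted in the order they appear in the JSON.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("003", "0.5");
        fields.put("001", "0.8");
        fields.put("004", "\"flume\"");
        // Values come out in key order: 001, 003, 004.
        System.out.println(joinByKeyOrder(fields));
    }
}
```

Because the output carries no keys, downstream consumers have to rely on the sorted key order to know which column is which, so every message should contain the same set of top-level keys.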
2. pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.boom.flume</groupId>
  <artifactId>paraserInterceptor</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build/>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
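3. Build the jar and upload it to the plugins.d directory
Place the jar in the following directory so that Flume loads it as a plugin:
flume/plugins.d/parser/lib
Then modify the configuration file: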
at1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
at1.sources.kafkaSource.channels = memoryChannel
at1.sources.kafkaSource.zookeeperConnect = slave1:2181,slave2:2181,slave3:2181
at1.sources.kafkaSource.topic = test
at1.sources.kafkaSource.interceptors = i1
at1.sources.kafkaSource.interceptors.i1.type = com.boom.flume.interceptor.ParaserInterceptor$Builder
at1.sources.kafkaSource.groupId = flume
at1.sources.kafkaSource.kafka.consumer.timeout.ms = 100
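The `$Builder` suffix in the interceptor `type` value is the JVM binary name of the nested `Builder` class, which separates outer and nested class names with `$` rather than `.`. The sketch below uses a hypothetical stand-in class (`OuterInterceptor`, not part of Flume) to show how such a name is rendered and why the `$` form is the one that can be loaded by reflection. It assumes that Flume resolves the configured name with a reflective class lookup.

```java
public class BinaryNameSketch {

    // Hypothetical stand-in for an interceptor class with a nested builder.
    static class OuterInterceptor {
        public static class Builder {
        }
    }

    // Returns the binary name, which uses '$' between outer and nested class.
    static String builderBinaryName() {
        return OuterInterceptor.Builder.class.getName();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String name = builderBinaryName();
        System.out.println(name);
        // Reflection accepts the '$' form; the dotted source-code form
        // would raise ClassNotFoundException.
        Class<?> loaded = Class.forName(name);
        System.out.println(loaded == OuterInterceptor.Builder.class);
    }
}
```

If the interceptor is not picked up after a restart, a mistyped class name or a missing `$Builder` suffix in the `type` value is one of the first things worth checking.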
Restart Flume, and the interceptor takes effect.