flume系列(二) 有更新!
37套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!
二、部署安装
1. 下载解压
下载地址:http://flume.apache.org/releases/index.html,下载解压到指定目录。
[root@master boom]# tar -zxf ./apache-flume-1.6.0-bin.tar.gz -C /usr/program/flume [root@master boom]# rm -rf ./apache-flume-1.6.0-bin [root@master boom]# ll 总用量 114688 -rw-r--r--. 1 root root 10 5月 2 14:35 000000_0 -rw-r--r--. 1 root root 52550402 5月 5 17:11 apache-flume-1.6.0-bin.tar.gz -rw-r--r--. 1 root root 1375200 1月 28 2016 redis-3.0.7.tar.gz -rw-r--r--. 1 root root 28460530 5月 11 19:31 scala-2.11.7.tgz -rw-r--r--. 1 root root 35042811 5月 9 13:31 zookeeper-3.4.10.tar.gz [root@master boom]# cd /usr/program/flume/ [root@master flume]# ll 总用量 4 drwxr-xr-x. 7 root root 4096 6月 14 14:09 apache-flume-1.6.0-bin [root@master flume]# mv ./apache-flume-1.6.0-bin/* ./ [root@master flume]# ll 总用量 144 drwxr-xr-x. 2 root root 4096 6月 14 14:09 apache-flume-1.6.0-bin drwxr-xr-x. 2 501 games 4096 6月 14 14:09 bin -rw-r--r--. 1 501 games 69856 5月 9 2015 CHANGELOG drwxr-xr-x. 2 501 games 4096 6月 14 14:09 conf -rw-r--r--. 1 501 games 6172 5月 9 2015 DEVNOTES drwxr-xr-x. 10 501 games 4096 5月 12 2015 docs drwxr-xr-x. 2 root root 4096 6月 14 14:09 lib -rw-r--r--. 1 501 games 25903 5月 9 2015 LICENSE -rw-r--r--. 1 501 games 249 5月 9 2015 NOTICE -rw-r--r--. 1 501 games 1779 5月 9 2015 README -rw-r--r--. 1 501 games 1585 5月 9 2015 RELEASE-NOTES drwxr-xr-x. 2 root root 4096 6月 14 14:09 tools [root@master flume]# rm -rf apache-flume-1.6.0-bin/ [root@master flume]# ll 总用量 140 drwxr-xr-x. 2 501 games 4096 6月 14 14:09 bin -rw-r--r--. 1 501 games 69856 5月 9 2015 CHANGELOG drwxr-xr-x. 2 501 games 4096 6月 14 14:09 conf -rw-r--r--. 1 501 games 6172 5月 9 2015 DEVNOTES drwxr-xr-x. 10 501 games 4096 5月 12 2015 docs drwxr-xr-x. 2 root root 4096 6月 14 14:09 lib -rw-r--r--. 1 501 games 25903 5月 9 2015 LICENSE -rw-r--r--. 1 501 games 249 5月 9 2015 NOTICE -rw-r--r--. 1 501 games 1779 5月 9 2015 README -rw-r--r--. 1 501 games 1585 5月 9 2015 RELEASE-NOTES drwxr-xr-x. 2 root root 4096 6月 14 14:09 tools [root@master flume]# cd conf/ [root@master conf]# ll 总用量 16 -rw-r--r--. 
1 501 games 1661 5月 9 2015 flume-conf.properties.template -rw-r--r--. 1 501 games 1110 5月 9 2015 flume-env.ps1.template -rw-r--r--. 1 501 games 1214 5月 9 2015 flume-env.sh.template -rw-r--r--. 1 501 games 3107 5月 9 2015 log4j.properties [root@master conf]# cp flume-conf.properties.template flume-conf.properties
2. 配置一个代理
这个代理示例,数据来源是从kafka topic中拉取数据,数据去向是存储到hdfs中,具体配置说明后面详细介绍。
拷贝一份 flume-conf.properties,并修改为如下配置。
# The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'at1' # 自定义代理的名称及 三个重要组件名称,后面的配置都要基于这4个自定义变量名配置(at1,kafkaSource,memoryChannel,hdfsSink) at1.sources = kafkaSource at1.channels = memoryChannel at1.sinks = hdfsSink # For each one of the sources, the type is defined #agent.sources.seqGenSrc.type = seq #agent.sources.seqGenSrc.channels = memoryChannel at1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource at1.sources.kafkaSource.channels = memoryChannel at1.sources.kafkaSource.zookeeperConnect = slave1:2181,slave2:2181,slave3:2181 at1.sources.kafkaSource.topic = test at1.sources.kafkaSource.groupId = flume at1.sources.kafkaSource.kafka.consumer.timeout.ms = 100 # Each sink's type must be defined #agent.sinks.loggerSink.type = logger #agent.sinks.loggerSink.channel = memoryChannel at1.sinks.hdfsSink.type = hdfs at1.sinks.hdfsSink.channel = memoryChannel at1.sinks.hdfsSink.hdfs.path = /flume/events/%Y-%m-%d at1.sinks.hdfsSink.hdfs.filePrefix = events-%Y-%m-%d-%H at1.sinks.hdfsSink.hdfs.fileType = DataStream at1.sinks.hdfsSink.hdfs.writeFormat = Text at1.sinks.hdfsSink.hdfs.rollInterval = 300 at1.sinks.hdfsSink.hdfs.rollSize = 0 at1.sinks.hdfsSink.hdfs.rollCount = 0 at1.sinks.hdfsSink.hdfs.batchSize=100 # Each channel's type is defined. at1.channels.memoryChannel.type = memory at1.channels.memoryChannel.keep-alive= 30 at1.channels.memoryChannel.capacity = 20000 at1.channels.memoryChannel.transactionCapacity = 10000 at1.channels.memoryChannel.byteCapacityBufferPercentage = 20 at1.channels.memoryChannel.byteCapacity = 800000
[root@master conf]# rz rz waiting to receive. zmodem (Ctrl+C 取消) 100% 1 KB 1 KB/s 00:00:01 0 Errors.. [root@master conf]# ll 总用量 20 -rw-r--r--. 1 root root 2044 6月 14 16:28 flume-conf.properties -rw-r--r--. 1 501 games 1661 5月 9 2015 flume-conf.properties.template -rw-r--r--. 1 501 games 1110 5月 9 2015 flume-env.ps1.template -rw-r--r--. 1 501 games 1214 5月 9 2015 flume-env.sh.template -rw-r--r--. 1 501 games 3107 5月 9 2015 log4j.properties [root@master conf]# cat flume-conf.properties at1.sources = kafkaSource at1.channels = memoryChannel at1.sinks = hdfsSink # For each one of the sources, the type is defined #agent.sources.seqGenSrc.type = seq #agent.sources.seqGenSrc.channels = memoryChannel at1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource at1.sources.kafkaSource.channels = memoryChannel at1.sources.kafkaSource.zookeeperConnect = slave1:2181,slave2:2181,slave3:2181 at1.sources.kafkaSource.topic = test at1.sources.kafkaSource.groupId = flume at1.sources.kafkaSource.kafka.consumer.timeout.ms = 100 # Each sink's type must be defined #agent.sinks.loggerSink.type = logger #agent.sinks.loggerSink.channel = memoryChannel at1.sinks.hdfsSink.type = hdfs at1.sinks.hdfsSink.channel = memoryChannel at1.sinks.hdfsSink.hdfs.path = /flume/events/%y-%m-%d/%H%M at1.sinks.hdfsSink.hdfs.filePrefix = events- at1.sinks.hdfsSink.hdfs.round = true at1.sinks.hdfsSink.hdfs.roundValue = 10 at1.sinks.hdfsSink.hdfs.roundUnit = minute # Each channel's type is defined. at1.channels.memoryChannel.type = memory at1.channels.memoryChannel.capacity = 100 [root@master conf]# cd ../ [root@master flume]# cd bin/ [root@master bin]# ll 总用量 36 -rwxr-xr-x. 1 501 games 12845 5月 9 2015 flume-ng -rw-r--r--. 1 501 games 936 5月 9 2015 flume-ng.cmd -rwxr-xr-x. 1 501 games 14041 5月 9 2015 flume-ng.ps1 [root@master bin]# cd ../ [root@master flume]# bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name at1 -Dflume.root.logger=INFO,console
3. 启动该代理
[root@master bin]# cd ../ [root@master flume]# bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name at1 -Dflume.root.logger=INFO,console Info: Including Hadoop libraries found via (/usr/program/hadoop/bin/hadoop) for HDFS access Info: Excluding /usr/program/hadoop/share/hadoop/common/lib/slf4j-api-1.7.10.jar from classpath Info: Excluding /usr/program/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar from classpath Info: Including Hive libraries found via () for Hive access
开启一个kafka生产者,并且生产消息到test Topic中
[root@slave2 kafka]# bin/kafka-console-producer.sh --broker-list slave2:9092 --topic test shide haha |
默认是序列化存储
[root@master ~]# hdfs dfs -cat /flume/events/17-06-15/0910/events-.1497489212478 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritableUUq7\ |
评论
发表评论
|
|