Flume Series (Part 3)
- 3. Component Descriptions
  - Kafka Source
  - Memory Channel
    - transactionCapacity
  - HDFS Sink
    - path
    - filePrefix
    - fileSuffix
    - inUsePrefix
    - inUseSuffix
    - rollInterval
    - rollSize
    - rollCount
    - idleTimeout
    - batchSize
    - codeC
    - fileType
    - maxOpenFiles
    - minBlockReplicas
    - writeFormat
    - callTimeout
    - threadsPoolSize
    - rollTimerPoolSize
    - kerberosPrincipal
    - kerberosKeytab
    - proxyUser
    - round
    - roundValue
    - roundUnit
    - timeZone
    - useLocalTimeStamp
    - closeTries
    - retryInterval
    - serializer
3. Component Descriptions
Kafka Source
Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic.
| Property Name | Default | Description |
| --- | --- | --- |
| channels | – | |
| type | – | The component type name, needs to be org.apache.flume.source.kafka.KafkaSource |
| zookeeperConnect | – | URI of ZooKeeper used by Kafka cluster |
| groupId | flume | Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group |
| topic | – | Kafka topic we’ll read messages from. At this time, only a single topic is supported. |
| batchSize | 1000 | Maximum number of messages written to Channel in one batch |
| batchDurationMillis | 1000 | Maximum time (in ms) before a batch will be written to Channel. The batch is written as soon as either the size or the time limit is reached, whichever comes first. |
| Other Kafka Consumer Properties | – | These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka. (for example: kafka.consumer.timeout.ms). Check the Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs> for details |
Note
The Kafka Source overrides two Kafka consumer parameters. First, auto.commit.enable is set to “false” by the source, and we commit every batch. For improved performance this can be set to “true”; however, this can lead to loss of data. Second, consumer.timeout.ms is set to 10 ms, so when we check Kafka for new data we wait at most 10 ms for the data to arrive. Setting this to a higher value can reduce CPU utilization (we’ll poll Kafka in less of a tight loop), but also means higher latency in writing batches to the channel (since we’ll wait longer for data to arrive).
Example for agent named tier1:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = test1
tier1.sources.source1.groupId = flume
tier1.sources.source1.kafka.consumer.timeout.ms = 100
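The way batchSize and batchDurationMillis interact ("whichever limit is reached first") can be sketched as a small loop. This is a simplified Python model, not Flume's implementation; `collect_batch`, `poll` and `clock` are hypothetical names introduced only for illustration.

```python
import time
from typing import Callable, List, Optional


def collect_batch(poll: Callable[[], Optional[str]],
                  batch_size: int = 1000,
                  batch_duration_ms: int = 1000,
                  clock: Callable[[], float] = time.monotonic) -> List[str]:
    """Gather messages until batch_size or batch_duration_ms is reached,
    whichever comes first.

    `poll` returns one message, or None if nothing arrived within its own
    timeout (the role consumer.timeout.ms plays in the note above).
    """
    deadline = clock() + batch_duration_ms / 1000.0
    batch: List[str] = []
    while len(batch) < batch_size and clock() < deadline:
        message = poll()
        if message is not None:
            batch.append(message)
    return batch
```

A longer `poll` timeout makes this loop spin less often, which matches the CPU versus latency trade-off described in the note.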
Memory Channel
The events are stored in an in-memory queue with a configurable maximum size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure. Required properties are in bold.
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name, needs to be memory |
| capacity | 100 | The maximum number of events stored in the channel |
| transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction |
| keep-alive | 3 | Timeout in seconds for adding or removing an event |
| byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
| byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
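One way to read the two byte-related settings is that byteCapacity is the total budget and byteCapacityBufferPercentage is the share set aside for headers, leaving the rest for event bodies. The sketch below works under that assumption; the function names are hypothetical and the exact rounding inside Flume may differ.

```python
def body_byte_budget(byte_capacity: int, buffer_percentage: int = 20) -> int:
    """Bytes left for event bodies after reserving the header buffer."""
    if not 0 <= buffer_percentage <= 100:
        raise ValueError("buffer_percentage must be between 0 and 100")
    return byte_capacity * (100 - buffer_percentage) // 100


def default_byte_capacity(jvm_max_heap_bytes: int) -> int:
    """Default described in the table: 80% of the JVM's maximum heap (-Xmx)."""
    return jvm_max_heap_bytes * 80 // 100
```

With the example values above (byteCapacity = 800000, buffer percentage 20), `body_byte_budget(800000, 20)` gives 640000 bytes for event bodies.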
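transactionCapacity
A sink's batch size is the number of events the sink takes from the channel at one time in order to send them. The send is carried out as a transaction, so the events of one batch are first moved into a per-transaction buffer queue (takeList). This is a double-ended queue, which allows a rollback when the transaction fails (that is, the events that were taken out are returned to the Memory Channel's queue). Its capacity is the transactionCapacity defined on the channel, as the source code shows:
takeList = new LinkedBlockingDeque<Event>(transCapacity);
Therefore, the channel's transactionCapacity must not be smaller than the sink's batch size.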
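The relationship between a bounded takeList and the sink's batch size can be illustrated with a toy model. This is a Python sketch, not Flume's Java code; the class and method names are hypothetical, and real Flume transactions involve locking and semaphores that are left out here.

```python
from collections import deque
from typing import Deque


class MemoryChannelSketch:
    """Toy model of a channel queue plus a bounded per-transaction takeList."""

    def __init__(self, transaction_capacity: int) -> None:
        self.transaction_capacity = transaction_capacity
        self.queue: Deque[str] = deque()
        self.take_list: Deque[str] = deque()

    def put(self, event: str) -> None:
        self.queue.append(event)

    def take(self) -> str:
        # Mirrors the bounded takeList: refuse once the transaction is full.
        if len(self.take_list) >= self.transaction_capacity:
            raise OverflowError("takeList full: batch exceeds transactionCapacity")
        event = self.queue.popleft()
        self.take_list.append(event)
        return event

    def rollback(self) -> None:
        # Return taken events to the head of the queue in their original order.
        while self.take_list:
            self.queue.appendleft(self.take_list.pop())

    def commit(self) -> None:
        # The events were delivered, so the transaction buffer is discarded.
        self.take_list.clear()
```

A sink that tries to take three events in one transaction from a channel built with `transaction_capacity=2` fails on the third take, which is the situation the rule above avoids.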
HDFS Sink
This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.
The following are the escape sequences supported:
| Alias | Description |
| --- | --- |
| %{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
| %t | Unix time in milliseconds |
| %a | locale’s short weekday name (Mon, Tue, ...) |
| %A | locale’s full weekday name (Monday, Tuesday, ...) |
| %b | locale’s short month name (Jan, Feb, ...) |
| %B | locale’s long month name (January, February, ...) |
| %c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
| %d | day of month (01) |
| %e | day of month without padding (1) |
| %D | date; same as %m/%d/%y |
| %H | hour (00..23) |
| %I | hour (01..12) |
| %j | day of year (001..366) |
| %k | hour ( 0..23) |
| %m | month (01..12) |
| %n | month without padding (1..12) |
| %M | minute (00..59) |
| %p | locale’s equivalent of am or pm |
| %s | seconds since 1970-01-01 00:00:00 UTC |
| %S | second (00..59) |
| %y | last two digits of year (00..99) |
| %Y | year (2010) |
| %z | +hhmm numeric timezone (for example, -0400) |
The file in use will have the name mangled to include “.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
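To make the escape sequences and the “timestamp” header requirement concrete, here is a small Python sketch that expands %{header} and a handful of the time escapes. It is an illustration only: `expand_path` is a hypothetical helper, it covers just a subset of the table, and it resolves times in UTC (an assumption made to keep the output deterministic).

```python
import re
from datetime import datetime, timezone
from typing import Mapping

# Subset of the time escapes from the table, mapped to strftime codes.
_TIME_ESCAPES = {"Y": "%Y", "y": "%y", "m": "%m", "d": "%d",
                 "H": "%H", "M": "%M", "S": "%S"}
_PATTERN = re.compile(r"%\{(?P<header>[^}]+)\}|%(?P<code>[A-Za-z])")


def expand_path(template: str, headers: Mapping[str, str]) -> str:
    """Expand %{header} and a few time escapes using the event headers."""

    def replace(match: "re.Match[str]") -> str:
        header = match.group("header")
        if header is not None:
            return headers[header]
        code = match.group("code")
        if "timestamp" not in headers:
            raise KeyError("time escapes need a 'timestamp' header")
        millis = int(headers["timestamp"])
        if code == "t":  # Unix time in milliseconds
            return str(millis)
        if code not in _TIME_ESCAPES:
            raise ValueError(f"escape %{code} is not covered by this sketch")
        # Assumption: UTC instead of the local or configured time zone.
        moment = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
        return moment.strftime(_TIME_ESCAPES[code])

    return _PATTERN.sub(replace, template)
```

Calling it with a template that contains a time escape but with headers lacking “timestamp” raises an error, which mirrors the requirement stated in the note.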
| Name | Default | Description |
| --- | --- | --- |
| **channel** | – | |
| **type** | – | The component type name, needs to be hdfs |
| **hdfs.path** | – | HDFS directory path (e.g. hdfs://namenode/flume/webdata/) |
| hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in the HDFS directory |
| hdfs.fileSuffix | – | Suffix to append to file (e.g. .avro; note that the period is not added automatically) |
| hdfs.inUsePrefix | – | Prefix that is used for temporary files that Flume actively writes into |
| hdfs.inUseSuffix | .tmp | Suffix that is used for temporary files that Flume actively writes into |
| hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval) |
| hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0 = never roll based on file size) |
| hdfs.rollCount | 10 | Number of events written to file before it is rolled (0 = never roll based on number of events) |
| hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
| hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS |
| hdfs.codeC | – | Compression codec. One of the following: gzip, bzip2, lzo, lzop, snappy |
| hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, so do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec. |
| hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed. |
| hdfs.minBlockReplicas | – | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. |
| hdfs.writeFormat | – | Format for sequence file records. One of “Text” or “Writable” (the default). |
| hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring. |
| hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
| hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling |
| hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS |
| hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS |
| hdfs.proxyUser | | |
| hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
| hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
| hdfs.roundUnit | second | The unit of the round down value: second, minute or hour. |
| hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
| hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
| hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart. |
| hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a “.tmp” extension. |
| serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface. |
| serializer.* | | |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/12-06-12/1150/00 (%y expands to the last two digits of the year).
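The round-down behaviour can be sketched in a few lines of Python. The sketch assumes that rounding is applied to the field of the configured unit and that all smaller fields are set to zero; `round_down` is a hypothetical helper, not a Flume API.

```python
from datetime import datetime


def round_down(moment: datetime, round_value: int, round_unit: str) -> datetime:
    """Round `moment` down to a multiple of round_value in the given unit."""
    if round_value <= 0:
        raise ValueError("round_value must be positive")
    if round_unit == "second":
        second = moment.second // round_value * round_value
        return moment.replace(second=second, microsecond=0)
    if round_unit == "minute":
        minute = moment.minute // round_value * round_value
        return moment.replace(minute=minute, second=0, microsecond=0)
    if round_unit == "hour":
        hour = moment.hour // round_value * round_value
        return moment.replace(hour=hour, minute=0, second=0, microsecond=0)
    raise ValueError("round_unit must be second, minute or hour")
```

For the event above, `round_down(datetime(2012, 6, 12, 11, 54, 34), 10, "minute")` yields 11:50:00, so the `%H%M/%S` part of the path becomes `1150/00`.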
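path
The HDFS path to write to. It must include the file system identifier, for example: hdfs://namenode/flume/webdata/
Flume's date escape sequences and the %{host} expression can be used.
filePrefix
Default: FlumeData
Prefix of the files written to HDFS. Flume's date escape sequences and the %{host} expression can be used.
fileSuffix
Suffix of the files written to HDFS, for example .lzo or .log.
inUsePrefix
File name prefix for temporary files. The HDFS sink first writes to a temporary file in the target directory and then renames it to the final target file according to the roll rules.
inUseSuffix
Default: .tmp
File name suffix for temporary files.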
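rollInterval
Default: 30
How often, in seconds, the HDFS sink rolls the temporary file into the final target file.
If set to 0, files are not rolled based on time.
Note: rolling means that the HDFS sink renames the temporary file to the final target file and opens a new temporary file to write data into.
rollSize
Default: 1024
When the temporary file reaches this size (in bytes), it is rolled into the target file.
If set to 0, files are not rolled based on the temporary file size.
rollCount
Default: 10
When the number of events reaches this value, the temporary file is rolled into the target file.
If set to 0, files are not rolled based on the number of events.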
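The three roll settings act as independent triggers, each of which is switched off by the value 0. The Python sketch below models that rule; `RollPolicy` and `should_roll` are hypothetical names, and the real sink schedules the time-based roll with a timer rather than checking it on every write.

```python
from dataclasses import dataclass


@dataclass
class RollPolicy:
    roll_interval: int = 30   # seconds; 0 disables time-based rolling
    roll_size: int = 1024     # bytes; 0 disables size-based rolling
    roll_count: int = 10      # events; 0 disables count-based rolling

    def should_roll(self, open_seconds: float, bytes_written: int,
                    events_written: int) -> bool:
        """True as soon as any enabled threshold has been reached."""
        by_time = self.roll_interval > 0 and open_seconds >= self.roll_interval
        by_size = self.roll_size > 0 and bytes_written >= self.roll_size
        by_count = self.roll_count > 0 and events_written >= self.roll_count
        return by_time or by_size or by_count
```

With the defaults, a file rolls after only 10 events or 1024 bytes, which tends to produce many small files; setting rollSize and rollCount to 0 leaves rollInterval as the only trigger.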
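idleTimeout
Default: 0
If no data is written to the currently open temporary file within the time specified by this parameter (in seconds), the temporary file is closed and renamed to the target file.
batchSize
Default: 100
Number of events flushed to HDFS per batch.
codeC
File compression codec. One of: gzip, bzip2, lzo, lzop, snappy
fileType
Default: SequenceFile
File format. One of: SequenceFile, DataStream, CompressedStream
With DataStream, the file is not compressed and hdfs.codeC does not need to be set.
With CompressedStream, a valid hdfs.codeC value must be set.
maxOpenFiles
Default: 5000
Maximum number of HDFS files allowed to be open. When the number of open files reaches this value, the file that was opened earliest is closed.
minBlockReplicas
Default: the HDFS replication factor
Minimum number of replicas for the HDFS file blocks being written.
This parameter affects file rolling; it is usually set to 1 so that files roll as configured.
Still to be investigated.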
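writeFormat
Format for sequence file records. One of: Text, Writable (the default)
callTimeout
Default: 10000
Timeout for HDFS operations, in milliseconds.
threadsPoolSize
Default: 10
Number of threads the HDFS sink starts for HDFS operations.
rollTimerPoolSize
Default: 1
Number of threads the HDFS sink starts for time-based file rolling.
kerberosPrincipal
Kerberos user principal for accessing secure HDFS.
kerberosKeytab
Kerberos keytab for accessing secure HDFS.
proxyUser
Proxy user.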
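round
Default: false
Whether to round timestamps down. This is truncation rather than rounding to the nearest value; see the example under roundUnit. If enabled, it affects all time expressions except %t.
roundValue
Default: 1
The value to round down to (timestamps are rounded down to a multiple of this value).
roundUnit
Default: second
The unit of the round-down value. One of: second, minute, hour
Example:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
When the time is 2015-10-16 17:38:59, hdfs.path is still resolved to:
/flume/events/15-10-16/1730/00
Because the time is rounded down to 10 minutes, a new directory is created every 10 minutes.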
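timeZone
Default: Local Time
Time zone used when resolving the directory path.
useLocalTimeStamp
Default: false
Whether to use the local time (instead of the timestamp in the event header) when replacing escape sequences.
closeTries
Default: 0
Number of times the HDFS sink tries to close a file.
If set to 1, the HDFS sink does not retry after a failed close; the unclosed file is left where it is, in an open state.
If set to 0, the HDFS sink keeps retrying after a failed close until it succeeds.
retryInterval
Default: 180 (seconds)
Interval between the HDFS sink's attempts to close a file. If set to 0, it does not retry, which is equivalent to setting hdfs.closeTries to 1.
serializer
Default: TEXT
Serializer type. Other options are avro_event or the class name of an implementation of EventSerializer.Builder.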
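With the configuration below, a directory named in the format 20151016 is created each day under /tmp/lxw1234/ on HDFS.
A target file is produced every 10 minutes (rollInterval = 600), with a name in the format log_20151016_13.1444973768543.lzo
The target files are compressed with LZO.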
agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d
agent_lxw1234.sinks.sink1.hdfs.filePrefix = log_%Y%m%d_%H
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .lzo
agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.fileType = CompressedStream
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
agent_lxw1234.sinks.sink1.hdfs.codeC = lzop
agent_lxw1234.sinks.sink1.hdfs.batchSize = 100
agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
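A configuration like this can be checked against the fileType and codeC rules described earlier before the agent is started. The following Python sketch is only an illustration: `parse_properties` and `check_hdfs_sink` are hypothetical helpers, the parser handles plain `key = value` lines only, and Flume itself does not ship such a tool.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_hdfs_sink(props: Dict[str, str], prefix: str) -> List[str]:
    """Return warnings for the fileType/codeC rules described in the text."""
    file_type = props.get(prefix + "hdfs.fileType", "SequenceFile")
    codec = props.get(prefix + "hdfs.codeC")
    warnings: List[str] = []
    if file_type == "CompressedStream" and not codec:
        warnings.append("CompressedStream requires hdfs.codeC")
    if file_type == "DataStream" and codec:
        warnings.append("DataStream does not compress; remove hdfs.codeC")
    return warnings
```

For the configuration above, `check_hdfs_sink(props, "agent_lxw1234.sinks.sink1.")` returns an empty list, because CompressedStream is paired with the lzop codec.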