flume系列(三) 有更新!

  |   0 评论   |   1,840 浏览

37套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!

三、组件说明

file:///G:/xiaoben/20-installpack/apache-flume-1.6.0-bin/apache-flume-1.6.0-bin/docs/FlumeUserGuide.html

 

Kafka Source

Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic.

Property Name

Default

Description

channels

 

type

The component type name, needs to be 

org.apache.flume.source.kafka,KafkaSource

zookeeperConnect

URI of ZooKeeper used by Kafka cluster

groupId

flume

Unique identified of consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group

topic

Kafka topic we’ll read messages from. At the time, this is a single topic only.

batchSize

1000

Maximum number of messages written to Channel in one batch

batchDurationMillis

1000

Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached.

Other Kafka Consumer Properties

These properties are used to configure the Kafka Consumer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.. For example: kafka.consumer.timeout.ms Check Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs> for details

Note 

The Kafka Source overrides two Kafka consumer parameters: auto.commit.enable is set to “false” by the source and we commit every batch. For improved performance this can be set to “true”, however, this can lead to loss of dataconsumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive setting this to a higher value can reduce CPU utilization (we’ll poll Kafka in less of a tight loop), but also means higher latency in writing batches to channel (since we’ll wait longer for data to arrive).

Example for agent named tier1:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource

tier1.sources.source1.channels = channel1

tier1.sources.source1.zookeeperConnect = localhost:2181

tier1.sources.source1.topic = test1

tier1.sources.source1.groupId = flume

tier1.sources.source1.kafka.consumer.timeout.ms = 100

 

Memory Channel

The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures. Required properties are in bold.

Property Name

Default

Description

type

The component type name, needs to be memory

capacity

100

The maximum number of events stored in the channel

transactionCapacity

100

The maximum number of events the channel will take from a source or give to a sink per transaction

keep-alive

3

Timeout in seconds for adding or removing an event

byteCapacityBufferPercentage

20

Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.

byteCapacity

see description

Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentageconfiguration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

n transactionCapacity

sinkbatchsizesink会一次从channel中取多少个event去发送,而这个发送最终以事务的形式去发送的,因此这个batchsizeevent会传送到一个事务的缓存队列中(takeList),这是一个双向队列,这个队列可以在事务失败时进行回滚(也就是把取出来的数据吐memeryChannelqueue中),它的初始大小就是channetransactionCapacity定义的大小,源码中有: 

takeList = new LinkedBlockingDeque<Event>(transCapacity); 

所以:channeltransactionCapacity参数不能小于sinkbatchsize

 

HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files. It supports compression in both file types. The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events. It also buckets/partitions data by attributes like timestamp or machine where the event originated. The HDFS directory path may contain formatting escape sequences that will replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following are the escape sequences supported:

Alias

Description

%{host}

Substitute value of event header named “host”. Arbitrary header names are supported.

%t

Unix time in milliseconds

%a

locale’s short weekday name (Mon, Tue, ...)

%A

locale’s full weekday name (Monday, Tuesday, ...)

%b

locale’s short month name (Jan, Feb, ...)

%B

locale’s long month name (January, February, ...)

%c

locale’s date and time (Thu Mar 3 23:05:25 2005)

%d

day of month (01)

%e

day of month without padding (1)

%D

date; same as %m/%d/%y

%H

hour (00..23)

%I

hour (01..12)

%j

day of year (001..366)

%k

hour ( 0..23)

%m

month (01..12)

%n

month without padding (1..12)

%M

minute (00..59)

%p

locale’s equivalent of am or pm

%s

seconds since 1970-01-01 00:00:00 UTC

%S

second (00..59)

%y

last two digits of year (00..99)

%Y

year (2010)

%z

+hhmm numeric timezone (for example, -0400)

The file in use will have the name mangled to include ”.tmp” at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.

Note

For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unlesshdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

Name

Default

Description

channel

 

type

The component type name, needs to be hdfs

hdfs.path

HDFS directory path (eg hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Name prefixed to files created by Flume in hdfs directory

hdfs.fileSuffix

Suffix to append to file (eg .avro - NOTE: period is not automatically added)

hdfs.inUsePrefix

Prefix that is used for temporal files that flume actively writes into

hdfs.inUseSuffix

.tmp

Suffix that is used for temporal files that flume actively writes into

hdfs.rollInterval

30

Number of seconds to wait before rolling current file (0 = never roll based on time interval)

hdfs.rollSize

1024

File size to trigger roll, in bytes (0: never roll based on file size)

hdfs.rollCount

10

Number of events written to file before it rolled (0 = never roll based on number of events)

hdfs.idleTimeout

0

Timeout after which inactive files get closed (0 = disable automatic closing of idle files)

hdfs.batchSize

100

number of events written to file before it is flushed to HDFS

hdfs.codeC

Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy

hdfs.fileType

SequenceFile

File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC

hdfs.maxOpenFiles

5000

Allow only this number of open files. If this number is exceeded, the oldest file is closed.

hdfs.minBlockReplicas

Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.

hdfs.writeFormat

Format for sequence file records. One of “Text” or “Writable” (the default).

hdfs.callTimeout

10000

Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.

hdfs.threadsPoolSize

10

Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)

hdfs.rollTimerPoolSize

1

Number of threads per HDFS sink for scheduling timed file rolling

hdfs.kerberosPrincipal

Kerberos user principal for accessing secure HDFS

hdfs.kerberosKeytab

Kerberos keytab for accessing secure HDFS

hdfs.proxyUser

 

 

hdfs.round

false

Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)

hdfs.roundValue

1

Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.

hdfs.roundUnit

second

The unit of the round down value - second, minute or hour.

hdfs.timeZone

Local Time

Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.

hdfs.useLocalTimeStamp

false

Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

hdfs.closeTries

0

Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.

hdfs.retryInterval

180

Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.

serializer

TEXT

Other possible options include avro_event or the fully-qualified class name of an implementation of theEventSerializer.Builder interface.

serializer.*

 

 

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.

 

n path

写入hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/

可以使用flume提供的日期及%{host}表达式。

n filePrefix

默认值:FlumeData

写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。

n fileSuffix

写入hdfs的文件名后缀,比如:.lzo .log等。

n inUsePrefix

临时文件的文件名前缀,hdfs sink会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件;

n inUseSuffix

默认值:.tmp

临时文件的文件名后缀。

n rollInterval

默认值:30

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;

如果设置成0,则表示不根据时间来滚动文件;

注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;

n rollSize

默认值:1024

当临时文件达到该大小(单位:bytes)时,滚动成目标文件;

如果设置成0,则表示不根据临时文件大小来滚动文件;

n rollCount

默认值:10

events数据达到该数量时候,将临时文件滚动成目标文件;

如果设置成0,则表示不根据events数据来滚动文件;

n idleTimeout

默认值:0
当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;

n batchSize

默认值:100

每个批次刷新到HDFS上的events数量;

n codeC

文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy

n fileType

默认值:SequenceFile

文件格式,包括:SequenceFile, DataStream,CompressedStream

当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;

当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;

n maxOpenFiles

默认值:5000

最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;

n minBlockReplicas

默认值:HDFS副本数

写入HDFS文件块的最小副本数。

该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件。

待研究。

n writeFormat

sequence文件的格式。包含:Text, Writable(默认)

n callTimeout

默认值:10000

       执行HDFS操作的超时时间(单位:毫秒);

n threadsPoolSize

默认值:10

hdfs sink启动的操作HDFS的线程数。

n rollTimerPoolSize

默认值:1

hdfs sink启动的根据时间滚动文件的线程数。

n kerberosPrincipal

HDFS安全认证kerberos配置;

n kerberosKeytab

HDFS安全认证kerberos配置;

n proxyUser

代理用户

n round

默认值:false

是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式;

n roundValue

默认值:1

时间上进行“舍弃”的值;

n roundUnit

默认值:seconds

时间上进行”舍弃”的单位,包含:second,minute,hour

示例:

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为:

/flume/events/20151016/17:30/00

因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。

n timeZone

默认值:Local Time

时区。

n useLocalTimeStamp

默认值:flase

是否使用当地时间。

n closeTries

默认值:0

hdfs sink关闭文件的尝试次数;

如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。

设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。

n retryInterval

默认值:180(秒)

hdfs sink尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.

n serializer

默认值:TEXT

序列化类型。其他还有:avro_event或者是实现了EventSerializer.Builder的类名。

下面的配置中,在HDFS/tmp/lxw1234/目录下,每天生成一个格式为20151016的目录,

目标文件每5分钟生成一个,文件名格式为:log_20151016_13.1444973768543.lzo

目标文件采用lzo压缩。

agent_lxw1234.sinks.sink1.type = hdfs

agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d

agent_lxw1234.sinks.sink1.hdfs.filePrefix = log_%Y%m%d_%H

agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .lzo

agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true

agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text

agent_lxw1234.sinks.sink1.hdfs.fileType = CompressedStream

agent_lxw1234.sinks.sink1.hdfs.rollCount = 0

agent_lxw1234.sinks.sink1.hdfs.rollSize = 0

agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600

agent_lxw1234.sinks.sink1.hdfs.codeC = lzop

agent_lxw1234.sinks.sink1.hdfs.batchSize = 100

agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10

agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0

agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1

评论

发表评论

validate