Hive Series (4): Kafka + Flume + Hive Log Collection
- Practice notes
- (1) Example: partitioned storage by day/hour
- 1. Producing data to Kafka
- 2. Processing data with Flume
- 3. Creating the Hive table
- 4. Creating partitions periodically
Practice notes

(1) Example: partitioned storage by day/hour

Data flow: Kafka -> Flume -> HDFS -> Hive

1. Producing data to Kafka
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.211.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 22000; i < 30000; i++) {
            try {
                Thread.sleep(500); // throttle: one message every 500 ms
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // value is "<id>\t<json>", matching the tab-delimited Hive table below
            producer.send(new ProducerRecord<>("test",
                    Integer.toString(i), Integer.toString(i) + "\t" + "{json" + i + "}"));
        }
        producer.close();
    }
}
```
2. Processing data with Flume
```shell
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name at1 -Dflume.root.logger=INFO,console
```
Agent configuration:
```properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
at1.sources = kafkaSource
at1.channels = memoryChannel
at1.sinks = hdfsSink

# For each one of the sources, the type is defined
#agent.sources.seqGenSrc.type = seq
#agent.sources.seqGenSrc.channels = memoryChannel
at1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
at1.sources.kafkaSource.channels = memoryChannel
at1.sources.kafkaSource.zookeeperConnect = slave1:2181,slave2:2181,slave3:2181
at1.sources.kafkaSource.topic = test
at1.sources.kafkaSource.groupId = flume
at1.sources.kafkaSource.kafka.consumer.timeout.ms = 100

# Each sink's type must be defined
#agent.sinks.loggerSink.type = logger
#agent.sinks.loggerSink.channel = memoryChannel
at1.sinks.hdfsSink.type = hdfs
at1.sinks.hdfsSink.channel = memoryChannel
at1.sinks.hdfsSink.hdfs.path = /user/hive_mysql/warehouse/boomtest.db/log_node2/day=%Y%m%d/hour=%H
at1.sinks.hdfsSink.hdfs.filePrefix = events-%Y-%m-%d-%H
at1.sinks.hdfsSink.hdfs.fileType = DataStream
at1.sinks.hdfsSink.hdfs.writeFormat = Text
at1.sinks.hdfsSink.hdfs.rollInterval = 300
at1.sinks.hdfsSink.hdfs.rollSize = 0
at1.sinks.hdfsSink.hdfs.rollCount = 0
at1.sinks.hdfsSink.hdfs.batchSize = 100

# Each channel's type is defined.
at1.channels.memoryChannel.type = memory
at1.channels.memoryChannel.keep-alive = 30
at1.channels.memoryChannel.capacity = 20000
at1.channels.memoryChannel.transactionCapacity = 10000
at1.channels.memoryChannel.byteCapacityBufferPercentage = 20
at1.channels.memoryChannel.byteCapacity = 800000
```
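The `%Y%m%d` and `%H` escapes in `hdfs.path` are expanded by Flume itself from each event's timestamp, so files land directly in day/hour partition directories. As an illustration only (this helper is not part of the setup above), the same expansion can be sketched in Java:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class HdfsPathDemo {

    // mimics the sink's %Y%m%d / %H escape sequences for a given
    // event timestamp (Flume does this internally; illustration only)
    static String partitionPath(long eventTimeMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(eventTimeMillis).atZone(zone);
        return "/user/hive_mysql/warehouse/boomtest.db/log_node2/day="
                + t.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
                + "/hour=" + t.format(DateTimeFormatter.ofPattern("HH"));
    }

    public static void main(String[] args) {
        // directory a record arriving right now would be written to
        System.out.println(partitionPath(System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

Because the path is derived from the event timestamp, late-arriving events still land in the partition for the hour they were produced, not the hour they were written.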
3. Creating the Hive table
Start the Hive metastore service:

```shell
./bin/hive --service metastore
```
```sql
CREATE EXTERNAL TABLE `log_node2` (
  `id` int,
  `json` string
)
PARTITIONED BY (
  `day` string,
  `hour` string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
```
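The `FIELDS TERMINATED BY '\t'` clause matches the producer, which writes each value as `"<id>\t<json>"`. A minimal sketch (hypothetical helper, not from the original post) of how one HDFS line maps onto the table's two columns:

```java
public class RecordSplitDemo {

    // splits one HDFS line into the two Hive columns (id, json);
    // limit 2 keeps any tabs inside the json payload in the second column
    static String[] parse(String line) {
        return line.split("\t", 2);
    }

    public static void main(String[] args) {
        String[] cols = parse("22000\t{json22000}");
        System.out.println("id=" + cols[0] + ", json=" + cols[1]);
    }
}
```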
4. Creating partitions periodically
Partitions must be added proactively on a schedule: even though Flume writes data into the corresponding partition directories, Hive cannot see that data until a matching partition has been added to the table. Adding a partition works whether or not its directory already exists; it only maps the table's logical partition to the physical directory, and if the directory does not exist yet, Hive creates it when the partition is added.
```sql
hive> alter table log_node2 add partition (day='20170615', hour='16');
```
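The periodic step above can be automated. A minimal sketch (hypothetical helper, not part of the original setup) that builds the hourly `ALTER TABLE` statement, which a cron job or scheduler could then submit via `hive -e "..."` or Hive JDBC:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionHelper {

    // builds the ADD PARTITION statement for the hour containing ts;
    // "if not exists" makes it idempotent, so re-running is harmless
    static String addPartitionStatement(String table, LocalDateTime ts) {
        String day = ts.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        String hour = ts.format(DateTimeFormatter.ofPattern("HH"));
        return "alter table " + table
                + " add if not exists partition (day='" + day + "', hour='" + hour + "')";
    }

    public static void main(String[] args) {
        // e.g. invoked from cron once per hour for the current hour
        System.out.println(addPartitionStatement("log_node2", LocalDateTime.now()));
    }
}
```

Running it a little before each hour boundary ensures the partition exists before Flume writes the first file into it.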
Querying partition data:
```sql
hive> select * from log_node2 where day='20170615' and hour='16';
```
The same pattern works for other log streams, e.g. a bandwidth log table:

```sql
CREATE EXTERNAL TABLE `log_bandwidth` (
  `f01` int,
  `f02` string,
  `f03` string,
  `f04` string,
  `f06` string
)
PARTITIONED BY (
  `day` string,
  `hour` string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
```