hive系列(四)kafka+flume+hive日志收集 有更新!

  |   0 评论   |   2,315 浏览

37套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!

实践记录

(一)示例按天/小时分区存储

数据流:kafka->flume->hdfs->hive

1. kafka生产数据

public class ProducerDemo {

public static void main(String[] args) {

Properties props = new Properties();

 props.put("bootstrap.servers""192.168.211.133:9092");

 props.put("acks""all");

 props.put("retries", 0);

 props.put("batch.size", 16384);

 props.put("linger.ms", 1);

 props.put("buffer.memory", 33554432);

 props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");

 props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");

 

 Producer<String, String> producer = new KafkaProducer(props);

 for(int i = 22000; i < 30000; i++){

 try {

Thread.sleep(500);

catch (InterruptedException e) {

e.printStackTrace();

}

 producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)+"\t"+"{json"+i+"}"));

 }

 

 producer.close();

}

}

2. flume处理数据

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name at1 Dflume.root.logger=INFO,console

代理配置:

# The configuration file needs to define the sources, 

# the channels and the sinks.

# Sources, channels and sinks are defined per agent, 

# in this case called 'agent'

 

at1.sources = kafkaSource

at1.channels = memoryChannel

at1.sinks = hdfsSink

 

# For each one of the sources, the type is defined

#agent.sources.seqGenSrc.type = seq

#agent.sources.seqGenSrc.channels = memoryChannel

 

at1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource

at1.sources.kafkaSource.channels = memoryChannel

at1.sources.kafkaSource.zookeeperConnect = slave1:2181,slave2:2181,slave3:2181

at1.sources.kafkaSource.topic = test

at1.sources.kafkaSource.groupId = flume

at1.sources.kafkaSource.kafka.consumer.timeout.ms = 100

 

# Each sink's type must be defined

#agent.sinks.loggerSink.type = logger

#agent.sinks.loggerSink.channel = memoryChannel

 

at1.sinks.hdfsSink.type = hdfs

at1.sinks.hdfsSink.channel = memoryChannel

at1.sinks.hdfsSink.hdfs.path = /user/hive_mysql/warehouse/boomtest.db/log_node2/day=%Y%m%d/hour=%H

at1.sinks.hdfsSink.hdfs.filePrefix = events-%Y-%m-%d-%H

at1.sinks.hdfsSink.hdfs.fileType = DataStream

at1.sinks.hdfsSink.hdfs.writeFormat = Text

at1.sinks.hdfsSink.hdfs.rollInterval = 300

at1.sinks.hdfsSink.hdfs.rollSize = 0

at1.sinks.hdfsSink.hdfs.rollCount = 0

at1.sinks.hdfsSink.hdfs.batchSize=100

 

# Each channel's type is defined.

at1.channels.memoryChannel.type = memory

at1.channels.memoryChannel.keep-alive= 30

at1.channels.memoryChannel.capacity = 20000

at1.channels.memoryChannel.transactionCapacity = 10000

at1.channels.memoryChannel.byteCapacityBufferPercentage = 20

at1.channels.memoryChannel.byteCapacity = 800000

 

 

3. Hive创建表

./bin/hive --service metastore

 

CREATE EXTERNAL TABLE `log_node2`(

  `id` int, 

  `json` string

  )

PARTITIONED BY ( 

  `day` string,

  `hour` string)

ROW FORMAT DELIMITED 

  FIELDS TERMINATED BY '\t' 

STORED AS INPUTFORMAT 

  'org.apache.hadoop.mapred.TextInputFormat' 

OUTPUTFORMAT 

  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

4. 定期创建分区

需要定期主动创建相关分区。否则即使flume有把数据存储到相应的分区目录里,hive没有创建对应的分区是没办法获取到响应数据的。定期创建分区不论数据目录是否已经存在都可以创建,相当于只是在hive中对实际分区目录与表逻辑分区做了映射关系,实际分区目录不存在时hive会在创建分区的时候创建相应目录。

 

hive> alter table log_node2  add partition (day='20170615',hour='16')

查询分区数据:

hive> select * from log_node2 where day=20170615 and hour=16;

 


CREATE EXTERNAL TABLE `log_bandwidth`(

  `f01` int, 

  `f02` string,

`f03` string,

`f04` string,

`f06` string

  )

PARTITIONED BY ( 

  `day` string,

  `hour` string)

ROW FORMAT DELIMITED 

  FIELDS TERMINATED BY '\t' 

STORED AS INPUTFORMAT 

  'org.apache.hadoop.mapred.TextInputFormat' 

OUTPUTFORMAT 

  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

 

评论

发表评论

validate