kafka系列(二)安装与配置部署 有更新!

  |   0 评论   |   1,440 浏览

37套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!

三、安装配置

1. Kafka安装前提

l 安装JDK环境(配置JAVA_HOME,)

l 安装Scala环境(配置SCALA_HOME)

l 安装zookeeper环境

2. Kafka环境安装步骤

1) 下载解压

官方下载编译包:http://archive.apache.org/dist/kafka/

这里的kafka使用3台做集群,上传安装包到slave1后,分别scp到另外两节点上。各自解压到指定目录下。

[root@slave1 boom]# rz

rz waiting to receive.

 zmodem trl+C ȡ

 

  100%   34372 KB 34372 KB/s 00:00:01       0 Errors...

 

[root@slave1 boom]# ll

总用量 96396

-rw-r--r--. 1 root root 35197662 5月  11 20:14 kafka_2.10-0.10.1.1.tgz

-rw-r--r--. 1 root root 28460530 5月  11 19:35 scala-2.11.7.tgz

-rw-r--r--. 1 root root 35042811 5月   9 13:36 zookeeper-3.4.10.tar.gz    

[root@slave1 boom]# scp kafka_2.10-0.10.1.1.tgz slave2:/root/boom/

kafka_2.10-0.10.1.1.tgz                                                              100%   34MB  33.6MB/s   00:00    

[root@slave1 boom]# scp kafka_2.10-0.10.1.1.tgz slave3:/root/boom/

kafka_2.10-0.10.1.1.tgz                                                              100%   34MB  33.6MB/s   00:00    

[root@slave1 boom]# mkdir /usr/program/kafka

[root@slave1 boom]# tar -zxf kafka_2.10-0.10.1.1.tgz -C /usr/program/kafka

[root@slave1 boom]# cd /usr/program/kafka

[root@slave1 kafka]# mv kafka_2.10-0.10.1.1/* ./

[root@slave1 kafka]# rm -rf kafka_2.10-0.10.1.1

[root@slave1 kafka]# ll

总用量 52

drwxr-xr-x. 3 root root  4096 12月 16 03:08 bin

drwxr-xr-x. 2 root root  4096 12月 16 03:08 config

drwxr-xr-x. 2 root root  4096 5月  11 20:22 libs

-rw-r--r--. 1 root root 28824 12月 16 02:04 LICENSE

-rw-r--r--. 1 root root   336 12月 16 02:04 NOTICE

drwxr-xr-x. 2 root root  4096 12月 16 03:08 site-docs

2) 配置参数

每个节点的broker.id都不能一样,host.name配置本节点hostname。其它配置各个节点一样。

broker.id=0

host.name=slave1

port=9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/usr/data/kafka

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=slave1:2181,slave2:2181,slave3:2181

zookeeper.connection.timeout.ms=6000

 

[root@slave1 kafka]# cd config

[root@slave1 config]# vi server.properties 

[root@slave1 config]# cat server.properties | grep -v '#\|^$'

broker.id=0

host.name=slave1

port=9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/usr/data/kafka

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=slave1:2181,slave2:2181,slave3:2181

zookeeper.connection.timeout.ms=6000

3) 启动服务

启动zookeeper集群后,启动kafka:

各个节点上启动kafka

[root@slave3 kafka]# ./bin/kafka-server-start.sh config/server.properties

[2017-05-12 11:03:42,951] INFO KafkaConfig values: 

        advertised.host.name = null

        advertised.listeners = null

        advertised.port = null

        authorizer.class.name = 

        auto.create.topics.enable = true

        auto.leader.rebalance.enable = true

        background.threads = 10

        broker.id = 8

        broker.id.generation.enable = true

        broker.rack = null

。。。

[2017-05-12 11:03:45,912] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(slave2,9092,PLAINTEXT) (kafka.utils.ZkUtils)

[2017-05-12 11:03:45,935] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)

[2017-05-12 11:03:45,935] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)

[2017-05-12 11:03:45,940] INFO [Kafka Server 1], started (kafka.server.KafkaServer)

 

    [root@slave3 config]# jps

2729 Kafka

2427 QuorumPeerMain

3023 Jps

[root@slave3 config]# 

zookeeper中查看连接信息:

    [root@slave1 ~]# echo wchs | nc 192.168.211.133 2181

1 connections watching 7 paths

Total watches:7

You have new mail in /var/spool/mail/root

[root@slave1 ~]# echo wchs | nc 192.168.211.134 2181

1 connections watching 2 paths

Total watches:2

[root@slave1 ~]# echo wchs | nc 192.168.211.133 2181

1 connections watching 7 paths

Total watches:7

[root@slave1 ~]# echo wchs | nc 192.168.211.135 2181

1 connections watching 2 paths

Total watches:2

[root@slave1 ~]# echo cons | nc 192.168.211.133 2181 

 /192.168.211.134:40098[1](queued=0,recved=493,sent=496,sid=0x15bfa978a430000,lop=PING,est=1494558223940,to=6000,lcxid=0x36,lzxid=0xffffffffffffffff,lresp=1494559101038,llat=1,minlat=0,avglat=0,maxlat=56)

 /192.168.211.133:46577[0](queued=0,recved=1,sent=0)

 

[root@slave1 ~]# echo cons | nc 192.168.211.134 2181  

 /192.168.211.133:49602[0](queued=0,recved=1,sent=0)

 /192.168.211.133:49596[1](queued=0,recved=470,sent=470,sid=0x25bfa978aa70000,lop=PING,est=1494558223935,to=6000,lcxid=0x1a,lzxid=0xffffffffffffffff,lresp=1494559110880,llat=1,minlat=0,avglat=0,maxlat=37)

 

[root@slave1 ~]# echo cons | nc 192.168.211.135 2181  

 /192.168.211.135:43259[1](queued=0,recved=477,sent=478,sid=0x35bfa978a7d0000,lop=PING,est=1494558223853,to=6000,lcxid=0x1f,lzxid=0x600000013,lresp=1494559114865,llat=0,minlat=0,avglat=0,maxlat=15)

 /192.168.211.133:52970[0](queued=0,recved=1,sent=0)

 

4) 创建一个主题(topic)

创建一个名为“test”Topic,只有一个分区和一个备份:

[root@slave1 kafka]# bin/kafka-topics.sh --create --zookeeper slave1:2181 --replication-factor 1 --partitions 1 --topic test  

Created topic "test".

[root@slave1 kafka]# bin/kafka-topics.sh --create --zookeeper slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic test2

Created topic "test2".

5) 查看topic列表

创建好之后,可以通过运行以下命令,查看已创建的topic信息:

[root@slave1 kafka]# bin/kafka-topics.sh --list --zookeeper slave1:2181   

test

test2

或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic

6) 发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。运行producer(生产者),然后在控制台输入几条消息到服务器。

 

[root@slave1 kafka]# bin/kafka-console-producer.sh --broker-list slave2:9092 --topic test

ni hao a

bucuo^H^H^H

bucuo oo^H^H 

haha

7) 消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。 

[root@slave2 kafka]# bin/kafka-console-consumer.sh --zookeeper slave1:2181,slave2:2181 --topic test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

ni hao a

bucuo

bucuo  o

haha

可以在2台不同的终端上运行上述命令,当在运行生产者时,消费者就能消费到生产者发送的消息。所有的命令行工具有很多的选项,可以查看文档来了解更多的功能。

8) 查看topic信息

[root@slave1 kafka]# bin/kafka-topics.sh --describe --zookeeper slave1:2181,slave2:2181 --topic test

Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:

        Topic: test     Partition: 0    Leader: 8       Replicas: 8     Isr: 8

这是一个解释输出,第一行是所有分区的摘要,每一个线提供一个分区信息,因为我们只有一个分区,所有只有一条线。

l "leader":该节点负责所有指定分区的读和写,每个节点的领导都是随机选择的。

l "replicas":备份的节点,无论该节点是否是leader或者目前是否还活着,只是显示。

l "isr":备份节点的集合,也就是活着的节点集合。

上述的8:是指在kafka配置文件中boker-id

 

9) 创建多备份topic,测试集群容错

现在,我们创建一个新topic,把备份设置为:3

[root@slave1 kafka]# bin/kafka-topics.sh --create --zookeeper slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic test3

Created topic "test3"

查看主题topic test3的信息,有多个备份,其中broder:1leader

[root@slave1 kafka]# bin/kafka-topics.sh --describe --zookeeper slave1:2181,slave2:2181 --topic test3

Topic:test3     PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: test3    Partition: 0    Leader:       Replicas: 1,8,0 Isr: 1,8,0

让我们来发布一些信息在新的topic上:

[root@slave1 kafka]# bin/kafka-console-producer.sh --broker-list slave2:9092 --topic test3          

nihao

heha

dadfasd

dfdfdfd

现在,消费这些消息。

[root@slave2 kafka]# bin/kafka-console-consumer.sh --zookeeper slave1:2181,slave2:2181 --topic test3 --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

nihao

heha

dadfasd

dfdfdfd

测试集群的容错,killleaderBroker1作为当前的leader,也就是killBroker1

[root@slave2 kafka]# jps

2624 Kafka

6257 Jps

2328 QuorumPeerMain

You have new mail in /var/spool/mail/root

[root@slave2 kafka]# kill -9 2624

备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。 

[root@slave1 kafka]# bin/kafka-topics.sh --describe --zookeeper slave1:2181,slave2:2181,slave3:2181 --topic test3

Topic:test3     PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: test3    Partition: 0    Leader:       Replicas: 1,8,0 Isr: 8,0

 

但是,消息仍然没丢:

[root@slave2 kafka]# bin/kafka-console-consumer.sh --zookeeper slave1:2181,slave2:2181,slave3:2181 --topic test3 --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

nihao

heha

dadfasd

dfdfdfd

hahadfdfdf

dfdfdfd

 

10) 使用Kafka Connect 来导入/导出数据

从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。Kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。在这个快速入门里,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件,首先,我们首先 创建一些种子数据用来测试:

[root@slave1 kafka]echo -e "foo\nbar" > test.txt

修改connect-standalone.properties配置

[root@slave1 kafka]# vi config/connect-standalone.properties 

 

# These are defaults. This file just demonstrates how to override some settings.

bootstrap.servers=slave1:9092

接下来,运行Kafka Connect提供3个配置文件作为参数。

[root@slave1 kafka]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

[2017-05-19 11:27:45,588] INFO StandaloneConfig values: 

        access.control.allow.methods = 

        access.control.allow.origin = 

        bootstrap.servers = [slave1:9092]

这是示例的配置文件,使用默认的本地集群配置并创建了2个连接器:第一个是导入连接器config/connect-file-source.properties,从导入文件中读取并发布到Kafka主题,第二个是导出连接器config/connect-file-sink.properties,从kafka主题读取消息输出到外部文件,在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始,导入连接器应该读取从test.txt写入到topic导出连接器从主题connect-test读取消息写入到文件test.sink.txt中。
我们可以通过验证输出文件的内容来验证数据数据已经全部导出:

[root@slave1 kafka]# cat test.sink.txt 

foo

bar

asdfd

注意,导入的数据也已经在Kafka主题里,所以我们可以使用该命令查看这个主题:

[root@slave1 kafka]# bin/kafka-console-consumer.sh --zookeeper slave1:2181,slave2:2181 --topic connect-test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"bar"}

{"schema":{"type":"string","optional":false},"payload":"asdfd"}

连接器继续处理数据,因此我们可以添加数据到文件并通过管道移动:

[root@slave1 kafka]# echo "adfd" >> test.txt

你应该会看到出现在消费者控台输出一行信息并导出到文件

[root@slave1 kafka]# bin/kafka-console-consumer.sh --zookeeper slave1:2181,slave2:2181 --topic connect-test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"bar"}

{"schema":{"type":"string","optional":false},"payload":"asdfd"}

{"schema":{"type":"string","optional":false},"payload":"adfd"

查看导出文件

[root@slave1 kafka]# cat test.sink.txt 

foo

bar

asdfd

adfd

 

11) 使用Kafka Stream来处理数据

Kafka Streamkafka的客户端库,用于实时流处理和分析存储在kafka broker的数据,这个快速入门示例将演示如何运行一个流应用程序。一个WordCountDemo的例子(为了方便阅读,使用的是java8 lambda表达式)

KTable wordCounts = textLines

    // Split each text line, by whitespace, into words.

    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("W+")))

 

    // Ensure the words are available as record keys for the next aggregate operation.

    .map((key, value) -> new KeyValue<>(valuevalue))

 

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".

    .countByKey("Counts")

它实现了wordcount算法,从输入的文本计算出一个词出现的次数。然而,不像其他的WordCount的例子,你可能会看到,在有限的数据之前,执行的演示应用程序的行为略有不同,因为它的目的是在一个无限的操作,数据流。类似的有界变量,它是一种动态算法,跟踪和更新的单词计数。然而,由于它必须假设潜在的无界输入数据,它会定期输出其当前状态和结果,同时继续处理更多的数据,因为它不知道什么时候它处理过的所有的输入数据。

现在准备输入数据到kafkatopic中,随后kafka Stream应用处理这个topic的数据。

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

接下来,使用控制台的producer 将输入的数据发送到指定的topicstreams-file-input)中,(在实践中,stream数据可能会持续流入,其中kafka的应用将启动并运行)

> bin/kafka-topics.sh --create \

            --zookeeper localhost:2181 \            --replication-factor 1 \            --partitions 1 \            --topic streams-file-input

cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-file-input

现在,我们运行 WordCount 处理输入的数据:

> ./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

不会有任何的STDOUT输出,除了日志,结果不断地写回另一个topicstreams-wordcount-output),demo运行几秒,然后,不像典型的流处理应用程序,自动终止。

现在我们检查WordCountDemo应用,从输出的topic读取。

> ./bin/kafka-console-consumer --zookeeper localhost:2181 

            --topic streams-wordcount-output             --from-beginning             --formatter kafka.tools.DefaultMessageFormatter             --property print.key=true             --property print.key=true             --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer             --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

输出数据打印到控台(你可以使用Ctrl-C停止):

all     1

streams 1

lead    1to      1

kafka   1

hello   1

kafka   2

streams 2join    1

kafka   3

summit  1

^C

第一列是messagekey,第二列是messagevalue,要注意,输出的实际是一个连续的更新流,其中每条数据(即:原始输出的每行)是一个单词的最新的count,又叫记录键“kafka”。对于同一个key有多个记录,每个记录之后是前一个的更新。

 


评论

发表评论

validate