Kafka: A Distributed Publish-Subscribe Messaging System

Kafka Overview

  • Similar in spirit to traditional messaging systems
  • Message middleware: producers and consumers
  • Self-described as "a distributed streaming platform"

Kafka Architecture and Core Concepts

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
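In the command-line walkthrough below, hello_topic is one such topic: every line typed into the console producer becomes a record on it, and the console consumer reads those records back.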

Single-Node, Single-Broker Kafka Deployment and Usage

# $KAFKA_HOME/config/server.properties

broker.id=0                          # unique id for this broker within the cluster
listeners=PLAINTEXT://:9092          # port the broker listens on
host.name=bigdata-01                 # hostname advertised to clients
log.dirs=/root/app/tmp/kafka-logs    # where partition logs are stored
zookeeper.connect=bigdata-01:2181    # ZooKeeper connection string
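Once the broker is started (next step), one way to verify that it registered correctly is to look it up in ZooKeeper. A quick sketch, assuming zkCli.sh from the ZooKeeper installation is on the PATH:

zkCli.sh -server bigdata-01:2181
ls /brokers/ids       # each live broker registers here; should print [0], matching broker.id above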

Start Kafka

bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

USAGE: /root/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
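The usage string above also shows a -daemon flag, which starts the broker in the background instead of tying up the terminal:

bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties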

Create a topic (the tool connects to ZooKeeper)

bin/kafka-topics.sh --create --zookeeper bigdata-01:2181 --replication-factor 1 --partitions 1 --topic hello_topic
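Note that --replication-factor cannot exceed the number of live brokers, so 1 is the only valid value on this single-broker setup; the multi-broker section below uses 3.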

List all topics

bin/kafka-topics.sh --list --zookeeper bigdata-01:2181

Produce messages (the tool connects to the brokers)

bin/kafka-console-producer.sh --broker-list bigdata-01:9092 --topic hello_topic
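Each line typed here becomes one record with a null key. If you want to attach keys from the console, the producer's parse.key and key.separator properties split each input line into a key and a value; a sketch:

bin/kafka-console-producer.sh --broker-list bigdata-01:9092 --topic hello_topic \
    --property parse.key=true --property key.separator=:

With these settings, typing k1:hello sends a record with key k1 and value hello.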

Consume messages (the tool connects to ZooKeeper)

bin/kafka-console-consumer.sh --zookeeper bigdata-01:2181 --topic hello_topic
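By default the console consumer only shows records produced after it starts; add --from-beginning to replay the topic from the earliest available offset:

bin/kafka-console-consumer.sh --zookeeper bigdata-01:2181 --topic hello_topic --from-beginning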

Describe all topics:

kafka-topics.sh --describe --zookeeper bigdata-01:2181

Describe a specific topic:

kafka-topics.sh --describe --zookeeper bigdata-01:2181 --topic hello_topic

Single-Node, Multi-Broker Kafka Deployment and Usage

# Create one properties file per broker (see the sketch after this block)
server-1.properties
server-2.properties
server-3.properties

# Start the three brokers (-daemon already runs each one in the background)
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

# Create a topic replicated across all three brokers
bin/kafka-topics.sh --create --zookeeper bigdata-01:2181 --replication-factor 3 --partitions 1 --topic my-replication-topic

# Start a producer
kafka-console-producer.sh --broker-list bigdata-01:9093,bigdata-01:9094,bigdata-01:9095 --topic my-replication-topic

# Start a consumer
kafka-console-consumer.sh --zookeeper bigdata-01:2181 --topic my-replication-topic
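At a minimum, the three properties files must differ in broker.id, listener port, and log directory. A sketch of server-1.properties (the 9093-9095 ports come from the producer command above; the log directory names are illustrative):

# server-1.properties (server-2/3 use broker.id=2/3, ports 9094/9095, and their own log dirs)
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/root/app/tmp/kafka-logs-1
zookeeper.connect=bigdata-01:2181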

Kafka Fault-Tolerance Test

[root@bigdata-01 ~]# kafka-topics.sh --describe --zookeeper bigdata-01:2181
Topic:hello_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: hello_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replication-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

In the describe output, Replicas lists the brokers that hold a copy of each partition and Isr the subset that is currently in sync. The leader for my-replication-topic is broker 3; what happens if that process is terminated?

[root@bigdata-01 ~]# jps -m
2882 ConsoleConsumer --zookeeper bigdata-01:2181 --topic my-replication-topic
2660 Kafka /root/app/kafka_2.11-0.9.0.0/config/server-3.properties
2550 Kafka /root/app/kafka_2.11-0.9.0.0/config/server-1.properties
2843 ConsoleProducer --broker-list bigdata-01:9093,bigdata-01:9094,bigdata-01:9095 --topic my-replication-topic
2955 Jps -m
1934 QuorumPeerMain /root/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg

[root@bigdata-01 ~]# kill -9 2550
[root@bigdata-01 ~]# kill -9 2660
[root@bigdata-01 ~]# kafka-topics.sh --describe --zookeeper bigdata-01:2181
Topic:hello_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: hello_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replication-topic Partition: 0 Leader: 2 Replicas: 3,1,2 Isr: 2

After brokers 1 and 3 were killed, broker 2 was elected leader, and sending and receiving messages continued unaffected.
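Broker 2 is now the only in-sync replica (Isr: 2); if brokers 1 and 3 are restarted, they rejoin the ISR once they have caught up on the partition's log.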

The Kafka Java API

The development environment uses IDEA + Maven; pom.xml is configured as follows:

<properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
</properties>

<dependencies>
    <!-- Kafka dependency -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>
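Both artifacts are pinned to Scala 2.11: kafka_2.11 is built against Scala 2.11, so the scala-library version must match that line of releases.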

Create a Kafka configuration class

/**
 * Common Kafka configuration constants.
 */
public class KafkaProperties {

    public static final String ZK = "bigdata-01:2181";
    public static final String TOPIC = "hello_topic";
    public static final String BROKER_LIST = "bigdata-01:9092";
    public static final String GROUP_ID = "test_group_1";
}

The Kafka producer

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka producer, using the old Scala producer API that ships with Kafka 0.9.
 */
public class KafkaProducer extends Thread {

    private String topic;
    private Producer<Integer, String> producer;

    public KafkaProducer(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        // acks=1: the leader acknowledges once it has written the message to its own log
        properties.put("request.required.acks", "1");
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String message = "message_" + messageNo;
            // no key is supplied, so the partitioner picks the partition
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("Sent: " + message);
            messageNo++;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The Kafka consumer

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumer, using the old high-level consumer API that ships with Kafka 0.9.
 */
public class KafkaConsumer extends Thread {

    private String topic;

    public KafkaConsumer(String topic) {
        this.topic = topic;
    }

    private ConsumerConnector createConnector() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id", KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // one stream (consumer thread) for this topic

        // key: topic name; value: the list of streams for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // take the single stream we requested
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("received: " + message);
        }
    }
}
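A consumer group that has no committed offset starts at the latest offset by default, so the consumer above only sees messages produced after it connects. To replay from the beginning instead, add the following to createConnector() (note the old consumer takes "smallest"/"largest" rather than the newer "earliest"/"latest" values):

properties.put("auto.offset.reset", "smallest"); // replay from the earliest available offset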

Kafka client launch test

/**
 * Test driver for the Kafka Java API.
 */
public class KafkaClientApp {

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();
        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}
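If the broker and topic from KafkaProperties are reachable, running KafkaClientApp should interleave the producer's "Sent: message_N" lines with the consumer's "received: message_N" lines every two seconds.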