Kafka Overview
- Similar to a messaging system
- Message middleware: producers and consumers
- A distributed streaming platform
Kafka Architecture and Core Concepts

- Kafka is run as a cluster on one or more servers that can span multiple datacenters.
- The Kafka cluster stores streams of records in categories called topics.
- Each record consists of a key, a value, and a timestamp.
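
The three points above can be pictured with a small in-memory model. This is only an illustrative sketch: `TopicLogSketch`, `Record`, `append`, and `read` are hypothetical names and not part of Kafka's API, and each topic is modelled as one append-only list (a single partition) in which a record's position serves as its offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model only; none of these classes belong to Kafka.
public class TopicLogSketch {

    /** A record as described above: a key, a value, and a timestamp. */
    public static final class Record {
        public final String key;
        public final String value;
        public final long timestamp;

        public Record(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Maps each topic name to its append-only list of records.
    private final Map<String, List<Record>> topics = new HashMap<>();

    /** Appends a record to the topic and returns its position (offset) in the log. */
    public long append(String topic, Record record) {
        List<Record> log = topics.computeIfAbsent(topic, t -> new ArrayList<>());
        log.add(record);
        return log.size() - 1;
    }

    /** Reads the record stored at the given offset of the topic. */
    public Record read(String topic, long offset) {
        return topics.get(topic).get((int) offset);
    }

    public static void main(String[] args) {
        TopicLogSketch cluster = new TopicLogSketch();
        long now = System.currentTimeMillis();
        long first = cluster.append("hello_topic", new Record("k1", "message_1", now));
        long second = cluster.append("hello_topic", new Record("k2", "message_2", now));
        System.out.println("offsets: " + first + ", " + second);
        System.out.println("value at second offset: " + cluster.read("hello_topic", second).value);
    }
}
```

Reading by offset does not remove anything from the list, which loosely mirrors how a topic stores a stream of records rather than handing each message out once.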
Deploying and Using Kafka: Single Node, Single Broker
    broker.id=0
    listeners=PLAINTEXT://:9092
    host.name=bigdata-01
    log.dirs=/root/app/tmp/kafka-logs
    zookeeper.connect=bigdata-01:2181
Start Kafka
    bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

    USAGE: /root/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
Create a topic (via ZooKeeper)
    bin/kafka-topics.sh --create --zookeeper bigdata-01:2181 --replication-factor 1 --partitions 1 --topic hello_topic
List all topics
    bin/kafka-topics.sh --list --zookeeper bigdata-01:2181
Send messages (via the broker)
    bin/kafka-console-producer.sh --broker-list bigdata-01:9092 --topic hello_topic
Consume messages (via ZooKeeper)
    bin/kafka-console-consumer.sh --zookeeper bigdata-01:2181 --topic hello_topic
Describe all topics:
    kafka-topics.sh --describe --zookeeper bigdata-01:2181
Describe a specific topic:
    kafka-topics.sh --describe --zookeeper bigdata-01:2181 --topic hello_topic
Deploying and Using Kafka: Single Node, Multiple Brokers
    server-1.properties
    server-2.properties
    server-3.properties

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

    bin/kafka-topics.sh --create --zookeeper bigdata-01:2181 --replication-factor 3 --partitions 1 --topic my-replication-topic

    kafka-console-producer.sh --broker-list bigdata-01:9093,bigdata-01:9094,bigdata-01:9095 --topic my-replication-topic

    kafka-console-consumer.sh --zookeeper bigdata-01:2181 --topic my-replication-topic
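
The contents of the three properties files are not shown. When several brokers share one host, each needs its own `broker.id`, listener port, and log directory. The sketch below prints one plausible set of per-broker overrides. The mapping of broker N to port 9092 + N is an assumption inferred from the ports in the `--broker-list` above, the `-N` suffix on `log.dirs` is likewise an assumption, and `BrokerConfigSketch` and `overridesFor` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: derive the settings that must differ between brokers on one host.
public class BrokerConfigSketch {

    /** Returns the per-broker overrides for the given broker id (1, 2, 3, ...). */
    public static Map<String, String> overridesFor(int brokerId) {
        Map<String, String> props = new LinkedHashMap<>();
        // Every broker in a cluster needs a unique id.
        props.put("broker.id", String.valueOf(brokerId));
        // Assumption: broker N listens on port 9092 + N (9093, 9094, 9095).
        props.put("listeners", "PLAINTEXT://:" + (9092 + brokerId));
        // Assumption: one log directory per broker, suffixed with the broker id.
        props.put("log.dirs", "/root/app/tmp/kafka-logs-" + brokerId);
        return props;
    }

    public static void main(String[] args) {
        for (int id = 1; id <= 3; id++) {
            System.out.println("# server-" + id + ".properties");
            for (Map.Entry<String, String> entry : overridesFor(id).entrySet()) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }
            System.out.println();
        }
    }
}
```

All other settings, such as `zookeeper.connect`, would stay the same as in the single-broker configuration.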
Kafka Fault-Tolerance Test
    [root@bigdata-01 ~]
    Topic:hello_topic PartitionCount:1 ReplicationFactor:1 Configs:
        Topic: hello_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:
        Topic: my-replication-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
At this point the leader is broker 3. What happens if that process is terminated?
    [root@bigdata-01 ~]
    2882 ConsoleConsumer --zookeeper bigdata-01:2181 -topic my-replication-topic
    2660 Kafka /root/app/kafka_2.11-0.9.0.0/config/server-3.properties
    2550 Kafka /root/app/kafka_2.11-0.9.0.0/config/server-1.properties
    2843 ConsoleProducer --broker-list bigdata-01:9093,bigdata-01:9094,bigdata-01:9095 --topic my-replication-topic
    2955 Jps -m
    1934 QuorumPeerMain /root/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg

    [root@bigdata-01 ~]
    [root@bigdata-01 ~]
    [root@bigdata-01 ~]
    Topic:hello_topic PartitionCount:1 ReplicationFactor:1 Configs:
        Topic: hello_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:
        Topic: my-replication-topic Partition: 0 Leader: 2 Replicas: 3,1,2 Isr: 2
After killing brokers 1 and 3, broker 2 is elected leader, and sending and receiving messages is unaffected.
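
The outcome of this test can be reproduced with a deliberately simplified model of leader selection: walk the assigned replica list in order and pick the first broker that is still in the in-sync replica set (ISR). This is a sketch of the idea, not Kafka's controller code, and `LeaderElectionSketch` and `electLeader` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model: the new leader is the first assigned replica still in the ISR.
public class LeaderElectionSketch {

    /** Returns the id of the first replica that is in the ISR, or -1 if none is. */
    public static int electLeader(List<Integer> replicas, List<Integer> isr) {
        for (int brokerId : replicas) {
            if (isr.contains(brokerId)) {
                return brokerId;
            }
        }
        return -1; // no in-sync replica is available
    }

    public static void main(String[] args) {
        // Replica assignment taken from the describe output: Replicas: 3,1,2
        List<Integer> replicas = Arrays.asList(3, 1, 2);
        List<Integer> isr = new ArrayList<>(replicas);

        System.out.println("leader before: " + electLeader(replicas, isr)); // prints 3

        // Brokers 3 and 1 are killed, so they drop out of the ISR.
        isr.remove(Integer.valueOf(3));
        isr.remove(Integer.valueOf(1));

        System.out.println("leader after: " + electLeader(replicas, isr)); // prints 2
    }
}
```

With a replication factor of 3, the partition stays available as long as at least one in-sync replica survives, which is why the console producer and consumer keep working.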
Kafka's Java API
Set up the development environment with IDEA + Maven. The pom.xml configuration is as follows:
    <properties>
        <scala.version>2.11.8</scala.version>
        <kafka.version>0.9.0.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>
Create the Kafka configuration class
    // Shared constants for the producer and consumer examples.
    public class KafkaProperties {
        public static final String ZK = "bigdata-01:2181";
        public static final String TOPIC = "hello_topic";
        public static final String BROKER_LIST = "bigdata-01:9092";
        public static final String GROUP_ID = "test_group_1";
    }
Kafka producer
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.util.Properties;

    public class KafkaProducer extends Thread {
        private String topic;
        private Producer<Integer, String> producer;

        public KafkaProducer(String topic) {
            this.topic = topic;
            Properties properties = new Properties();
            properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            // Wait for the partition leader to acknowledge each write.
            properties.put("request.required.acks", "1");
            producer = new Producer<Integer, String>(new ProducerConfig(properties));
        }

        @Override
        public void run() {
            int messageNo = 1;
            // Send one message every two seconds until the process is stopped.
            while (true) {
                String message = "message_" + messageNo;
                producer.send(new KeyedMessage<Integer, String>(topic, message));
                System.out.println("Sent: " + message);
                messageNo++;
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
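
The producer above uses the older Scala-based client (`kafka.javaapi.producer.Producer`). Kafka 0.9 also ships a newer Java producer, `org.apache.kafka.clients.producer.KafkaProducer`, which takes different configuration keys. The sketch below only builds the equivalent `Properties` with the standard library, so it runs without Kafka on the classpath. `NewProducerConfigSketch` and `producerProps` are hypothetical names.

```java
import java.util.Properties;

// Sketch: configuration keys for the newer Java producer, mirroring the settings used above.
public class NewProducerConfigSketch {

    /** Builds producer settings for the given broker list, e.g. "bigdata-01:9092". */
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Replaces metadata.broker.list from the older producer.
        props.put("bootstrap.servers", bootstrapServers);
        // Replaces serializer.class; keys and values are configured separately.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Replaces request.required.acks.
        props.put("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("bigdata-01:9092");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

Because the newer client class is also called `KafkaProducer`, code that adopts it alongside the class defined above would need to rename one of them or refer to the client by its fully qualified name.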
Kafka consumer
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class KafkaConsumer extends Thread {
        private String topic;

        public KafkaConsumer(String topic) {
            this.topic = topic;
        }

        // Builds a high-level consumer connector that coordinates through ZooKeeper.
        private ConsumerConnector createConnector() {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", KafkaProperties.ZK);
            properties.put("group.id", KafkaProperties.GROUP_ID);
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        }

        @Override
        public void run() {
            ConsumerConnector consumer = createConnector();
            // Request a single stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            // With the default settings, hasNext() blocks until the next message arrives.
            while (iterator.hasNext()) {
                String message = new String(iterator.next().message());
                System.out.println("received: " + message);
            }
        }
    }
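
The consumer turns each payload into text with `new String(byte[])`, which uses the JVM's default charset. The producer's `kafka.serializer.StringEncoder` writes UTF-8 unless configured otherwise, so decoding with an explicit charset avoids surprises on hosts whose default differs. A minimal standard-library sketch, in which `MessageDecodingSketch` and `decode` are hypothetical names:

```java
import java.nio.charset.StandardCharsets;

// Sketch: decode a message payload with an explicit charset instead of the platform default.
public class MessageDecodingSketch {

    /** Decodes a raw message payload as UTF-8 text. */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes returned by iterator.next().message() in the consumer.
        byte[] payload = "message_1".getBytes(StandardCharsets.UTF_8);
        System.out.println("received: " + decode(payload));
    }
}
```

In the consumer loop this would amount to passing `StandardCharsets.UTF_8` as the second argument to the `String` constructor.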
Kafka client startup test
    // Starts the producer and consumer threads against the same topic.
    public class KafkaClientApp {
        public static void main(String[] args) {
            new KafkaProducer(KafkaProperties.TOPIC).start();
            new KafkaConsumer(KafkaProperties.TOPIC).start();
        }
    }