Flume + Kafka Real-Time Data Collection

Flume Configuration

The avro-memory-kafka agent receives events over Avro, buffers them in a memory channel, and publishes them to a Kafka topic:

# avro-memory-kafka.conf
# Name the agent's source, sink, and channel
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Avro source: listen on bigdata-01:44444 for events from the upstream agent
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = bigdata-01
avro-memory-kafka.sources.avro-source.port = 44444

# Kafka sink: publish events to the hello_topic topic on the bigdata-01 broker
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = bigdata-01:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Buffer events in memory between source and sink
avro-memory-kafka.channels.memory-channel.type = memory

# Wire the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
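
This agent is fed by a second agent, exec-memory-avro, referenced in the start commands below but not shown here. A minimal sketch of its configuration, assuming it tails the /root/data/data.log file used in the append step at the end (the Avro sink's hostname and port must match the Avro source above):

# exec-memory-avro.conf (sketch)
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Exec source: follow the log file (path assumed from the append step below)
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /root/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Avro sink: forward events to the avro-memory-kafka agent's Avro source
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = bigdata-01
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel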

Starting Flume

Start the downstream agent avro-memory-kafka first, so its Avro source is already listening on port 44444 before the upstream agent's Avro sink tries to connect:

flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console

flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console

Starting a Console Consumer to Monitor the Topic

kafka-console-consumer.sh --zookeeper bigdata-01:2181 --topic hello_topic
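
The --zookeeper option belongs to older Kafka releases; it was deprecated in favor of the new consumer and removed in Kafka 2.0. On newer versions, point the consumer directly at the broker instead (a sketch, reusing the bigdata-01:9092 broker address from the sink configuration):

kafka-console-consumer.sh --bootstrap-server bigdata-01:9092 --topic hello_topic --from-beginning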

Appending Content to the Log File

[root@bigdata-01 ~]# cd data/
[root@bigdata-01 data]# ll
total 4
-rw-r--r--. 1 root root 112 Feb 6 05:37 data.log
[root@bigdata-01 data]# echo hellospark >> data.log
[root@bigdata-01 data]# echo hellospark2 >> data.log
[root@bigdata-01 data]# echo hellospark3 >> data.log
[root@bigdata-01 data]# echo hellospark4 >> data.log
[root@bigdata-01 data]# echo hellospark5 >> data.log

At this point, the consumer picks up the newly appended lines from the log file.
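
Assuming the pipeline is wired correctly, the consumer terminal should echo the appended lines, for example:

hellospark
hellospark2
hellospark3
hellospark4
hellospark5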

Note that the exact properties required in the configuration file (*.conf) differ between Flume versions.
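
For example, brokerList, topic, batchSize, and requiredAcks are the Kafka sink property names used up through Flume 1.6; from Flume 1.7 onward the equivalent sink section is spelled as follows (a sketch; consult the user guide for your exact version):

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = bigdata-01:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1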