Kafka 实战练习

前期准备

了解 Kafka 架构及核心组件

待补充

学习使用 Kafka

详情见Kafka 环境部署Kafka API 编程练习

实战练习

实战结构图

概述: 文字描述待补充

需求分析

需求:将 A 服务器上的日志实时采集到 B 服务器供 Kafka 消费

根据需求可以采用以下方案实现:

Agent A 选型: exec source + memory channel + avro sink

Agent B 选型: avro source + memory channel + kafka sink

写配置文件

/abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf 目录中新建 avro-memory-kafka.conf

参照 Flume 1.6.0文档avro-memory-kafka.conf 进行相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# avro-memory-kafka.conf: A realtime Flume configuration
# Name the components on this agent
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = hadoop
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.sinks.kafka-sink.
avro-memory-kafka.sinks.kafka-sink.

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

启动相关组件

启动 ZooKeeper

/abs/app/zookeeper-3.4.5-cdh5.7.0/bin 目录输入 ./zkServer.sh start

启动 kafka

/abs/app/kafka_2.11-0.9.0.0/bin 目录中进行如下操作:

1
kafka-server-start.sh $KAFKA_HOME/config/server.properties

启动 agent

先启动 avro-memory-kafka:

1
flume-ng agent -n avro-memory-kafka -c $FLUME_HOME/conf -f $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console

exec-memory-avro 在之前 Flume 实战练习 笔记中已经创建

再启动 exec-memory-avro:

1
flume-ng agent -n exec-memory-avro -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

验证

jps -m

1
2
3
4
5
2595 Kafka /abs/app/kafka_2.11-0.9.0.0/config/server.properties
2807 Jps -m
2760 Application -n exec-memory-avro -f /abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/exec-memory-avro.conf
2526 QuorumPeerMain /abs/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg
2703 Application -n avro-memory-kafka -f /abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/avro-memory-kafka.conf

消费消息

1
kafka-console-consumer.sh --zookeeper hadoop:2181 --topic hello_topic

/abs/data 目录中进行以下操作:

1
2
3
4
5
echo hellospark >> data.log
echo hellospark2 >> data.log
echo hellospark3 >> data.log
echo hellospark4 >> data.log
echo hellospark5 >> data.log

可以在消费端看到刚才输入的内容

------ 本文结束------
如果对您有帮助的话请我喝瓶水吧!