前期准备
了解Flume 架构及核心组件
Flume 架构及核心组件
Source
: 收集(指定数据源从哪里获取)
Channel
: 聚集
Sink
: 输出(把数据写到哪里去)
学习使用 Flume
通过一个简单的小例子学习使用 Flume
使用 Flume 的关键就是写配置文件
配置文件的构成:
A) 配置 Source
B) 配置 Channel
C) 配置 Sink
D) 把以上三个组件串起来
A simple example
1 | # example.conf: A single-node Flume configuration |
实战一
需求
需求:从指定网络端口采集数据输出到控制台
写配置文件
在 /abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf
目录中新建 example.conf
如下:
1 | # example.conf: A single-node Flume configuration |
启动 agent
Flume 官网启动 agent
的命令:
1 | $ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template |
agent options
:
1 | --name,-n <name> the name of this agent (required) |
实际用的启动 agent
的命令:
1 | flume-ng agent -n a1 -c $FLUME_HOME $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console |
// Dflume.root.logger=INFO,console
为将输出结果显示到控制台
启动失败1
2
3
4
5Info: Including Hive libraries found via () for Hive access
+ exec /abs/app/jdk1.8.0_161/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/abs/app/apache-flume-1.6.0-cdh5.7.0-bin:/abs/app/apache-flume-1.6.0-cdh5.7.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -n a1 -f /abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf/example.conf
log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
上网查了一下,别人是 -c 的路径指定错误,我的也错了。
-c 后面跟的是 Flume
的 conf
目录
所以正确的启动命令为:
1 | flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console |
正常启动后可以看到如下:
可以看到 Sink
和 Source
都启动了
绑定的主机名为 hadoop
的 IP 和绑定的端口号都有显示
验证
1 | [root@hadoop ~]# telnet hadoop 44444 |
显示找不到 telnet ,用 yum install telnet
安装telnet
telnet 进入 hadoop 的 44444 端口进行输入单词按 Enter
agent 的那一端显示如下:
从图中可以看到如下:
1 | Event: { headers:{} body: 73 70 61 72 6B 0D spark. } |
Event 是 Flume 数据传输的基本单元
Event = 可选的 header + byte array
以上实现了从指定网络端口采集数据输出到控制台的需求。
实战二
需求
需求:监控一个文件实时采集新增的数据输出到控制台
根据需求可以采用以下方案实现:
Agent 选型: exec source
+ memory channel
+ logger sink
写配置文件
在 /abs/data
目录新建 data.log
1 | touch data.log |
在 /abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf
目录中新建 exec-memory-logger.conf
如下:
1 | # exec-memory-logger.conf: A realtime single-node Flume configuration |
启动 agent
Flume 启动 agent
的命令:
1 | flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console |
// Dflume.root.logger=INFO,console
为将输出结果显示到控制台
正常启动后可以看到如下:
可以看到 Source
、 Channel
和 Sink
的类型和启动类型以及 Source
要执行的命令
验证
在 /abs/data
目录输入 echo hello >> data.log
agent 的那一端显示如下:
以上实现了监控一个文件实时采集新增的数据输出到控制台的需求。
拓展
参照 Flume 用户指南
如果用 Flume 采集数据做离线处理,可以使用 HDFS Sink
如果用 Flume 采集数据做实时处理,可以使用 Kafka Sink
这里只提供一个拓展,根据具体的需求使用。
实战三
需求
需求:将 A 服务器上的日志实时采集到 B 服务器
根据需求可以采用以下方案实现:
Agent A 选型: exec source
+ memory channel
+ avro sink
Agent B 选型: avro source
+ memory channel
+ logger sink
写配置文件
在 /abs/app/apache-flume-1.6.0-cdh5.7.0-bin/conf
目录中新建如下配置文件:
exec-memory-avro.conf
:
1 | # exec-memory-avro.conf: A realtime Flume configuration |
avro-memory-logger.conf
:
1 | # avro-memory-logger.conf: A realtime Flume configuration |
启动 agent
两个 Agent ,先启动 Agent A ,再启动 Agent B
先启动 avro-memory-logger
:
1 | flume-ng agent -n avro-memory-logger -c $FLUME_HOME/conf -f $FLUME_HOME/conf/avro-memory-logger.conf -Dflume.root.logger=INFO,console |
再启动 exec-memory-avro
:
1 | flume-ng agent -n exec-memory-avro -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console |
验证
在 /abs/data/
目录中输入以下命令:
1 | echo hello spark >> data.log |
Agent avro-memory-logger
显示如下:
以上实现了将 A 服务器上的日志实时采集到 B 服务器的需求。
这里采用的是一个服务器开三个窗口,有条件的可以尝试用两台服务器进行这个实战练习