Kafka API 编程练习

IDEA + Maven 构建开发环境

安装与配置 IDEA

参照 INTELLIJ IDEA 教程 进行 IDEA 的安装与配置

安装与配置 Maven

参照 INTELLIJ IDEA MAVEN 配置  与  Maven3.3.9 的安装

进行 Maven 在 Windows10 下的安装与整合 IDEA 的配置

新建 Maven Project

打开 IDEA 然后 Create New Project

进行如下设置:

进行相关配置

新建的 Maven Project 有报错,在右下角的 Event Log 中点击 Import Changes

pom.xml 作以下修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 <properties>
<scala.version>2.11.8</scala.version>
<kafka.version>0.9.0.0</kafka.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<!--Kafka 依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

</dependencies>

Kafka 的源码是用 Scala 编写的,我们如果要用 Java 来编程需要做如下设置

将目录结构设置为

Producer API 的使用

新建 Java 类

参照目录结构在 com.share.spark.kafka 包中新建 KafkaProperties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.share.spark.kafka;

/**
* Kafka常用配置文件
*/
public class KafkaProperties {

//定义开发过程中要用到的常量
//定义ZooKeeper
public static final String ZK = "192.168.10.100:2181";

//定义Topic,hello_topic是之前在CentOS7中创建好的
public static final String TOPIC = "hello_topic";

//定义broker_list,broker是Kafka配置文件中配置好的
public static final String BROKER_LIST = "192.168.10.100:9092";

}

参照目录结构在 com.share.spark.kafka 包中新建 KafkaProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.share.spark.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
* Kafka生产者
*/
public class KafkaProducer extends Thread{

private String topic;

private Producer <Integer, String> producer;

public KafkaProducer(String topic){
this.topic = topic;

Properties properties = new Properties();
//broker.list
properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
//序列化
properties.put("serializer.class","kafka.serializer.StringEncoder");
/*握手机制,发送消息是否需要服务器的反馈
0为生产者不等待leader的握手机制
1为leader写信息在本地日志中并立刻响应握手机制
-1为leader将等待所有副本的握手机制
*/
properties.put("request.required.acks","1");
producer = new Producer<Integer, String>(new ProducerConfig(properties));
}

@Override
public void run() {
int messageNo = 1;
while(true) {
String message = "message_" + messageNo;
producer.send(new KeyedMessage<Integer, String>(topic,message));
System.out.println("Sent :" +message);

messageNo ++;
try{
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
}
}
}

参照目录结构在 com.share.spark.kafka 包中新建 KafkaClientApp

1
2
3
4
5
6
7
8
9
10
package com.share.spark.kafka;

/**
* Kafka API 测试
*/
public class KafkaClientApp {
public static void main(String[] args) {
new KafkaProducer(KafkaProperties.TOPIC).start();
}
}

项目代码: Demo

准备与测试

说明: 服务器为 CentOS7,本地为 Windows10

本地访问服务器需要对服务器的防火墙进行相关设置(这里采用临时关闭防火墙)

1
2
3
4
5
# 查看防火墙的状态
service firewalld status

# 临时关闭防火墙
service firewalld stop

修改 /abs/app/kafka_2.11-0.9.0.0/config 目录下的 server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 指定broker绑定的主机名
host.name=hadoop

# broker访问producers和consumers的主机名,如果没设置则采用host.name
# 需要注意的是这里默认是localhost,通过外部API来调用的话会报错
# 所以在这里需要指定他为服务器端的IP地址
advertised.host.name=192.168.10.100

# 将日志存放目录更改为自己指定的目录
# 因为默认的目录是在临时文件夹中,在服务器关机或重启之后会清除
log.dirs=/abs/app/tmp/kafka-logs

# 指定zookeeper
zookeeper.connect=hadoop:2181

以上设置无误后在服务器端开启服务如下:

1
2
3
4
5
6
7
8
# 启动ZooKeeper   在/abs/app/zookeeper-3.4.5-cdh5.7.0/bin 输入
./zkServer.sh start

# 启动Kafka 在/abs/app/kafka_2.11-0.9.0.0/bin 输入
kafka-server-start.sh $KAFKA_HOME/config/server.properties

# 启动消费者
kafka-console-consumer.sh --zookeeper hadoop:2181 --topic hello_topic

运行 KafkaClientApp ,可以在服务器的消费者端口看到本地发送的消息

Consumer API 的使用

新建 Java 类

参照目录结构在 com.share.spark.kafka 包中新建 KafkaConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.share.spark.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* Kafka消费者
*/
public class KafkaConsumer extends Thread{
private String topic;

public KafkaConsumer(String topic){
this.topic = topic;
}

private ConsumerConnector createConnector(){
Properties properties = new Properties();
properties.put("zookeeper.connect",KafkaProperties.ZK);
properties.put("group.id",KafkaProperties.GROUP_ID);

return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}

@Override
public void run() {
ConsumerConnector consumer = createConnector();
Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
topicCountMap.put(topic,1);

//String: topic
//List<KafkaStream<byte[], byte[]>> 对应的数据流
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);

//get(0)为获取我们每次接收到的数据
KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);

//迭代
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

while (iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("rec: "+ message);

}

}
}

KafkaClientApp 类中添加启动 Consumer 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.share.spark.kafka;

/**
* Kafka API 测试
*/
public class KafkaClientApp {
public static void main(String[] args) {

new KafkaProducer(KafkaProperties.TOPIC).start();

new KafkaConsumer(KafkaProperties.TOPIC).start();
}
}

准备与测试

说明: 服务器为 CentOS7,本地为 Windows10

为避免不必要的错误请先关闭 SELinux

本地访问服务器需要对服务器的防火墙进行相关设置(这里采用临时关闭防火墙)

1
2
3
4
5
# 查看防火墙的状态
service firewalld status

# 临时关闭防火墙
service firewalld stop

修改 /abs/app/kafka_2.11-0.9.0.0/config 目录下的 server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 指定broker绑定的主机名
host.name=hadoop

# broker访问producers和consumers的主机名,如果没设置则采用host.name
# 需要注意的是这里默认是localhost,通过外部API来调用的话会报错
# 所以在这里需要指定他为服务器端的IP地址
advertised.host.name=192.168.10.100

# 将日志存放目录更改为自己指定的目录
# 因为默认的目录是在临时文件夹中,在服务器关机或重启之后会清除
log.dirs=/abs/app/tmp/kafka-logs

# 指定zookeeper
# 在测试过程中出现ZooKeeper连接问题,把主机名改成IP后解决
# zookeeper.connect=hadoop:2181
zookeeper.connect=192.168.10.100:2181

以上设置无误后在服务器端开启服务如下:

1
2
3
4
5
6
7
8
# 启动ZooKeeper   在/abs/app/zookeeper-3.4.5-cdh5.7.0/bin 输入
./zkServer.sh start

# 启动Kafka 在/abs/app/kafka_2.11-0.9.0.0/bin 输入
kafka-server-start.sh $KAFKA_HOME/config/server.properties

# 启动消费者
kafka-console-consumer.sh --zookeeper hadoop:2181 --topic hello_topic

运行 KafkaClientApp ,可以在 IDEA 控制台上看到发送消息和接收消息

也可以在服务器的消费者端口看到本地发送的消息

------ 本文结束------
如果对您有帮助的话请我喝瓶水吧!