-
Notifications
You must be signed in to change notification settings - Fork 323
quickstart.zh_CN
文档>> 快速开始
完整的安装指南在这里。
最新的发行版地址在:https://github.com/adyliu/jafka/releases (其它下载地址)
$wget https://github.com/adyliu/jafka/releases/download/3.0.4/jafka-3.0.4.tgz
$tar xzf jafka-3.0.4.tgz
$cd jafka-3.0.4
可选配置,设置一个环境变量。
$export $JAFKA_HOME=/opt/apps/jafka-3.0.4
以下假设所有操作目录都在$JAFKA_HOME下。
拷贝默认配置
$ cp conf/server.properties.sample conf/server.properties
$ cp conf/log4j.properties.sample conf/log4j.properties
启动服务端
$bin/run.sh console
默认情况下,无需任何配置即可运行服务端。这时服务端会将9092端口绑定到所有网卡上。
2017-12-22 16:48:28.660 INFO Server - Starting Jafka server 3.0.4 (brokerid=1)
HttpAdaptor version 3.0.1 started on port 9094
2017-12-22 16:48:29.132 INFO Mx4jLoader - mx4j successfuly loaded
2017-12-22 16:48:29.144 INFO Server - Jafka(brokerid=1) started at *:9092, cost 0 seconds
broker的id默认是1,TCP端口9092。
更多运行方式参考安装指南
使用自带的小命令行就可以发送简单的文本消息。
$ bin/producer-console.sh --broker-list 0:localhost:9092 --topic demo
> Welcome to jafka
> 中文中国
producer-console.sh有一些参数,这可以通过执行下面的命令得到。
$bin/producer-console.sh
发送消息只需要在提示符号'>'输入文本即可,没有出错意味着发送成功,直接回车或者输入CTRL+C退出程序。
现在是时候消费刚才发送的消息。
同样Jafka自带一个小程序能够消费简单的文本消息。
$ bin/simple-consumer-console.sh --topic demo --server jafka://localhost:9092
[1] 26: Welcome to jafka
[2] 48: 中文中国
连接上服务端后,立即就看到有消息消费了。默认情况下simple-consumer-console.sh输出消息的序号(实际上不存在)以及消息的下一个偏移量(offset)。
解压缩后只需要执行上面三条命令就可以完成简单的消息发送和接受演示。这就是一个简单的消息系统。
我们希望利用提供的API手动编码能够发送和接受一些消息。
首先写一个简单的消息发送者。
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("broker.list", "0:127.0.0.1:9092");
props.put("serializer.class", StringEncoder.class.getName());
//
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
//
StringProducerData data = new StringProducerData("demo");
for(int i=0;i<1000;i++) {
data.add("Hello world #"+i);
}
//
try {
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
producer.send(data);
}
long cost = System.currentTimeMillis() - start;
System.out.println("send 100000 message cost: "+cost+" ms");
} finally {
producer.close();
}
}
看起来有点复杂,我们简单分解下。
首先需要配置服务端的地址。一个jfaka服务端地址格式如下:
brokerId:host:port
- brokerId 用于标识服务进程,这在一个集群里面是全局唯一的
- host/port 用户描述服务监听的ip地址和端口,默认情况下会在所有网卡的9092端口监听数据。
配置完服务端信息后,我们需要提供一个消息编码。
消息编码用于将任意消息类型编码成字节数组,这些字节数组就是我们的消息体。
默认情况下Jafka解析字节数组编码,也就是原封不动的发送出去。这里简单替换下,使用字符串UTF-8编码。
使用上面简单的参数就可以构造出来一个简单的消息发送客户端。
消息发送客户端(Producer)用于管理与服务端之间的连接,并将消息按照指定的编码方式发送给服务端。
用于使用字符串编码,因此这里只能发送字符串的数据。每一个消息数据包都可以带有多条消息,只需要满足一个消息数据包的大小不超过默认的1M即可。比如下面就构造发往主题为demo的100条消息的数据包:
StringProducerData data = new StringProducerData("demo");
for(int i=0;i<1000;i++) {
data.add("Hello world #"+i);
}
最后发送消息只需要调用producer.send()即可。上述例子中循环发送100次。
接受消息的逻辑非常简单,只需要配置服务端的地址,然后从偏移量0开始顺序消费消息即可。
下面的逻辑是简单的将接受的消息以UTF-8的字符串展示。
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092);
//
long offset = 0;
while (true) {
FetchRequest request = new FetchRequest("test", 0, offset);
for (MessageAndOffset msg : consumer.fetch(request)) {
System.out.println(Utils.toString(msg.message.payload(), "UTF-8"));
offset = msg.offset;
}
}
Jafka 使用zookeeper进行自动broker寻址以及消费者负载均衡。
测试时可以使用一个单进程的zookeeper用于替换zookeeper集群。
$bin/zookeeper-server.sh conf/zookeeper.properties
修改conf/server.properties,开启zookeeper自动注册,
enable.zookeeper=true
现在就可以重新启动server了:
$ bin/run.sh console
2017-12-22 16:57:16.424 INFO Server - Starting Jafka server 3.0.4 (brokerid=1)
2017-12-22 16:57:16.534 INFO ServerRegister - connecting to zookeeper: 127.0.0.1:2181
2017-12-22 16:57:16.602 INFO ZkEventThread - Starting ZkClient event thread.
2017-12-22 16:57:16.654 INFO ClientCnxn$SendThread - Opening socket connection to server localhost.localdomain/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2017-12-22 16:57:16.666 INFO ClientCnxn$SendThread - Socket connection established to localhost.localdomain/127.0.0.1:2181, initiating session
2017-12-22 16:57:16.773 INFO ClientCnxn$SendThread - Session establishment complete on server localhost.localdomain/127.0.0.1:2181, sessionid = 0x1607d6e073c0000, negotiated timeout = 6000
2017-12-22 16:57:16.777 INFO ZkClient - zookeeper state changed (SyncConnected)
2017-12-22 16:57:17.046 INFO Mx4jLoader - mx4j successfuly loaded
2017-12-22 16:57:17.047 INFO ServerRegister - Registering broker /brokers/ids/1
HttpAdaptor version 3.0.1 started on port 9094
2017-12-22 16:57:17.155 INFO ServerRegister - Registering broker /brokers/ids/1 succeeded with www.imxylz.com-1513933037047:www.imxylz.com:9092:true
2017-12-22 16:57:17.182 INFO Server - Jafka(brokerid=1) started at *:9092, cost 0 seconds
服务端启动后自动向zookeeper注册服务端的信息,例如ip地址、端口、已存在的消息等。
$ bin/producer-console.sh --zookeeper localhost:2181 --topic demo
Enter you message and exit with empty string.
> Jafka second day
> Jafka use zookeeper to search brokers and consumers
>
和上面启动的消息发送者类似,只不过这里使用zookeeper配置自动寻找服务端,而不是指定服务端地址。
$ bin/consumer-console.sh --zookeeper localhost:2181 --topic demo --from-beginning
Jafka second day
Jafka use zookeeper to search brokers and consumers
这时候很快就看到刚才发送的消息了。
由于使用zookeeper作为配置中心,因此可以启动更多的服务端、消息发送者、消息接受者。只需要保证都连接zookeeper,并且所有的服务端都有唯一的brokerId(位于server.properties中).
上面是使用自带的程序发送简单的文本消息。这里利用API来进行开发。
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", StringEncoder.class.getName());
//
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
//
StringProducerData data = new StringProducerData("demo");
for(int i=0;i<100;i++) {
data.add("Hello world #"+i);
}
//
try {
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
producer.send(data);
}
long cost = System.currentTimeMillis() - start;
System.out.println("send 10000 message cost: "+cost+" ms");
} finally {
producer.close();
}
}
和不使用zookeeper的消息发送者对比,只需要将服务端配置信息替换成zookeeper连接地址即可。其它完全一致。
接受消息看起来稍微有点复杂,简单来说是如下几步:
-
配置zookeeper以及客户端groupid
-
与服务端的连接
-
创建消息流
-
启动线程池消费消息
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect", "localhost:2181"); props.put("groupid", "test_group"); // ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector connector = Consumer.create(consumerConfig); // Map<String, List<MessageStream<String>>> topicMessageStreams = connector.createMessageStreams(ImmutableMap.of("demo", 2), new StringDecoder()); List<MessageStream<String>> streams = topicMessageStreams.get("demo"); // ExecutorService executor = Executors.newFixedThreadPool(2); final AtomicInteger count = new AtomicInteger(); for (final MessageStream<String> stream : streams) { executor.submit(new Runnable() { public void run() { for (String message : stream) { System.out.println(count.incrementAndGet() + " => " + message); } } }); } // executor.awaitTermination(1, TimeUnit.HOURS); }
所有消息的消费方式几乎都相同,只是消费的topic名称不同而已。
** 是不是很简单,动手试试吧**
Language: English | Simple Chinese