Single-Node Kafka Deployment
Published: 2019-06-29



This deployment uses kafka_2.8.0-0.8.0.

This walkthrough draws on the post at http://blog.csdn.net/itleochen/article/details/17451455
and follows the official quickstart at http://kafka.apache.org/documentation.html#quickstart.
Without further ado, here are the steps.
1. Download kafka_2.8.0-0.8.0.tar.gz:
https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz 
2. Extract it:
tar -zxvf kafka_2.8.0-0.8.0.tar.gz
3. Edit the configuration files
In config/server.properties, set:
host.name=192.168.110.129 (use the host's IP; otherwise the broker hands clients its hostname, which they may not be able to resolve)
In config/zookeeper.properties, set:
dataDir=/usr/local/tmp/zookeeper (ZooKeeper's data directory)
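For reference, a minimal sketch of what the two edited files might end up looking like (the IP and paths are the ones used in this walkthrough; all other keys keep their shipped defaults):

# config/server.properties
broker.id=0
port=9092
host.name=192.168.110.129
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

# config/zookeeper.properties
dataDir=/usr/local/tmp/zookeeper
clientPort=2181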
4. Start ZooKeeper and Kafka
cd bin
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties & (the trailing & detaches the process from the command line so it keeps running in the background)
Start Kafka:
./kafka-server-start.sh ../config/server.properties &
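A quick sanity check that both processes are up, using jps from the JDK (the PIDs below are illustrative; QuorumPeerMain is the ZooKeeper process and Kafka is the broker):

jps
3104 QuorumPeerMain
3231 Kafka
3350 Jps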
5. Verify the installation
Create a topic:
./kafka-create-topic.sh --partition 1 --replica 1 --zookeeper localhost:2181 --topic test
Check that the topic was created:
./kafka-list-topic.sh --zookeeper localhost:2181
Start a producer:
./kafka-console-producer.sh --broker-list 192.168.110.129:9092 --topic test
Start a consumer:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
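An illustrative end-to-end session: lines typed into the producer terminal should appear in the consumer terminal (add --from-beginning to the consumer if you want it to replay messages sent before it started):

# producer terminal
./kafka-console-producer.sh --broker-list 192.168.110.129:9092 --topic test
hello kafka
single-node test

# consumer terminal
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
hello kafka
single-node test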
6. Shut down Kafka and ZooKeeper
./kafka-server-stop.sh (the stop script takes no config argument; it simply signals the running broker)
./zookeeper-server-stop.sh
Takeaways:
1. The producer is started with the Kafka broker's port, while the consumer is started with ZooKeeper's port.
2. A topic must be created before it can be used.
3. Topic metadata is kept in ZooKeeper (as znodes); the messages themselves are stored in log files on the broker.
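Point 3 can be checked directly with the zookeeper-shell.sh script bundled with Kafka (a sketch; the znode listing below is illustrative of a setup like this one):

./zookeeper-shell.sh localhost:2181 ls /brokers/topics
[TEST-TOPIC, test]
./zookeeper-shell.sh localhost:2181 ls /consumers
[jd-group]

Only metadata and consumer offsets live there; the message data is under the broker's log.dirs.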

 

Consumer

package com.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer
{
	private final ConsumerConnector consumer;

	private KafkaConsumer()
	{
		Properties props = new Properties();
		// ZooKeeper connection
		props.put( "zookeeper.connect", "192.168.110.129:2181" );
		// group.id identifies a consumer group
		props.put( "group.id", "jd-group" );
		// ZooKeeper session timeout
		props.put( "zookeeper.session.timeout.ms", "4000" );
		props.put( "zookeeper.sync.time.ms", "200" );
		props.put( "auto.commit.interval.ms", "1000" );
		// start from the earliest offset when the group has no committed offset
		props.put( "auto.offset.reset", "smallest" );
		// serializer class
		props.put( "serializer.class", "kafka.serializer.StringEncoder" );
		ConsumerConfig config = new ConsumerConfig( props );
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector( config );
	}

	void consume()
	{
		// one stream for the topic
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put( KafkaProducer.TOPIC, new Integer( 1 ) );

		StringDecoder keyDecoder = new StringDecoder( new VerifiableProperties() );
		StringDecoder valueDecoder = new StringDecoder( new VerifiableProperties() );

		Map<String, List<KafkaStream<String, String>>> consumerMap =
				consumer.createMessageStreams( topicCountMap, keyDecoder, valueDecoder );
		KafkaStream<String, String> stream = consumerMap.get( KafkaProducer.TOPIC ).get( 0 );
		ConsumerIterator<String, String> it = stream.iterator();
		// hasNext() blocks until the next message arrives, so this loop never exits on its own
		while (it.hasNext())
			System.out.println( it.next().message() );
	}

	public static void main(String[] args)
	{
		new KafkaConsumer().consume();
	}
}

Producer

package com.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer
{
	private final Producer<String, String> producer;

	public final static String TOPIC = "TEST-TOPIC";

	private KafkaProducer()
	{
		Properties props = new Properties();
		// this is the Kafka broker port, not the ZooKeeper port
		props.put( "metadata.broker.list", "192.168.110.129:9092" );
		// value serializer
		props.put( "serializer.class", "kafka.serializer.StringEncoder" );
		// key serializer
		props.put( "key.serializer.class", "kafka.serializer.StringEncoder" );
		// request.required.acks:
		// 0: the producer never waits for an acknowledgement from the broker
		//    (the same behavior as 0.7). Lowest latency but weakest durability
		//    (some data will be lost when a server fails).
		// 1: the producer gets an acknowledgement after the leader replica has
		//    received the data (only messages written to a now-dead leader but
		//    not yet replicated will be lost).
		// -1: the producer gets an acknowledgement after all in-sync replicas
		//    have received the data. Best durability: no messages are lost as
		//    long as at least one in-sync replica remains.
		props.put( "request.required.acks", "-1" );
		producer = new Producer<String, String>( new ProducerConfig( props ) );
	}

	void produce()
	{
		// send messages keyed 1000..1999
		int messageNo = 1000;
		final int COUNT = 2000;
		while (messageNo < COUNT)
		{
			String key = String.valueOf( messageNo );
			String data = "hello kafka message " + key;
			producer.send( new KeyedMessage<String, String>( TOPIC, key, data ) );
			messageNo++;
		}
	}

	public static void main(String[] args)
	{
		new KafkaProducer().produce();
	}
}
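A rough sketch of compiling and running the two classes (it assumes the jars shipped with the kafka_2.8.0-0.8.0 distribution, including the Scala runtime, are on the classpath; adjust the paths to your layout):

javac -cp "kafka_2.8.0-0.8.0/libs/*:kafka_2.8.0-0.8.0/kafka_2.8.0-0.8.0.jar" com/kafka/*.java
java -cp ".:kafka_2.8.0-0.8.0/libs/*:kafka_2.8.0-0.8.0/kafka_2.8.0-0.8.0.jar" com.kafka.KafkaProducer
java -cp ".:kafka_2.8.0-0.8.0/libs/*:kafka_2.8.0-0.8.0/kafka_2.8.0-0.8.0.jar" com.kafka.KafkaConsumer

Create TEST-TOPIC as in step 5 (or rely on broker-side auto-creation), run the producer, then the consumer. With auto.offset.reset=smallest a fresh group should print hello kafka message 1000 through hello kafka message 1999, then block waiting for more messages, since the consumer iterator never runs out.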

 

Reposted from: https://my.oschina.net/phoebus789/blog/733670
