0. Kafka Server Configuration
3 brokers
1 topic with 6 partitions and a replication factor of 2
2 consumers, each reading the topic concurrently with 3 threads
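A topic matching this setup can be created with Kafka's bundled CLI (a sketch, assuming Kafka 0.8.1+ where `kafka-topics.sh` is available, and a ZooKeeper instance at 192.168.26.140:2181 as used by the consumer below; the script path and ZooKeeper address are assumptions, not taken from the original post):

```shell
# Create the example topic with 6 partitions and replication factor 2,
# matching the configuration above (paths/addresses are assumed)
bin/kafka-topics.sh --create \
  --zookeeper 192.168.26.140:2181 \
  --topic learn.topic.p8.r2 \
  --partitions 6 \
  --replication-factor 2
```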
1. Producer
```java
package kafka.examples.multibrokers.producers;

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MultiBrokerProducer {

    private static Producer<String, String> producer;
    private static Properties props = new Properties();

    static {
        // Seed list of brokers; the producer discovers the rest of the cluster from here
        props.put("metadata.broker.list",
                "192.168.26.140:9092,192.168.26.140:9093,192.168.26.140:9094");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Custom partitioner (section 2) that maps message keys to partitions
        props.put("partitioner.class",
                "kafka.examples.multibrokers.partitioner.TopicPartitioner");
        // Wait for the partition leader's acknowledgement only
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        String topic = "learn.topic.p8.r2";
        for (long i = 0; i < 10000; i++) {
            // Random key in [0, 254]; the key decides the target partition
            String key = "" + rnd.nextInt(255);
            String msg = "The " + i + " message is for key - " + key;
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, key, msg);
            producer.send(data);
            System.out.println(i);
        }
        producer.close();
    }
}
```
2. Partitioner
```java
package kafka.examples.multibrokers.partitioner;

import java.util.Random;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class TopicPartitioner implements Partitioner {

    // Kafka instantiates the partitioner reflectively and requires this constructor
    public TopicPartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            return 0;
        }
        int hashCode;
        if (key == null) {
            // No key: spread messages pseudo-randomly
            hashCode = new Random().nextInt(255);
        } else {
            hashCode = key.hashCode();
        }
        // Mask off the sign bit so a negative hashCode cannot yield a negative partition
        return (hashCode & 0x7fffffff) % numPartitions;
    }
}
```
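The key-to-partition mapping can be checked in isolation, without a running cluster. The sketch below (class and method names are illustrative, not part of the original example) reproduces the modulo arithmetic for string keys, with a sign mask so a negative `hashCode` cannot produce a negative partition index:

```java
public class PartitionDemo {

    // Same modulo arithmetic as the partitioner above, for non-null string keys
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            return 0;
        }
        // Mask the sign bit, then take the remainder over the partition count
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6; // per the setup in section 0
        // Keys of the form the producer generates ("0" .. "254")
        for (String key : new String[] {"0", "1", "42", "254"}) {
            System.out.println("key " + key + " -> partition "
                    + partitionFor(key, partitions));
        }
    }
}
```

All messages sharing a key land in the same partition, so per-key ordering is preserved across the 6 partitions.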
3. Consumer
```java
package kafka.examples.multibrokers.consumers;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadHLConsumer {

    private ExecutorService executor;
    private final ConsumerConnector consumer;
    private final String topic;

    public MultiThreadHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void doConsume(int threadCount) {
        // Number of streams (threads) per topic; a single topic here,
        // but more topics can be added to the map
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, Integer.valueOf(threadCount));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        System.out.println("streams length: " + streams.size());

        // One worker thread per stream
        executor = Executors.newFixedThreadPool(threadCount);
        final CountDownLatch latch = new CountDownLatch(threadCount);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                    while (consumerIte.hasNext()) {
                        System.out.println("Message from thread :: "
                                + Thread.currentThread().getName() + " -- "
                                + new String(consumerIte.next().message()));
                    }
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "learn.topic.p8.r2";
        int threadCount = 3;
        MultiThreadHLConsumer simpleHLConsumer = new MultiThreadHLConsumer(
                "192.168.26.140:2181", "learn.topic.p8.r2.consumers.group", topic);
        simpleHLConsumer.doConsume(threadCount);
    }
}
```
4. Points to Note
The number of partitions caps the parallelism of a consumer group: with 6 partitions and 2 consumers running 3 threads each, every thread is assigned one partition; any threads beyond the partition count would sit idle.