In the code below, host and port are handled separately: the leader's host is taken from the partition metadata, while the port is always the seed port passed on the command line. On a pseudo-distributed Kafka cluster running on a single machine, where every broker shares the same host but listens on a different port, this example therefore cannot run as written.
In reality, host and port must be bound together as a single endpoint; a sketch of the fix follows the full listing below.
package kafka.examples.lowlevel;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;

public class KafkaLowLevelConsumer {

    /*
     * Arguments:
     *   args[0] - maximum number of messages to read (so we don't loop forever)
     *   args[1] - topic to read from
     *   args[2] - partition to read from (zero-based)
     *   args[3] - one broker to use for metadata lookup
     *   args[4] - port the brokers listen on
     */
    public static void main(String[] args) {
        KafkaLowLevelConsumer consumer = new KafkaLowLevelConsumer();
        // Number of messages to read
        long maxReads = Long.parseLong(args[0]);
        // Topic to read from
        String topic = args[1];
        // Partition to read from (zero-based)
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        // Port the seed broker listens on. Metadata for each topic and partition
        // is stored in ZooKeeper under paths such as /brokers/topics/learn.topic.p8.r2
        int port = Integer.parseInt(args[4]);
        try {
            consumer.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> replicaBrokers = new ArrayList<String>();

    public KafkaLowLevelConsumer() {
        replicaBrokers = new ArrayList<String>();
    }

    public void run(long maxReads, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
        // Fetch metadata for the given topic and partition; PartitionMetadata's
        // leader() and replicas() methods return the leader and replica brokers
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        // Find the broker hosting the lead partition
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        // Only the leader's host is taken from the metadata -- not its port
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + topic + "_" + partition;

        // Construct the SimpleConsumer. Why can leadBroker and port disagree here?
        // They belong together as one endpoint; this should be metadata.leader().port()
        // (see the sketch after the listing)
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
        // Determine the offset to start reading from
        long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    // Note: this fetchSize of 100000 might need to be increased
                    // if large batches are written to Kafka
                    .addFetch(topic, partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            // Since the SimpleConsumer doesn't handle lead broker failures,
            // you have to write a bit of code to handle it.
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(topic, partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case,
                    // ask for the last element to reset
                    readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, topic, partition, port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                maxReads--;
            }

            // If we didn't read anything on the last request, back off for a second
            // so we aren't hammering Kafka when there is no data
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    // Finding the starting offset for reads.
    // Kafka includes two constants to help:
    // kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there;
    // kafka.api.OffsetRequest.LatestTime() will only stream new messages.
    // Don't assume that offset 0 is the beginning offset, since messages age out of the log over time.
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        // Get a list of valid offsets (up to maxSize) before the given time.
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // Since the SimpleConsumer doesn't handle lead broker failures, you have to write a bit of code to handle it.
    // Once the fetch returns an error, we log the reason, close the consumer, then try to figure out who the new leader is.
    // This method uses the findLeader() logic defined below, except here we only try the replicas for the topic/partition.
    // If we can't reach any of the brokers with the data we are interested in, we give up and exit hard.
    // Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader,
    // we sleep if we don't get an answer. In reality ZooKeeper often does the failover very quickly,
    // so you never sleep.
    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false; // was left uninitialized in the original posting, which does not compile
            PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover, or it was a non-broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    // Finding the lead broker for a topic and partition, via the given brokers.
    // The easiest way is to pass a set of known brokers to your logic,
    // either via a properties file or the command line.
    // These don't have to be all the brokers in the cluster,
    // just a set where you can start looking for a live broker to query for leader information.
    // On retries, the seedBrokers argument is the cached replicaBrokers list.
    // PartitionMetadata's leader() and replicas() methods give the partition's leader and replica brokers.
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                // class SimpleConsumer(val host: String, val port: Int, val soTimeout: Int,
                //                      val bufferSize: Int, val clientId: String)
                consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                // topicsMetadata() asks the broker we are connected to for all
                // the details about the topic we are interested in
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    // Iterate through the partitions until we find the one we want,
                    // then break out of all the loops
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                        + ", " + partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            replicaBrokers.clear();
            // Cache the replica brokers -- note that only the hosts are kept; their ports are dropped
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
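To make this run on a single-host pseudo-distributed cluster, host and port have to travel together. Below is a minimal sketch of the change, not a drop-in patch: it assumes Kafka 0.8.x, where kafka.cluster.Broker exposes both host() and port(), and the helper class name LeaderConnector is mine. The same idea carries through to findNewLeader() and findLeader(): cache replicas as "host:port" endpoints and split them before dialing, instead of reusing the single command-line port everywhere.

package kafka.examples.lowlevel;

import java.util.ArrayList;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;

// Sketch of the host/port pairing fix (hypothetical helper; Kafka 0.8.x assumed)
public class LeaderConnector {

    // Connect to the lead broker using the host AND port from the metadata,
    // instead of reusing the seed port from the command line. On a single-host
    // pseudo-distributed cluster every broker shares the host but listens on
    // its own port, so the seed port is wrong for most leaders.
    public static SimpleConsumer connectToLeader(PartitionMetadata metadata, String clientName) {
        String leadHost = metadata.leader().host();
        int leadPort = metadata.leader().port(); // the leader's own port, not the seed port
        return new SimpleConsumer(leadHost, leadPort, 100000, 64 * 1024, clientName);
    }

    // Cache replicas as "host:port" endpoints rather than bare hosts, so a later
    // leader lookup can split each entry and dial that replica on its own port.
    public static List<String> replicaEndpoints(PartitionMetadata metadata) {
        List<String> endpoints = new ArrayList<String>();
        for (kafka.cluster.Broker replica : metadata.replicas()) {
            endpoints.add(replica.host() + ":" + replica.port());
        }
        return endpoints;
    }
}

With that change, an illustrative run against a pseudo-distributed cluster (all values are examples only) would be: java kafka.examples.lowlevel.KafkaLowLevelConsumer 10 learn.topic 0 localhost 9092 -- and the consumer would still reach a leader listening on, say, 9093, because the leader's port now comes from the metadata rather than from the command line.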
Reference: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example