All messages in the out queue failed to be delivered to the broker when Kafka brokers restarted
I am working with the C++ Kafka client librdkafka. The library is here: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2000000 messages to the broker. During this process, I restarted the broker. Sometimes no messages failed to be delivered to the broker. Other times, 100,000 messages failed to be delivered. I have queue.buffering.max.messages=100000, so it seems that all messages in the out queue were lost. The error is: RdKafka::Message delivery report: Local: Unknown partition.
I have since found new problems: (1) Sometimes 200 messages are sent to the broker twice. (2) Sometimes a message has already been sent to the broker, yet dr_cb() is called and tells me that the message failed to be delivered. I am trying to figure out whether this is a problem with the broker or with the client. Has anyone had similar problems? In fact, I need reliable transmission and delivery reports between the client and the broker. I am considering using the C client now, but I am not sure whether the problem would happen again.
The broker log is:
[2015-07-21 17:48:33,471] info 0 elected leader (kafka.server.zookeeperleaderelector)
[2015-07-21 17:48:33,717] info new leader 0 (kafka.server.zookeeperleaderelector$leaderchangelistener)
[2015-07-21 17:48:33,718] error [kafkaapi-0] error when handling request name: topicmetadatarequest; version: 0; correlationid: 5017; clientid: rdkafka; topics: test (kafka.server.kafkaapis) kafka.admin.adminoperationexception: replication factor: 1 larger than available brokers: 0
at kafka.admin.adminutils$.assignreplicastobrokers(adminutils.scala:70)
at kafka.admin.adminutils$.createtopic(adminutils.scala:171)
at kafka.server.kafkaapis$$anonfun$19.apply(kafkaapis.scala:520)
at kafka.server.kafkaapis$$anonfun$19.apply(kafkaapis.scala:503)
at scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:194)
at scala.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:194)
at scala.collection.immutable.set$set1.foreach(set.scala:86)
at scala.collection.traversablelike$class.map(traversablelike.scala:194)
at scala.collection.immutable.set$set1.scala$collection$setlike$$super$map(set.scala:73)
at scala.collection.setlike$class.map(setlike.scala:93)
at scala.collection.immutable.set$set1.map(set.scala:73)
at kafka.server.kafkaapis.gettopicmetadata(kafkaapis.scala:503)
at kafka.server.kafkaapis.handletopicmetadatarequest(kafkaapis.scala:542)
at kafka.server.kafkaapis.handle(kafkaapis.scala:62)
at kafka.server.kafkarequesthandler.run(kafkarequesthandler.scala:59)
at java.lang.thread.run(thread.java:745)
[2015-07-21 17:48:33,743] info registered broker 0 at path /brokers/ids/0 address cyclops-9803:9092. (kafka.utils.zkutils$)
[2015-07-21 17:48:33,759] info [kafka server 0], started (kafka.server.kafkaserver)
[2015-07-21 17:48:33,803] info closing socket connection /127.0.0.1. (kafka.network.processor)
[2015-07-21 17:48:33,858] info [replicafetchermanager on broker 0] removed fetcher partitions [test,0] (kafka.server.replicafetchermanager)
[2015-07-21 17:48:34,000] info [replicafetchermanager on broker 0] removed fetcher partitions [test,0] (kafka.server.replicafetchermanager)
[2015-07-21 17:48:34,017] info closing socket connection /127.0.0.1. (kafka.network.processor)
My producer configuration is:
global config
client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true
topic config
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0
The consumer output is:
[2015-07-22 20:57:21,052] warn fetching topic metadata correlation id 1 topics [set(test)] broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.clientutils$) java.nio.channels.closedchannelexception
at kafka.network.blockingchannel.send(blockingchannel.scala:100)
at kafka.producer.syncproducer.liftedtree1$1(syncproducer.scala:73)
at kafka.producer.syncproducer.kafka$producer$syncproducer$$dosend(syncproducer.scala:72)
at kafka.producer.syncproducer.send(syncproducer.scala:113)
at kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:58)
at kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:93)
at kafka.consumer.consumerfetchermanager$leaderfinderthread.dowork(consumerfetchermanager.scala:66)
at kafka.utils.shutdownablethread.run(shutdownablethread.scala:60)
[2015-07-22 20:57:21,073] warn [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], failed find leader set([test,0]) (kafka.consumer.consumerfetchermanager$leaderfinderthread) kafka.common.kafkaexception: fetching topic metadata topics [set(test)] broker [arraybuffer(id:0,host:cyclops-9803,port:9092)] failed
at kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:72)
at kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:93)
at kafka.consumer.consumerfetchermanager$leaderfinderthread.dowork(consumerfetchermanager.scala:66)
at kafka.utils.shutdownablethread.run(shutdownablethread.scala:60)
caused by: java.nio.channels.closedchannelexception
at kafka.network.blockingchannel.send(blockingchannel.scala:100)
at kafka.producer.syncproducer.liftedtree1$1(syncproducer.scala:73)
at kafka.producer.syncproducer.kafka$producer$syncproducer$$dosend(syncproducer.scala:72)
at kafka.producer.syncproducer.send(syncproducer.scala:113)
at kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:58)
Any suggestions are welcome. Thanks.
In async mode, the client should handle this kind of problem itself. I have no idea how to make sure that messages in the out queue are delivered to the broker with 100% probability. What I can do is make sure the message is in the out queue: if delivery fails, the message should be put into the queue again. When delivery fails, dr_cb() is called, so in that function I try to put the message back into the out queue. Maybe this is not the best way, but it is what I am using for now.
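The re-enqueue idea described above can be sketched roughly as follows. This is only a minimal, library-free illustration and not librdkafka code: `RetryingProducer`, `DeliveryReport`, `SendFn` and the `max_attempts` limit are hypothetical names made up for this sketch, and the `on_delivery()` method merely plays the role that dr_cb() plays in the real client. Note that re-sending after a failed report gives at-least-once behaviour, so a message that actually reached the broker before the failure was reported can show up twice.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a delivery report: the payload and whether it was acknowledged.
struct DeliveryReport {
    std::string payload;
    bool ok;
};

// Hypothetical transport hook: returns true if the broker acknowledged the message.
using SendFn = std::function<bool(const std::string&)>;

class RetryingProducer {
public:
    RetryingProducer(SendFn send, std::size_t max_attempts)
        : send_(std::move(send)), max_attempts_(max_attempts) {
        assert(max_attempts_ > 0);  // at least one attempt per message
    }

    // Put a message into the out queue.
    void produce(const std::string& payload) { queue_.push_back({payload, 0}); }

    // Drain the out queue. Returns the messages that failed on every attempt.
    std::vector<std::string> flush() {
        std::vector<std::string> dead;
        while (!queue_.empty()) {
            Pending p = queue_.front();
            queue_.pop_front();
            ++p.attempts;
            DeliveryReport report{p.payload, send_(p.payload)};
            on_delivery(report, p.attempts, dead);
        }
        return dead;
    }

    std::size_t delivered() const { return delivered_; }

private:
    struct Pending {
        std::string payload;
        std::size_t attempts;
    };

    // Plays the role of the delivery report callback: re-enqueue on failure.
    void on_delivery(const DeliveryReport& report, std::size_t attempts,
                     std::vector<std::string>& dead) {
        if (report.ok) {
            ++delivered_;
        } else if (attempts < max_attempts_) {
            queue_.push_back({report.payload, attempts});  // try again later
        } else {
            dead.push_back(report.payload);  // give up and hand back to the caller
        }
    }

    SendFn send_;
    std::size_t max_attempts_;
    std::deque<Pending> queue_;
    std::size_t delivered_ = 0;
};
```

In a test, `send` can be a lambda that fails on purpose. The caller then checks `delivered()` and the list returned by `flush()` to see which messages were acknowledged and which were given up on after `max_attempts` tries.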