All messages in the out queue failed to be delivered to the broker when the Kafka broker was restarted


I am working on a C++ Kafka client based on librdkafka; the example I started from is here: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker, and during the process I restarted the broker. Sometimes no messages failed to be delivered; other times about 100,000 messages failed, which matches queue.buffering.max.messages=100000. It seems that all messages in the out queue were lost? The error in the RdKafka::Message delivery report is: Local: Unknown partition.
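For context, the produce path looks roughly like this (a condensed sketch based on rdkafka_example.cpp; the broker address, topic name, and message count stand in for my actual setup):

    // Condensed sketch of the producer, based on rdkafka_example.cpp.
    #include <iostream>
    #include <string>
    #include <librdkafka/rdkafkacpp.h>

    int main() {
        std::string errstr;
        RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        conf->set("metadata.broker.list", "localhost:9092", errstr);
        conf->set("queue.buffering.max.messages", "100000", errstr);

        RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
        RdKafka::Topic *topic =
            RdKafka::Topic::create(producer, "test", tconf, errstr);

        for (int i = 0; i < 2000000; i++) {
            std::string payload = "message #" + std::to_string(i);
            RdKafka::ErrorCode err;
            do {
                err = producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                                        RdKafka::Producer::RK_MSG_COPY,
                                        const_cast<char *>(payload.c_str()),
                                        payload.size(), NULL, NULL);
                if (err == RdKafka::ERR__QUEUE_FULL)
                    producer->poll(100);  // out queue full: serve delivery reports, then retry
            } while (err == RdKafka::ERR__QUEUE_FULL);
            producer->poll(0);  // serve queued delivery report callbacks
        }

        // Drain the out queue before exiting, otherwise buffered messages are dropped.
        while (producer->outq_len() > 0)
            producer->poll(1000);

        delete topic;
        delete producer;
        return 0;
    }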

Meanwhile, I found new problems: (1) sometimes, about 200 messages are sent to the broker twice; (2) sometimes, a message has already been sent to the broker, yet dr_cb() is called and tells me the message failed to be delivered. I am trying to figure out whether this is a problem of the broker or of the client. Has anyone seen similar problems? What I actually need is reliable transmission and reliable delivery reports between the client and the broker server. I am considering switching to the C client now, but I am not sure whether the same problem would happen there as well...

The log of the broker is:

[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.immutable.Set$Set1.map(Set.scala:73)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)
[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

My producer configuration is:

Global config:
client.id=rdkafka
metadata.broker.list=localhost:9092
message.max.bytes=4000000
receive.message.max.bytes=100000000
metadata.request.timeout.ms=900000
topic.metadata.refresh.interval.ms=-1
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.sparse=false
socket.timeout.ms=300000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.max.fails=10
broker.address.ttl=300000
broker.address.family=any
statistics.interval.ms=0
error_cb=0x5288a60
stats_cb=0x5288ba0
log_cb=0x54942a0
log_level=6
socket_cb=0x549e6c0
open_cb=0x54acf90
opaque=0x9167898
internal.termination.signal=0
queued.min.messages=100000
queued.max.messages.kbytes=1000000
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.min.bytes=1
fetch.error.backoff.ms=500
queue.buffering.max.messages=100000
queue.buffering.max.ms=1000
message.send.max.retries=10
retry.backoff.ms=100
compression.codec=none
batch.num.messages=1000
delivery.report.only.error=true

Topic config:
request.required.acks=1
enforce.isr.cnt=0
request.timeout.ms=5000
message.timeout.ms=300000
produce.offset.report=false
auto.commit.enable=true
auto.commit.interval.ms=60000
auto.offset.reset=largest
offset.store.path=.
offset.store.sync.interval.ms=-1
offset.store.method=file
consume.callback.max.messages=0
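For reference, these properties are applied through RdKafka::Conf::set before the producer is created; a minimal sketch with a few representative keys (error handling reduced to a message on stderr):

    // Sketch: applying a few of the properties above via the C++ API.
    std::string errstr;
    RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    // Global properties (full set dumped above).
    if (conf->set("metadata.broker.list", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("queue.buffering.max.messages", "100000", errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("message.send.max.retries", "10", errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("delivery.report.only.error", "true", errstr) != RdKafka::Conf::CONF_OK)
        std::cerr << "global config error: " << errstr << std::endl;

    // Topic properties.
    if (tconf->set("request.required.acks", "1", errstr) != RdKafka::Conf::CONF_OK ||
        tconf->set("message.timeout.ms", "300000", errstr) != RdKafka::Conf::CONF_OK)
        std::cerr << "topic config error: " << errstr << std::endl;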

The consumer output is:

[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

Any suggestions are welcome. Thanks.

In async mode, the client has to handle this kind of problem itself. I have no idea how to make sure that every message in the out queue is delivered to the broker with 100% probability, but I can make sure a message gets put back into the out queue when its delivery fails: if a delivery fails, dr_cb() is called, and in that function I try to put the failed message into the out queue again, as sketched below. This may not be the best way, but it is the way I am using for now.
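A sketch of that callback, assuming producer and topic are the handles created at startup. Since the produce loop uses RK_MSG_COPY, librdkafka copies the payload again when re-producing, so reusing the buffer handed back in the delivery report from inside the callback is safe:

    // Sketch: re-enqueue messages that failed delivery, from inside dr_cb().
    // 'producer' and 'topic' are assumed to be the handles created at startup.
    class RetryDeliveryReportCb : public RdKafka::DeliveryReportCb {
    public:
        void dr_cb(RdKafka::Message &message) {
            if (message.err() != RdKafka::ERR_NO_ERROR) {
                std::cerr << "delivery failed: " << message.errstr()
                          << " - re-enqueueing" << std::endl;
                // RK_MSG_COPY copies the payload again, so the buffer owned by
                // librdkafka during this callback can be reused here.
                producer->produce(topic, RdKafka::Topic::PARTITION_UA,
                                  RdKafka::Producer::RK_MSG_COPY,
                                  message.payload(), message.len(), NULL, NULL);
            }
        }
    };

    // Registered before the producer is created:
    //   RetryDeliveryReportCb retry_dr_cb;
    //   conf->set("dr_cb", &retry_dr_cb, errstr);

Note that this gives at-least-once delivery at best: a message can reach the broker and still be reported as failed (for example, if the connection drops before the broker's response arrives), in which case the retry produces a duplicate. That would also explain problem (1) above.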

