kafka producer blocking on callback


I'm testing the async send() in my Kafka producer.
The cluster I want to connect to is offline.
My assumption was that the 10000 individual requests (the length of listToSend) would be sent quickly.
The timeout (60 s) would then kick in, and after 60 seconds I would see the callbacks hit me with logger.error(s"failed to send record ${x._2}", e).
However, the method seems to take forever to finish.



That's why I added the logger.debug("test: am I sending data") line.



It prints, then nothing happens for 60 seconds. Then I see the failed callback for the first record, and only after that does it move on to the next one.



Is this normal behavior or am I missing something fundamental?


import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

listToSend.foreach { x =>
  logger.debug("test: am I sending data")
  // note: I added this 'val future =' in an attempt to fix this, to no avail
  val future = producer.send(
    new ProducerRecord[String, String](topic, x._2),
    new Callback {
      // Invoked once the send completes; e is null on success
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
        if (e != null) {
          // todo: handle failed sends, timeouts, ...
          logger.error(s"failed to send record ${x._2}", e)
        } else {
          // nice to have: implement logic here, or call another method to process metadata
          logger.debug("~Callback success~")
        }
      }
    }
  )
}



Note: I do not want this code to block; I want to keep it async. However, it seems to block on send() regardless.





You need to get the Future
– cricket_007
Jul 2 at 14:16





Isn't that blocking? I added "val future =" in an attempt to fix this; it wasn't originally there.
– Havnar
Jul 2 at 14:25






producer.send returns a Future[RecordMetadata] (or something like that). You can call val meta = future.get() to actually block, as well as producer.close
– cricket_007
Jul 2 at 15:43
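
To illustrate the point about the Future: holding the returned future does not block by itself, only calling get() on it does. The sketch below uses a plain java.util.concurrent.CompletableFuture as a stand-in for the future returned by producer.send, so it runs without a Kafka dependency. The names FutureGetSketch, pendingSend and awaitResult are hypothetical and exist only for this example.

```scala
import java.util.concurrent.{CompletableFuture, Future, TimeUnit, TimeoutException}

object FutureGetSketch {
  // Stand-in for the Future returned by producer.send(); it is never
  // completed, much like a send to an offline cluster that has not failed yet.
  def pendingSend(): Future[String] = new CompletableFuture[String]()

  // Blocks for at most timeoutMs. Returns Some(result) if the future
  // completed in time, None if the wait timed out.
  def awaitResult(future: Future[String], timeoutMs: Long): Option[String] =
    try Some(future.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch { case _: TimeoutException => None }

  def main(args: Array[String]): Unit = {
    val future = pendingSend()         // assigning the future does not block
    println(future.isDone)             // false: nothing has completed it
    println(awaitResult(future, 100L)) // None: get() is the call that waits
  }
}
```

So assigning the result to a val neither causes nor cures the blocking; it only gives you a handle you may wait on later.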







But I don't want to block, that's the thing. It's blocking now, but I just want to "get rid" of my records fast and let the logic handle any kind of failures/downtime/...
– Havnar
Jul 3 at 7:15





Then you need to set acks to zero in the producer config and not use a Callback or assign the future
– cricket_007
Jul 3 at 13:22








