until we have metadata for the topic or maxWaitMs is exceeded. // In case we already have cached metadata for the topic, but the requested partition is greater // than expected, issue an update request only once. This is necessary in case the metadata // is stale and the number of partitions for this topic has increased in the meantime. do { log.trace("Requesting metadata update for topic {}.", topic); metadata.add(topic); int version = metadata.requestUpdate(); sender.wakeup(); try { metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); if (cluster.unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null); waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); Default is “60 seconds” @bsideup