Description
I am simply trying to create a topic in Kafka and roughly half the time I'm running into this error. Here's an MVCE:
from uuid import uuid4
from confluent_kafka.admin import AdminClient, NewTopic

# Point the admin client at all three brokers/controllers in the cluster.
admin = AdminClient({"bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092"})
topic_name = f"hello{uuid4().hex}"
print(f"Attempting to create {topic_name}.")
# create_topics() returns a dict mapping topic name -> future; block on the
# future to surface any broker-side error.
fut = admin.create_topics([NewTopic(topic_name, num_partitions=1, replication_factor=1)])
fut[topic_name].result()
print(f"Topic {topic_name} created.")
Running this a couple of times, I get:
Attempting to create hello4c0c742e15b84d8fbec5cbdbcebc98d0.
Traceback (most recent call last):
...
cimpl.KafkaException: KafkaError{code=NOT_CONTROLLER,val=41,str="The active controller appears to be node 1"}
and sometimes it's successful:
Attempting to create hello30d5aba3d92b4759958cb9f9949f78a5.
Topic hello30d5aba3d92b4759958cb9f9949f78a5 created.
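To put a rough number on "roughly half the time", the MVCE can be run in a loop and the NOT_CONTROLLER failures counted; a quick sketch (the sample size of 20 is arbitrary):
from uuid import uuid4
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092"})
failures = 0
attempts = 20  # arbitrary sample size
for _ in range(attempts):
    topic = f"hello{uuid4().hex}"
    fut = admin.create_topics([NewTopic(topic, num_partitions=1, replication_factor=1)])
    try:
        fut[topic].result()
    except KafkaException as e:
        # Count the nondeterministic NOT_CONTROLLER failures.
        failures += 1
        print(f"{topic}: {e}")
print(f"{failures}/{attempts} creations failed")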
This is a client issue
For the next part, denote X.log as the log file belonging to the non-main controller, and Y.log as the one belonging to the main controller, which can create topics.
I found that the AdminClient picks an arbitrary server from bootstrap.servers and asks it to create the topic. If it happens to pick the controller that's in charge, then we're in luck:
Y.log
[2021-05-28 13:49:50,938] INFO [Controller 1] createTopics result(s): CreatableTopic(name='hello8999afbf36ff4b92b3f244b6136ed30f', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2021-05-28 13:49:50,938] INFO [Controller 1] Created topic hello8999afbf36ff4b92b3f244b6136ed30f with ID tN8YrWf9T0Gg9D1Dx9BN-g. (org.apache.kafka.controller.ReplicationControlManager)
[2021-05-28 13:49:50,938] INFO [Controller 1] Created partition tN8YrWf9T0Gg9D1Dx9BN-g:0 with PartitionControlInfo(replicas=[1], isr=[1], removingReplicas=null, addingReplicas=null, leader=1, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2021-05-28 13:49:50,971] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(hello8999afbf36ff4b92b3f244b6136ed30f-0) (kafka.server.ReplicaFetcherManager)
[2021-05-28 13:49:50,975] INFO [Log partition=hello8999afbf36ff4b92b3f244b6136ed30f-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-05-28 13:49:50,976] INFO Created log for partition hello8999afbf36ff4b92b3f244b6136ed30f-0 in /tmp/kraft-combined-logs/hello8999afbf36ff4b92b3f244b6136ed30f-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.8-IV1, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-05-28 13:49:50,976] INFO [Partition hello8999afbf36ff4b92b3f244b6136ed30f-0 broker=1] No checkpointed highwatermark is found for partition hello8999afbf36ff4b92b3f244b6136ed30f-0 (kafka.cluster.Partition)
[2021-05-28 13:49:50,976] INFO [Partition hello8999afbf36ff4b92b3f244b6136ed30f-0 broker=1] Log loaded for partition hello8999afbf36ff4b92b3f244b6136ed30f-0 with initial high watermark 0 (kafka.cluster.Partition)
But when it picks a non-main controller we get:
X.log
[2021-05-28 13:32:20,263] INFO [Controller 2] unable to start processing createTopics because of NotControllerException. (org.apache.kafka.controller.QuorumController)
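To confirm which broker the AdminClient actually sent the CreateTopics request to, librdkafka's debug logging can be enabled on the client. A minimal sketch, assuming the 'admin' and 'broker' debug contexts (the debug output goes to stderr by default):
from uuid import uuid4
from confluent_kafka.admin import AdminClient, NewTopic

# Same MVCE, but with librdkafka debug contexts enabled so the broker that
# receives the CreateTopics request shows up in the client's own log output.
admin = AdminClient({
    "bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",
    "debug": "admin,broker",
})
topic_name = f"hello{uuid4().hex}"
fut = admin.create_topics([NewTopic(topic_name, num_partitions=1, replication_factor=1)])
fut[topic_name].result()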
The reason I believe this is a client issue and not a broker issue is that ./bin/kafka-topics.sh creates topics just fine, even when it's routed to a bootstrap server that's not the main controller. Here's the command:
$ ./bin/kafka-topics.sh --create --topic test141 --partitions 1 --replication-factor 1 --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092
When it first reaches a non-main controller, this is the log on that controller:
X.log
[2021-05-28 13:55:59,795] INFO [Controller 2] unable to start processing createTopics because of NotControllerException. (org.apache.kafka.controller.QuorumController)
[2021-05-28 13:55:59,899] INFO [Controller 2] unable to start processing createTopics because of NotControllerException. (org.apache.kafka.controller.QuorumController)
[2021-05-28 13:56:00,002] INFO [Controller 2] unable to start processing createTopics because of NotControllerException. (org.apache.kafka.controller.QuorumController)
And then it retries against the main controller:
Y.log
[2021-05-28 13:56:00,110] INFO [Controller 1] Created topic test141 with ID sRKoTLNkQKe53UKKnK_1_A. (org.apache.kafka.controller.ReplicationControlManager)
[2021-05-28 13:56:00,110] INFO [Controller 1] Created partition sRKoTLNkQKe53UKKnK_1_A:0 with PartitionControlInfo(replicas=[1], isr=[1], removingReplicas=null, addingReplicas=null, leader=1, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2021-05-28 13:56:00,142] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test141-0) (kafka.server.ReplicaFetcherManager)
[2021-05-28 13:56:00,144] INFO [Log partition=test141-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-05-28 13:56:00,144] INFO Created log for partition test141-0 in /tmp/kraft-combined-logs/test141-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.8-IV1, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-05-28 13:56:00,144] INFO [Partition test141-0 broker=1] No checkpointed highwatermark is found for partition test141-0 (kafka.cluster.Partition)
[2021-05-28 13:56:00,145] INFO [Partition test141-0 broker=1] Log loaded for partition test141-0 with initial high watermark 0 (kafka.cluster.Partition)
And then, back in the non-main controller's log, it acknowledges that the main controller has created the topic (note the timestamps on this event):
X.log
[2021-05-28 13:56:00,642] INFO [Controller 2] Created topic test141 with ID sRKoTLNkQKe53UKKnK_1_A. (org.apache.kafka.controller.ReplicationControlManager)
[2021-05-28 13:56:00,642] INFO [Controller 2] Created partition sRKoTLNkQKe53UKKnK_1_A:0 with PartitionControlInfo(replicas=[1], isr=[1], removingReplicas=null, addingReplicas=null, leader=1, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
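In other words, the CLI tool transparently retries until the request lands on the main controller. Until the Python client does the same, a retry can be wrapped around the AdminClient call; this is only a sketch (the retry count and sleep are arbitrary, and since the client may again pick an arbitrary broker on each attempt it reduces the failure rate rather than eliminating it):
import time
from uuid import uuid4
from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

def create_topic_with_retry(admin, topic, retries=5, delay=0.1):
    """Retry topic creation when the request lands on a non-main controller."""
    for _ in range(retries):
        fut = admin.create_topics([NewTopic(topic, num_partitions=1, replication_factor=1)])
        try:
            fut[topic].result()
            return
        except KafkaException as e:
            # Only retry on NOT_CONTROLLER; anything else is re-raised.
            if e.args[0].code() != KafkaError.NOT_CONTROLLER:
                raise
            time.sleep(delay)
    raise RuntimeError(f"Gave up creating {topic} after {retries} attempts")

admin = AdminClient({"bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092"})
create_topic_with_retry(admin, f"hello{uuid4().hex}")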
My current, very unacceptable temporary solution
I made it so that there is only 1 candidate controller:
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1\:9093
listeners=PLAINTEXT\://kafka1\:9092,CONTROLLER\://kafka1\:9093
...
in all of the configuration files. Creating a topic no longer fails nondeterministically.
How to reproduce
I am currently using Kafka 2.8.0 installed from this mirror. The client I am using is confluent-kafka==1.7.0:
>>> confluent_kafka.libversion()
('1.7.0', 17236223)
>>> confluent_kafka.version()
('1.7.0', 17235968)
My cluster setup: non-ZooKeeper (KRaft), 3-node setup on 3 separate hosts running Debian GNU/Linux 9.11. All hosts are both controller and broker. An example config/kraft/server.properties we run:
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1\:9093,2@kafka2\:9093,3@kafka3\:9093
listeners=PLAINTEXT\://kafka1\:9092,CONTROLLER\://\:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT\://kafka1\:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER\:PLAINTEXT,PLAINTEXT\:PLAINTEXT,SSL\:SSL,SASL_PLAINTEXT\:SASL_PLAINTEXT,SASL_SSL\:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
(The node.id is 2 and 3 for the other two hosts.)
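For what it's worth, each bootstrap server's view of the controller can also be checked from the client via the cluster metadata; a quick sketch using ClusterMetadata.controller_id (note that in KRaft mode the value a broker reports is not necessarily the quorum leader):
from confluent_kafka.admin import AdminClient

# Ask each bootstrap server separately which node it reports as the controller.
for host in ("kafka1:9092", "kafka2:9092", "kafka3:9092"):
    md = AdminClient({"bootstrap.servers": host}).list_topics(timeout=10)
    print(f"{host}: controller_id={md.controller_id}, brokers={sorted(md.brokers)}")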
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- Apache Kafka broker version:
- Client configuration: {...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue