Monday, November 1, 2021

[SOLVED] Check whether a Kafka topic exists in Python

Issue

I want to create a Kafka topic if it does not already exist. I know how to create a topic via the bash, but I don't know how to check whether it exists.

topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])

Solution

You can use the --list (List all available topics) option for kafka-topics.sh and see if self.topic exists in the topics array, as shown below.

Depending on the number of topics you have this approach might be a bit heavy. If this is the case, you might be able to get away with using --describe (List details for the given topics) which will likely return empty if the topic doesn't exist. I haven't thoroughly tested this, so I can't say for sure how solid this solution (--describe) is, but it might be worth it for you to investigate a bit further.

wanted_topics = ['host_updates_queue', 'foo_bar']

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

for wanted in wanted_topics:
    if wanted in topics:
        print '\'{}\' topic exists!'.format(wanted)
    else:
        print '\'{}\' topic does NOT exist!'.format(wanted)

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print 'No description found for the topic \'{}\''.format(wanted)

OUTPUT:

root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'

There is also a Broker Configuration available so you don't have to take any of these steps:

auto.create.topics.enable | true | Enable auto creation of topic on the server. If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.

I would take this approach if possible.

Note that you should set topic configs (server.properties) on your broker for num.partitions and default.replication.factor to match your settings in your code snippet.



Answered By - chrsblck