Mastering Spring Kafka: Uncovering Partition and Replication Counts


Introduction:

"Apache Kafka is a distributed streaming platform that allows you to build real-time data pipelines and streaming apps. It is widely used for its high throughput, fault tolerance, and replication capabilities."


In this post, we'll look at a common scenario that many Kafka users encounter: determining the number of partitions and the replication count (replication factor) of a Kafka topic.

Problem Statement:

"Imagine you've just joined a new project that uses Kafka. You need to understand the current configuration of its topics, specifically how many partitions each topic has and how many replicas each partition is kept on. But what if there's no documentation available? Don't worry: you can find out using KafkaAdmin."

Technical Terms:

  • KafkaAdmin - Spring Kafka's admin class for communicating with the Kafka cluster (for example, creating and describing topics)
  • AdminClient - Apache Kafka's native admin client API for communicating with the Kafka cluster

Solution:

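import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.kafka.KafkaException;

// kafkaAdmin is the KafkaAdmin bean injected into the enclosing Spring component
public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
    // Optionally include internal topics such as __consumer_offsets
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(showInternalTopics);

    List<TopicInfo> topicPartitionCounts = new ArrayList<>();
    Map<String, TopicDescription> topicDescriptionMap;

    // Build a native AdminClient from the same properties Spring Kafka uses
    try (AdminClient kafkaAdminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
        try {
            // List all topic names, waiting at most one minute for the broker to answer
            String[] topicNames = kafkaAdminClient
                    .listTopics(options)
                    .names()
                    .get(1, TimeUnit.MINUTES)
                    .toArray(String[]::new);
            // Let KafkaAdmin fetch a TopicDescription for every topic name
            topicDescriptionMap = kafkaAdmin.describeTopics(topicNames);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new KafkaException("Interrupted while getting topic listings", ie);
        } catch (TimeoutException | ExecutionException ex) {
            throw new KafkaException("Failed to obtain topic listings", ex);
        }
    }

    topicDescriptionMap.forEach((topicName, topicDescription) -> {
        int partitionCount = topicDescription.partitions().size();
        // Replication count taken from the first partition's replica list;
        // get(0) works on any Java version, whereas List.getFirst() needs Java 21+
        int replicationCount = topicDescription.partitions().get(0).replicas().size();
        topicPartitionCounts.add(new TopicInfo(topicName, partitionCount, replicationCount));
    });

    // Sort the list by topicName
    topicPartitionCounts.sort(Comparator.comparing(TopicInfo::topicName));
    return topicPartitionCounts;
}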

 

Breaking Down the Code:

  • ListTopicsOptions passes the options for listing the topics available in the cluster; listInternal(showInternalTopics) controls whether internal topics such as __consumer_offsets are included.
  • AdminClient.create(kafkaAdmin.getConfigurationProperties()) creates an Apache Kafka AdminClient using the configuration properties provided to the Spring Boot application.
  • kafkaAdmin.describeTopics() returns a TopicDescription for every topic name passed to it, as a Map keyed by topic name.
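The solution reads the replication count from the first partition's replica list. Replication is actually tracked per partition, so partitions of the same topic can end up with different replica counts (for example after a manual partition reassignment). The sketch below shows a more defensive way to derive both numbers by looking at every partition. It is a minimal, cluster-free illustration: the class `ReplicationCheck`, the record `ReplicaSummary` and the method `summarize` are hypothetical, and a `List<List<Integer>>` of replica broker ids stands in for `TopicDescription.partitions()` and `TopicPartitionInfo.replicas()`.

```java
import java.util.List;

public class ReplicationCheck {

    // Hypothetical summary of one topic: partition count plus the
    // smallest and largest replica-list sizes across its partitions
    record ReplicaSummary(int partitionCount, int minReplicas, int maxReplicas) {
        boolean isUniform() {
            return minReplicas == maxReplicas;
        }
    }

    // Hypothetical stand-in input: one inner list of replica broker ids per partition,
    // mirroring TopicDescription.partitions() and TopicPartitionInfo.replicas()
    static ReplicaSummary summarize(List<List<Integer>> partitionReplicas) {
        if (partitionReplicas.isEmpty()) {
            throw new IllegalArgumentException("topic has no partitions");
        }
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (List<Integer> replicas : partitionReplicas) {
            min = Math.min(min, replicas.size());
            max = Math.max(max, replicas.size());
        }
        return new ReplicaSummary(partitionReplicas.size(), min, max);
    }

    public static void main(String[] args) {
        // Three partitions, each replicated on three brokers
        ReplicaSummary orders = summarize(List.of(List.of(1, 2, 3), List.of(2, 3, 1), List.of(3, 1, 2)));
        // Two partitions whose replica counts differ
        ReplicaSummary skewed = summarize(List.of(List.of(1, 2, 3), List.of(2, 3)));
        System.out.println(orders);             // ReplicaSummary[partitionCount=3, minReplicas=3, maxReplicas=3]
        System.out.println(skewed.isUniform()); // false
    }
}
```

When `isUniform()` is true, the first-partition shortcut used in the solution gives the same answer; when it is false, reporting the minimum and maximum avoids hiding the difference.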

Solution Explanation:

Using the same application properties that start the Spring Kafka and Spring Boot application, we create an AdminClient and use it to list all topics available in the cluster. We then pass those topic names to describeTopics, which returns every topic's description as a Map. Finally, we iterate over the Map and build a custom POJO (TopicInfo) for each topic, which can be used to expose the values through an endpoint.

 public record TopicInfo(String topicName, int partitionCount, int replicationCount) {}
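To see the mapping step from the explanation above in isolation, here is a minimal sketch that runs without a cluster. It assumes a plain `Map<String, List<List<Integer>>>` (topic name to the replica broker ids of each partition) as a hypothetical stand-in for the `Map<String, TopicDescription>` returned by `kafkaAdmin.describeTopics()`; the class `TopicInfoDemo` and the method `toTopicInfos` are illustrative names. It applies the same rules as the solution: partition count is the number of partitions, replication count is the size of the first partition's replica list, and the result is sorted by topic name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class TopicInfoDemo {

    // Same shape as the TopicInfo record used in the solution
    record TopicInfo(String topicName, int partitionCount, int replicationCount) {}

    // Hypothetical stand-in for Map<String, TopicDescription>:
    // topic name -> replica broker ids for each partition
    static List<TopicInfo> toTopicInfos(Map<String, List<List<Integer>>> descriptions) {
        List<TopicInfo> result = new ArrayList<>();
        descriptions.forEach((topicName, partitions) ->
                result.add(new TopicInfo(topicName, partitions.size(), partitions.get(0).size())));
        // Map.of gives no iteration-order guarantee, so sort by topic name as the solution does
        result.sort(Comparator.comparing(TopicInfo::topicName));
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<List<Integer>>> described = Map.of(
                "payments", List.of(List.of(1, 2), List.of(2, 1)),
                "audit", List.of(List.of(3)));
        toTopicInfos(described).forEach(System.out::println);
        // TopicInfo[topicName=audit, partitionCount=1, replicationCount=1]
        // TopicInfo[topicName=payments, partitionCount=2, replicationCount=2]
    }
}
```

Because TopicInfo is a record, it gets a readable toString() and accessor methods such as topicName() for free, which is what makes `Comparator.comparing(TopicInfo::topicName)` possible and lets frameworks like Spring MVC serialize it directly for an endpoint.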

Conclusion:

By combining Apache Kafka's AdminClient with Spring Kafka's KafkaAdmin, we found the partition and replication counts easily. We could achieve the same with AdminClient alone, but there is no reason to reinvent the wheel when KafkaAdmin already provides describeTopics, so this solution uses both.

Source Code
