Mastering Spring Kafka: Uncovering Partition Creation and Replication Counts
Introduction:
"Apache Kafka is a distributed streaming platform that allows you to build real-time data pipelines and streaming apps. It is widely used for its high throughput, fault tolerance, and replication capabilities."
In this post, we'll look at a common scenario that many Kafka users encounter: determining the number of partitions and the replication factor of a Kafka topic.
Problem Statement:
"Imagine you've just joined a new project where Kafka is being used. You need to understand the current configuration of your Kafka topics, specifically how many partitions and replicas each one has. But what if there's no documentation available? Don't worry, there's a way to find out using KafkaAdmin."
Technical Terms:
- KafkaAdmin - Spring Kafka class for administering topics on the Kafka cluster; it uses an AdminClient internally
- AdminClient - Apache Kafka's native Java client for administrative operations such as listing and describing topics
Solution:
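// Imports used by the method below; kafkaAdmin is the injected
// org.springframework.kafka.core.KafkaAdmin bean
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.kafka.KafkaException;

public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
    // Include or exclude internal topics such as __consumer_offsets
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(showInternalTopics);

    List<TopicInfo> topicPartitionCounts = new ArrayList<>();
    Map<String, TopicDescription> topicDescriptionMap;

    // Build a native AdminClient from the same properties Spring Kafka uses;
    // try-with-resources closes it once the topics have been described
    try (AdminClient kafkaAdminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
        // List all topic names (waiting at most one minute), then describe them
        topicDescriptionMap = kafkaAdmin.describeTopics(kafkaAdminClient
                .listTopics(options)
                .names()
                .get(1, TimeUnit.MINUTES)
                .toArray(String[]::new));
    } catch (InterruptedException ie) {
        // Restore the interrupt flag before translating the exception
        Thread.currentThread().interrupt();
        throw new KafkaException("Interrupted while getting topic listings", ie);
    } catch (TimeoutException | ExecutionException ex) {
        throw new KafkaException("Failed to obtain topic listings", ex);
    }

    topicDescriptionMap.forEach((topicName, topicDescription) -> {
        int partitionCount = topicDescription.partitions().size();
        // Replica count of the first partition, i.e. the topic's replication factor
        // (get(0) instead of getFirst() so this also compiles before Java 21)
        int replicationCount = topicDescription.partitions().get(0).replicas().size();
        topicPartitionCounts.add(new TopicInfo(topicName, partitionCount, replicationCount));
    });

    // Sort the list by topicName
    topicPartitionCounts.sort(Comparator.comparing(TopicInfo::topicName));
    return topicPartitionCounts;
}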
Breaking Down the Code:
- ListTopicsOptions holds the options for listing the topics available in the cluster; listInternal(showInternalTopics) controls whether internal topics such as __consumer_offsets are included.
- AdminClient.create(kafkaAdmin.getConfigurationProperties()) creates an Apache Kafka AdminClient from the configuration properties provided by the Spring Boot application.
- kafkaAdmin.describeTopics() returns a Map of topic name to TopicDescription for all the topic names passed to it.
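The bullets above do not cover the blocking call .get(1, TimeUnit.MINUTES). listTopics(options).names() returns a KafkaFuture, which implements java.util.concurrent.Future, so the wait-with-timeout and the exception handling around it can be illustrated with a plain CompletableFuture. This is a stand-alone sketch under that assumption: the helper name awaitOrThrow is hypothetical, and IllegalStateException stands in for KafkaException so no Kafka dependency is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    // Hypothetical helper: blocks on a future for at most the given time and turns
    // the checked exceptions into an unchecked one, like the try/catch in the method.
    // IllegalStateException is a stand-in for KafkaException in this sketch.
    static <T> T awaitOrThrow(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers further up the stack still see it
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while getting topic listings", ie);
        } catch (TimeoutException | ExecutionException ex) {
            // Timeout or failure inside the future: wrap and rethrow unchecked
            throw new IllegalStateException("Failed to obtain topic listings", ex);
        }
    }

    public static void main(String[] args) {
        // An already completed future returns its value immediately
        Future<String> done = CompletableFuture.completedFuture("orders");
        System.out.println(awaitOrThrow(done, 1, TimeUnit.SECONDS));

        // A future that never completes ends up in the timeout branch
        Future<String> never = new CompletableFuture<>();
        try {
            awaitOrThrow(never, 50, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

Translating to an unchecked exception keeps the method signature free of checked exceptions, while restoring the interrupt flag lets code higher up still notice that the thread was interrupted.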
Solution Explanation:
Using the same application properties that start Spring Kafka and the Spring Boot application, create an AdminClient and use it to list all topic names in the cluster. Pass those names to KafkaAdmin.describeTopics(), which returns the description of each topic as a Map. Iterate over the Map, take the number of partitions and the number of replicas of the first partition (the replication factor), and store them in a custom record, TopicInfo, which can then be exposed through an endpoint:
public record TopicInfo(String topicName, int partitionCount, int replicationCount) {}
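To look at the mapping step in isolation, here is a stdlib-only sketch that replaces TopicDescription with a hypothetical PartitionStub record holding only each partition's replica broker ids. One deliberate difference from the method above: instead of reading only the first partition, it takes the largest replica count across all partitions, in case a topic is not uniformly replicated. Treat it as an illustration, not a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class TopicInfoMappingSketch {

    // Hypothetical stand-in for Kafka's TopicPartitionInfo: partition id + replica broker ids
    record PartitionStub(int partition, List<Integer> replicas) {}

    // Same shape as the TopicInfo record in the post
    record TopicInfo(String topicName, int partitionCount, int replicationCount) {}

    // Turns "topic name -> partitions" into TopicInfo entries sorted by topic name
    static List<TopicInfo> toTopicInfos(Map<String, List<PartitionStub>> described) {
        List<TopicInfo> result = new ArrayList<>();
        described.forEach((topicName, partitions) -> {
            int partitionCount = partitions.size();
            // Largest replica count over all partitions (0 if there are none)
            int replicationCount = partitions.stream()
                    .mapToInt(p -> p.replicas().size())
                    .max()
                    .orElse(0);
            result.add(new TopicInfo(topicName, partitionCount, replicationCount));
        });
        result.sort(Comparator.comparing(TopicInfo::topicName));
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<PartitionStub>> described = Map.of(
                "payments", List.of(new PartitionStub(0, List.of(1, 2, 3))),
                "orders", List.of(
                        new PartitionStub(0, List.of(1, 2)),
                        new PartitionStub(1, List.of(2, 3)),
                        new PartitionStub(2, List.of(3, 1))));
        toTopicInfos(described).forEach(System.out::println);
    }
}
```

Running main prints one TopicInfo per topic, with orders (3 partitions, 2 replicas) listed before payments (1 partition, 3 replicas) because of the sort by topic name.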
Conclusion:
By combining Apache Kafka's AdminClient and Spring Kafka's KafkaAdmin, we found the partition and replication counts easily. The same result can be achieved with AdminClient alone, but there is no need to reinvent the wheel when KafkaAdmin already provides describeTopics(); hence the combination of both.