From f3f6e74b1ee73a450feaed1432fbfee9f8ec6bad Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Thu, 28 May 2020 14:18:22 +0300 Subject: [PATCH] Backend for updating topics, fixed backend from another pr branches (#52) * Backend for updating topics, fixed backend from another pr branches * Changed caching of extendedAdminClient instances Co-authored-by: Roman Nedzvetskiy --- .../ui/cluster/model/ExtendedAdminClient.java | 27 ++++++ .../ui/cluster/service/ClusterService.java | 34 ++++--- .../kafka/ui/cluster/util/ClusterUtil.java | 71 ++++++++++++-- .../kafka/ui/kafka/KafkaService.java | 92 ++++++++++++------ .../kafka/ui/rest/MetricsRestController.java | 8 +- .../src/main/resources/application-local.yml | 5 +- .../src/main/resources/application-sdp.yml | 5 +- .../main/resources/swagger/kafka-ui-api.yaml | 96 ++++++++++++++++++- 8 files changed, 279 insertions(+), 59 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java new file mode 100644 index 0000000000..69eeb032e1 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java @@ -0,0 +1,27 @@ +package com.provectus.kafka.ui.cluster.model; + +import com.provectus.kafka.ui.cluster.util.ClusterUtil; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.AdminClient; +import reactor.core.publisher.Mono; + +import java.util.Set; + +@Data +@RequiredArgsConstructor +public class ExtendedAdminClient { + + private final AdminClient adminClient; + private final Set supportedFeatures; + + public enum SupportedFeature { + INCREMENTAL_ALTER_CONFIGS, + ALTER_CONFIGS + } + + public static Mono extendedAdminClient(AdminClient adminClient) { + return ClusterUtil.getSupportedFeatures(adminClient) + .map(s -> new ExtendedAdminClient(adminClient, s)); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 2b6c5bd32c..13067af336 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -15,7 +15,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -81,7 +80,7 @@ public class ClusterService { var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new); return kafkaService.getOrCreateAdminClient(cluster).map(ac -> - ac.describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() + ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() ).flatMap(groups -> groupMetadata(cluster, consumerGroupId) .flatMap(offsets -> { @@ -98,7 +97,7 @@ public class ClusterService { public Mono> groupMetadata(KafkaCluster cluster, String consumerGroupId) { return kafkaService.getOrCreateAdminClient(cluster) - .map(ac -> ac.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata()) + .map(ac -> ac.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata()) .flatMap(ClusterUtil::toMono); } @@ -119,20 +118,11 @@ public class ClusterService { return clustersStorage.getClusterByName(clusterName) .map(kafkaService::getConsumerGroups) .orElse(Mono.empty()); - -// var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new); -// return kafkaService.getOrCreateAdminClient(cluster).map(ac -> ac.listConsumerGroups().all()) -// .flatMap(s -> -// kafkaService.getOrCreateAdminClient(cluster).flatMap(ac -> -// ClusterUtil.toMono(s).map(s1 -> s1.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).map(ac::describeConsumerGroups) -// )) -// .flatMap(s -> ClusterUtil.toMono(s.all()).map(details -> details.values().stream() -// .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))); } public Flux getBrokers (String clusterName) { return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow()) - .flatMap(client -> ClusterUtil.toMono(client.describeCluster().nodes()) + .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes()) .map(n -> n.stream().map(node -> { Broker broker = new Broker(); broker.setId(node.idString()); @@ -141,6 +131,24 @@ public class ClusterService { .flatMapMany(Flux::fromIterable); } + @SneakyThrows + public Mono updateTopic(String clusterName, String topicName, Mono topicFormData) { + return clustersStorage.getClusterByName(clusterName).map(cl -> + topicFormData + .flatMap(t -> kafkaService.updateTopic(cl, topicName, t)) + .flatMap(t -> updateCluster(t, clusterName, cl)) + ) + .orElse(Mono.empty()); + } + + private Mono updateCluster(T topic, String clusterName, KafkaCluster cluster) { + return kafkaService.getUpdatedCluster(cluster) + .map(c -> { + clustersStorage.setKafkaCluster(clusterName, c); + return topic; + }); + } + public Flux getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) { return clustersStorage.getClusterByName(clusterName) .map(c -> consumingService.loadMessages(c, topicName)) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java index fa72f46d3e..d73f245a8c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java @@ -1,19 +1,17 @@ package com.provectus.kafka.ui.cluster.util; import com.provectus.kafka.ui.cluster.model.*; -import com.provectus.kafka.ui.model.ConsumerGroup; -import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail; -import com.provectus.kafka.ui.model.ServerStatus; +import com.provectus.kafka.ui.model.*; +import lombok.extern.slf4j.Slf4j; import com.provectus.kafka.ui.model.TopicMessage; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; -import org.apache.kafka.clients.admin.MemberDescription; -import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Bytes; @@ -23,16 +21,18 @@ import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneId; import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS; import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; +@Slf4j public class ClusterUtil { + private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version"; + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); public static Mono toMono(KafkaFuture future){ @@ -45,6 +45,16 @@ public class ClusterUtil { })); } + public static Mono toMono(KafkaFuture future, String topicName){ + return Mono.create(sink -> future.whenComplete((res, ex)->{ + if (ex!=null) { + sink.error(ex); + } else { + sink.success(topicName); + } + })); + } + public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) { ConsumerGroup consumerGroup = new ConsumerGroup(); consumerGroup.setConsumerGroupId(c.groupId()); @@ -173,6 +183,49 @@ public class ClusterUtil { throw new IllegalArgumentException("Unknown timestampType: " + timestampType); } } + public static Mono> getSupportedFeatures(AdminClient adminClient) { + return ClusterUtil.toMono(adminClient.describeCluster().controller()) + .map(Node::id) + .map(id -> Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()))) + .map(brokerCR -> adminClient.describeConfigs(brokerCR).all()) + .flatMap(ClusterUtil::toMono) + .map(ClusterUtil::getSupportedUpdateFeature) + .map(Collections::singleton); + } + + private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(Map configs) { + String version = configs.values().stream() + .map(Config::entries) + .flatMap(Collection::stream) + .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY)) + .findFirst().orElseThrow().value(); + try { + return Float.parseFloat(version.split("-")[0]) <= 2.3f + ? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS; + } catch (Exception e) { + log.error("Conversion clusterVersion {} to float value failed", version); + throw e; + } + } + + public static Topic convertToTopic (InternalTopic internalTopic) { + Topic topic = new Topic(); + topic.setName(internalTopic.getName()); + List partitions = internalTopic.getPartitions().stream().flatMap(s -> { + Partition partition = new Partition(); + partition.setPartition(s.getPartition()); + partition.setLeader(s.getLeader()); + partition.setReplicas(s.getReplicas().stream().flatMap(r -> { + Replica replica = new Replica(); + replica.setBroker(r.getBroker()); + return Stream.of(replica); + }).collect(Collectors.toList())); + return Stream.of(partition); + }).collect(Collectors.toList()); + topic.setPartitions(partitions); + return topic; + } + public static Map toSingleMap (Stream> streamOfMaps) { return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index 0c49ba4f31..c3f97b83f1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -4,6 +4,7 @@ import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ServerStatus; +import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.TopicFormData; import com.provectus.kafka.ui.zookeeper.ZookeeperService; import lombok.RequiredArgsConstructor; @@ -16,6 +17,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; +import org.springframework.beans.factory.annotation.Value; import org.apache.kafka.common.serialization.BytesDeserializer; import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; @@ -33,20 +35,23 @@ import java.util.stream.Stream; @Log4j2 public class KafkaService { + @Value("${kafka.admin-client-timeout}") + private int clientTimeout; + private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true); private final ZookeeperService zookeeperService; - private final Map adminClientCache = new ConcurrentHashMap<>(); + private final Map adminClientCache = new ConcurrentHashMap<>(); private final Map> leadersCache = new ConcurrentHashMap<>(); @SneakyThrows public Mono getUpdatedCluster(KafkaCluster cluster) { return getOrCreateAdminClient(cluster).flatMap( - ac -> getClusterMetrics(ac) + ac -> getClusterMetrics(ac.getAdminClient()) .flatMap( clusterMetrics -> - getTopicsData(ac).flatMap( topics -> - loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList())) + getTopicsData(ac.getAdminClient()).flatMap( topics -> + loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList())) .map( configs -> mergeWithConfigs(topics, configs)) .flatMap(it -> updateSegmentMetrics(ac, clusterMetrics, it)) ).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto)) @@ -170,8 +175,7 @@ public class KafkaService { public Mono createTopic(KafkaCluster cluster, Mono topicFormData) { - AdminClient adminClient = this.createAdminClient(cluster); - return this.createTopic(adminClient, topicFormData); + return getOrCreateAdminClient(cluster).flatMap(ac -> createTopic(ac.getAdminClient(), topicFormData)); } @SneakyThrows @@ -200,24 +204,18 @@ public class KafkaService { } - public Mono getOrCreateAdminClient(KafkaCluster cluster) { - AdminClient adminClient = adminClientCache.computeIfAbsent( - cluster.getName(), - (id) -> createAdminClient(cluster) - ); - - return isAdminClientConnected(adminClient); + public Mono getOrCreateAdminClient(KafkaCluster cluster) { + return Mono.justOrEmpty(adminClientCache.get(cluster.getName())) + .switchIfEmpty(createAdminClient(cluster)) + .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e)); } - public AdminClient createAdminClient(KafkaCluster kafkaCluster) { + public Mono createAdminClient(KafkaCluster kafkaCluster) { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()); - properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); - return AdminClient.create(properties); - } - - private Mono isAdminClientConnected(AdminClient adminClient) { - return getClusterId(adminClient).map( r -> adminClient); + properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); + AdminClient adminClient = AdminClient.create(properties); + return ExtendedAdminClient.extendedAdminClient(adminClient); } @@ -251,12 +249,11 @@ public class KafkaService { } public Mono> getConsumerGroups(KafkaCluster cluster) { - var adminClient = this.createAdminClient(cluster); - return ClusterUtil.toMono(adminClient.listConsumerGroups().all()) - .flatMap(s -> ClusterUtil.toMono(adminClient + return getOrCreateAdminClient(cluster).flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()) + .flatMap(s -> ClusterUtil.toMono(ac.getAdminClient() .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())) .map(s -> s.values().stream() - .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())); + .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))); } public KafkaConsumer createConsumer(KafkaCluster cluster) { @@ -271,12 +268,47 @@ public class KafkaService { @SneakyThrows - private Mono createTopic(AdminClient adminClient, NewTopic newTopic) { - return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)) - .values() - .values() - .iterator() - .next()); + private Mono createTopic(AdminClient adminClient, NewTopic newTopic) { + return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(), newTopic.name()); + } + + @SneakyThrows + public Mono updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) { + ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName); + return getOrCreateAdminClient(cluster) + .flatMap(ac -> { + if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) { + return incrementalAlterConfig(topicFormData, topicCR, ac) + .flatMap(c -> getUpdatedTopic(ac, topicName)); + } else { + return alterConfig(topicFormData, topicCR, ac) + .flatMap(c -> getUpdatedTopic(ac, topicName)); + } + }); + } + + + + private Mono getUpdatedTopic (ExtendedAdminClient ac, String topicName) { + return getTopicsData(ac.getAdminClient()) + .map(s -> s.stream() + .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow()) + .map(ClusterUtil::convertToTopic); + } + + private Mono incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) { + List listOp = topicFormData.getConfigs().entrySet().stream() + .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList()); + return ClusterUtil.toMono(ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp)).all(), topicCR.name()); + } + + private Mono alterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) { + List configEntries = topicFormData.getConfigs().entrySet().stream() + .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList()); + Config config = new Config(configEntries); + Map map = Collections.singletonMap(topicCR, config); + return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCR.name()); + } private Mono updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map internalTopic) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index fce720040b..439e23b4ce 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -11,9 +11,8 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.OffsetDateTime; - import javax.validation.Valid; +import java.time.OffsetDateTime; @RestController @RequiredArgsConstructor @@ -90,4 +89,9 @@ public class MetricsRestController implements ApiClustersApi { public Mono> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) { return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok); } + + @Override + public Mono> updateTopic(String clusterId, String topicName, @Valid Mono topicFormData, ServerWebExchange exchange) { + return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index e61a941d56..ebf5d86b25 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -11,4 +11,7 @@ kafka: - name: localReplica bootstrapServers: localhost:29093 - zookeeper: localhost:2183 \ No newline at end of file + zookeeper: localhost:2183 + admin-client-timeout: 5000 +zookeeper: + connection-timeout: 1000 diff --git a/kafka-ui-api/src/main/resources/application-sdp.yml b/kafka-ui-api/src/main/resources/application-sdp.yml index 2edd2ff374..2343b49208 100644 --- a/kafka-ui-api/src/main/resources/application-sdp.yml +++ b/kafka-ui-api/src/main/resources/application-sdp.yml @@ -7,4 +7,7 @@ kafka: - name: secondLocal zookeeper: zookeeper1:2181 - bootstrapServers: kafka1:29092 \ No newline at end of file + bootstrapServers: kafka1:29092 + admin-client-timeout: 5000 +zookeeper: + connection-timeout: 1000 diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index bd467af68e..7bb211376c 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -141,6 +141,34 @@ paths: application/json: schema: $ref: '#/components/schemas/TopicDetails' + patch: + tags: + - /api/clusters + summary: updateTopic + operationId: updateTopic + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/TopicFormData' + responses: + 200: + description: Updated + content: + application/json: + schema: + $ref: '#/components/schemas/Topic' /api/clusters/{clusterName}/topics/{topicName}/config: get: @@ -210,12 +238,37 @@ paths: items: $ref: '#/components/schemas/TopicMessage' + /api/clusters/{clusterName}/consumer-groups/{id}: + get: + tags: + - /api/clusters + summary: get Consumer Group By Id + operationId: getConsumerGroup + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConsumerGroupDetails' + /api/clusters/{clusterName}/consumerGroups: get: tags: - /api/clusters - summary: getConsumerGroup - operationId: getConsumerGroup + summary: get all ConsumerGroups + operationId: getConsumerGroups parameters: - name: clusterName in: path @@ -408,4 +461,41 @@ components: required: - partition - offset - - timestamp \ No newline at end of file + - timestamp + + TopicPartitionDto: + type: object + properties: + topic: + type: string + partition: + type: integer + required: + - topic + - partition + + ConsumerTopicPartitionDetail: + type: object + properties: + consumerId: + type: string + topic: + type: string + partition: + type: integer + currentOffset: + type: long + endOffset: + type: long + messagesBehind: + type: long + + ConsumerGroupDetails: + type: object + properties: + consumerGroupId: + type: string + consumers: + type: array + items: + $ref: '#/components/schemas/ConsumerTopicPartitionDetail' \ No newline at end of file