Browse Source

BE: Implement topic active producers API (#4121)

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin 1 year ago
parent
commit
4ec7975b2e

+ 29 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -22,6 +22,7 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
@@ -327,6 +328,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .doOnEach(sig -> audit(context, sig));
         .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
+  @Override
+  public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
+                                                                                   String topicName,
+                                                                                   ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(VIEW)
+        .operationName("getActiveProducerStates")
+        .build();
+
+    Comparator<TopicProducerStateDTO> ordering =
+        Comparator.comparingInt(TopicProducerStateDTO::getPartition)
+            .thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());
+
+    Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
+        .flatMapMany(statesMap ->
+            Flux.fromStream(
+                statesMap.entrySet().stream()
+                    .flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
+                    .sorted(ordering)));
+
+    return validateAccess(context)
+        .thenReturn(states)
+        .map(ResponseEntity::ok)
+        .doOnEach(sig -> audit(context, sig));
+  }
+
   private Comparator<InternalTopic> getComparatorForTopic(
   private Comparator<InternalTopic> getComparatorForTopic(
       TopicColumnsToSortDTO orderBy) {
       TopicColumnsToSortDTO orderBy) {
     var defaultComparator = Comparator.comparing(InternalTopic::getName);
     var defaultComparator = Comparator.comparing(InternalTopic::getName);

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -30,10 +30,12 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclOperation;
@@ -117,6 +119,17 @@ public interface ClusterMapper {
     return brokerDiskUsage;
     return brokerDiskUsage;
   }
   }
 
 
+  default TopicProducerStateDTO map(int partition, ProducerState state) {
+    return new TopicProducerStateDTO()
+        .partition(partition)
+        .producerId(state.producerId())
+        .producerEpoch(state.producerEpoch())
+        .lastSequence(state.lastSequence())
+        .lastTimestampMs(state.lastTimestamp())
+        .coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
+        .currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
+  }
+
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
     return switch (operation) {
     return switch (operation) {
       case ALL -> KafkaAclDTO.OperationEnum.ALL;
       case ALL -> KafkaAclDTO.OperationEnum.ALL;

+ 17 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -31,6 +31,7 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.AccessLevel;
@@ -55,6 +56,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -658,6 +660,21 @@ public class ReactiveAdminClient implements Closeable {
     return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
     return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
   }
   }
 
 
+  // returns tp -> list of active producer's states (if any)
+  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
+    return describeTopic(topic)
+        .map(td -> client.describeProducers(
+                IntStream.range(0, td.partitions().size())
+                    .mapToObj(i -> new TopicPartition(topic, i))
+                    .toList()
+            ).all()
+        )
+        .flatMap(ReactiveAdminClient::toMono)
+        .map(map -> map.entrySet().stream()
+            .filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
+            .collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
+  }
+
   private Mono<Void> incrementalAlterConfig(String topicName,
   private Mono<Void> incrementalAlterConfig(String topicName,
                                             List<ConfigEntry> currentConfigs,
                                             List<ConfigEntry> currentConfigs,
                                             Map<String, String> newConfigs) {
                                             Map<String, String> newConfigs) {

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -39,6 +39,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
@@ -459,6 +460,11 @@ public class TopicsService {
         );
         );
   }
   }
 
 
+  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.getActiveProducersState(topic));
+  }
+
   private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
   private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
     return adminClientService.get(cluster)
     return adminClientService.get(cluster)
         .flatMap(ac -> ac.listTopics(true))
         .flatMap(ac -> ac.listTopics(true))

+ 52 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -763,6 +763,33 @@ paths:
         404:
         404:
           description: Not found
           description: Not found
 
 
+  /api/clusters/{clusterName}/topics/{topicName}/activeproducers:
+    get:
+      tags:
+        - Topics
+      summary: get producer states for topic
+      operationId: getActiveProducerStates
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/TopicProducerState'
+
   /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
   /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
     get:
     get:
       tags:
       tags:
@@ -2619,6 +2646,31 @@ components:
         - PROTOBUF
         - PROTOBUF
         - UNKNOWN
         - UNKNOWN
 
 
+    TopicProducerState:
+      type: object
+      properties:
+        partition:
+          type: integer
+          format: int32
+        producerId:
+          type: integer
+          format: int64
+        producerEpoch:
+          type: integer
+          format: int32
+        lastSequence:
+          type: integer
+          format: int32
+        lastTimestampMs:
+          type: integer
+          format: int64
+        coordinatorEpoch:
+          type: integer
+          format: int32
+        currentTransactionStartOffset:
+          type: integer
+          format: int64
+
     ConsumerGroup:
     ConsumerGroup:
       discriminator:
       discriminator:
         propertyName: inherit
         propertyName: inherit