Topic active producer's states retrieval API
This commit is contained in:
parent
2a61b97fab
commit
d6b31bf77f
5 changed files with 117 additions and 0 deletions
|
@ -22,6 +22,7 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
|
|||
import com.provectus.kafka.ui.model.TopicCreationDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDetailsDTO;
|
||||
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
|
||||
import com.provectus.kafka.ui.model.TopicUpdateDTO;
|
||||
import com.provectus.kafka.ui.model.TopicsResponseDTO;
|
||||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
|
@ -327,6 +328,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
|
||||
String topicName,
|
||||
ServerWebExchange exchange) {
|
||||
var context = AccessContext.builder()
|
||||
.cluster(clusterName)
|
||||
.topic(topicName)
|
||||
.topicActions(VIEW)
|
||||
.operationName("getActiveProducerStates")
|
||||
.build();
|
||||
|
||||
Comparator<TopicProducerStateDTO> ordering =
|
||||
Comparator.comparingInt(TopicProducerStateDTO::getPartition)
|
||||
.thenComparing(TopicProducerStateDTO::getProducerId);
|
||||
|
||||
Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
|
||||
.flatMapMany(statesMap ->
|
||||
Flux.fromStream(
|
||||
statesMap.entrySet().stream()
|
||||
.flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
|
||||
.sorted(ordering)));
|
||||
|
||||
return validateAccess(context)
|
||||
.thenReturn(states)
|
||||
.map(ResponseEntity::ok)
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
private Comparator<InternalTopic> getComparatorForTopic(
|
||||
TopicColumnsToSortDTO orderBy) {
|
||||
var defaultComparator = Comparator.comparing(InternalTopic::getName);
|
||||
|
|
|
@ -30,11 +30,13 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
|
|||
import com.provectus.kafka.ui.model.TopicConfigDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDetailsDTO;
|
||||
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
|
||||
import com.provectus.kafka.ui.service.metrics.RawMetric;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||
import org.apache.kafka.clients.admin.ProducerState;
|
||||
import org.apache.kafka.common.acl.AccessControlEntry;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
|
@ -118,6 +120,17 @@ public interface ClusterMapper {
|
|||
return brokerDiskUsage;
|
||||
}
|
||||
|
||||
default TopicProducerStateDTO map(int partition, ProducerState state) {
|
||||
return new TopicProducerStateDTO()
|
||||
.partition(partition)
|
||||
.producerId(state.producerId())
|
||||
.producerEpoch(state.producerEpoch())
|
||||
.lastSequence(state.lastSequence())
|
||||
.lastTimestampMs(state.lastTimestamp())
|
||||
.coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
|
||||
.currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
|
||||
}
|
||||
|
||||
static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
|
||||
return switch (operation) {
|
||||
case ALL -> KafkaAclDTO.OperationEnum.ALL;
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.function.BiFunction;
|
|||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.AccessLevel;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
|
|||
import org.apache.kafka.clients.admin.NewPartitions;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.admin.OffsetSpec;
|
||||
import org.apache.kafka.clients.admin.ProducerState;
|
||||
import org.apache.kafka.clients.admin.RecordsToDelete;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
|
@ -658,6 +660,21 @@ public class ReactiveAdminClient implements Closeable {
|
|||
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
|
||||
}
|
||||
|
||||
// returns tp -> list of active producer's states (if any)
|
||||
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
|
||||
return describeTopic(topic)
|
||||
.map(td -> client.describeProducers(
|
||||
IntStream.range(0, td.partitions().size())
|
||||
.mapToObj(i -> new TopicPartition(topic, i))
|
||||
.toList()
|
||||
).all()
|
||||
)
|
||||
.flatMap(ReactiveAdminClient::toMono)
|
||||
.map(map -> map.entrySet().stream()
|
||||
.filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
|
||||
.collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
|
||||
}
|
||||
|
||||
private Mono<Void> incrementalAlterConfig(String topicName,
|
||||
List<ConfigEntry> currentConfigs,
|
||||
Map<String, String> newConfigs) {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
|
|||
import org.apache.kafka.clients.admin.NewPartitionReassignment;
|
||||
import org.apache.kafka.clients.admin.NewPartitions;
|
||||
import org.apache.kafka.clients.admin.OffsetSpec;
|
||||
import org.apache.kafka.clients.admin.ProducerState;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
@ -459,6 +460,11 @@ public class TopicsService {
|
|||
);
|
||||
}
|
||||
|
||||
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.getActiveProducersState(topic));
|
||||
}
|
||||
|
||||
private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.listTopics(true))
|
||||
|
|
|
@ -763,6 +763,33 @@ paths:
|
|||
404:
|
||||
description: Not found
|
||||
|
||||
/api/clusters/{clusterName}/topics/{topicName}/activeproducers:
|
||||
get:
|
||||
tags:
|
||||
- Topics
|
||||
summary: get producer states for topic
|
||||
operationId: getActiveProducerStates
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: topicName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/TopicProducerState'
|
||||
|
||||
/api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
|
||||
get:
|
||||
tags:
|
||||
|
@ -2619,6 +2646,31 @@ components:
|
|||
- PROTOBUF
|
||||
- UNKNOWN
|
||||
|
||||
TopicProducerState:
|
||||
type: object
|
||||
properties:
|
||||
partition:
|
||||
type: integer
|
||||
format: int32
|
||||
producerId:
|
||||
type: integer
|
||||
format: int64
|
||||
producerEpoch:
|
||||
type: integer
|
||||
format: int32
|
||||
lastSequence:
|
||||
type: integer
|
||||
format: int32
|
||||
lastTimestampMs:
|
||||
type: integer
|
||||
format: int64
|
||||
coordinatorEpoch:
|
||||
type: integer
|
||||
format: int32
|
||||
currentTransactionStartOffset:
|
||||
type: integer
|
||||
format: int64
|
||||
|
||||
ConsumerGroup:
|
||||
discriminator:
|
||||
propertyName: inherit
|
||||
|
|
Loading…
Add table
Reference in a new issue