diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index b0bac49169..b1437a5e11 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -70,42 +70,45 @@ public class ConsumerGroupsController implements ConsumerGroupsApi { Mono consumerGroupOffsetsReset, ServerWebExchange exchange) { - return consumerGroupOffsetsReset.map(reset -> { + return consumerGroupOffsetsReset.flatMap(reset -> { var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(ClusterNotFoundException::new); switch (reset.getResetType()) { case EARLIEST: - offsetsResetService + return offsetsResetService .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions()); - break; case LATEST: - offsetsResetService + return offsetsResetService .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions()); - break; case TIMESTAMP: if (reset.getResetToTimestamp() == null) { - throw new ValidationException( - "resetToTimestamp is required when TIMESTAMP reset type used"); + return Mono.error( + new ValidationException( + "resetToTimestamp is required when TIMESTAMP reset type used" + ) + ); } - offsetsResetService + return offsetsResetService .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(), reset.getResetToTimestamp()); - break; case OFFSET: if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) { - throw new ValidationException( - "partitionsOffsets is required when OFFSET reset type used"); + return Mono.error( + new ValidationException( + "partitionsOffsets is required when OFFSET reset type used" + ) + ); } Map offsets = reset.getPartitionsOffsets().stream() .collect(toMap(PartitionOffset::getPartition, PartitionOffset::getOffset)); - offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets); - break; + return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets); default: - throw new ValidationException("Unknown resetType " + reset.getResetType()); + return Mono.error( + new ValidationException("Unknown resetType " + reset.getResetType()) + ); } - return ResponseEntity.ok().build(); - }); + }).map(o -> ResponseEntity.ok().build()); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java index 73d86cecb3..6420e6d7ec 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java @@ -1,18 +1,19 @@ package com.provectus.kafka.ui.service; import static com.google.common.base.Preconditions.checkArgument; +import static com.provectus.kafka.ui.util.ClusterUtil.toMono; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; import static org.apache.kafka.common.ConsumerGroupState.DEAD; import static org.apache.kafka.common.ConsumerGroupState.EMPTY; -import com.google.common.collect.Sets; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ValidationException; -import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.KafkaCluster; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; @@ -24,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; /** * Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling @@ -36,64 +38,88 @@ import org.springframework.stereotype.Component; public class OffsetsResetService { private final KafkaService kafkaService; + private final AdminClientService adminClientService; - public void resetToEarliest(KafkaCluster cluster, String group, String topic, - Collection partitions) { - checkGroupCondition(cluster, group); - try (var consumer = getConsumer(cluster, group)) { - var targetPartitions = getTargetPartitions(consumer, topic, partitions); - var offsets = consumer.beginningOffsets(targetPartitions); - commitOffsets(consumer, offsets); - } + public Mono> resetToEarliest( + KafkaCluster cluster, String group, String topic, Collection partitions) { + return checkGroupCondition(cluster, group) + .flatMap(g -> { + try (var consumer = getConsumer(cluster, group)) { + var targetPartitions = getTargetPartitions(consumer, topic, partitions); + var offsets = consumer.beginningOffsets(targetPartitions); + return commitOffsets(consumer, offsets); + } + }); } - public void resetToLatest(KafkaCluster cluster, String group, String topic, - Collection partitions) { - checkGroupCondition(cluster, group); - try (var consumer = getConsumer(cluster, group)) { - var targetPartitions = getTargetPartitions(consumer, topic, partitions); - var offsets = consumer.endOffsets(targetPartitions); - commitOffsets(consumer, offsets); - } + public Mono> resetToLatest( + KafkaCluster cluster, String group, String topic, Collection partitions) { + return checkGroupCondition(cluster, group).flatMap( + g -> { + try (var consumer = getConsumer(cluster, group)) { + var targetPartitions = getTargetPartitions(consumer, topic, partitions); + var offsets = consumer.endOffsets(targetPartitions); + return commitOffsets(consumer, offsets); + } + } + ); } - public void resetToTimestamp(KafkaCluster cluster, String group, String topic, - Collection partitions, long targetTimestamp) { - checkGroupCondition(cluster, group); - try (var consumer = getConsumer(cluster, group)) { - var targetPartitions = getTargetPartitions(consumer, topic, partitions); - var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp); - commitOffsets(consumer, offsets); - } + public Mono> resetToTimestamp( + KafkaCluster cluster, String group, String topic, Collection partitions, + long targetTimestamp) { + return checkGroupCondition(cluster, group).flatMap( + g -> { + try (var consumer = getConsumer(cluster, group)) { + var targetPartitions = getTargetPartitions(consumer, topic, partitions); + var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp); + return commitOffsets(consumer, offsets); + } + } + ); } - public void resetToOffsets(KafkaCluster cluster, String group, String topic, - Map targetOffsets) { - checkGroupCondition(cluster, group); - try (var consumer = getConsumer(cluster, group)) { - var offsets = targetOffsets.entrySet().stream() - .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue)); - offsets = editOffsetsIfNeeded(consumer, offsets); - commitOffsets(consumer, offsets); - } + public Mono> resetToOffsets( + KafkaCluster cluster, String group, String topic, Map targetOffsets) { + return checkGroupCondition(cluster, group).flatMap( + g -> { + try (var consumer = getConsumer(cluster, group)) { + var offsets = targetOffsets.entrySet().stream() + .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue)); + offsets = editOffsetsIfNeeded(consumer, offsets); + return commitOffsets(consumer, offsets); + } + } + ); } - private void checkGroupCondition(KafkaCluster cluster, String groupId) { - InternalConsumerGroup description = - kafkaService.getConsumerGroupsInternal(cluster) - .blockOptional() - .stream() - .flatMap(Collection::stream) - .filter(cgd -> cgd.getGroupId().equals(groupId)) - .findAny() - .orElseThrow(() -> new NotFoundException("Consumer group not found")); - - if (!Set.of(DEAD, EMPTY).contains(description.getState())) { - throw new ValidationException( - String.format( - "Group's offsets can be reset only if group is inactive, but group is in %s state", - description.getState())); - } + private Mono checkGroupCondition(KafkaCluster cluster, String groupId) { + return adminClientService.getOrCreateAdminClient(cluster) + .flatMap(ac -> + // we need to call listConsumerGroups() to check group existence, because + // describeConsumerGroups() will return consumer group even if it doesn't exist + toMono(ac.getAdminClient().listConsumerGroups().all()) + .filter(cgs -> cgs.stream().anyMatch(g -> g.groupId().equals(groupId))) + .flatMap(cgs -> toMono( + ac.getAdminClient().describeConsumerGroups(List.of(groupId)).all())) + .filter(cgs -> cgs.containsKey(groupId)) + .map(cgs -> cgs.get(groupId)) + .flatMap(cg -> { + if (!Set.of(DEAD, EMPTY).contains(cg.state())) { + return Mono.error( + new ValidationException( + String.format( + "Group's offsets can be reset only if group is inactive," + + " but group is in %s state", + cg.state() + ) + ) + ); + } + return Mono.just(cg); + }) + .switchIfEmpty(Mono.error(new NotFoundException("Consumer group not found"))) + ); } private Map offsetsByTimestamp(Consumer consumer, @@ -107,7 +133,10 @@ public class OffsetsResetService { .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); // for partitions where we didnt find offset by timestamp, we use end offsets - foundOffsets.putAll(consumer.endOffsets(Sets.difference(partitions, foundOffsets.keySet()))); + Set endOffsets = new HashSet<>(partitions); + endOffsets.removeAll(foundOffsets.keySet()); + foundOffsets.putAll(consumer.endOffsets(endOffsets)); + return foundOffsets; } @@ -155,11 +184,13 @@ public class OffsetsResetService { return result; } - private void commitOffsets(Consumer consumer, Map offsets) { - consumer.commitSync( - offsets.entrySet().stream() - .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))) - ); + private Mono> commitOffsets( + Consumer consumer, Map offsets + ) { + var toCommit = offsets.entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + consumer.commitSync(toCommit); + return Mono.just(toCommit); } private Consumer getConsumer(KafkaCluster cluster, String groupId) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java index 779bda4d3e..290cfec9d7 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java @@ -56,7 +56,7 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { FeatureService featureService = new FeatureServiceImpl(brokerService); adminClientService.setClientTimeout(5_000); kafkaService = new KafkaService(null, null, null, null, adminClientService, featureService); - offsetsResetService = new OffsetsResetService(kafkaService); + offsetsResetService = new OffsetsResetService(kafkaService, adminClientService); createTopic(new NewTopic(topic, PARTITIONS, (short) 1)); createConsumerGroup(); @@ -78,17 +78,22 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { @Test void failsIfGroupDoesNotExists() { assertThatThrownBy( - () -> offsetsResetService.resetToEarliest(CLUSTER, "non-existing-group", topic, null)) - .isInstanceOf(NotFoundException.class); + () -> offsetsResetService + .resetToEarliest(CLUSTER, "non-existing-group", topic, null).block() + ).isInstanceOf(NotFoundException.class); assertThatThrownBy( - () -> offsetsResetService.resetToLatest(CLUSTER, "non-existing-group", topic, null)) - .isInstanceOf(NotFoundException.class); + () -> offsetsResetService + .resetToLatest(CLUSTER, "non-existing-group", topic, null).block() + ).isInstanceOf(NotFoundException.class); assertThatThrownBy(() -> offsetsResetService - .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis())) - .isInstanceOf(NotFoundException.class); + .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()) + .block() + ).isInstanceOf(NotFoundException.class); assertThatThrownBy( - () -> offsetsResetService.resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of())) - .isInstanceOf(NotFoundException.class); + () -> offsetsResetService + .resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of()) + .block() + ).isInstanceOf(NotFoundException.class); } @Test @@ -98,16 +103,19 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { consumer.subscribe(Pattern.compile("no-such-topic-pattern")); consumer.poll(Duration.ofMillis(100)); - assertThatThrownBy(() -> offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null)) - .isInstanceOf(ValidationException.class); - assertThatThrownBy(() -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null)) - .isInstanceOf(ValidationException.class); - assertThatThrownBy(() -> offsetsResetService - .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis())) - .isInstanceOf(ValidationException.class); + assertThatThrownBy(() -> + offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block() + ).isInstanceOf(ValidationException.class); assertThatThrownBy( - () -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of())) - .isInstanceOf(ValidationException.class); + () -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block() + ).isInstanceOf(ValidationException.class); + assertThatThrownBy(() -> offsetsResetService + .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()) + .block() + ).isInstanceOf(ValidationException.class); + assertThatThrownBy( + () -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()).block() + ).isInstanceOf(ValidationException.class); } } @@ -116,7 +124,7 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10)); var expectedOffsets = Map.of(0, 5L, 1, 5L, 2, 5L); - offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets); + offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets).block(); assertOffsets(expectedOffsets); } @@ -126,7 +134,7 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { var offsetsWithInValidBounds = Map.of(0, -2L, 1, 5L, 2, 500L); var expectedOffsets = Map.of(0, 0L, 1, 5L, 2, 10L); - offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds); + offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds).block(); assertOffsets(expectedOffsets); } @@ -135,11 +143,11 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); - offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1)); + offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1)).block(); assertOffsets(Map.of(0, 0L, 1, 0L, 2, 5L)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); - offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null); + offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block(); assertOffsets(Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L, 4, 0L)); } @@ -148,11 +156,11 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10, 3, 10, 4, 10)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); - offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1)); + offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1)).block(); assertOffsets(Map.of(0, 10L, 1, 10L, 2, 5L)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); - offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null); + offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block(); assertOffsets(Map.of(0, 10L, 1, 10L, 2, 10L, 3, 10L, 4, 10L)); } @@ -169,7 +177,9 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { new ProducerRecord(topic, 2, 1100L, null, null), new ProducerRecord(topic, 2, 1200L, null, null))); - offsetsResetService.resetToTimestamp(CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L); + offsetsResetService.resetToTimestamp( + CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L + ).block(); assertOffsets(Map.of(0, 2L, 1, 1L, 2, 3L, 3, 0L)); } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 7f446bdba2..cb882ef078 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2256,7 +2256,6 @@ components: format: int64 required: - partition - - offset ConsumerGroupOffsetsResetType: type: string