From 66228b00d5ea715f1b94bb8b53fc73bf525ebc74 Mon Sep 17 00:00:00 2001
From: Ilya Kuramshin
Date: Mon, 5 Jul 2021 14:32:01 +0300
Subject: [PATCH] #234: Consumer group offsets reset implementation (#605)

* junit4 annotations replaced with jupiter
* Consumer group offsets reset implementation
* comments added, Sonar warn fix
* minor cleanup
* checkstyle fix
* PR comments fix
* API doc fix
* API path renamed

Co-authored-by: Ilya Kuramshin
---
 .../controller/ConsumerGroupsController.java  |  56 +++++
 .../kafka/ui/service/KafkaService.java        |   4 +-
 .../kafka/ui/service/OffsetsResetService.java | 168 ++++++++++++++
 .../ui/service/OffsetsResetServiceTest.java   | 219 ++++++++++++++++++
 .../AvroJsonSchemaConverterTest.java          |   2 +-
 .../ProtobufSchemaConverterTest.java          |   2 +-
 .../main/resources/swagger/kafka-ui-api.yaml  |  73 +++++-
 pom.xml                                       |   2 +-
 8 files changed, 521 insertions(+), 5 deletions(-)
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
 create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
index 1a0d2f0748..add1508348 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
@@ -1,13 +1,23 @@
 package com.provectus.kafka.ui.controller;
 
+import static java.util.stream.Collectors.toMap;
+
 import com.provectus.kafka.ui.api.ConsumerGroupsApi;
+import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ConsumerGroupDetails;
+import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset;
+import com.provectus.kafka.ui.model.PartitionOffset;
 import com.provectus.kafka.ui.model.TopicConsumerGroups;
 import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.service.OffsetsResetService;
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.http.ResponseEntity;
+import org.springframework.util.CollectionUtils;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
@@ -18,6 +28,8 @@ import reactor.core.publisher.Mono;
 @Log4j2
 public class ConsumerGroupsController implements ConsumerGroupsApi {
   private final ClusterService clusterService;
+  private final OffsetsResetService offsetsResetService;
+  private final ClustersStorage clustersStorage;
 
   @Override
   public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName, String id,
@@ -49,4 +61,48 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
     return clusterService.getTopicConsumerGroupDetail(clusterName, topicName)
         .map(ResponseEntity::ok);
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName, String group,
+                                                              Mono<ConsumerGroupOffsetsReset>
+                                                                  consumerGroupOffsetsReset,
+                                                              ServerWebExchange exchange) {
+    return consumerGroupOffsetsReset.map(reset -> {
+      var cluster =
+          clustersStorage.getClusterByName(clusterName).orElseThrow(ClusterNotFoundException::new);
+
+      switch (reset.getResetType()) {
+        case EARLIEST:
+          offsetsResetService
+              .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions());
+          break;
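
For context, the endpoint this controller method backs (POST /api/clusters/{clusterName}/consumer-groups/{id}/offsets, defined in the swagger section further down) can be exercised with any HTTP client. A minimal Spring WebClient sketch follows; the base URL, cluster, group, and topic names are illustrative and not part of this patch:

import org.springframework.web.reactive.function.client.WebClient;

public class OffsetsResetCall {
  public static void main(String[] args) {
    // assumes a kafka-ui instance on localhost:8080 with a cluster named "local"
    WebClient.create("http://localhost:8080")
        .post()
        .uri("/api/clusters/{clusterName}/consumer-groups/{id}/offsets", "local", "my-group")
        .header("Content-Type", "application/json")
        // EARLIEST needs no extra fields; TIMESTAMP also requires resetToTimestamp,
        // OFFSET also requires partitionsOffsets
        .bodyValue("{\"topic\": \"my-topic\", \"resetType\": \"EARLIEST\"}")
        .retrieve()
        .toBodilessEntity()
        .block();
  }
}
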
+        case LATEST:
+          offsetsResetService
+              .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions());
+          break;
+        case TIMESTAMP:
+          if (reset.getResetToTimestamp() == null) {
+            throw new ValidationException(
+                "resetToTimestamp is required when TIMESTAMP reset type used");
+          }
+          offsetsResetService
+              .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(),
+                  reset.getResetToTimestamp());
+          break;
+        case OFFSET:
+          if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) {
+            throw new ValidationException(
+                "partitionsOffsets is required when OFFSET reset type used");
+          }
+          Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
+              .collect(toMap(PartitionOffset::getPartition, PartitionOffset::getOffset));
+          offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
+          break;
+        default:
+          throw new ValidationException("Unknown resetType " + reset.getResetType());
+      }
+      return ResponseEntity.ok().build();
+    });
+  }
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
index f83be936b8..e772af94ad 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -85,6 +86,7 @@ public class KafkaService {
   private final JmxClusterUtil jmxClusterUtil;
   private final ClustersStorage clustersStorage;
   private final DeserializationService deserializationService;
+  @Setter // used in tests
   @Value("${kafka.admin-client-timeout}")
   private int clientTimeout;
 
@@ -393,7 +395,7 @@ public class KafkaService {
                                                Map<String, Object> properties) {
     Properties props = new Properties();
     props.putAll(cluster.getProperties());
-    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID().toString());
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
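
The @Setter on clientTimeout lets tests construct KafkaService directly (see OffsetsResetServiceTest below) instead of relying on Spring's @Value injection. For reference, createConsumer takes a caller-supplied properties map that is applied on top of the defaults shown above, which is how OffsetsResetService obtains a consumer bound to the target group.id. A self-contained sketch of that layering; the bootstrap address and group name are placeholders:

import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;

public class ConsumerOverrideSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    // entries supplied by the caller are applied last, so they win
    props.putAll(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "my-group")); // placeholder
    try (var consumer = new KafkaConsumer<>(props)) {
      // this consumer can now commit offsets on behalf of "my-group"
    }
  }
}
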
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
new file mode 100644
index 0000000000..d04d649989
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
@@ -0,0 +1,168 @@
+package com.provectus.kafka.ui.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.kafka.common.ConsumerGroupState.DEAD;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+
+import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.stereotype.Component;
+
+/**
+ * Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
+ * to work like the "kafka-consumer-groups --reset-offsets" console command
+ * (see kafka.admin.ConsumerGroupCommand).
+ */
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class OffsetsResetService {
+
+  private final KafkaService kafkaService;
+
+  public void resetToEarliest(KafkaCluster cluster, String group, String topic,
+                              Collection<Integer> partitions) {
+    checkGroupCondition(cluster, group);
+    try (var consumer = getConsumer(cluster, group)) {
+      var targetPartitions = getTargetPartitions(consumer, topic, partitions);
+      var offsets = consumer.beginningOffsets(targetPartitions);
+      commitOffsets(consumer, offsets);
+    }
+  }
+
+  public void resetToLatest(KafkaCluster cluster, String group, String topic,
+                            Collection<Integer> partitions) {
+    checkGroupCondition(cluster, group);
+    try (var consumer = getConsumer(cluster, group)) {
+      var targetPartitions = getTargetPartitions(consumer, topic, partitions);
+      var offsets = consumer.endOffsets(targetPartitions);
+      commitOffsets(consumer, offsets);
+    }
+  }
+
+  public void resetToTimestamp(KafkaCluster cluster, String group, String topic,
+                               Collection<Integer> partitions, long targetTimestamp) {
+    checkGroupCondition(cluster, group);
+    try (var consumer = getConsumer(cluster, group)) {
+      var targetPartitions = getTargetPartitions(consumer, topic, partitions);
+      var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp);
+      commitOffsets(consumer, offsets);
+    }
+  }
+
+  public void resetToOffsets(KafkaCluster cluster, String group, String topic,
+                             Map<Integer, Long> targetOffsets) {
+    checkGroupCondition(cluster, group);
+    try (var consumer = getConsumer(cluster, group)) {
+      var offsets = targetOffsets.entrySet().stream()
+          .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
+      offsets = editOffsetsIfNeeded(consumer, offsets);
+      commitOffsets(consumer, offsets);
+    }
+  }
+
+  private void checkGroupCondition(KafkaCluster cluster, String groupId) {
+    ConsumerGroupDescription description =
+        kafkaService.getConsumerGroupsInternal(cluster)
+            .blockOptional()
+            .stream()
+            .flatMap(Collection::stream)
+            .filter(cgd -> cgd.groupId().equals(groupId))
+            .findAny()
+            .orElseThrow(() -> new NotFoundException("Consumer group not found"));
+
+    if (!Set.of(DEAD, EMPTY).contains(description.state())) {
+      throw new ValidationException(
+          String.format(
+              "Group's offsets can be reset only if group is inactive, but group is in %s state",
+              description.state()));
+    }
+  }
+
+  private Map<TopicPartition, Long> offsetsByTimestamp(Consumer<?, ?> consumer,
+                                                       Set<TopicPartition> partitions,
+                                                       long timestamp) {
+    Map<TopicPartition, OffsetAndTimestamp> timestampedOffsets = consumer
+        .offsetsForTimes(partitions.stream().collect(toMap(p -> p, p -> timestamp)));
+
+    var foundOffsets = timestampedOffsets.entrySet().stream()
+        .filter(e -> e.getValue() != null)
+        .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+
+    // for partitions where we didn't find an offset by timestamp, we use end offsets
+    foundOffsets.putAll(consumer.endOffsets(Sets.difference(partitions, foundOffsets.keySet())));
+    return foundOffsets;
+  }
+
+  private Set<TopicPartition> getTargetPartitions(Consumer<?, ?> consumer, String topic,
+                                                  Collection<Integer> partitions) {
+    var allPartitions = allTopicPartitions(consumer, topic);
+    if (partitions == null || partitions.isEmpty()) {
+      return allPartitions;
+    } else {
+      return partitions.stream()
+          .map(idx -> new TopicPartition(topic, idx))
+          .peek(tp -> checkArgument(allPartitions.contains(tp), "Invalid partition %s", tp))
+          .collect(toSet());
+    }
+  }
+
+  private Set<TopicPartition> allTopicPartitions(Consumer<?, ?> consumer, String topic) {
+    return consumer.partitionsFor(topic).stream()
+        .map(info -> new TopicPartition(topic, info.partition()))
+        .collect(toSet());
+  }
+
+  /**
+   * Checks if the submitted offsets are between the earliest and latest offsets. If an offset is
+   * out of range, we reset it to the earliest or latest offset (to follow the logic of
+   * kafka.admin.ConsumerGroupCommand.scala).
+   */
+  private Map<TopicPartition, Long> editOffsetsIfNeeded(Consumer<?, ?> consumer,
+                                                        Map<TopicPartition, Long> offsetsToCheck) {
+    var earliestOffsets = consumer.beginningOffsets(offsetsToCheck.keySet());
+    var latestOffsets = consumer.endOffsets(offsetsToCheck.keySet());
+    var result = new HashMap<TopicPartition, Long>();
+    offsetsToCheck.forEach((tp, offset) -> {
+      if (earliestOffsets.get(tp) > offset) {
+        log.warn("Offset for partition {} is lower than earliest offset, resetting to earliest",
+            tp);
+        result.put(tp, earliestOffsets.get(tp));
+      } else if (latestOffsets.get(tp) < offset) {
+        log.warn("Offset for partition {} is greater than latest offset, resetting to latest", tp);
+        result.put(tp, latestOffsets.get(tp));
+      } else {
+        result.put(tp, offset);
+      }
+    });
+    return result;
+  }
+
+  private void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) {
+    consumer.commitSync(
+        offsets.entrySet().stream()
+            .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())))
+    );
+  }
+
+  private Consumer<?, ?> getConsumer(KafkaCluster cluster, String groupId) {
+    return kafkaService.createConsumer(cluster, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
+  }
+
+}
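
The service above automates what KIP-122 describes: for an inactive group, any consumer created with the same group.id may look up and commit offsets without ever joining the group or polling. A bare-bones standalone sketch of the EARLIEST case follows (broker address, group, and topic are placeholders; unlike the service, this sketch performs no group-state check up front, so the broker will reject the commit if the group still has active members):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;

public class ManualEarliestReset {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    try (var consumer = new KafkaConsumer<>(props)) {
      // resolve all partitions of the topic, like allTopicPartitions() above
      List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
          .map(info -> new TopicPartition("my-topic", info.partition()))
          .collect(Collectors.toList());
      // fetch beginning offsets and commit them without assigning or polling
      Map<TopicPartition, OffsetAndMetadata> toCommit =
          consumer.beginningOffsets(partitions).entrySet().stream()
              .collect(Collectors.toMap(Map.Entry::getKey,
                  e -> new OffsetAndMetadata(e.getValue())));
      consumer.commitSync(toCommit);
    }
  }
}
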
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java
new file mode 100644
index 0000000000..ff075c7720
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java
@@ -0,0 +1,219 @@
+package com.provectus.kafka.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.exception.NotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class OffsetsResetServiceTest extends AbstractBaseTest {
+
+  private static final int PARTITIONS = 5;
+
+  private static final KafkaCluster CLUSTER =
+      KafkaCluster.builder()
+          .name(LOCAL)
+          .bootstrapServers(kafka.getBootstrapServers())
+          .properties(new Properties())
+          .build();
+
+  private final String groupId = "OffsetsResetServiceTestGroup-" + UUID.randomUUID();
+  private final String topic = "OffsetsResetServiceTestTopic-" + UUID.randomUUID();
+
+  private KafkaService kafkaService;
+  private OffsetsResetService offsetsResetService;
+
+  @BeforeEach
+  void init() {
+    kafkaService = new KafkaService(null, null, null, null);
+    kafkaService.setClientTimeout(5_000);
+    offsetsResetService = new OffsetsResetService(kafkaService);
+
+    createTopic(new NewTopic(topic, PARTITIONS, (short) 1));
+    createConsumerGroup();
+  }
+
+  @AfterEach
+  void cleanUp() {
+    deleteTopic(topic);
+  }
+
+  private void createConsumerGroup() {
+    try (var consumer = groupConsumer()) {
+      consumer.subscribe(Pattern.compile("no-such-topic-pattern"));
+      consumer.poll(Duration.ofMillis(200));
+      consumer.commitSync();
+    }
+  }
+
+  @Test
+  void failsIfGroupDoesNotExists() {
+    assertThatThrownBy(
+        () -> offsetsResetService.resetToEarliest(CLUSTER, "non-existing-group", topic, null))
+        .isInstanceOf(NotFoundException.class);
+    assertThatThrownBy(
+        () -> offsetsResetService.resetToLatest(CLUSTER, "non-existing-group", topic, null))
+        .isInstanceOf(NotFoundException.class);
+    assertThatThrownBy(() -> offsetsResetService
+        .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()))
+        .isInstanceOf(NotFoundException.class);
+    assertThatThrownBy(
+        () -> offsetsResetService.resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of()))
+        .isInstanceOf(NotFoundException.class);
+  }
+
+  @Test
+  void failsIfGroupIsActive() {
+    // starting consumer to activate group
+    try (var consumer = groupConsumer()) {
+      consumer.subscribe(Pattern.compile("no-such-topic-pattern"));
+      consumer.poll(Duration.ofMillis(100));
+
+      assertThatThrownBy(() -> offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null))
+          .isInstanceOf(ValidationException.class);
+      assertThatThrownBy(() -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null))
+          .isInstanceOf(ValidationException.class);
+      assertThatThrownBy(() -> offsetsResetService
+          .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()))
+          .isInstanceOf(ValidationException.class);
+      assertThatThrownBy(
+          () -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()))
+          .isInstanceOf(ValidationException.class);
+    }
+  }
+
+  @Test
+  void resetToOffsets() {
+    sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
+
+    var expectedOffsets = Map.of(0, 5L, 1, 5L, 2, 5L);
+    offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets);
+    assertOffsets(expectedOffsets);
+  }
+
+  @Test
+  void resetToOffsetsCommitsEarliestOrLatestOffsetsIfOffsetsBoundsNotValid() {
+    sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
+
+    var offsetsWithInValidBounds = Map.of(0, -2L, 1, 5L, 2, 500L);
+    var expectedOffsets = Map.of(0, 0L, 1, 5L, 2, 10L);
+    offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds);
+    assertOffsets(expectedOffsets);
+  }
+
+  @Test
+  void resetToEarliest() {
+    sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
+
+    commit(Map.of(0, 5L, 1, 5L, 2, 5L));
+    offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1));
+    assertOffsets(Map.of(0, 0L, 1, 0L, 2, 5L));
+
+    commit(Map.of(0, 5L, 1, 5L, 2, 5L));
+    offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null);
+    assertOffsets(Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L, 4, 0L));
+  }
+
+  @Test
+  void resetToLatest() {
+    sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10, 3, 10, 4, 10));
+
+    commit(Map.of(0, 5L, 1, 5L, 2, 5L));
+    offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1));
+    assertOffsets(Map.of(0, 10L, 1, 10L, 2, 5L));
+
+    commit(Map.of(0, 5L, 1, 5L, 2, 5L));
+    offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null);
+    assertOffsets(Map.of(0, 10L, 1, 10L, 2, 10L, 3, 10L, 4, 10L));
+  }
+
+  @Test
+  void resetToTimestamp() {
+    send(
+        Stream.of(
+            new ProducerRecord<Bytes, Bytes>(topic, 0, 1000L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 0, 1500L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 0, 2000L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 1, 1000L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 1, 2000L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 2, 1000L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 2, 1100L, null, null),
+            new ProducerRecord<Bytes, Bytes>(topic, 2, 1200L, null, null)));
+
+    offsetsResetService.resetToTimestamp(CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L);
+    assertOffsets(Map.of(0, 2L, 1, 1L, 2, 3L, 3, 0L));
+  }
+
+
+  private void commit(Map<Integer, Long> offsetsToCommit) {
+    try (var consumer = groupConsumer()) {
+      consumer.commitSync(
+          offsetsToCommit.entrySet().stream()
+              .collect(Collectors.toMap(
+                  e -> new TopicPartition(topic, e.getKey()),
+                  e -> new OffsetAndMetadata(e.getValue())))
+      );
+    }
+  }
+
+  private void sendMsgsToPartition(Map<Integer, Integer> msgsCountForPartitions) {
+    Bytes bytes = new Bytes("noMatter".getBytes());
+    send(
+        msgsCountForPartitions.entrySet().stream()
+            .flatMap(e ->
+                IntStream.range(0, e.getValue())
+                    .mapToObj(i -> new ProducerRecord<>(topic, e.getKey(), bytes, bytes))));
+  }
+
+  private void send(Stream<ProducerRecord<Bytes, Bytes>> toSend) {
+    var properties = new Properties();
+    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    var serializer = new BytesSerializer();
+    try (var producer = new KafkaProducer<>(properties, serializer, serializer)) {
+      toSend.forEach(producer::send);
+      producer.flush();
+    }
+  }
+
+  private void assertOffsets(Map<Integer, Long> expectedOffsets) {
+    try (var consumer = groupConsumer()) {
+      var tps = expectedOffsets.keySet().stream()
+          .map(idx -> new TopicPartition(topic, idx))
+          .collect(Collectors.toSet());
+
+      var actualOffsets = consumer.committed(tps).entrySet().stream()
+          .collect(Collectors.toMap(e -> e.getKey().partition(), e -> e.getValue().offset()));
+
+      assertThat(actualOffsets).isEqualTo(expectedOffsets);
+    }
+  }
+
+  private Consumer<Bytes, Bytes> groupConsumer() {
+    return kafkaService.createConsumer(CLUSTER, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
+  }
+
+}
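
The assertions above read committed offsets back through a consumer (consumer.committed(tps)). An equivalent check can be done without creating a consumer at all via AdminClient, which may be convenient when verifying resets from other tooling. A sketch with placeholder broker address and group name:

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetsCheck {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    try (AdminClient admin = AdminClient.create(props)) {
      // fetch all offsets the group has committed, keyed by topic-partition
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets("my-group") // placeholder group
              .partitionsToOffsetAndMetadata()
              .get();
      committed.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
    }
  }
}
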
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java
index 576b57e98d..036b1c48ed 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java
@@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.net.URI;
 import java.net.URISyntaxException;
 import org.apache.avro.Schema;
-import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 public class AvroJsonSchemaConverterTest {
   @Test
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java
index 9a8e4d89d3..fa4235d42c 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java
@@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.net.URI;
 import java.net.URISyntaxException;
-import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 public class ProtobufSchemaConverterTest {
 
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 04ad5bda55..e48609fed2 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -314,7 +314,7 @@ paths:
             type: array
             items:
               type: string
-          description: The format is [partition]::[offset] for specifying offsets or [partition]::[timstamp in millis] for specifying timestamps
+          description: The format is [partition]::[offset] for specifying offsets or [partition]::[timestamp in millis] for specifying timestamps
         - name: limit
           in: query
           schema:
@@ -508,6 +508,32 @@ paths:
                 items:
                   $ref: '#/components/schemas/ConsumerGroup'
 
+  /api/clusters/{clusterName}/consumer-groups/{id}/offsets:
+    post:
+      tags:
+        - Consumer Groups
+      summary: resets consumer group offsets
+      operationId: resetConsumerGroupOffsets
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: id
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ConsumerGroupOffsetsReset'
+      responses:
+        200:
+          description: OK
+
   /api/clusters/{clusterName}/schemas:
     post:
       tags:
@@ -1841,6 +1867,51 @@ components:
         - SOURCE
         - SINK
 
+    ConsumerGroupOffsetsReset:
+      type: object
+      properties:
+        topic:
+          type: string
+        resetType:
+          $ref: '#/components/schemas/ConsumerGroupOffsetsResetType'
+        partitions:
+          type: array
+          items:
+            type: integer
+          description: list of target partitions, all partitions will be used if it is not set or empty
+        resetToTimestamp:
+          type: integer
+          format: int64
+          description: should be set if resetType is TIMESTAMP
+        partitionsOffsets:
+          type: array
+          items:
+            $ref: '#/components/schemas/PartitionOffset'
+          description: List of partition offsets to reset to, should be set when resetType is OFFSET
+      required:
+        - topic
+        - resetType
+
+    PartitionOffset:
+      type: object
+      properties:
+        partition:
+          type: integer
+        offset:
+          type: integer
+          format: int64
+      required:
+        - partition
+        - offset
+
+    ConsumerGroupOffsetsResetType:
+      type: string
+      enum:
+        - EARLIEST
+        - LATEST
+        - TIMESTAMP
+        - OFFSET
+
     TaskStatus:
       type: object
       properties:
diff --git a/pom.xml b/pom.xml
index 9fd4b3cd59..c91d9e392c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
         1.18.10
         latest
         0.11
-        2.4.0
+        2.4.1
         v14.17.1
         1.4.10
         1.8.0
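
For reference, request bodies satisfying the ConsumerGroupOffsetsReset schema above look like the following; topic, partition, offset, and timestamp values are placeholders:

{"topic": "my-topic", "resetType": "EARLIEST"}
{"topic": "my-topic", "resetType": "LATEST", "partitions": [0, 1]}
{"topic": "my-topic", "resetType": "TIMESTAMP", "resetToTimestamp": 1625486400000}
{"topic": "my-topic", "resetType": "OFFSET", "partitionsOffsets": [{"partition": 0, "offset": 10}]}

Omitting partitions (or sending an empty list) targets all partitions of the topic, matching the controller logic above.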