#234: Consumer group offsets reset implementation (#605)

* JUnit 4 annotations replaced with JUnit Jupiter

* Consumer group offsets reset implementation

* comments added, Sonar warning fix

* minor cleanup

* checkstyle fix

* PR comments fix

* API doc fix

* API path renamed

Co-authored-by: Ilya Kuramshin <ikuramshin@provectus.com>
Ilya Kuramshin 2021-07-05 14:32:01 +03:00 committed by GitHub
parent 97e9db0d8c
commit 66228b00d5
8 changed files with 521 additions and 5 deletions


@@ -1,13 +1,23 @@
package com.provectus.kafka.ui.controller;
import static java.util.stream.Collectors.toMap;
import com.provectus.kafka.ui.api.ConsumerGroupsApi;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset;
import com.provectus.kafka.ui.model.PartitionOffset;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.OffsetsResetService;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
@@ -18,6 +28,8 @@ import reactor.core.publisher.Mono;
@Log4j2
public class ConsumerGroupsController implements ConsumerGroupsApi {
private final ClusterService clusterService;
private final OffsetsResetService offsetsResetService;
private final ClustersStorage clustersStorage;
@Override
public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName, String id,
@@ -49,4 +61,48 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
return clusterService.getTopicConsumerGroupDetail(clusterName, topicName)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName, String group,
Mono<ConsumerGroupOffsetsReset>
consumerGroupOffsetsReset,
ServerWebExchange exchange) {
return consumerGroupOffsetsReset.map(reset -> {
var cluster =
clustersStorage.getClusterByName(clusterName).orElseThrow(ClusterNotFoundException::new);
switch (reset.getResetType()) {
case EARLIEST:
offsetsResetService
.resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions());
break;
case LATEST:
offsetsResetService
.resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions());
break;
case TIMESTAMP:
if (reset.getResetToTimestamp() == null) {
throw new ValidationException(
"resetToTimestamp is required when TIMESTAMP reset type used");
}
offsetsResetService
.resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(),
reset.getResetToTimestamp());
break;
case OFFSET:
if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) {
throw new ValidationException(
"partitionsOffsets is required when OFFSET reset type used");
}
Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
.collect(toMap(PartitionOffset::getPartition, PartitionOffset::getOffset));
offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
break;
default:
throw new ValidationException("Unknown resetType " + reset.getResetType());
}
return ResponseEntity.ok().build();
});
}
}
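For illustration, a minimal sketch of how a client could invoke the new endpoint with Spring's WebClient; the base URL, cluster name, and group id are placeholders, and the body fields follow the ConsumerGroupOffsetsReset schema added later in this commit:

import java.util.Map;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

public class ResetOffsetsClientExample {
  public static void main(String[] args) {
    // POST a reset request to the endpoint added by this commit;
    // here the whole group is rewound to the earliest offsets of "my-topic"
    WebClient.create("http://localhost:8080")
        .post()
        .uri("/api/clusters/{clusterName}/consumer-groups/{id}/offsets", "local", "my-group")
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(Map.of("topic", "my-topic", "resetType", "EARLIEST"))
        .retrieve()
        .toBodilessEntity()
        .block();
  }
}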


@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
@@ -85,6 +86,7 @@ public class KafkaService {
private final JmxClusterUtil jmxClusterUtil;
private final ClustersStorage clustersStorage;
private final DeserializationService deserializationService;
@Setter // used in tests
@Value("${kafka.admin-client-timeout}")
private int clientTimeout;
@@ -393,7 +395,7 @@
Map<String, Object> properties) {
Properties props = new Properties();
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID().toString());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);


@@ -0,0 +1,168 @@
package com.provectus.kafka.ui.service;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.apache.kafka.common.ConsumerGroupState.DEAD;
import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;
/**
* Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
* to work like the "kafka-consumer-groups --reset-offsets" console command
* (see kafka.admin.ConsumerGroupCommand)
*/
@Log4j2
@Component
@RequiredArgsConstructor
public class OffsetsResetService {
private final KafkaService kafkaService;
public void resetToEarliest(KafkaCluster cluster, String group, String topic,
Collection<Integer> partitions) {
checkGroupCondition(cluster, group);
try (var consumer = getConsumer(cluster, group)) {
var targetPartitions = getTargetPartitions(consumer, topic, partitions);
var offsets = consumer.beginningOffsets(targetPartitions);
commitOffsets(consumer, offsets);
}
}
public void resetToLatest(KafkaCluster cluster, String group, String topic,
Collection<Integer> partitions) {
checkGroupCondition(cluster, group);
try (var consumer = getConsumer(cluster, group)) {
var targetPartitions = getTargetPartitions(consumer, topic, partitions);
var offsets = consumer.endOffsets(targetPartitions);
commitOffsets(consumer, offsets);
}
}
public void resetToTimestamp(KafkaCluster cluster, String group, String topic,
Collection<Integer> partitions, long targetTimestamp) {
checkGroupCondition(cluster, group);
try (var consumer = getConsumer(cluster, group)) {
var targetPartitions = getTargetPartitions(consumer, topic, partitions);
var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp);
commitOffsets(consumer, offsets);
}
}
public void resetToOffsets(KafkaCluster cluster, String group, String topic,
Map<Integer, Long> targetOffsets) {
checkGroupCondition(cluster, group);
try (var consumer = getConsumer(cluster, group)) {
var offsets = targetOffsets.entrySet().stream()
.collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
offsets = editOffsetsIfNeeded(consumer, offsets);
commitOffsets(consumer, offsets);
}
}
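// group offsets can only be safely overwritten when no members are active,
// so the group state is required to be DEAD or EMPTY before any reset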
private void checkGroupCondition(KafkaCluster cluster, String groupId) {
ConsumerGroupDescription description =
kafkaService.getConsumerGroupsInternal(cluster)
.blockOptional()
.stream()
.flatMap(Collection::stream)
.filter(cgd -> cgd.groupId().equals(groupId))
.findAny()
.orElseThrow(() -> new NotFoundException("Consumer group not found"));
if (!Set.of(DEAD, EMPTY).contains(description.state())) {
throw new ValidationException(
String.format(
"Group's offsets can be reset only if group is inactive, but group is in %s state",
description.state()));
}
}
private Map<TopicPartition, Long> offsetsByTimestamp(Consumer<?, ?> consumer,
Set<TopicPartition> partitions,
long timestamp) {
Map<TopicPartition, OffsetAndTimestamp> timestampedOffsets = consumer
.offsetsForTimes(partitions.stream().collect(toMap(p -> p, p -> timestamp)));
var foundOffsets = timestampedOffsets.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
// for partitions where we didn't find an offset by timestamp, we use end offsets
foundOffsets.putAll(consumer.endOffsets(Sets.difference(partitions, foundOffsets.keySet())));
return foundOffsets;
}
private Set<TopicPartition> getTargetPartitions(Consumer<?, ?> consumer, String topic,
Collection<Integer> partitions) {
var allPartitions = allTopicPartitions(consumer, topic);
if (partitions == null || partitions.isEmpty()) {
return allPartitions;
} else {
return partitions.stream()
.map(idx -> new TopicPartition(topic, idx))
.peek(tp -> checkArgument(allPartitions.contains(tp), "Invalid partition %s", tp))
.collect(toSet());
}
}
private Set<TopicPartition> allTopicPartitions(Consumer<?, ?> consumer, String topic) {
return consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(toSet());
}
/**
* Checks if the submitted offsets are between the earliest and latest offsets. If a range check
* fails, the offset is reset to either the earliest or latest offset (following the logic from
* kafka.admin.ConsumerGroupCommand.scala).
*/
private Map<TopicPartition, Long> editOffsetsIfNeeded(Consumer<?, ?> consumer,
Map<TopicPartition, Long> offsetsToCheck) {
var earliestOffsets = consumer.beginningOffsets(offsetsToCheck.keySet());
var latestOffsets = consumer.endOffsets(offsetsToCheck.keySet());
var result = new HashMap<TopicPartition, Long>();
offsetsToCheck.forEach((tp, offset) -> {
if (earliestOffsets.get(tp) > offset) {
log.warn("Offset for partition {} is lower than earliest offset, resetting to earliest",
tp);
result.put(tp, earliestOffsets.get(tp));
} else if (latestOffsets.get(tp) < offset) {
log.warn("Offset for partition {} is greater than latest offset, resetting to latest", tp);
result.put(tp, latestOffsets.get(tp));
} else {
result.put(tp, offset);
}
});
return result;
}
private void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) {
consumer.commitSync(
offsets.entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())))
);
}
private Consumer<?, ?> getConsumer(KafkaCluster cluster, String groupId) {
return kafkaService.createConsumer(cluster, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
}
}
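A short usage sketch of the service above; `service` and `cluster` are placeholders for the Spring-managed OffsetsResetService bean and a configured KafkaCluster:

import java.util.List;
import java.util.Map;
import com.provectus.kafka.ui.model.KafkaCluster;

class OffsetsResetUsageExample {
  static void examples(OffsetsResetService service, KafkaCluster cluster) {
    // rewind every partition of the topic to the log start (null partitions means "all")
    service.resetToEarliest(cluster, "my-group", "my-topic", null);
    // move partitions 0 and 1 to the first offsets at/after the given timestamp
    service.resetToTimestamp(cluster, "my-group", "my-topic", List.of(0, 1), 1625000000000L);
    // pin explicit per-partition offsets (partition -> offset)
    service.resetToOffsets(cluster, "my-group", "my-topic", Map.of(0, 100L, 1, 250L));
  }
}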


@@ -0,0 +1,219 @@
package com.provectus.kafka.ui.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.provectus.kafka.ui.AbstractBaseTest;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class OffsetsResetServiceTest extends AbstractBaseTest {
private static final int PARTITIONS = 5;
private static final KafkaCluster CLUSTER =
KafkaCluster.builder()
.name(LOCAL)
.bootstrapServers(kafka.getBootstrapServers())
.properties(new Properties())
.build();
private final String groupId = "OffsetsResetServiceTestGroup-" + UUID.randomUUID();
private final String topic = "OffsetsResetServiceTestTopic-" + UUID.randomUUID();
private KafkaService kafkaService;
private OffsetsResetService offsetsResetService;
@BeforeEach
void init() {
kafkaService = new KafkaService(null, null, null, null);
kafkaService.setClientTimeout(5_000);
offsetsResetService = new OffsetsResetService(kafkaService);
createTopic(new NewTopic(topic, PARTITIONS, (short) 1));
createConsumerGroup();
}
@AfterEach
void cleanUp() {
deleteTopic(topic);
}
private void createConsumerGroup() {
try (var consumer = groupConsumer()) {
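// joining with a pattern that matches no topics is enough to register the group,
// which is left in EMPTY state once this consumer closes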
consumer.subscribe(Pattern.compile("no-such-topic-pattern"));
consumer.poll(Duration.ofMillis(200));
consumer.commitSync();
}
}
@Test
void failsIfGroupDoesNotExist() {
assertThatThrownBy(
() -> offsetsResetService.resetToEarliest(CLUSTER, "non-existing-group", topic, null))
.isInstanceOf(NotFoundException.class);
assertThatThrownBy(
() -> offsetsResetService.resetToLatest(CLUSTER, "non-existing-group", topic, null))
.isInstanceOf(NotFoundException.class);
assertThatThrownBy(() -> offsetsResetService
.resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()))
.isInstanceOf(NotFoundException.class);
assertThatThrownBy(
() -> offsetsResetService.resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of()))
.isInstanceOf(NotFoundException.class);
}
@Test
void failsIfGroupIsActive() {
// starting consumer to activate group
try (var consumer = groupConsumer()) {
consumer.subscribe(Pattern.compile("no-such-topic-pattern"));
consumer.poll(Duration.ofMillis(100));
assertThatThrownBy(() -> offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null))
.isInstanceOf(ValidationException.class);
assertThatThrownBy(() -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null))
.isInstanceOf(ValidationException.class);
assertThatThrownBy(() -> offsetsResetService
.resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()))
.isInstanceOf(ValidationException.class);
assertThatThrownBy(
() -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()))
.isInstanceOf(ValidationException.class);
}
}
@Test
void resetToOffsets() {
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
var expectedOffsets = Map.of(0, 5L, 1, 5L, 2, 5L);
offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets);
assertOffsets(expectedOffsets);
}
@Test
void resetToOffsetsCommitsEarliestOrLatestOffsetsIfOffsetsBoundsNotValid() {
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
var offsetsWithInvalidBounds = Map.of(0, -2L, 1, 5L, 2, 500L);
var expectedOffsets = Map.of(0, 0L, 1, 5L, 2, 10L);
offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInvalidBounds);
assertOffsets(expectedOffsets);
}
@Test
void resetToEarliest() {
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1));
assertOffsets(Map.of(0, 0L, 1, 0L, 2, 5L));
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null);
assertOffsets(Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L, 4, 0L));
}
@Test
void resetToLatest() {
sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10, 3, 10, 4, 10));
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1));
assertOffsets(Map.of(0, 10L, 1, 10L, 2, 5L));
commit(Map.of(0, 5L, 1, 5L, 2, 5L));
offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null);
assertOffsets(Map.of(0, 10L, 1, 10L, 2, 10L, 3, 10L, 4, 10L));
}
@Test
void resetToTimestamp() {
send(
Stream.of(
new ProducerRecord<Bytes, Bytes>(topic, 0, 1000L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 0, 1500L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 0, 2000L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 1, 1000L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 1, 2000L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 2, 1000L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 2, 1100L, null, null),
new ProducerRecord<Bytes, Bytes>(topic, 2, 1200L, null, null)));
offsetsResetService.resetToTimestamp(CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L);
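// expected: partition 0 -> offset 2 (first record with timestamp >= 1600 is the 2000L one),
// partition 1 -> offset 1; partitions 2 and 3 have no records at/after 1600, so end offsets (3 and 0) are used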
assertOffsets(Map.of(0, 2L, 1, 1L, 2, 3L, 3, 0L));
}
private void commit(Map<Integer, Long> offsetsToCommit) {
try (var consumer = groupConsumer()) {
consumer.commitSync(
offsetsToCommit.entrySet().stream()
.collect(Collectors.toMap(
e -> new TopicPartition(topic, e.getKey()),
e -> new OffsetAndMetadata(e.getValue())))
);
}
}
private void sendMsgsToPartition(Map<Integer, Integer> msgsCountForPartitions) {
Bytes bytes = new Bytes("noMatter".getBytes());
send(
msgsCountForPartitions.entrySet().stream()
.flatMap(e ->
IntStream.range(0, e.getValue())
.mapToObj(i -> new ProducerRecord<>(topic, e.getKey(), bytes, bytes))));
}
private void send(Stream<ProducerRecord<Bytes, Bytes>> toSend) {
var properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
var serializer = new BytesSerializer();
try (var producer = new KafkaProducer<>(properties, serializer, serializer)) {
toSend.forEach(producer::send);
producer.flush();
}
}
private void assertOffsets(Map<Integer, Long> expectedOffsets) {
try (var consumer = groupConsumer()) {
var tps = expectedOffsets.keySet().stream()
.map(idx -> new TopicPartition(topic, idx))
.collect(Collectors.toSet());
var actualOffsets = consumer.committed(tps).entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().partition(), e -> e.getValue().offset()));
assertThat(actualOffsets).isEqualTo(expectedOffsets);
}
}
private Consumer<?, ?> groupConsumer() {
return kafkaService.createConsumer(CLUSTER, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
}
}


@@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.avro.Schema;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class AvroJsonSchemaConverterTest {
@Test


@@ -4,8 +4,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class ProtobufSchemaConverterTest {


@@ -314,7 +314,7 @@ paths:
type: array
items:
type: string
description: The format is [partition]::[offset] for specifying offsets or [partition]::[timstamp in millis] for specifying timestamps
description: The format is [partition]::[offset] for specifying offsets or [partition]::[timestamp in millis] for specifying timestamps
- name: limit
in: query
schema:
@@ -508,6 +508,32 @@ paths:
items:
$ref: '#/components/schemas/ConsumerGroup'
/api/clusters/{clusterName}/consumer-groups/{id}/offsets:
post:
tags:
- Consumer Groups
summary: resets consumer group offsets
operationId: resetConsumerGroupOffsets
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: id
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ConsumerGroupOffsetsReset'
responses:
200:
description: OK
/api/clusters/{clusterName}/schemas:
post:
tags:
@@ -1841,6 +1867,51 @@ components:
- SOURCE
- SINK
ConsumerGroupOffsetsReset:
type: object
properties:
topic:
type: string
resetType:
$ref: '#/components/schemas/ConsumerGroupOffsetsResetType'
partitions:
type: array
items:
type: integer
description: list of target partitions; all partitions will be used if not set or empty
resetToTimestamp:
type: integer
format: int64
description: should be set if resetType is TIMESTAMP
partitionsOffsets:
type: array
items:
$ref: '#/components/schemas/PartitionOffset'
description: list of partition offsets to reset to; should be set when resetType is OFFSET
required:
- topic
- resetType
PartitionOffset:
type: object
properties:
partition:
type: integer
offset:
type: integer
format: int64
required:
- partition
- offset
ConsumerGroupOffsetsResetType:
type: string
enum:
- EARLIEST
- LATEST
- TIMESTAMP
- OFFSET
TaskStatus:
type: object
properties:
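A request body for the new endpoint can also be built from the generated model classes; a sketch, assuming the OpenAPI generator's usual fluent setters on the schemas above:

import java.util.List;
import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset;
import com.provectus.kafka.ui.model.ConsumerGroupOffsetsResetType;
import com.provectus.kafka.ui.model.PartitionOffset;

class ResetRequestExample {
  static ConsumerGroupOffsetsReset toOffset42() {
    // reset partition 0 of "my-topic" to offset 42 (the OFFSET reset type requires partitionsOffsets)
    return new ConsumerGroupOffsetsReset()
        .topic("my-topic")
        .resetType(ConsumerGroupOffsetsResetType.OFFSET)
        .partitionsOffsets(List.of(new PartitionOffset().partition(0).offset(42L)));
  }
}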


@@ -20,7 +20,7 @@
<org.projectlombok.version>1.18.10</org.projectlombok.version>
<git.revision>latest</git.revision>
<zkclient.version>0.11</zkclient.version>
<kafka-clients.version>2.4.0</kafka-clients.version>
<kafka-clients.version>2.4.1</kafka-clients.version>
<node.version>v14.17.1</node.version>
<dockerfile-maven-plugin.version>1.4.10</dockerfile-maven-plugin.version>
<frontend-maven-plugin.version>1.8.0</frontend-maven-plugin.version>