ISSUE-868 Fix group offsets resetting (#871)

* ISSUE-868 Fix group offsets resetting

* tests fixes

Co-authored-by: Ilya Kuramshin <ikuramshin@provectus.com>
German Osin 2021-09-14 15:42:04 +03:00 committed by GitHub
parent 64f957771c
commit 43709cc3be
4 changed files with 142 additions and 99 deletions

File: ConsumerGroupsController.java

@@ -70,42 +70,45 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
       Mono<ConsumerGroupOffsetsReset>
           consumerGroupOffsetsReset,
       ServerWebExchange exchange) {
-    return consumerGroupOffsetsReset.map(reset -> {
+    return consumerGroupOffsetsReset.flatMap(reset -> {
       var cluster =
           clustersStorage.getClusterByName(clusterName).orElseThrow(ClusterNotFoundException::new);
       switch (reset.getResetType()) {
         case EARLIEST:
-          offsetsResetService
+          return offsetsResetService
               .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions());
-          break;
         case LATEST:
-          offsetsResetService
+          return offsetsResetService
               .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions());
-          break;
         case TIMESTAMP:
           if (reset.getResetToTimestamp() == null) {
-            throw new ValidationException(
-                "resetToTimestamp is required when TIMESTAMP reset type used");
+            return Mono.error(
+                new ValidationException(
+                    "resetToTimestamp is required when TIMESTAMP reset type used"
+                )
+            );
           }
-          offsetsResetService
+          return offsetsResetService
               .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(),
                   reset.getResetToTimestamp());
-          break;
         case OFFSET:
           if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) {
-            throw new ValidationException(
-                "partitionsOffsets is required when OFFSET reset type used");
+            return Mono.error(
+                new ValidationException(
+                    "partitionsOffsets is required when OFFSET reset type used"
+                )
+            );
           }
           Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
               .collect(toMap(PartitionOffset::getPartition, PartitionOffset::getOffset));
-          offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
-          break;
+          return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
         default:
-          throw new ValidationException("Unknown resetType " + reset.getResetType());
+          return Mono.error(
+              new ValidationException("Unknown resetType " + reset.getResetType())
+          );
       }
-      return ResponseEntity.ok().build();
-    });
+    }).map(o -> ResponseEntity.ok().build());
   }
 }
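Why the map -> flatMap change matters: the service methods below now return a cold Mono, and Mono.map would wrap that inner publisher without ever subscribing to it, so the reset would silently never execute; flatMap subscribes and flattens. A minimal standalone sketch (hypothetical class, not part of this commit) illustrating the difference:

import reactor.core.publisher.Mono;

public class MapVsFlatMap {

  // Stands in for offsetsResetService.resetTo*(): a Mono whose work
  // happens only when something subscribes to it.
  static Mono<String> commitOffsets() {
    return Mono.fromCallable(() -> {
      System.out.println("offsets committed");
      return "done";
    });
  }

  public static void main(String[] args) {
    // Buggy shape: map() yields Mono<Mono<String>>; the inner Mono is
    // emitted as a plain value and dropped unsubscribed -- nothing prints.
    Mono.just("reset").map(r -> commitOffsets()).subscribe();

    // Fixed shape: flatMap() subscribes to the inner Mono, so the commit
    // actually runs and "offsets committed" prints once.
    Mono.just("reset").flatMap(r -> commitOffsets()).subscribe();
  }
}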

File: OffsetsResetService.java

@ -1,18 +1,19 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.provectus.kafka.ui.util.ClusterUtil.toMono;
import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static org.apache.kafka.common.ConsumerGroupState.DEAD; import static org.apache.kafka.common.ConsumerGroupState.DEAD;
import static org.apache.kafka.common.ConsumerGroupState.EMPTY; import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -24,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
/** /**
* Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling * Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
@ -36,64 +38,88 @@ import org.springframework.stereotype.Component;
public class OffsetsResetService { public class OffsetsResetService {
private final KafkaService kafkaService; private final KafkaService kafkaService;
private final AdminClientService adminClientService;
public void resetToEarliest(KafkaCluster cluster, String group, String topic, public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToEarliest(
Collection<Integer> partitions) { KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
checkGroupCondition(cluster, group); return checkGroupCondition(cluster, group)
try (var consumer = getConsumer(cluster, group)) { .flatMap(g -> {
var targetPartitions = getTargetPartitions(consumer, topic, partitions); try (var consumer = getConsumer(cluster, group)) {
var offsets = consumer.beginningOffsets(targetPartitions); var targetPartitions = getTargetPartitions(consumer, topic, partitions);
commitOffsets(consumer, offsets); var offsets = consumer.beginningOffsets(targetPartitions);
} return commitOffsets(consumer, offsets);
}
});
} }
public void resetToLatest(KafkaCluster cluster, String group, String topic, public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToLatest(
Collection<Integer> partitions) { KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
checkGroupCondition(cluster, group); return checkGroupCondition(cluster, group).flatMap(
try (var consumer = getConsumer(cluster, group)) { g -> {
var targetPartitions = getTargetPartitions(consumer, topic, partitions); try (var consumer = getConsumer(cluster, group)) {
var offsets = consumer.endOffsets(targetPartitions); var targetPartitions = getTargetPartitions(consumer, topic, partitions);
commitOffsets(consumer, offsets); var offsets = consumer.endOffsets(targetPartitions);
} return commitOffsets(consumer, offsets);
}
}
);
} }
public void resetToTimestamp(KafkaCluster cluster, String group, String topic, public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToTimestamp(
Collection<Integer> partitions, long targetTimestamp) { KafkaCluster cluster, String group, String topic, Collection<Integer> partitions,
checkGroupCondition(cluster, group); long targetTimestamp) {
try (var consumer = getConsumer(cluster, group)) { return checkGroupCondition(cluster, group).flatMap(
var targetPartitions = getTargetPartitions(consumer, topic, partitions); g -> {
var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp); try (var consumer = getConsumer(cluster, group)) {
commitOffsets(consumer, offsets); var targetPartitions = getTargetPartitions(consumer, topic, partitions);
} var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp);
return commitOffsets(consumer, offsets);
}
}
);
} }
public void resetToOffsets(KafkaCluster cluster, String group, String topic, public Mono<Map<TopicPartition, OffsetAndMetadata>> resetToOffsets(
Map<Integer, Long> targetOffsets) { KafkaCluster cluster, String group, String topic, Map<Integer, Long> targetOffsets) {
checkGroupCondition(cluster, group); return checkGroupCondition(cluster, group).flatMap(
try (var consumer = getConsumer(cluster, group)) { g -> {
var offsets = targetOffsets.entrySet().stream() try (var consumer = getConsumer(cluster, group)) {
.collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue)); var offsets = targetOffsets.entrySet().stream()
offsets = editOffsetsIfNeeded(consumer, offsets); .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
commitOffsets(consumer, offsets); offsets = editOffsetsIfNeeded(consumer, offsets);
} return commitOffsets(consumer, offsets);
}
}
);
} }
private void checkGroupCondition(KafkaCluster cluster, String groupId) { private Mono<ConsumerGroupDescription> checkGroupCondition(KafkaCluster cluster, String groupId) {
InternalConsumerGroup description = return adminClientService.getOrCreateAdminClient(cluster)
kafkaService.getConsumerGroupsInternal(cluster) .flatMap(ac ->
.blockOptional() // we need to call listConsumerGroups() to check group existence, because
.stream() // describeConsumerGroups() will return consumer group even if it doesn't exist
.flatMap(Collection::stream) toMono(ac.getAdminClient().listConsumerGroups().all())
.filter(cgd -> cgd.getGroupId().equals(groupId)) .filter(cgs -> cgs.stream().anyMatch(g -> g.groupId().equals(groupId)))
.findAny() .flatMap(cgs -> toMono(
.orElseThrow(() -> new NotFoundException("Consumer group not found")); ac.getAdminClient().describeConsumerGroups(List.of(groupId)).all()))
.filter(cgs -> cgs.containsKey(groupId))
if (!Set.of(DEAD, EMPTY).contains(description.getState())) { .map(cgs -> cgs.get(groupId))
throw new ValidationException( .flatMap(cg -> {
String.format( if (!Set.of(DEAD, EMPTY).contains(cg.state())) {
"Group's offsets can be reset only if group is inactive, but group is in %s state", return Mono.error(
description.getState())); new ValidationException(
} String.format(
"Group's offsets can be reset only if group is inactive,"
+ " but group is in %s state",
cg.state()
)
)
);
}
return Mono.just(cg);
})
.switchIfEmpty(Mono.error(new NotFoundException("Consumer group not found")))
);
} }
private Map<TopicPartition, Long> offsetsByTimestamp(Consumer<?, ?> consumer, private Map<TopicPartition, Long> offsetsByTimestamp(Consumer<?, ?> consumer,
@ -107,7 +133,10 @@ public class OffsetsResetService {
.collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
// for partitions where we didnt find offset by timestamp, we use end offsets // for partitions where we didnt find offset by timestamp, we use end offsets
foundOffsets.putAll(consumer.endOffsets(Sets.difference(partitions, foundOffsets.keySet()))); Set<TopicPartition> endOffsets = new HashSet<>(partitions);
endOffsets.removeAll(foundOffsets.keySet());
foundOffsets.putAll(consumer.endOffsets(endOffsets));
return foundOffsets; return foundOffsets;
} }
@ -155,11 +184,13 @@ public class OffsetsResetService {
return result; return result;
} }
private void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) { private Mono<Map<TopicPartition, OffsetAndMetadata>> commitOffsets(
consumer.commitSync( Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets
offsets.entrySet().stream() ) {
.collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))) var toCommit = offsets.entrySet().stream()
); .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
consumer.commitSync(toCommit);
return Mono.just(toCommit);
} }
private Consumer<?, ?> getConsumer(KafkaCluster cluster, String groupId) { private Consumer<?, ?> getConsumer(KafkaCluster cluster, String groupId) {
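With this change every reset method returns a cold Mono<Map<TopicPartition, OffsetAndMetadata>>: the group-existence check, the inactivity validation, and the commitSync all run only once a subscriber arrives. A hedged caller sketch (the service and cluster wiring, plus the "my-group"/"my-topic" names, are assumptions for illustration, not part of this commit):

import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.OffsetsResetService;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;

class OffsetsResetUsage {

  // `service` and `cluster` are assumed to be wired elsewhere (e.g. Spring DI).
  static Map<TopicPartition, OffsetAndMetadata> resetAndWait(
      OffsetsResetService service, KafkaCluster cluster) {
    Mono<Map<TopicPartition, OffsetAndMetadata>> reset =
        service.resetToEarliest(cluster, "my-group", "my-topic", List.of(0, 1));
    // Nothing has happened yet: block() subscribes, which triggers the
    // validation chain and the commit, then returns the committed offsets.
    // The controller subscribes via flatMap instead of blocking.
    return reset.block();
  }
}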

File: OffsetsResetServiceTest.java

@@ -56,7 +56,7 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
     FeatureService featureService = new FeatureServiceImpl(brokerService);
     adminClientService.setClientTimeout(5_000);
     kafkaService = new KafkaService(null, null, null, null, adminClientService, featureService);
-    offsetsResetService = new OffsetsResetService(kafkaService);
+    offsetsResetService = new OffsetsResetService(kafkaService, adminClientService);
 
     createTopic(new NewTopic(topic, PARTITIONS, (short) 1));
     createConsumerGroup();

@@ -78,17 +78,22 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
   @Test
   void failsIfGroupDoesNotExists() {
     assertThatThrownBy(
-        () -> offsetsResetService.resetToEarliest(CLUSTER, "non-existing-group", topic, null))
-        .isInstanceOf(NotFoundException.class);
+        () -> offsetsResetService
+            .resetToEarliest(CLUSTER, "non-existing-group", topic, null).block()
+    ).isInstanceOf(NotFoundException.class);
     assertThatThrownBy(
-        () -> offsetsResetService.resetToLatest(CLUSTER, "non-existing-group", topic, null))
-        .isInstanceOf(NotFoundException.class);
+        () -> offsetsResetService
+            .resetToLatest(CLUSTER, "non-existing-group", topic, null).block()
+    ).isInstanceOf(NotFoundException.class);
     assertThatThrownBy(() -> offsetsResetService
-        .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()))
-        .isInstanceOf(NotFoundException.class);
+        .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis())
+        .block()
+    ).isInstanceOf(NotFoundException.class);
     assertThatThrownBy(
-        () -> offsetsResetService.resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of()))
-        .isInstanceOf(NotFoundException.class);
+        () -> offsetsResetService
+            .resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of())
+            .block()
+    ).isInstanceOf(NotFoundException.class);
   }
 
   @Test
@@ -98,16 +103,19 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
       consumer.subscribe(Pattern.compile("no-such-topic-pattern"));
       consumer.poll(Duration.ofMillis(100));
 
-      assertThatThrownBy(() -> offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null))
-          .isInstanceOf(ValidationException.class);
-      assertThatThrownBy(() -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null))
-          .isInstanceOf(ValidationException.class);
-      assertThatThrownBy(() -> offsetsResetService
-          .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()))
-          .isInstanceOf(ValidationException.class);
+      assertThatThrownBy(() ->
+          offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block()
+      ).isInstanceOf(ValidationException.class);
       assertThatThrownBy(
-          () -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()))
-          .isInstanceOf(ValidationException.class);
+          () -> offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block()
+      ).isInstanceOf(ValidationException.class);
+      assertThatThrownBy(() -> offsetsResetService
+          .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis())
+          .block()
+      ).isInstanceOf(ValidationException.class);
+      assertThatThrownBy(
+          () -> offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()).block()
+      ).isInstanceOf(ValidationException.class);
     }
   }

@@ -116,7 +124,7 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
     sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
 
     var expectedOffsets = Map.of(0, 5L, 1, 5L, 2, 5L);
-    offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets);
+    offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets).block();
     assertOffsets(expectedOffsets);
   }

@@ -126,7 +134,7 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
     var offsetsWithInValidBounds = Map.of(0, -2L, 1, 5L, 2, 500L);
     var expectedOffsets = Map.of(0, 0L, 1, 5L, 2, 10L);
-    offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds);
+    offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds).block();
     assertOffsets(expectedOffsets);
   }

@@ -135,11 +143,11 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
     sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10));
 
     commit(Map.of(0, 5L, 1, 5L, 2, 5L));
-    offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1));
+    offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1)).block();
     assertOffsets(Map.of(0, 0L, 1, 0L, 2, 5L));
 
     commit(Map.of(0, 5L, 1, 5L, 2, 5L));
-    offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null);
+    offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block();
     assertOffsets(Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L, 4, 0L));
   }

@@ -148,11 +156,11 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
     sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10, 3, 10, 4, 10));
 
     commit(Map.of(0, 5L, 1, 5L, 2, 5L));
-    offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1));
+    offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1)).block();
     assertOffsets(Map.of(0, 10L, 1, 10L, 2, 5L));
 
     commit(Map.of(0, 5L, 1, 5L, 2, 5L));
-    offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null);
+    offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block();
     assertOffsets(Map.of(0, 10L, 1, 10L, 2, 10L, 3, 10L, 4, 10L));
   }

@@ -169,7 +177,9 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
         new ProducerRecord<Bytes, Bytes>(topic, 2, 1100L, null, null),
         new ProducerRecord<Bytes, Bytes>(topic, 2, 1200L, null, null)));
 
-    offsetsResetService.resetToTimestamp(CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L);
+    offsetsResetService.resetToTimestamp(
+        CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L
+    ).block();
     assertOffsets(Map.of(0, 2L, 1, 1L, 2, 3L, 3, 0L));
   }
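The tests force subscription with block(), which keeps the existing assertThatThrownBy assertions working against the now-reactive API. An equivalent assertion (a hypothetical sketch, not part of this commit, assuming the reactor-test dependency) would verify the error signal without blocking:

import com.provectus.kafka.ui.exception.NotFoundException;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

  // Would live inside OffsetsResetServiceTest, using its CLUSTER/topic fields.
  @Test
  void failsIfGroupDoesNotExistReactive() {
    // StepVerifier subscribes to the Mono and asserts that it terminates
    // with the expected error instead of emitting a value.
    StepVerifier
        .create(offsetsResetService.resetToEarliest(CLUSTER, "non-existing-group", topic, null))
        .expectError(NotFoundException.class)
        .verify();
  }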

File: OpenAPI contract (components schema, YAML)

@@ -2256,7 +2256,6 @@ components:
           format: int64
       required:
         - partition
-        - offset
 
     ConsumerGroupOffsetsResetType:
       type: string