package com.provectus.kafka.ui.service; import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.AbstractIntegrationTest; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.BytesDeserializer; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; public class OffsetsResetServiceTest extends AbstractIntegrationTest { private static final int PARTITIONS = 5; private static final KafkaCluster CLUSTER = KafkaCluster.builder() .name(LOCAL) .bootstrapServers(kafka.getBootstrapServers()) .properties(new Properties()) .build(); private final String groupId = "OffsetsResetServiceTestGroup-" + UUID.randomUUID(); private final String topic = "OffsetsResetServiceTestTopic-" + UUID.randomUUID(); private OffsetsResetService offsetsResetService; @BeforeEach void init() { AdminClientServiceImpl adminClientService = new AdminClientServiceImpl(); adminClientService.setClientTimeout(5_000); offsetsResetService = new OffsetsResetService(adminClientService); createTopic(new NewTopic(topic, PARTITIONS, (short) 1)); createConsumerGroup(); } @AfterEach void cleanUp() { deleteTopic(topic); } private void createConsumerGroup() { try (var consumer = groupConsumer()) { consumer.subscribe(Pattern.compile("no-such-topic-pattern")); consumer.poll(Duration.ofMillis(200)); consumer.commitSync(); } } @Test void failsIfGroupDoesNotExists() { List> expectedNotFound = List.of( offsetsResetService .resetToEarliest(CLUSTER, "non-existing-group", topic, null), offsetsResetService .resetToLatest(CLUSTER, "non-existing-group", topic, null), offsetsResetService .resetToTimestamp(CLUSTER, "non-existing-group", topic, null, System.currentTimeMillis()), offsetsResetService .resetToOffsets(CLUSTER, "non-existing-group", topic, Map.of()) ); for (Mono mono : expectedNotFound) { StepVerifier.create(mono) .expectErrorMatches(t -> t instanceof NotFoundException) .verify(); } } @Test void failsIfGroupIsActive() { // starting consumer to activate group try (var consumer = groupConsumer()) { consumer.subscribe(Pattern.compile("no-such-topic-pattern")); consumer.poll(Duration.ofMillis(100)); List> expectedValidationError = List.of( offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null), offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null), offsetsResetService .resetToTimestamp(CLUSTER, groupId, topic, null, System.currentTimeMillis()), offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, Map.of()) ); for (Mono mono : expectedValidationError) { StepVerifier.create(mono) .expectErrorMatches(t -> t instanceof ValidationException) .verify(); } } } @Test void resetToOffsets() { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10)); var expectedOffsets = Map.of(0, 5L, 1, 5L, 2, 5L); offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, expectedOffsets).block(); assertOffsets(expectedOffsets); } @Test void resetToOffsetsCommitsEarliestOrLatestOffsetsIfOffsetsBoundsNotValid() { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10)); var offsetsWithInValidBounds = Map.of(0, -2L, 1, 5L, 2, 500L); var expectedOffsets = Map.of(0, 0L, 1, 5L, 2, 10L); offsetsResetService.resetToOffsets(CLUSTER, groupId, topic, offsetsWithInValidBounds).block(); assertOffsets(expectedOffsets); } @Test void resetToEarliest() { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, List.of(0, 1)).block(); assertOffsets(Map.of(0, 0L, 1, 0L, 2, 5L)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); offsetsResetService.resetToEarliest(CLUSTER, groupId, topic, null).block(); assertOffsets(Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L, 4, 0L)); } @Test void resetToLatest() { sendMsgsToPartition(Map.of(0, 10, 1, 10, 2, 10, 3, 10, 4, 10)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); offsetsResetService.resetToLatest(CLUSTER, groupId, topic, List.of(0, 1)).block(); assertOffsets(Map.of(0, 10L, 1, 10L, 2, 5L)); commit(Map.of(0, 5L, 1, 5L, 2, 5L)); offsetsResetService.resetToLatest(CLUSTER, groupId, topic, null).block(); assertOffsets(Map.of(0, 10L, 1, 10L, 2, 10L, 3, 10L, 4, 10L)); } @Test void resetToTimestamp() { send( Stream.of( new ProducerRecord(topic, 0, 1000L, null, null), new ProducerRecord(topic, 0, 1500L, null, null), new ProducerRecord(topic, 0, 2000L, null, null), new ProducerRecord(topic, 1, 1000L, null, null), new ProducerRecord(topic, 1, 2000L, null, null), new ProducerRecord(topic, 2, 1000L, null, null), new ProducerRecord(topic, 2, 1100L, null, null), new ProducerRecord(topic, 2, 1200L, null, null))); offsetsResetService.resetToTimestamp( CLUSTER, groupId, topic, List.of(0, 1, 2, 3), 1600L ).block(); assertOffsets(Map.of(0, 2L, 1, 1L, 2, 3L, 3, 0L)); } private void commit(Map offsetsToCommit) { try (var consumer = groupConsumer()) { consumer.commitSync( offsetsToCommit.entrySet().stream() .collect(Collectors.toMap( e -> new TopicPartition(topic, e.getKey()), e -> new OffsetAndMetadata(e.getValue()))) ); } } private void sendMsgsToPartition(Map msgsCountForPartitions) { Bytes bytes = new Bytes("noMatter".getBytes()); send( msgsCountForPartitions.entrySet().stream() .flatMap(e -> IntStream.range(0, e.getValue()) .mapToObj(i -> new ProducerRecord<>(topic, e.getKey(), bytes, bytes)))); } private void send(Stream> toSend) { var properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); var serializer = new BytesSerializer(); try (var producer = new KafkaProducer<>(properties, serializer, serializer)) { toSend.forEach(producer::send); producer.flush(); } } private void assertOffsets(Map expectedOffsets) { try (var consumer = groupConsumer()) { var tps = expectedOffsets.keySet().stream() .map(idx -> new TopicPartition(topic, idx)) .collect(Collectors.toSet()); var actualOffsets = consumer.committed(tps).entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().partition(), e -> e.getValue().offset())); assertThat(actualOffsets).isEqualTo(expectedOffsets); } } private Consumer groupConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.getBootstrapServers()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return new KafkaConsumer<>(props); } }