瀏覽代碼

#211 Feature/clear topic messages (#241)

* added delete action

* added test for delete action

* added 404 status in contract

* fixed typo

* added partition parameter

* big refactoring
Ramazan Yapparov 4 年之前
父節點
當前提交
881a2167b7

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -196,4 +196,14 @@ public class ClusterService {
                 .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
                 .orElse(Flux.empty());
     }
+
+    public Mono<Void> deleteTopicMessages(String clusterName, String topicName, List<Integer> partitions) {
+        var cluster = clustersStorage.getClusterByName(clusterName)
+                .orElseThrow(() -> new NotFoundException("No such cluster"));
+        if (!cluster.getTopics().containsKey(topicName)) {
+            throw new NotFoundException("No such topic");
+        }
+        return consumingService.loadOffsets(cluster, topicName, partitions)
+                .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
+    }
 }

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import java.time.Duration;
@@ -55,6 +56,25 @@ public class ConsumingService {
 				.limitRequest(recordsLimit);
 	}
 
+	public Mono<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
+		return Mono.fromSupplier(() -> {
+			try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
+				var partitions = consumer.partitionsFor(topicName).stream()
+                        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
+						.map(p -> new TopicPartition(topicName, p.partition()))
+						.collect(Collectors.toList());
+				var beginningOffsets = consumer.beginningOffsets(partitions);
+				var endOffsets = consumer.endOffsets(partitions);
+				return endOffsets.entrySet().stream()
+						.filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
+						.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+			} catch (Exception e) {
+				log.error("Error occurred while consuming records", e);
+				throw new RuntimeException(e);
+			}
+		});
+	}
+
 	private boolean filterTopicMessage(TopicMessage message, String query) {
 		if (StringUtils.isEmpty(query)) {
 			return true;

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -490,4 +490,12 @@ public class KafkaService {
             return Collections.emptyMap();
         }
     }
+
+    public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
+        var records = offsets.entrySet().stream()
+                .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
+                .map(ac -> ac.deleteRecords(records)).then();
+    }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -18,6 +18,7 @@ import reactor.core.publisher.Mono;
 import javax.validation.Valid;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Function;
 
 @RestController
@@ -84,6 +85,12 @@ public class MetricsRestController implements ApiClustersApi {
                 .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
     }
 
+    @Override
+    public Mono<ResponseEntity<Void>> deleteTopicMessages(String clusterName, String topicName, @Valid List<Integer> partitions, ServerWebExchange exchange) {
+        return clusterService.deleteTopicMessages(clusterName, topicName, Optional.ofNullable(partitions).orElse(List.of()))
+                .map(ResponseEntity::ok);
+    }
+
     @Override
     public Mono<ResponseEntity<Topic>> createTopic(String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
         return clusterService.createTopic(clusterName, topicFormData)

+ 2 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui;
 
+import com.provectus.kafka.ui.container.KafkaConnectContainer;
+import com.provectus.kafka.ui.container.SchemaRegistryContainer;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.springframework.boot.test.context.SpringBootTest;

+ 79 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java

@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui;
+
+import com.provectus.kafka.ui.model.TopicFormData;
+import com.provectus.kafka.ui.model.TopicMessage;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
+@Log4j2
+@AutoConfigureWebTestClient(timeout = "60000")
+public class KafkaConsumerTests extends AbstractBaseTest {
+
+    @Autowired
+    private WebTestClient webTestClient;
+
+
+    @Test
+    public void shouldDeleteRecords() {
+        var topicName = UUID.randomUUID().toString();
+        webTestClient.post()
+                .uri("/api/clusters/{clusterName}/topics", LOCAL)
+                .bodyValue(new TopicFormData()
+                        .name(topicName)
+                        .partitions(1)
+                        .replicationFactor(1)
+                        .configs(Map.of())
+                )
+                .exchange()
+                .expectStatus()
+                .isOk();
+
+        try(KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
+            Stream.of("one", "two", "three", "four")
+                    .forEach(value -> producer.send(topicName, value));
+        }
+
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+                .exchange()
+                .expectStatus()
+                .isOk()
+                .expectBodyList(TopicMessage.class)
+                .hasSize(4);
+
+        webTestClient.delete()
+                .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+                .exchange()
+                .expectStatus()
+                .isOk();
+
+        webTestClient.get()
+                .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+                .exchange()
+                .expectStatus()
+                .isOk()
+                .expectBodyList(TopicMessage.class)
+                .hasSize(0);
+    }
+
+    @Test
+    public void shouldReturn404ForNonExistingTopic() {
+        var topicName = UUID.randomUUID().toString();
+
+        webTestClient.delete()
+                .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+                .exchange()
+                .expectStatus()
+                .isNotFound();
+    }
+}

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/KafkaConnectContainer.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui;
+package com.provectus.kafka.ui.container;
 
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryContainer.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/SchemaRegistryContainer.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui;
+package com.provectus.kafka.ui.container;
 
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;

+ 35 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java

@@ -0,0 +1,35 @@
+package com.provectus.kafka.ui.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.testcontainers.containers.KafkaContainer;
+
+import java.util.Map;
+
+public class KafkaTestProducer<KeyT, ValueT> implements AutoCloseable {
+    private final KafkaProducer<KeyT, ValueT> producer;
+
+    private KafkaTestProducer(KafkaProducer<KeyT, ValueT> producer) {
+        this.producer = producer;
+    }
+
+    public static KafkaTestProducer<String, String> forKafka(KafkaContainer kafkaContainer) {
+        return new KafkaTestProducer<>(new KafkaProducer<>(Map.of(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
+                ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer",
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+        )));
+    }
+
+    public void send(String topic, ValueT value) {
+        producer.send(new ProducerRecord<>(topic, value));
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+}

+ 28 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -309,6 +309,34 @@ paths:
                 type: array
                 items:
                   $ref: '#/components/schemas/TopicMessage'
+    delete:
+      tags:
+        - /api/clusters
+      summary: deleteTopicMessages
+      operationId: deleteTopicMessages
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: partitions
+          in: query
+          required: false
+          schema:
+            type: array
+            items:
+              type: integer
+      responses:
+        200:
+          description: OK
+        404:
+          description: Not found
 
   /api/clusters/{clusterName}/consumer-groups/{id}:
     get: