Browse Source

added delete action

Ramazan Yapparov 4 years ago
parent
commit
1b5f5e29d0

+ 13 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -196,4 +196,17 @@ public class ClusterService {
                 .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
                 .orElse(Flux.empty());
     }
+
+    public Mono<Void> deleteTopicMessages(String clusterName, String topicName) {
+        var cluster = clustersStorage.getClusterByName(clusterName)
+                .orElseThrow(() -> new NotFoundException("No such cluster"));
+        var partitions = getTopicDetails(clusterName, topicName)
+                .orElseThrow(() -> new NotFoundException("No such topcic"))
+                .getPartitions().stream()
+                .map(Partition::getPartition)
+                .map(partition -> new TopicPartition(topicName, partition))
+                .collect(Collectors.toList());
+        return consumingService.loadOffsets(cluster, partitions)
+                .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, topicName, offsets)).next();
+    }
 }

+ 24 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -55,6 +55,12 @@ public class ConsumingService {
 				.limitRequest(recordsLimit);
 	}
 
+	public Flux<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, List<TopicPartition> partitions) {
+		OffsetEmitter emitter = new OffsetEmitter(kafkaService, cluster, partitions);
+		return Flux.create(emitter::emit)
+				.subscribeOn(Schedulers.boundedElastic());
+	}
+
 	private boolean filterTopicMessage(TopicMessage message, String query) {
 		if (StringUtils.isEmpty(query)) {
 			return true;
@@ -181,4 +187,22 @@ public class ConsumingService {
 			consumer.seekToBeginning(partitions);
 		}
 	}
+
+	@RequiredArgsConstructor
+	private static class OffsetEmitter {
+		private final KafkaService kafkaService;
+		private final KafkaCluster cluster;
+		private final List<TopicPartition> partitions;
+
+		public void emit(FluxSink<Map<TopicPartition, Long>> sink) {
+			try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
+				Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
+				sink.next(offsets);
+			} catch (Exception e) {
+				log.error("Error occurred while consuming records", e);
+				throw new RuntimeException(e);
+			}
+		}
+
+	}
 }

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -32,6 +33,7 @@ import reactor.util.function.Tuples;
 import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -490,4 +492,10 @@ public class KafkaService {
             return Collections.emptyMap();
         }
     }
+
+    public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName, Map<TopicPartition, Long> offsets) {
+        var records = offsets.entrySet().stream().map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
+                .map(ac -> ac.deleteRecords(records)).then();
+    }
 }

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -84,6 +84,11 @@ public class MetricsRestController implements ApiClustersApi {
                 .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
     }
 
+    @Override
+    public Mono<ResponseEntity<Void>> deleteTopicMessages(String clusterName, String topicName, ServerWebExchange exchange) {
+        return clusterService.deleteTopicMessages(clusterName, topicName).map(ResponseEntity::ok);
+    }
+
     @Override
     public Mono<ResponseEntity<Topic>> createTopic(String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
         return clusterService.createTopic(clusterName, topicFormData)

+ 19 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -309,6 +309,25 @@ paths:
                 type: array
                 items:
                   $ref: '#/components/schemas/TopicMessage'
+    delete:
+      tags:
+        - /api/clusters
+      summary: deleteTopicMessages
+      operationId: deleteTopicMessages
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
 
   /api/clusters/{clusterName}/consumer-groups/{id}:
     get: