|
@@ -8,15 +8,20 @@ import com.provectus.kafka.ui.kafka.KafkaService;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.SneakyThrows;
|
|
import lombok.SneakyThrows;
|
|
|
|
+
|
|
|
|
+import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
+import java.time.OffsetDateTime;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
+import javax.validation.Valid;
|
|
|
|
+
|
|
@Service
|
|
@Service
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
public class ClusterService {
|
|
public class ClusterService {
|
|
@@ -24,6 +29,7 @@ public class ClusterService {
|
|
private final ClustersStorage clustersStorage;
|
|
private final ClustersStorage clustersStorage;
|
|
private final ClusterMapper clusterMapper;
|
|
private final ClusterMapper clusterMapper;
|
|
private final KafkaService kafkaService;
|
|
private final KafkaService kafkaService;
|
|
|
|
+ private final ConsumingService consumingService;
|
|
|
|
|
|
public List<Cluster> getClusters() {
|
|
public List<Cluster> getClusters() {
|
|
return clustersStorage.getKafkaClusters()
|
|
return clustersStorage.getKafkaClusters()
|
|
@@ -48,15 +54,17 @@ public class ClusterService {
|
|
}
|
|
}
|
|
|
|
|
|
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
|
|
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
|
|
- return clustersStorage.getClusterByName(name).flatMap(
|
|
|
|
- c -> Optional.ofNullable(c.getTopics().get(topicName))
|
|
|
|
- ).map(clusterMapper::toTopicDetails);
|
|
|
|
|
|
+ return clustersStorage.getClusterByName(name)
|
|
|
|
+ .map(KafkaCluster::getTopics)
|
|
|
|
+ .map(t -> t.get(topicName))
|
|
|
|
+ .map(clusterMapper::toTopicDetails);
|
|
}
|
|
}
|
|
|
|
|
|
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
|
|
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
|
|
- return clustersStorage.getClusterByName(name).flatMap(
|
|
|
|
- c -> Optional.ofNullable(c.getTopics().get(topicName))
|
|
|
|
- ).map( t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
|
|
|
|
|
|
+ return clustersStorage.getClusterByName(name)
|
|
|
|
+ .map(KafkaCluster::getTopics)
|
|
|
|
+ .map(t -> t.get(topicName))
|
|
|
|
+ .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
|
|
}
|
|
}
|
|
|
|
|
|
public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
|
|
public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
|
|
@@ -82,4 +90,11 @@ public class ClusterService {
|
|
}).collect(Collectors.toList())))
|
|
}).collect(Collectors.toList())))
|
|
.flatMapMany(Flux::fromIterable);
|
|
.flatMapMany(Flux::fromIterable);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ public Flux<TopicMessage> getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) {
|
|
|
|
+ return clustersStorage.getClusterByName(clusterName)
|
|
|
|
+ .map(c -> consumingService.loadMessages(c, topicName))
|
|
|
|
+ .orElse(Flux.empty());
|
|
|
|
+
|
|
|
|
+ }
|
|
}
|
|
}
|