* - added search by topic name - added filter by internal topics - added sort by multiple columns - added tests * moved sorting later in the stream
This commit is contained in:
parent
993db2fc00
commit
f3c0866940
4 changed files with 180 additions and 10 deletions
|
@ -2,6 +2,7 @@ package com.provectus.kafka.ui.controller;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.api.TopicsApi;
|
import com.provectus.kafka.ui.api.TopicsApi;
|
||||||
import com.provectus.kafka.ui.model.Topic;
|
import com.provectus.kafka.ui.model.Topic;
|
||||||
|
import com.provectus.kafka.ui.model.TopicColumnsToSort;
|
||||||
import com.provectus.kafka.ui.model.TopicConfig;
|
import com.provectus.kafka.ui.model.TopicConfig;
|
||||||
import com.provectus.kafka.ui.model.TopicCreation;
|
import com.provectus.kafka.ui.model.TopicCreation;
|
||||||
import com.provectus.kafka.ui.model.TopicDetails;
|
import com.provectus.kafka.ui.model.TopicDetails;
|
||||||
|
@ -64,9 +65,19 @@ public class TopicsController implements TopicsApi {
|
||||||
@Override
|
@Override
|
||||||
public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page,
|
public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page,
|
||||||
@Valid Integer perPage,
|
@Valid Integer perPage,
|
||||||
|
@Valid Boolean showInternal,
|
||||||
|
@Valid String search,
|
||||||
|
@Valid TopicColumnsToSort orderBy,
|
||||||
ServerWebExchange exchange) {
|
ServerWebExchange exchange) {
|
||||||
return Mono.just(ResponseEntity.ok(clusterService
|
return Mono.just(ResponseEntity.ok(clusterService
|
||||||
.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage))));
|
.getTopics(
|
||||||
|
clusterName,
|
||||||
|
Optional.ofNullable(page),
|
||||||
|
Optional.ofNullable(perPage),
|
||||||
|
Optional.ofNullable(showInternal),
|
||||||
|
Optional.ofNullable(search),
|
||||||
|
Optional.ofNullable(orderBy)
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -14,6 +14,7 @@ import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||||
import com.provectus.kafka.ui.model.InternalTopic;
|
import com.provectus.kafka.ui.model.InternalTopic;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.model.Topic;
|
import com.provectus.kafka.ui.model.Topic;
|
||||||
|
import com.provectus.kafka.ui.model.TopicColumnsToSort;
|
||||||
import com.provectus.kafka.ui.model.TopicConfig;
|
import com.provectus.kafka.ui.model.TopicConfig;
|
||||||
import com.provectus.kafka.ui.model.TopicConsumerGroups;
|
import com.provectus.kafka.ui.model.TopicConsumerGroups;
|
||||||
import com.provectus.kafka.ui.model.TopicCreation;
|
import com.provectus.kafka.ui.model.TopicCreation;
|
||||||
|
@ -32,6 +33,7 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
@ -80,26 +82,55 @@ public class ClusterService {
|
||||||
|
|
||||||
|
|
||||||
public TopicsResponse getTopics(String name, Optional<Integer> page,
|
public TopicsResponse getTopics(String name, Optional<Integer> page,
|
||||||
Optional<Integer> nullablePerPage) {
|
Optional<Integer> nullablePerPage,
|
||||||
|
Optional<Boolean> showInternal,
|
||||||
|
Optional<String> search,
|
||||||
|
Optional<TopicColumnsToSort> sortBy) {
|
||||||
Predicate<Integer> positiveInt = i -> i > 0;
|
Predicate<Integer> positiveInt = i -> i > 0;
|
||||||
int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
|
int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
|
||||||
var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
|
var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
|
||||||
var cluster = clustersStorage.getClusterByName(name)
|
var cluster = clustersStorage.getClusterByName(name)
|
||||||
.orElseThrow(ClusterNotFoundException::new);
|
.orElseThrow(ClusterNotFoundException::new);
|
||||||
var totalPages = (cluster.getTopics().size() / perPage)
|
List<Topic> topics = cluster.getTopics().values().stream()
|
||||||
+ (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
|
.filter(topic -> !topic.isInternal()
|
||||||
|
|| showInternal
|
||||||
|
.map(i -> topic.isInternal() == i)
|
||||||
|
.orElse(true))
|
||||||
|
.filter(topic ->
|
||||||
|
search
|
||||||
|
.map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
|
||||||
|
.orElse(true))
|
||||||
|
.sorted(getComparatorForTopic(sortBy))
|
||||||
|
.map(clusterMapper::toTopic)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
var totalPages = (topics.size() / perPage)
|
||||||
|
+ (topics.size() % perPage == 0 ? 0 : 1);
|
||||||
return new TopicsResponse()
|
return new TopicsResponse()
|
||||||
.pageCount(totalPages)
|
.pageCount(totalPages)
|
||||||
.topics(
|
.topics(
|
||||||
cluster.getTopics().values().stream()
|
topics.stream()
|
||||||
.sorted(Comparator.comparing(InternalTopic::getName))
|
|
||||||
.skip(topicsToSkip)
|
.skip(topicsToSkip)
|
||||||
.limit(perPage)
|
.limit(perPage)
|
||||||
.map(clusterMapper::toTopic)
|
|
||||||
.collect(Collectors.toList())
|
.collect(Collectors.toList())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSort> sortBy) {
|
||||||
|
var defaultComparator = Comparator.comparing(InternalTopic::getName);
|
||||||
|
if (sortBy.isEmpty()) {
|
||||||
|
return defaultComparator;
|
||||||
|
}
|
||||||
|
switch (sortBy.get()) {
|
||||||
|
case TOTAL_PARTITIONS:
|
||||||
|
return Comparator.comparing(InternalTopic::getPartitionCount);
|
||||||
|
case OUT_OF_SYNC_REPLICAS:
|
||||||
|
return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
|
||||||
|
case NAME:
|
||||||
|
default:
|
||||||
|
return defaultComparator;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
|
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
|
||||||
return clustersStorage.getClusterByName(name)
|
return clustersStorage.getClusterByName(name)
|
||||||
.flatMap(c ->
|
.flatMap(c ->
|
||||||
|
|
|
@ -7,6 +7,7 @@ import com.provectus.kafka.ui.mapper.ClusterMapper;
|
||||||
import com.provectus.kafka.ui.model.InternalTopic;
|
import com.provectus.kafka.ui.model.InternalTopic;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.model.Topic;
|
import com.provectus.kafka.ui.model.Topic;
|
||||||
|
import com.provectus.kafka.ui.model.TopicColumnsToSort;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -47,7 +48,9 @@ class ClusterServiceTest {
|
||||||
)
|
)
|
||||||
.build()));
|
.build()));
|
||||||
|
|
||||||
var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty());
|
var topics = clusterService.getTopics(topicName,
|
||||||
|
Optional.empty(), Optional.empty(), Optional.empty(),
|
||||||
|
Optional.empty(), Optional.empty());
|
||||||
assertThat(topics.getPageCount()).isEqualTo(5);
|
assertThat(topics.getPageCount()).isEqualTo(5);
|
||||||
assertThat(topics.getTopics()).hasSize(20);
|
assertThat(topics.getTopics()).hasSize(20);
|
||||||
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
||||||
|
@ -69,7 +72,8 @@ class ClusterServiceTest {
|
||||||
)
|
)
|
||||||
.build()));
|
.build()));
|
||||||
|
|
||||||
var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33));
|
var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33),
|
||||||
|
Optional.empty(), Optional.empty(), Optional.empty());
|
||||||
assertThat(topics.getPageCount()).isEqualTo(4);
|
assertThat(topics.getPageCount()).isEqualTo(4);
|
||||||
assertThat(topics.getTopics()).hasSize(1)
|
assertThat(topics.getTopics()).hasSize(1)
|
||||||
.first().extracting(Topic::getName).isEqualTo("99");
|
.first().extracting(Topic::getName).isEqualTo("99");
|
||||||
|
@ -91,9 +95,111 @@ class ClusterServiceTest {
|
||||||
)
|
)
|
||||||
.build()));
|
.build()));
|
||||||
|
|
||||||
var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1));
|
var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1),
|
||||||
|
Optional.empty(), Optional.empty(), Optional.empty());
|
||||||
assertThat(topics.getPageCount()).isEqualTo(5);
|
assertThat(topics.getPageCount()).isEqualTo(5);
|
||||||
assertThat(topics.getTopics()).hasSize(20);
|
assertThat(topics.getTopics()).hasSize(20);
|
||||||
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldListBotInternalAndNonInternalTopics() {
|
||||||
|
var topicName = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
when(clustersStorage.getClusterByName(topicName))
|
||||||
|
.thenReturn(Optional.of(KafkaCluster.builder()
|
||||||
|
.topics(
|
||||||
|
IntStream.rangeClosed(1, 100).boxed()
|
||||||
|
.map(Objects::toString)
|
||||||
|
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
|
||||||
|
.partitions(Map.of())
|
||||||
|
.name(e)
|
||||||
|
.internal(Integer.parseInt(e) % 10 == 0)
|
||||||
|
.build()))
|
||||||
|
)
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
var topics = clusterService.getTopics(topicName,
|
||||||
|
Optional.empty(), Optional.empty(), Optional.of(true),
|
||||||
|
Optional.empty(), Optional.empty());
|
||||||
|
assertThat(topics.getPageCount()).isEqualTo(5);
|
||||||
|
assertThat(topics.getTopics()).hasSize(20);
|
||||||
|
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldListOnlyNonInternalTopics() {
|
||||||
|
var topicName = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
when(clustersStorage.getClusterByName(topicName))
|
||||||
|
.thenReturn(Optional.of(KafkaCluster.builder()
|
||||||
|
.topics(
|
||||||
|
IntStream.rangeClosed(1, 100).boxed()
|
||||||
|
.map(Objects::toString)
|
||||||
|
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
|
||||||
|
.partitions(Map.of())
|
||||||
|
.name(e)
|
||||||
|
.internal(Integer.parseInt(e) % 10 == 0)
|
||||||
|
.build()))
|
||||||
|
)
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
var topics = clusterService.getTopics(topicName,
|
||||||
|
Optional.empty(), Optional.empty(), Optional.of(true),
|
||||||
|
Optional.empty(), Optional.empty());
|
||||||
|
assertThat(topics.getPageCount()).isEqualTo(5);
|
||||||
|
assertThat(topics.getTopics()).hasSize(20);
|
||||||
|
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldListOnlyTopicsContainingOne() {
|
||||||
|
var topicName = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
when(clustersStorage.getClusterByName(topicName))
|
||||||
|
.thenReturn(Optional.of(KafkaCluster.builder()
|
||||||
|
.topics(
|
||||||
|
IntStream.rangeClosed(1, 100).boxed()
|
||||||
|
.map(Objects::toString)
|
||||||
|
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
|
||||||
|
.partitions(Map.of())
|
||||||
|
.name(e)
|
||||||
|
.build()))
|
||||||
|
)
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
var topics = clusterService.getTopics(topicName,
|
||||||
|
Optional.empty(), Optional.empty(), Optional.empty(),
|
||||||
|
Optional.of("1"), Optional.empty());
|
||||||
|
assertThat(topics.getPageCount()).isEqualTo(1);
|
||||||
|
assertThat(topics.getTopics()).hasSize(20);
|
||||||
|
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldListTopicsOrderedByPartitionsCount() {
|
||||||
|
var topicName = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
when(clustersStorage.getClusterByName(topicName))
|
||||||
|
.thenReturn(Optional.of(KafkaCluster.builder()
|
||||||
|
.topics(
|
||||||
|
IntStream.rangeClosed(1, 100).boxed()
|
||||||
|
.map(Objects::toString)
|
||||||
|
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
|
||||||
|
.partitions(Map.of())
|
||||||
|
.name(e)
|
||||||
|
.partitionCount(100 - Integer.parseInt(e))
|
||||||
|
.build()))
|
||||||
|
)
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
var topics = clusterService.getTopics(topicName,
|
||||||
|
Optional.empty(), Optional.empty(), Optional.empty(),
|
||||||
|
Optional.empty(), Optional.of(TopicColumnsToSort.TOTAL_PARTITIONS));
|
||||||
|
assertThat(topics.getPageCount()).isEqualTo(5);
|
||||||
|
assertThat(topics.getTopics()).hasSize(20);
|
||||||
|
assertThat(topics.getTopics()).map(Topic::getPartitionCount).isSorted();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,21 @@ paths:
|
||||||
required: false
|
required: false
|
||||||
schema:
|
schema:
|
||||||
type: integer
|
type: integer
|
||||||
|
- name: showInternal
|
||||||
|
in: query
|
||||||
|
required: false
|
||||||
|
schema:
|
||||||
|
type: boolean
|
||||||
|
- name: search
|
||||||
|
in: query
|
||||||
|
required: false
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
- name: orderBy
|
||||||
|
in: query
|
||||||
|
required: false
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/TopicColumnsToSort'
|
||||||
responses:
|
responses:
|
||||||
200:
|
200:
|
||||||
description: OK
|
description: OK
|
||||||
|
@ -1245,6 +1260,13 @@ components:
|
||||||
items:
|
items:
|
||||||
$ref: '#/components/schemas/Topic'
|
$ref: '#/components/schemas/Topic'
|
||||||
|
|
||||||
|
TopicColumnsToSort:
|
||||||
|
type: string
|
||||||
|
enum:
|
||||||
|
- NAME
|
||||||
|
- OUT_OF_SYNC_REPLICAS
|
||||||
|
- TOTAL_PARTITIONS
|
||||||
|
|
||||||
Topic:
|
Topic:
|
||||||
type: object
|
type: object
|
||||||
properties:
|
properties:
|
||||||
|
|
Loading…
Add table
Reference in a new issue