Add "initializing" state to ServerStatus enum (#1652)

* * initializing status added to cluster
* MetricsCache initializing
* unuseful tests removed BrokerServiceTest

* checkstyle fix

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2022-02-24 14:18:24 +03:00 committed by GitHub
parent cd24ff631e
commit 4651c5e308
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 36 additions and 44 deletions

View file

@ -8,7 +8,7 @@ import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Builder;
@ -33,25 +33,30 @@ public class MetricsCache {
InternalLogDirStats logDirInfo;
Map<String, TopicDescription> topicDescriptions;
Map<String, List<ConfigEntry>> topicConfigs;
}
public static Metrics empty() {
return Metrics.builder()
.status(ServerStatusDTO.OFFLINE)
.version("Unknown")
.features(List.of())
.zkStatus(new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null))
.clusterDescription(
new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
.jmxMetrics(JmxClusterUtil.JmxMetrics.empty())
.logDirInfo(InternalLogDirStats.empty())
.topicDescriptions(Map.of())
.topicConfigs(Map.of())
.build();
public static Metrics empty() {
return builder()
.status(ServerStatusDTO.OFFLINE)
.version("Unknown")
.features(List.of())
.zkStatus(new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null))
.clusterDescription(
new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
.jmxMetrics(JmxClusterUtil.JmxMetrics.empty())
.logDirInfo(InternalLogDirStats.empty())
.topicDescriptions(Map.of())
.topicConfigs(Map.of())
.build();
}
}
private final Map<String, Metrics> cache = new ConcurrentHashMap<>();
public MetricsCache(ClustersStorage clustersStorage) {
var initializing = Metrics.empty().toBuilder().status(ServerStatusDTO.INITIALIZING).build();
clustersStorage.getKafkaClusters().forEach(c -> cache.put(c.getName(), initializing));
}
public synchronized void replace(KafkaCluster c, Metrics stats) {
cache.put(c.getName(), stats);
}
@ -89,7 +94,7 @@ public class MetricsCache {
}
public Metrics get(KafkaCluster c) {
return Optional.ofNullable(cache.get(c.getName())).orElseGet(MetricsCache::empty);
return Objects.requireNonNull(cache.get(c.getName()), "Unknown cluster metrics requested");
}
}

View file

@ -56,7 +56,7 @@ public class MetricsService {
.doOnError(e ->
log.error("Failed to collect cluster {} info", cluster.getName(), e))
.onErrorResume(
e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build()));
e -> Mono.just(MetricsCache.Metrics.empty().toBuilder().lastKafkaException(e).build()));
}
private Mono<InternalLogDirStats> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {

View file

@ -1,45 +1,31 @@
package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.AbstractBaseTest;
import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Properties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import reactor.test.StepVerifier;
@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
class BrokerServiceTest extends AbstractBaseTest {
private final KafkaCluster kafkaCluster =
KafkaCluster.builder()
.name(LOCAL)
.bootstrapServers(kafka.getBootstrapServers())
.properties(new Properties())
.build();
@Autowired
private BrokerService brokerService;
@BeforeEach
void init() {
AdminClientServiceImpl adminClientService = new AdminClientServiceImpl();
adminClientService.setClientTimeout(5_000);
brokerService =
new BrokerService(new MetricsCache(), adminClientService, new DescribeLogDirsMapper(), new ClusterMapperImpl());
}
@Autowired
private ClustersStorage clustersStorage;
@Test
void getBrokersNominal() {
BrokerDTO brokerdto = new BrokerDTO();
brokerdto.setId(1);
brokerdto.setHost("localhost");
String port = kafka.getBootstrapServers().substring(kafka.getBootstrapServers().lastIndexOf(":") + 1);
brokerdto.setPort(Integer.parseInt(port));
void getBrokersReturnsFilledBrokerDto() {
BrokerDTO expectedBroker = new BrokerDTO();
expectedBroker.setId(1);
expectedBroker.setHost(kafka.getHost());
expectedBroker.setPort(kafka.getFirstMappedPort());
StepVerifier.create(brokerService.getBrokers(kafkaCluster))
.expectNext(brokerdto)
var localCluster = clustersStorage.getClusterByName(LOCAL).get();
StepVerifier.create(brokerService.getBrokers(localCluster))
.expectNext(expectedBroker)
.verifyComplete();
}

View file

@ -29,7 +29,7 @@ class TopicsServicePaginationTest {
.collect(Collectors.toSet())))
.getMock();
MetricsCache.Metrics metricsCache = MetricsCache.empty().toBuilder()
MetricsCache.Metrics metricsCache = MetricsCache.Metrics.empty().toBuilder()
.topicDescriptions(
topicsInCache.stream().collect(Collectors.toMap(TopicDescription::name, d -> d)))
.build();

View file

@ -1667,6 +1667,7 @@ components:
enum:
- online
- offline
- initializing
ClusterMetrics:
type: object