iliax 2023-06-21 11:09:44 +04:00
parent 38eb68dcc5
commit 54a5398413
10 changed files with 118 additions and 60 deletions

com/provectus/kafka/ui/model/Statistics.java

@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.model;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.ScrapedClusterState;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -19,12 +19,13 @@ public class Statistics {
List<ClusterFeature> features;
ReactiveAdminClient.ClusterDescription clusterDescription;
Metrics metrics;
+ScrapedClusterState clusterState;
//TODO: to be removed -->>
InternalLogDirStats logDirInfo;
Map<String, TopicDescription> topicDescriptions;
Map<String, List<ConfigEntry>> topicConfigs;
-ScrapedClusterState clusterState;
public static Statistics empty() {
return builder()
.status(ServerStatusDTO.OFFLINE)
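The hunk above is truncated inside empty(), so the rest of the builder chain is not shown. Purely as a hedged illustration, assuming Statistics uses Lombok's @Builder (so a clusterState(...) setter exists) and assuming ScrapedClusterState.empty() eventually returns a real object instead of null, the new field could be seeded like this:

// Hypothetical sketch only, not part of this commit.
return builder()
    .status(ServerStatusDTO.OFFLINE)
    .clusterState(ScrapedClusterState.empty()) // seed an empty snapshot instead of leaving the field null
    .build();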

com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java (new file)

@@ -0,0 +1,56 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import lombok.Value;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import reactor.core.publisher.Mono;
@Value
public class ScrapedClusterState {
record NodeState(SegmentStats segmentStats) {
}
record TopicState(
Instant scrapeTime,
String name,
List<ConfigEntry> configs,
TopicDescription description,
Map<Integer, Long> offsets,
SegmentStats segmentStats,
Map<Integer, SegmentStats> partitionsSegmentStats) {
}
record ConsumerGroupState(
Instant scrapeTime,
String group,
ConsumerGroupDescription description,
Table<String, Integer, Long> committedOffsets,
Map<String, Instant> lastTopicActivity) {
}
record SegmentStats(long segmentSize,
int segmentsCount) {
}
Instant scrapeStartTime;
Map<Integer, NodeState> nodesStates;
Map<String, TopicState> topicStates;
Map<String, ConsumerGroupState> consumerGroupsStates;
public static ScrapedClusterState empty() {
//TODO impl
return null;
}
public static Mono<ScrapedClusterState> scrape(ReactiveAdminClient ac) {
return null;//TODO impl
}
}
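Both factory methods above are committed as TODO stubs that return null. A minimal sketch of what empty() could return, assuming Lombok's @Value generates a public all-args constructor in field declaration order (scrapeStartTime, nodesStates, topicStates, consumerGroupsStates); a sketch of scrape(ReactiveAdminClient) is omitted here because the admin-client calls it needs are not shown in this commit:

// Hypothetical sketch only, not part of this commit.
// Instant and Map are already imported by the class.
public static ScrapedClusterState empty() {
  return new ScrapedClusterState(
      Instant.now(), // an empty snapshot "taken" now
      Map.of(),      // no node states
      Map.of(),      // no topic states
      Map.of());     // no consumer group states
}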

com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java

@@ -3,14 +3,19 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape;
import io.prometheus.client.Collector.MetricFamilySamples;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
public interface ScrapedMetrics {
Stream<MetricFamilySamples> asStream();
static ScrapedMetrics create(Collection<MetricFamilySamples> lst) {
return lst::stream;
}
static ScrapedMetrics empty() {
return create(List.of());
}
Stream<MetricFamilySamples> asStream();
}
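For context, create(lst) implements the interface's single abstract method with the method reference lst::stream, so every asStream() call simply re-streams the backing collection, and empty() is just create(List.of()). A hypothetical caller (the sample source below is a placeholder):

// Hypothetical usage sketch, not part of this commit.
List<MetricFamilySamples> samples = collectSamplesSomehow(); // placeholder supplier of samples
ScrapedMetrics scraped = ScrapedMetrics.create(samples);
scraped.asStream()
    .map(mfs -> mfs.name) // MetricFamilySamples exposes its name as a public field
    .forEach(System.out::println);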

com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java (new file)

@@ -0,0 +1,8 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape;
public class Scrapping {
}

com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java

@@ -2,19 +2,22 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
import static io.prometheus.client.Collector.*;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
+import java.util.List;
import java.util.stream.Stream;
public class InferredMetrics implements ScrapedMetrics {
+private final List<MetricFamilySamples> metrics;
+public InferredMetrics(List<MetricFamilySamples> metrics) {
+this.metrics = metrics;
+}
@Override
public Stream<MetricFamilySamples> asStream() {
-return null;
-}
-public ScrapedClusterState clusterState() {
-//todo: impl
-return null;
+return metrics.stream();
}
}

com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java (new file)

@@ -0,0 +1,34 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
import java.util.List;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Mono;
@RequiredArgsConstructor
public class InferredMetricsScraper implements Scraper<InferredMetrics> {
private final Supplier<ScrapedClusterState> currentStateSupplier;
private ScrapedClusterState prevState = null;
@Override
public synchronized Mono<InferredMetrics> scrape() {
if (prevState == null) {
prevState = currentStateSupplier.get();
return Mono.empty();
}
var newState = currentStateSupplier.get();
var inferred = infer(prevState, newState);
prevState = newState;
return Mono.just(inferred);
}
private static InferredMetrics infer(ScrapedClusterState prevState,
ScrapedClusterState newState) {
//TODO: impl
return new InferredMetrics(List.of());
}
}
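infer(...) is committed as a stub that returns empty metrics. Purely to illustrate the intended "diff two snapshots" shape, here is a hypothetical version that turns per-topic end-offset growth between the previous and the current ScrapedClusterState into a Prometheus gauge. The metric name, help text and label are invented for the example, and it additionally assumes the nested TopicState record is made public (in this commit the nested records are package-private, so they are not visible from the inferred package):

// Hypothetical sketch only, not part of this commit.
// Extra import needed: io.prometheus.client.GaugeMetricFamily (a MetricFamilySamples subclass).
private static InferredMetrics infer(ScrapedClusterState prevState, ScrapedClusterState newState) {
  var offsetsIncrease = new GaugeMetricFamily(
      "topic_offsets_increase_between_scrapes", // invented metric name
      "Growth of summed end offsets since the previous scrape",
      List.of("topic"));
  newState.getTopicStates().forEach((topic, current) -> {
    var previous = prevState.getTopicStates().get(topic);
    if (previous == null) {
      return; // topic appeared after the previous scrape, nothing to diff yet
    }
    long increase = current.offsets().entrySet().stream()
        .mapToLong(e -> e.getValue() - previous.offsets().getOrDefault(e.getKey(), e.getValue()))
        .sum();
    offsetsIncrease.addMetric(List.of(topic), increase);
  });
  return new InferredMetrics(List.of(offsetsIncrease));
}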

com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScrapper.java (deleted)

@@ -1,22 +0,0 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
import reactor.core.publisher.Mono;
public class InferredMetricsScrapper implements Scraper<InferredMetrics> {
private final ReactiveAdminClient adminClient;
private volatile ScrapedClusterState clusterState;
public InferredMetricsScrapper(ReactiveAdminClient adminClient) {
this.adminClient = adminClient;
}
@Override
public Mono<InferredMetrics> scrape() {
return null;
}
}

com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java (deleted)

@@ -1,19 +0,0 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states.ConsumerGroupsState;
import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states.TopicsState;
import java.time.Instant;
import lombok.Value;
@Value
public class ScrapedClusterState {
Instant scrapeStart;
TopicsState topicsState;
ConsumerGroupsState consumerGroupsState;
public static ScrapedClusterState empty() {
return new ScrapedClusterState(null, null, null);
}
}

com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java (deleted)

@@ -1,4 +0,0 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states;
public class ConsumerGroupsState {
}

com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java (deleted)

@@ -1,4 +0,0 @@
package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states;
public class TopicsState {
}