Added cluster version to api (#609)
This commit is contained in:
parent
ca476d3373
commit
e3b2ea1052
5 changed files with 37 additions and 20 deletions
|
@ -15,6 +15,7 @@ public class ExtendedAdminClient {
|
|||
private final Set<SupportedFeature> supportedFeatures;
|
||||
|
||||
public static Mono<ExtendedAdminClient> extendedAdminClient(AdminClient adminClient) {
|
||||
|
||||
return ClusterUtil.getSupportedFeatures(adminClient)
|
||||
.map(s -> new ExtendedAdminClient(adminClient, s));
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import lombok.Data;
|
|||
@Builder(toBuilder = true)
|
||||
public class KafkaCluster {
|
||||
private final String name;
|
||||
private final String version;
|
||||
private final Integer jmxPort;
|
||||
private final String bootstrapServers;
|
||||
private final String zookeeper;
|
||||
|
|
|
@ -104,13 +104,16 @@ public class KafkaService {
|
|||
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
|
||||
return getOrCreateAdminClient(cluster)
|
||||
.flatMap(
|
||||
ac -> getClusterMetrics(ac.getAdminClient())
|
||||
.flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
|
||||
.flatMap(clusterMetrics ->
|
||||
getTopicsData(ac.getAdminClient()).flatMap(it ->
|
||||
updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
|
||||
).map(segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
|
||||
)
|
||||
ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
|
||||
version ->
|
||||
getClusterMetrics(ac.getAdminClient())
|
||||
.flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
|
||||
.flatMap(clusterMetrics ->
|
||||
getTopicsData(ac.getAdminClient()).flatMap(it ->
|
||||
updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
|
||||
).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
|
||||
)
|
||||
)
|
||||
).onErrorResume(
|
||||
e -> Mono.just(cluster.toBuilder()
|
||||
.status(ServerStatus.OFFLINE)
|
||||
|
@ -120,6 +123,7 @@ public class KafkaService {
|
|||
}
|
||||
|
||||
private KafkaCluster buildFromData(KafkaCluster currentCluster,
|
||||
String version,
|
||||
InternalSegmentSizeDto segmentSizeDto) {
|
||||
|
||||
var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
|
||||
|
@ -152,6 +156,7 @@ public class KafkaService {
|
|||
.build();
|
||||
|
||||
return currentCluster.toBuilder()
|
||||
.version(version)
|
||||
.status(ServerStatus.ONLINE)
|
||||
.zookeeperStatus(zookeeperStatus)
|
||||
.lastZookeeperException(zookeeperException)
|
||||
|
|
|
@ -237,23 +237,12 @@ public class ClusterUtil {
|
|||
|
||||
public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(
|
||||
AdminClient adminClient) {
|
||||
return ClusterUtil.toMono(adminClient.describeCluster().controller())
|
||||
.map(Node::id)
|
||||
.map(id -> Collections
|
||||
.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
|
||||
.map(brokerCR -> adminClient.describeConfigs(brokerCR).all())
|
||||
.flatMap(ClusterUtil::toMono)
|
||||
return getClusterVersion(adminClient)
|
||||
.map(ClusterUtil::getSupportedUpdateFeature)
|
||||
.map(Collections::singleton);
|
||||
}
|
||||
|
||||
private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(
|
||||
Map<ConfigResource, Config> configs) {
|
||||
String version = configs.values().stream()
|
||||
.map(Config::entries)
|
||||
.flatMap(Collection::stream)
|
||||
.filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
|
||||
.findFirst().orElseThrow().value();
|
||||
private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(String version) {
|
||||
try {
|
||||
final String[] parts = version.split("\\.");
|
||||
if (parts.length > 2) {
|
||||
|
@ -268,6 +257,25 @@ public class ClusterUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static Mono<String> getClusterVersion(AdminClient adminClient) {
|
||||
return ClusterUtil.toMono(adminClient.describeCluster().controller())
|
||||
.map(Node::id)
|
||||
.map(id -> Collections
|
||||
.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
|
||||
.map(brokerCR -> adminClient.describeConfigs(brokerCR).all())
|
||||
.flatMap(ClusterUtil::toMono)
|
||||
.map(ClusterUtil::getClusterVersion);
|
||||
}
|
||||
|
||||
public static String getClusterVersion(Map<ConfigResource, Config> configs) {
|
||||
return configs.values().stream()
|
||||
.map(Config::entries)
|
||||
.flatMap(Collection::stream)
|
||||
.filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
|
||||
.findFirst().orElseThrow().value();
|
||||
}
|
||||
|
||||
|
||||
public static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
|
||||
return streamOfMaps
|
||||
.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
||||
|
|
|
@ -1281,6 +1281,8 @@ components:
|
|||
type: number
|
||||
readOnly:
|
||||
type: boolean
|
||||
version:
|
||||
type: string
|
||||
features:
|
||||
type: array
|
||||
items:
|
||||
|
|
Loading…
Add table
Reference in a new issue