|
@@ -68,10 +68,12 @@ public class ClusterUtil {
|
|
) {
|
|
) {
|
|
return consumer.assignment().topicPartitions().stream()
|
|
return consumer.assignment().topicPartitions().stream()
|
|
.map(tp -> {
|
|
.map(tp -> {
|
|
- Long currentOffset = groupOffsets.get(tp).offset();
|
|
|
|
- Long endOffset = endOffsets.get(tp);
|
|
|
|
|
|
+ Long currentOffset = Optional.ofNullable(
|
|
|
|
+ groupOffsets.get(tp)).map(o -> o.offset()).orElse(0L);
|
|
|
|
+ Long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
|
|
ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
|
|
ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
|
|
cd.setConsumerId(consumer.consumerId());
|
|
cd.setConsumerId(consumer.consumerId());
|
|
|
|
+ cd.setHost(consumer.host());
|
|
cd.setTopic(tp.topic());
|
|
cd.setTopic(tp.topic());
|
|
cd.setPartition(tp.partition());
|
|
cd.setPartition(tp.partition());
|
|
cd.setCurrentOffset(currentOffset);
|
|
cd.setCurrentOffset(currentOffset);
|
|
@@ -116,7 +118,7 @@ public class ClusterUtil {
|
|
|
|
|
|
int urpCount = partitions.stream()
|
|
int urpCount = partitions.stream()
|
|
.flatMap(partition -> partition.getReplicas().stream())
|
|
.flatMap(partition -> partition.getReplicas().stream())
|
|
- .filter(InternalReplica::isInSync).mapToInt(e -> 1)
|
|
|
|
|
|
+ .filter(p -> !p.isInSync()).mapToInt(e -> 1)
|
|
.sum();
|
|
.sum();
|
|
|
|
|
|
int inSyncReplicasCount = partitions.stream()
|
|
int inSyncReplicasCount = partitions.stream()
|
|
@@ -199,6 +201,10 @@ public class ClusterUtil {
|
|
.filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
|
|
.filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
|
|
.findFirst().orElseThrow().value();
|
|
.findFirst().orElseThrow().value();
|
|
try {
|
|
try {
|
|
|
|
+ final String[] parts = version.split("\\.");
|
|
|
|
+ if (parts.length>2) {
|
|
|
|
+ version = parts[0] + "." + parts[1];
|
|
|
|
+ }
|
|
return Float.parseFloat(version.split("-")[0]) <= 2.3f
|
|
return Float.parseFloat(version.split("-")[0]) <= 2.3f
|
|
? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
|
|
? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -210,18 +216,18 @@ public class ClusterUtil {
|
|
public static Topic convertToTopic(InternalTopic internalTopic) {
|
|
public static Topic convertToTopic(InternalTopic internalTopic) {
|
|
Topic topic = new Topic();
|
|
Topic topic = new Topic();
|
|
topic.setName(internalTopic.getName());
|
|
topic.setName(internalTopic.getName());
|
|
- List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
|
|
|
|
- Partition partition = new Partition();
|
|
|
|
- partition.setPartition(s.getPartition());
|
|
|
|
- partition.setLeader(s.getLeader());
|
|
|
|
- partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
|
|
|
|
- Replica replica = new Replica();
|
|
|
|
- replica.setBroker(r.getBroker());
|
|
|
|
- return Stream.of(replica);
|
|
|
|
- }).collect(Collectors.toList()));
|
|
|
|
- return Stream.of(partition);
|
|
|
|
- }).collect(Collectors.toList());
|
|
|
|
- topic.setPartitions(partitions);
|
|
|
|
|
|
+// List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
|
|
|
|
+// Partition partition = new Partition();
|
|
|
|
+// partition.setPartition(s.getPartition());
|
|
|
|
+// partition.setLeader(s.getLeader());
|
|
|
|
+// partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
|
|
|
|
+// Replica replica = new Replica();
|
|
|
|
+// replica.setBroker(r.getBroker());
|
|
|
|
+// return Stream.of(replica);
|
|
|
|
+// }).collect(Collectors.toList()));
|
|
|
|
+// return Stream.of(partition);
|
|
|
|
+// }).collect(Collectors.toList());
|
|
|
|
+// topic.setPartitions(partitions);
|
|
return topic;
|
|
return topic;
|
|
}
|
|
}
|
|
|
|
|