ClusterUtil.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package com.provectus.kafka.ui.util;
  2. import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
  3. import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
  4. import com.provectus.kafka.ui.deserialization.RecordDeserializer;
  5. import com.provectus.kafka.ui.model.ConsumerGroup;
  6. import com.provectus.kafka.ui.model.ConsumerGroupDetails;
  7. import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
  8. import com.provectus.kafka.ui.model.ExtendedAdminClient;
  9. import com.provectus.kafka.ui.model.InternalPartition;
  10. import com.provectus.kafka.ui.model.InternalReplica;
  11. import com.provectus.kafka.ui.model.InternalTopic;
  12. import com.provectus.kafka.ui.model.InternalTopicConfig;
  13. import com.provectus.kafka.ui.model.ServerStatus;
  14. import com.provectus.kafka.ui.model.TopicMessage;
  15. import java.time.Instant;
  16. import java.time.OffsetDateTime;
  17. import java.time.ZoneId;
  18. import java.util.Collection;
  19. import java.util.Collections;
  20. import java.util.HashMap;
  21. import java.util.List;
  22. import java.util.Map;
  23. import java.util.Optional;
  24. import java.util.Set;
  25. import java.util.stream.Collectors;
  26. import java.util.stream.Stream;
  27. import lombok.extern.slf4j.Slf4j;
  28. import org.apache.kafka.clients.admin.AdminClient;
  29. import org.apache.kafka.clients.admin.Config;
  30. import org.apache.kafka.clients.admin.ConfigEntry;
  31. import org.apache.kafka.clients.admin.ConsumerGroupDescription;
  32. import org.apache.kafka.clients.admin.MemberAssignment;
  33. import org.apache.kafka.clients.admin.MemberDescription;
  34. import org.apache.kafka.clients.admin.TopicDescription;
  35. import org.apache.kafka.clients.consumer.ConsumerRecord;
  36. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  37. import org.apache.kafka.common.KafkaFuture;
  38. import org.apache.kafka.common.Node;
  39. import org.apache.kafka.common.TopicPartition;
  40. import org.apache.kafka.common.config.ConfigResource;
  41. import org.apache.kafka.common.record.TimestampType;
  42. import org.apache.kafka.common.utils.Bytes;
  43. import reactor.core.publisher.Mono;
  44. @Slf4j
  45. public class ClusterUtil {
  46. private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
  47. private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
  48. public static <T> Mono<T> toMono(KafkaFuture<T> future) {
  49. return Mono.create(sink -> future.whenComplete((res, ex) -> {
  50. if (ex != null) {
  51. sink.error(ex);
  52. } else {
  53. sink.success(res);
  54. }
  55. }));
  56. }
  57. public static Mono<String> toMono(KafkaFuture<Void> future, String topicName) {
  58. return Mono.create(sink -> future.whenComplete((res, ex) -> {
  59. if (ex != null) {
  60. sink.error(ex);
  61. } else {
  62. sink.success(topicName);
  63. }
  64. }));
  65. }
  66. public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) {
  67. ConsumerGroup consumerGroup = new ConsumerGroup();
  68. consumerGroup.setConsumerGroupId(c.groupId());
  69. consumerGroup.setNumConsumers(c.members().size());
  70. int numTopics = c.members().stream()
  71. .flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic())))
  72. .collect(Collectors.toSet()).size();
  73. consumerGroup.setNumTopics(numTopics);
  74. consumerGroup.setSimple(c.isSimpleConsumerGroup());
  75. Optional.ofNullable(c.state())
  76. .ifPresent(s -> consumerGroup.setState(s.name()));
  77. Optional.ofNullable(c.coordinator())
  78. .ifPresent(coord -> consumerGroup.setCoordintor(coord.host()));
  79. consumerGroup.setPartitionAssignor(c.partitionAssignor());
  80. return consumerGroup;
  81. }
  82. public static ConsumerGroupDetails convertToConsumerGroupDetails(
  83. ConsumerGroupDescription desc, List<ConsumerTopicPartitionDetail> consumers
  84. ) {
  85. return new ConsumerGroupDetails()
  86. .consumers(consumers)
  87. .consumerGroupId(desc.groupId())
  88. .simple(desc.isSimpleConsumerGroup())
  89. .coordintor(Optional.ofNullable(desc.coordinator()).map(Node::host).orElse(""))
  90. .state(Optional.ofNullable(desc.state()).map(Enum::name).orElse(""))
  91. .partitionAssignor(desc.partitionAssignor());
  92. }
  93. public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartitionDetails(
  94. MemberDescription consumer,
  95. Map<TopicPartition, OffsetAndMetadata> groupOffsets,
  96. Map<TopicPartition, Long> endOffsets,
  97. String groupId
  98. ) {
  99. return consumer.assignment().topicPartitions().stream()
  100. .map(tp -> {
  101. long currentOffset = Optional.ofNullable(groupOffsets.get(tp))
  102. .map(OffsetAndMetadata::offset).orElse(0L);
  103. long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
  104. ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
  105. cd.setGroupId(groupId);
  106. cd.setConsumerId(consumer.consumerId());
  107. cd.setHost(consumer.host());
  108. cd.setTopic(tp.topic());
  109. cd.setPartition(tp.partition());
  110. cd.setCurrentOffset(currentOffset);
  111. cd.setEndOffset(endOffset);
  112. cd.setMessagesBehind(endOffset - currentOffset);
  113. return cd;
  114. }).collect(Collectors.toList());
  115. }
  116. public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) {
  117. InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
  118. .name(configEntry.name())
  119. .value(configEntry.value());
  120. if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
  121. builder.defaultValue(configEntry.value());
  122. } else {
  123. builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
  124. }
  125. return builder.build();
  126. }
  127. public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) {
  128. var topic = InternalTopic.builder();
  129. topic.internal(topicDescription.isInternal());
  130. topic.name(topicDescription.name());
  131. List<InternalPartition> partitions = topicDescription.partitions().stream().map(
  132. partition -> {
  133. var partitionDto = InternalPartition.builder();
  134. partitionDto.leader(partition.leader().id());
  135. partitionDto.partition(partition.partition());
  136. partitionDto.inSyncReplicasCount(partition.isr().size());
  137. partitionDto.replicasCount(partition.replicas().size());
  138. List<InternalReplica> replicas = partition.replicas().stream().map(
  139. r -> new InternalReplica(r.id(), partition.leader().id() != r.id(),
  140. partition.isr().contains(r)))
  141. .collect(Collectors.toList());
  142. partitionDto.replicas(replicas);
  143. return partitionDto.build();
  144. })
  145. .collect(Collectors.toList());
  146. int urpCount = partitions.stream()
  147. .flatMap(partition -> partition.getReplicas().stream())
  148. .filter(p -> !p.isInSync()).mapToInt(e -> 1)
  149. .sum();
  150. int inSyncReplicasCount = partitions.stream()
  151. .mapToInt(InternalPartition::getInSyncReplicasCount)
  152. .sum();
  153. int replicasCount = partitions.stream()
  154. .mapToInt(InternalPartition::getReplicasCount)
  155. .sum();
  156. topic.partitions(partitions.stream().collect(Collectors.toMap(
  157. InternalPartition::getPartition,
  158. t -> t
  159. )));
  160. topic.replicas(replicasCount);
  161. topic.partitionCount(topicDescription.partitions().size());
  162. topic.inSyncReplicas(inSyncReplicasCount);
  163. topic.replicationFactor(
  164. topicDescription.partitions().isEmpty()
  165. ? 0
  166. : topicDescription.partitions().get(0).replicas().size()
  167. );
  168. topic.underReplicatedPartitions(urpCount);
  169. return topic.build();
  170. }
  171. public static int convertToIntServerStatus(ServerStatus serverStatus) {
  172. return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
  173. }
  174. public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord,
  175. RecordDeserializer recordDeserializer) {
  176. Map<String, String> headers = new HashMap<>();
  177. consumerRecord.headers().iterator()
  178. .forEachRemaining(header -> headers.put(header.key(), new String(header.value())));
  179. TopicMessage topicMessage = new TopicMessage();
  180. OffsetDateTime timestamp =
  181. OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
  182. TopicMessage.TimestampTypeEnum timestampType =
  183. mapToTimestampType(consumerRecord.timestampType());
  184. topicMessage.setPartition(consumerRecord.partition());
  185. topicMessage.setOffset(consumerRecord.offset());
  186. topicMessage.setTimestamp(timestamp);
  187. topicMessage.setTimestampType(timestampType);
  188. if (consumerRecord.key() != null) {
  189. topicMessage.setKey(consumerRecord.key().toString());
  190. }
  191. topicMessage.setHeaders(headers);
  192. Object parsedValue = recordDeserializer.deserialize(consumerRecord);
  193. topicMessage.setContent(parsedValue);
  194. return topicMessage;
  195. }
  196. private static TopicMessage.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
  197. switch (timestampType) {
  198. case CREATE_TIME:
  199. return TopicMessage.TimestampTypeEnum.CREATE_TIME;
  200. case LOG_APPEND_TIME:
  201. return TopicMessage.TimestampTypeEnum.LOG_APPEND_TIME;
  202. case NO_TIMESTAMP_TYPE:
  203. return TopicMessage.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
  204. default:
  205. throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
  206. }
  207. }
  208. public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(
  209. AdminClient adminClient) {
  210. return ClusterUtil.toMono(adminClient.describeCluster().controller())
  211. .map(Node::id)
  212. .map(id -> Collections
  213. .singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
  214. .map(brokerCR -> adminClient.describeConfigs(brokerCR).all())
  215. .flatMap(ClusterUtil::toMono)
  216. .map(ClusterUtil::getSupportedUpdateFeature)
  217. .map(Collections::singleton);
  218. }
  219. private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(
  220. Map<ConfigResource, Config> configs) {
  221. String version = configs.values().stream()
  222. .map(Config::entries)
  223. .flatMap(Collection::stream)
  224. .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
  225. .findFirst().orElseThrow().value();
  226. try {
  227. final String[] parts = version.split("\\.");
  228. if (parts.length > 2) {
  229. version = parts[0] + "." + parts[1];
  230. }
  231. return Float.parseFloat(version.split("-")[0]) <= 2.3f
  232. ? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS :
  233. ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
  234. } catch (Exception e) {
  235. log.error("Conversion clusterVersion {} to float value failed", version);
  236. throw e;
  237. }
  238. }
  239. public static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
  240. return streamOfMaps
  241. .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
  242. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
  243. }
  244. public static Optional<ConsumerGroupDescription> filterConsumerGroupTopic(
  245. ConsumerGroupDescription description, String topic) {
  246. final List<MemberDescription> members = description.members().stream()
  247. .map(m -> filterConsumerMemberTopic(m, topic))
  248. .filter(m -> !m.assignment().topicPartitions().isEmpty())
  249. .collect(Collectors.toList());
  250. if (!members.isEmpty()) {
  251. return Optional.of(
  252. new ConsumerGroupDescription(
  253. description.groupId(),
  254. description.isSimpleConsumerGroup(),
  255. members,
  256. description.partitionAssignor(),
  257. description.state(),
  258. description.coordinator()
  259. )
  260. );
  261. } else {
  262. return Optional.empty();
  263. }
  264. }
  265. public static MemberDescription filterConsumerMemberTopic(
  266. MemberDescription description, String topic) {
  267. final Set<TopicPartition> topicPartitions = description.assignment().topicPartitions()
  268. .stream().filter(tp -> tp.topic().equals(topic))
  269. .collect(Collectors.toSet());
  270. MemberAssignment assignment = new MemberAssignment(topicPartitions);
  271. return new MemberDescription(
  272. description.consumerId(),
  273. description.groupInstanceId(),
  274. description.clientId(),
  275. description.host(),
  276. assignment
  277. );
  278. }
  279. }