StatisticsCache.java 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.model.KafkaCluster;
  3. import com.provectus.kafka.ui.model.ServerStatusDTO;
  4. import com.provectus.kafka.ui.model.Statistics;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Objects;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. import org.apache.kafka.clients.admin.ConfigEntry;
  11. import org.apache.kafka.clients.admin.TopicDescription;
  12. import org.springframework.stereotype.Component;
  13. @Component
  14. public class StatisticsCache {
  15. private final Map<String, Statistics> cache = new ConcurrentHashMap<>();
  16. public StatisticsCache(ClustersStorage clustersStorage) {
  17. var initializing = Statistics.empty().toBuilder().status(ServerStatusDTO.INITIALIZING).build();
  18. clustersStorage.getKafkaClusters().forEach(c -> cache.put(c.getName(), initializing));
  19. }
  20. public synchronized void replace(KafkaCluster c, Statistics stats) {
  21. cache.put(c.getName(), stats);
  22. }
  23. public synchronized void update(KafkaCluster c,
  24. Map<String, TopicDescription> descriptions,
  25. Map<String, List<ConfigEntry>> configs) {
  26. var metrics = get(c);
  27. var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions());
  28. updatedDescriptions.putAll(descriptions);
  29. var updatedConfigs = new HashMap<>(metrics.getTopicConfigs());
  30. updatedConfigs.putAll(configs);
  31. replace(
  32. c,
  33. metrics.toBuilder()
  34. .topicDescriptions(updatedDescriptions)
  35. .topicConfigs(updatedConfigs)
  36. .build()
  37. );
  38. }
  39. public synchronized void onTopicDelete(KafkaCluster c, String topic) {
  40. var metrics = get(c);
  41. var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions());
  42. updatedDescriptions.remove(topic);
  43. var updatedConfigs = new HashMap<>(metrics.getTopicConfigs());
  44. updatedConfigs.remove(topic);
  45. replace(
  46. c,
  47. metrics.toBuilder()
  48. .topicDescriptions(updatedDescriptions)
  49. .topicConfigs(updatedConfigs)
  50. .build()
  51. );
  52. }
  53. public Statistics get(KafkaCluster c) {
  54. return Objects.requireNonNull(cache.get(c.getName()), "Unknown cluster metrics requested");
  55. }
  56. }