ClustersProperties.java 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package com.provectus.kafka.ui.config;
  2. import com.provectus.kafka.ui.model.MetricsConfig;
  3. import java.util.ArrayList;
  4. import java.util.HashMap;
  5. import java.util.HashSet;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Set;
  9. import javax.annotation.Nullable;
  10. import javax.annotation.PostConstruct;
  11. import lombok.AllArgsConstructor;
  12. import lombok.Builder;
  13. import lombok.Data;
  14. import lombok.NoArgsConstructor;
  15. import lombok.ToString;
  16. import org.springframework.boot.context.properties.ConfigurationProperties;
  17. import org.springframework.context.annotation.Configuration;
  18. import org.springframework.util.StringUtils;
  19. @Configuration
  20. @ConfigurationProperties("kafka")
  21. @Data
  22. public class ClustersProperties {
  23. List<Cluster> clusters = new ArrayList<>();
  24. String internalTopicPrefix;
  25. PollingProperties polling = new PollingProperties();
  26. @Data
  27. public static class Cluster {
  28. String name;
  29. String bootstrapServers;
  30. String schemaRegistry;
  31. SchemaRegistryAuth schemaRegistryAuth;
  32. KeystoreConfig schemaRegistrySsl;
  33. String ksqldbServer;
  34. KsqldbServerAuth ksqldbServerAuth;
  35. KeystoreConfig ksqldbServerSsl;
  36. List<ConnectCluster> kafkaConnect;
  37. MetricsConfigData metrics;
  38. Map<String, Object> properties;
  39. boolean readOnly = false;
  40. List<SerdeConfig> serde;
  41. String defaultKeySerde;
  42. String defaultValueSerde;
  43. List<Masking> masking;
  44. Long pollingThrottleRate;
  45. TruststoreConfig ssl;
  46. }
  47. @Data
  48. public static class PollingProperties {
  49. Integer pollTimeoutMs;
  50. Integer partitionPollTimeout;
  51. Integer noDataEmptyPolls;
  52. }
  53. @Data
  54. @ToString(exclude = "password")
  55. public static class MetricsConfigData {
  56. String type;
  57. Integer port;
  58. Boolean ssl;
  59. String username;
  60. String password;
  61. String keystoreLocation;
  62. String keystorePassword;
  63. }
  64. @Data
  65. @NoArgsConstructor
  66. @AllArgsConstructor
  67. @Builder(toBuilder = true)
  68. @ToString(exclude = {"password", "keystorePassword"})
  69. public static class ConnectCluster {
  70. String name;
  71. String address;
  72. String username;
  73. String password;
  74. String keystoreLocation;
  75. String keystorePassword;
  76. }
  77. @Data
  78. @ToString(exclude = {"password"})
  79. public static class SchemaRegistryAuth {
  80. String username;
  81. String password;
  82. }
  83. @Data
  84. @ToString(exclude = {"truststorePassword"})
  85. public static class TruststoreConfig {
  86. String truststoreLocation;
  87. String truststorePassword;
  88. }
  89. @Data
  90. public static class SerdeConfig {
  91. String name;
  92. String className;
  93. String filePath;
  94. Map<String, Object> properties;
  95. String topicKeysPattern;
  96. String topicValuesPattern;
  97. }
  98. @Data
  99. @ToString(exclude = "password")
  100. public static class KsqldbServerAuth {
  101. String username;
  102. String password;
  103. }
  104. @Data
  105. @NoArgsConstructor
  106. @AllArgsConstructor
  107. @ToString(exclude = {"keystorePassword"})
  108. public static class KeystoreConfig {
  109. String keystoreLocation;
  110. String keystorePassword;
  111. }
  112. @Data
  113. public static class Masking {
  114. Type type;
  115. List<String> fields; //if null or empty list - policy will be applied to all fields
  116. List<String> pattern; //used when type=MASK
  117. String replacement; //used when type=REPLACE
  118. String topicKeysPattern;
  119. String topicValuesPattern;
  120. public enum Type {
  121. REMOVE, MASK, REPLACE
  122. }
  123. }
  124. @PostConstruct
  125. public void validateAndSetDefaults() {
  126. if (clusters != null) {
  127. validateClusterNames();
  128. flattenClusterProperties();
  129. setMetricsDefaults();
  130. }
  131. }
  132. private void setMetricsDefaults() {
  133. for (Cluster cluster : clusters) {
  134. if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
  135. cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE);
  136. }
  137. }
  138. }
  139. private void flattenClusterProperties() {
  140. for (Cluster cluster : clusters) {
  141. cluster.setProperties(flattenClusterProperties(null, cluster.getProperties()));
  142. }
  143. }
  144. private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
  145. @Nullable Map<String, Object> propertiesMap) {
  146. Map<String, Object> flattened = new HashMap<>();
  147. if (propertiesMap != null) {
  148. propertiesMap.forEach((k, v) -> {
  149. String key = prefix == null ? k : prefix + "." + k;
  150. if (v instanceof Map<?, ?>) {
  151. flattened.putAll(flattenClusterProperties(key, (Map<String, Object>) v));
  152. } else {
  153. flattened.put(key, v);
  154. }
  155. });
  156. }
  157. return flattened;
  158. }
  159. private void validateClusterNames() {
  160. // if only one cluster provided it is ok not to set name
  161. if (clusters.size() == 1 && !StringUtils.hasText(clusters.get(0).getName())) {
  162. clusters.get(0).setName("Default");
  163. return;
  164. }
  165. Set<String> clusterNames = new HashSet<>();
  166. for (Cluster clusterProperties : clusters) {
  167. if (!StringUtils.hasText(clusterProperties.getName())) {
  168. throw new IllegalStateException(
  169. "Application config isn't valid. "
  170. + "Cluster names should be provided in case of multiple clusters present");
  171. }
  172. if (!clusterNames.add(clusterProperties.getName())) {
  173. throw new IllegalStateException(
  174. "Application config isn't valid. Two clusters can't have the same name");
  175. }
  176. }
  177. }
  178. }