package com.provectus.kafka.ui.config; import static com.provectus.kafka.ui.model.MetricsScrapeProperties.JMX_METRICS_TYPE; import jakarta.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; @Configuration @ConfigurationProperties("kafka") @Data public class ClustersProperties { List clusters = new ArrayList<>(); String internalTopicPrefix; Integer adminClientTimeout; PollingProperties polling = new PollingProperties(); @Data public static class Cluster { String name; String bootstrapServers; String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; KeystoreConfig schemaRegistrySsl; String ksqldbServer; KsqldbServerAuth ksqldbServerAuth; KeystoreConfig ksqldbServerSsl; List kafkaConnect; MetricsConfig metrics; Map properties; boolean readOnly = false; List serde; String defaultKeySerde; String defaultValueSerde; List masking; Long pollingThrottleRate; TruststoreConfig ssl; AuditProperties audit; } @Data public static class PollingProperties { Integer pollTimeoutMs; Integer partitionPollTimeout; Integer noDataEmptyPolls; Integer maxPageSize; Integer defaultPageSize; } @Data @ToString(exclude = {"password", "keystorePassword"}) public static class MetricsConfig { String type; Integer port; Boolean ssl; String username; String password; String keystoreLocation; String keystorePassword; MetricsStorage store; } @Data public static class MetricsStorage { PrometheusStorage prometheus; } @Data @ToString(exclude = {"pushGatewayPassword"}) public static class PrometheusStorage { String url; String pushGatewayUrl; String pushGatewayUsername; String pushGatewayPassword; String pushGatewayJobName; Boolean remoteWrite; } @Data @NoArgsConstructor @AllArgsConstructor @Builder(toBuilder = true) @ToString(exclude = {"password", "keystorePassword"}) public static class ConnectCluster { String name; String address; String username; String password; String keystoreLocation; String keystorePassword; } @Data @ToString(exclude = {"password"}) public static class SchemaRegistryAuth { String username; String password; } @Data @ToString(exclude = {"truststorePassword"}) public static class TruststoreConfig { String truststoreLocation; String truststorePassword; } @Data public static class SerdeConfig { String name; String className; String filePath; Map properties; String topicKeysPattern; String topicValuesPattern; } @Data @ToString(exclude = "password") public static class KsqldbServerAuth { String username; String password; } @Data @NoArgsConstructor @AllArgsConstructor @ToString(exclude = {"keystorePassword"}) public static class KeystoreConfig { String keystoreLocation; String keystorePassword; } @Data public static class Masking { Type type; List fields; String fieldsNamePattern; List maskingCharsReplacement; //used when type=MASK String replacement; //used when type=REPLACE String topicKeysPattern; String topicValuesPattern; public enum Type { REMOVE, MASK, REPLACE } } @Data @NoArgsConstructor @AllArgsConstructor public static class AuditProperties { String topic; Integer auditTopicsPartitions; Boolean topicAuditEnabled; Boolean consoleAuditEnabled; Map auditTopicProperties; } @PostConstruct public void validateAndSetDefaults() { if (clusters != null) { validateClusterNames(); flattenClusterProperties(); setMetricsDefaults(); } } private void setMetricsDefaults() { for (Cluster cluster : clusters) { if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) { cluster.getMetrics().setType(JMX_METRICS_TYPE); } } } private void flattenClusterProperties() { for (Cluster cluster : clusters) { cluster.setProperties(flattenClusterProperties(null, cluster.getProperties())); } } private Map flattenClusterProperties(@Nullable String prefix, @Nullable Map propertiesMap) { Map flattened = new HashMap<>(); if (propertiesMap != null) { propertiesMap.forEach((k, v) -> { String key = prefix == null ? k : prefix + "." + k; if (v instanceof Map) { flattened.putAll(flattenClusterProperties(key, (Map) v)); } else { flattened.put(key, v); } }); } return flattened; } private void validateClusterNames() { // if only one cluster provided it is ok not to set name if (clusters.size() == 1 && !StringUtils.hasText(clusters.get(0).getName())) { clusters.get(0).setName("Default"); return; } Set clusterNames = new HashSet<>(); for (Cluster clusterProperties : clusters) { if (!StringUtils.hasText(clusterProperties.getName())) { throw new IllegalStateException( "Application config isn't valid. " + "Cluster names should be provided in case of multiple clusters present"); } if (!clusterNames.add(clusterProperties.getName())) { throw new IllegalStateException( "Application config isn't valid. Two clusters can't have the same name"); } } } }