SerdesInitializer.java 12 KB


  1. package com.provectus.kafka.ui.serdes;
  2. import com.google.common.annotations.VisibleForTesting;
  3. import com.google.common.base.Preconditions;
  4. import com.google.common.base.Strings;
  5. import com.google.common.collect.ImmutableMap;
  6. import com.provectus.kafka.ui.config.ClustersProperties;
  7. import com.provectus.kafka.ui.config.ClustersProperties.SerdeConfig;
  8. import com.provectus.kafka.ui.exception.ValidationException;
  9. import com.provectus.kafka.ui.serde.api.PropertyResolver;
  10. import com.provectus.kafka.ui.serde.api.Serde;
  11. import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
  12. import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
  13. import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
  14. import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
  15. import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
  16. import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
  17. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  18. import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
  19. import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
  20. import com.provectus.kafka.ui.serdes.builtin.UuidBinarySerde;
  21. import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
  22. import java.util.LinkedHashMap;
  23. import java.util.Map;
  24. import java.util.Optional;
  25. import java.util.regex.Pattern;
  26. import javax.annotation.Nullable;
  27. import lombok.SneakyThrows;
  28. import lombok.extern.slf4j.Slf4j;
  29. import org.springframework.core.env.Environment;
  30. @Slf4j
  31. public class SerdesInitializer {
  32. private final Map<String, Class<? extends BuiltInSerde>> builtInSerdeClasses;
  33. private final CustomSerdeLoader customSerdeLoader;
  34. public SerdesInitializer() {
  35. this(
  36. ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
  37. .put(StringSerde.name(), StringSerde.class)
  38. .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
  39. .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
  40. .put(Int32Serde.name(), Int32Serde.class)
  41. .put(Int64Serde.name(), Int64Serde.class)
  42. .put(UInt32Serde.name(), UInt32Serde.class)
  43. .put(UInt64Serde.name(), UInt64Serde.class)
  44. .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
  45. .put(Base64Serde.name(), Base64Serde.class)
  46. .put(UuidBinarySerde.name(), UuidBinarySerde.class)
  47. .build(),
  48. new CustomSerdeLoader()
  49. );
  50. }
  51. @VisibleForTesting
  52. SerdesInitializer(Map<String, Class<? extends BuiltInSerde>> builtInSerdeClasses,
  53. CustomSerdeLoader customSerdeLoader) {
  54. this.builtInSerdeClasses = builtInSerdeClasses;
  55. this.customSerdeLoader = customSerdeLoader;
  56. }
  57. /**
  58. * Initialization algorithm:
  59. * First, we iterate over explicitly configured serdes from cluster config:
  60. * > if serde has name = one of built-in serde's names:
  61. * - if serde's properties are empty, we treat it as serde should be
  62. * auto-configured - we try to do that
  63. * - if serde's properties not empty, we treat it as an intention to
  64. * override default configuration, so we configuring it with specific config (calling configure(..))
  65. * <p/>
  66. * > if serde has className = one of built-in serde's classes:
  67. * - initializing it with specific config and with default classloader
  68. * <p/>
  69. * > if serde has custom className != one of built-in serde's classes:
  70. * - initializing it with specific config and with custom classloader (see CustomSerdeLoader)
  71. * <p/>
  72. * Second, we iterate over remaining built-in serdes (that we NOT explicitly configured by config)
  73. * trying to auto-configure them and registering with empty patterns - they will be present
  74. * in Serde selection in UI, but not assigned to any topic k/v.
  75. */
  76. public ClusterSerdes init(Environment env,
  77. ClustersProperties clustersProperties,
  78. int clusterIndex) {
  79. ClustersProperties.Cluster clusterProperties = clustersProperties.getClusters().get(clusterIndex);
  80. log.debug("Configuring serdes for cluster {}", clusterProperties.getName());
  81. var globalPropertiesResolver = new PropertyResolverImpl(env);
  82. var clusterPropertiesResolver = new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex);
  83. Map<String, SerdeInstance> registeredSerdes = new LinkedHashMap<>();
  84. // initializing serdes from config
  85. if (clusterProperties.getSerde() != null) {
  86. for (int i = 0; i < clusterProperties.getSerde().size(); i++) {
  87. SerdeConfig serdeConfig = clusterProperties.getSerde().get(i);
  88. if (Strings.isNullOrEmpty(serdeConfig.getName())) {
  89. throw new ValidationException("'name' property not set for serde: " + serdeConfig);
  90. }
  91. if (registeredSerdes.containsKey(serdeConfig.getName())) {
  92. throw new ValidationException("Multiple serdes with same name: " + serdeConfig.getName());
  93. }
  94. var instance = createSerdeFromConfig(
  95. serdeConfig,
  96. new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex + ".serde." + i + ".properties"),
  97. clusterPropertiesResolver,
  98. globalPropertiesResolver
  99. );
  100. registeredSerdes.put(serdeConfig.getName(), instance);
  101. }
  102. }
  103. // initializing remaining built-in serdes with empty selection patters
  104. builtInSerdeClasses.forEach((name, clazz) -> {
  105. if (!registeredSerdes.containsKey(name)) {
  106. BuiltInSerde serde = createSerdeInstance(clazz);
  107. if (autoConfigureSerde(serde, clusterPropertiesResolver, globalPropertiesResolver)) {
  108. registeredSerdes.put(name, new SerdeInstance(name, serde, null, null, null));
  109. }
  110. }
  111. });
  112. registerTopicRelatedSerde(registeredSerdes);
  113. return new ClusterSerdes(
  114. registeredSerdes,
  115. Optional.ofNullable(clusterProperties.getDefaultKeySerde())
  116. .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
  117. .orElse(null),
  118. Optional.ofNullable(clusterProperties.getDefaultValueSerde())
  119. .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))
  120. .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
  121. .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
  122. .orElse(null),
  123. createFallbackSerde()
  124. );
  125. }
  126. /**
  127. * Registers serdse that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde.
  128. */
  129. private void registerTopicRelatedSerde(Map<String, SerdeInstance> serdes) {
  130. registerConsumerOffsetsSerde(serdes);
  131. }
  132. private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
  133. var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC);
  134. serdes.put(
  135. ConsumerOffsetsSerde.name(),
  136. new SerdeInstance(
  137. ConsumerOffsetsSerde.name(),
  138. new ConsumerOffsetsSerde(),
  139. pattern,
  140. pattern,
  141. null
  142. )
  143. );
  144. }
  145. private SerdeInstance createFallbackSerde() {
  146. StringSerde serde = new StringSerde();
  147. serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
  148. return new SerdeInstance("Fallback", serde, null, null, null);
  149. }
  150. @SneakyThrows
  151. private SerdeInstance createSerdeFromConfig(SerdeConfig serdeConfig,
  152. PropertyResolver serdeProps,
  153. PropertyResolver clusterProps,
  154. PropertyResolver globalProps) {
  155. if (builtInSerdeClasses.containsKey(serdeConfig.getName())) {
  156. return createSerdeWithBuiltInSerdeName(serdeConfig, serdeProps, clusterProps, globalProps);
  157. }
  158. if (serdeConfig.getClassName() != null) {
  159. var builtInSerdeClass = builtInSerdeClasses.values().stream()
  160. .filter(c -> c.getName().equals(serdeConfig.getClassName()))
  161. .findAny();
  162. // built-in serde type with custom name
  163. if (builtInSerdeClass.isPresent()) {
  164. return createSerdeWithBuiltInClass(builtInSerdeClass.get(), serdeConfig, serdeProps, clusterProps, globalProps);
  165. }
  166. }
  167. log.info("Loading custom serde {}", serdeConfig.getName());
  168. return loadAndInitCustomSerde(serdeConfig, serdeProps, clusterProps, globalProps);
  169. }
  170. private SerdeInstance createSerdeWithBuiltInSerdeName(SerdeConfig serdeConfig,
  171. PropertyResolver serdeProps,
  172. PropertyResolver clusterProps,
  173. PropertyResolver globalProps) {
  174. String name = serdeConfig.getName();
  175. if (serdeConfig.getClassName() != null) {
  176. throw new ValidationException("className can't be set for built-in serde");
  177. }
  178. if (serdeConfig.getFilePath() != null) {
  179. throw new ValidationException("filePath can't be set for built-in serde types");
  180. }
  181. var clazz = builtInSerdeClasses.get(name);
  182. BuiltInSerde serde = createSerdeInstance(clazz);
  183. if (serdeConfig.getProperties() == null || serdeConfig.getProperties().isEmpty()) {
  184. if (!autoConfigureSerde(serde, clusterProps, globalProps)) {
  185. // no properties provided and serde does not support auto-configuration
  186. throw new ValidationException(name + " serde is not configured");
  187. }
  188. } else {
  189. // configuring serde with explicitly set properties
  190. serde.configure(serdeProps, clusterProps, globalProps);
  191. }
  192. return new SerdeInstance(
  193. name,
  194. serde,
  195. nullablePattern(serdeConfig.getTopicKeysPattern()),
  196. nullablePattern(serdeConfig.getTopicValuesPattern()),
  197. null
  198. );
  199. }
  200. private boolean autoConfigureSerde(BuiltInSerde serde, PropertyResolver clusterProps, PropertyResolver globalProps) {
  201. if (serde.canBeAutoConfigured(clusterProps, globalProps)) {
  202. serde.autoConfigure(clusterProps, globalProps);
  203. return true;
  204. }
  205. return false;
  206. }
  207. @SneakyThrows
  208. private SerdeInstance createSerdeWithBuiltInClass(Class<? extends BuiltInSerde> clazz,
  209. SerdeConfig serdeConfig,
  210. PropertyResolver serdeProps,
  211. PropertyResolver clusterProps,
  212. PropertyResolver globalProps) {
  213. if (serdeConfig.getFilePath() != null) {
  214. throw new ValidationException("filePath can't be set for built-in serde type");
  215. }
  216. BuiltInSerde serde = createSerdeInstance(clazz);
  217. serde.configure(serdeProps, clusterProps, globalProps);
  218. return new SerdeInstance(
  219. serdeConfig.getName(),
  220. serde,
  221. nullablePattern(serdeConfig.getTopicKeysPattern()),
  222. nullablePattern(serdeConfig.getTopicValuesPattern()),
  223. null
  224. );
  225. }
  226. @SneakyThrows
  227. private <T extends Serde> T createSerdeInstance(Class<T> clazz) {
  228. return clazz.getDeclaredConstructor().newInstance();
  229. }
  230. private SerdeInstance loadAndInitCustomSerde(SerdeConfig serdeConfig,
  231. PropertyResolver serdeProps,
  232. PropertyResolver clusterProps,
  233. PropertyResolver globalProps) {
  234. if (Strings.isNullOrEmpty(serdeConfig.getClassName())) {
  235. throw new ValidationException(
  236. "'className' property not set for custom serde " + serdeConfig.getName());
  237. }
  238. if (Strings.isNullOrEmpty(serdeConfig.getFilePath())) {
  239. throw new ValidationException(
  240. "'filePath' property not set for custom serde " + serdeConfig.getName());
  241. }
  242. var loaded = customSerdeLoader.loadAndConfigure(
  243. serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
  244. return new SerdeInstance(
  245. serdeConfig.getName(),
  246. loaded.getSerde(),
  247. nullablePattern(serdeConfig.getTopicKeysPattern()),
  248. nullablePattern(serdeConfig.getTopicValuesPattern()),
  249. loaded.getClassLoader()
  250. );
  251. }
  252. @Nullable
  253. private Pattern nullablePattern(@Nullable String pattern) {
  254. return pattern == null ? null : Pattern.compile(pattern);
  255. }
  256. }