DeserializationService.java 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.config.ClustersProperties;
  3. import com.provectus.kafka.ui.model.KafkaCluster;
  4. import com.provectus.kafka.ui.model.SerdeDescriptionDTO;
  5. import com.provectus.kafka.ui.serde.api.SchemaDescription;
  6. import com.provectus.kafka.ui.serde.api.Serde;
  7. import com.provectus.kafka.ui.serdes.ClusterSerdes;
  8. import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
  9. import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
  10. import com.provectus.kafka.ui.serdes.SerdeInstance;
  11. import com.provectus.kafka.ui.serdes.SerdesInitializer;
  12. import java.io.Closeable;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.concurrent.ConcurrentHashMap;
  17. import javax.annotation.Nullable;
  18. import javax.validation.ValidationException;
  19. import org.springframework.core.env.Environment;
  20. import org.springframework.stereotype.Component;
  21. @Component
  22. public class DeserializationService implements Closeable {
  23. private final Map<String, ClusterSerdes> clusterSerdes = new ConcurrentHashMap<>();
  24. public DeserializationService(Environment env,
  25. ClustersStorage clustersStorage,
  26. ClustersProperties clustersProperties) {
  27. var serdesInitializer = new SerdesInitializer();
  28. for (int i = 0; i < clustersProperties.getClusters().size(); i++) {
  29. var clusterProperties = clustersProperties.getClusters().get(i);
  30. var cluster = clustersStorage.getClusterByName(clusterProperties.getName()).get();
  31. clusterSerdes.put(cluster.getName(), serdesInitializer.init(env, clustersProperties, i));
  32. }
  33. }
  34. private ClusterSerdes getSerdesFor(KafkaCluster cluster) {
  35. return clusterSerdes.get(cluster.getName());
  36. }
  37. private Serde.Serializer getSerializer(KafkaCluster cluster,
  38. String topic,
  39. Serde.Target type,
  40. String serdeName) {
  41. var serdes = getSerdesFor(cluster);
  42. var serde = serdes.serdeForName(serdeName)
  43. .orElseThrow(() -> new ValidationException(
  44. String.format("Serde %s not found", serdeName)));
  45. if (!serde.canSerialize(topic, type)) {
  46. throw new ValidationException(
  47. String.format("Serde %s can't be applied for '%s' topic's %s serialization", serde, topic, type));
  48. }
  49. return serde.serializer(topic, type);
  50. }
  51. private SerdeInstance getSerdeForDeserialize(KafkaCluster cluster,
  52. String topic,
  53. Serde.Target type,
  54. @Nullable String serdeName) {
  55. var serdes = getSerdesFor(cluster);
  56. if (serdeName != null) {
  57. var serde = serdes.serdeForName(serdeName)
  58. .orElseThrow(() -> new ValidationException(String.format("Serde '%s' not found", serdeName)));
  59. if (!serde.canDeserialize(topic, type)) {
  60. throw new ValidationException(
  61. String.format("Serde '%s' can't be applied to '%s' topic %s", serdeName, topic, type));
  62. }
  63. return serde;
  64. } else {
  65. return serdes.suggestSerdeForDeserialize(topic, type);
  66. }
  67. }
  68. public ProducerRecordCreator producerRecordCreator(KafkaCluster cluster,
  69. String topic,
  70. String keySerdeName,
  71. String valueSerdeName) {
  72. return new ProducerRecordCreator(
  73. getSerializer(cluster, topic, Serde.Target.KEY, keySerdeName),
  74. getSerializer(cluster, topic, Serde.Target.VALUE, valueSerdeName)
  75. );
  76. }
  77. public ConsumerRecordDeserializer deserializerFor(KafkaCluster cluster,
  78. String topic,
  79. @Nullable String keySerdeName,
  80. @Nullable String valueSerdeName) {
  81. var keySerde = getSerdeForDeserialize(cluster, topic, Serde.Target.KEY, keySerdeName);
  82. var valueSerde = getSerdeForDeserialize(cluster, topic, Serde.Target.VALUE, valueSerdeName);
  83. var fallbackSerde = getSerdesFor(cluster).getFallbackSerde();
  84. return new ConsumerRecordDeserializer(
  85. keySerde.getName(),
  86. keySerde.deserializer(topic, Serde.Target.KEY),
  87. valueSerde.getName(),
  88. valueSerde.deserializer(topic, Serde.Target.VALUE),
  89. fallbackSerde.getName(),
  90. fallbackSerde.deserializer(topic, Serde.Target.KEY),
  91. fallbackSerde.deserializer(topic, Serde.Target.VALUE)
  92. );
  93. }
  94. public List<SerdeDescriptionDTO> getSerdesForSerialize(KafkaCluster cluster,
  95. String topic,
  96. Serde.Target serdeType) {
  97. var serdes = getSerdesFor(cluster);
  98. var preferred = serdes.suggestSerdeForSerialize(topic, serdeType);
  99. var result = new ArrayList<SerdeDescriptionDTO>();
  100. result.add(toDto(preferred, topic, serdeType, true));
  101. serdes.all()
  102. .filter(s -> !s.getName().equals(preferred.getName()))
  103. .filter(s -> s.canSerialize(topic, serdeType))
  104. .forEach(s -> result.add(toDto(s, topic, serdeType, false)));
  105. return result;
  106. }
  107. public List<SerdeDescriptionDTO> getSerdesForDeserialize(KafkaCluster cluster,
  108. String topic,
  109. Serde.Target serdeType) {
  110. var serdes = getSerdesFor(cluster);
  111. var preferred = serdes.suggestSerdeForDeserialize(topic, serdeType);
  112. var result = new ArrayList<SerdeDescriptionDTO>();
  113. result.add(toDto(preferred, topic, serdeType, true));
  114. serdes.all()
  115. .filter(s -> !s.getName().equals(preferred.getName()))
  116. .filter(s -> s.canDeserialize(topic, serdeType))
  117. .forEach(s -> result.add(toDto(s, topic, serdeType, false)));
  118. return result;
  119. }
  120. private SerdeDescriptionDTO toDto(SerdeInstance serdeInstance,
  121. String topic,
  122. Serde.Target serdeType,
  123. boolean preferred) {
  124. var schemaOpt = serdeInstance.getSchema(topic, serdeType);
  125. return new SerdeDescriptionDTO()
  126. .name(serdeInstance.getName())
  127. .description(serdeInstance.description().orElse(null))
  128. .schema(schemaOpt.map(SchemaDescription::getSchema).orElse(null))
  129. .additionalProperties(schemaOpt.map(SchemaDescription::getAdditionalProperties).orElse(null))
  130. .preferred(preferred);
  131. }
  132. @Override
  133. public void close() {
  134. clusterSerdes.values().forEach(ClusterSerdes::close);
  135. }
  136. }