DeserializationService.java 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.config.ClustersProperties;
  3. import com.provectus.kafka.ui.model.KafkaCluster;
  4. import com.provectus.kafka.ui.model.SerdeDescriptionDTO;
  5. import com.provectus.kafka.ui.serde.api.SchemaDescription;
  6. import com.provectus.kafka.ui.serde.api.Serde;
  7. import com.provectus.kafka.ui.serdes.ClusterSerdes;
  8. import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
  9. import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
  10. import com.provectus.kafka.ui.serdes.SerdeInstance;
  11. import java.io.Closeable;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.Map;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. import javax.annotation.Nullable;
  17. import javax.validation.ValidationException;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.springframework.core.env.Environment;
  20. import org.springframework.stereotype.Component;
  21. @Slf4j
  22. @Component
  23. public class DeserializationService implements Closeable {
  24. private final Map<KafkaCluster, ClusterSerdes> clusterSerdes = new ConcurrentHashMap<>();
  25. public DeserializationService(Environment env,
  26. ClustersStorage clustersStorage,
  27. ClustersProperties clustersProperties) {
  28. for (int i = 0; i < clustersProperties.getClusters().size(); i++) {
  29. var clusterProperties = clustersProperties.getClusters().get(i);
  30. var cluster = clustersStorage.getClusterByName(clusterProperties.getName()).get();
  31. clusterSerdes.put(cluster, new ClusterSerdes(env, clustersProperties, i));
  32. }
  33. }
  34. private Serde.Serializer getSerializer(KafkaCluster cluster,
  35. String topic,
  36. Serde.Target type,
  37. String serdeName) {
  38. var serdes = this.clusterSerdes.get(cluster);
  39. var serde = serdes.serdeForName(serdeName)
  40. .orElseThrow(() -> new ValidationException(
  41. String.format("Serde %s not found", serdeName)));
  42. if (!serde.canSerialize(topic, type)) {
  43. throw new ValidationException(
  44. String.format("Serde %s can't be applied for '%s' topic's %s serialization", serde, topic, type));
  45. }
  46. return serde.serializer(topic, type);
  47. }
  48. private SerdeInstance getSerdeForDeserialize(KafkaCluster cluster,
  49. String topic,
  50. Serde.Target type,
  51. @Nullable String serdeName) {
  52. var serdes = this.clusterSerdes.get(cluster);
  53. if (serdeName != null) {
  54. var serde = serdes.serdeForName(serdeName)
  55. .orElseThrow(() -> new ValidationException(String.format("Serde '%s' not found", serdeName)));
  56. if (!serde.canDeserialize(topic, type)) {
  57. throw new ValidationException(
  58. String.format("Serde '%s' can't be applied to '%s' topic %s", serdeName, topic, type));
  59. }
  60. return serde;
  61. } else {
  62. return serdes.suggestSerdeForDeserialize(topic, type);
  63. }
  64. }
  65. public ProducerRecordCreator producerRecordCreator(KafkaCluster cluster,
  66. String topic,
  67. String keySerdeName,
  68. String valueSerdeName) {
  69. return new ProducerRecordCreator(
  70. getSerializer(cluster, topic, Serde.Target.KEY, keySerdeName),
  71. getSerializer(cluster, topic, Serde.Target.VALUE, valueSerdeName)
  72. );
  73. }
  74. public ConsumerRecordDeserializer deserializerFor(KafkaCluster cluster,
  75. String topic,
  76. @Nullable String keySerdeName,
  77. @Nullable String valueSerdeName) {
  78. var keySerde = getSerdeForDeserialize(cluster, topic, Serde.Target.KEY, keySerdeName);
  79. var valueSerde = getSerdeForDeserialize(cluster, topic, Serde.Target.VALUE, valueSerdeName);
  80. var fallbackSerde = clusterSerdes.get(cluster).getFallbackSerde();
  81. return new ConsumerRecordDeserializer(
  82. keySerde.getName(),
  83. keySerde.deserializer(topic, Serde.Target.KEY),
  84. valueSerde.getName(),
  85. valueSerde.deserializer(topic, Serde.Target.VALUE),
  86. fallbackSerde.getName(),
  87. fallbackSerde.deserializer(topic, Serde.Target.KEY),
  88. fallbackSerde.deserializer(topic, Serde.Target.VALUE)
  89. );
  90. }
  91. public List<SerdeDescriptionDTO> getSerdesForSerialize(KafkaCluster cluster,
  92. String topic,
  93. Serde.Target serdeType) {
  94. var serdes = clusterSerdes.get(cluster);
  95. var preferred = serdes.suggestSerdeForSerialize(topic, serdeType);
  96. var result = new ArrayList<SerdeDescriptionDTO>();
  97. result.add(toDto(preferred, topic, serdeType, true));
  98. serdes.all()
  99. .filter(s -> !s.getName().equals(preferred.getName()))
  100. .filter(s -> s.canSerialize(topic, serdeType))
  101. .forEach(s -> result.add(toDto(s, topic, serdeType, false)));
  102. return result;
  103. }
  104. public List<SerdeDescriptionDTO> getSerdesForDeserialize(KafkaCluster cluster,
  105. String topic,
  106. Serde.Target serdeType) {
  107. var serdes = clusterSerdes.get(cluster);
  108. var preferred = serdes.suggestSerdeForDeserialize(topic, serdeType);
  109. var result = new ArrayList<SerdeDescriptionDTO>();
  110. result.add(toDto(preferred, topic, serdeType, true));
  111. serdes.all()
  112. .filter(s -> !s.getName().equals(preferred.getName()))
  113. .filter(s -> s.canDeserialize(topic, serdeType))
  114. .forEach(s -> result.add(toDto(s, topic, serdeType, false)));
  115. return result;
  116. }
  117. private SerdeDescriptionDTO toDto(SerdeInstance serdeInstance,
  118. String topic,
  119. Serde.Target serdeType,
  120. boolean preferred) {
  121. var schemaOpt = serdeInstance.getSchema(topic, serdeType);
  122. return new SerdeDescriptionDTO()
  123. .name(serdeInstance.getName())
  124. .description(serdeInstance.description().orElse(null))
  125. .schema(schemaOpt.map(SchemaDescription::getSchema).orElse(null))
  126. .additionalProperties(schemaOpt.map(SchemaDescription::getAdditionalProperties).orElse(null))
  127. .preferred(preferred);
  128. }
  129. @Override
  130. public void close() {
  131. clusterSerdes.values().forEach(ClusterSerdes::close);
  132. }
  133. }