ClusterSerdes.java 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package com.provectus.kafka.ui.serdes;
  2. import com.provectus.kafka.ui.serde.api.Serde;
  3. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  4. import java.io.Closeable;
  5. import java.util.Map;
  6. import java.util.Optional;
  7. import java.util.function.Predicate;
  8. import java.util.stream.Stream;
  9. import javax.annotation.Nullable;
  10. import lombok.Getter;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.extern.slf4j.Slf4j;
  13. @Slf4j
  14. @RequiredArgsConstructor
  15. public class ClusterSerdes implements Closeable {
  16. final Map<String, SerdeInstance> serdes;
  17. @Nullable
  18. final SerdeInstance defaultKeySerde;
  19. @Nullable
  20. final SerdeInstance defaultValueSerde;
  21. @Getter
  22. final SerdeInstance fallbackSerde;
  23. private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
  24. Serde.Target type,
  25. Predicate<SerdeInstance> additionalCheck) {
  26. // iterating over serdes in the same order they were added in config
  27. for (SerdeInstance serdeInstance : serdes.values()) {
  28. var pattern = type == Serde.Target.KEY
  29. ? serdeInstance.topicKeyPattern
  30. : serdeInstance.topicValuePattern;
  31. if (pattern != null
  32. && pattern.matcher(topic).matches()
  33. && additionalCheck.test(serdeInstance)) {
  34. return Optional.of(serdeInstance);
  35. }
  36. }
  37. if (type == Serde.Target.KEY
  38. && defaultKeySerde != null
  39. && additionalCheck.test(defaultKeySerde)) {
  40. return Optional.of(defaultKeySerde);
  41. }
  42. if (type == Serde.Target.VALUE
  43. && defaultValueSerde != null
  44. && additionalCheck.test(defaultValueSerde)) {
  45. return Optional.of(defaultValueSerde);
  46. }
  47. return Optional.empty();
  48. }
  49. public Optional<SerdeInstance> serdeForName(String name) {
  50. return Optional.ofNullable(serdes.get(name));
  51. }
  52. public Stream<SerdeInstance> all() {
  53. return serdes.values().stream();
  54. }
  55. public SerdeInstance suggestSerdeForSerialize(String topic, Serde.Target type) {
  56. return findSerdeByPatternsOrDefault(topic, type, s -> s.canSerialize(topic, type))
  57. .orElse(serdes.get(StringSerde.name()));
  58. }
  59. public SerdeInstance suggestSerdeForDeserialize(String topic, Serde.Target type) {
  60. return findSerdeByPatternsOrDefault(topic, type, s -> s.canDeserialize(topic, type))
  61. .orElse(serdes.get(StringSerde.name()));
  62. }
  63. @Override
  64. public void close() {
  65. serdes.values().forEach(SerdeInstance::close);
  66. }
  67. }