ReactiveAdminClientTest.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package com.provectus.kafka.ui.service;
  2. import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExceptionFilter;
  3. import static java.util.Objects.requireNonNull;
  4. import static org.assertj.core.api.Assertions.assertThat;
  5. import com.provectus.kafka.ui.AbstractIntegrationTest;
  6. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.UUID;
  11. import java.util.stream.Stream;
  12. import lombok.SneakyThrows;
  13. import org.apache.kafka.clients.admin.AdminClient;
  14. import org.apache.kafka.clients.admin.AlterConfigOp;
  15. import org.apache.kafka.clients.admin.Config;
  16. import org.apache.kafka.clients.admin.ConfigEntry;
  17. import org.apache.kafka.clients.admin.NewTopic;
  18. import org.apache.kafka.clients.admin.OffsetSpec;
  19. import org.apache.kafka.clients.producer.ProducerRecord;
  20. import org.apache.kafka.common.KafkaFuture;
  21. import org.apache.kafka.common.TopicPartition;
  22. import org.apache.kafka.common.config.ConfigResource;
  23. import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  24. import org.apache.kafka.common.internals.KafkaFutureImpl;
  25. import org.junit.function.ThrowingRunnable;
  26. import org.junit.jupiter.api.AfterEach;
  27. import org.junit.jupiter.api.BeforeEach;
  28. import org.junit.jupiter.api.Test;
  29. import reactor.test.StepVerifier;
  30. class ReactiveAdminClientTest extends AbstractIntegrationTest {
  31. private final List<ThrowingRunnable> clearings = new ArrayList<>();
  32. private AdminClient adminClient;
  33. private ReactiveAdminClient reactiveAdminClient;
  34. @BeforeEach
  35. void init() {
  36. AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
  37. ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
  38. reactiveAdminClient = requireNonNull(adminClientService.get(clustersStorage.getClusterByName(LOCAL).get()).block());
  39. adminClient = reactiveAdminClient.getClient();
  40. }
  41. @AfterEach
  42. void tearDown() {
  43. for (ThrowingRunnable clearing : clearings) {
  44. try {
  45. clearing.run();
  46. } catch (Throwable th) {
  47. //NOOP
  48. }
  49. }
  50. }
  51. @Test
  52. void testUpdateTopicConfigs() throws Exception {
  53. String topic = UUID.randomUUID().toString();
  54. createTopics(new NewTopic(topic, 1, (short) 1));
  55. var configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
  56. adminClient.incrementalAlterConfigs(
  57. Map.of(
  58. configResource,
  59. List.of(
  60. new AlterConfigOp(new ConfigEntry("compression.type", "gzip"), AlterConfigOp.OpType.SET),
  61. new AlterConfigOp(new ConfigEntry("retention.bytes", "12345678"), AlterConfigOp.OpType.SET)
  62. )
  63. )
  64. ).all().get();
  65. StepVerifier.create(
  66. reactiveAdminClient.updateTopicConfig(
  67. topic,
  68. Map.of(
  69. "compression.type", "snappy", //changing existing config
  70. "file.delete.delay.ms", "12345" // adding new one
  71. )
  72. )
  73. ).expectComplete().verify();
  74. Config config = adminClient.describeConfigs(List.of(configResource)).values().get(configResource).get();
  75. assertThat(config.get("retention.bytes").value()).isNotEqualTo("12345678"); // wes reset to default
  76. assertThat(config.get("compression.type").value()).isEqualTo("snappy");
  77. assertThat(config.get("file.delete.delay.ms").value()).isEqualTo("12345");
  78. }
  79. @SneakyThrows
  80. void createTopics(NewTopic... topics) {
  81. adminClient.createTopics(List.of(topics)).all().get();
  82. clearings.add(() -> adminClient.deleteTopics(Stream.of(topics).map(NewTopic::name).toList()).all().get());
  83. }
  84. @Test
  85. void testToMonoWithExceptionFilter() {
  86. var failedFuture = new KafkaFutureImpl<String>();
  87. failedFuture.completeExceptionally(new UnknownTopicOrPartitionException());
  88. var okFuture = new KafkaFutureImpl<String>();
  89. okFuture.complete("done");
  90. var emptyFuture = new KafkaFutureImpl<String>();
  91. emptyFuture.complete(null);
  92. Map<String, KafkaFuture<String>> arg = Map.of(
  93. "failure", failedFuture,
  94. "ok", okFuture,
  95. "empty", emptyFuture
  96. );
  97. StepVerifier.create(toMonoWithExceptionFilter(arg, UnknownTopicOrPartitionException.class))
  98. .assertNext(result -> assertThat(result).hasSize(1).containsEntry("ok", "done"))
  99. .verifyComplete();
  100. }
  101. @Test
  102. void testListOffsetsUnsafe() {
  103. String topic = UUID.randomUUID().toString();
  104. createTopics(new NewTopic(topic, 2, (short) 1));
  105. // sending messages to have non-zero offsets for tp
  106. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  107. producer.send(new ProducerRecord<>(topic, 1, "k", "v"));
  108. producer.send(new ProducerRecord<>(topic, 1, "k", "v"));
  109. }
  110. var requestedPartitions = List.of(
  111. new TopicPartition(topic, 0),
  112. new TopicPartition(topic, 1)
  113. );
  114. StepVerifier.create(reactiveAdminClient.listOffsetsUnsafe(requestedPartitions, OffsetSpec.earliest()))
  115. .assertNext(offsets -> {
  116. assertThat(offsets)
  117. .hasSize(2)
  118. .containsEntry(new TopicPartition(topic, 0), 0L)
  119. .containsEntry(new TopicPartition(topic, 1), 0L);
  120. })
  121. .verifyComplete();
  122. StepVerifier.create(reactiveAdminClient.listOffsetsUnsafe(requestedPartitions, OffsetSpec.latest()))
  123. .assertNext(offsets -> {
  124. assertThat(offsets)
  125. .hasSize(2)
  126. .containsEntry(new TopicPartition(topic, 0), 0L)
  127. .containsEntry(new TopicPartition(topic, 1), 2L);
  128. })
  129. .verifyComplete();
  130. }
  131. }