LogDirsTest.java 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import com.provectus.kafka.ui.AbstractBaseTest;
  4. import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
  5. import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
  6. import com.provectus.kafka.ui.model.BrokerTopicLogdirs;
  7. import com.provectus.kafka.ui.model.BrokersLogdirs;
  8. import com.provectus.kafka.ui.model.ErrorResponse;
  9. import java.util.List;
  10. import java.util.Map;
  11. import org.junit.jupiter.api.Test;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  14. import org.springframework.core.ParameterizedTypeReference;
  15. import org.springframework.test.context.ContextConfiguration;
  16. import org.springframework.test.web.reactive.server.WebTestClient;
  17. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  18. @AutoConfigureWebTestClient(timeout = "60000")
  19. public class LogDirsTest extends AbstractBaseTest {
  20. @Autowired
  21. private WebTestClient webTestClient;
  22. @Test
  23. public void testAllBrokers() {
  24. List<BrokersLogdirs> dirs = webTestClient.get()
  25. .uri("/api/clusters/{clusterName}/brokers/logdirs", LOCAL)
  26. .exchange()
  27. .expectStatus().isOk()
  28. .expectBody(new ParameterizedTypeReference<List<BrokersLogdirs>>() {})
  29. .returnResult()
  30. .getResponseBody();
  31. assertThat(dirs).hasSize(1);
  32. BrokersLogdirs dir = dirs.get(0);
  33. assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data");
  34. assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets")))
  35. .isTrue();
  36. BrokerTopicLogdirs topic = dir.getTopics().stream()
  37. .filter(t -> t.getName().equals("__consumer_offsets"))
  38. .findAny().get();
  39. assertThat(topic.getPartitions()).hasSize(1);
  40. assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1);
  41. assertThat(topic.getPartitions().get(0).getSize()).isPositive();
  42. }
  43. @Test
  44. public void testOneBrokers() {
  45. List<BrokersLogdirs> dirs = webTestClient.get()
  46. .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=1", LOCAL)
  47. .exchange()
  48. .expectStatus().isOk()
  49. .expectBody(new ParameterizedTypeReference<List<BrokersLogdirs>>() {})
  50. .returnResult()
  51. .getResponseBody();
  52. assertThat(dirs).hasSize(1);
  53. BrokersLogdirs dir = dirs.get(0);
  54. assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data");
  55. assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets")))
  56. .isTrue();
  57. BrokerTopicLogdirs topic = dir.getTopics().stream()
  58. .filter(t -> t.getName().equals("__consumer_offsets"))
  59. .findAny().get();
  60. assertThat(topic.getPartitions()).hasSize(1);
  61. assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1);
  62. assertThat(topic.getPartitions().get(0).getSize()).isPositive();
  63. }
  64. @Test
  65. public void testWrongBrokers() {
  66. List<BrokersLogdirs> dirs = webTestClient.get()
  67. .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=2", LOCAL)
  68. .exchange()
  69. .expectStatus().isOk()
  70. .expectBody(new ParameterizedTypeReference<List<BrokersLogdirs>>() {})
  71. .returnResult()
  72. .getResponseBody();
  73. assertThat(dirs).isEmpty();
  74. }
  75. @Test
  76. public void testChangeDirToWrongDir() {
  77. ErrorResponse dirs = webTestClient.patch()
  78. .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1)
  79. .bodyValue(Map.of(
  80. "topic", "__consumer_offsets",
  81. "partition", "0",
  82. "logDir", "/asdf/as"
  83. )
  84. )
  85. .exchange()
  86. .expectStatus().isBadRequest()
  87. .expectBody(ErrorResponse.class)
  88. .returnResult()
  89. .getResponseBody();
  90. assertThat(dirs.getMessage())
  91. .isEqualTo(new LogDirNotFoundApiException().getMessage());
  92. dirs = webTestClient.patch()
  93. .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1)
  94. .bodyValue(Map.of(
  95. "topic", "asdf",
  96. "partition", "0",
  97. "logDir", "/var/lib/kafka/data"
  98. )
  99. )
  100. .exchange()
  101. .expectStatus().isBadRequest()
  102. .expectBody(ErrorResponse.class)
  103. .returnResult()
  104. .getResponseBody();
  105. assertThat(dirs.getMessage())
  106. .isEqualTo(new TopicOrPartitionNotFoundException().getMessage());
  107. }
  108. }