Merge pull request #724 from provectus/703-additional-fields-topicConfig
703 additional fields topic config
This commit is contained in:
commit
1f268579d0
7 changed files with 116 additions and 10 deletions
|
@ -35,7 +35,6 @@ import java.math.BigDecimal;
|
|||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||
|
@ -123,6 +122,8 @@ public interface ClusterMapper {
|
|||
return result;
|
||||
}
|
||||
|
||||
@Mapping(target = "isReadOnly", source = "readOnly")
|
||||
@Mapping(target = "isSensitive", source = "sensitive")
|
||||
TopicConfig toTopicConfig(InternalTopicConfig topic);
|
||||
|
||||
Replica toReplica(InternalReplica replica);
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package com.provectus.kafka.ui.model;
|
||||
|
||||
|
||||
import java.util.List;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
|
@ -10,4 +12,8 @@ public class InternalTopicConfig {
|
|||
private final String name;
|
||||
private final String value;
|
||||
private final String defaultValue;
|
||||
private final ConfigEntry.ConfigSource source;
|
||||
private final boolean isSensitive;
|
||||
private final boolean isReadOnly;
|
||||
private final List<ConfigEntry.ConfigSynonym> synonyms;
|
||||
}
|
||||
|
|
|
@ -337,7 +337,8 @@ public class KafkaService {
|
|||
.map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
|
||||
return ClusterUtil.toMono(adminClient.describeConfigs(resources,
|
||||
new DescribeConfigsOptions().includeSynonyms(true)).all())
|
||||
.map(configs ->
|
||||
configs.entrySet().stream().collect(Collectors.toMap(
|
||||
c -> c.getKey().name(),
|
||||
|
|
|
@ -195,7 +195,11 @@ public class ClusterUtil {
|
|||
public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) {
|
||||
InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
|
||||
.name(configEntry.name())
|
||||
.value(configEntry.value());
|
||||
.value(configEntry.value())
|
||||
.source(configEntry.source())
|
||||
.isReadOnly(configEntry.isReadOnly())
|
||||
.isSensitive(configEntry.isSensitive())
|
||||
.synonyms(configEntry.synonyms());
|
||||
if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
|
||||
builder.defaultValue(configEntry.value());
|
||||
} else {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui;
|
||||
|
||||
import com.provectus.kafka.ui.api.model.TopicConfig;
|
||||
import com.provectus.kafka.ui.model.BrokerConfig;
|
||||
import com.provectus.kafka.ui.model.PartitionsIncrease;
|
||||
import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
|
||||
|
@ -127,6 +128,12 @@ public class KafkaConsumerTests extends AbstractBaseTest {
|
|||
.exchange()
|
||||
.expectStatus()
|
||||
.isNotFound();
|
||||
|
||||
webTestClient.get()
|
||||
.uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isNotFound();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -144,18 +151,17 @@ public class KafkaConsumerTests extends AbstractBaseTest {
|
|||
.returnResult()
|
||||
.getResponseBody();
|
||||
|
||||
assert configs != null;
|
||||
Assertions.assertNotNull(configs);
|
||||
assert !configs.isEmpty();
|
||||
Assertions.assertNotEquals(null, configs.get(0).getName());
|
||||
Assertions.assertNotEquals(null, configs.get(0).getIsReadOnly());
|
||||
Assertions.assertNotEquals(null, configs.get(0).getIsSensitive());
|
||||
Assertions.assertNotEquals(null, configs.get(0).getSource());
|
||||
Assertions.assertNotNull(configs.get(0).getName());
|
||||
Assertions.assertNotNull(configs.get(0).getIsReadOnly());
|
||||
Assertions.assertNotNull(configs.get(0).getIsSensitive());
|
||||
Assertions.assertNotNull(configs.get(0).getSource());
|
||||
Assertions.assertNotNull(configs.get(0).getSynonyms());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReturn404ForNonExistingBroker() {
|
||||
var topicName = UUID.randomUUID().toString();
|
||||
|
||||
webTestClient.get()
|
||||
.uri("/api/clusters/{clusterName}/brokers/{id}/configs",
|
||||
LOCAL,
|
||||
|
@ -164,4 +170,38 @@ public class KafkaConsumerTests extends AbstractBaseTest {
|
|||
.expectStatus()
|
||||
.isNotFound();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldRetrieveTopicConfig() {
|
||||
var topicName = UUID.randomUUID().toString();
|
||||
|
||||
webTestClient.post()
|
||||
.uri("/api/clusters/{clusterName}/topics", LOCAL)
|
||||
.bodyValue(new TopicCreation()
|
||||
.name(topicName)
|
||||
.partitions(1)
|
||||
.replicationFactor(1)
|
||||
.configs(Map.of())
|
||||
)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk();
|
||||
|
||||
List<TopicConfig> configs = webTestClient.get()
|
||||
.uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk()
|
||||
.expectBodyList(TopicConfig.class)
|
||||
.returnResult()
|
||||
.getResponseBody();
|
||||
|
||||
Assertions.assertNotNull(configs);
|
||||
assert !configs.isEmpty();
|
||||
Assertions.assertNotNull(configs.get(0).getName());
|
||||
Assertions.assertNotNull(configs.get(0).getIsReadOnly());
|
||||
Assertions.assertNotNull(configs.get(0).getIsSensitive());
|
||||
Assertions.assertNotNull(configs.get(0).getSource());
|
||||
Assertions.assertNotNull(configs.get(0).getSynonyms());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,11 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
||||
import com.provectus.kafka.ui.model.InternalTopic;
|
||||
import com.provectus.kafka.ui.model.InternalTopicConfig;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.model.Topic;
|
||||
import com.provectus.kafka.ui.model.TopicColumnsToSort;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
@ -16,6 +18,7 @@ import java.util.UUID;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
@ -256,4 +259,45 @@ class ClusterServiceTest {
|
|||
assertThat(topics.getTopics()).map(Topic::getPartitionCount).isSorted();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldRetrieveTopicConfigs() {
|
||||
var topicName = UUID.randomUUID().toString();
|
||||
|
||||
when(clustersStorage.getClusterByName(topicName))
|
||||
.thenReturn(Optional.of(KafkaCluster.builder()
|
||||
.topics(
|
||||
IntStream.rangeClosed(1, 100).boxed()
|
||||
.map(Objects::toString)
|
||||
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
|
||||
.name(e)
|
||||
.topicConfigs(
|
||||
List.of(InternalTopicConfig.builder()
|
||||
.name("testName")
|
||||
.value("testValue")
|
||||
.defaultValue("testDefaultValue")
|
||||
.source(ConfigEntry.ConfigSource.DEFAULT_CONFIG)
|
||||
.isReadOnly(true)
|
||||
.isSensitive(true)
|
||||
.synonyms(List.of())
|
||||
.build()
|
||||
)
|
||||
)
|
||||
.build()))
|
||||
)
|
||||
.build()));
|
||||
|
||||
var configs = clusterService.getTopicConfigs(topicName, "1");
|
||||
var topicConfig = configs.isPresent() ? configs.get().get(0) : null;
|
||||
|
||||
assertThat(configs.isPresent()).isTrue();
|
||||
assertThat(topicConfig.getName()).isEqualTo("testName");
|
||||
assertThat(topicConfig.getValue()).isEqualTo("testValue");
|
||||
assertThat(topicConfig.getDefaultValue()).isEqualTo("testDefaultValue");
|
||||
assertThat(topicConfig.getSource().getValue())
|
||||
.isEqualTo(ConfigEntry.ConfigSource.DEFAULT_CONFIG.name());
|
||||
assertThat(topicConfig.getSynonyms()).isNotNull();
|
||||
assertThat(topicConfig.getIsReadOnly()).isTrue();
|
||||
assertThat(topicConfig.getIsSensitive()).isTrue();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1722,6 +1722,16 @@ components:
|
|||
type: string
|
||||
defaultValue:
|
||||
type: string
|
||||
source:
|
||||
$ref: "#/components/schemas/ConfigSource"
|
||||
isSensitive:
|
||||
type: boolean
|
||||
isReadOnly:
|
||||
type: boolean
|
||||
synonyms:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/ConfigSynonym"
|
||||
required:
|
||||
- name
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue