#164 readonly mode (#218)

* added ReadOnlyModeFilter

* added tests

* refactored kafka connect and schema registry tests
This commit is contained in:
Ramazan Yapparov 2021-03-01 19:51:58 +03:00 committed by GitHub
parent 4eb690d782
commit 73f8991517
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 204 additions and 30 deletions

View file

@ -27,6 +27,7 @@ public class ClustersProperties {
List<ConnectCluster> kafkaConnect;
int jmxPort;
Properties properties;
boolean readOnly = false;
}
@Data

View file

@ -0,0 +1,15 @@
package com.provectus.kafka.ui.cluster.exception;
import org.springframework.http.HttpStatus;
public class ReadOnlyException extends CustomBaseException {
public ReadOnlyException() {
super("This cluster is in read-only mode.");
}
@Override
public HttpStatus getResponseStatusCode() {
return HttpStatus.METHOD_NOT_ALLOWED;
}
}

View file

@ -29,4 +29,5 @@ public class KafkaCluster {
private final Path protobufFile;
private final String protobufMessageName;
private final Properties properties;
private final Boolean readOnly;
}

View file

@ -0,0 +1,49 @@
package com.provectus.kafka.ui.rest.config;
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
import com.provectus.kafka.ui.cluster.exception.ReadOnlyException;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import java.util.regex.Pattern;
@Order
@Component
@RequiredArgsConstructor
public class ReadOnlyModeFilter implements WebFilter {
private static final Pattern CLUSTER_NAME_REGEX = Pattern.compile("/api/clusters/(?<clusterName>[^/]++)");
private final ClustersStorage clustersStorage;
@NotNull
@Override
public Mono<Void> filter(ServerWebExchange exchange, @NotNull WebFilterChain chain) {
var isSafeMethod = exchange.getRequest().getMethod() == HttpMethod.GET;
if (isSafeMethod) {
return chain.filter(exchange);
}
var path = exchange.getRequest().getURI().getPath();
var matcher = CLUSTER_NAME_REGEX.matcher(path);
if (!matcher.find()) {
return chain.filter(exchange);
}
var clusterName = matcher.group("clusterName");
var kafkaCluster = clustersStorage.getClusterByName(clusterName)
.orElseThrow(() -> new NotFoundException(String.format("No cluster for name '%s'", clusterName)));
if (!kafkaCluster.getReadOnly()) {
return chain.filter(exchange);
}
return Mono.error(ReadOnlyException::new);
}
}

View file

@ -18,6 +18,7 @@ kafka:
- name: first
address: http://localhost:8083
jmxPort: 9998
read-only: true
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000

View file

@ -20,6 +20,8 @@ import java.time.Duration;
@SpringBootTest
@ActiveProfiles("test")
public abstract class AbstractBaseTest {
public static String LOCAL = "local";
public static String SECOND_LOCAL = "secondLocal";
public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
public final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"))
@ -44,12 +46,19 @@ public abstract class AbstractBaseTest {
schemaRegistry.start();
kafkaConnect.start();
System.setProperty("kafka.clusters.0.name", "local");
System.setProperty("kafka.clusters.0.name", LOCAL);
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
System.setProperty("kafka.clusters.0.schemaRegistry", schemaRegistry.getTarget());
System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
System.setProperty("kafka.clusters.1.readOnly", "true");
System.setProperty("kafka.clusters.1.bootstrapServers", kafka.getBootstrapServers());
System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getTarget());
System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect");
System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
context.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> {
kafkaConnect.close();
schemaRegistry.close();

View file

@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Log4j2
@AutoConfigureWebTestClient(timeout = "60000")
public class KafkaConnectServiceTests extends AbstractBaseTest {
private final String clusterName = "local";
private final String connectName = "kafka-connect";
private final String connectorName = UUID.randomUUID().toString();
private final Map<String, Object> config = Map.of(
@ -40,7 +39,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@BeforeEach
public void setUp() {
webTestClient.post()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.bodyValue(new NewConnector()
.name(connectorName)
.config(Map.of(
@ -57,7 +56,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@AfterEach
public void tearDown() {
webTestClient.delete()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk();
}
@ -65,7 +64,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldListConnectors() {
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.exchange()
.expectStatus().isOk()
.expectBody()
@ -76,7 +75,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldReturnNotFoundForNonExistingCluster() {
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName)
.exchange()
.expectStatus().isNotFound();
}
@ -84,7 +83,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldReturnNotFoundForNonExistingConnectName() {
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, "nonExistingConnect")
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, "nonExistingConnect")
.exchange()
.expectStatus().isNotFound();
}
@ -102,7 +101,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.name(connectorName)
.config(config);
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(Connector.class)
@ -112,7 +111,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldUpdateConfig() {
webTestClient.put()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "1",
@ -124,7 +123,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.expectStatus().isOk();
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
@ -142,7 +141,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() {
var connectorName = UUID.randomUUID().toString();
webTestClient.post()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.bodyValue(Map.of(
"name", connectorName,
"config", Map.of(
@ -156,7 +155,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.expectStatus().isBadRequest();
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.exchange()
.expectStatus().isOk()
.expectBody()
@ -168,7 +167,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
var connectorName = UUID.randomUUID().toString();
webTestClient.post()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.bodyValue(Map.of(
"name", connectorName,
"config", Map.of(
@ -179,7 +178,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.expectStatus().isBadRequest();
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
.exchange()
.expectStatus().isOk()
.expectBody()
@ -191,7 +190,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
webTestClient.put()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "invalid number",
@ -203,7 +202,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.expectStatus().isBadRequest();
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
@ -220,7 +219,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
webTestClient.put()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
)
@ -229,7 +228,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
.expectStatus().isBadRequest();
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
.exchange()
.expectStatus().isOk()
.expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
@ -246,7 +245,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
@Test
public void shouldRetrieveConnectorPlugins() {
webTestClient.get()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/plugins", clusterName, connectName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName)
.exchange()
.expectStatus().isOk()
.expectBodyList(ConnectorPlugin.class)
@ -257,7 +256,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
public void shouldSuccessfullyValidateConnectorPluginConfiguration() {
var pluginName = "FileStreamSinkConnector";
webTestClient.put()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", clusterName, connectName, pluginName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "1",
@ -276,7 +275,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
var pluginName = "FileStreamSinkConnector";
webTestClient.put()
.uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", clusterName, connectName, pluginName)
.uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "0",

View file

@ -0,0 +1,97 @@
package com.provectus.kafka.ui;
import com.provectus.kafka.ui.model.TopicFormData;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.web.reactive.server.WebTestClient;
import java.util.Map;
import java.util.UUID;
@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
@Log4j2
@AutoConfigureWebTestClient(timeout = "60000")
public class ReadOnlyModeTests extends AbstractBaseTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void shouldCreateTopicForNonReadonlyCluster() {
var topicName = UUID.randomUUID().toString();
webTestClient.post()
.uri("/api/clusters/{clusterName}/topics", LOCAL)
.bodyValue(new TopicFormData()
.name(topicName)
.partitions(1)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isOk();
}
@Test
public void shouldNotCreateTopicForReadonlyCluster() {
var topicName = UUID.randomUUID().toString();
webTestClient.post()
.uri("/api/clusters/{clusterName}/topics", SECOND_LOCAL)
.bodyValue(new TopicFormData()
.name(topicName)
.partitions(1)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isEqualTo(HttpStatus.METHOD_NOT_ALLOWED);
}
@Test
public void shouldUpdateTopicForNonReadonlyCluster() {
var topicName = UUID.randomUUID().toString();
webTestClient.post()
.uri("/api/clusters/{clusterName}/topics", LOCAL)
.bodyValue(new TopicFormData()
.name(topicName)
.partitions(1)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isOk();
webTestClient.patch()
.uri("/api/clusters/{clusterName}/topics/{topicName}", LOCAL, topicName)
.bodyValue(new TopicFormData()
.name(topicName)
.partitions(2)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isOk();
}
@Test
public void shouldNotUpdateTopicForReadonlyCluster() {
var topicName = UUID.randomUUID().toString();
webTestClient.patch()
.uri("/api/clusters/{clusterName}/topics/{topicName}", SECOND_LOCAL, topicName)
.bodyValue(new TopicFormData()
.name(topicName)
.partitions(1)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isEqualTo(HttpStatus.METHOD_NOT_ALLOWED);
}
}

View file

@ -27,7 +27,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
String subject;
@BeforeEach
void setUpBefore() {
public void setUpBefore() {
this.subject = UUID.randomUUID().toString();
}
@ -35,7 +35,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
public void should404WhenGetAllSchemasForUnknownCluster() {
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/unknown-cluster/schemas")
.uri("/api/clusters/unknown-cluster/schemas")
.exchange()
.expectStatus().isNotFound();
}
@ -45,7 +45,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
String unknownSchema = "unknown-schema";
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", unknownSchema)
.uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, unknownSchema)
.exchange()
.expectStatus().isNotFound();
}
@ -54,7 +54,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/compatibility")
.uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL)
.exchange()
.expectStatus().isOk()
.expectBody(CompatibilityLevel.class)
@ -71,7 +71,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas")
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
@ -99,7 +99,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
//Get the created schema and check its items
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
.uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
@ -110,7 +110,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
//Now let's change compatibility level of this schema to FULL whereas the global level should be BACKWARD
webTestClient.put()
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/compatibility", subject)
.uri("/api/clusters/{clusterName}/schemas/{subject}/compatibility", LOCAL, subject)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"compatibility\":\"FULL\"}"))
.exchange()
@ -119,7 +119,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
//Get one more time to check the schema compatibility level is changed to FULL
webTestClient
.get()
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}/latest", subject)
.uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject)
.exchange()
.expectStatus().isOk()
.expectBodyList(SchemaSubject.class)
@ -132,7 +132,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest {
private void createNewSubjectAndAssert(String subject) {
webTestClient
.post()
.uri("http://localhost:8080/api/clusters/local/schemas/{subject}", subject)
.uri("/api/clusters/{clusterName}/schemas/{subject}", LOCAL, subject)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"))
.exchange()

View file

@ -1021,6 +1021,8 @@ components:
type: number
bytesOutPerSec:
type: number
readOnly:
type: boolean
required:
- id
- name