|
@@ -1,15 +1,12 @@
|
|
package com.provectus.kafka.ui.helpers;
|
|
package com.provectus.kafka.ui.helpers;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
-import com.google.gson.Gson;
|
|
|
|
import com.provectus.kafka.ui.api.ApiClient;
|
|
import com.provectus.kafka.ui.api.ApiClient;
|
|
import com.provectus.kafka.ui.api.api.KafkaConnectApi;
|
|
import com.provectus.kafka.ui.api.api.KafkaConnectApi;
|
|
import com.provectus.kafka.ui.api.api.MessagesApi;
|
|
import com.provectus.kafka.ui.api.api.MessagesApi;
|
|
|
|
+import com.provectus.kafka.ui.api.api.SchemasApi;
|
|
import com.provectus.kafka.ui.api.api.TopicsApi;
|
|
import com.provectus.kafka.ui.api.api.TopicsApi;
|
|
-import com.provectus.kafka.ui.api.model.CreateTopicMessage;
|
|
|
|
-import com.provectus.kafka.ui.api.model.ErrorResponse;
|
|
|
|
-import com.provectus.kafka.ui.api.model.NewConnector;
|
|
|
|
-import com.provectus.kafka.ui.api.model.TopicCreation;
|
|
|
|
|
|
+import com.provectus.kafka.ui.api.model.*;
|
|
import com.provectus.kafka.ui.base.TestConfiguration;
|
|
import com.provectus.kafka.ui.base.TestConfiguration;
|
|
import lombok.SneakyThrows;
|
|
import lombok.SneakyThrows;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -35,7 +32,22 @@ public class ApiHelper {
|
|
return new TopicsApi(new ApiClient().setBasePath(baseURL));
|
|
return new TopicsApi(new ApiClient().setBasePath(baseURL));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ private SchemasApi schemaApi() {
|
|
|
|
+ return new SchemasApi(new ApiClient().setBasePath(baseURL));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ private KafkaConnectApi connectorApi() {
|
|
|
|
+ return new KafkaConnectApi(new ApiClient().setBasePath(baseURL));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ private MessagesApi messageApi() {
|
|
|
|
+ return new MessagesApi(new ApiClient().setBasePath(baseURL));
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ @SneakyThrows
|
|
public void createTopic(String clusterName, String topicName) {
|
|
public void createTopic(String clusterName, String topicName) {
|
|
TopicCreation topic = new TopicCreation();
|
|
TopicCreation topic = new TopicCreation();
|
|
topic.setName(topicName);
|
|
topic.setName(topicName);
|
|
@@ -50,26 +62,32 @@ public class ApiHelper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
public void deleteTopic(String clusterName, String topicName) {
|
|
public void deleteTopic(String clusterName, String topicName) {
|
|
try {
|
|
try {
|
|
topicApi().deleteTopic(clusterName, topicName).block();
|
|
topicApi().deleteTopic(clusterName, topicName).block();
|
|
|
|
+ } catch (WebClientResponseException ignore) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public void createSchema(String clusterName, String schemaName, SchemaType type, String schemaValue) {
|
|
|
|
+ NewSchemaSubject schemaSubject = new NewSchemaSubject();
|
|
|
|
+ schemaSubject.setSubject(schemaName);
|
|
|
|
+ schemaSubject.setSchema(schemaValue);
|
|
|
|
+ schemaSubject.setSchemaType(type);
|
|
|
|
+ try {
|
|
|
|
+ schemaApi().createNewSchema(clusterName, schemaSubject).block();
|
|
} catch (WebClientResponseException ex) {
|
|
} catch (WebClientResponseException ex) {
|
|
- ErrorResponse errorResponse = new Gson().fromJson(ex.getResponseBodyAsString(), ErrorResponse.class);
|
|
|
|
- if (errorResponse.getMessage().startsWith("This server does not host this")) {
|
|
|
|
- log.info("This server does not host this " + topicName);
|
|
|
|
- } else {
|
|
|
|
- throw ex;
|
|
|
|
- }
|
|
|
|
|
|
+ ex.printStackTrace();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
- private KafkaConnectApi connectorApi() {
|
|
|
|
- ApiClient defaultClient = new ApiClient();
|
|
|
|
- defaultClient.setBasePath(baseURL);
|
|
|
|
- KafkaConnectApi connectorsApi = new KafkaConnectApi(defaultClient);
|
|
|
|
- return connectorsApi;
|
|
|
|
|
|
+ public void deleteSchema(String clusterName, String schemaName) {
|
|
|
|
+ try {
|
|
|
|
+ schemaApi().deleteSchema(clusterName, schemaName).block();
|
|
|
|
+ } catch (WebClientResponseException ignore) {
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
@@ -88,7 +106,7 @@ public class ApiHelper {
|
|
connector.setConfig(configMap);
|
|
connector.setConfig(configMap);
|
|
try {
|
|
try {
|
|
connectorApi().deleteConnector(clusterName, connectName, connectorName).block();
|
|
connectorApi().deleteConnector(clusterName, connectName, connectorName).block();
|
|
- } catch (WebClientResponseException ignored){
|
|
|
|
|
|
+ } catch (WebClientResponseException ignored) {
|
|
}
|
|
}
|
|
connectorApi().createConnector(clusterName, connectName, connector).block();
|
|
connectorApi().createConnector(clusterName, connectName, connector).block();
|
|
}
|
|
}
|
|
@@ -97,17 +115,9 @@ public class ApiHelper {
|
|
return connectorApi().getConnects(clusterName).blockFirst().getName();
|
|
return connectorApi().getConnects(clusterName).blockFirst().getName();
|
|
}
|
|
}
|
|
|
|
|
|
- @SneakyThrows
|
|
|
|
- private MessagesApi messageApi() {
|
|
|
|
- ApiClient defaultClient = new ApiClient();
|
|
|
|
- defaultClient.setBasePath(baseURL);
|
|
|
|
- MessagesApi messagesApi = new MessagesApi(defaultClient);
|
|
|
|
- return messagesApi;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
public void sendMessage(String clusterName, String topicName, String messageContentJson,
|
|
public void sendMessage(String clusterName, String topicName, String messageContentJson,
|
|
- String messageKey) {
|
|
|
|
|
|
+ String messageKey) {
|
|
CreateTopicMessage createMessage = new CreateTopicMessage();
|
|
CreateTopicMessage createMessage = new CreateTopicMessage();
|
|
createMessage.partition(0);
|
|
createMessage.partition(0);
|
|
createMessage.setContent(messageContentJson);
|
|
createMessage.setContent(messageContentJson);
|