fix for string literals should not be duplicated
This commit is contained in:
parent
50f10bceff
commit
c3d18a5c0d
11 changed files with 167 additions and 130 deletions
|
@ -28,6 +28,8 @@ import reactor.core.publisher.Mono;
|
|||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class BrokersController extends AbstractController implements BrokersApi {
|
||||
private static final String BROKER_ID = "brokerId";
|
||||
|
||||
private final BrokerService brokerService;
|
||||
private final ClusterMapper clusterMapper;
|
||||
|
||||
|
@ -94,7 +96,7 @@ public class BrokersController extends AbstractController implements BrokersApi
|
|||
.cluster(clusterName)
|
||||
.clusterConfigActions(ClusterConfigAction.VIEW)
|
||||
.operationName("getBrokerConfig")
|
||||
.operationParams(Map.of("brokerId", id))
|
||||
.operationParams(Map.of(BROKER_ID, id))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).thenReturn(
|
||||
|
@ -113,7 +115,7 @@ public class BrokersController extends AbstractController implements BrokersApi
|
|||
.cluster(clusterName)
|
||||
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
|
||||
.operationName("updateBrokerTopicPartitionLogDir")
|
||||
.operationParams(Map.of("brokerId", id))
|
||||
.operationParams(Map.of(BROKER_ID, id))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
|
@ -133,7 +135,7 @@ public class BrokersController extends AbstractController implements BrokersApi
|
|||
.cluster(clusterName)
|
||||
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
|
||||
.operationName("updateBrokerConfigByName")
|
||||
.operationParams(Map.of("brokerId", id))
|
||||
.operationParams(Map.of(BROKER_ID, id))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
|
|
|
@ -38,6 +38,7 @@ import reactor.core.publisher.Mono;
|
|||
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
|
||||
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
|
||||
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
|
||||
private static final String CONNECTOR_NAME = "connectorName";
|
||||
|
||||
private final KafkaConnectService kafkaConnectService;
|
||||
private final AccessControlService accessControlService;
|
||||
|
@ -116,7 +117,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|||
.connect(connectName)
|
||||
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
||||
.operationName("deleteConnector")
|
||||
.operationParams(Map.of("connectorName", connectName))
|
||||
.operationParams(Map.of(CONNECTOR_NAME, connectName))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
|
@ -184,7 +185,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|||
.connect(connectName)
|
||||
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
||||
.operationName("setConnectorConfig")
|
||||
.operationParams(Map.of("connectorName", connectorName))
|
||||
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
|
@ -211,7 +212,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|||
.connect(connectName)
|
||||
.connectActions(connectActions)
|
||||
.operationName("updateConnectorState")
|
||||
.operationParams(Map.of("connectorName", connectorName))
|
||||
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
|
@ -231,7 +232,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|||
.connect(connectName)
|
||||
.connectActions(ConnectAction.VIEW)
|
||||
.operationName("getConnectorTasks")
|
||||
.operationParams(Map.of("connectorName", connectorName))
|
||||
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).thenReturn(
|
||||
|
@ -251,7 +252,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|||
.connect(connectName)
|
||||
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
|
||||
.operationName("restartConnectorTask")
|
||||
.operationParams(Map.of("connectorName", connectorName))
|
||||
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
|
|
|
@ -52,6 +52,8 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public static final class AccessContextBuilder {
|
||||
private static final String ACTIONS_NOT_PRESENT = "actions not present";
|
||||
|
||||
private Collection<ApplicationConfigAction> applicationConfigActions = Collections.emptySet();
|
||||
private String cluster;
|
||||
private Collection<ClusterConfigAction> clusterConfigActions = Collections.emptySet();
|
||||
|
@ -75,7 +77,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder applicationConfigActions(ApplicationConfigAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.applicationConfigActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -86,7 +88,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder clusterConfigActions(ClusterConfigAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.clusterConfigActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -97,7 +99,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder topicActions(TopicAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.topicActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -108,7 +110,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder consumerGroupActions(ConsumerGroupAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.consumerGroupActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -119,7 +121,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder connectActions(ConnectAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.connectActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -135,25 +137,25 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder schemaActions(SchemaAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.schemaActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder ksqlActions(KsqlAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.ksqlActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder aclActions(AclAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.aclActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder auditActions(AuditAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.auditActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,23 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|||
|
||||
private static final JsonMapper JSON_MAPPER = createMapper();
|
||||
|
||||
private static final String ASSIGNMENT = "assignment";
|
||||
private static final String CLIENT_HOST = "client_host";
|
||||
private static final String CLIENT_ID = "client_id";
|
||||
private static final String COMMIT_TIMESTAMP = "commit_timestamp";
|
||||
private static final String CURRENT_STATE_TIMESTAMP = "current_state_timestamp";
|
||||
private static final String GENERATION = "generation";
|
||||
private static final String LEADER = "leader";
|
||||
private static final String MEMBERS = "members";
|
||||
private static final String MEMBER_ID = "member_id";
|
||||
private static final String METADATA = "metadata";
|
||||
private static final String OFFSET = "offset";
|
||||
private static final String PROTOCOL = "protocol";
|
||||
private static final String PROTOCOL_TYPE = "protocol_type";
|
||||
private static final String REBALANCE_TIMEOUT = "rebalance_timeout";
|
||||
private static final String SESSION_TIMEOUT = "session_timeout";
|
||||
private static final String SUBSCRIPTION = "subscription";
|
||||
|
||||
public static final String TOPIC = "__consumer_offsets";
|
||||
|
||||
public static String name() {
|
||||
|
@ -115,128 +132,128 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|||
private Deserializer valueDeserializer() {
|
||||
final Schema commitOffsetSchemaV0 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, "")
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV1 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, ""),
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
|
||||
new Field("expire_timestamp", Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV2 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, "")
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV3 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field("leader_epoch", Type.INT32, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, "")
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV4 = new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field("leader_epoch", Type.INT32, ""),
|
||||
new Field("metadata", Type.COMPACT_STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, ""),
|
||||
new Field(METADATA, Type.COMPACT_STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
|
||||
Field.TaggedFieldsSection.of()
|
||||
);
|
||||
|
||||
final Schema metadataSchema0 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema1 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema2 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("current_state_timestamp", Type.INT64, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema3 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("current_state_timestamp", Type.INT64, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field("group_instance_id", Type.NULLABLE_STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema4 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.COMPACT_STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field("current_state_timestamp", Type.INT64, ""),
|
||||
new Field("members", new CompactArrayOf(new Schema(
|
||||
new Field("member_id", Type.COMPACT_STRING, ""),
|
||||
new Field(PROTOCOL_TYPE, Type.COMPACT_STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
||||
new Field(MEMBERS, new CompactArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.COMPACT_STRING, ""),
|
||||
new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field("client_id", Type.COMPACT_STRING, ""),
|
||||
new Field("client_host", Type.COMPACT_STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.COMPACT_BYTES, ""),
|
||||
new Field("assignment", Type.COMPACT_BYTES, ""),
|
||||
new Field(CLIENT_ID, Type.COMPACT_STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.COMPACT_STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.COMPACT_BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.COMPACT_BYTES, ""),
|
||||
Field.TaggedFieldsSection.of()
|
||||
)), ""),
|
||||
Field.TaggedFieldsSection.of()
|
||||
|
@ -248,7 +265,7 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|||
short version = bb.getShort();
|
||||
// ideally, we should distinguish if value is commit or metadata
|
||||
// by checking record's key, but our current serde structure doesn't allow that.
|
||||
// so, we trying to parse into metadata first and after into commit msg
|
||||
// so, we are trying to parse into metadata first and after into commit msg
|
||||
try {
|
||||
result = toJson(
|
||||
switch (version) {
|
||||
|
|
|
@ -43,6 +43,8 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
return "SchemaRegistry";
|
||||
}
|
||||
|
||||
private static final String SCHEMA_REGISTRY = "schemaRegistry";
|
||||
|
||||
private SchemaRegistryClient schemaRegistryClient;
|
||||
private List<String> schemaRegistryUrls;
|
||||
private String valueSchemaNameTemplate;
|
||||
|
@ -54,7 +56,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
@Override
|
||||
public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
|
||||
PropertyResolver globalProperties) {
|
||||
return kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
|
||||
return kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class)
|
||||
.filter(lst -> !lst.isEmpty())
|
||||
.isPresent();
|
||||
}
|
||||
|
@ -62,7 +64,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
@Override
|
||||
public void autoConfigure(PropertyResolver kafkaClusterProperties,
|
||||
PropertyResolver globalProperties) {
|
||||
var urls = kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
|
||||
var urls = kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class)
|
||||
.filter(lst -> !lst.isEmpty())
|
||||
.orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
|
||||
configure(
|
||||
|
@ -88,7 +90,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
PropertyResolver kafkaClusterProperties,
|
||||
PropertyResolver globalProperties) {
|
||||
var urls = serdeProperties.getListProperty("url", String.class)
|
||||
.or(() -> kafkaClusterProperties.getListProperty("schemaRegistry", String.class))
|
||||
.or(() -> kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class))
|
||||
.filter(lst -> !lst.isEmpty())
|
||||
.orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
|
||||
configure(
|
||||
|
|
|
@ -11,6 +11,9 @@ import org.apache.kafka.common.Node;
|
|||
|
||||
class WellKnownMetrics {
|
||||
|
||||
private static final String BROKER_TOPIC_METRICS = "BrokerTopicMetrics";
|
||||
private static final String FIFTEEN_MINUTE_RATE = "FifteenMinuteRate";
|
||||
|
||||
// per broker
|
||||
final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
|
||||
final Map<Integer, BigDecimal> brokerBytesOutFifteenMinuteRate = new HashMap<>();
|
||||
|
@ -36,15 +39,15 @@ class WellKnownMetrics {
|
|||
if (!brokerBytesInFifteenMinuteRate.containsKey(node.id())
|
||||
&& rawMetric.labels().size() == 1
|
||||
&& "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
|
||||
&& containsIgnoreCase(name, "BrokerTopicMetrics")
|
||||
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
|
||||
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
|
||||
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
|
||||
brokerBytesInFifteenMinuteRate.put(node.id(), rawMetric.value());
|
||||
}
|
||||
if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id())
|
||||
&& rawMetric.labels().size() == 1
|
||||
&& "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
|
||||
&& containsIgnoreCase(name, "BrokerTopicMetrics")
|
||||
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
|
||||
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
|
||||
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
|
||||
brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value());
|
||||
}
|
||||
}
|
||||
|
@ -53,8 +56,8 @@ class WellKnownMetrics {
|
|||
String name = rawMetric.name();
|
||||
String topic = rawMetric.labels().get("topic");
|
||||
if (topic != null
|
||||
&& containsIgnoreCase(name, "BrokerTopicMetrics")
|
||||
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
|
||||
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
|
||||
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
|
||||
String nameProperty = rawMetric.labels().get("name");
|
||||
if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
|
||||
bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
|
||||
|
|
|
@ -52,6 +52,7 @@ import reactor.core.publisher.Mono;
|
|||
public class AccessControlService {
|
||||
|
||||
private static final String ACCESS_DENIED = "Access denied";
|
||||
private static final String ACTIONS_ARE_EMPTY = "actions are empty";
|
||||
|
||||
@Nullable
|
||||
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
|
||||
|
@ -206,7 +207,7 @@ public class AccessControlService {
|
|||
if (context.getTopic() == null && context.getTopicActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getTopicActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getTopicActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getTopicActions()
|
||||
.stream()
|
||||
|
@ -243,7 +244,7 @@ public class AccessControlService {
|
|||
if (context.getConsumerGroup() == null && context.getConsumerGroupActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getConsumerGroupActions()
|
||||
.stream()
|
||||
|
@ -276,7 +277,7 @@ public class AccessControlService {
|
|||
if (context.getSchema() == null && context.getSchemaActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getSchemaActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getSchemaActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getSchemaActions()
|
||||
.stream()
|
||||
|
@ -309,7 +310,7 @@ public class AccessControlService {
|
|||
if (context.getConnect() == null && context.getConnectActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getConnectActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getConnectActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getConnectActions()
|
||||
.stream()
|
||||
|
|
|
@ -43,6 +43,8 @@ public class JsonAvroConversion {
|
|||
|
||||
private static final JsonMapper MAPPER = new JsonMapper();
|
||||
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
|
||||
private static final String FORMAT = "format";
|
||||
private static final String DATE_TIME = "date-time";
|
||||
|
||||
// converts json into Object that is expected input for KafkaAvroSerializer
|
||||
// (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
|
||||
|
@ -347,7 +349,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("uuid"))))
|
||||
Map.of(FORMAT, new TextNode("uuid"))))
|
||||
),
|
||||
|
||||
DECIMAL("decimal",
|
||||
|
@ -385,7 +387,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date"))))
|
||||
Map.of(FORMAT, new TextNode("date"))))
|
||||
),
|
||||
|
||||
TIME_MILLIS("time-millis",
|
||||
|
@ -406,7 +408,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("time"))))
|
||||
Map.of(FORMAT, new TextNode("time"))))
|
||||
),
|
||||
|
||||
TIME_MICROS("time-micros",
|
||||
|
@ -427,7 +429,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("time"))))
|
||||
Map.of(FORMAT, new TextNode("time"))))
|
||||
),
|
||||
|
||||
TIMESTAMP_MILLIS("timestamp-millis",
|
||||
|
@ -448,7 +450,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
),
|
||||
|
||||
TIMESTAMP_MICROS("timestamp-micros",
|
||||
|
@ -473,7 +475,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
),
|
||||
|
||||
LOCAL_TIMESTAMP_MILLIS("local-timestamp-millis",
|
||||
|
@ -491,7 +493,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
),
|
||||
|
||||
LOCAL_TIMESTAMP_MICROS("local-timestamp-micros",
|
||||
|
@ -508,7 +510,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
);
|
||||
|
||||
private final String name;
|
||||
|
|
|
@ -37,6 +37,9 @@ import reactor.util.function.Tuples;
|
|||
|
||||
public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.Descriptor> {
|
||||
|
||||
private static final String MAXIMUM = "maximum";
|
||||
private static final String MINIMUM = "minimum";
|
||||
|
||||
private final Set<String> simpleTypesWrapperNames = Set.of(
|
||||
BoolValue.getDescriptor().getFullName(),
|
||||
Int32Value.getDescriptor().getFullName(),
|
||||
|
@ -156,15 +159,15 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
|
|||
case INT32, FIXED32, SFIXED32, SINT32 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", IntNode.valueOf(Integer.MAX_VALUE),
|
||||
"minimum", IntNode.valueOf(Integer.MIN_VALUE)
|
||||
MAXIMUM, IntNode.valueOf(Integer.MAX_VALUE),
|
||||
MINIMUM, IntNode.valueOf(Integer.MIN_VALUE)
|
||||
)
|
||||
);
|
||||
case UINT32 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
|
||||
"minimum", IntNode.valueOf(0)
|
||||
MAXIMUM, LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
|
||||
MINIMUM, IntNode.valueOf(0)
|
||||
)
|
||||
);
|
||||
//TODO: actually all *64 types will be printed with quotes (as strings),
|
||||
|
@ -173,15 +176,15 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
|
|||
case INT64, FIXED64, SFIXED64, SINT64 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", LongNode.valueOf(Long.MAX_VALUE),
|
||||
"minimum", LongNode.valueOf(Long.MIN_VALUE)
|
||||
MAXIMUM, LongNode.valueOf(Long.MAX_VALUE),
|
||||
MINIMUM, LongNode.valueOf(Long.MIN_VALUE)
|
||||
)
|
||||
);
|
||||
case UINT64 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
|
||||
"minimum", LongNode.valueOf(0)
|
||||
MAXIMUM, new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
|
||||
MINIMUM, LongNode.valueOf(0)
|
||||
)
|
||||
);
|
||||
case MESSAGE, GROUP -> new SimpleJsonType(JsonType.Type.OBJECT);
|
||||
|
|
|
@ -10,25 +10,27 @@ import lombok.experimental.Accessors;
|
|||
@Accessors(chain = true)
|
||||
public class Schema {
|
||||
|
||||
private static final String USER_DIR = "user.dir";
|
||||
|
||||
private String name, valuePath;
|
||||
private SchemaType type;
|
||||
|
||||
public static Schema createSchemaAvro() {
|
||||
return new Schema().setName("schema_avro-" + randomAlphabetic(5))
|
||||
.setType(SchemaType.AVRO)
|
||||
.setValuePath(System.getProperty("user.dir") + "/src/main/resources/testData/schemas/schema_avro_value.json");
|
||||
.setValuePath(System.getProperty(USER_DIR) + "/src/main/resources/testData/schemas/schema_avro_value.json");
|
||||
}
|
||||
|
||||
public static Schema createSchemaJson() {
|
||||
return new Schema().setName("schema_json-" + randomAlphabetic(5))
|
||||
.setType(SchemaType.JSON)
|
||||
.setValuePath(System.getProperty("user.dir") + "/src/main/resources/testData/schemas/schema_json_Value.json");
|
||||
.setValuePath(System.getProperty(USER_DIR) + "/src/main/resources/testData/schemas/schema_json_Value.json");
|
||||
}
|
||||
|
||||
public static Schema createSchemaProtobuf() {
|
||||
return new Schema().setName("schema_protobuf-" + randomAlphabetic(5))
|
||||
.setType(SchemaType.PROTOBUF)
|
||||
.setValuePath(
|
||||
System.getProperty("user.dir") + "/src/main/resources/testData/schemas/schema_protobuf_value.txt");
|
||||
System.getProperty(USER_DIR) + "/src/main/resources/testData/schemas/schema_protobuf_value.txt");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ import io.qameta.allure.Step;
|
|||
|
||||
public class TopicCreateEditForm extends BasePage {
|
||||
|
||||
private static final String RETENTION_BYTES = "retentionBytes";
|
||||
|
||||
protected SelenideElement timeToRetainField = $x("//input[@id='timeToRetain']");
|
||||
protected SelenideElement partitionsField = $x("//input[@name='partitions']");
|
||||
protected SelenideElement nameField = $(id("topicFormName"));
|
||||
|
@ -138,12 +140,12 @@ public class TopicCreateEditForm extends BasePage {
|
|||
|
||||
@Step
|
||||
public TopicCreateEditForm selectRetentionBytes(String visibleValue) {
|
||||
return selectFromDropDownByVisibleText("retentionBytes", visibleValue);
|
||||
return selectFromDropDownByVisibleText(RETENTION_BYTES, visibleValue);
|
||||
}
|
||||
|
||||
@Step
|
||||
public TopicCreateEditForm selectRetentionBytes(Long optionValue) {
|
||||
return selectFromDropDownByOptionValue("retentionBytes", optionValue.toString());
|
||||
return selectFromDropDownByOptionValue(RETENTION_BYTES, optionValue.toString());
|
||||
}
|
||||
|
||||
@Step
|
||||
|
@ -202,7 +204,7 @@ public class TopicCreateEditForm extends BasePage {
|
|||
|
||||
@Step
|
||||
public String getMaxSizeOnDisk() {
|
||||
return new KafkaUiSelectElement("retentionBytes").getCurrentValue();
|
||||
return new KafkaUiSelectElement(RETENTION_BYTES).getCurrentValue();
|
||||
}
|
||||
|
||||
@Step
|
||||
|
|
Loading…
Add table
Reference in a new issue