|
@@ -7,11 +7,14 @@ import com.provectus.kafka.ui.AbstractIntegrationTest;
|
|
import com.provectus.kafka.ui.model.ConsumerPosition;
|
|
import com.provectus.kafka.ui.model.ConsumerPosition;
|
|
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
|
|
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
-import com.provectus.kafka.ui.model.MessageFormatDTO;
|
|
|
|
import com.provectus.kafka.ui.model.SeekDirectionDTO;
|
|
import com.provectus.kafka.ui.model.SeekDirectionDTO;
|
|
import com.provectus.kafka.ui.model.SeekTypeDTO;
|
|
import com.provectus.kafka.ui.model.SeekTypeDTO;
|
|
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
|
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
|
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
|
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
|
|
|
+import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
|
|
|
|
+import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
|
|
|
|
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
|
|
|
|
+import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
|
|
import io.confluent.kafka.schemaregistry.ParsedSchema;
|
|
import io.confluent.kafka.schemaregistry.ParsedSchema;
|
|
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
|
|
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
|
|
import io.confluent.kafka.schemaregistry.json.JsonSchema;
|
|
import io.confluent.kafka.schemaregistry.json.JsonSchema;
|
|
@@ -140,7 +143,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key("testKey")
|
|
.key("testKey")
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
.content("testValue")
|
|
.content("testValue")
|
|
|
|
+ .valueSerde(StringSerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isEqualTo("testKey");
|
|
assertThat(polled.getKey()).isEqualTo("testKey");
|
|
@@ -149,40 +154,30 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- void noSchemaJsonKeyJsonValue() {
|
|
|
|
- new SendAndReadSpec()
|
|
|
|
- .withMsgToSend(
|
|
|
|
- new CreateTopicMessageDTO()
|
|
|
|
- .key("{ \"f1\": 111, \"f2\": \"testStr1\" }")
|
|
|
|
- .content("{ \"f1\": 222, \"f2\": \"testStr2\" }")
|
|
|
|
- )
|
|
|
|
- .doAssert(polled -> {
|
|
|
|
- assertThat(polled.getKey()).isEqualTo("{ \"f1\": 111, \"f2\": \"testStr1\" }");
|
|
|
|
- assertThat(polled.getContent()).isEqualTo("{ \"f1\": 222, \"f2\": \"testStr2\" }");
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Test
|
|
|
|
- void keyIsIntValueIsDoubleShouldBeSerializedAsStrings() {
|
|
|
|
|
|
+ void keyIsIntValueIsLong() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key("123")
|
|
.key("123")
|
|
- .content("234.56")
|
|
|
|
|
|
+ .keySerde(Int32Serde.name())
|
|
|
|
+ .content("21474836470")
|
|
|
|
+ .valueSerde(Int64Serde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isEqualTo("123");
|
|
assertThat(polled.getKey()).isEqualTo("123");
|
|
- assertThat(polled.getContent()).isEqualTo("234.56");
|
|
|
|
|
|
+ assertThat(polled.getContent()).isEqualTo("21474836470");
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- void noSchemaKeyIsNull() {
|
|
|
|
|
|
+ void keyIsNull() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(null)
|
|
.key(null)
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
.content("testValue")
|
|
.content("testValue")
|
|
|
|
+ .valueSerde(StringSerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isNull();
|
|
assertThat(polled.getKey()).isNull();
|
|
@@ -191,12 +186,14 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- void noSchemaValueIsNull() {
|
|
|
|
|
|
+ void valueIsNull() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key("testKey")
|
|
.key("testKey")
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
.content(null)
|
|
.content(null)
|
|
|
|
+ .valueSerde(StringSerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isEqualTo("testKey");
|
|
assertThat(polled.getKey()).isEqualTo("testKey");
|
|
@@ -212,7 +209,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key("\"some string\"")
|
|
.key("\"some string\"")
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content("123")
|
|
.content("123")
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isEqualTo("\"some string\"");
|
|
assertThat(polled.getKey()).isEqualTo("\"some string\"");
|
|
@@ -221,14 +220,16 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- void nonNullableKvWithAvroSchema() {
|
|
|
|
|
|
+ void recordAvroSchema() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
.withKeySchema(AVRO_SCHEMA_1)
|
|
.withKeySchema(AVRO_SCHEMA_1)
|
|
.withValueSchema(AVRO_SCHEMA_2)
|
|
.withValueSchema(AVRO_SCHEMA_2)
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
.key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(AVRO_SCHEMA_2_JSON_RECORD)
|
|
.content(AVRO_SCHEMA_2_JSON_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
@@ -236,36 +237,6 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- void keyWithNoSchemaValueWithAvroSchema() {
|
|
|
|
- new SendAndReadSpec()
|
|
|
|
- .withValueSchema(AVRO_SCHEMA_1)
|
|
|
|
- .withMsgToSend(
|
|
|
|
- new CreateTopicMessageDTO()
|
|
|
|
- .key("testKey")
|
|
|
|
- .content(AVRO_SCHEMA_1_JSON_RECORD)
|
|
|
|
- )
|
|
|
|
- .doAssert(polled -> {
|
|
|
|
- assertThat(polled.getKey()).isEqualTo("testKey");
|
|
|
|
- assertJsonEqual(polled.getContent(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Test
|
|
|
|
- void keyWithAvroSchemaValueWithNoSchema() {
|
|
|
|
- new SendAndReadSpec()
|
|
|
|
- .withKeySchema(AVRO_SCHEMA_1)
|
|
|
|
- .withMsgToSend(
|
|
|
|
- new CreateTopicMessageDTO()
|
|
|
|
- .key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
|
|
- .content("testVal")
|
|
|
|
- )
|
|
|
|
- .doAssert(polled -> {
|
|
|
|
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
|
|
- assertThat(polled.getContent()).isEqualTo("testVal");
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test
|
|
@Test
|
|
void keyWithNoSchemaValueWithProtoSchema() {
|
|
void keyWithNoSchemaValueWithProtoSchema() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
@@ -273,7 +244,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key("testKey")
|
|
.key("testKey")
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
.content(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
.content(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isEqualTo("testKey");
|
|
assertThat(polled.getKey()).isEqualTo("testKey");
|
|
@@ -289,7 +262,10 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(null)
|
|
.key(null)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(AVRO_SCHEMA_2_JSON_RECORD)
|
|
.content(AVRO_SCHEMA_2_JSON_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
|
|
+
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isNull();
|
|
assertThat(polled.getKey()).isNull();
|
|
@@ -298,33 +274,19 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- void valueWithAvroSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
|
|
|
|
|
|
+ void valueWithAvroSchemaShouldThrowExceptionIfArgIsNotValidJsonObject() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
.withValueSchema(AVRO_SCHEMA_2)
|
|
.withValueSchema(AVRO_SCHEMA_2)
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
- // f2 has type object instead of string
|
|
|
|
- .content("{ \"f1\": 111, \"f2\": {} }")
|
|
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
|
|
+ // f2 has type int instead of string
|
|
|
|
+ .content("{ \"f1\": 111, \"f2\": 123 }")
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.assertSendThrowsException();
|
|
.assertSendThrowsException();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- void keyWithAvroSchemaValueWithAvroSchemaValueIsNull() {
|
|
|
|
- new SendAndReadSpec()
|
|
|
|
- .withKeySchema(AVRO_SCHEMA_1)
|
|
|
|
- .withValueSchema(AVRO_SCHEMA_2)
|
|
|
|
- .withMsgToSend(
|
|
|
|
- new CreateTopicMessageDTO()
|
|
|
|
- .key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
|
|
- .content(null)
|
|
|
|
- )
|
|
|
|
- .doAssert(polled -> {
|
|
|
|
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
|
|
- assertThat(polled.getContent()).isNull();
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test
|
|
@Test
|
|
void keyWithAvroSchemaValueWithProtoSchema() {
|
|
void keyWithAvroSchemaValueWithProtoSchema() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
@@ -333,7 +295,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
.key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
.content(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
@@ -347,8 +311,12 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withValueSchema(PROTOBUF_SCHEMA)
|
|
.withValueSchema(PROTOBUF_SCHEMA)
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
|
|
+ .key(null)
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
// f2 field has type object instead of int
|
|
// f2 field has type object instead of int
|
|
- .content("{ \"f1\" : \"test str\", \"f2\" : {} }"))
|
|
|
|
|
|
+ .content("{ \"f1\" : \"test str\", \"f2\" : {} }")
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
|
|
+ )
|
|
.assertSendThrowsException();
|
|
.assertSendThrowsException();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -360,7 +328,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
.key(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(JSON_SCHEMA_RECORD)
|
|
.content(JSON_SCHEMA_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
|
|
assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
|
|
@@ -368,29 +338,17 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- void keyWithJsonValueWithJsonSchemaKeyValueIsNull() {
|
|
|
|
- new SendAndReadSpec()
|
|
|
|
- .withKeySchema(JSON_SCHEMA)
|
|
|
|
- .withValueSchema(JSON_SCHEMA)
|
|
|
|
- .withMsgToSend(
|
|
|
|
- new CreateTopicMessageDTO()
|
|
|
|
- .key(JSON_SCHEMA_RECORD)
|
|
|
|
- )
|
|
|
|
- .doAssert(polled -> {
|
|
|
|
- assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
|
|
|
|
- assertThat(polled.getContent()).isNull();
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test
|
|
@Test
|
|
void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
|
|
void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
|
|
new SendAndReadSpec()
|
|
new SendAndReadSpec()
|
|
.withValueSchema(JSON_SCHEMA)
|
|
.withValueSchema(JSON_SCHEMA)
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
|
|
+ .key(null)
|
|
|
|
+ .keySerde(StringSerde.name())
|
|
// 'f2' field has has type object instead of string
|
|
// 'f2' field has has type object instead of string
|
|
.content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
|
|
.content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.assertSendThrowsException();
|
|
.assertSendThrowsException();
|
|
}
|
|
}
|
|
@@ -403,17 +361,20 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
.key(AVRO_SCHEMA_1_JSON_RECORD)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(AVRO_SCHEMA_2_JSON_RECORD)
|
|
.content(AVRO_SCHEMA_2_JSON_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
|
|
assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
|
|
assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
|
|
assertThat(polled.getKeySize()).isEqualTo(15L);
|
|
assertThat(polled.getKeySize()).isEqualTo(15L);
|
|
assertThat(polled.getValueSize()).isEqualTo(15L);
|
|
assertThat(polled.getValueSize()).isEqualTo(15L);
|
|
- assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.AVRO);
|
|
|
|
- assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.AVRO);
|
|
|
|
- assertThat(polled.getKeySchemaId()).isNotEmpty();
|
|
|
|
- assertThat(polled.getValueSchemaId()).isNotEmpty();
|
|
|
|
|
|
+ assertThat(polled.getKeyDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getKeyDeserializeProperties().get("type")).isEqualTo("AVRO");
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("type")).isEqualTo("AVRO");
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@@ -425,17 +386,19 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
.key(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
.content(PROTOBUF_SCHEMA_JSON_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
|
|
assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
|
|
assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
|
|
assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
|
|
assertThat(polled.getKeySize()).isEqualTo(18L);
|
|
assertThat(polled.getKeySize()).isEqualTo(18L);
|
|
assertThat(polled.getValueSize()).isEqualTo(18L);
|
|
assertThat(polled.getValueSize()).isEqualTo(18L);
|
|
- assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.PROTOBUF);
|
|
|
|
- assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.PROTOBUF);
|
|
|
|
- assertThat(polled.getKeySchemaId()).isNotEmpty();
|
|
|
|
- assertThat(polled.getValueSchemaId()).isNotEmpty();
|
|
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getKeyDeserializeProperties().get("type")).isEqualTo("PROTOBUF");
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("type")).isEqualTo("PROTOBUF");
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@@ -447,19 +410,21 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(JSON_SCHEMA_RECORD)
|
|
.key(JSON_SCHEMA_RECORD)
|
|
|
|
+ .keySerde(SchemaRegistrySerde.name())
|
|
.content(JSON_SCHEMA_RECORD)
|
|
.content(JSON_SCHEMA_RECORD)
|
|
|
|
+ .valueSerde(SchemaRegistrySerde.name())
|
|
.headers(Map.of("header1", "value1"))
|
|
.headers(Map.of("header1", "value1"))
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
|
|
assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
|
|
assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
|
|
assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
|
|
- assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.JSON);
|
|
|
|
- assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.JSON);
|
|
|
|
- assertThat(polled.getKeySchemaId()).isNotEmpty();
|
|
|
|
- assertThat(polled.getValueSchemaId()).isNotEmpty();
|
|
|
|
assertThat(polled.getKeySize()).isEqualTo(57L);
|
|
assertThat(polled.getKeySize()).isEqualTo(57L);
|
|
assertThat(polled.getValueSize()).isEqualTo(57L);
|
|
assertThat(polled.getValueSize()).isEqualTo(57L);
|
|
assertThat(polled.getHeadersSize()).isEqualTo(13L);
|
|
assertThat(polled.getHeadersSize()).isEqualTo(13L);
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getKeyDeserializeProperties().get("type")).isEqualTo("JSON");
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
|
|
|
|
+ assertThat(polled.getValueDeserializeProperties().get("type")).isEqualTo("JSON");
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@@ -469,7 +434,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
.withMsgToSend(
|
|
.withMsgToSend(
|
|
new CreateTopicMessageDTO()
|
|
new CreateTopicMessageDTO()
|
|
.key(null)
|
|
.key(null)
|
|
|
|
+ .keySerde(StringSerde.name()) // any serde
|
|
.content(null)
|
|
.content(null)
|
|
|
|
+ .valueSerde(StringSerde.name()) // any serde
|
|
)
|
|
)
|
|
.doAssert(polled -> {
|
|
.doAssert(polled -> {
|
|
assertThat(polled.getKey()).isNull();
|
|
assertThat(polled.getKey()).isNull();
|
|
@@ -514,10 +481,6 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
if (valueSchema != null) {
|
|
if (valueSchema != null) {
|
|
schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
|
|
schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
|
|
}
|
|
}
|
|
-
|
|
|
|
- // need to update to see new topic & schemas
|
|
|
|
- clustersMetricsScheduler.updateMetrics();
|
|
|
|
-
|
|
|
|
return topic;
|
|
return topic;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -547,7 +510,9 @@ public class SendAndReadTests extends AbstractIntegrationTest {
|
|
),
|
|
),
|
|
null,
|
|
null,
|
|
null,
|
|
null,
|
|
- 1
|
|
|
|
|
|
+ 1,
|
|
|
|
+ msgToSend.getKeySerde().get(),
|
|
|
|
+ msgToSend.getValueSerde().get()
|
|
).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
|
.map(TopicMessageEventDTO::getMessage)
|
|
.map(TopicMessageEventDTO::getMessage)
|
|
.blockLast(Duration.ofSeconds(5000));
|
|
.blockLast(Duration.ofSeconds(5000));
|