|
@@ -28,6 +28,23 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|
|
|
|
|
private static final JsonMapper JSON_MAPPER = createMapper();
|
|
|
|
|
|
+ private static final String ASSIGNMENT = "assignment";
|
|
|
+ private static final String CLIENT_HOST = "client_host";
|
|
|
+ private static final String CLIENT_ID = "client_id";
|
|
|
+ private static final String COMMIT_TIMESTAMP = "commit_timestamp";
|
|
|
+ private static final String CURRENT_STATE_TIMESTAMP = "current_state_timestamp";
|
|
|
+ private static final String GENERATION = "generation";
|
|
|
+ private static final String LEADER = "leader";
|
|
|
+ private static final String MEMBERS = "members";
|
|
|
+ private static final String MEMBER_ID = "member_id";
|
|
|
+ private static final String METADATA = "metadata";
|
|
|
+ private static final String OFFSET = "offset";
|
|
|
+ private static final String PROTOCOL = "protocol";
|
|
|
+ private static final String PROTOCOL_TYPE = "protocol_type";
|
|
|
+ private static final String REBALANCE_TIMEOUT = "rebalance_timeout";
|
|
|
+ private static final String SESSION_TIMEOUT = "session_timeout";
|
|
|
+ private static final String SUBSCRIPTION = "subscription";
|
|
|
+
|
|
|
public static final String TOPIC = "__consumer_offsets";
|
|
|
|
|
|
public static String name() {
|
|
@@ -116,128 +133,128 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|
|
private Deserializer valueDeserializer() {
|
|
|
final Schema commitOffsetSchemaV0 =
|
|
|
new Schema(
|
|
|
- new Field("offset", Type.INT64, ""),
|
|
|
- new Field("metadata", Type.STRING, ""),
|
|
|
- new Field("commit_timestamp", Type.INT64, "")
|
|
|
+ new Field(OFFSET, Type.INT64, ""),
|
|
|
+ new Field(METADATA, Type.STRING, ""),
|
|
|
+ new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
|
|
);
|
|
|
|
|
|
final Schema commitOffsetSchemaV1 =
|
|
|
new Schema(
|
|
|
- new Field("offset", Type.INT64, ""),
|
|
|
- new Field("metadata", Type.STRING, ""),
|
|
|
- new Field("commit_timestamp", Type.INT64, ""),
|
|
|
+ new Field(OFFSET, Type.INT64, ""),
|
|
|
+ new Field(METADATA, Type.STRING, ""),
|
|
|
+ new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
|
|
|
new Field("expire_timestamp", Type.INT64, "")
|
|
|
);
|
|
|
|
|
|
final Schema commitOffsetSchemaV2 =
|
|
|
new Schema(
|
|
|
- new Field("offset", Type.INT64, ""),
|
|
|
- new Field("metadata", Type.STRING, ""),
|
|
|
- new Field("commit_timestamp", Type.INT64, "")
|
|
|
+ new Field(OFFSET, Type.INT64, ""),
|
|
|
+ new Field(METADATA, Type.STRING, ""),
|
|
|
+ new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
|
|
);
|
|
|
|
|
|
final Schema commitOffsetSchemaV3 =
|
|
|
new Schema(
|
|
|
- new Field("offset", Type.INT64, ""),
|
|
|
+ new Field(OFFSET, Type.INT64, ""),
|
|
|
new Field("leader_epoch", Type.INT32, ""),
|
|
|
- new Field("metadata", Type.STRING, ""),
|
|
|
- new Field("commit_timestamp", Type.INT64, "")
|
|
|
+ new Field(METADATA, Type.STRING, ""),
|
|
|
+ new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
|
|
);
|
|
|
|
|
|
final Schema commitOffsetSchemaV4 = new Schema(
|
|
|
- new Field("offset", Type.INT64, ""),
|
|
|
+ new Field(OFFSET, Type.INT64, ""),
|
|
|
new Field("leader_epoch", Type.INT32, ""),
|
|
|
- new Field("metadata", Type.COMPACT_STRING, ""),
|
|
|
- new Field("commit_timestamp", Type.INT64, ""),
|
|
|
+ new Field(METADATA, Type.COMPACT_STRING, ""),
|
|
|
+ new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
|
|
|
Field.TaggedFieldsSection.of()
|
|
|
);
|
|
|
|
|
|
final Schema metadataSchema0 =
|
|
|
new Schema(
|
|
|
- new Field("protocol_type", Type.STRING, ""),
|
|
|
- new Field("generation", Type.INT32, ""),
|
|
|
- new Field("protocol", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("leader", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("members", new ArrayOf(new Schema(
|
|
|
- new Field("member_id", Type.STRING, ""),
|
|
|
- new Field("client_id", Type.STRING, ""),
|
|
|
- new Field("client_host", Type.STRING, ""),
|
|
|
- new Field("session_timeout", Type.INT32, ""),
|
|
|
- new Field("subscription", Type.BYTES, ""),
|
|
|
- new Field("assignment", Type.BYTES, "")
|
|
|
+ new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
|
|
+ new Field(GENERATION, Type.INT32, ""),
|
|
|
+ new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(LEADER, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(MEMBERS, new ArrayOf(new Schema(
|
|
|
+ new Field(MEMBER_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_HOST, Type.STRING, ""),
|
|
|
+ new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SUBSCRIPTION, Type.BYTES, ""),
|
|
|
+ new Field(ASSIGNMENT, Type.BYTES, "")
|
|
|
)), "")
|
|
|
);
|
|
|
|
|
|
final Schema metadataSchema1 =
|
|
|
new Schema(
|
|
|
- new Field("protocol_type", Type.STRING, ""),
|
|
|
- new Field("generation", Type.INT32, ""),
|
|
|
- new Field("protocol", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("leader", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("members", new ArrayOf(new Schema(
|
|
|
- new Field("member_id", Type.STRING, ""),
|
|
|
- new Field("client_id", Type.STRING, ""),
|
|
|
- new Field("client_host", Type.STRING, ""),
|
|
|
- new Field("rebalance_timeout", Type.INT32, ""),
|
|
|
- new Field("session_timeout", Type.INT32, ""),
|
|
|
- new Field("subscription", Type.BYTES, ""),
|
|
|
- new Field("assignment", Type.BYTES, "")
|
|
|
+ new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
|
|
+ new Field(GENERATION, Type.INT32, ""),
|
|
|
+ new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(LEADER, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(MEMBERS, new ArrayOf(new Schema(
|
|
|
+ new Field(MEMBER_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_HOST, Type.STRING, ""),
|
|
|
+ new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SUBSCRIPTION, Type.BYTES, ""),
|
|
|
+ new Field(ASSIGNMENT, Type.BYTES, "")
|
|
|
)), "")
|
|
|
);
|
|
|
|
|
|
final Schema metadataSchema2 =
|
|
|
new Schema(
|
|
|
- new Field("protocol_type", Type.STRING, ""),
|
|
|
- new Field("generation", Type.INT32, ""),
|
|
|
- new Field("protocol", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("leader", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("current_state_timestamp", Type.INT64, ""),
|
|
|
- new Field("members", new ArrayOf(new Schema(
|
|
|
- new Field("member_id", Type.STRING, ""),
|
|
|
- new Field("client_id", Type.STRING, ""),
|
|
|
- new Field("client_host", Type.STRING, ""),
|
|
|
- new Field("rebalance_timeout", Type.INT32, ""),
|
|
|
- new Field("session_timeout", Type.INT32, ""),
|
|
|
- new Field("subscription", Type.BYTES, ""),
|
|
|
- new Field("assignment", Type.BYTES, "")
|
|
|
+ new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
|
|
+ new Field(GENERATION, Type.INT32, ""),
|
|
|
+ new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(LEADER, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
|
|
+ new Field(MEMBERS, new ArrayOf(new Schema(
|
|
|
+ new Field(MEMBER_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_HOST, Type.STRING, ""),
|
|
|
+ new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SUBSCRIPTION, Type.BYTES, ""),
|
|
|
+ new Field(ASSIGNMENT, Type.BYTES, "")
|
|
|
)), "")
|
|
|
);
|
|
|
|
|
|
final Schema metadataSchema3 =
|
|
|
new Schema(
|
|
|
- new Field("protocol_type", Type.STRING, ""),
|
|
|
- new Field("generation", Type.INT32, ""),
|
|
|
- new Field("protocol", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("leader", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("current_state_timestamp", Type.INT64, ""),
|
|
|
- new Field("members", new ArrayOf(new Schema(
|
|
|
- new Field("member_id", Type.STRING, ""),
|
|
|
+ new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
|
|
+ new Field(GENERATION, Type.INT32, ""),
|
|
|
+ new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(LEADER, Type.NULLABLE_STRING, ""),
|
|
|
+ new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
|
|
+ new Field(MEMBERS, new ArrayOf(new Schema(
|
|
|
+ new Field(MEMBER_ID, Type.STRING, ""),
|
|
|
new Field("group_instance_id", Type.NULLABLE_STRING, ""),
|
|
|
- new Field("client_id", Type.STRING, ""),
|
|
|
- new Field("client_host", Type.STRING, ""),
|
|
|
- new Field("rebalance_timeout", Type.INT32, ""),
|
|
|
- new Field("session_timeout", Type.INT32, ""),
|
|
|
- new Field("subscription", Type.BYTES, ""),
|
|
|
- new Field("assignment", Type.BYTES, "")
|
|
|
+ new Field(CLIENT_ID, Type.STRING, ""),
|
|
|
+ new Field(CLIENT_HOST, Type.STRING, ""),
|
|
|
+ new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SUBSCRIPTION, Type.BYTES, ""),
|
|
|
+ new Field(ASSIGNMENT, Type.BYTES, "")
|
|
|
)), "")
|
|
|
);
|
|
|
|
|
|
final Schema metadataSchema4 =
|
|
|
new Schema(
|
|
|
- new Field("protocol_type", Type.COMPACT_STRING, ""),
|
|
|
- new Field("generation", Type.INT32, ""),
|
|
|
- new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
|
|
|
- new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
|
|
|
- new Field("current_state_timestamp", Type.INT64, ""),
|
|
|
- new Field("members", new CompactArrayOf(new Schema(
|
|
|
- new Field("member_id", Type.COMPACT_STRING, ""),
|
|
|
+ new Field(PROTOCOL_TYPE, Type.COMPACT_STRING, ""),
|
|
|
+ new Field(GENERATION, Type.INT32, ""),
|
|
|
+ new Field(PROTOCOL, Type.COMPACT_NULLABLE_STRING, ""),
|
|
|
+ new Field(LEADER, Type.COMPACT_NULLABLE_STRING, ""),
|
|
|
+ new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
|
|
+ new Field(MEMBERS, new CompactArrayOf(new Schema(
|
|
|
+ new Field(MEMBER_ID, Type.COMPACT_STRING, ""),
|
|
|
new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
|
|
|
- new Field("client_id", Type.COMPACT_STRING, ""),
|
|
|
- new Field("client_host", Type.COMPACT_STRING, ""),
|
|
|
- new Field("rebalance_timeout", Type.INT32, ""),
|
|
|
- new Field("session_timeout", Type.INT32, ""),
|
|
|
- new Field("subscription", Type.COMPACT_BYTES, ""),
|
|
|
- new Field("assignment", Type.COMPACT_BYTES, ""),
|
|
|
+ new Field(CLIENT_ID, Type.COMPACT_STRING, ""),
|
|
|
+ new Field(CLIENT_HOST, Type.COMPACT_STRING, ""),
|
|
|
+ new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
|
|
+ new Field(SUBSCRIPTION, Type.COMPACT_BYTES, ""),
|
|
|
+ new Field(ASSIGNMENT, Type.COMPACT_BYTES, ""),
|
|
|
Field.TaggedFieldsSection.of()
|
|
|
)), ""),
|
|
|
Field.TaggedFieldsSection.of()
|
|
@@ -249,7 +266,7 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|
|
short version = bb.getShort();
|
|
|
// ideally, we should distinguish if value is commit or metadata
|
|
|
// by checking record's key, but our current serde structure doesn't allow that.
|
|
|
- // so, we trying to parse into metadata first and after into commit msg
|
|
|
+ // so, we are trying to parse into metadata first and after into commit msg
|
|
|
try {
|
|
|
result = toJson(
|
|
|
switch (version) {
|