Skip to content

Commit

Permalink
Verify schema registry
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Sep 18, 2024
1 parent 862d291 commit 8ea8b04
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,27 @@ catalogs:
type: test
options:
url: http://localhost:8081

id: 1
schema: |-
{
"type": "record",
"name": "cities",
"namespace": "dev",
"fields": [
{
"name": "description",
"type": string
},
{
"name": "id",
"type": string
},
{
"name": "name",
"type": string
}
]
}
bindings:
app0:
type: pgsql-kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public String generateSchema(
schemaBuilder.setLength(0);

final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);

final String recordName = String.format("%s.%s", database, createTable.getTable().getName());
final String recordName = createTable.getTable().getName();

schemaBuilder.append("{\n");
schemaBuilder.append("\"type\": \"record\",\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.stream;

import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID;
import static java.util.Objects.requireNonNull;

import java.io.InputStreamReader;
Expand Down Expand Up @@ -1346,14 +1347,20 @@ else if (server.commandsProcessed == 1)
final PgsqlKafkaBindingConfig binding = server.binding;
final String schema = binding.avroValueSchema.generateSchema(server.database, statement);
final String subject = String.format("%s.%s-value", server.database, topic);
binding.catalog.register(subject, schema);

final String policy = binding.avroValueSchema.primaryKey(statement) != null
? "compact"
: "delete";
if (binding.catalog.register(subject, schema) != NO_VERSION_ID)
{
final String policy = binding.avroValueSchema.primaryKey(statement) != null
? "compact"
: "delete";

final KafkaCreateTopicsProxy createTopicsProxy = server.createTopicsProxy;
createTopicsProxy.doKafkaBegin(traceId, authorization, topics, policy);
final KafkaCreateTopicsProxy createTopicsProxy = server.createTopicsProxy;
createTopicsProxy.doKafkaBegin(traceId, authorization, topics, policy);
}
else
{
server.cleanup(traceId, authorization);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public TestCatalogHandler(
this.url = options != null ? options.url : null;
}

@Override
public int register(
String subject,
String schema)
{
return this.schema.equals(schema) ? this.id : NO_VERSION_ID;
}

@Override
public int resolve(
String subject,
Expand Down

0 comments on commit 8ea8b04

Please sign in to comment.