-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support obtaining protobuf schemas from schema registry for grpc services #757
Conversation
…artial data frame while computing crc32c value
@@ -84,7 +86,7 @@ public ModelConfig adaptFromJson( | |||
for (String catalogName: catalogsJson.keySet()) | |||
{ | |||
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName); | |||
List<SchemaConfig> schemas = new LinkedList<>(); | |||
ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left side should just be Set<SchemaConfig>
or Collection<SchemaConfig>
.
ObjectHashSet<CatalogedConfig>.ObjectIterator catalogIterator = catalogs.iterator(); | ||
|
||
while (catalogIterator.hasNext()) | ||
{ | ||
for (SchemaConfig catalogSchema : catalog.schemas) | ||
CatalogedConfig catalog = catalogIterator.nextValue(); | ||
ObjectHashSet<SchemaConfig>.ObjectIterator schemaIterator = catalog.schemas.iterator(); | ||
|
||
while (schemaIterator.hasNext()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're using ObjectHashSet
with reusable iterator, then for ( ... : ...)
syntax is fine, it will pick up the iterator implicitly.
@@ -102,6 +104,6 @@ private GrpcProtobufConfig asProtobuf( | |||
final String location = ((JsonString) value).getString(); | |||
final String protoService = readURL.apply(location); | |||
|
|||
return ProtobufParser.protobufConfig(location, protoService); | |||
return parser.parse(location, protoService); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return parser.parse(location, protoService); | |
return parser.parse(location, protobuf); |
...src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/config/GrpcProtobufParser.java
Show resolved
Hide resolved
@@ -40,8 +42,8 @@ public class BindingConfig | |||
public final String entry; | |||
public final String vault; | |||
public final OptionsConfig options; | |||
public final ObjectHashSet<CatalogedConfig> catalogs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try to avoid using implementation details in fields, let's use Set<CatalogedConfig>
or Collection<CatalogedConfig>
instead.
@@ -117,15 +119,27 @@ public BindingConfigBuilder<T> options( | |||
return this; | |||
} | |||
|
|||
public CatalogRefConfigBuilder<BindingConfigBuilder<T>> catalog() | |||
public BindingConfigBuilder<T> catalogs( | |||
ObjectHashSet<CatalogedConfig> catalogs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
|
||
public CatalogedConfig( | ||
String name, | ||
List<SchemaConfig> schemas) | ||
ObjectHashSet<SchemaConfig> schemas) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
public final class CatalogedConfigBuilder<T> extends ConfigBuilder<T, CatalogedConfigBuilder<T>> | ||
{ | ||
private final Function<CatalogedConfig, T> mapper; | ||
|
||
private String name; | ||
private List<SchemaConfig> schemas; | ||
private ObjectHashSet<SchemaConfig> schemas; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here
} | ||
|
||
|
||
@Override | ||
public CatalogRefConfig adaptFromJson( | ||
public ObjectHashSet<CatalogedConfig> adaptFromJson( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here and twice in the method body too
{ | ||
for (CatalogedConfig cataloged : binding.catalogs) | ||
{ | ||
Pattern pattern = Pattern.compile(cataloged.name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This use of Pattern doesn't seem to be able to fail, so not sure why we have it.
@@ -81,7 +81,7 @@ protected AvroModelHandler( | |||
this.encoder = encoderFactory.binaryEncoder(EMPTY_OUTPUT_STREAM, null); | |||
CatalogedConfig cataloged = config.cataloged.get(0); | |||
this.handler = supplyCatalog.apply(cataloged.id); | |||
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null; | |||
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.stream().findFirst().get() : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this if reverting to List, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I have resolved this but not sure why it is not updated locally I don't have this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Figured it out I was debugging issue reported by barnabas and accidentally made changes into that branch
@@ -60,7 +60,7 @@ public JsonModelHandler( | |||
this.service = JsonValidationService.newInstance(); | |||
this.factory = schemaProvider.createParserFactory(null); | |||
CatalogedConfig cataloged = config.cataloged.get(0); | |||
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null; | |||
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.stream().findFirst().get() : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this if reverting to List, right?
List<SchemaConfig> schemas = new ArrayList<>(converter.cataloged.get(0).schemas); | ||
assertThat(schemas.get(0).strategy, equalTo("topic")); | ||
assertThat(schemas.get(0).version, equalTo("latest")); | ||
assertThat(schemas.get(0).id, equalTo(0)); | ||
assertThat(schemas.get(1).subject, equalTo("cat")); | ||
assertThat(schemas.get(1).strategy, nullValue()); | ||
assertThat(schemas.get(1).version, equalTo("latest")); | ||
assertThat(schemas.get(1).id, equalTo(0)); | ||
assertThat(schemas.get(2).strategy, nullValue()); | ||
assertThat(schemas.get(2).version, nullValue()); | ||
assertThat(schemas.get(2).id, equalTo(42)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this if reverting to List, right?
@@ -86,7 +84,7 @@ public ModelConfig adaptFromJson( | |||
for (String catalogName: catalogsJson.keySet()) | |||
{ | |||
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName); | |||
ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>(); | |||
List<SchemaConfig> schemas = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are using List
, lets keep exposing it as a List
in the config object instead of Collection
.
@@ -76,7 +76,7 @@ protected ProtobufModelHandler( | |||
{ | |||
CatalogedConfig cataloged = config.cataloged.get(0); | |||
this.handler = supplyCatalog.apply(cataloged.id); | |||
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null; | |||
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.stream().findFirst().get() : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can revert this if reverting to List, right?
{ | ||
CatalogedConfig catalog = catalogIterator.nextValue(); | ||
ObjectHashSet<SchemaConfig>.ObjectIterator schemaIterator = catalog.schemas.iterator(); | ||
CatalogedConfig catalog = catalogs.get(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CatalogedConfig catalog = catalogs.get(0); | |
CatalogedConfig catalog = catalogs.get(i); |
@@ -77,7 +76,7 @@ public final class GrpcBindingConfig | |||
private final HttpGrpcHeaderHelper helper; | |||
private final Long2ObjectHashMap<CatalogHandler> handlersById; | |||
private final Int2ObjectHashMap<GrpcProtobufConfig> protobufsBySchemaId; | |||
private final ObjectHashSet<CatalogedConfig> catalogs; | |||
private final List<CatalogedConfig> catalogs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need both List<CatalogedConfig> catalogs
and Long2ObjectHashMap<CatalogHandler> handlersById
.
Seems like we need a data structure that has:
final CatalogHandler handler;
final String subject;
final String version;
int schemaId;
GrpcProtobufConfig protobuf;
and a list of these, flattening them for each catalog + schema combination, using same catalog handler for schemas on each catalog.
Then in resolveProtobufs, we just need to iterate this list, checking to see if schemaId changes compared to most recent one, and update it and newly parsed protobuf if necessary.
Then resolveMethod can stream over this list instead, mapping to GrpcProtobufConfig, before doing the rest.
@@ -76,7 +74,7 @@ public TestModelConfig adaptFromJson( | |||
for (String catalogName: catalogsJson.keySet()) | |||
{ | |||
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName); | |||
ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>(); | |||
List<SchemaConfig> schemas = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use ArrayList as it is faster to walk via .get(int)
.
JsonObject catalogsJson) | ||
{ | ||
ObjectHashSet<CatalogedConfig> catalogs = new ObjectHashSet<>(); | ||
List<CatalogedConfig> catalogs = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use ArrayList as it is faster to walk via .get(int)
.
for (String catalogName: catalogsJson.keySet()) | ||
{ | ||
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName); | ||
ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>(); | ||
List<SchemaConfig> schemas = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use ArrayList as it is faster to walk via .get(int)
.
catalog.resolveProtobuf(); | ||
} | ||
|
||
final Stream<GrpcProtobufConfig> objectStream = catalogs.stream().map(c -> c.protobuf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about this. My idea is to reuse resolveMethod.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use catalogs
in resolveMethod
then it can be
return catalogs.stream().
.map(c -> c.protobuf)
.map(p -> p.services.stream().filter(s -> s.service.equals(serviceName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.map(s -> s.methods.stream().filter(m -> m.method.equals(methodName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
catalog.resolveProtobuf(); | ||
} | ||
|
||
final Stream<GrpcProtobufConfig> objectStream = catalogs.stream().map(c -> c.protobuf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use catalogs
in resolveMethod
then it can be
return catalogs.stream().
.map(c -> c.protobuf)
.map(p -> p.services.stream().filter(s -> s.service.equals(serviceName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.map(s -> s.methods.stream().filter(m -> m.method.equals(methodName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
|
||
public final List<GrpcRouteConfig> routes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry to nitpick, can we move this after options
please?
this.catalogs = new ObjectHashSet<>(); | ||
|
||
for (CatalogedConfig catalog : binding.catalogs) | ||
{ | ||
CatalogHandler handler = supplyCatalog.apply(catalog.id); | ||
for (SchemaConfig schema : catalog.schemas) | ||
{ | ||
catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.catalogs = new ObjectHashSet<>(); | |
for (CatalogedConfig catalog : binding.catalogs) | |
{ | |
CatalogHandler handler = supplyCatalog.apply(catalog.id); | |
for (SchemaConfig schema : catalog.schemas) | |
{ | |
catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version)); | |
} | |
} | |
} | |
Set<GrpcCatalogSchema> catalogs = new ObjectHashSet<>(); | |
for (CatalogedConfig catalog : binding.catalogs) | |
{ | |
CatalogHandler handler = supplyCatalog.apply(catalog.id); | |
for (SchemaConfig schema : catalog.schemas) | |
{ | |
catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version)); | |
} | |
} | |
this.catalogs = catalogs; | |
} |
for (GrpcCatalogSchema catalog : catalogs) | ||
{ | ||
catalog.resolveProtobuf(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets change resolveProtobuf()
to return GrpcProtobufConfig
, then in the resolveMethod(catalogs)
method, use
.map(GrpcCatalogSchema::resolveProtobuf)
instead of
.map(c -> c.protobuf)
So no need for this loop then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's what I was thinking thanks good suggestion
Description
Support obtaining protobuf schemas from schema registry for grpc services
Fixes #697