Skip to content

Commit

Permalink
Schema registry api view updates (#13678)
Browse files Browse the repository at this point in the history
* Update schema registry client APIs

* Update package structure for schema registry

* Update model classes

* Add clear cache method to sync client
  • Loading branch information
srnagar authored Jul 31, 2020
1 parent 57290b6 commit a5229d5
Show file tree
Hide file tree
Showing 43 changed files with 161 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@
<!-- Suppress the check on code-gen classes -->
<suppress checks="LineLength" files="com.azure.ai.textanalytics.implementation.TextAnalyticsClientImplBuilder"/>
<suppress checks="LineLength" files="com.azure.ai.textanalytics.implementation.TextAnalyticsClientImpl"/>
<suppress checks="." files="com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService"/>
<suppress checks="." files="AzureSchemaRegistryRestService"/>

<!-- Suppress the check on code-gen classes -->
<suppress checks="LineLength" files="com.azure.ai.formrecognizer.implementation.FormRecognizerClientImplBuilder"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.Codec;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.models.SerializationException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
Expand Down Expand Up @@ -79,14 +79,19 @@ public String getSchemaName(Object object) {
return AvroSchemaUtils.getSchema(object).getFullName();
}

@Override
public String getSchemaGroup() {
return "$Default";
}

/**
* Returns ByteArrayOutputStream containing Avro encoding of object parameter
* @param object Object to be encoded into byte stream
* @return closed ByteArrayOutputStream
* @throws SerializationException wraps runtime exceptions
*/
@Override
public ByteArrayOutputStream encode(Object object) {
public byte[] encode(Object object) {
Schema schema = AvroSchemaUtils.getSchema(object);

try {
Expand All @@ -104,7 +109,7 @@ public ByteArrayOutputStream encode(Object object) {
writer.write(object, encoder);
encoder.flush();
}
return out;
return out.toByteArray();
} catch (IOException | RuntimeException e) {
// Avro serialization can throw AvroRuntimeException, NullPointerException, ClassCastException, etc
throw logger.logExceptionAsError(
Expand All @@ -119,7 +124,7 @@ public ByteArrayOutputStream encode(Object object) {
* @return deserialized object
* @throws SerializationException upon deserialization failure
*/
public Object decodeBytes(byte[] b, Object object) {
public Object decode(byte[] b, Object object) {
Objects.requireNonNull(object, "Schema must not be null.");

if (!(object instanceof Schema)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import com.azure.core.experimental.serializer.ObjectSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.SchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.models.SerializationException;
import com.azure.data.schemaregistry.CachedSchemaRegistryAsyncClient;
import reactor.core.publisher.Mono;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.data.schemaregistry.avro;

import com.azure.data.schemaregistry.SchemaRegistrySerializer;
import com.azure.data.schemaregistry.SerializationException;
import com.azure.data.schemaregistry.models.SerializationException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package com.azure.data.schemaregistry.avro;

import com.azure.core.credential.TokenCredential;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.CachedSchemaRegistryClientBuilder;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.schemaregistry.client;
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.client.implementation.models.SchemaId;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.implementation.models.SchemaId;
import com.azure.data.schemaregistry.models.SchemaRegistryClientException;
import com.azure.data.schemaregistry.models.SchemaRegistryObject;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -78,8 +82,10 @@ public final class CachedSchemaRegistryAsyncClient {
this.maxSchemaMapSize = MAX_SCHEMA_MAP_SIZE_DEFAULT;
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaRegistryObject> registerSchema(
String schemaGroup, String schemaName, String schemaString, String schemaType) {

if (schemaStringCache.containsKey(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString))) {
logger.verbose(
"Cache hit schema string. Group: '{}', name: '{}', schema type: '{}', payload: '{}'",
Expand All @@ -92,6 +98,7 @@ public Mono<SchemaRegistryObject> registerSchema(
.map(response -> response.getValue());
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaRegistryObject>> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, String schemaType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE);
Expand Down Expand Up @@ -123,6 +130,8 @@ Mono<Response<SchemaRegistryObject>> registerSchemaWithResponse(String schemaGro

SchemaRegistryObject registered = new SchemaRegistryObject(schemaId.getId(),
schemaType,
schemaName,
schemaGroup,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING),
getParseFunc(schemaType));

Expand All @@ -137,6 +146,7 @@ Mono<Response<SchemaRegistryObject>> registerSchemaWithResponse(String schemaGro
});
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<SchemaRegistryObject> getSchema(String schemaId) {
if (idCache.containsKey(schemaId)) {
logger.verbose("Cache hit for schema id '{}'", schemaId);
Expand All @@ -145,6 +155,7 @@ public Mono<SchemaRegistryObject> getSchema(String schemaId) {
return getSchemaWithResponse(schemaId).map(Response::getValue);
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<SchemaRegistryObject>> getSchemaWithResponse(String schemaId) {
return getSchemaWithResponse(schemaId, Context.NONE);
}
Expand Down Expand Up @@ -172,6 +183,8 @@ Mono<Response<SchemaRegistryObject>> getSchemaWithResponse(String schemaId, Cont

SchemaRegistryObject schemaObject = new SchemaRegistryObject(schemaId,
schemaType,
null,
null,
response.getValue().getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING),
getParseFunc(schemaType));

Expand All @@ -186,6 +199,7 @@ Mono<Response<SchemaRegistryObject>> getSchemaWithResponse(String schemaId, Cont
}


@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<String> getSchemaId(String schemaGroup, String schemaName, String schemaString, String schemaType) {

if (schemaStringCache.containsKey(getSchemaStringCacheKey(schemaGroup, schemaName, schemaString))) {
Expand All @@ -197,6 +211,7 @@ public Mono<String> getSchemaId(String schemaGroup, String schemaName, String sc
.map(response -> response.getValue());
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
String schemaType) {
return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE);
Expand Down Expand Up @@ -231,6 +246,8 @@ Mono<Response<String>> getSchemaIdWithResponse(String schemaGroup, String schema
new SchemaRegistryObject(
schemaId.getId(),
schemaType,
schemaName,
schemaGroup,
schemaString.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING),
getParseFunc(schemaType)));
logger.verbose("Cached schema string. Group: '{}', name: '{}'", schemaGroup, schemaName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.azure.data.schemaregistry.client;
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.models.SchemaRegistryObject;

@ServiceClient(
builder = CachedSchemaRegistryClientBuilder.class,
Expand All @@ -15,33 +18,43 @@ public final class CachedSchemaRegistryClient {
this.asyncClient = asyncClient;
}

@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistryObject registerSchema(String schemaGroup, String schemaName, String schemaString,
String schemaType) {
return registerSchemaWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE).getValue();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistryObject> registerSchemaWithResponse(String schemaGroup, String schemaName,
String schemaString, String schemaType, Context context) {
return this.asyncClient.registerSchemaWithResponse(schemaGroup, schemaName, schemaString, schemaType,
context).block();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public SchemaRegistryObject getSchema(String schemaId) {
return getSchemaWithResponse(schemaId, Context.NONE).getValue();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Response<SchemaRegistryObject> getSchemaWithResponse(String schemaId, Context context) {
return this.asyncClient.getSchemaWithResponse(schemaId).block();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public String getSchemaId(String schemaGroup, String schemaName, String schemaString, String schemaType) {
return getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, schemaType, Context.NONE).getValue();
}

@ServiceMethod(returns = ReturnType.SINGLE)
public Response<String> getSchemaIdWithResponse(String schemaGroup, String schemaName, String schemaString,
String schemaType, Context context) {
return this.asyncClient.getSchemaIdWithResponse(schemaGroup, schemaName, schemaString, schemaType, context)
.block();
}

public void clearCache() {
this.asyncClient.clearCache();
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.schemaregistry.client;
package com.azure.data.schemaregistry;

import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.credential.TokenCredential;
Expand All @@ -23,9 +23,8 @@
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.Codec;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.client.implementation.AzureSchemaRegistryRestServiceClientBuilder;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestService;
import com.azure.data.schemaregistry.implementation.AzureSchemaRegistryRestServiceClientBuilder;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.temporal.ChronoUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package com.azure.data.schemaregistry;

import java.io.ByteArrayOutputStream;
import com.azure.data.schemaregistry.models.SerializationException;

/**
* An interface defining operations required for registry-based serialization and deserialization.
Expand Down Expand Up @@ -34,6 +34,8 @@ public interface Codec {
*/
String getSchemaName(Object object);

String getSchemaGroup();

/**
* Returns string representation of schema object to be stored in the service.
*
Expand All @@ -49,7 +51,7 @@ public interface Codec {
* @return output stream containing byte representation of object
* @throws SerializationException if generating byte representation of object fails
*/
ByteArrayOutputStream encode(Object object);
byte[] encode(Object object);

/**
* Decodes byte array into Object given provided schema object.
Expand All @@ -58,5 +60,5 @@ public interface Codec {
* @return deserialized object
* @throws SerializationException if decode operation fails
*/
Object decodeBytes(byte[] encodedBytes, Object schemaObject);
Object decode(byte[] encodedBytes, Object schemaObject);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.client.CachedSchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.client.SchemaRegistryClientException;
import com.azure.data.schemaregistry.client.SchemaRegistryObject;
import com.azure.data.schemaregistry.models.SchemaRegistryClientException;
import com.azure.data.schemaregistry.models.SchemaRegistryObject;
import com.azure.data.schemaregistry.models.SerializationException;
import reactor.core.publisher.Mono;

import java.io.IOException;
Expand Down Expand Up @@ -55,7 +55,7 @@ public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryCl
this(schemaRegistryClient, serializerCodec, deserializerCodecList, null, null);
}

public <T> SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, List<Codec> deserializerCodecList, Boolean autoRegisterSchemas,
String schemaGroup) {

Expand Down Expand Up @@ -153,7 +153,7 @@ protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
.put(id.getBytes(StandardCharsets.UTF_8));
try {
s.write(idBuffer.array());
serializerCodec.encode(object).writeTo(s);
s.write(serializerCodec.encode(object));
} catch (IOException e) {
sink.error(new SerializationException(e.getMessage(), e));
}
Expand All @@ -170,7 +170,7 @@ protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
* @return object, deserialized with the prefixed schema
* @throws SerializationException if deserialization of registry schema or message payload fails.
*/
protected Mono<Object> deserialize(InputStream s) throws SerializationException {
protected Mono<Object> deserialize(InputStream s) {
if (s == null) {
return Mono.empty();
}
Expand All @@ -195,7 +195,7 @@ protected Mono<Object> deserialize(InputStream s) throws SerializationException
.onErrorMap(IOException.class,
e -> logger.logExceptionAsError(new SerializationException(e.getMessage(), e)))
.handle((registryObject, sink) -> {
Object payloadSchema = registryObject.deserialize();
Object payloadSchema = registryObject.getSchema();

if (payloadSchema == null) {
sink.error(logger.logExceptionAsError(
Expand All @@ -210,7 +210,7 @@ protected Mono<Object> deserialize(InputStream s) throws SerializationException
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);

Codec codec = getDeserializerCodec(registryObject);
sink.next(codec.decodeBytes(b, payloadSchema));
sink.next(codec.decode(b, payloadSchema));
})
.onErrorMap(e -> {
if (e instanceof SchemaRegistryClientException) {
Expand Down

This file was deleted.

Loading

0 comments on commit a5229d5

Please sign in to comment.