Skip to content

Commit

Permalink
Generic Streamable Registry (opensearch-project#7780)
Browse files Browse the repository at this point in the history
This commit adds a WriteableRegistry to register generic read / write
serialization methods for StreamInput.readGeneric and
StreamOutput.writeGeneric. This enables modules, plugins, and libraries
to register their own type serialization logic without forcing the class
implementation to be in the :server module.

Decoupling this logic also further decouples the StreamInput /
StreamOutput transport logic from the server module such that it can be
refactored to the core library to support cloud native and serverless
implementations.

Signed-off-by: Nicholas Walter Knize <[email protected]>
  • Loading branch information
nknize authored and austintlee committed Jun 2, 2023
1 parent d4ffd8e commit d93bd95
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface BaseWriteable<S extends BaseStreamOutput> {
*/
class WriteableRegistry {
private static final Map<Class<?>, Writer<? extends BaseStreamOutput, ?>> WRITER_REGISTRY = new ConcurrentHashMap<>();
private static final Map<Class<?>, Class<?>> WRITER_CUSTOM_CLASS_MAP = new ConcurrentHashMap<>();
private static final Map<Byte, Reader<? extends BaseStreamInput, ?>> READER_REGISTRY = new ConcurrentHashMap<>();

/**
Expand All @@ -36,10 +37,9 @@ class WriteableRegistry {
* @opensearch.internal
*/
public static <W extends Writer<? extends BaseStreamOutput, ?>> void registerWriter(final Class<?> clazz, final W writer) {
if (WRITER_REGISTRY.containsKey(clazz)) {
if (WRITER_REGISTRY.putIfAbsent(clazz, writer) != null) {
throw new IllegalArgumentException("Streamable writer already registered for type [" + clazz.getName() + "]");
}
WRITER_REGISTRY.put(clazz, writer);
}

/**
Expand All @@ -48,10 +48,15 @@ class WriteableRegistry {
* @opensearch.internal
*/
public static <R extends Reader<? extends BaseStreamInput, ?>> void registerReader(final byte ordinal, final R reader) {
if (READER_REGISTRY.containsKey(ordinal)) {
if (READER_REGISTRY.putIfAbsent(ordinal, reader) != null) {
throw new IllegalArgumentException("Streamable reader already registered for ordinal [" + (int) ordinal + "]");
}
READER_REGISTRY.put(ordinal, reader);
}

public static void registerClassAlias(final Class<?> classInstance, final Class<?> classGeneric) {
if (WRITER_CUSTOM_CLASS_MAP.putIfAbsent(classInstance, classGeneric) != null) {
throw new IllegalArgumentException("Streamable custom class already registered [" + classInstance.getClass() + "]");
}
}

/**
Expand All @@ -69,6 +74,21 @@ class WriteableRegistry {
public static <R extends Reader<? extends BaseStreamInput, ?>> R getReader(final byte b) {
return (R) READER_REGISTRY.get(b);
}

public static Class<?> getCustomClassFromInstance(final Object value) {
if (value == null) {
throw new IllegalArgumentException("Attempting to retrieve a class type from a null value");
}
// rip through registered classes; return the class iff 'value' is an instance
// we do it this way to cover inheritance and interfaces (e.g., joda DateTime is an instanceof
// a ReadableInstant interface)
for (final Class<?> clazz : WRITER_CUSTOM_CLASS_MAP.values()) {
if (clazz.isInstance(value)) {
return clazz;
}
}
return null;
}
}

/**
Expand Down
14 changes: 0 additions & 14 deletions server/src/main/java/org/opensearch/common/geo/GeoPoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
import org.opensearch.common.geo.GeoUtils.EffectivePoint;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.BaseWriteable.Reader;
import org.opensearch.core.common.io.stream.BaseWriteable.Writer;
import org.opensearch.core.common.io.stream.BaseWriteable.WriteableRegistry;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geometry.Geometry;
Expand Down Expand Up @@ -97,17 +94,6 @@ public GeoPoint(final StreamInput in) throws IOException {
this.lon = in.readDouble();
}

/**
* Register this type as a streamable so it can be serialized over the wire
*/
public static void registerStreamables() {
WriteableRegistry.<Writer<StreamOutput, ?>>registerWriter(GeoPoint.class, (o, v) -> {
o.writeByte((byte) 22);
((GeoPoint) v).writeTo(o);
});
WriteableRegistry.<Reader<StreamInput, ?>>registerReader(Byte.valueOf((byte) 22), GeoPoint::new);
}

public GeoPoint reset(double lat, double lon) {
this.lat = lat;
this.lon = lon;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.joda.time.DateTimeZone;
import org.opensearch.Build;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
Expand Down Expand Up @@ -825,24 +824,7 @@ private Date readDate() throws IOException {
}

/**
* Read a {@linkplain DateTimeZone}.
*/
public DateTimeZone readTimeZone() throws IOException {
return DateTimeZone.forID(readString());
}

/**
* Read an optional {@linkplain DateTimeZone}.
*/
public DateTimeZone readOptionalTimeZone() throws IOException {
if (readBoolean()) {
return DateTimeZone.forID(readString());
}
return null;
}

/**
* Read a {@linkplain DateTimeZone}.
* Read a {@linkplain ZoneId}.
*/
public ZoneId readZoneId() throws IOException {
return ZoneId.of(readString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.opensearch.Build;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
Expand Down Expand Up @@ -734,12 +732,6 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep
o.writeByte((byte) 12);
o.writeLong(((Date) v).getTime());
});
writers.put(ReadableInstant.class, (o, v) -> {
o.writeByte((byte) 13);
final ReadableInstant instant = (ReadableInstant) v;
o.writeString(instant.getZone().getID());
o.writeLong(instant.getMillis());
});
writers.put(BytesReference.class, (o, v) -> {
o.writeByte((byte) 14);
o.writeBytesReference((BytesReference) v);
Expand Down Expand Up @@ -795,16 +787,17 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep
}

private static Class<?> getGenericType(Object value) {
if (value instanceof List) {
Class<?> registeredClass = Writeable.WriteableRegistry.getCustomClassFromInstance(value);
if (registeredClass != null) {
return registeredClass;
} else if (value instanceof List) {
return List.class;
} else if (value instanceof Object[]) {
return Object[].class;
} else if (value instanceof Map) {
return Map.class;
} else if (value instanceof Set) {
return Set.class;
} else if (value instanceof ReadableInstant) {
return ReadableInstant.class;
} else if (value instanceof BytesReference) {
return BytesReference.class;
} else {
Expand Down Expand Up @@ -1136,32 +1129,13 @@ public void writeOptionalNamedWriteable(@Nullable NamedWriteable namedWriteable)
}
}

/**
* Write a {@linkplain DateTimeZone} to the stream.
*/
public void writeTimeZone(DateTimeZone timeZone) throws IOException {
writeString(timeZone.getID());
}

/**
* Write a {@linkplain ZoneId} to the stream.
*/
public void writeZoneId(ZoneId timeZone) throws IOException {
writeString(timeZone.getId());
}

/**
* Write an optional {@linkplain DateTimeZone} to the stream.
*/
public void writeOptionalTimeZone(@Nullable DateTimeZone timeZone) throws IOException {
if (timeZone == null) {
writeBoolean(false);
} else {
writeBoolean(true);
writeTimeZone(timeZone);
}
}

/**
* Write an optional {@linkplain ZoneId} to the stream.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.io.stream;

import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.opensearch.common.geo.GeoPoint;
import org.opensearch.common.time.DateUtils;
import org.opensearch.core.common.io.stream.BaseWriteable.WriteableRegistry;
import org.opensearch.core.common.io.stream.BaseWriteable;
import org.opensearch.script.JodaCompatibleZonedDateTime;

import java.time.Instant;
import java.time.ZoneId;

/**
* This utility class registers generic types for streaming over the wire using
* {@linkplain StreamOutput#writeGenericValue(Object)} and {@linkplain StreamInput#readGenericValue()}
*
* In this manner we can register any type across OpenSearch modules, plugins, or libraries without requiring
* the implementation reside in the server module.
*
* @opensearch.internal
*/
public final class Streamables {

// no instance:
private Streamables() {}

/**
* Called when {@linkplain org.opensearch.transport.TransportService} is loaded by the classloader
* We do this because streamables depend on the TransportService being loaded
*/
public static void registerStreamables() {
registerWriters();
registerReaders();
}

/**
* Registers writers by class type
*/
private static void registerWriters() {
/** {@link ReadableInstant} */
WriteableRegistry.<BaseWriteable.Writer<StreamOutput, ?>>registerWriter(ReadableInstant.class, (o, v) -> {
o.writeByte((byte) 13);
final ReadableInstant instant = (ReadableInstant) v;
o.writeString(instant.getZone().getID());
o.writeLong(instant.getMillis());
});
WriteableRegistry.registerClassAlias(ReadableInstant.class, ReadableInstant.class);
/** {@link JodaCompatibleZonedDateTime} */
WriteableRegistry.<BaseWriteable.Writer<StreamOutput, ?>>registerWriter(JodaCompatibleZonedDateTime.class, (o, v) -> {
// write the joda compatibility datetime as joda datetime
o.writeByte((byte) 13);
final JodaCompatibleZonedDateTime zonedDateTime = (JodaCompatibleZonedDateTime) v;
String zoneId = zonedDateTime.getZonedDateTime().getZone().getId();
// joda does not understand "Z" for utc, so we must special case
o.writeString(zoneId.equals("Z") ? DateTimeZone.UTC.getID() : zoneId);
o.writeLong(zonedDateTime.toInstant().toEpochMilli());
});
/** {@link GeoPoint} */
BaseWriteable.WriteableRegistry.<BaseWriteable.Writer<StreamOutput, ?>>registerWriter(GeoPoint.class, (o, v) -> {
o.writeByte((byte) 22);
((GeoPoint) v).writeTo(o);
});
}

/**
* Registers a reader function mapped by ordinal values that are written by {@linkplain StreamOutput}
*
* NOTE: see {@code StreamOutput#WRITERS} for all registered ordinals
*/
private static void registerReaders() {
/** {@link JodaCompatibleZonedDateTime */
WriteableRegistry.<BaseWriteable.Reader<StreamInput, ?>>registerReader(Byte.valueOf((byte) 13), (i) -> {
final ZoneId zoneId = DateUtils.dateTimeZoneToZoneId(DateTimeZone.forID(i.readString()));
long millis = i.readLong();
return new JodaCompatibleZonedDateTime(Instant.ofEpochMilli(millis), zoneId);
});
/** {@link GeoPoint} */
WriteableRegistry.<BaseWriteable.Reader<StreamInput, ?>>registerReader(Byte.valueOf((byte) 22), GeoPoint::new);
}
}
32 changes: 32 additions & 0 deletions server/src/main/java/org/opensearch/common/joda/Joda.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.common.joda;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.time.FormatNames;
Expand Down Expand Up @@ -315,6 +317,36 @@ public static JodaDateFormatter forPattern(String input) {
return new JodaDateFormatter(input, formatter, formatter);
}

public static void writeTimeZone(final StreamOutput out, final DateTimeZone timeZone) throws IOException {
out.writeString(timeZone.getID());
}

public static void writeOptionalTimeZone(final StreamOutput out, final DateTimeZone timeZone) throws IOException {
if (timeZone == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
writeTimeZone(out, timeZone);
}
}

/**
* Read a {@linkplain DateTimeZone} from a {@linkplain StreamInput}.
*/
public static DateTimeZone readTimeZone(final StreamInput in) throws IOException {
return DateTimeZone.forID(in.readString());
}

/**
* Read an optional {@linkplain DateTimeZone}.
*/
public static DateTimeZone readOptionalTimeZone(final StreamInput in) throws IOException {
if (in.readBoolean()) {
return DateTimeZone.forID(in.readString());
}
return null;
}

private static void maybeLogJodaDeprecation(String format) {
if (JodaDeprecationPatterns.isDeprecatedPattern(format)) {
String suggestion = JodaDeprecationPatterns.formatSuggestion(format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,8 @@

package org.opensearch.script;

import org.joda.time.DateTimeZone;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.BaseWriteable.Reader;
import org.opensearch.core.common.io.stream.BaseWriteable.Writer;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.time.DateUtils;
import org.opensearch.core.common.io.stream.BaseWriteable.WriteableRegistry;

import java.time.DayOfWeek;
import java.time.Instant;
Expand Down Expand Up @@ -84,26 +77,6 @@ public JodaCompatibleZonedDateTime(Instant instant, ZoneId zone) {
this.dt = ZonedDateTime.ofInstant(instant, zone);
}

/**
* Register this type as a streamable so it can be serialized over the wire
*/
public static void registerStreamables() {
WriteableRegistry.<Writer<StreamOutput, ?>>registerWriter(JodaCompatibleZonedDateTime.class, (o, v) -> {
// write the joda compatibility datetime as joda datetime
o.writeByte((byte) 13);
final JodaCompatibleZonedDateTime zonedDateTime = (JodaCompatibleZonedDateTime) v;
String zoneId = zonedDateTime.getZonedDateTime().getZone().getId();
// joda does not understand "Z" for utc, so we must special case
o.writeString(zoneId.equals("Z") ? DateTimeZone.UTC.getID() : zoneId);
o.writeLong(zonedDateTime.toInstant().toEpochMilli());
});
WriteableRegistry.<Reader<StreamInput, ?>>registerReader(Byte.valueOf((byte) 13), (i) -> {
final ZoneId zoneId = DateUtils.dateTimeZoneToZoneId(DateTimeZone.forID(i.readString()));
long millis = i.readLong();
return new JodaCompatibleZonedDateTime(Instant.ofEpochMilli(millis), zoneId);
});
}

// access the underlying ZonedDateTime
public ZonedDateTime getZonedDateTime() {
return dt;
Expand Down
Loading

0 comments on commit d93bd95

Please sign in to comment.