Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added override methods to refill Java.Util.Properties if needed #346

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions src/net/KNet/Specific/Streams/KNetStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ namespace MASES.KNet.Streams
/// <summary>
/// KNet extension of <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/>
/// </summary>
public class KNetStreams : Org.Apache.Kafka.Streams.KafkaStreams, IGenericSerDesFactoryApplier
public class KNetStreams : IGenericSerDesFactoryApplier
{
readonly Org.Apache.Kafka.Streams.KafkaStreams _inner;
readonly KNetClientSupplier _supplier = null; // used to avoid GC recall
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

Org.Apache.Kafka.Streams.Processor.StateRestoreListener _stateRestoreListener; // used to avoid GC recall
StateListener _stateListener; // used to avoid GC recall
Org.Apache.Kafka.Streams.KafkaStreams.StateListener _stateListener; // used to avoid GC recall
Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler _streamsUncaughtExceptionHandler; // used to avoid GC recall

#region Constructors
Expand All @@ -44,8 +45,8 @@ public class KNetStreams : Org.Apache.Kafka.Streams.KafkaStreams, IGenericSerDes
/// <param name="arg1"><see cref="Java.Util.Properties"/></param>
/// <param name="arg2"><see cref="Org.Apache.Kafka.Common.Utils.Time"/></param>
public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1, Org.Apache.Kafka.Common.Utils.Time arg2)
: base(arg0, arg1, arg2)
{
_inner = new Org.Apache.Kafka.Streams.KafkaStreams(arg0, PrepareProperties(arg1), arg2);
_factory = arg1;
}
/// <summary>
Expand All @@ -56,8 +57,8 @@ public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1, Org.Apache.Kafk
/// <param name="arg2"><see cref="KNetClientSupplier"/></param>
/// <param name="arg3"><see cref="Org.Apache.Kafka.Common.Utils.Time"/></param>
public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1, KNetClientSupplier arg2, Org.Apache.Kafka.Common.Utils.Time arg3)
: base(arg0, arg1, arg2, arg3)
{
_inner = new Org.Apache.Kafka.Streams.KafkaStreams(arg0, PrepareProperties(arg1), arg2, arg3);
_factory = arg1;
_supplier = arg2;
}
Expand All @@ -68,8 +69,8 @@ public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1, KNetClientSuppl
/// <param name="arg1"><see cref="Java.Util.Properties"/></param>
/// <param name="arg2"><see cref="KNetClientSupplier"/></param>
public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1, KNetClientSupplier arg2)
: base(arg0, arg1, arg2)
{
_inner = new Org.Apache.Kafka.Streams.KafkaStreams(arg0, PrepareProperties(arg1), arg2);
_factory = arg1;
_supplier = arg2;
}
Expand All @@ -79,12 +80,30 @@ public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1, KNetClientSuppl
/// <param name="arg0"><see cref="KNetTopology"/></param>
/// <param name="arg1"><see cref="Java.Util.Properties"/></param>
public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1)
: base(arg0, arg1)
{
_inner = new Org.Apache.Kafka.Streams.KafkaStreams(arg0, PrepareProperties(arg1));
_factory = arg1;
}
#endregion

/// <summary>
/// Converter from <see cref="KNetStreams"/> to <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/>
/// </summary>
public static implicit operator Org.Apache.Kafka.Streams.KafkaStreams(KNetStreams t) => t._inner;
/// <summary>
/// If set, this <see cref="Func{T, TResult}"/> will be called from <see cref="PrepareProperties(StreamsConfigBuilder)"/>
/// </summary>
public static Func<Java.Util.Properties, StreamsConfigBuilder> OverrideProperties { get; set; }
/// <summary>
/// Override this method to check and modify the <see cref="Java.Util.Properties"/> returned to underlying <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/>
/// </summary>
/// <param name="builder"><see cref="StreamsConfigBuilder"/> to use to return <see cref="Java.Util.Properties"/></param>
/// <returns><see cref="Java.Util.Properties"/> used from underlying <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/></returns>
protected virtual Java.Util.Properties PrepareProperties(StreamsConfigBuilder builder)
{
return OverrideProperties != null ? OverrideProperties(builder) : builder;
}

#region Instance methods
/// <summary>
/// KNet implementation of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/KafkaStreams.html#queryMetadataForKey-java.lang.String-java.lang.Object-org.apache.kafka.common.serialization.Serializer-"/>
Expand All @@ -96,7 +115,7 @@ public KNetStreams(KNetTopology arg0, StreamsConfigBuilder arg1)
/// <returns><see cref="Org.Apache.Kafka.Streams.KeyQueryMetadata"/></returns>
public Org.Apache.Kafka.Streams.KeyQueryMetadata QueryMetadataForKey<TKey>(string arg0, TKey arg1, IKNetSerializer<TKey> arg2)
{
return QueryMetadataForKey<byte[]>(arg0, arg2.Serialize(null, arg1), arg2.KafkaSerializer);
return _inner.QueryMetadataForKey<byte[]>(arg0, arg2.Serialize(null, arg1), arg2.KafkaSerializer);
}
/// <summary>
/// KNet implementation of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/KafkaStreams.html#queryMetadataForKey-java.lang.String-java.lang.Object-org.apache.kafka.streams.processor.StreamPartitioner-"/>
Expand All @@ -110,7 +129,7 @@ public Org.Apache.Kafka.Streams.KeyQueryMetadata QueryMetadataForKey<TKey>(strin
{
if (arg2 is IGenericSerDesFactoryApplier applier) applier.Factory = _factory;
var keySerDes = _factory.BuildKeySerDes<TKey>();
return IExecute<Org.Apache.Kafka.Streams.KeyQueryMetadata>("queryMetadataForKey", arg0, keySerDes.Serialize(null, arg1), arg2);
return _inner.IExecute<Org.Apache.Kafka.Streams.KeyQueryMetadata>("queryMetadataForKey", arg0, keySerDes.Serialize(null, arg1), arg2);
}

/// <summary>
Expand All @@ -123,8 +142,8 @@ public Org.Apache.Kafka.Streams.KeyQueryMetadata QueryMetadataForKey<TKey>(strin
public TKNetManagedStore Store<TKNetManagedStore, TStore>(Org.Apache.Kafka.Streams.StoreQueryParameters<TStore> arg0)
where TKNetManagedStore : KNetManagedStore<TStore>, IGenericSerDesFactoryApplier, new()
{
TKNetManagedStore store = new TKNetManagedStore();
var substore = Store<TStore>(arg0);
TKNetManagedStore store = new();
var substore = _inner.Store<TStore>(arg0);
if (store is IKNetManagedStore<TStore> knetManagedStore)
{
knetManagedStore.SetData(_factory, substore);
Expand All @@ -144,8 +163,8 @@ public TKNetManagedStore Store<TKNetManagedStore, TStore>(string storageId, KNet
where TKNetManagedStore : KNetManagedStore<TStore>, IGenericSerDesFactoryApplier, new()
{
var sqp = Org.Apache.Kafka.Streams.StoreQueryParameters<TStore>.FromNameAndType(storageId, storeType.Store);
TKNetManagedStore store = new TKNetManagedStore();
var substore = Store<TStore>(sqp);
TKNetManagedStore store = new();
var substore = _inner.Store<TStore>(sqp);
if (store is IKNetManagedStore<TStore> knetManagedStore)
{
knetManagedStore.SetData(_factory, substore);
Expand All @@ -160,41 +179,41 @@ public TKNetManagedStore Store<TKNetManagedStore, TStore>(string storageId, KNet
/// <returns><see cref="string"/></returns>
public string RemoveStreamThread(TimeSpan arg0)
{
var res = base.RemoveStreamThread(arg0);
var res = _inner.RemoveStreamThread(arg0);
return res.IsPresent() ? res.Get() : null;
}

/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/KafkaStreams.html#state--"/>
/// </summary>
/// <returns><see cref="Org.Apache.Kafka.Streams.KafkaStreams.State"/></returns>
public new State State => base.StateMethod();
public Org.Apache.Kafka.Streams.KafkaStreams.State State => _inner.StateMethod();

/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/KafkaStreams.html#setGlobalStateRestoreListener-org.apache.kafka.streams.processor.StateRestoreListener-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Streams.Processor.StateRestoreListener"/></param>
public new void SetGlobalStateRestoreListener(Org.Apache.Kafka.Streams.Processor.StateRestoreListener arg0)
public void SetGlobalStateRestoreListener(Org.Apache.Kafka.Streams.Processor.StateRestoreListener arg0)
{
base.SetGlobalStateRestoreListener(arg0);
_inner.SetGlobalStateRestoreListener(arg0);
_stateRestoreListener = arg0;
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/KafkaStreams.html#setStateListener-org.apache.kafka.streams.KafkaStreams.StateListener-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Streams.KafkaStreams.StateListener"/></param>
public new void SetStateListener(Org.Apache.Kafka.Streams.KafkaStreams.StateListener arg0)
public void SetStateListener(Org.Apache.Kafka.Streams.KafkaStreams.StateListener arg0)
{
base.SetStateListener(arg0);
_inner.SetStateListener(arg0);
_stateListener = arg0;
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler-org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler"/></param>
public new void SetUncaughtExceptionHandler(Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler arg0)
public void SetUncaughtExceptionHandler(Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler arg0)
{
base.SetUncaughtExceptionHandler(arg0);
_inner.SetUncaughtExceptionHandler(arg0);
_streamsUncaughtExceptionHandler = arg0;
}

Expand Down
26 changes: 23 additions & 3 deletions src/net/KNet/Specific/Streams/KNetTopologyConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
*/

using MASES.KNet.Serialization;
using System;

namespace MASES.KNet.Streams
{
/// <summary>
/// KNet implementation of <see cref="Org.Apache.Kafka.Streams.TopologyConfig"/>
/// </summary>
public class KNetTopologyConfig : Org.Apache.Kafka.Streams.TopologyConfig, IGenericSerDesFactoryApplier
public class KNetTopologyConfig : IGenericSerDesFactoryApplier
{
readonly Org.Apache.Kafka.Streams.TopologyConfig _inner;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
#region Constructors
Expand All @@ -35,19 +37,37 @@ public class KNetTopologyConfig : Org.Apache.Kafka.Streams.TopologyConfig, IGene
/// <param name="arg1"><see cref="StreamsConfigBuilder"/></param>
/// <param name="arg2"><see cref="Java.Util.Properties"/></param>
public KNetTopologyConfig(string arg0, StreamsConfigBuilder arg1, Java.Util.Properties arg2)
: base(arg0, arg1, arg2)
{
_inner = new Org.Apache.Kafka.Streams.TopologyConfig(arg0, PrepareProperties(arg1), arg2);
_factory = arg1;
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/TopologyConfig.html#org.apache.kafka.streams.TopologyConfig(org.apache.kafka.streams.StreamsConfig)"/>
/// </summary>
/// <param name="arg0"><see cref="StreamsConfigBuilder"/></param>
public KNetTopologyConfig(StreamsConfigBuilder arg0)
: base(arg0)
{
_inner = new Org.Apache.Kafka.Streams.TopologyConfig(PrepareProperties(arg0));
_factory = arg0;
}
#endregion

/// <summary>
/// Converter from <see cref="KNetTopologyConfig"/> to <see cref="Org.Apache.Kafka.Streams.TopologyConfig"/>
/// </summary>
public static implicit operator Org.Apache.Kafka.Streams.TopologyConfig(KNetTopologyConfig t) => t._inner;
/// <summary>
/// If set, this <see cref="Func{T, TResult}"/> will be called from <see cref="PrepareProperties(StreamsConfigBuilder)"/>
/// </summary>
public static Func<Java.Util.Properties, StreamsConfigBuilder> OverrideProperties { get; set; }
/// <summary>
/// Override this method to check and modify the <see cref="Java.Util.Properties"/> returned to underlying <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/>
/// </summary>
/// <param name="builder"><see cref="StreamsConfigBuilder"/> to use to return <see cref="Java.Util.Properties"/></param>
/// <returns><see cref="Java.Util.Properties"/> used from underlying <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/></returns>
protected virtual Java.Util.Properties PrepareProperties(StreamsConfigBuilder builder)
{
return OverrideProperties != null ? OverrideProperties(builder) : builder;
}
}
}
Loading