From 430afa5c9857c18c3d44416d9795c7585fa9057b Mon Sep 17 00:00:00 2001 From: "Brian S. O'Neill" Date: Fri, 4 Oct 2024 11:55:13 -0700 Subject: [PATCH] Add support for automatic remote object disposal. --- CHANGELOG.md | 1 + .../java/org/cojen/dirmi/AutoDispose.java | 35 +++ src/main/java/org/cojen/dirmi/Disposer.java | 1 + .../org/cojen/dirmi/core/AutoDisposer.java | 166 +++++++++++++ .../org/cojen/dirmi/core/ClientSession.java | 22 +- .../org/cojen/dirmi/core/CoreSession.java | 112 +++++++-- .../cojen/dirmi/core/CoreSkeletonSupport.java | 5 + .../org/cojen/dirmi/core/CoreStubSupport.java | 21 +- .../java/org/cojen/dirmi/core/CoreUtils.java | 7 +- .../java/org/cojen/dirmi/core/CounterMap.java | 122 ++++++++++ .../cojen/dirmi/core/DisposedStubSupport.java | 20 +- .../cojen/dirmi/core/MethodIdWriterMaker.java | 2 +- .../java/org/cojen/dirmi/core/RemoteInfo.java | 22 +- .../org/cojen/dirmi/core/RemoteMethod.java | 7 + .../dirmi/core/RestorableStubSupport.java | 31 +-- .../org/cojen/dirmi/core/SkeletonMaker.java | 4 + .../org/cojen/dirmi/core/SkeletonSupport.java | 6 + src/main/java/org/cojen/dirmi/core/Stub.java | 63 +---- .../org/cojen/dirmi/core/StubFactory.java | 8 +- .../org/cojen/dirmi/core/StubInvoker.java | 217 +++++++++++++++++ .../java/org/cojen/dirmi/core/StubMaker.java | 118 +++++++++- .../java/org/cojen/dirmi/core/StubMap.java | 52 ++++ .../org/cojen/dirmi/core/StubSupport.java | 18 +- .../org/cojen/dirmi/core/StubWrapper.java | 71 ++++++ .../java/org/cojen/dirmi/AutoDisposeTest.java | 222 ++++++++++++++++++ .../org/cojen/dirmi/RemoteObjectTest.java | 2 +- .../dirmi/core/MismatchedInterfaceTest.java | 2 +- .../org/cojen/dirmi/core/RemoteInfoTest.java | 9 +- 28 files changed, 1209 insertions(+), 157 deletions(-) create mode 100644 src/main/java/org/cojen/dirmi/AutoDispose.java create mode 100644 src/main/java/org/cojen/dirmi/core/AutoDisposer.java create mode 100644 src/main/java/org/cojen/dirmi/core/CounterMap.java create mode 100644 src/main/java/org/cojen/dirmi/core/StubInvoker.java create mode 100644 src/main/java/org/cojen/dirmi/core/StubMap.java create mode 100644 src/main/java/org/cojen/dirmi/core/StubWrapper.java create mode 100644 src/test/java/org/cojen/dirmi/AutoDisposeTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bc72912..638c58c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Changelog v2.4.0 ------ * Added pipe methods for efficiently encoding and decoding complex objects. +* Added support for automatic remote object disposal. * Optimize reading and writing primitive arrays. v2.3.3 (2024-04-10) diff --git a/src/main/java/org/cojen/dirmi/AutoDispose.java b/src/main/java/org/cojen/dirmi/AutoDispose.java new file mode 100644 index 00000000..d69a88c5 --- /dev/null +++ b/src/main/java/org/cojen/dirmi/AutoDispose.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi; + +import java.lang.annotation.*; + +/** + * Designates a remote interface as supporting automatic disposal when the client-side object + * is reclaimed by the garbage collector. Automatic disposal doesn't work for the {@link + * Session#root root} object, and it doesn't work when a reference cycle is formed between + * client-side and remote-side objects. + * + * @author Brian S. O'Neill + * @see Disposer + * @see Session#dispose + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface AutoDispose { +} diff --git a/src/main/java/org/cojen/dirmi/Disposer.java b/src/main/java/org/cojen/dirmi/Disposer.java index 0ba4e873..630144cc 100644 --- a/src/main/java/org/cojen/dirmi/Disposer.java +++ b/src/main/java/org/cojen/dirmi/Disposer.java @@ -35,6 +35,7 @@ * @author Brian S O'Neill * @see SessionAware * @see Session#dispose + * @see AutoDispose * @see DisposedException */ @Documented diff --git a/src/main/java/org/cojen/dirmi/core/AutoDisposer.java b/src/main/java/org/cojen/dirmi/core/AutoDisposer.java new file mode 100644 index 00000000..209c1b63 --- /dev/null +++ b/src/main/java/org/cojen/dirmi/core/AutoDisposer.java @@ -0,0 +1,166 @@ +/* + * Copyright 2024 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi.core; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; + +/** + * + * + * @author Brian S. O'Neill + */ +public final class AutoDisposer extends ReferenceQueue implements Runnable { + private static AutoDisposer cInstance; + + private static AutoDisposer access() { + AutoDisposer instance; + synchronized (AutoDisposer.class) { + instance = cInstance; + if (instance == null) { + cInstance = instance = new AutoDisposer(); + } + instance.mRegistered++; + } + return instance; + } + + private long mRegistered; + + private AutoDisposer() { + Thread t = new Thread(this, getClass().getSimpleName()); + t.setDaemon(true); + t.start(); + } + + public void run() { + // Infinite timeout. + long timeout = 0; + + while (true) { + Reference ref; + try { + ref = remove(timeout); + } catch (InterruptedException e) { + // Clear the interrupted status. + Thread.interrupted(); + ref = null; + } + + if (ref == null) { + // Timed out waiting, which implies that the timeout isn't infinite. + synchronized (AutoDisposer.class) { + if (mRegistered == 0) { + // Still idle, so exit. + if (cInstance == this) { + cInstance = null; + } + return; + } + } + continue; + } + + long removed = 1; + + while (true) { + if (ref instanceof BasicRef br) { + br.removed(); + } + ref = poll(); + if (ref == null) { + break; + } + removed++; + } + + synchronized (AutoDisposer.class) { + if ((mRegistered -= removed) == 0) { + // If still idle after one minute, then exit. + timeout = 60_000; + } else { + // Queue still has registered refs, so use an infinite timeout. There's no + // point in waking up this thread unless it has something to do. + timeout = 0; + } + } + } + } + + public static sealed class BasicRef extends WeakReference { + private final StubInvoker mInvoker; + + public BasicRef(StubWrapper wrapper, StubInvoker invoker) { + super(wrapper, access()); + mInvoker = invoker; + } + + void removed() { + if (mInvoker.support().session() instanceof CoreSession session) { + // Note: Although the pipe isn't flushed immediately, this operation might + // still block. If it does, then no dispose messages will be sent for any + // sessions until the blocked one automatically disconnects. This can be + // prevented by running a task in a separate thread, but that would end up + // creating a new temporary object. Ideally, the task option should only be + // used when the pipe's output buffer is full. + session.stubDisposeAndNotify(mInvoker, null, false); + } + } + } + + public static final class CountedRef extends BasicRef { + private static final VarHandle cRefCountHandle; + + static { + try { + var lookup = MethodHandles.lookup(); + cRefCountHandle = lookup.findVarHandle(CountedRef.class, "mRefCount", long.class); + } catch (Throwable e) { + throw CoreUtils.rethrow(e); + } + } + + private long mRefCount; + + public CountedRef(StubWrapper wrapper, StubInvoker invoker) { + super(wrapper, invoker); + mRefCount = 1; + } + + @Override + void removed() { + decRefCount(1); + } + + public void incRefCount() { + cRefCountHandle.getAndAdd(this, 1L); + } + + /** + * Note: Calling this method might block if a notification needs to be written. + */ + public void decRefCount(long amount) { + if (((long) cRefCountHandle.getAndAdd(this, -amount)) <= amount) { + super.removed(); + } + } + } +} diff --git a/src/main/java/org/cojen/dirmi/core/ClientSession.java b/src/main/java/org/cojen/dirmi/core/ClientSession.java index d61d5673..2a5f1464 100644 --- a/src/main/java/org/cojen/dirmi/core/ClientSession.java +++ b/src/main/java/org/cojen/dirmi/core/ClientSession.java @@ -95,11 +95,11 @@ void init(long serverId, Class rootType, byte[] bname, mStubFactoriesByClass.putIfAbsent(rootType, factory); - Stub root = factory.newStub(rootId, stubSupport()); + StubInvoker root = factory.newStub(rootId, stubSupport()); mStubs.put(root); - mRoot = (R) root; + mRoot = (R) root.init(); - Stub.setRootOrigin(root); + StubInvoker.setRootOrigin(root); } @Override @@ -133,8 +133,8 @@ boolean close(int reason, CorePipe controlPipe) { } private boolean isRootDisposed() { - return mRoot instanceof Stub stub - && Stub.cSupportHandle.getAcquire(stub) instanceof DisposedStubSupport; + return mRoot instanceof StubInvoker stub + && StubInvoker.cSupportHandle.getAcquire(stub) instanceof DisposedStubSupport; } @SuppressWarnings("unchecked") @@ -161,7 +161,7 @@ private boolean reconnectAttempt(Object result) { mEngine.changeIdentity(this, newSession.id); - var newRoot = (Stub) newSession.mRoot; + var newRoot = (StubInvoker) newSession.mRoot; Object removed = newSession.mStubs.remove(newRoot); assert newRoot == removed; assert newSession.mStubs.size() == 0; @@ -176,7 +176,7 @@ private boolean reconnectAttempt(Object result) { cServerSessionIdHandle.setRelease(this, newSession.mServerSessionId); - var root = (Stub) mRoot; + var root = (StubInvoker) mRoot; mStubs.changeIdentity(root, newRoot.id); Map typeMap; @@ -219,7 +219,7 @@ private boolean reconnectAttempt(Object result) { return false; } - Stub.cSupportHandle.setRelease(mRoot, newSupport); + StubInvoker.cSupportHandle.setRelease(mRoot, newSupport); // For all restorable stubs, update the MethodIdWriter and set a support object that // allows them to restore on demand. @@ -249,7 +249,7 @@ private boolean reconnectAttempt(Object result) { } if (writer != null) { - Stub.cWriterHandle.setRelease(stub, writer); + StubInvoker.cWriterHandle.setRelease(stub, writer); } else { // Although no remote methods changed, the current StubFactory is preferred. if (type == null) { @@ -257,12 +257,12 @@ private boolean reconnectAttempt(Object result) { } StubFactory factory = mStubFactoriesByClass.get(type); if (factory != null) { - Stub.cWriterHandle.setRelease(stub, factory); + StubInvoker.cWriterHandle.setRelease(stub, factory); } } if (stub != mRoot) { - Stub.cSupportHandle.setRelease(stub, restorableSupport); + StubInvoker.cSupportHandle.setRelease(stub, restorableSupport); } }); diff --git a/src/main/java/org/cojen/dirmi/core/CoreSession.java b/src/main/java/org/cojen/dirmi/core/CoreSession.java index edcae32f..598a60df 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreSession.java +++ b/src/main/java/org/cojen/dirmi/core/CoreSession.java @@ -61,14 +61,14 @@ abstract class CoreSession extends Item implements Session { static final int C_PING = 1, C_PONG = 2, C_MESSAGE = 3, C_KNOWN_TYPE = 4, C_REQUEST_CONNECTION = 5, C_REQUEST_INFO = 6, C_REQUEST_INFO_TERM = 7, C_INFO_FOUND = 8, C_INFO_NOT_FOUND = 9, - C_SKELETON_DISPOSE = 10, C_STUB_DISPOSE = 11; + C_SKELETON_DISPOSE = 10, C_STUB_DISPOSE = 11, C_ACKNOWLEDGED_S = 12, C_ACKNOWLEDGED_L = 13; static final int R_CLOSED = 1, R_DISCONNECTED = 2, R_PING_FAILURE = 4, R_CONTROL_FAILURE = 8; private static final int SPIN_LIMIT; private static final VarHandle cStubSupportHandle, - cControlPipeHandle, cConLockHandle, cPipeClockHandle; + cControlPipeHandle, cConLockHandle, cPipeClockHandle, cAcknowledgedMapHandle; static { SPIN_LIMIT = Runtime.getRuntime().availableProcessors() > 1 ? 1 << 10 : 1; @@ -81,6 +81,8 @@ abstract class CoreSession extends Item implements Session { (CoreSession.class, "mControlPipe", CorePipe.class); cConLockHandle = lookup.findVarHandle(CoreSession.class, "mConLock", int.class); cPipeClockHandle = lookup.findVarHandle(CorePipe.class, "mClock", int.class); + cAcknowledgedMapHandle = lookup.findVarHandle + (CoreSession.class, "mAcknowledgedMap", CounterMap.class); } catch (Throwable e) { throw CoreUtils.rethrow(e); } @@ -88,7 +90,7 @@ abstract class CoreSession extends Item implements Session { final Engine mEngine; final Settings mSettings; - final ItemMap mStubs; + final StubMap mStubs; final ItemMap mStubFactories; final ConcurrentHashMap, StubFactory> mStubFactoriesByClass; final SkeletonMap mSkeletons; @@ -122,11 +124,13 @@ abstract class CoreSession extends Item implements Session { // Used when reconnecting. volatile WaitMap mTypeWaitMap; + private volatile CounterMap mAcknowledgedMap; + CoreSession(Engine engine, Settings settings) { super(IdGenerator.next()); mEngine = engine; mSettings = settings; - mStubs = new ItemMap(); + mStubs = new StubMap(); mStubFactories = new ItemMap(); mStubFactoriesByClass = new ConcurrentHashMap<>(); mSkeletons = new SkeletonMap(this); @@ -610,18 +614,18 @@ boolean close(int reason, CorePipe controlPipe) { mStubs.forEachToRemove(stub -> { if (stub.isRestorable() || stub == root()) { // Keep the restorable stubs and tag them with the new StubSupport. - Stub.cSupportHandle.setRelease(stub, newSupport); + StubInvoker.cSupportHandle.setRelease(stub, newSupport); return false; } - Stub.cSupportHandle.setRelease(stub, support); + StubInvoker.cSupportHandle.setRelease(stub, support); return true; }); R root = root(); - if (root instanceof Stub) { + if (root instanceof StubInvoker) { // In case the root origin has changed, replace it with the standard root // origin. It needs to restored specially upon reconnect anyhow. - Stub.setRootOrigin((Stub) root); + StubInvoker.setRootOrigin((StubInvoker) root); } } @@ -793,6 +797,12 @@ void startTasks() throws IOException { // Send the reply command. See the serverDispose method. mEngine.executeTask(() -> trySendCommandAndId(C_SKELETON_DISPOSE, id)); break; + case C_ACKNOWLEDGED_S: + stubDecRefCount(pipe.readLong(), pipe.readShort() & 0xffffL); + break; + case C_ACKNOWLEDGED_L: + stubDecRefCount(pipe.readLong(), pipe.readLong()); + break; default: throw new IllegalStateException("Unknown command: " + command); } @@ -838,6 +848,27 @@ private void sendByte(int which) throws IOException { } } + /** + * Note: Doesn't flush the pipe. + */ + private void sendAcknowledgement(long id, long amount) throws IOException { + mControlLock.lock(); + try { + CorePipe pipe = controlPipe(); + if (amount < 65536) { + pipe.write(C_ACKNOWLEDGED_S); + pipe.writeLong(id); + pipe.writeShort((short) amount); + } else { + pipe.write(C_ACKNOWLEDGED_L); + pipe.writeLong(id); + pipe.writeLong(amount); + } + } finally { + mControlLock.unlock(); + } + } + void sendInfoRequest(Class type) throws IOException { mControlLock.lock(); try { @@ -1002,6 +1033,13 @@ boolean doRun(CoreSession session) { cPipeClockHandle.setVolatile(pipe, 1); try { + CounterMap map = session.mAcknowledgedMap; + if (map != null) { + for (CounterMap.Entry e = map.drain(); e != null; e = e.next) { + session.sendAcknowledgement(e.id, e.counter); + } + } + session.sendByte(C_PING); } catch (IOException e) { session.close(R_CONTROL_FAILURE, pipe); @@ -1079,7 +1117,7 @@ final Object objectFor(long id) throws IOException { final Object objectFor(long id, long typeId) throws IOException { try { StubFactory factory = mStubFactories.get(typeId); - return mStubs.putIfAbsent(factory.newStub(id, stubSupport())); + return mStubs.putAndSelectStub(factory.newStub(id, stubSupport())); } catch (NoSuchObjectException e) { e.remoteAddress(remoteAddress()); throw e; @@ -1108,16 +1146,22 @@ final Object objectFor(long id, long typeId, RemoteInfo info) { mStubFactoriesByClass.putIfAbsent(type, factory); } - return mStubs.putIfAbsent(factory.newStub(id, stubSupport())); + return mStubs.putAndSelectStub(factory.newStub(id, stubSupport())); } private void trySendCommandAndId(int command, long id) { + trySendCommandAndId(command, id, true); + } + + private void trySendCommandAndId(int command, long id, boolean flush) { mControlLock.lock(); try { CorePipe pipe = controlPipe(); pipe.write(command); pipe.writeLong(id); - pipe.flush(); + if (flush) { + pipe.flush(); + } } catch (IOException e) { // Ignore. } finally { @@ -1125,7 +1169,7 @@ private void trySendCommandAndId(int command, long id) { } } - final Stub newDisconnectedStub(Class type, Throwable cause) { + final StubInvoker newDisconnectedStub(Class type, Throwable cause) { StubFactory factory = mStubFactoriesByClass.get(type); if (factory == null) { @@ -1135,7 +1179,8 @@ final Stub newDisconnectedStub(Class type, Throwable cause) { } long id = IdGenerator.nextNegative(); - Stub stub = factory.newStub(id, DisposedStubSupport.newLenientRestorable(this, cause)); + StubInvoker stub = factory.newStub + (id, DisposedStubSupport.newLenientRestorable(this, cause)); mStubs.put(stub); return stub; } @@ -1148,13 +1193,16 @@ final void stubSupport(CoreStubSupport support) { cStubSupportHandle.setRelease(this, support); } - final StubSupport stubDispose(Stub stub) { + final StubSupport stubDispose(StubInvoker stub) { mStubs.remove(stub); return DisposedStubSupport.EXPLICIT; } + /** + * @param message optional + */ final boolean stubDispose(long id, String message) { - Stub removed = mStubs.remove(id); + StubInvoker removed = mStubs.remove(id); if (removed == null) { return false; @@ -1167,23 +1215,37 @@ final boolean stubDispose(long id, String message) { disposed = new DisposedStubSupport(message); } - Stub.cSupportHandle.setRelease(removed, disposed); + StubInvoker.cSupportHandle.setRelease(removed, disposed); return true; } - final boolean stubDisposeAndNotify(Stub stub, String message) { + /** + * @param message optional + */ + final boolean stubDisposeAndNotify(Stub stub, String message, boolean flush) { long id = stub.id; if (stubDispose(id, message)) { // Notify the remote side to dispose the associated skeleton. If the command cannot // be sent, the skeleton will be disposed anyhow due to disconnect. - trySendCommandAndId(C_SKELETON_DISPOSE, id); + trySendCommandAndId(C_SKELETON_DISPOSE, id, flush); return true; } else { return false; } } + private void stubDecRefCount(long id, long amount) { + try { + StubInvoker invoker = mStubs.get(id).invoker(); + if (invoker instanceof StubInvoker.WithCountedRef counted) { + // Launch a task to prevent blocking the control thread. + mEngine.tryExecuteTask(() -> counted.decRefCount(amount)); + } + } catch (NoSuchObjectException e) { + } + } + final boolean serverDispose(Object server) { Skeleton skeleton; if (server == null || (skeleton = mSkeletons.skeletonFor(server, false)) == null) { @@ -1348,6 +1410,20 @@ private void detached(Skeleton skeleton, boolean newTask) { } } + void acknowledged(Skeleton skeleton) { + CounterMap map = mAcknowledgedMap; + + if (map == null) { + map = new CounterMap(); + var existing = (CounterMap) cAcknowledgedMapHandle.compareAndExchange(this, null, map); + if (existing != null) { + map = existing; + } + } + + map.increment(skeleton.id); + } + final void attachNotify(SessionAware sa) { // Special handling is required for the ServerSession root, which is why the state is // checked first. The skeleton is needed early, and it's assigned by the ServerSession diff --git a/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java b/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java index c908aae1..b5775165 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java +++ b/src/main/java/org/cojen/dirmi/core/CoreSkeletonSupport.java @@ -55,6 +55,11 @@ public void writeBrokenSkeletonAlias(Pipe pipe, mSession.writeBrokenSkeletonAlias((CorePipe) pipe, type, aliasId, exception); } + @Override + public void acknowledged(Skeleton skeleton) { + mSession.acknowledged(skeleton); + } + @Override public void dispose(Skeleton skeleton) { mSession.removeSkeleton(skeleton); diff --git a/src/main/java/org/cojen/dirmi/core/CoreStubSupport.java b/src/main/java/org/cojen/dirmi/core/CoreStubSupport.java index 281f6b3f..b64543e8 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreStubSupport.java +++ b/src/main/java/org/cojen/dirmi/core/CoreStubSupport.java @@ -40,7 +40,9 @@ public CoreSession session() { } @Override - public Pipe connect(Stub stub, Class remoteFailureException) throws T { + public Pipe connect(StubInvoker stub, Class remoteFailureException) + throws T + { Pipe pipe = mLocalPipe.get(); if (pipe != null) { return pipe; @@ -53,7 +55,8 @@ public Pipe connect(Stub stub, Class remoteFailureExcep } @Override - public Pipe connectUnbatched(Stub stub, Class remoteFailureException) + public Pipe connectUnbatched(StubInvoker stub, + Class remoteFailureException) throws T { try { @@ -64,7 +67,7 @@ public Pipe connectUnbatched(Stub stub, Class remoteFai } @Override - public Pipe tryConnect(Stub stub, Class remoteFailureException) + public Pipe tryConnect(StubInvoker stub, Class remoteFailureException) throws T { Pipe pipe = mLocalPipe.get(); @@ -81,7 +84,7 @@ public Pipe tryConnect(Stub stub, Class remoteFailureEx } @Override - public Pipe tryConnectUnbatched(Stub stub, + public Pipe tryConnectUnbatched(StubInvoker stub, Class remoteFailureException) throws T { @@ -95,8 +98,8 @@ public Pipe tryConnectUnbatched(Stub stub, } @Override - public boolean validate(Stub stub, Pipe pipe) { - if (Stub.cSupportHandle.getAcquire(stub) == this) { + public boolean validate(StubInvoker stub, Pipe pipe) { + if (StubInvoker.cSupportHandle.getAcquire(stub) == this) { return true; } else { if (pipe != null) { @@ -133,7 +136,7 @@ public Object newAliasStub(Class remoteFailureException } @Override - public Stub newDisconnectedStub(Class type, Throwable cause) { + public StubInvoker newDisconnectedStub(Class type, Throwable cause) { return mSession.newDisconnectedStub(type, cause); } @@ -185,7 +188,7 @@ public T failed(Class remoteFailureException, } @Override - public void dispose(Stub stub) { - Stub.cSupportHandle.setRelease(stub, mSession.stubDispose(stub)); + public void dispose(StubInvoker stub) { + StubInvoker.cSupportHandle.setRelease(stub, mSession.stubDispose(stub)); } } diff --git a/src/main/java/org/cojen/dirmi/core/CoreUtils.java b/src/main/java/org/cojen/dirmi/core/CoreUtils.java index 077108b2..8b80e388 100644 --- a/src/main/java/org/cojen/dirmi/core/CoreUtils.java +++ b/src/main/java/org/cojen/dirmi/core/CoreUtils.java @@ -73,7 +73,7 @@ public static Session accessSession(Object obj) { if (!(obj instanceof Stub stub)) { throw new IllegalArgumentException(); } - return ((StubSupport) Stub.cSupportHandle.getAcquire(stub)).session(); + return stub.support().session(); } public static Session currentSession() { @@ -88,9 +88,8 @@ public static boolean dispose(Object obj) { if (!(obj instanceof Stub stub)) { throw new IllegalArgumentException(); } - var support = (StubSupport) Stub.cSupportHandle.getAcquire(stub); - if (support instanceof CoreStubSupport css) { - return css.session().stubDisposeAndNotify(stub, null); + if (stub.support() instanceof CoreStubSupport css) { + return css.session().stubDisposeAndNotify(stub, null, true); } return false; } diff --git a/src/main/java/org/cojen/dirmi/core/CounterMap.java b/src/main/java/org/cojen/dirmi/core/CounterMap.java new file mode 100644 index 00000000..08a59554 --- /dev/null +++ b/src/main/java/org/cojen/dirmi/core/CounterMap.java @@ -0,0 +1,122 @@ +/* + * Copyright 2024 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi.core; + +/** + * Thread-safe map of long identifiers to long counters. + * + * @author Brian S. O'Neill + */ +final class CounterMap { + private Entry[] mEntries; + private int mSize; + + CounterMap() { + mEntries = new Entry[16]; // must be power of 2 + } + + /** + * Remove all the entries into a linked list, or else return null if the map is empty. + */ + synchronized Entry drain() { + if (mSize == 0) { + return null; + } + + Entry[] entries = mEntries; + Entry head = null, tail = null; + + for (int i=0; i> 1)) >= entries.length && size < (1 << 30)) { + // Rehash. + var newEntries = new Entry[entries.length << 1]; + for (int i=0; i Pipe connect(Stub stub, Class remoteFailureException) throws T { + public Pipe connect(StubInvoker stub, Class remoteFailureException) + throws T + { DisposedException ex; if (mCause == null) { ex = new DisposedException(mMessage); @@ -108,14 +109,15 @@ public Pipe connect(Stub stub, Class remoteFailureExcep } @Override - public Pipe connectUnbatched(Stub stub, Class remoteFailureException) + public Pipe connectUnbatched(StubInvoker stub, + Class remoteFailureException) throws T { return connect(stub, remoteFailureException); } @Override - public Pipe tryConnect(Stub stub, Class remoteFailureException) + public Pipe tryConnect(StubInvoker stub, Class remoteFailureException) throws T { // When null is returned, the caller is expected to then call newDisconnectedStub. @@ -123,7 +125,7 @@ public Pipe tryConnect(Stub stub, Class remoteFailureEx } @Override - public Pipe tryConnectUnbatched(Stub stub, + public Pipe tryConnectUnbatched(StubInvoker stub, Class remoteFailureException) throws T { @@ -131,7 +133,7 @@ public Pipe tryConnectUnbatched(Stub stub, } @Override - public boolean validate(Stub stub, Pipe pipe) { + public boolean validate(StubInvoker stub, Pipe pipe) { return true; } @@ -148,7 +150,7 @@ public Object newAliasStub(Class remoteFailureException } @Override - public Stub newDisconnectedStub(Class type, Throwable cause) { + public StubInvoker newDisconnectedStub(Class type, Throwable cause) { return session().newDisconnectedStub(type, cause); } @@ -184,7 +186,7 @@ public T failed(Class remoteFailureException, } @Override - public void dispose(Stub stub) { - Stub.cSupportHandle.setRelease(stub, this); + public void dispose(StubInvoker stub) { + StubInvoker.cSupportHandle.setRelease(stub, this); } } diff --git a/src/main/java/org/cojen/dirmi/core/MethodIdWriterMaker.java b/src/main/java/org/cojen/dirmi/core/MethodIdWriterMaker.java index aac6d3f5..dc53bfc1 100644 --- a/src/main/java/org/cojen/dirmi/core/MethodIdWriterMaker.java +++ b/src/main/java/org/cojen/dirmi/core/MethodIdWriterMaker.java @@ -35,7 +35,7 @@ final class MethodIdWriterMaker { private static final SoftCache cCache = new SoftCache<>(); /** - * @param original server-side RemoteInfo that a Stub is coded to use + * @param original server-side RemoteInfo that a StubInvoker is coded to use * @param current server-side RemoteInfo provided by the remote side * @param force when false, return null if no mapping is needed */ diff --git a/src/main/java/org/cojen/dirmi/core/RemoteInfo.java b/src/main/java/org/cojen/dirmi/core/RemoteInfo.java index 6da243c5..ec655532 100644 --- a/src/main/java/org/cojen/dirmi/core/RemoteInfo.java +++ b/src/main/java/org/cojen/dirmi/core/RemoteInfo.java @@ -33,6 +33,7 @@ import java.util.TreeMap; import java.util.TreeSet; +import org.cojen.dirmi.AutoDispose; import org.cojen.dirmi.Pipe; import org.cojen.dirmi.Remote; import org.cojen.dirmi.RemoteException; @@ -44,7 +45,7 @@ * @author Brian S O'Neill */ final class RemoteInfo { - private static final int F_UNDECLARED_EX = 1; + private static final int F_UNDECLARED_EX = 1, F_AUTO_DISPOSE = 2; private static final SoftCache, RemoteInfo> cCache = new SoftCache<>(); private static final CanonicalSet cCanonical = new CanonicalSet<>(); @@ -61,9 +62,11 @@ public static RemoteInfo examine(Class type) { * @param stub non-null stub to examine * @throws IllegalArgumentException if stub type is malformed */ - public static RemoteInfo examineStub(Object stub) { - RemoteExaminer.remoteType(stub); // verify that object is a stub - return examine(stub.getClass(), false); + public static RemoteInfo examineStub(Stub stub) { + // Only the invoker is required to have all the annotations, so examine that. + StubInvoker invoker = stub.invoker(); + RemoteExaminer.remoteType(invoker); // perform basic validation + return examine(invoker.getClass(), false); } private static RemoteInfo examine(Class type, boolean strict) { @@ -117,6 +120,10 @@ private static RemoteInfo doExamine(Class type, boolean strict) { } } + if (type.isAnnotationPresent(AutoDispose.class)) { + flags |= F_AUTO_DISPOSE; + } + Map methodMap = new TreeMap<>(); SortedSet methodSet = null; @@ -222,6 +229,13 @@ boolean isRemoteFailureExceptionUndeclared() { return (mFlags & F_UNDECLARED_EX) != 0; } + /** + * @see AutoDispose + */ + boolean isAutoDispose() { + return (mFlags & F_AUTO_DISPOSE) != 0; + } + /** * Returns the name of the remote interface described by this RemoteInfo, which is the same * as the interface name. diff --git a/src/main/java/org/cojen/dirmi/core/RemoteMethod.java b/src/main/java/org/cojen/dirmi/core/RemoteMethod.java index 9d77c1b4..647448e8 100644 --- a/src/main/java/org/cojen/dirmi/core/RemoteMethod.java +++ b/src/main/java/org/cojen/dirmi/core/RemoteMethod.java @@ -388,6 +388,13 @@ boolean isUnimplemented() { return (mFlags & F_UNIMPLEMENTED) != 0; } + /** + * Returns true if this method doesn't block waiting for a reply from the pipe. + */ + boolean isUnacknowledged() { + return (mFlags & (F_BATCHED | F_PIPED | F_NOREPLY)) != 0; + } + /** * Returns the name of this method. */ diff --git a/src/main/java/org/cojen/dirmi/core/RestorableStubSupport.java b/src/main/java/org/cojen/dirmi/core/RestorableStubSupport.java index 7bfbdf70..f4938f18 100644 --- a/src/main/java/org/cojen/dirmi/core/RestorableStubSupport.java +++ b/src/main/java/org/cojen/dirmi/core/RestorableStubSupport.java @@ -33,7 +33,7 @@ * * @author Brian S O'Neill */ -final class RestorableStubSupport extends ConcurrentHashMap +final class RestorableStubSupport extends ConcurrentHashMap implements StubSupport { private final CoreStubSupport mNewSupport; @@ -53,28 +53,31 @@ public void appendInfo(StringBuilder b) { } @Override - public Pipe connect(Stub stub, Class remoteFailureException) throws T { + public Pipe connect(StubInvoker stub, Class remoteFailureException) + throws T + { restore(stub, remoteFailureException); // Returning null is fine because the stub must immediately call validate. return null; } @Override - public Pipe connectUnbatched(Stub stub, Class remoteFailureException) + public Pipe connectUnbatched(StubInvoker stub, + Class remoteFailureException) throws T { return connect(stub, remoteFailureException); } @Override - public Pipe tryConnect(Stub stub, Class remoteFailureException) + public Pipe tryConnect(StubInvoker stub, Class remoteFailureException) throws T { return connect(stub, remoteFailureException); } @Override - public Pipe tryConnectUnbatched(Stub stub, + public Pipe tryConnectUnbatched(StubInvoker stub, Class remoteFailureException) throws T { @@ -82,7 +85,7 @@ public Pipe tryConnectUnbatched(Stub stub, } @Override - public boolean validate(Stub stub, Pipe pipe) { + public boolean validate(StubInvoker stub, Pipe pipe) { // Always return false, forcing the stub to obtain the restored support instance. return false; } @@ -101,7 +104,7 @@ public Object newAliasStub(Class remoteFailureException } @Override - public Stub newDisconnectedStub(Class type, Throwable cause) { + public StubInvoker newDisconnectedStub(Class type, Throwable cause) { throw new IllegalStateException(); } @@ -138,12 +141,12 @@ public T failed(Class remoteFailureException, } @Override - public void dispose(Stub stub) { + public void dispose(StubInvoker stub) { throw new IllegalStateException(); } @SuppressWarnings("unchecked") - private void restore(Stub stub, Class remoteFailureException) + private void restore(StubInvoker stub, Class remoteFailureException) throws T { // Use a latch in order for only one thread to attempt the stub restore. Other threads @@ -161,7 +164,7 @@ private void restore(Stub stub, Class remoteFailureExce } catch (InterruptedException e) { throw CoreUtils.remoteException(this, remoteFailureException, e); } - newSupport = (StubSupport) Stub.cSupportHandle.getAcquire(stub); + newSupport = (StubSupport) StubInvoker.cSupportHandle.getAcquire(stub); if (newSupport == this) { // The restore by another thread was aborted, so try again. continue; @@ -169,14 +172,14 @@ private void restore(Stub stub, Class remoteFailureExce return; } - var origin = (MethodHandle) Stub.cOriginHandle.getAcquire(stub); + var origin = (MethodHandle) StubInvoker.cOriginHandle.getAcquire(stub); try { - var newStub = (Stub) origin.invoke(); + var newStub = (StubInvoker) origin.invoke(); mNewSupport.session().mStubs.stealIdentity(stub, newStub); - newSupport = (StubSupport) Stub.cSupportHandle.getAcquire(newStub); + newSupport = (StubSupport) StubInvoker.cSupportHandle.getAcquire(newStub); // Use CAS to detect if the stub has called dispose. - var result = (StubSupport) Stub.cSupportHandle + var result = (StubSupport) StubInvoker.cSupportHandle .compareAndExchange(stub, this, newSupport); if (result != newSupport && result instanceof DisposedStubSupport) { // Locally dispose the restored stub. diff --git a/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java b/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java index 57573fb3..891bced5 100644 --- a/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java +++ b/src/main/java/org/cojen/dirmi/core/SkeletonMaker.java @@ -195,6 +195,10 @@ private MethodHandles.Lookup finishSkeleton() { CaseInfo ci = caseMap.get(cases[i]); + if (ci.serverMethod.isUnacknowledged() && mServerInfo.isAutoDispose()) { + supportVar.invoke("acknowledged", mm.this_()); + } + if (!ci.serverMethod.isDisposer()) { mm.return_(ci.invoke(mm, pipeVar, contextVar, supportVar)); } else { diff --git a/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java b/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java index d2c7924b..b2739e55 100644 --- a/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java +++ b/src/main/java/org/cojen/dirmi/core/SkeletonSupport.java @@ -46,6 +46,12 @@ public interface SkeletonSupport { void writeBrokenSkeletonAlias(Pipe pipe, Class type, long aliasId, Throwable exception) throws IOException; + /** + * Immediately called by a method for which RemoteMethod.isUnacknowledged is true and the + * RemoteInfo it belongs to isAutoDispose. + */ + void acknowledged(Skeleton skeleton); + /** * Called by a disposer method when finished executing. This method itself should not throw * any exceptions. diff --git a/src/main/java/org/cojen/dirmi/core/Stub.java b/src/main/java/org/cojen/dirmi/core/Stub.java index 52295598..ce476cd4 100644 --- a/src/main/java/org/cojen/dirmi/core/Stub.java +++ b/src/main/java/org/cojen/dirmi/core/Stub.java @@ -16,10 +16,6 @@ package org.cojen.dirmi.core; -import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; - import org.cojen.dirmi.Remote; /** @@ -28,61 +24,14 @@ * * @author Brian S O'Neill */ -public class Stub extends Item implements Remote { - static final VarHandle cSupportHandle, cWriterHandle, cOriginHandle; - - private static final MethodHandle cRootOrigin; - - static { - try { - var lookup = MethodHandles.lookup(); - cSupportHandle = lookup.findVarHandle(Stub.class, "support", StubSupport.class); - cWriterHandle = lookup.findVarHandle(Stub.class, "miw", MethodIdWriter.class); - cOriginHandle = lookup.findVarHandle(Stub.class, "origin", MethodHandle.class); - } catch (Throwable e) { - throw CoreUtils.rethrow(e); - } - - cRootOrigin = MethodHandles.constant(Stub.class, null); - } - - /** - * Set the root origin such that isRestorable always returns false. The root must be - * restored specially. - */ - static void setRootOrigin(Stub root) { - cOriginHandle.setRelease(root, cRootOrigin); - } - - protected StubSupport support; - protected MethodIdWriter miw; - - // Is set when this stub has become restorable. - protected MethodHandle origin; - - public Stub(long id, StubSupport support, MethodIdWriter miw) { +public abstract sealed class Stub extends Item implements Remote permits StubInvoker, StubWrapper { + protected Stub(long id) { super(id); - this.support = support; - this.miw = miw; - VarHandle.storeStoreFence(); } - /** - * Returns true if this stub is restorable following a disconnect. - * - * Note: This method must not be public or else it can conflict with a user-specified - * remote method which has the same signature. - * - * @see #setRootOrigin - */ - final boolean isRestorable() { - var origin = (MethodHandle) cOriginHandle.getAcquire(this); - if (origin == null) { - return ((StubSupport) cSupportHandle.getAcquire(this)).isLenientRestorable(); - } else { - return origin != cRootOrigin; - } - } + abstract StubSupport support(); + + abstract StubInvoker invoker(); @Override public String toString() { @@ -99,7 +48,7 @@ public String toString() { b.append(name).append('@').append(Integer.toHexString(System.identityHashCode(this))) .append("{id=").append(id); - support.appendInfo(b); + support().appendInfo(b); return b.append('}').toString(); } diff --git a/src/main/java/org/cojen/dirmi/core/StubFactory.java b/src/main/java/org/cojen/dirmi/core/StubFactory.java index 691edb21..3d907fa8 100644 --- a/src/main/java/org/cojen/dirmi/core/StubFactory.java +++ b/src/main/java/org/cojen/dirmi/core/StubFactory.java @@ -21,9 +21,9 @@ import org.cojen.dirmi.Pipe; /** - * Produces new Stub instances for client-side Remote objects. A Stub instance marshals - * requests to a remote {@link Skeleton} which in turn calls the real method. Any response is - * marshaled back for the Stub to decode. + * Produces new StubInvoker instances for client-side Remote objects. A StubInvoker instance + * marshals requests to a remote {@link Skeleton} which in turn calls the real method. Any + * response is marshaled back for the StubInvoker to decode. * * @author Brian S O'Neill */ @@ -39,7 +39,7 @@ protected StubFactory(long typeId) { * @param id remote object identifier * @param support for invoking remote methods */ - protected abstract Stub newStub(long id, StubSupport support); + protected abstract StubInvoker newStub(long id, StubSupport support); /** * Writes byte methodIds. diff --git a/src/main/java/org/cojen/dirmi/core/StubInvoker.java b/src/main/java/org/cojen/dirmi/core/StubInvoker.java new file mode 100644 index 00000000..4f449f14 --- /dev/null +++ b/src/main/java/org/cojen/dirmi/core/StubInvoker.java @@ -0,0 +1,217 @@ +/* + * Copyright 2011-2022 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi.core; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; + +import org.cojen.dirmi.Remote; + +/** + * Base class for remote stubs which actually invoke the remote methods over the pipe. This + * class must not declare any new public instance methods because they can conflict with + * user-specified remote methods which have the same signature. + * + * @author Brian S. O'Neill + */ +public non-sealed class StubInvoker extends Stub { + static final VarHandle cSupportHandle, cWriterHandle, cOriginHandle; + + private static final MethodHandle cRootOrigin; + + static { + try { + var lookup = MethodHandles.lookup(); + cSupportHandle = lookup.findVarHandle(StubInvoker.class, "support", StubSupport.class); + cWriterHandle = lookup.findVarHandle(StubInvoker.class, "miw", MethodIdWriter.class); + cOriginHandle = lookup.findVarHandle(StubInvoker.class, "origin", MethodHandle.class); + } catch (Throwable e) { + throw CoreUtils.rethrow(e); + } + + cRootOrigin = MethodHandles.constant(StubInvoker.class, null); + } + + /** + * Set the root origin such that isRestorable always returns false. The root must be + * restored specially. + */ + static void setRootOrigin(StubInvoker root) { + cOriginHandle.setRelease(root, cRootOrigin); + } + + protected StubSupport support; + protected MethodIdWriter miw; + + // Is set when this stub has become restorable. + protected MethodHandle origin; + + public StubInvoker(long id, StubSupport support, MethodIdWriter miw) { + super(id); + this.support = support; + this.miw = miw; + VarHandle.storeStoreFence(); + } + + /** + * Is called by StubMap.putAndSelectStub to finish initialization of this stub instance and + * return a selected stub. + * + * Note: This method must not be public or else it can conflict with a user-specified + * remote method which has the same signature. + * + * @return this or a StubWrapper + */ + Stub init() { + return this; + } + + /** + * Is called by StubMap.putAndSelectStub to select this stub instance or wrapper. If null + * is returned, then the wrapper was reclaimed and so this stub instance should be rejected + * entirely. + * + * Note: This method must not be public or else it can conflict with a user-specified + * remote method which has the same signature. + * + * @return this, or a StubWrapper, or null + */ + Stub select() { + return this; + } + + /** + * Note: This method must not be public or else it can conflict with a user-specified + * remote method which has the same signature. + */ + @Override + final StubSupport support() { + return (StubSupport) cSupportHandle.getAcquire(this); + } + + @Override + final StubInvoker invoker() { + return this; + } + + /** + * Returns true if this stub is restorable following a disconnect. + * + * Note: This method must not be public or else it can conflict with a user-specified + * remote method which has the same signature. + * + * @see #setRootOrigin + */ + final boolean isRestorable() { + var origin = (MethodHandle) cOriginHandle.getAcquire(this); + if (origin == null) { + return ((StubSupport) cSupportHandle.getAcquire(this)).isLenientRestorable(); + } else { + return origin != cRootOrigin; + } + } + + /** + * Base class for invokers which support automatic disposal. This class must not declare + * any new public instance methods because they can conflict with user-specified remote + * methods which have the same signature. + */ + private static abstract class WithRef extends StubInvoker { + private StubWrapper.Factory wrapperFactory; + + public WithRef(long id, StubSupport support, MethodIdWriter miw, + StubWrapper.Factory wrapperFactory) + { + super(id, support, miw); + this.wrapperFactory = wrapperFactory; + } + + final StubWrapper initWrapper() { + StubWrapper wrapper = wrapperFactory.newWrapper(this); + wrapperFactory = null; // not needed anymore + return wrapper; + } + } + + /** + * Base class for invokers which support automatic disposal and don't have any methods for + * which "isUnacknowledged" is true. This class must not declare any new public instance + * methods because they can conflict with user-specified remote methods which have the same + * signature. + */ + public static abstract class WithBasicRef extends WithRef { + private AutoDisposer.BasicRef ref; + + public WithBasicRef(long id, StubSupport support, MethodIdWriter miw, + StubWrapper.Factory wrapperFactory) + { + super(id, support, miw, wrapperFactory); + } + + @Override + final Stub init() { + // Only bother creating the ref object until it's determined that this object is + // needed. See StubMap.putAndSelectStub. + var wrapper = initWrapper(); + ref = new AutoDisposer.BasicRef(wrapper, this); + return wrapper; + } + + @Override + final Stub select() { + return ref.get(); + } + } + + /** + * Base class for invokers which support automatic disposal and have at least one method + * for which "isUnacknowledged" is true. This class must not declare any new public + * instance methods because they can conflict with user-specified remote methods which have + * the same signature. + */ + public static abstract class WithCountedRef extends WithRef { + protected AutoDisposer.CountedRef ref; + + public WithCountedRef(long id, StubSupport support, MethodIdWriter miw, + StubWrapper.Factory wrapperFactory) + { + super(id, support, miw, wrapperFactory); + } + + @Override + final Stub init() { + // Only bother creating the ref object until it's determined that this object is + // needed. See StubMap.putAndSelectStub. + var wrapper = initWrapper(); + ref = new AutoDisposer.CountedRef(wrapper, this); + return wrapper; + } + + @Override + final Stub select() { + return ref.get(); + } + + /** + * Note: Calling this method might block if a notification needs to be written. + */ + final void decRefCount(long amount) { + ref.decRefCount(amount); + } + } +} diff --git a/src/main/java/org/cojen/dirmi/core/StubMaker.java b/src/main/java/org/cojen/dirmi/core/StubMaker.java index b178e912..2a76be60 100644 --- a/src/main/java/org/cojen/dirmi/core/StubMaker.java +++ b/src/main/java/org/cojen/dirmi/core/StubMaker.java @@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; +import java.lang.ref.Reference; + import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; @@ -85,6 +87,7 @@ static StubFactory factoryFor(Class type, long typeId, RemoteInfo info) { private final RemoteInfo mServerInfo; private final ClassMaker mFactoryMaker; private final ClassMaker mStubMaker; + private final ClassMaker mWrapperMaker; private StubMaker(Class type, RemoteInfo info) { mType = type; @@ -111,7 +114,28 @@ private StubMaker(Class type, RemoteInfo info) { CoreUtils.allowAccess(mFactoryMaker); mStubMaker = mFactoryMaker.another(type.getName()) - .public_().extend(Stub.class).implement(type).final_().sourceFile(sourceFile); + .public_().implement(type).final_().sourceFile(sourceFile); + + if (!info.isAutoDispose()) { + mStubMaker.extend(StubInvoker.class); + mWrapperMaker = null; + } else { + mWrapperMaker = mFactoryMaker.another(type.getName()) + .public_().extend(StubWrapper.class).implement(type) + .final_().sourceFile(sourceFile); + + boolean needsCountedRef = false; + + for (RemoteMethod m : mServerInfo.remoteMethods()) { + if (m.isUnacknowledged()) { + needsCountedRef = true; + break; + } + } + + mStubMaker.extend(needsCountedRef ? StubInvoker.WithCountedRef.class + : StubInvoker.WithBasicRef.class); + } } /** @@ -125,7 +149,7 @@ private MethodHandle finishFactory() { MethodMaker mm = mFactoryMaker.addConstructor(long.class); mm.invokeSuperConstructor(mm.param(0)); - mm = mFactoryMaker.addMethod(Stub.class, "newStub", long.class, StubSupport.class); + mm = mFactoryMaker.addMethod(StubInvoker.class, "newStub", long.class, StubSupport.class); mm.public_().return_(mm.new_(mStubMaker, mm.param(0), mm.param(1), mm.this_())); MethodHandles.Lookup lookup = mFactoryMaker.finishLookup(); @@ -139,7 +163,7 @@ private MethodHandle finishFactory() { } /** - * Returns a Stub subclass which can be constructed with: + * Returns a StubInvoker subclass which can be constructed with: * * (long id, StubSupport support, MethodIdWriter miw) * @@ -147,10 +171,23 @@ private MethodHandle finishFactory() { * remote object class isn't found */ private Class finishStub() { - { - MethodMaker mm = mStubMaker.addConstructor - (long.class, StubSupport.class, MethodIdWriter.class); - mm.invokeSuperConstructor(mm.param(0), mm.param(1), mm.param(2)); + MethodMaker ctor = mStubMaker.addConstructor + (long.class, StubSupport.class, MethodIdWriter.class); + + StubWrapper.Factory wrapperFactory; + + if (mWrapperMaker == null) { + wrapperFactory = null; + ctor.invokeSuperConstructor(ctor.param(0), ctor.param(1), ctor.param(2)); + } else { + // Cannot call init until the wrapper class is defined, but it cannot be defined + // until the invoker is defined. There's a cyclic dependency. + wrapperFactory = new StubWrapper.Factory(); + var factoryVar = ctor.var(StubWrapper.Factory.class).setExact(wrapperFactory); + ctor.invokeSuperConstructor(ctor.param(0), ctor.param(1), ctor.param(2), factoryVar); + + MethodMaker mm = mWrapperMaker.addConstructor(mStubMaker); + mm.invokeSuperConstructor(mm.param(0)); } var it = new JoinedIterator<>(mClientInfo.remoteMethods(), mServerInfo.remoteMethods()); @@ -170,9 +207,9 @@ private Class finishStub() { lastServerMethod = serverMethod; } - Object returnType; - String methodName; - Object[] ptypes; + final Object returnType; + final String methodName; + final Object[] ptypes; if (serverMethod != null && serverMethod.isBatchedImmediate()) { // No stub method is actually generated for this variant. The skeleton variant @@ -214,6 +251,34 @@ private Class finishStub() { MethodMaker mm = mStubMaker.addMethod(returnType, methodName, ptypes).public_(); + MethodMaker mm2; + if (mWrapperMaker == null) { + mm2 = null; + } else { + // Delegate to the actual invoker. + mm2 = mWrapperMaker.addMethod(returnType, methodName, ptypes).public_(); + + Label start = mm2.label().here(); + + var params = new Object[ptypes.length]; + for (int i=0; i { + mm2.var(Reference.class).invoke("reachabilityFence", mm2.this_()); + }); + } + Class remoteFailureClass; List> thrownClasses = null; @@ -236,12 +301,18 @@ private Class finishStub() { } thrownClasses.add(exClass); mm.throws_(exClass); + if (mm2 != null) { + mm2.throws_(exClass); + } } } } } mm.throws_(remoteFailureClass); + if (mm2 != null) { + mm2.throws_(remoteFailureClass); + } RemoteMethod method = serverMethod; int methodId = serverMethodId; @@ -345,6 +416,14 @@ private Class finishStub() { Label invokeStart = mm.label().here(); Label invokeEnd = mm.label(); + if (mWrapperMaker != null && method.isUnacknowledged()) { + // The wrapper cannot be GC'd until after the server has replied back with + // decRefCount, indicating acknowledgment. If there's an IOException writing + // over the pipe, then auto disposal won't work, but it doesn't need to. The + // session will close, effectively disposing the stub. + mm.field("ref").invoke("incRefCount"); + } + pipeVar.invoke("writeLong", mm.field("id")); Variable thrownVar = null; @@ -465,7 +544,20 @@ private Class finishStub() { batchedImmediateMethodId = -1; } - return mStubMaker.finish(); + Class stubClass = mStubMaker.finish(); + + if (mWrapperMaker != null) { + MethodHandles.Lookup lookup = mWrapperMaker.finishHidden(); + try { + wrapperFactory.init + (lookup.findConstructor + (lookup.lookupClass(), MethodType.methodType(void.class, stubClass))); + } catch (Exception e) { + throw new AssertionError(e); + } + } + + return stubClass; } /** @@ -563,14 +655,14 @@ private void returnResult(MethodMaker mm, RemoteMethod clientMethod, } Object[] originStub = {mm.this_()}; - Field originField = mm.access(Stub.cOriginHandle, originStub); + Field originField = mm.access(StubInvoker.cOriginHandle, originStub); Label parentHasOrigin = mm.label(); originField.getAcquire().ifNe(null, parentHasOrigin); mm.new_(IllegalStateException.class, "Cannot make a restorable object from a non-restorable parent").throw_(); parentHasOrigin.here(); - originStub[0] = resultVar.cast(Stub.class); + originStub[0] = resultVar.cast(StubInvoker.class); originField.getAcquire().ifNe(null, finished); Class returnType = classFor(clientMethod.returnType()); diff --git a/src/main/java/org/cojen/dirmi/core/StubMap.java b/src/main/java/org/cojen/dirmi/core/StubMap.java new file mode 100644 index 00000000..21cd22b0 --- /dev/null +++ b/src/main/java/org/cojen/dirmi/core/StubMap.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi.core; + +/** + * + * + * @author Brian S. O'Neill + */ +final class StubMap extends ItemMap { + /** + * Returns the given invoker, an existing invoker, or a wrapper. + * + * @param invoker must be a new instance + */ + synchronized Stub putAndSelectStub(StubInvoker invoker) { + Item[] items = mItems; + int slot = ((int) invoker.id) & (items.length - 1); + + for (Item existing = items[slot]; existing != null; existing = existing.mNext) { + if (existing.id == invoker.id) { + Stub selected = ((StubInvoker) existing).select(); + if (selected != null) { + return selected; + } + break; + } + } + + // Must init before calling doPut because upon doing so the invoker can be obtained by + // other threads. + Stub selected = invoker.init(); + + doPut(items, invoker, slot); + + return selected; + } +} diff --git a/src/main/java/org/cojen/dirmi/core/StubSupport.java b/src/main/java/org/cojen/dirmi/core/StubSupport.java index 38fbf339..41c678d2 100644 --- a/src/main/java/org/cojen/dirmi/core/StubSupport.java +++ b/src/main/java/org/cojen/dirmi/core/StubSupport.java @@ -22,7 +22,7 @@ import org.cojen.dirmi.Session; /** - * Object passed to a Stub instance in order for it to communicate with a remote object. + * Object passed to a StubInvoker instance in order for it to communicate with a remote object. * * @author Brian S O'Neill * @see StubFactory @@ -47,7 +47,7 @@ default void appendInfo(StringBuilder b) { * @param stub the stub requesting a connection * @return pipe for writing arguments and reading response */ - Pipe connect(Stub stub, Class remoteFailureException) throws T; + Pipe connect(StubInvoker stub, Class remoteFailureException) throws T; /** * Variant which never returns the thread-local pipe used by batched calls. @@ -55,7 +55,7 @@ default void appendInfo(StringBuilder b) { * @param stub the stub requesting a connection * @return pipe for writing arguments and reading response */ - Pipe connectUnbatched(Stub stub, Class remoteFailureException) + Pipe connectUnbatched(StubInvoker stub, Class remoteFailureException) throws T; /** @@ -64,7 +64,8 @@ Pipe connectUnbatched(Stub stub, Class remoteFailureExc * @param stub the stub requesting a connection * @return pipe for writing arguments and reading response */ - Pipe tryConnect(Stub stub, Class remoteFailureException) throws T; + Pipe tryConnect(StubInvoker stub, Class remoteFailureException) + throws T; /** * Variant which returns null if a connection cannot be established. @@ -72,7 +73,8 @@ Pipe connectUnbatched(Stub stub, Class remoteFailureExc * @param stub the stub requesting a connection * @return pipe for writing arguments and reading response */ - Pipe tryConnectUnbatched(Stub stub, Class remoteFailureException) + Pipe tryConnectUnbatched(StubInvoker stub, + Class remoteFailureException) throws T; /** @@ -80,7 +82,7 @@ Pipe tryConnectUnbatched(Stub stub, Class remoteFailure * returned. Otherwise, the caller should obtain the support instance again, and then call * connect again. */ - boolean validate(Stub stub, Pipe pipe); + boolean validate(StubInvoker stub, Pipe pipe); /** * Used by batched methods which return a Remote object. If the remote typeId is currently @@ -113,7 +115,7 @@ Object newAliasStub(Class remoteFailureException, * * @param cause optional */ - Stub newDisconnectedStub(Class type, Throwable cause); + StubInvoker newDisconnectedStub(Class type, Throwable cause); /** * Returns true if a batch sequence is in progress. @@ -156,5 +158,5 @@ Object newAliasStub(Class remoteFailureException, /** * Disposes the given stub. */ - void dispose(Stub stub); + void dispose(StubInvoker stub); } diff --git a/src/main/java/org/cojen/dirmi/core/StubWrapper.java b/src/main/java/org/cojen/dirmi/core/StubWrapper.java new file mode 100644 index 00000000..a8d0d929 --- /dev/null +++ b/src/main/java/org/cojen/dirmi/core/StubWrapper.java @@ -0,0 +1,71 @@ +/* + * Copyright 2024 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi.core; + +import java.lang.invoke.MethodHandle; + +/** + * Base class for remote stubs which delegate to an invoker for supporting AutoDispose. This + * class must not declare any new public instance methods because they can conflict with + * user-specified remote methods which have the same signature. + * + * @author Brian S. O'Neill + */ +public non-sealed abstract class StubWrapper extends Stub { + protected final StubInvoker invoker; + + protected StubWrapper(StubInvoker invoker) { + super(invoker.id); + this.invoker = invoker; + } + + @Override + final StubSupport support() { + return invoker.support(); + } + + @Override + final StubInvoker invoker() { + return invoker; + } + + /** + * @see StubInvoker.WithRef + * @see StubMaker + */ + public static final class Factory { + private MethodHandle ctorHandle; + + Factory() { + } + + void init(MethodHandle ctorHandle) { + this.ctorHandle = ctorHandle; + } + + StubWrapper newWrapper(StubInvoker invoker) { + try { + return (StubWrapper) ctorHandle.invoke(invoker); + } catch (NullPointerException e) { + // Wasn't initialized. + throw e; + } catch (Throwable e) { + throw new AssertionError(e); + } + } + } +} diff --git a/src/test/java/org/cojen/dirmi/AutoDisposeTest.java b/src/test/java/org/cojen/dirmi/AutoDisposeTest.java new file mode 100644 index 00000000..73ab6700 --- /dev/null +++ b/src/test/java/org/cojen/dirmi/AutoDisposeTest.java @@ -0,0 +1,222 @@ +/* + * Copyright 2024 Cojen.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cojen.dirmi; + +import java.net.ServerSocket; + +import org.junit.*; +import static org.junit.Assert.*; + +/** + * + * + * @author Brian S. O'Neill + */ +public class AutoDisposeTest { + public static void main(String[] args) throws Exception { + org.junit.runner.JUnitCore.main(AutoDisposeTest.class.getName()); + } + + private Environment mServerEnv, mClientEnv; + private R1Server mServer; + private ServerSocket mServerSocket; + private Session mSession; + + @Before + public void setup() throws Exception { + mServerEnv = Environment.create(); + mServer = new R1Server(); + mServerEnv.export("main", mServer); + mServerSocket = new ServerSocket(0); + mServerEnv.acceptAll(mServerSocket); + + mClientEnv = Environment.create(); + mSession = mClientEnv.connect(R1.class, "main", "localhost", mServerSocket.getLocalPort()); + } + + @After + public void teardown() throws Exception { + if (mServerEnv != null) { + mServerEnv.close(); + } + if (mClientEnv != null) { + mClientEnv.close(); + } + } + + @Test + public void basic() throws Exception { + R1 root = mSession.root(); + + assertSame(root, root.self()); + + int sleep = 100; + + while (true) { + // Need a different instance because the root cannot auto-dispose. + R1 alt = root.alt(); + + assertSame(alt, root.alt()); + assertEquals("hello", alt.echo("hello")); + + long id = extractId(alt); + alt = null; + + System.gc(); + + // Wait for ping task to flush the dispose message. + Thread.sleep(sleep); + + long newId = extractId(root.alt()); + + if (newId != id) { + // Original remote object was disposed and a new one was created. + break; + } + + // Message not received yet by the server, so try again. + sleep <<= 1; + + if (sleep >= 60_000) { + fail("not automatically disposed in a timely fashion"); + } + } + } + + @Test + public void noReply() throws Exception { + R1 root = mSession.root(); + R2 r2 = root.r2(); + + r2.update("hello"); + r2.update("world"); + + check: { + for (int i=0; i<10; i++) { + if ("world".equals(r2.getMessage())) { + break check; + } + } + fail(); + } + + long id = extractId(r2); + + r2 = null; + + int sleep = 100; + + while (true) { + System.gc(); + + // Wait for background messages to be sent. + Thread.sleep(sleep); + + long newId = extractId(root.r2()); + + if (newId != id) { + // Original remote object was disposed and a new one was created. + break; + } + + // Backround messages not sent yet, so try again. + sleep <<= 1; + + if (sleep >= 60_000) { + fail("not automatically disposed in a timely fashion"); + } + } + } + + private static long extractId(Object obj) { + String str = obj.toString(); + int ix = str.indexOf("id="); + if (ix < 0) { + fail(); + } + ix += 3; + int ix2 = str.indexOf(",", ix); + if (ix2 < 0) { + fail(); + } + return Long.parseLong(str.substring(ix, ix2)); + } + + @AutoDispose + public static interface R1 extends Remote { + R1 self() throws RemoteException; + + R1 alt() throws RemoteException; + + String echo(String msg) throws RemoteException; + + R2 r2() throws RemoteException; + } + + @AutoDispose + public static interface R2 extends Remote { + @NoReply + void update(String msg) throws RemoteException; + + String getMessage() throws RemoteException; + } + + private static class R1Server implements R1 { + private R1Server mAlt; + private R2Server mR2; + + @Override + public R1 self() { + return this; + } + + @Override + public synchronized R1 alt() { + if (mAlt == null) { + mAlt = new R1Server(); + } + return mAlt; + } + + @Override + public String echo(String msg) { + return msg; + } + + @Override + public R2 r2() { + if (mR2 == null) { + mR2 = new R2Server(); + } + return mR2; + } + } + + private static class R2Server implements R2 { + private volatile String mMessage; + + @Override + public void update(String msg) { + mMessage = msg; + } + + @Override + public String getMessage() { + return mMessage; + } + } +} diff --git a/src/test/java/org/cojen/dirmi/RemoteObjectTest.java b/src/test/java/org/cojen/dirmi/RemoteObjectTest.java index 890f26ad..c4af08e0 100644 --- a/src/test/java/org/cojen/dirmi/RemoteObjectTest.java +++ b/src/test/java/org/cojen/dirmi/RemoteObjectTest.java @@ -135,7 +135,7 @@ public void passback() throws Exception { Object[] result = root.c3(r2); assertEquals(3, result.length); - assertTrue(result[0] instanceof org.cojen.dirmi.core.Stub); + assertTrue(result[0] instanceof org.cojen.dirmi.core.StubInvoker); assertEquals(R2Server.class.getName(), result[1]); assertEquals("hello 123", result[2]); diff --git a/src/test/java/org/cojen/dirmi/core/MismatchedInterfaceTest.java b/src/test/java/org/cojen/dirmi/core/MismatchedInterfaceTest.java index fe335a52..a439a29d 100644 --- a/src/test/java/org/cojen/dirmi/core/MismatchedInterfaceTest.java +++ b/src/test/java/org/cojen/dirmi/core/MismatchedInterfaceTest.java @@ -261,7 +261,7 @@ public void commonParent() throws Exception { assertEquals("bob", ((Parent) remote).name()); RemoteInfo info1 = RemoteInfo.examine(iface); - RemoteInfo info2 = RemoteInfo.examineStub(remote); + RemoteInfo info2 = RemoteInfo.examineStub((Stub) remote); Iterator it1 = info1.remoteMethods().iterator(); Iterator it2 = info2.remoteMethods().iterator(); diff --git a/src/test/java/org/cojen/dirmi/core/RemoteInfoTest.java b/src/test/java/org/cojen/dirmi/core/RemoteInfoTest.java index 91c24df2..6fdd8f51 100644 --- a/src/test/java/org/cojen/dirmi/core/RemoteInfoTest.java +++ b/src/test/java/org/cojen/dirmi/core/RemoteInfoTest.java @@ -744,17 +744,20 @@ public void notSerialized() { @Test public void brokenStub() { try { - RemoteInfo.examineStub(R1.class); + RemoteInfo.examineStub(new StubInvoker(0, null, null)); fail(); } catch (IllegalArgumentException e) { confirm(e, "No Remote types"); } - class Stub implements R1, R2 { + class Broken extends StubInvoker implements R1, R2 { + Broken() { + super(0, null, null); + } } try { - RemoteInfo.examineStub(new Stub()); + RemoteInfo.examineStub(new Broken()); fail(); } catch (IllegalArgumentException e) { confirm(e, "At most one");