diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyRpcTransport.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyRpcTransport.java index 8daa027..73de8b2 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyRpcTransport.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/grizzly/GrizzlyRpcTransport.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009 - 2019 Deutsches Elektronen-Synchroton, + * Copyright (c) 2009 - 2022 Deutsches Elektronen-Synchroton, * Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY * * This library is free software; you can redistribute it and/or modify @@ -20,10 +20,14 @@ package org.dcache.oncrpc4j.grizzly; +import java.nio.ByteOrder; import org.dcache.oncrpc4j.rpc.ReplyQueue; +import org.dcache.oncrpc4j.rpc.RpcMessageParserTCP; import org.dcache.oncrpc4j.xdr.Xdr; import java.net.InetSocketAddress; import java.nio.channels.CompletionHandler; +import org.glassfish.grizzly.memory.BuffersBuffer; +import org.glassfish.grizzly.nio.transport.TCPNIOTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.glassfish.grizzly.Buffer; @@ -48,6 +52,12 @@ public class GrizzlyRpcTransport implements RpcTransport { private final InetSocketAddress _localAddress; private final InetSocketAddress _remoteAddress; + /** + * If true, then underlying transport is stream-oriented (like TCP) and messages must be separated + * by record marking. + */ + private final boolean _isStreaming; + private final static Logger _log = LoggerFactory.getLogger(GrizzlyRpcTransport.class); public GrizzlyRpcTransport(Connection connection, ReplyQueue replyQueue) { @@ -59,6 +69,7 @@ public GrizzlyRpcTransport(Connection connection, InetSocketA _replyQueue = replyQueue; _localAddress = _connection.getLocalAddress(); _remoteAddress = remoteAddress; + _isStreaming = connection.getTransport() instanceof TCPNIOTransport; } @Override @@ -68,10 +79,19 @@ public boolean isOpen() { @Override public void send(final Xdr xdr, A attachment, CompletionHandler handler) { - final Buffer buffer = xdr.asBuffer(); - buffer.allowBufferDispose(true); requireNonNull(handler, "CompletionHandler can't be null"); + Buffer buffer = xdr.asBuffer(); + + // add record marker, if needed + if (_isStreaming) { + int len = buffer.remaining() | RpcMessageParserTCP.RPC_LAST_FRAG; + Buffer marker = _connection.getMemoryManager().allocate(Integer.BYTES); + marker.order(ByteOrder.BIG_ENDIAN); + marker.putInt(len); + marker.flip(); + buffer = BuffersBuffer.create(_connection.getMemoryManager(), marker, buffer); + } // pass destination address to handle UDP connections as well _connection.write(_remoteAddress, buffer, diff --git a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcMessageParserTCP.java b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcMessageParserTCP.java index 7eb32dd..2ab3077 100644 --- a/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcMessageParserTCP.java +++ b/oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcMessageParserTCP.java @@ -35,11 +35,11 @@ public class RpcMessageParserTCP extends BaseFilter { /** * RPC fragment record marker mask */ - private final static int RPC_LAST_FRAG = 0x80000000; + public final static int RPC_LAST_FRAG = 0x80000000; /** * RPC fragment size mask */ - private final static int RPC_SIZE_MASK = 0x7fffffff; + public final static int RPC_SIZE_MASK = 0x7fffffff; @Override public NextAction handleRead(FilterChainContext ctx) throws IOException { @@ -61,24 +61,6 @@ public NextAction handleRead(FilterChainContext ctx) throws IOException { return ctx.getInvokeAction(reminder); } - @Override - public NextAction handleWrite(FilterChainContext ctx) throws IOException { - - Buffer b = ctx.getMessage(); - int len = b.remaining() | RPC_LAST_FRAG; - - Buffer marker = ctx.getMemoryManager().allocate(4); - marker.order(ByteOrder.BIG_ENDIAN); - marker.putInt(len); - marker.flip(); - marker.allowBufferDispose(true); - b.allowBufferDispose(true); - Buffer composite = BuffersBuffer.create(ctx.getMemoryManager(), marker, b); - composite.allowBufferDispose(true); - ctx.setMessage(composite); - return ctx.getInvokeAction(); - } - private boolean isAllFragmentsArrived(Buffer messageBuffer) { final Buffer buffer = messageBuffer.duplicate(); buffer.order(ByteOrder.BIG_ENDIAN);