Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #1100 - ensure init and destroy are always called on JSR356 Encoders #4987

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ protected void doStart() throws Exception
protected void doStop() throws Exception
{
ShutdownThread.deregister(this);
this.encoderFactory.destroy();
this.decoderFactory.destroy();
endpointClientMetadataCache.clear();
super.doStop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@
public interface Configurable
{
void init(EndpointConfig config);

void destroy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public void init(EndpointConfig config)
{
this.decoder.init(config);
}

@Override
public void destroy()
{
this.decoder.destroy();
}
}

private static final Logger LOG = Log.getLogger(DecoderFactory.class);
Expand Down Expand Up @@ -185,6 +191,17 @@ public void init(EndpointConfig config)
}
}

@Override
public void destroy()
{
for (Wrapper wrapper : activeWrappers.values())
{
wrapper.decoder.destroy();
}

activeWrappers.clear();
}

public Wrapper newWrapper(DecoderMetadata metadata)
{
Class<? extends Decoder> decoderClass = metadata.getCoderClass();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,21 @@ public void init(EndpointConfig config)
{
this.encoder.init(config);
}

@Override
public void destroy()
{
this.encoder.destroy();
}
}

private static final Logger LOG = Log.getLogger(EncoderFactory.class);

private final EncoderMetadataSet metadatas;
private final WebSocketContainerScope containerScope;
private EncoderFactory parentFactory;
private Map<Class<?>, Wrapper> activeWrappers;
private final Map<Class<?>, Wrapper> activeWrappers;
private final EncoderFactory parentFactory;
private EndpointConfig endpointConfig;

public EncoderFactory(WebSocketContainerScope containerScope, EncoderMetadataSet metadatas)
{
Expand Down Expand Up @@ -153,31 +160,39 @@ public Wrapper getWrapperFor(Class<?> type)
@Override
public void init(EndpointConfig config)
{
this.endpointConfig = config;
if (LOG.isDebugEnabled())
{
LOG.debug("init({})", config);
}
LOG.debug("init({})", endpointConfig);

// Instantiate all declared encoders
for (EncoderMetadata metadata : metadatas)
{
Wrapper wrapper = newWrapper(metadata);
activeWrappers.put(metadata.getObjectType(), wrapper);
}
}

// Initialize all encoders
@Override
public void destroy()
{
for (Wrapper wrapper : activeWrappers.values())
{
wrapper.encoder.init(config);
wrapper.encoder.destroy();
}

activeWrappers.clear();
}

private Wrapper newWrapper(EncoderMetadata metadata)
{
if (endpointConfig == null)
throw new IllegalStateException("EndpointConfig not set");

Class<? extends Encoder> encoderClass = metadata.getCoderClass();
try
{
Encoder encoder = containerScope.getObjectFactory().createInstance(encoderClass);
encoder.init(endpointConfig);
return new Wrapper(encoder, metadata);
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ public void init(EndpointConfig config)
decoderFactory.init(config);
}

@Override
public void destroy()
{
encoderFactory.destroy();
decoderFactory.destroy();
}

@Override
public void removeMessageHandler(MessageHandler handler)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public final void onClose(CloseInfo close)
CloseCode closecode = CloseCodes.getCloseCode(close.getStatusCode());
CloseReason closereason = new CloseReason(closecode, close.getReason());
onClose(closereason);

// Destroy the JsrSession.
if (jsrsession != null)
jsrsession.destroy();
}

protected abstract void onClose(CloseReason closereason);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.jsr356.server;

import java.net.URI;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpointConfig;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.jsr356.EncoderFactory;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.eclipse.jetty.websocket.jsr356.server.samples.echo.EchoReturnEndpoint;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class EncoderLifeCycleTest
{
private static final Logger LOG = Log.getLogger(EncoderLifeCycleTest.class);
private static Server server;
private static URI serverUri;

@BeforeAll
public static void startServer() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);

ServletContextHandler contextHandler = new ServletContextHandler();
contextHandler.setContextPath("/");
server.setHandler(contextHandler);

WebSocketServerContainerInitializer.configure(contextHandler, ((servletContext, serverContainer) ->
serverContainer.addEndpoint(ServerEndpointConfig.Builder.create(EchoReturnEndpoint.class, "/").build())));

// Start Server
server.start();
serverUri = new URI(String.format("ws://localhost:%d/", connector.getLocalPort()));
}

public static class StringHolder
{
private final String string;

public StringHolder(String msg)
{
string = msg;
}

public String getString()
{
return string;
}
}

public static class StringHolderSubtype extends StringHolder
{
public StringHolderSubtype(String msg)
{
super(msg + "|subtype");
}
}

public static class MyEncoder implements Encoder.Text<StringHolder>
{
public CountDownLatch initialized = new CountDownLatch(1);
public CountDownLatch destroyed = new CountDownLatch(1);

@Override
public void init(EndpointConfig config)
{
initialized.countDown();
}

@Override
public String encode(StringHolder message)
{
return message.getString();
}

@Override
public void destroy()
{
destroyed.countDown();
}
}

public static class TextMessageEndpoint extends Endpoint implements MessageHandler.Whole<String>
{
public BlockingArrayQueue<String> textMessages = new BlockingArrayQueue<>();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CloseReason closeReason = null;

@Override
public void onOpen(Session session, EndpointConfig config)
{
session.addMessageHandler(this);
this.openLatch.countDown();
}

@Override
public void onClose(Session session, CloseReason closeReason)
{
this.closeReason = closeReason;
this.closeLatch.countDown();
}

@Override
public void onMessage(String message)
{
this.textMessages.add(message);
}
}

@ParameterizedTest
@ValueSource(classes = {StringHolder.class, StringHolderSubtype.class})
public void testEncoderLifeCycle(Class<? extends StringHolder> clazz) throws Exception
{
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
TextMessageEndpoint clientEndpoint = new TextMessageEndpoint();
ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
.encoders(Collections.singletonList(MyEncoder.class))
.build();

// Send an instance of our StringHolder type.
Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri);
StringHolder data = clazz.getConstructor(String.class).newInstance("test1");
session.getBasicRemote().sendObject(data);

// We received the expected echo.
String echoed = clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS);
assertThat("Echoed message", echoed, is(data.getString()));

// Verify that the encoder has been opened.
EncoderFactory encoderFactory = ((JsrSession)session).getEncoderFactory();
Object obj = encoderFactory.getEncoderFor(data.getClass());
assertThat(obj.getClass(), is(MyEncoder.class));
MyEncoder encoder = (MyEncoder)obj;
assertThat(encoder.initialized.getCount(), is(0L));

// Verify the Encoder has not been destroyed, but is destroyed after the session is closed.
assertThat(encoder.destroyed.getCount(), is(1L));
session.close();
assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(encoder.destroyed.await(5, TimeUnit.SECONDS));
}
}