Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-18520 Support stopWarmup command in thin client #11646

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ public enum ClientOperation {
OP_SET_ITERATOR_START(9022),

/** IgniteSet.iterator page. */
OP_SET_ITERATOR_GET_PAGE(9023);
OP_SET_ITERATOR_GET_PAGE(9023),

/** Stop warmup. */
OP_STOP_WARMUP(10000);

/** Code. */
private final int code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@
import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.odbc.ClientConnectionNodeRecoveryException;
import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.platform.client.ClientFlag;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.NullLogger;
Expand Down Expand Up @@ -128,7 +130,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
private volatile AffinityTopologyVersion srvTopVer;

/** Channel. */
private final ClientConnection sock;
private volatile ClientConnection sock;

/** Request id. */
private final AtomicLong reqId = new AtomicLong(1);
Expand Down Expand Up @@ -199,7 +201,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon

List<InetSocketAddress> addrs = cfg.getAddresses();

ClientConnection sock = null;
ClientConnectionException connectionEx = null;

assert !addrs.isEmpty();
Expand All @@ -210,17 +211,33 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon

if (log.isDebugEnabled())
log.debug("Connection established: " + addr);

break;
}
catch (ClientConnectionException e) {
log.info("Can't establish connection with " + addr);

if (connectionEx != null)
connectionEx.addSuppressed(e);
else
connectionEx = e;
connectionEx = U.addSuppressed(connectionEx, e);

continue;
}

try {
handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
}
catch (ClientConnectionException e) {
if (!X.hasCause(e, ClientConnectionNodeRecoveryException.class))
throw e;

log.info("Can't establish connection with " + addr + ". Node in recovery mode.");

connectionEx = U.addSuppressed(connectionEx, e);

U.closeQuiet(sock);
sock = null;

continue;
}

break;
}

if (sock == null) {
Expand All @@ -229,10 +246,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
throw connectionEx;
}

this.sock = sock;

handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());

assert protocolCtx != null : "Protocol context after handshake is null";

connDesc = new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), protocolCtx.toString(), srvNodeId);
Expand Down Expand Up @@ -771,6 +784,8 @@ private void handshake(ProtocolVersion ver, String user, String pwd, Map<String,
RuntimeException resultErr = null;
if (errCode == ClientStatus.AUTH_FAILED)
resultErr = new ClientAuthenticationException(err);
else if (errCode == ClientStatus.NODE_IN_RECOVERY_MODE)
throw new ClientConnectionNodeRecoveryException(err);
else if (ver.equals(srvVer))
resultErr = new ClientProtocolError(err);
else if (!supportedVers.contains(srvVer) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,11 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
});
}

/** Stops cache warmup. */
public void stopWarmUp() {
ch.service(ClientOperation.OP_STOP_WARMUP, null, null);
}

/**
* Initializes new instance of {@link IgniteClient}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.odbc;

import org.apache.ignite.IgniteCheckedException;

/**
* Indicates that node is unavailable due to recovery mode.
*/
public class ClientConnectionNodeRecoveryException extends IgniteCheckedException {
/** */
private static final long serialVersionUID = 0L;

/** {@inheritDoc} */
public ClientConnectionNodeRecoveryException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<Clie
/** Connection-related metadata key. */
public static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();

/** */
public static final String RECOVERY_ATTR = "ignite.internal.recoveryModeConnectionEnabled";

/** Connection shifted ID for recovery mode. */
public static final long RECOVERY_SHIFTED_ID = -1;

/** Next connection id. */
private static AtomicInteger nextConnId = new AtomicInteger(1);

Expand Down Expand Up @@ -374,6 +380,9 @@ private void onHandshake(GridNioSession ses, ClientMessage msg) {
if (connCtx.isVersionSupported(ver)) {
connCtx.initializeFromHandshake(ses, ver, reader);

if (nodeInRecoveryMode() && !Boolean.parseBoolean(connCtx.attributes().get(RECOVERY_ATTR)))
throw new ClientConnectionNodeRecoveryException("Node in recovery mode.");

ses.addMeta(CONN_CTX_META_KEY, connCtx);
}
else
Expand Down Expand Up @@ -435,8 +444,11 @@ private void onHandshake(GridNioSession ses, ClientMessage msg) {

writer.doWriteString(e.getMessage());

if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0)
writer.writeInt(ClientStatus.FAILED);
if (ver.compareTo(ClientConnectionContext.VER_1_1_0) >= 0) {
writer.writeInt(e instanceof ClientConnectionNodeRecoveryException
? ClientStatus.NODE_IN_RECOVERY_MODE
: ClientStatus.FAILED);
}
}

ses.send(new ClientMessage(writer.array()));
Expand Down Expand Up @@ -473,7 +485,18 @@ private ClientListenerConnectionContext prepareContext(byte clientType, GridNioS
* @return connection id.
*/
private long nextConnectionId() {
return (ctx.discovery().localNode().order() << 32) + nextConnId.getAndIncrement();
long shiftedId = nodeInRecoveryMode() ? RECOVERY_SHIFTED_ID : ctx.discovery().localNode().order();

return (shiftedId << 32) + nextConnId.getAndIncrement();
}

/**
* @return {@code True} if node in recovery mode and does not join topology yet.
* {@link GridKernalContext#recoveryMode()} returns {@code true} before join topology
* and some resources (local node etc.) are not available.
*/
private boolean nodeInRecoveryMode() {
return !ctx.discovery().localJoinFuture().isDone();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ public ClientListenerProcessor(GridKernalContext ctx) {
);

distrThinCfg = new DistributedThinClientConfiguration(ctx);

srv.start();
}
catch (Exception e) {
throw new IgniteCheckedException("Failed to start client connector processor.", e);
Expand Down Expand Up @@ -304,14 +306,6 @@ private void registerClientMetrics(MetricRegistry mreg) {
}
}

/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
super.onKernalStart(active);

if (srv != null)
srv.start();
}

/**
* Register an Ignite MBean for managing clients connections.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public interface ClientListenerRequest {
* @return Request ID.
*/
public long requestId();

/** @return {@code True} if request can be handled before node join topology. */
default boolean beforeStartupRequest() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.ClientMessage;
import org.apache.ignite.internal.processors.platform.client.beforestart.ClientCacheStopWarmupRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryConfigurationGetRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeGetRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNameGetRequest;
Expand Down Expand Up @@ -405,6 +406,10 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** Get service topology. */
private static final short OP_SERVICE_GET_TOPOLOGY = 7003;

/** Operations that are performed before a node is joined to the topology. */
/** Stop warmup. */
private static final short OP_STOP_WARMUP = 10000;

/** Marshaller. */
private final GridBinaryMarshaller marsh;

Expand Down Expand Up @@ -438,7 +443,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), inStream,
null, null, true, true);

return decode(reader);
ClientListenerRequest req = decode(reader);

if (ctx.kernalContext().recoveryMode() && !req.beforeStartupRequest())
return new ClientRawRequest(req.requestId(), ClientStatus.FAILED, "Node in recovery mode.");

return req;
}

/**
Expand Down Expand Up @@ -718,6 +728,9 @@ public ClientListenerRequest decode(BinaryReaderExImpl reader) {

case OP_SERVICE_GET_TOPOLOGY:
return new ClientServiceTopologyRequest(reader);

case OP_STOP_WARMUP:
return new ClientCacheStopWarmupRequest(reader);
}

return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ private ClientStatus() {
/** Invalid node status. */
public static final int INVALID_NODE_STATE = 10;

/** Node in recovery mode. */
public static final int NODE_IN_RECOVERY_MODE = 11;

/** Functionality is disabled. */
public static final int FUNCTIONALITY_DISABLED = 100;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.platform.client.beforestart;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;

/** Stop warmup request. */
public class ClientCacheStopWarmupRequest extends ClientRequest {
/** */
public ClientCacheStopWarmupRequest(BinaryRawReader reader) {
super(reader);
}

/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
try {
ctx.kernalContext().cache().stopWarmUp();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}

return super.process(ctx);
}

/** {@inheritDoc} */
@Override public boolean beforeStartupRequest() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ private static byte getNameMapperType(@Nullable BinaryConfiguration cfg) {

return basicNameMapper.isSimpleName() ? NAME_MAPPER_BASIC_SIMPLE : NAME_MAPPER_BASIC_FULL;
}

/** {@inheritDoc} */
@Override public boolean beforeStartupRequest() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11707,7 +11707,7 @@ private static <T, R> Collection<R> doInParallel(
* @param err Error to add.
* @return New root error.
*/
private static Throwable addSuppressed(Throwable root, Throwable err) {
public static <T extends Throwable> T addSuppressed(T root, T err) {
assert err != null;

if (root == null)
Expand Down
Loading