Skip to content

Commit

Permalink
for #1172, add connection size for root invoke
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 25, 2018
1 parent 6041a74 commit a7ce754
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface RootInvokeHook {

/**
* Handle when root invoke finished success.
*
* @param connectionCount connection count
*/
void finishSuccess();
void finishSuccess(int connectionCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public void start() {
}

@Override
public void finishSuccess() {
public void finishSuccess(final int connectionCount) {
for (RootInvokeHook each : SERVICE_LOADER) {
each.finishSuccess();
each.finishSuccess(connectionCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public final void close() throws SQLException {
HintManagerHolder.clear();
MasterVisitedManager.clear();
TransactionTypeHolder.clear();
int connectionSize = cachedConnections.size();
forceExecuteTemplateForClose.execute(cachedConnections.entries(), new ForceExecuteCallback<Map.Entry<String, Connection>>() {

@Override
Expand All @@ -238,7 +239,7 @@ public void execute(final Entry<String, Connection> cachedConnections) throws SQ
}
}
});
rootInvokeHook.finishSuccess();
rootInvokeHook.finishSuccess(connectionSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public void start() {
}

@Override
public void finishSuccess() {
ACTIVE_SPAN.get().deactivate();
public void finishSuccess(final int connectionCount) {
ACTIVE_SPAN.get().setTag(ShardingTags.CONNECTION_COUNT.getKey(), connectionCount).deactivate();
ACTIVE_SPAN.remove();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void assertRootInvoke() {
assertTrue(OpenTracingRootInvokeHandler.isTrunkThread());
assertNotNull(OpenTracingRootInvokeHandler.getActiveSpan().get());
assertTrue(ExecutorDataMap.getDataMap().containsKey(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION));
rootInvokeHook.finishSuccess();
rootInvokeHook.finishSuccess(1);
assertFalse(OpenTracingRootInvokeHandler.isTrunkThread());
assertNull(OpenTracingRootInvokeHandler.getActiveSpan().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void assertExecuteSuccessTrunkThread() {
assertThat(actualTags.get(Tags.PEER_HOSTNAME.getKey()), CoreMatchers.<Object>is("localhost"));
assertThat(actualTags.get(Tags.PEER_PORT.getKey()), CoreMatchers.<Object>is(8888));
assertThat(actualTags.get(ShardingTags.CONNECTION_COUNT.getKey()), CoreMatchers.<Object>is(3));
new OpenTracingRootInvokeHandler().finishSuccess();
new OpenTracingRootInvokeHandler().finishSuccess(2);
}

@Test
Expand Down Expand Up @@ -104,6 +104,6 @@ public void assertExecuteFailure() {
assertThat(actualTags.get(Tags.COMPONENT.getKey()), CoreMatchers.<Object>is(ShardingTags.COMPONENT_NAME));
assertThat(actualTags.get(Tags.SPAN_KIND.getKey()), CoreMatchers.<Object>is(Tags.SPAN_KIND_CLIENT));
assertSpanError(actual, RuntimeException.class, "get connection error");
new OpenTracingRootInvokeHandler().finishSuccess();
new OpenTracingRootInvokeHandler().finishSuccess(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void assertExecuteSuccessForTrunkThread() {
assertThat(actualTags.get(Tags.DB_INSTANCE.getKey()), CoreMatchers.<Object>is("ds_test"));
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM XXX;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is("1,2"));
new OpenTracingRootInvokeHandler().finishSuccess();
new OpenTracingRootInvokeHandler().finishSuccess(2);
}

@Test
Expand Down Expand Up @@ -125,6 +125,6 @@ public void assertExecuteFailure() {
assertThat(actualTags.get(Tags.DB_STATEMENT.getKey()), CoreMatchers.<Object>is("SELECT * FROM XXX;"));
assertThat(actualTags.get(ShardingTags.DB_BIND_VARIABLES.getKey()), CoreMatchers.<Object>is(""));
assertSpanError(actual, RuntimeException.class, "SQL execution error");
new OpenTracingRootInvokeHandler().finishSuccess();
new OpenTracingRootInvokeHandler().finishSuccess(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public final class BackendConnection implements AutoCloseable {

private final CloseConnectionHook closeConnectionHook = new SPICloseConnectionHook();

/**
* Get connection size.
*
* @return connection size
*/
public int getConnectionSize() {
return cachedConnections.size();
}

/**
* Get connections of current thread datasource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class CommandExecutor implements Runnable {
@Override
public void run() {
rootInvokeHook.start();
int connectionSize = 0;
try (MySQLPacketPayload payload = new MySQLPacketPayload(message);
BackendConnection backendConnection = new BackendConnection()) {
setBackendConnection(backendConnection);
Expand All @@ -131,14 +132,16 @@ public void run() {
if (commandPacket instanceof QueryCommandPacket && !(responsePackets.get().getHeadPacket() instanceof OKPacket) && !(responsePackets.get().getHeadPacket() instanceof ErrPacket)) {
writeMoreResults((QueryCommandPacket) commandPacket, responsePackets.get().getPackets().size());
}
connectionSize = backendConnection.getConnectionSize();
} catch (final SQLException ex) {
context.writeAndFlush(new ErrPacket(++currentSequenceId, ex));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
context.writeAndFlush(new ErrPacket(1, ServerErrorCode.ER_STD_UNKNOWN_EXCEPTION, ex.getMessage()));
} finally {
rootInvokeHook.finishSuccess(connectionSize);
}
rootInvokeHook.finishSuccess();
}

private CommandPacket getCommandPacket(final MySQLPacketPayload payload, final BackendConnection backendConnection, final FrontendHandler frontendHandler) throws SQLException {
Expand Down

0 comments on commit a7ce754

Please sign in to comment.