From 3a3419eaf45e8cb46147f9a455e2eb82660442b5 Mon Sep 17 00:00:00 2001 From: terrymanu Date: Wed, 26 Sep 2018 00:16:38 +0800 Subject: [PATCH] for #1172, refactor OpenTracingSQLExecutionHook --- .../core/executor/ShardingExecuteCallback.java | 3 ++- .../core/executor/ShardingExecuteEngine.java | 8 ++++---- .../core/executor/ShardingGroupExecuteCallback.java | 3 ++- .../executor/sql/execute/SQLExecuteCallback.java | 12 ++++++------ .../sql/prepare/SQLExecutePrepareTemplate.java | 2 +- .../metadata/table/executor/TableMetaDataLoader.java | 2 +- .../core/spi/executor/SPISQLExecutionHook.java | 4 ++-- .../core/spi/executor/SQLExecutionHook.java | 3 ++- .../handler/OpenTracingSQLExecutionHook.java | 10 +++++----- .../opentracing/handler/AllHandlerTests.java | 6 +++--- .../OpenTracingSQLExecutionEventHandlerTest.java | 6 +++--- 11 files changed, 31 insertions(+), 28 deletions(-) diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteCallback.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteCallback.java index 0d837ea6afb41..c8092ba061b96 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteCallback.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteCallback.java @@ -33,8 +33,9 @@ public interface ShardingExecuteCallback { * Execute callback. * * @param input input value + * @param isTrunkThread is execution in trunk thread * @return execute result * @throws SQLException throw when execute failure */ - O execute(I input) throws SQLException; + O execute(I input, boolean isTrunkThread) throws SQLException; } diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteEngine.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteEngine.java index b229acac25464..8e6db592c8e47 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteEngine.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingExecuteEngine.java @@ -95,7 +95,7 @@ private Collection> asyncExecute(final Collection @Override public O call() throws SQLException { - return callback.execute(each); + return callback.execute(each, false); } })); } @@ -103,7 +103,7 @@ public O call() throws SQLException { } private O syncExecute(final I input, final ShardingExecuteCallback callback) throws SQLException { - return callback.execute(input); + return callback.execute(input, true); } private List getResults(final O firstResult, final Collection> restFutures) throws SQLException { @@ -168,13 +168,13 @@ private ListenableFuture> asyncGroupExecute(final ShardingE @Override public Collection call() throws SQLException { - return callback.execute(inputGroup.getInputs()); + return callback.execute(inputGroup.getInputs(), false); } }); } private Collection syncGroupExecute(final ShardingExecuteGroup executeGroup, final ShardingGroupExecuteCallback callback) throws SQLException { - return callback.execute(executeGroup.getInputs()); + return callback.execute(executeGroup.getInputs(), true); } private List getGroupResults(final Collection firstResults, final Collection>> restFutures) throws SQLException { diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingGroupExecuteCallback.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingGroupExecuteCallback.java index 5004ef1a4cee0..2f2208e881b36 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingGroupExecuteCallback.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/ShardingGroupExecuteCallback.java @@ -34,8 +34,9 @@ public interface ShardingGroupExecuteCallback { * Execute callback. * * @param inputs input values + * @param isTrunkThread is execution in trunk thread * @return execute result * @throws SQLException throw when execute failure */ - Collection execute(Collection inputs) throws SQLException; + Collection execute(Collection inputs, boolean isTrunkThread) throws SQLException; } diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/execute/SQLExecuteCallback.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/execute/SQLExecuteCallback.java index 70577748fb6d4..7539ba10579ea 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/execute/SQLExecuteCallback.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/execute/SQLExecuteCallback.java @@ -64,26 +64,26 @@ public abstract class SQLExecuteCallback implements ShardingExecuteCallback execute(final Collection statementExecuteUnits) throws SQLException { + public final Collection execute(final Collection statementExecuteUnits, final boolean isTrunkThread) throws SQLException { Collection result = new LinkedList<>(); for (StatementExecuteUnit each : statementExecuteUnits) { - result.add(execute0(each)); + result.add(execute0(each, isTrunkThread)); } return result; } - private T execute0(final StatementExecuteUnit statementExecuteUnit) throws SQLException { + private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread) throws SQLException { ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); ExecutorDataMap.setDataMap(dataMap); List> parameterSets = statementExecuteUnit.getRouteUnit().getSqlUnit().getParameterSets(); DataSourceMetaData dataSourceMetaData = DataSourceMetaDataFactory.newInstance(databaseType, statementExecuteUnit.getStatement().getConnection().getMetaData().getURL()); for (List each : parameterSets) { - sqlExecutionHook.start(statementExecuteUnit.getRouteUnit().getDataSourceName(), statementExecuteUnit.getRouteUnit().getSqlUnit().getSql(), each, dataSourceMetaData); + sqlExecutionHook.start(statementExecuteUnit.getRouteUnit().getDataSourceName(), statementExecuteUnit.getRouteUnit().getSqlUnit().getSql(), each, dataSourceMetaData, isTrunkThread); // TODO remove after BED removed shardingEventBus.post(SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData)); } diff --git a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java index 022749bc24218..5219f8702a593 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/executor/sql/prepare/SQLExecutePrepareTemplate.java @@ -81,7 +81,7 @@ private Collection> getAsynchronizedE new ShardingExecuteCallback>, Collection>>() { @Override - public Collection> execute(final Entry> input) throws SQLException { + public Collection> execute(final Entry> input, final boolean isTrunkThread) throws SQLException { return getSQLExecuteGroups(input.getKey(), input.getValue(), callback); } }); diff --git a/sharding-core/src/main/java/io/shardingsphere/core/metadata/table/executor/TableMetaDataLoader.java b/sharding-core/src/main/java/io/shardingsphere/core/metadata/table/executor/TableMetaDataLoader.java index efa95e2e29810..aa9e8837c8404 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/metadata/table/executor/TableMetaDataLoader.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/metadata/table/executor/TableMetaDataLoader.java @@ -76,7 +76,7 @@ private List load(final Map> dataNodeGroup return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback() { @Override - public Collection execute(final Collection dataNodes) throws SQLException { + public Collection execute(final Collection dataNodes, final boolean isTrunkThread) throws SQLException { String dataSourceName = dataNodes.iterator().next().getDataSourceName(); DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName); String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName(); diff --git a/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SPISQLExecutionHook.java b/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SPISQLExecutionHook.java index fb9ca2f28cc55..3525a33341bd2 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SPISQLExecutionHook.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SPISQLExecutionHook.java @@ -32,9 +32,9 @@ public final class SPISQLExecutionHook implements SQLExecutionHook { private static final ServiceLoader SERVICE_LOADER = ServiceLoader.load(SQLExecutionHook.class); @Override - public synchronized void start(final String dataSourceName, final String sql, final List parameters, final DataSourceMetaData dataSourceMetaData) { + public synchronized void start(final String dataSourceName, final String sql, final List parameters, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) { for (SQLExecutionHook each : SERVICE_LOADER) { - each.start(dataSourceName, sql, parameters, dataSourceMetaData); + each.start(dataSourceName, sql, parameters, dataSourceMetaData, isTrunkThread); } } diff --git a/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SQLExecutionHook.java b/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SQLExecutionHook.java index 20859a60935a5..236a75ed6ff25 100644 --- a/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SQLExecutionHook.java +++ b/sharding-core/src/main/java/io/shardingsphere/core/spi/executor/SQLExecutionHook.java @@ -35,8 +35,9 @@ public interface SQLExecutionHook { * @param sql SQL to be executed * @param parameters parameters of SQL * @param dataSourceMetaData data source meta data + * @param isTrunkThread is execution in trunk thread */ - void start(String dataSourceName, String sql, List parameters, DataSourceMetaData dataSourceMetaData); + void start(String dataSourceName, String sql, List parameters, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread); /** * Handle when SQL execution finished success. diff --git a/sharding-opentracing/src/main/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionHook.java b/sharding-opentracing/src/main/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionHook.java index 8cb4d5d41ea2d..888914cf1a5dd 100644 --- a/sharding-opentracing/src/main/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionHook.java +++ b/sharding-opentracing/src/main/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionHook.java @@ -38,14 +38,14 @@ public final class OpenTracingSQLExecutionHook implements SQLExecutionHook { private static final String OPERATION_NAME = "/" + ShardingTags.COMPONENT_NAME + "/executeSQL/"; - private final ThreadLocal isTrunkThread = new ThreadLocal<>(); + private boolean isTrunkThread; private Span span; @Override - public void start(final String dataSourceName, final String sql, final List parameters, final DataSourceMetaData dataSourceMetaData) { - isTrunkThread.set(OpenTracingRootInvokeHandler.isTrunkThread()); - if (ExecutorDataMap.getDataMap().containsKey(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION) && !isTrunkThread.get()) { + public void start(final String dataSourceName, final String sql, final List parameters, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) { + this.isTrunkThread = isTrunkThread; + if (ExecutorDataMap.getDataMap().containsKey(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION) && !isTrunkThread) { OpenTracingRootInvokeHandler.getActiveSpan().set(((ActiveSpan.Continuation) ExecutorDataMap.getDataMap().get(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION)).activate()); } span = ShardingTracer.get().buildSpan(OPERATION_NAME) @@ -74,7 +74,7 @@ public void finishFailure(final Exception cause) { } private void deactivateSpan() { - if (!isTrunkThread.get()) { + if (!isTrunkThread) { OpenTracingRootInvokeHandler.getActiveSpan().get().deactivate(); OpenTracingRootInvokeHandler.getActiveSpan().remove(); } diff --git a/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/AllHandlerTests.java b/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/AllHandlerTests.java index 1037b0ac245be..247fac68740b5 100644 --- a/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/AllHandlerTests.java +++ b/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/AllHandlerTests.java @@ -23,9 +23,9 @@ @RunWith(Suite.class) @SuiteClasses({ - OpenTracingRootInvokeHandlerTest.class, - OpenTracingParsingEventHandlerTest.class, - OpenTracingSQLExecutionEventHandlerTest.class + OpenTracingRootInvokeHandlerTest.class, + OpenTracingParsingEventHandlerTest.class, + OpenTracingSQLExecutionEventHandlerTest.class }) public final class AllHandlerTests { } diff --git a/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionEventHandlerTest.java b/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionEventHandlerTest.java index d331e993cff4d..1fe0de56ca8c8 100644 --- a/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionEventHandlerTest.java +++ b/sharding-opentracing/src/test/java/io/shardingsphere/opentracing/handler/OpenTracingSQLExecutionEventHandlerTest.java @@ -51,7 +51,7 @@ public void assertExecuteSuccessForTrunkThread() { DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class); when(dataSourceMetaData.getHostName()).thenReturn("localhost"); when(dataSourceMetaData.getPort()).thenReturn(8888); - sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.asList("1", 2), dataSourceMetaData); + sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.asList("1", 2), dataSourceMetaData, true); sqlExecutionHook.finishSuccess(); assertThat(getTracer().finishedSpans().size(), is(1)); MockSpan actual = getTracer().finishedSpans().get(0); @@ -77,7 +77,7 @@ public void assertExecuteSuccessForBranchThread() { DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class); when(dataSourceMetaData.getHostName()).thenReturn("localhost"); when(dataSourceMetaData.getPort()).thenReturn(8888); - sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.asList("1", 2), dataSourceMetaData); + sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.asList("1", 2), dataSourceMetaData, false); assertNotNull(OpenTracingRootInvokeHandler.getActiveSpan().get()); sqlExecutionHook.finishSuccess(); assertThat(getTracer().finishedSpans().size(), is(1)); @@ -102,7 +102,7 @@ public void assertExecuteFailure() { DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class); when(dataSourceMetaData.getHostName()).thenReturn("localhost"); when(dataSourceMetaData.getPort()).thenReturn(8888); - sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Collections.emptyList(), dataSourceMetaData); + sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Collections.emptyList(), dataSourceMetaData, true); sqlExecutionHook.finishFailure(new RuntimeException("SQL execution error")); assertThat(getTracer().finishedSpans().size(), is(1)); MockSpan actual = getTracer().finishedSpans().get(0);