Skip to content

Commit

Permalink
for #1172, refactor OpenTracingSQLExecutionHook
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 25, 2018
1 parent 9ded871 commit 3a3419e
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public interface ShardingExecuteCallback<I, O> {
* Execute callback.
*
* @param input input value
* @param isTrunkThread whether the execution runs in the trunk (caller) thread
* @return execute result
* @throws SQLException thrown when the execution fails
*/
O execute(I input) throws SQLException;
O execute(I input, boolean isTrunkThread) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I>

@Override
public O call() throws SQLException {
return callback.execute(each);
return callback.execute(each, false);
}
}));
}
return result;
}

private <I, O> O syncExecute(final I input, final ShardingExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(input);
return callback.execute(input, true);
}

private <O> List<O> getResults(final O firstResult, final Collection<ListenableFuture<O>> restFutures) throws SQLException {
Expand Down Expand Up @@ -168,13 +168,13 @@ private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingE

@Override
public Collection<O> call() throws SQLException {
return callback.execute(inputGroup.getInputs());
return callback.execute(inputGroup.getInputs(), false);
}
});
}

private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
return callback.execute(executeGroup.getInputs());
return callback.execute(executeGroup.getInputs(), true);
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ public interface ShardingGroupExecuteCallback<I, O> {
* Execute callback.
*
* @param inputs input values
* @param isTrunkThread whether the execution runs in the trunk (caller) thread
* @return execute result
* @throws SQLException thrown when the execution fails
*/
Collection<O> execute(Collection<I> inputs) throws SQLException;
Collection<O> execute(Collection<I> inputs, boolean isTrunkThread) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,26 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S
private final SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();

@Override
public final T execute(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
return execute0(statementExecuteUnit);
public final T execute(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread) throws SQLException {
return execute0(statementExecuteUnit, isTrunkThread);
}

@Override
public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits) throws SQLException {
public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread) throws SQLException {
Collection<T> result = new LinkedList<>();
for (StatementExecuteUnit each : statementExecuteUnits) {
result.add(execute0(each));
result.add(execute0(each, isTrunkThread));
}
return result;
}

private T execute0(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread) throws SQLException {
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<List<Object>> parameterSets = statementExecuteUnit.getRouteUnit().getSqlUnit().getParameterSets();
DataSourceMetaData dataSourceMetaData = DataSourceMetaDataFactory.newInstance(databaseType, statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
for (List<Object> each : parameterSets) {
sqlExecutionHook.start(statementExecuteUnit.getRouteUnit().getDataSourceName(), statementExecuteUnit.getRouteUnit().getSqlUnit().getSql(), each, dataSourceMetaData);
sqlExecutionHook.start(statementExecuteUnit.getRouteUnit().getDataSourceName(), statementExecuteUnit.getRouteUnit().getSqlUnit().getSql(), each, dataSourceMetaData, isTrunkThread);
// TODO remove after BED removed
shardingEventBus.post(SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getAsynchronizedE
new ShardingExecuteCallback<Entry<String, List<SQLUnit>>, Collection<ShardingExecuteGroup<StatementExecuteUnit>>>() {

@Override
public Collection<ShardingExecuteGroup<StatementExecuteUnit>> execute(final Entry<String, List<SQLUnit>> input) throws SQLException {
public Collection<ShardingExecuteGroup<StatementExecuteUnit>> execute(final Entry<String, List<SQLUnit>> input, final boolean isTrunkThread) throws SQLException {
return getSQLExecuteGroups(input.getKey(), input.getValue(), callback);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroup
return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {

@Override
public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes) throws SQLException {
public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes, final boolean isTrunkThread) throws SQLException {
String dataSourceName = dataNodes.iterator().next().getDataSourceName();
DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemeName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public final class SPISQLExecutionHook implements SQLExecutionHook {
private static final ServiceLoader<SQLExecutionHook> SERVICE_LOADER = ServiceLoader.load(SQLExecutionHook.class);

@Override
public synchronized void start(final String dataSourceName, final String sql, final List<Object> parameters, final DataSourceMetaData dataSourceMetaData) {
public synchronized void start(final String dataSourceName, final String sql, final List<Object> parameters, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
for (SQLExecutionHook each : SERVICE_LOADER) {
each.start(dataSourceName, sql, parameters, dataSourceMetaData);
each.start(dataSourceName, sql, parameters, dataSourceMetaData, isTrunkThread);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public interface SQLExecutionHook {
* @param sql SQL to be executed
* @param parameters parameters of SQL
* @param dataSourceMetaData data source meta data
* @param isTrunkThread whether the execution runs in the trunk (caller) thread
*/
void start(String dataSourceName, String sql, List<Object> parameters, DataSourceMetaData dataSourceMetaData);
void start(String dataSourceName, String sql, List<Object> parameters, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread);

/**
* Handle when SQL execution finished success.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public final class OpenTracingSQLExecutionHook implements SQLExecutionHook {

private static final String OPERATION_NAME = "/" + ShardingTags.COMPONENT_NAME + "/executeSQL/";

private final ThreadLocal<Boolean> isTrunkThread = new ThreadLocal<>();
private boolean isTrunkThread;

private Span span;

@Override
public void start(final String dataSourceName, final String sql, final List<Object> parameters, final DataSourceMetaData dataSourceMetaData) {
isTrunkThread.set(OpenTracingRootInvokeHandler.isTrunkThread());
if (ExecutorDataMap.getDataMap().containsKey(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION) && !isTrunkThread.get()) {
public void start(final String dataSourceName, final String sql, final List<Object> parameters, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
this.isTrunkThread = isTrunkThread;
if (ExecutorDataMap.getDataMap().containsKey(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION) && !isTrunkThread) {
OpenTracingRootInvokeHandler.getActiveSpan().set(((ActiveSpan.Continuation) ExecutorDataMap.getDataMap().get(OpenTracingRootInvokeHandler.ROOT_SPAN_CONTINUATION)).activate());
}
span = ShardingTracer.get().buildSpan(OPERATION_NAME)
Expand Down Expand Up @@ -74,7 +74,7 @@ public void finishFailure(final Exception cause) {
}

private void deactivateSpan() {
if (!isTrunkThread.get()) {
if (!isTrunkThread) {
OpenTracingRootInvokeHandler.getActiveSpan().get().deactivate();
OpenTracingRootInvokeHandler.getActiveSpan().remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

@RunWith(Suite.class)
@SuiteClasses({
OpenTracingRootInvokeHandlerTest.class,
OpenTracingParsingEventHandlerTest.class,
OpenTracingSQLExecutionEventHandlerTest.class
OpenTracingRootInvokeHandlerTest.class,
OpenTracingParsingEventHandlerTest.class,
OpenTracingSQLExecutionEventHandlerTest.class
})
public final class AllHandlerTests {
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void assertExecuteSuccessForTrunkThread() {
DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class);
when(dataSourceMetaData.getHostName()).thenReturn("localhost");
when(dataSourceMetaData.getPort()).thenReturn(8888);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.<Object>asList("1", 2), dataSourceMetaData);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.<Object>asList("1", 2), dataSourceMetaData, true);
sqlExecutionHook.finishSuccess();
assertThat(getTracer().finishedSpans().size(), is(1));
MockSpan actual = getTracer().finishedSpans().get(0);
Expand All @@ -77,7 +77,7 @@ public void assertExecuteSuccessForBranchThread() {
DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class);
when(dataSourceMetaData.getHostName()).thenReturn("localhost");
when(dataSourceMetaData.getPort()).thenReturn(8888);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.<Object>asList("1", 2), dataSourceMetaData);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Arrays.<Object>asList("1", 2), dataSourceMetaData, false);
assertNotNull(OpenTracingRootInvokeHandler.getActiveSpan().get());
sqlExecutionHook.finishSuccess();
assertThat(getTracer().finishedSpans().size(), is(1));
Expand All @@ -102,7 +102,7 @@ public void assertExecuteFailure() {
DataSourceMetaData dataSourceMetaData = mock(DataSourceMetaData.class);
when(dataSourceMetaData.getHostName()).thenReturn("localhost");
when(dataSourceMetaData.getPort()).thenReturn(8888);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Collections.emptyList(), dataSourceMetaData);
sqlExecutionHook.start("ds_test", "SELECT * FROM XXX;", Collections.emptyList(), dataSourceMetaData, true);
sqlExecutionHook.finishFailure(new RuntimeException("SQL execution error"));
assertThat(getTracer().finishedSpans().size(), is(1));
MockSpan actual = getTracer().finishedSpans().get(0);
Expand Down

0 comments on commit 3a3419e

Please sign in to comment.