Skip to content

Commit

Permalink
for #1172, refactor ExecutorDataMap => ShardingExecuteDataMap
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 25, 2018
1 parent b6de854 commit cb16dd0
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* </p>
*/

package io.shardingsphere.core.executor.sql.execute.threadlocal;
package io.shardingsphere.core.executor;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
Expand All @@ -24,12 +24,13 @@
import java.util.Map;

/**
* Executor runtime data map.
* Sharding execute data map for threadlocal even cross multiple threads.
*
* @author caohao
* @author zhangliang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ExecutorDataMap {
public final class ShardingExecuteDataMap {

private static ThreadLocal<Map<String, Object>> dataMap = new ThreadLocal<Map<String, Object>>() {

Expand All @@ -45,7 +46,7 @@ protected Map<String, Object> initialValue() {
* @param dataMap data map
*/
public static void setDataMap(final Map<String, Object> dataMap) {
ExecutorDataMap.dataMap.set(dataMap);
ShardingExecuteDataMap.dataMap.set(dataMap);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;

import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,13 +91,13 @@ public <I, O> List<O> execute(final Collection<I> inputs, final ShardingExecuteC

private <I, O> Collection<ListenableFuture<O>> asyncExecute(final Collection<I> inputs, final ShardingExecuteCallback<I, O> callback) {
Collection<ListenableFuture<O>> result = new ArrayList<>(inputs.size());
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
for (final I each : inputs) {
result.add(executorService.submit(new Callable<O>() {

@Override
public O call() throws SQLException {
ExecutorDataMap.setDataMap(dataMap);
ShardingExecuteDataMap.setDataMap(dataMap);
return callback.execute(each, false);
}
}));
Expand Down Expand Up @@ -168,12 +167,12 @@ private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(fin
}

private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
return executorService.submit(new Callable<Collection<O>>() {

@Override
public Collection<O> call() throws SQLException {
ExecutorDataMap.setDataMap(dataMap);
ShardingExecuteDataMap.setDataMap(dataMap);
return callback.execute(inputGroup.getInputs(), false);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.ShardingExecuteDataMap;
import io.shardingsphere.transaction.api.config.SoftTransactionConfiguration;
import io.shardingsphere.transaction.bed.BEDSoftTransaction;
import io.shardingsphere.transaction.bed.sync.BestEffortsDeliveryListener;
Expand Down Expand Up @@ -102,8 +102,8 @@ public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
if (getCurrentTransaction().isPresent()) {
throw new UnsupportedOperationException("Cannot support nested transaction.");
}
ExecutorDataMap.getDataMap().put(TRANSACTION, result);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
ShardingExecuteDataMap.getDataMap().put(TRANSACTION, result);
ShardingExecuteDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
return result;
}

Expand All @@ -113,7 +113,7 @@ public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
* @return transaction configuration from current thread
*/
public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {
Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
Object transactionConfig = ShardingExecuteDataMap.getDataMap().get(TRANSACTION_CONFIG);
return (null == transactionConfig)
? Optional.<SoftTransactionConfiguration>absent()
: Optional.of((SoftTransactionConfiguration) transactionConfig);
Expand All @@ -125,7 +125,7 @@ public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfig
* @return current transaction
*/
public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);
Object transaction = ShardingExecuteDataMap.getDataMap().get(TRANSACTION);
return (null == transaction)
? Optional.<AbstractSoftTransaction>absent()
: Optional.of((AbstractSoftTransaction) transaction);
Expand All @@ -135,7 +135,7 @@ public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
* Close transaction manager from current thread.
*/
static void closeCurrentTransactionManager() {
ExecutorDataMap.getDataMap().put(TRANSACTION, null);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
ShardingExecuteDataMap.getDataMap().put(TRANSACTION, null);
ShardingExecuteDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.opentracing.ActiveSpan;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.ShardingExecuteDataMap;
import io.shardingsphere.core.spi.root.RootInvokeHook;
import io.shardingsphere.opentracing.ShardingTracer;
import io.shardingsphere.opentracing.constant.ShardingTags;
Expand All @@ -40,7 +40,7 @@ public final class OpenTracingRootInvokeHook implements RootInvokeHook {
@Override
public void start() {
activeSpan = ShardingTracer.get().buildSpan(OPERATION_NAME).withTag(Tags.COMPONENT.getKey(), ShardingTags.COMPONENT_NAME).startActive();
ExecutorDataMap.getDataMap().put(ACTIVE_SPAN_CONTINUATION, activeSpan.capture());
ShardingExecuteDataMap.getDataMap().put(ACTIVE_SPAN_CONTINUATION, activeSpan.capture());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.opentracing.ActiveSpan;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.ShardingExecuteDataMap;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.routing.RouteUnit;
import io.shardingsphere.core.spi.executor.SQLExecutionHook;
Expand All @@ -47,7 +47,7 @@ public final class OpenTracingSQLExecutionHook implements SQLExecutionHook {
@Override
public void start(final RouteUnit routeUnit, final DataSourceMetaData dataSourceMetaData, final boolean isTrunkThread) {
if (!isTrunkThread) {
activeSpan = ((ActiveSpan.Continuation) ExecutorDataMap.getDataMap().get(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION)).activate();
activeSpan = ((ActiveSpan.Continuation) ShardingExecuteDataMap.getDataMap().get(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION)).activate();
}
span = ShardingTracer.get().buildSpan(OPERATION_NAME)
.withTag(Tags.COMPONENT.getKey(), ShardingTags.COMPONENT_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import io.opentracing.util.ThreadLocalActiveSpanSource;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.ShardingExecuteDataMap;
import io.shardingsphere.opentracing.ShardingTracer;
import io.shardingsphere.opentracing.constant.ShardingErrorLogTags;
import org.hamcrest.CoreMatchers;
Expand Down Expand Up @@ -54,7 +54,7 @@ public static MockTracer getTracer() {
@BeforeClass
public static void initTracer() {
ShardingTracer.init(TRACER);
ExecutorDataMap.getDataMap().remove(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION);
ShardingExecuteDataMap.getDataMap().remove(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package io.shardingsphere.opentracing.hook;

import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.ShardingExecuteDataMap;
import io.shardingsphere.core.spi.root.RootInvokeHook;
import io.shardingsphere.core.spi.root.SPIRootInvokeHook;
import org.junit.Test;
Expand All @@ -31,7 +31,7 @@ public final class OpenTracingRootInvokeHookTest extends BaseOpenTracingHookTest
@Test
public void assertRootInvoke() {
rootInvokeHook.start();
assertTrue(ExecutorDataMap.getDataMap().containsKey(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION));
assertTrue(ShardingExecuteDataMap.getDataMap().containsKey(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION));
rootInvokeHook.finish(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.opentracing.ActiveSpan.Continuation;
import io.opentracing.mock.MockSpan;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.ShardingExecuteDataMap;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.routing.RouteUnit;
import io.shardingsphere.core.routing.SQLUnit;
Expand Down Expand Up @@ -60,13 +60,13 @@ private ActiveSpan mockActiveSpan() {
Continuation continuation = mock(Continuation.class);
ActiveSpan result = mock(ActiveSpan.class);
when(continuation.activate()).thenReturn(result);
ExecutorDataMap.getDataMap().put(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION, continuation);
ShardingExecuteDataMap.getDataMap().put(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION, continuation);
return result;
}

@After
public void tearDown() {
ExecutorDataMap.getDataMap().remove(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION);
ShardingExecuteDataMap.getDataMap().remove(OpenTracingRootInvokeHook.ACTIVE_SPAN_CONTINUATION);
}

@Test
Expand Down

0 comments on commit cb16dd0

Please sign in to comment.