Skip to content

Commit

Permalink
for #1172, refactor spi for ot
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Sep 25, 2018
1 parent afcfab4 commit 5c6cb19
Show file tree
Hide file tree
Showing 64 changed files with 905 additions and 967 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.shardingsphere.core.bootstrap;

import io.shardingsphere.core.event.ShardingEventListenerRegistrySPILoader;
import io.shardingsphere.core.spi.ShardingSPILoader;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

Expand All @@ -34,7 +33,6 @@ public final class ShardingBootstrap {
* Initialize sharding bootstrap.
*/
public static void init() {
ShardingSPILoader.loadAllShardingSPI();
ShardingEventListenerRegistrySPILoader.registerListeners();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@
import io.shardingsphere.core.event.ShardingEventBusInstance;
import io.shardingsphere.core.event.executor.SQLExecutionEvent;
import io.shardingsphere.core.event.executor.SQLExecutionEventFactory;
import io.shardingsphere.core.spi.event.executor.SQLExecutionEventHandlerLoader;
import io.shardingsphere.core.spi.event.executor.SQLExecutionFinishEvent;
import io.shardingsphere.core.spi.event.executor.SQLExecutionStartEvent;
import io.shardingsphere.core.executor.ShardingExecuteCallback;
import io.shardingsphere.core.executor.ShardingGroupExecuteCallback;
import io.shardingsphere.core.executor.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorExceptionHandler;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaDataFactory;
import io.shardingsphere.core.spi.executor.SPISQLExecutionHook;
import io.shardingsphere.core.spi.executor.SQLExecutionHook;
import lombok.RequiredArgsConstructor;

import java.sql.SQLException;
Expand Down Expand Up @@ -62,6 +61,8 @@ public abstract class SQLExecuteCallback<T> implements ShardingExecuteCallback<S

private final EventBus shardingEventBus = ShardingEventBusInstance.getInstance();

private final SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();

@Override
public final T execute(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
return execute0(statementExecuteUnit);
Expand All @@ -82,26 +83,26 @@ private T execute0(final StatementExecuteUnit statementExecuteUnit) throws SQLEx
List<List<Object>> parameterSets = statementExecuteUnit.getRouteUnit().getSqlUnit().getParameterSets();
DataSourceMetaData dataSourceMetaData = DataSourceMetaDataFactory.newInstance(databaseType, statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
for (List<Object> each : parameterSets) {
SQLExecutionEventHandlerLoader.getInstance().start(new SQLExecutionStartEvent(statementExecuteUnit.getRouteUnit(), each, dataSourceMetaData));
sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), each, dataSourceMetaData);
// TODO remove after BED removed
shardingEventBus.post(SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData));
}
try {
T result = executeSQL(statementExecuteUnit);
for (List<Object> each : parameterSets) {
sqlExecutionHook.finishSuccess();
// TODO remove after BED removed
SQLExecutionEvent finishEvent = SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData);
finishEvent.setExecuteSuccess();
SQLExecutionEventHandlerLoader.getInstance().finish(new SQLExecutionFinishEvent());
// TODO remove after BED removed
shardingEventBus.post(finishEvent);
}
return result;
} catch (final SQLException ex) {
for (List<Object> each : parameterSets) {
sqlExecutionHook.finishFailure(ex);
// TODO remove after BED removed
SQLExecutionEvent finishEvent = SQLExecutionEventFactory.createEvent(sqlType, statementExecuteUnit, each, dataSourceMetaData);
finishEvent.setExecuteFailure(ex);
SQLExecutionEventHandlerLoader.getInstance().finish(new SQLExecutionFinishEvent());
// TODO remove after BED removed
shardingEventBus.post(finishEvent);
}
ExecutorExceptionHandler.handleException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

import com.google.common.collect.Lists;
import io.shardingsphere.core.constant.ConnectionMode;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.executor.ShardingExecuteCallback;
import io.shardingsphere.core.executor.ShardingExecuteEngine;
import io.shardingsphere.core.executor.ShardingExecuteGroup;
import io.shardingsphere.core.executor.StatementExecuteUnit;
import io.shardingsphere.core.executor.sql.execute.threadlocal.ExecutorDataMap;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaDataFactory;
import io.shardingsphere.core.routing.RouteUnit;
import io.shardingsphere.core.routing.SQLUnit;
import io.shardingsphere.core.spi.connection.get.GetConnectionHook;
import io.shardingsphere.core.spi.connection.get.SPIGetConnectionHook;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;

Expand All @@ -49,10 +53,14 @@
@AllArgsConstructor
public final class SQLExecutePrepareTemplate {

private final DatabaseType databaseType;

private final int maxConnectionsSizePerQuery;

private ShardingExecuteEngine shardingExecuteEngine;

private final GetConnectionHook getConnectionHook = new SPIGetConnectionHook();

/**
* Get execute unit groups.
*
Expand Down Expand Up @@ -81,7 +89,7 @@ private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getAsynchronizedE
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
List<Collection<ShardingExecuteGroup<StatementExecuteUnit>>> results = shardingExecuteEngine.execute(sqlUnitGroups.entrySet(),
new ShardingExecuteCallback<Entry<String, List<SQLUnit>>, Collection<ShardingExecuteGroup<StatementExecuteUnit>>>() {

@Override
public Collection<ShardingExecuteGroup<StatementExecuteUnit>> execute(final Entry<String, List<SQLUnit>> input) throws SQLException {
ExecutorDataMap.setDataMap(dataMap);
Expand Down Expand Up @@ -112,7 +120,15 @@ private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(
int desiredPartitionSize = Math.max(sqlUnits.size() / maxConnectionsSizePerQuery, 1);
List<List<SQLUnit>> sqlUnitGroups = Lists.partition(sqlUnits, desiredPartitionSize);
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitGroups.size());
getConnectionHook.start(dataSourceName);
List<Connection> connections;
try {
connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitGroups.size());
} catch (final SQLException ex) {
getConnectionHook.finishFailure(ex);
throw ex;
}
getConnectionHook.finishSuccess(DataSourceMetaDataFactory.newInstance(databaseType, connections.get(0).getMetaData().getURL()), connections.size());
int count = 0;
for (List<SQLUnit> each : sqlUnitGroups) {
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

import com.google.common.base.Optional;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.spi.event.parsing.ParsingEventHandlerLoader;
import io.shardingsphere.core.spi.event.parsing.ParsingFinishEvent;
import io.shardingsphere.core.spi.event.parsing.ParsingStartEvent;
import io.shardingsphere.core.metadata.datasource.ShardingDataSourceMetaData;
import io.shardingsphere.core.metadata.table.ShardingTableMetaData;
import io.shardingsphere.core.optimizer.OptimizeEngineFactory;
Expand Down Expand Up @@ -54,6 +51,8 @@
import io.shardingsphere.core.routing.type.unicast.UnicastRoutingEngine;
import io.shardingsphere.core.rule.ShardingRule;
import io.shardingsphere.core.rule.TableRule;
import io.shardingsphere.core.spi.parsing.ParsingHook;
import io.shardingsphere.core.spi.parsing.SPIParsingHook;
import io.shardingsphere.core.util.SQLLogger;
import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -83,19 +82,20 @@ public final class ParsingSQLRouter implements ShardingRouter {

private final ShardingDataSourceMetaData shardingDataSourceMetaData;

private final ParsingHook parsingHook = new SPIParsingHook();

@Override
public SQLStatement parse(final String logicSQL, final boolean useCache) {
ParsingEventHandlerLoader.getInstance().start(new ParsingStartEvent(logicSQL));
ParsingFinishEvent finishEvent = new ParsingFinishEvent();
parsingHook.start(logicSQL);
try {
return new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingTableMetaData).parse(useCache);
SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingTableMetaData).parse(useCache);
parsingHook.finishSuccess();
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
finishEvent.setException(ex);
parsingHook.finishFailure(ex);
throw ex;
} finally {
ParsingEventHandlerLoader.getInstance().finish(finishEvent);
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,34 @@
* </p>
*/

package io.shardingsphere.core.spi.event.connection.close;
package io.shardingsphere.core.spi.connection.close;

import io.shardingsphere.core.spi.event.ShardingStartEvent;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Close connection start event.
* Connection hook.
*
* @author zhangyonglun
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class CloseConnectionStartEvent extends ShardingStartEvent {
public interface CloseConnectionHook {

private final String dataSource;
/**
* Handle when close connection started.
*
* @param dataSourceName data source name
* @param dataSourceMetaData data source meta data
*/
void start(String dataSourceName, DataSourceMetaData dataSourceMetaData);

private final DataSourceMetaData dataSourceMetaData;
/**
* Handle when close connection finished success.
*/
void finishSuccess();

/**
* Handle when close connection finished failure.
*
* @param cause failure cause
*/
void finishFailure(Exception cause);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.core.spi.connection.close;

import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;

import java.util.ServiceLoader;

/**
* Connection hook for SPI.
*
* @author zhangliang
*/
public final class SPICloseConnectionHook implements CloseConnectionHook {

private static final ServiceLoader<CloseConnectionHook> SERVICE_LOADER;

static {
SERVICE_LOADER = ServiceLoader.load(CloseConnectionHook.class);
}

@Override
public void start(final String dataSourceName, final DataSourceMetaData dataSourceMetaData) {
for (CloseConnectionHook each : SERVICE_LOADER) {
each.start(dataSourceName, dataSourceMetaData);
}
}

@Override
public void finishSuccess() {
for (CloseConnectionHook each : SERVICE_LOADER) {
each.finishSuccess();
}
}

@Override
public void finishFailure(final Exception cause) {
for (CloseConnectionHook each : SERVICE_LOADER) {
each.finishFailure(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,36 @@
* </p>
*/

package io.shardingsphere.core.spi.event.connection.get;
package io.shardingsphere.core.spi.connection.get;

import io.shardingsphere.core.spi.event.ShardingFinishEvent;
import io.shardingsphere.core.metadata.datasource.DataSourceMetaData;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Get connection finish event.
* Connection hook.
*
* @author zhangyonglun
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class GetConnectionFinishEvent extends ShardingFinishEvent {
public interface GetConnectionHook {

private final int connectionCount;
/**
* Handle when get connection started.
*
* @param dataSourceName data source name
*/
void start(String dataSourceName);

private final DataSourceMetaData dataSourceMetaData;
/**
* Handle when get connection finished success.
*
* @param dataSourceMetaData data source meta data
* @param connectionCount connection count
*/
void finishSuccess(DataSourceMetaData dataSourceMetaData, int connectionCount);

/**
* Handle when get connection finished failure.
*
* @param cause failure cause
*/
void finishFailure(Exception cause);
}
Loading

0 comments on commit 5c6cb19

Please sign in to comment.