Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix : check if table meta cache should be refreshed in AT mode #4734

Merged
merged 61 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
ea0daa4
todo
Bughue Jun 27, 2022
1ac4cbb
simulate nullpointerExp
Bughue Jun 27, 2022
68f2687
fix npe
Bughue Jun 30, 2022
dc21dcd
fix npe
Bughue Jun 30, 2022
42ee542
fix npe
Bughue Jun 30, 2022
7de5d38
format
Bughue Jun 30, 2022
de27bf6
format
Bughue Jun 30, 2022
af64796
先去掉单测,让构建都完成,方便review
Bughue Jul 14, 2022
a7800e2
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Jul 14, 2022
1679535
remove t-w-r
Bughue Jul 26, 2022
d6b8610
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Aug 25, 2022
fccf6a9
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Sep 7, 2022
13c8601
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Sep 7, 2022
6faf8e7
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Sep 21, 2022
fbade58
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Dec 14, 2022
329618c
fix merge
Bughue Dec 14, 2022
4056fc2
fix test
Bughue Dec 14, 2022
9397cca
fix test
Bughue Dec 14, 2022
b2d712f
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Jan 17, 2023
2689577
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 2, 2023
c3de060
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 3, 2023
8a34079
rename exp
Bughue Feb 6, 2023
db46e53
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 6, 2023
b488939
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 6, 2023
355f106
npe
Bughue Feb 6, 2023
9209696
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 6, 2023
4fbfca4
optimize
Bughue Feb 6, 2023
90ea894
ignore unrefreshable
Bughue Feb 7, 2023
57960dd
remove todo
Bughue Feb 7, 2023
363b019
checkstyle
Bughue Feb 7, 2023
ef60d17
checkstyle
Bughue Feb 7, 2023
ac4166c
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 7, 2023
fd7aad3
checkstyle
Bughue Feb 7, 2023
125f042
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 7, 2023
20c1edc
changes
Bughue Feb 7, 2023
d1ce758
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Feb 9, 2023
b64d227
optimize
Bughue Feb 10, 2023
6062d52
optimize
Bughue Feb 10, 2023
933c3b9
private tableMeta
Bughue Feb 10, 2023
8db15a3
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Feb 13, 2023
ccf9ecf
tableMeta refresh event
Bughue Feb 15, 2023
e8b9a6b
check style
Bughue Feb 15, 2023
c901589
check style
Bughue Feb 15, 2023
072f609
currentTimeMillis
Bughue Feb 15, 2023
026f1b5
check style
Bughue Feb 15, 2023
996bfb5
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 15, 2023
1ac5bf2
comment
Bughue Feb 15, 2023
711e659
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 21, 2023
eb47a94
comment
Bughue Feb 22, 2023
332081b
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 22, 2023
ef87ff3
fix test
Bughue Feb 22, 2023
3b85ba1
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 22, 2023
de81450
fix test
Bughue Feb 22, 2023
155c5f0
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 22, 2023
a8d8781
fix test
Bughue Feb 22, 2023
febc6a0
fix test
Bughue Feb 22, 2023
9672177
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 22, 2023
16ca49a
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 23, 2023
8b22aef
optimize
Bughue Feb 28, 2023
5b4e206
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 28, 2023
96bae53
optimize
Bughue Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private void init(DataSource dataSource, String resourceGroupId) {
initResourceId();
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原定时刷新缓存的功能在这次更新后失效了?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableMeta缓存刷新失效? #6660

//todo todo_4572
tableMetaExecutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.rm.datasource.exception;

import io.seata.rm.datasource.sql.struct.TableMeta;

/**
* The type RmTableMetaException exception.
*
* @author Bughue
*/
public class RmTableMetaException extends RuntimeException {
private String columnName;
private TableMeta tableMeta;

public RmTableMetaException(String columnName, TableMeta tableMeta) {
this.columnName = columnName;
this.tableMeta = tableMeta;
}

public TableMeta getTableMeta() {
return tableMeta;
}

public String getColumnName() {
return columnName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public abstract class BaseTransactionalExecutor<T, S extends Statement> implemen
*/
protected List<SQLRecognizer> sqlRecognizers;

private TableMeta tableMeta;

/**
* Instantiates a new Base transactional executor.
Expand Down Expand Up @@ -252,13 +251,9 @@ protected TableMeta getTableMeta() {
* @return the table meta
*/
protected TableMeta getTableMeta(String tableName) {
if (tableMeta != null) {
return tableMeta;
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
tableMeta = TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType())
return TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType())
.getTableMeta(connectionProxy.getTargetConnection(), tableName, connectionProxy.getDataSourceProxy().getResourceId());
return tableMeta;
}

/**
Expand Down Expand Up @@ -406,7 +401,7 @@ protected TableRecords buildTableRecords(TableMeta tableMeta, String selectSQL,
}
}
rs = ps.executeQuery();
return TableRecords.buildRecords(tableMeta, rs);
return TableRecords.buildRecords(tableMeta, rs, statementProxy);
} finally {
IOUtil.close(rs);
}
Expand Down Expand Up @@ -447,13 +442,13 @@ protected TableRecords buildTableRecords(Map<String, List<Object>> pkValuesMap)
for (int r = 0; r < rowSize; r++) {
for (int c = 0; c < pkColumnNameList.size(); c++) {
List<Object> pkColumnValueList = pkValuesMap.get(pkColumnNameList.get(c));
int dataType = tableMeta.getColumnMeta(pkColumnNameList.get(c)).getDataType();
int dataType = getTableMeta().getColumnMeta(pkColumnNameList.get(c)).getDataType();
ps.setObject(paramIndex, pkColumnValueList.get(r), dataType);
paramIndex++;
}
}
rs = ps.executeQuery();
return TableRecords.buildRecords(getTableMeta(), rs);
return TableRecords.buildRecords(getTableMeta(), rs, statementProxy);
} finally {
IOUtil.close(rs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected TableRecords afterImage(TableRecords beforeImage) throws SQLException
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL);) {
SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
rs = pst.executeQuery();
return TableRecords.buildRecords(tmeta, rs);
return TableRecords.buildRecords(tmeta, rs, statementProxy);
} finally {
IOUtil.close(rs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ protected TableRecords afterImage(TableRecords beforeImage) throws SQLException
try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
rs = pst.executeQuery();
return TableRecords.buildRecords(tmeta, rs);
return TableRecords.buildRecords(tmeta, rs, statementProxy);
} finally {
IOUtil.close(rs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public TableRecords buildTableRecords2(TableMeta tableMeta, String selectSQL, Ar
}

rs = ps.executeQuery();
return TableRecords.buildRecords(tableMeta, rs);
return TableRecords.buildRecords(tableMeta, rs, statementProxy);
} finally {
IOUtil.close(rs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import io.seata.common.exception.NotSupportYetException;
Expand All @@ -41,6 +43,11 @@ public class TableMeta {
*/
private final Map<String, ColumnMeta> allColumns = new LowerCaseLinkHashMap<>();

/**
* unrefreshable column name
*/
private Set<String> unrefreshableColumns = new HashSet<>();

/**
* key: index name
*/
Expand Down Expand Up @@ -92,6 +99,25 @@ public Map<String, IndexMeta> getAllIndexes() {
return allIndexes;
}

/**
* Gets unrefreshable columns.
*
* @return the all indexes
*/
public Set<String> getUnrefreshableColumns() {
return unrefreshableColumns;
}

/**
* Add unrefreshable columns.
*
* @return the all indexes
*/
public void addUnrefreshableColumn(String column) {
this.unrefreshableColumns.add(column);
}


/**
* Gets auto increase column.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public interface TableMetaCache {
*/
TableMeta getTableMeta(Connection connection, String tableName, String resourceId);

/**
* add unrefreshable column
*
* @param connection the connection
* @param tableName the table name
* @param resourceId the resource id
* @param colName
*/
void addUnrefreshableCol(Connection connection, String tableName, String resourceId, String colName);

/**
* Clear the table meta cache
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.NClob;
import java.sql.Ref;
import java.sql.ResultSet;
Expand All @@ -34,6 +35,9 @@
import javax.sql.rowset.serial.SerialJavaObject;
import javax.sql.rowset.serial.SerialRef;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.exception.RmTableMetaException;
import io.seata.rm.datasource.sql.serial.SerialArray;

/**
Expand Down Expand Up @@ -182,7 +186,7 @@ public static TableRecords empty(TableMeta tableMeta) {
* @return the table records
* @throws SQLException the sql exception
*/
public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) throws SQLException {
private static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) throws SQLException {
TableRecords records = new TableRecords(tmeta);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
Map<String, ColumnMeta> primaryKeyMap = tmeta.getPrimaryKeyMap();
Expand All @@ -192,7 +196,7 @@ public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) th
List<Field> fields = new ArrayList<>(columnCount);
for (int i = 1; i <= columnCount; i++) {
String colName = resultSetMetaData.getColumnName(i);
ColumnMeta col = tmeta.getColumnMeta(colName);
ColumnMeta col = checkAndGetColumnMeta(tmeta,colName);
int dataType = col.getDataType();
Field field = new Field();
field.setName(col.getColumnName());
Expand Down Expand Up @@ -253,6 +257,82 @@ public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) th
return records;
}

/**
* check if the column is null and return
* @param tmeta the table meta
* @param colName the column nmae
*/
private static ColumnMeta checkAndGetColumnMeta(TableMeta tmeta , String colName) {
ColumnMeta col = tmeta.getColumnMeta(colName);
if (col == null) {
throw new RmTableMetaException(colName,tmeta);
}
return col;
}


/**
* Build records table records.
*
* @param tmeta the tmeta
* @param resultSet the result set
* @param statementProxy the statement proxy
* @return the table records
* @throws SQLException the sql exception
*/
public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet, StatementProxy statementProxy)
throws SQLException {
try {
return buildRecords(tmeta, resultSet);
} catch (RmTableMetaException e) {
if (statementProxy == null) {
throw e;
}
refreshTableMeta(statementProxy, e.getTableMeta(), e.getColumnName());
// try to build again after refresh table meta success
return buildRecords(getCacheTableMeta(statementProxy, tmeta.getTableName()), resultSet);
}
}


private static void refreshTableMeta(StatementProxy statementProxy, TableMeta tmeta, String columnName)
throws SQLException {
if (columnEmptyAndRefreshable(statementProxy, tmeta, columnName)) {
synchronized (TableRecords.class) {
if (columnEmptyAndRefreshable(statementProxy, tmeta, columnName)) {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try (Connection connection = statementProxy.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType()).refresh(connection,
connectionProxy.getDataSourceProxy().getResourceId());
} catch (Exception exp) {
throw exp;
}
// still empty after refreshed
if (getCacheTableMeta(statementProxy, tmeta.getTableName()).getColumnMeta(columnName) == null) {
TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType()).addUnrefreshableCol(
connectionProxy.getTargetConnection(), tmeta.getTableName(),
connectionProxy.getDataSourceProxy().getResourceId(), columnName);
}
}
}
}
}

private static boolean columnEmptyAndRefreshable(StatementProxy statementProxy, TableMeta tmeta,
String columnName) {
TableMeta cacheTableMeta = getCacheTableMeta(statementProxy, tmeta.getTableName());
return cacheTableMeta.getColumnMeta(columnName) == null
&& !cacheTableMeta.getUnrefreshableColumns().contains(columnName);
}


private static TableMeta getCacheTableMeta(StatementProxy statementProxy, String tableName) {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableMeta tmeta = TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType()).getTableMeta(
connectionProxy.getTargetConnection(), tableName, connectionProxy.getDataSourceProxy().getResourceId());
return tmeta;
}

/**
* since there is no parameterless constructor for Blob, Clob and NClob just like mysql,
* it needs to be converted to Serial_ type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ public TableMeta getTableMeta(final Connection connection, final String tableNam
return tmeta;
}

@Override
public void addUnrefreshableCol(final Connection connection, String tableName, String resourceId, String colName) {
if (StringUtils.isNullOrEmpty(tableName)) {
throw new IllegalArgumentException("TableMeta cannot be fetched without tableName");
}

TableMeta tmeta;
final String key = getCacheKey(connection, tableName, resourceId);
tmeta = TABLE_META_CACHE.getIfPresent(key);
tmeta.addUnrefreshableColumn(colName);
TABLE_META_CACHE.put(key, tmeta);
}

@Override
public void refresh(final Connection connection, String resourceId) {
ConcurrentMap<String, TableMeta> tableMetaMap = TABLE_META_CACHE.asMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ protected TableRecords queryCurrentRecords(Connection conn) throws SQLException
}

checkSet = statement.executeQuery();
currentRecords = TableRecords.buildRecords(tableMeta, checkSet);
currentRecords = TableRecords.buildRecords(tableMeta, checkSet, null);
} finally {
IOUtil.close(checkSet, statement);
}
Expand Down
Loading