From d4a3d2d88d1476447a68b5cf7d9eea7dfab2d494 Mon Sep 17 00:00:00 2001 From: SWJTU-ZhangLei <1091517373@qq.com> Date: Thu, 2 Jun 2022 14:30:32 +0800 Subject: [PATCH] [feature](schema change) add columnsDesc to TPushReq for ligtht sc. --- be/src/olap/delta_writer.cpp | 3 +-- be/src/olap/tablet.h | 13 ------------ .../org/apache/doris/load/DeleteHandler.java | 10 +++++++++- .../org/apache/doris/load/LoadChecker.java | 2 +- .../java/org/apache/doris/task/PushTask.java | 20 ++++++++++++------- .../doris/alter/SchemaChangeHandlerTest.java | 2 -- .../test_delete_schema_change.sql | 4 ++-- 7 files changed, 26 insertions(+), 28 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index ea19b002945902..76bcf9187a7f8c 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -123,8 +123,7 @@ Status DeltaWriter::init() { _build_current_tablet_schema(_req.ptable_schema_param, _tablet->tablet_schema()); RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING, - &_tablet->tablet_schema(), &_rowset_writer)); - _tablet_schema = &(_tablet->tablet_schema()); + _tablet_schema.get(), &_rowset_writer)); _schema.reset(new Schema(*_tablet_schema)); _reset_mem_table(); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index ad21d838fd8a92..baad23237ae143 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -268,19 +268,6 @@ class Tablet : public BaseTablet { return _tablet_meta->all_beta(); } - void find_alpha_rowsets(std::vector* rowsets) const; - - Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state, - const SegmentsOverlapPB& overlap, - const doris::TabletSchema* tablet_schema, - std::unique_ptr* rowset_writer); - - Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id, - const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap, - const doris::TabletSchema* tablet_schema, - std::unique_ptr* rowset_writer); - - Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset); const TabletSchema& tablet_schema() const override; private: diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index ef7bd7e9734714..7fb3c31c7a4010 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -65,6 +65,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PushTask; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TPushType; import org.apache.doris.thrift.TTaskType; @@ -216,6 +217,12 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException { // to make sure that the delete transaction can be done successfully. txnState.addTableIndexes(olapTable); + //for light schema change + List columnsDesc = new ArrayList(); + for (Column column : olapTable.getFullSchema()) { + columnsDesc.add(column.toThrift()); + } + // task sent to be AgentBatchTask batchTask = new AgentBatchTask(); // count total replica num @@ -256,7 +263,8 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException { true, TPriority.NORMAL, TTaskType.REALTIME_PUSH, transactionId, - Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId()); + Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId(), + columnsDesc); pushTask.setIsSchemaChanging(false); pushTask.setCountDownLatch(countDownLatch); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java index 3e79763ce974ad..a54a1241c36561 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java @@ -465,7 +465,7 @@ private Set submitPushTasks(LoadJob job, Database db) { needDecompress, job.getPriority(), TTaskType.REALTIME_PUSH, job.getTransactionId(), - Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId()); + Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId(), null); pushTask.setIsSchemaChanging(autoLoadToTwoTablet); if (AgentTaskQueue.addTask(pushTask)) { batchTask.addTask(pushTask); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java index c24189141d5905..2d368a74e7f320 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.thrift.TBrokerScanRange; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TCondition; import org.apache.doris.thrift.TDescriptorTable; import org.apache.doris.thrift.TPriority; @@ -69,11 +70,14 @@ public class PushTask extends AgentTask { private TBrokerScanRange tBrokerScanRange; private TDescriptorTable tDescriptorTable; + // for light schema change + private List columnsDesc = null; + public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, long replicaId, int schemaHash, long version, String filePath, long fileSize, int timeoutSecond, long loadJobId, TPushType pushType, List conditions, boolean needDecompress, TPriority priority, TTaskType taskType, - long transactionId, long signature) { + long transactionId, long signature, List columnsDesc) { super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); this.replicaId = replicaId; this.schemaHash = schemaHash; @@ -92,6 +96,7 @@ public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tabl this.transactionId = transactionId; this.tBrokerScanRange = null; this.tDescriptorTable = null; + this.columnsDesc = columnsDesc; } public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId, long partitionId, @@ -99,9 +104,9 @@ public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tabl String filePath, long fileSize, int timeoutSecond, long loadJobId, TPushType pushType, List conditions, boolean needDecompress, TPriority priority) { this(resourceInfo, backendId, dbId, tableId, partitionId, indexId, - tabletId, replicaId, schemaHash, version, filePath, - fileSize, timeoutSecond, loadJobId, pushType, conditions, needDecompress, - priority, TTaskType.PUSH, -1, tableId); + tabletId, replicaId, schemaHash, version, filePath, + fileSize, timeoutSecond, loadJobId, pushType, conditions, needDecompress, + priority, TTaskType.PUSH, -1, tableId, null); } // for load v2 (SparkLoadJob) @@ -110,9 +115,9 @@ public PushTask(long backendId, long dbId, long tableId, long partitionId, long TPriority priority, long transactionId, long signature, TBrokerScanRange tBrokerScanRange, TDescriptorTable tDescriptorTable) { this(null, backendId, dbId, tableId, partitionId, indexId, - tabletId, replicaId, schemaHash, -1, null, - 0, timeoutSecond, loadJobId, pushType, null, false, - priority, TTaskType.REALTIME_PUSH, transactionId, signature); + tabletId, replicaId, schemaHash, -1, null, + 0, timeoutSecond, loadJobId, pushType, null, false, + priority, TTaskType.REALTIME_PUSH, transactionId, signature, null); this.tBrokerScanRange = tBrokerScanRange; this.tDescriptorTable = tDescriptorTable; } @@ -173,6 +178,7 @@ public TPushReq toThrift() { tConditions.add(tCondition); } request.setDeleteConditions(tConditions); + request.setColumnsDesc(columnsDesc); break; case LOAD_V2: request.setBrokerScanRange(tBrokerScanRange); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java index 412d072ca7ded4..048b18a4645e34 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java @@ -48,8 +48,6 @@ import mockit.Expectations; import mockit.Injectable; -import org.junit.Assert; -import org.junit.Test; import java.util.List; import java.util.Map; diff --git a/regression-test/suites/schema_change/test_delete_schema_change.sql b/regression-test/suites/schema_change/test_delete_schema_change.sql index 34af3c90ecdd31..c69b14b5f4706d 100644 --- a/regression-test/suites/schema_change/test_delete_schema_change.sql +++ b/regression-test/suites/schema_change/test_delete_schema_change.sql @@ -27,11 +27,11 @@ INSERT INTO schema_change_delete_regression_test VALUES INSERT INTO schema_change_delete_regression_test VALUES (2, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20); -SELECT * FROM schema_change_delete_regression_test; +SELECT * FROM schema_change_delete_regression_test order by user_id ASC, last_visit_date; ALTER table schema_change_delete_regression_test ADD COLUMN new_column INT default "1"; -SELECT * FROM schema_change_delete_regression_test; +SELECT * FROM schema_change_delete_regression_test order by user_id DESC, last_visit_date; DELETE FROM schema_change_delete_regression_test where new_column = 1;