[feature](schema change) add columnsDesc to TPushReq for light sc.
SWJTU-ZhangLei authored and Lchangliang committed Jun 17, 2022
1 parent 9ca0d89 commit d4a3d2d
Showing 7 changed files with 26 additions and 28 deletions.
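
For orientation, here is a minimal, self-contained Java sketch of the data flow this commit introduces on the FE side. The types below are simplified stand-ins for the real Doris classes (Column, TColumn, PushTask/TPushReq), and the Thrift-side field name is an assumption taken from the diff rather than the actual generated code.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for org.apache.doris.catalog.Column,
// org.apache.doris.thrift.TColumn and TPushReq. The real classes are far
// richer; this only illustrates the data flow that the commit wires up.
public class ColumnsDescFlowSketch {

    record Column(String name, String type) {
        TColumn toThrift() {
            return new TColumn(name, type);
        }
    }

    record TColumn(String columnName, String columnType) {
    }

    static class TPushReq {
        // assumed new optional field on TPushReq; the name follows the diff
        List<TColumn> columnsDesc;

        void setColumnsDesc(List<TColumn> columnsDesc) {
            this.columnsDesc = columnsDesc;
        }
    }

    // FE side: convert the table's full schema into Thrift column descriptors
    // and attach them to the push request, so the BE can rebuild the current
    // schema under light schema change instead of relying on a possibly stale
    // tablet schema.
    static TPushReq buildPushReq(List<Column> fullSchema) {
        List<TColumn> columnsDesc = new ArrayList<>();
        for (Column column : fullSchema) {
            columnsDesc.add(column.toThrift());
        }
        TPushReq req = new TPushReq();
        req.setColumnsDesc(columnsDesc);
        return req;
    }

    public static void main(String[] args) {
        List<Column> schema = List.of(new Column("user_id", "BIGINT"),
                new Column("city", "VARCHAR(32)"));
        System.out.println(buildPushReq(schema).columnsDesc);
    }
}
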
be/src/olap/delta_writer.cpp (3 changes: 1 addition & 2 deletions)
@@ -123,8 +123,7 @@ Status DeltaWriter::init() {
_build_current_tablet_schema(_req.ptable_schema_param, _tablet->tablet_schema());

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
- &_tablet->tablet_schema(), &_rowset_writer));
- _tablet_schema = &(_tablet->tablet_schema());
+ _tablet_schema.get(), &_rowset_writer));
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();

be/src/olap/tablet.h (13 changes: 0 additions & 13 deletions)
@@ -268,19 +268,6 @@ class Tablet : public BaseTablet {
return _tablet_meta->all_beta();
}

- void find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const;
-
- Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
-                             const SegmentsOverlapPB& overlap,
-                             const doris::TabletSchema* tablet_schema,
-                             std::unique_ptr<RowsetWriter>* rowset_writer);
-
- Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id,
-                             const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap,
-                             const doris::TabletSchema* tablet_schema,
-                             std::unique_ptr<RowsetWriter>* rowset_writer);
-
- Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset);
const TabletSchema& tablet_schema() const override;

private:
@@ -65,6 +65,7 @@
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PushTask;
+ import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TTaskType;
@@ -216,6 +217,12 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
// to make sure that the delete transaction can be done successfully.
txnState.addTableIndexes(olapTable);

+ //for light schema change
+ List<TColumn> columnsDesc = new ArrayList<TColumn>();
+ for (Column column : olapTable.getFullSchema()) {
+     columnsDesc.add(column.toThrift());
+ }
+
// task sent to be
AgentBatchTask batchTask = new AgentBatchTask();
// count total replica num
@@ -256,7 +263,8 @@ public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
true, TPriority.NORMAL,
TTaskType.REALTIME_PUSH,
transactionId,
- Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
+ Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId(),
+ columnsDesc);
pushTask.setIsSchemaChanging(false);
pushTask.setCountDownLatch(countDownLatch);

@@ -465,7 +465,7 @@ private Set<Long> submitPushTasks(LoadJob job, Database db) {
needDecompress, job.getPriority(),
TTaskType.REALTIME_PUSH,
job.getTransactionId(),
- Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
+ Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId(), null);
pushTask.setIsSchemaChanging(autoLoadToTwoTablet);
if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
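
The load path above passes null for the new columnsDesc argument. The sketch below, assuming the Thrift field is declared optional, shows why that is harmless; the class is an illustrative stand-in rather than the generated code.

import java.util.List;

// Illustrative stand-in, not the generated TPushReq: Thrift's Java setters
// for container fields store the reference as-is, and an optional field whose
// value is null is reported as unset and skipped during serialization, which
// is why the legacy load path can safely pass null.
public class OptionalColumnsDescSketch {

    static class TPushReq {
        private List<String> columnsDesc; // stand-in for list<TColumn>

        void setColumnsDesc(List<String> columnsDesc) {
            this.columnsDesc = columnsDesc;
        }

        boolean isSetColumnsDesc() {
            return this.columnsDesc != null;
        }
    }

    public static void main(String[] args) {
        TPushReq legacyLoad = new TPushReq();
        legacyLoad.setColumnsDesc(null);               // old load path: no schema shipped

        TPushReq deletePush = new TPushReq();
        deletePush.setColumnsDesc(List.of("user_id")); // delete path: full schema shipped

        System.out.println(legacyLoad.isSetColumnsDesc()); // false -> field left unset
        System.out.println(deletePush.isSetColumnsDesc()); // true  -> field serialized
    }
}
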
fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java (20 changes: 13 additions & 7 deletions)
@@ -26,6 +26,7 @@
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.thrift.TBrokerScanRange;
+ import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCondition;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TPriority;
@@ -69,11 +70,14 @@ public class PushTask extends AgentTask {
private TBrokerScanRange tBrokerScanRange;
private TDescriptorTable tDescriptorTable;

+ // for light schema change
+ private List<TColumn> columnsDesc = null;
+
public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId, long partitionId,
long indexId, long tabletId, long replicaId, int schemaHash, long version,
String filePath, long fileSize, int timeoutSecond, long loadJobId, TPushType pushType,
List<Predicate> conditions, boolean needDecompress, TPriority priority, TTaskType taskType,
- long transactionId, long signature) {
+ long transactionId, long signature, List<TColumn> columnsDesc) {
super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
this.replicaId = replicaId;
this.schemaHash = schemaHash;
@@ -92,16 +96,17 @@ public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tabl
this.transactionId = transactionId;
this.tBrokerScanRange = null;
this.tDescriptorTable = null;
+ this.columnsDesc = columnsDesc;
}

public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId, long partitionId,
long indexId, long tabletId, long replicaId, int schemaHash, long version,
String filePath, long fileSize, int timeoutSecond, long loadJobId, TPushType pushType,
List<Predicate> conditions, boolean needDecompress, TPriority priority) {
this(resourceInfo, backendId, dbId, tableId, partitionId, indexId,
- tabletId, replicaId, schemaHash, version, filePath,
- fileSize, timeoutSecond, loadJobId, pushType, conditions, needDecompress,
- priority, TTaskType.PUSH, -1, tableId);
+ tabletId, replicaId, schemaHash, version, filePath,
+ fileSize, timeoutSecond, loadJobId, pushType, conditions, needDecompress,
+ priority, TTaskType.PUSH, -1, tableId, null);
}

// for load v2 (SparkLoadJob)
@@ -110,9 +115,9 @@ public PushTask(long backendId, long dbId, long tableId, long partitionId, long
TPriority priority, long transactionId, long signature,
TBrokerScanRange tBrokerScanRange, TDescriptorTable tDescriptorTable) {
this(null, backendId, dbId, tableId, partitionId, indexId,
- tabletId, replicaId, schemaHash, -1, null,
- 0, timeoutSecond, loadJobId, pushType, null, false,
- priority, TTaskType.REALTIME_PUSH, transactionId, signature);
+ tabletId, replicaId, schemaHash, -1, null,
+ 0, timeoutSecond, loadJobId, pushType, null, false,
+ priority, TTaskType.REALTIME_PUSH, transactionId, signature, null);
this.tBrokerScanRange = tBrokerScanRange;
this.tDescriptorTable = tDescriptorTable;
}
@@ -173,6 +178,7 @@ public TPushReq toThrift() {
tConditions.add(tCondition);
}
request.setDeleteConditions(tConditions);
+ request.setColumnsDesc(columnsDesc);
break;
case LOAD_V2:
request.setBrokerScanRange(tBrokerScanRange);
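
The constructor changes above follow a telescoping pattern: the pre-existing PushTask constructors keep their signatures and forward null for columnsDesc to the full constructor, so only callers that need light schema change (currently the delete path) supply a real list. A hypothetical, compact sketch of that delegation:

import java.util.List;

// Hypothetical, compact model of that delegation: the "full" constructor is
// the only one that stores columnsDesc, and the legacy entry point forwards
// null, mirroring how the existing PushTask constructors above chain together.
public class PushTaskCtorSketch {

    private final List<String> columnsDesc; // stand-in for List<TColumn>

    PushTaskCtorSketch(long signature, List<String> columnsDesc) {
        this.columnsDesc = columnsDesc;      // full constructor stores the new field
    }

    PushTaskCtorSketch(long signature) {
        this(signature, null);               // legacy entry point: no columns shipped
    }

    public static void main(String[] args) {
        System.out.println(new PushTaskCtorSketch(1L).columnsDesc);                     // null
        System.out.println(new PushTaskCtorSketch(2L, List.of("user_id")).columnsDesc); // [user_id]
    }
}
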
@@ -48,8 +48,6 @@

import mockit.Expectations;
import mockit.Injectable;
- import org.junit.Assert;
- import org.junit.Test;

import java.util.List;
import java.util.Map;
@@ -27,11 +27,11 @@ INSERT INTO schema_change_delete_regression_test VALUES
INSERT INTO schema_change_delete_regression_test VALUES
(2, '2017-10-01', 'Beijing', 10, 1, '2020-01-03', '2020-01-03', '2020-01-03', 1, 32, 20);

- SELECT * FROM schema_change_delete_regression_test;
+ SELECT * FROM schema_change_delete_regression_test order by user_id ASC, last_visit_date;

ALTER table schema_change_delete_regression_test ADD COLUMN new_column INT default "1";

- SELECT * FROM schema_change_delete_regression_test;
+ SELECT * FROM schema_change_delete_regression_test order by user_id DESC, last_visit_date;

DELETE FROM schema_change_delete_regression_test where new_column = 1;
