Skip to content

Commit

Permalink
Fix table status revert. (infiniflow#2353)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix table lock bug when creating a read txn.

Issue link:infiniflow#2321

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored and vsian committed Dec 13, 2024
1 parent c75caf1 commit d7116d2
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ bool TableEntry::SetCompact(TableStatus &status, Txn *txn) {
return false;
}
table_status_ = TableStatus::kCompacting;
txn->txn_store()->SetCompacting();
txn->txn_store()->SetCompacting(this);
LOG_TRACE(fmt::format("SetCompact success. Table {} is in status: {}", encode(), u8(table_status_)));
return true;
}
Expand All @@ -1571,7 +1571,7 @@ bool TableEntry::SetCreatingIndex(TableStatus &status, Txn *txn) {
return false;
}
table_status_ = TableStatus::kCreatingIndex;
txn->txn_store()->SetCreatingIndex();
txn->txn_store()->SetCreatingIndex(this);
LOG_TRACE(fmt::format("SetCreatingIndex success. Table {} is in status: {}", encode(), u8(table_status_)));
return true;
}
Expand Down
3 changes: 3 additions & 0 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import infinity_context;
import admin_statement;
import global_resource_usage;
import wal_manager;
import defer_op;

namespace infinity {

Expand Down Expand Up @@ -534,6 +535,7 @@ WalEntry *Txn::GetWALEntry() const { return wal_entry_.get(); }
// }

TxnTimeStamp Txn::Commit() {
DeferFn defer_op([&] { txn_store_.RevertTableStatus(); });
if (wal_entry_->cmds_.empty() && txn_store_.ReadOnly()) {
// Don't need to write empty WalEntry (read-only transactions).
TxnTimeStamp commit_ts = txn_mgr_->GetReadCommitTS(this);
Expand Down Expand Up @@ -623,6 +625,7 @@ void Txn::CancelCommitBottom() {
}

void Txn::Rollback() {
DeferFn defer_op([&] { txn_store_.RevertTableStatus(); });
auto state = this->GetTxnState();
TxnTimeStamp abort_ts = 0;
if (state == TxnState::kStarted) {
Expand Down
41 changes: 20 additions & 21 deletions src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ Tuple<UniquePtr<String>, Status> TxnTableStore::Compact(Vector<Pair<SharedPtr<Se
}

void TxnTableStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) {
if (added_txn_num_) {
table_entry_->DecWriteTxnNum();
}
if (append_state_.get() != nullptr) {
// Rollback the data already been appended.
Catalog::RollbackAppend(table_entry_, txn_id, abort_ts, this);
Expand Down Expand Up @@ -453,9 +450,6 @@ void TxnTableStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) {
}
}
}
if (added_txn_num_) {
table_entry_->DecWriteTxnNum();
}
}

void TxnTableStore::MaintainCompactionAlg() {
Expand Down Expand Up @@ -526,6 +520,20 @@ void TxnTableStore::AddDeltaOp(CatalogDeltaEntry *local_delta_ops, TxnManager *t
}
}

void TxnTableStore::TryRevert() {
if (table_status_ == TxnStoreStatus::kCompacting) {
table_status_ = TxnStoreStatus::kNone;
table_entry_->SetCompactDone();
} else if (table_status_ == TxnStoreStatus::kCreatingIndex) {
table_status_ = TxnStoreStatus::kNone;
table_entry_->SetCreateIndexDone();
}
if (added_txn_num_) {
added_txn_num_ = false;
table_entry_->DecWriteTxnNum();
}
}

TxnStore::TxnStore(Txn *txn, Catalog *catalog) : txn_(txn), catalog_(catalog) {}

void TxnStore::AddDBStore(DBEntry *db_entry) { txn_dbs_.emplace(db_entry, ptr_seq_n_++); }
Expand Down Expand Up @@ -679,8 +687,6 @@ void TxnStore::CommitBottom(TransactionID txn_id, TxnTimeStamp commit_ts) {
for (auto [table_entry, ptr_seq_n] : txn_tables_) {
table_entry->Commit(commit_ts);
}

this->RevertTableStatus();
}

void TxnStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) {
Expand All @@ -699,8 +705,6 @@ void TxnStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) {
db_entry->Cleanup();
catalog_->RemoveDBEntry(db_entry, txn_id);
}

this->RevertTableStatus();
}

bool TxnStore::ReadOnly() const {
Expand All @@ -727,18 +731,13 @@ bool TxnStore::ReadOnly() const {
}

void TxnStore::RevertTableStatus() {
if (table_status_ == TxnStoreStatus::kCompacting) {
for (const auto &[table_name, table_store] : txn_tables_store_) {
LOG_INFO(fmt::format("Txn {}: Revert table {} status from compacting to done", txn_->TxnID(), table_name));
table_store->GetTableEntry()->SetCompactDone();
}
} else if (table_status_ == TxnStoreStatus::kCreatingIndex) {
for (const auto &[table_name, table_store] : txn_tables_store_) {
LOG_INFO(fmt::format("Txn {}: Revert table {} status from creating index to done", txn_->TxnID(), table_name));
table_store->GetTableEntry()->SetCreateIndexDone();
}
return;
for (const auto &[table_name, table_store] : txn_tables_store_) {
table_store->TryRevert();
}
}

void TxnStore::SetCompacting(TableEntry *table_entry) { GetTxnTableStore(table_entry)->SetCompacting(); }

void TxnStore::SetCreatingIndex(TableEntry *table_entry) { GetTxnTableStore(table_entry)->SetCreatingIndex(); }

} // namespace infinity
28 changes: 19 additions & 9 deletions src/storage/txn/txn_store.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public: // Setter, Getter

void AddWriteTxnNum() { added_txn_num_ = true; }

bool AddedTxnNum() const { return added_txn_num_; }

private:
std::mutex mtx_{};

Expand All @@ -190,6 +192,21 @@ private:
bool added_txn_num_{false};

bool has_update_{false};

public:
void SetCompacting() { table_status_ = TxnStoreStatus::kCompacting; }

void SetCreatingIndex() { table_status_ = TxnStoreStatus::kCreatingIndex; }

void TryRevert();

private:
enum struct TxnStoreStatus {
kNone = 0,
kCreatingIndex,
kCompacting,
};
TxnStoreStatus table_status_{TxnStoreStatus::kNone};
};

export class TxnStore {
Expand Down Expand Up @@ -232,18 +249,11 @@ public:

void RevertTableStatus();

void SetCompacting() { table_status_ = TxnStoreStatus::kCompacting; }
void SetCompacting(TableEntry *table_entry);

void SetCreatingIndex() { table_status_ = TxnStoreStatus::kCreatingIndex; }
void SetCreatingIndex(TableEntry *table_entry);

private:
enum struct TxnStoreStatus {
kNone = 0,
kCreatingIndex,
kCompacting,
};
TxnStoreStatus table_status_{TxnStoreStatus::kNone};

// Txn store
Txn *txn_{}; // TODO: remove this
Catalog *catalog_{};
Expand Down
18 changes: 16 additions & 2 deletions test/sql/ddl/alter/lock_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,25 @@ UNLOCK TABLE products;
statement ok
INSERT INTO products VALUES (2, 2, 'abcdef');

statement ok
DELETE FROM products WHERE product_no=1;

statement ok
DELETE FROM products WHERE product_no=-1;

statement ok
COMPACT TABLE products;

statement ok
LOCK TABLE products;

query
SELECT * FROM products;
----
1 2 abcdef
2 2 abcdef

statement ok
DROP TABLE products;
UNLOCK TABLE products;

statement ok
DROP TABLE products;

0 comments on commit d7116d2

Please sign in to comment.