From d7116d260795bde1d1df88ea918c98681a7ac9e7 Mon Sep 17 00:00:00 2001 From: shen yushi Date: Wed, 11 Dec 2024 16:47:02 +0800 Subject: [PATCH] Fix table status revert. (#2353) ### What problem does this PR solve? Fix table lock bug when creating a read txn. Issue link:#2321 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Test cases --- src/storage/meta/entry/table_entry.cpp | 4 +-- src/storage/txn/txn.cpp | 3 ++ src/storage/txn/txn_store.cpp | 41 +++++++++++++------------- src/storage/txn/txn_store.cppm | 28 ++++++++++++------ test/sql/ddl/alter/lock_table.slt | 18 +++++++++-- 5 files changed, 60 insertions(+), 34 deletions(-) diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index b43465c663..80b876149f 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -1558,7 +1558,7 @@ bool TableEntry::SetCompact(TableStatus &status, Txn *txn) { return false; } table_status_ = TableStatus::kCompacting; - txn->txn_store()->SetCompacting(); + txn->txn_store()->SetCompacting(this); LOG_TRACE(fmt::format("SetCompact success. Table {} is in status: {}", encode(), u8(table_status_))); return true; } @@ -1571,7 +1571,7 @@ bool TableEntry::SetCreatingIndex(TableStatus &status, Txn *txn) { return false; } table_status_ = TableStatus::kCreatingIndex; - txn->txn_store()->SetCreatingIndex(); + txn->txn_store()->SetCreatingIndex(this); LOG_TRACE(fmt::format("SetCreatingIndex success. Table {} is in status: {}", encode(), u8(table_status_))); return true; } diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index 75edf1ff49..c152b1a174 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -55,6 +55,7 @@ import infinity_context; import admin_statement; import global_resource_usage; import wal_manager; +import defer_op; namespace infinity { @@ -534,6 +535,7 @@ WalEntry *Txn::GetWALEntry() const { return wal_entry_.get(); } // } TxnTimeStamp Txn::Commit() { + DeferFn defer_op([&] { txn_store_.RevertTableStatus(); }); if (wal_entry_->cmds_.empty() && txn_store_.ReadOnly()) { // Don't need to write empty WalEntry (read-only transactions). TxnTimeStamp commit_ts = txn_mgr_->GetReadCommitTS(this); @@ -623,6 +625,7 @@ void Txn::CancelCommitBottom() { } void Txn::Rollback() { + DeferFn defer_op([&] { txn_store_.RevertTableStatus(); }); auto state = this->GetTxnState(); TxnTimeStamp abort_ts = 0; if (state == TxnState::kStarted) { diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp index 3dccb268c1..4f921876c4 100644 --- a/src/storage/txn/txn_store.cpp +++ b/src/storage/txn/txn_store.cpp @@ -296,9 +296,6 @@ Tuple, Status> TxnTableStore::Compact(VectorDecWriteTxnNum(); - } if (append_state_.get() != nullptr) { // Rollback the data already been appended. Catalog::RollbackAppend(table_entry_, txn_id, abort_ts, this); @@ -453,9 +450,6 @@ void TxnTableStore::Commit(TransactionID txn_id, TxnTimeStamp commit_ts) { } } } - if (added_txn_num_) { - table_entry_->DecWriteTxnNum(); - } } void TxnTableStore::MaintainCompactionAlg() { @@ -526,6 +520,20 @@ void TxnTableStore::AddDeltaOp(CatalogDeltaEntry *local_delta_ops, TxnManager *t } } +void TxnTableStore::TryRevert() { + if (table_status_ == TxnStoreStatus::kCompacting) { + table_status_ = TxnStoreStatus::kNone; + table_entry_->SetCompactDone(); + } else if (table_status_ == TxnStoreStatus::kCreatingIndex) { + table_status_ = TxnStoreStatus::kNone; + table_entry_->SetCreateIndexDone(); + } + if (added_txn_num_) { + added_txn_num_ = false; + table_entry_->DecWriteTxnNum(); + } +} + TxnStore::TxnStore(Txn *txn, Catalog *catalog) : txn_(txn), catalog_(catalog) {} void TxnStore::AddDBStore(DBEntry *db_entry) { txn_dbs_.emplace(db_entry, ptr_seq_n_++); } @@ -679,8 +687,6 @@ void TxnStore::CommitBottom(TransactionID txn_id, TxnTimeStamp commit_ts) { for (auto [table_entry, ptr_seq_n] : txn_tables_) { table_entry->Commit(commit_ts); } - - this->RevertTableStatus(); } void TxnStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) { @@ -699,8 +705,6 @@ void TxnStore::Rollback(TransactionID txn_id, TxnTimeStamp abort_ts) { db_entry->Cleanup(); catalog_->RemoveDBEntry(db_entry, txn_id); } - - this->RevertTableStatus(); } bool TxnStore::ReadOnly() const { @@ -727,18 +731,13 @@ bool TxnStore::ReadOnly() const { } void TxnStore::RevertTableStatus() { - if (table_status_ == TxnStoreStatus::kCompacting) { - for (const auto &[table_name, table_store] : txn_tables_store_) { - LOG_INFO(fmt::format("Txn {}: Revert table {} status from compacting to done", txn_->TxnID(), table_name)); - table_store->GetTableEntry()->SetCompactDone(); - } - } else if (table_status_ == TxnStoreStatus::kCreatingIndex) { - for (const auto &[table_name, table_store] : txn_tables_store_) { - LOG_INFO(fmt::format("Txn {}: Revert table {} status from creating index to done", txn_->TxnID(), table_name)); - table_store->GetTableEntry()->SetCreateIndexDone(); - } - return; + for (const auto &[table_name, table_store] : txn_tables_store_) { + table_store->TryRevert(); } } +void TxnStore::SetCompacting(TableEntry *table_entry) { GetTxnTableStore(table_entry)->SetCompacting(); } + +void TxnStore::SetCreatingIndex(TableEntry *table_entry) { GetTxnTableStore(table_entry)->SetCreatingIndex(); } + } // namespace infinity diff --git a/src/storage/txn/txn_store.cppm b/src/storage/txn/txn_store.cppm index afe849b893..ff4efe2612 100644 --- a/src/storage/txn/txn_store.cppm +++ b/src/storage/txn/txn_store.cppm @@ -165,6 +165,8 @@ public: // Setter, Getter void AddWriteTxnNum() { added_txn_num_ = true; } + bool AddedTxnNum() const { return added_txn_num_; } + private: std::mutex mtx_{}; @@ -190,6 +192,21 @@ private: bool added_txn_num_{false}; bool has_update_{false}; + +public: + void SetCompacting() { table_status_ = TxnStoreStatus::kCompacting; } + + void SetCreatingIndex() { table_status_ = TxnStoreStatus::kCreatingIndex; } + + void TryRevert(); + +private: + enum struct TxnStoreStatus { + kNone = 0, + kCreatingIndex, + kCompacting, + }; + TxnStoreStatus table_status_{TxnStoreStatus::kNone}; }; export class TxnStore { @@ -232,18 +249,11 @@ public: void RevertTableStatus(); - void SetCompacting() { table_status_ = TxnStoreStatus::kCompacting; } + void SetCompacting(TableEntry *table_entry); - void SetCreatingIndex() { table_status_ = TxnStoreStatus::kCreatingIndex; } + void SetCreatingIndex(TableEntry *table_entry); private: - enum struct TxnStoreStatus { - kNone = 0, - kCreatingIndex, - kCompacting, - }; - TxnStoreStatus table_status_{TxnStoreStatus::kNone}; - // Txn store Txn *txn_{}; // TODO: remove this Catalog *catalog_{}; diff --git a/test/sql/ddl/alter/lock_table.slt b/test/sql/ddl/alter/lock_table.slt index f964b39af7..1aca10fdc5 100644 --- a/test/sql/ddl/alter/lock_table.slt +++ b/test/sql/ddl/alter/lock_table.slt @@ -42,11 +42,25 @@ UNLOCK TABLE products; statement ok INSERT INTO products VALUES (2, 2, 'abcdef'); +statement ok +DELETE FROM products WHERE product_no=1; + +statement ok +DELETE FROM products WHERE product_no=-1; + +statement ok +COMPACT TABLE products; + +statement ok +LOCK TABLE products; + query SELECT * FROM products; ---- -1 2 abcdef 2 2 abcdef statement ok -DROP TABLE products; \ No newline at end of file +UNLOCK TABLE products; + +statement ok +DROP TABLE products;