diff --git a/docs/tla-plus/ParallelCommits/ParallelCommits.tla b/docs/tla-plus/ParallelCommits/ParallelCommits.tla index a86406e167a8..d937f2631da6 100644 --- a/docs/tla-plus/ParallelCommits/ParallelCommits.tla +++ b/docs/tla-plus/ParallelCommits/ParallelCommits.tla @@ -1,62 +1,111 @@ -------------------------- MODULE ParallelCommits -------------------------- EXTENDS TLC, Integers, FiniteSets, Sequences -CONSTANTS KEYS, PREVENTERS +CONSTANTS KEYS, PREVENTERS, MAX_EPOCH ASSUME Cardinality(KEYS) > 0 -STATUSES == {"pending", "staging", "committed", "aborted"} +Max(a, b) == IF a > b THEN a ELSE b (*--algorithm parallelcommits variables - record = "pending"; - intent_writes = {}; - tscache = [k \in KEYS |-> FALSE]; + record = [status |-> "pending", epoch |-> 0]; + intent_writes = [k \in KEYS |-> 0]; + tscache = [k \in KEYS |-> 0]; + commit_ack = FALSE; define + RecordStatuses == {"pending", "staging", "committed", "aborted"} + RecordCommitted == record.status = "committed" + RecordAborted == record.status = "aborted" + RecordFinalized == RecordCommitted \/ RecordAborted + TypeInvariant == - /\ record \in STATUSES + /\ record \in [status: RecordStatuses, epoch: 0..MAX_EPOCH] + /\ DOMAIN intent_writes = KEYS + /\ \A k \in KEYS: intent_writes[k] \in 0..MAX_EPOCH + /\ DOMAIN tscache = KEYS + /\ \A k \in KEYS: tscache[k] \in 0..MAX_EPOCH + + TemporalTxnRecordProperties == + \* The txn record always ends with either a COMMITTED or ABORTED status. + /\ <>[]RecordFinalized + \* Once the txn record moves to a finalized status, it stays there. + /\ [](RecordCommitted => []RecordCommitted) + /\ [](RecordAborted => []RecordAborted) + \* The txn record's epoch must always grow. + /\ [][record'.epoch >= record.epoch]_record + + TemporalIntentProperties == + \* Intent writes' epochs must always grow. + /\ [][\A k \in KEYS: intent_writes'[k] >= intent_writes[k]]_intent_writes - TemporalProperties == - <>[](record \in {"aborted", "committed"}) + TemporalTSCacheProperties == + \* The timestamp cache always advances. + /\ [][\A k \in KEYS: tscache'[k] >= tscache[k]]_tscache + + \* If the client is acked, the record should eventually be committed. + AckLeadsToCommit == commit_ack ~> RecordCommitted end define; process committer = "committer" variables - to_write = KEYS; + txn_epoch = 0; + to_write = {}; have_staged_record = FALSE; begin - StagingWrites: - while to_write /= {} \/ ~have_staged_record do - either - await to_write /= {}; - with key \in to_write do - if tscache[key] then - \* Write prevented. - record := "aborted"; + ParallelCommitLoop: + \* Reset variables for each epoch. + txn_epoch := txn_epoch + 1; + to_write := KEYS; + have_staged_record := FALSE; + + \* Give up after MAX_EPOCH attempts. + if txn_epoch > MAX_EPOCH then + goto EndCommitter; + end if; + + TryStageWrites: + while to_write /= {} \/ ~have_staged_record do + either + await to_write /= {}; + with key \in to_write do + if tscache[key] >= txn_epoch then + \* Write prevented. Try again at new epoch. + \* TODO attempt refresh within current epoch. + goto ParallelCommitLoop; + else + \* Write successful. + intent_writes[key] := txn_epoch; + to_write := to_write \ {key}; + end if; + end with; + or + await ~have_staged_record; + have_staged_record := TRUE; + if record.status = "pending" then + \* Move to staging status. + record := [status |-> "staging", epoch |-> txn_epoch]; + elsif record.status = "staging" then + \* Bump record epoch. + assert record.epoch < txn_epoch; + record.epoch := txn_epoch; + elsif record.status = "aborted" then + \* Aborted before STAGING transaction record. goto EndCommitter; - else - \* Write successful. - intent_writes := intent_writes \union {key}; - to_write := to_write \ {key}; + elsif record.status = "committed" then + assert FALSE; end if; - end with; - or - await ~have_staged_record; - have_staged_record := TRUE; - if record = "pending" then - record := "staging"; - elsif record = "aborted" then - \* Aborted before STAGING transaction record. - goto EndCommitter; - else - assert FALSE; - end if; - end either - end while; + end either + end while; + + \* Ack the client now that all writes have succeeded. + AckClient: + commit_ack := TRUE; ExplicitCommit: - if record = "staging" then - record := "committed"; - elsif record = "committed" then + if record.status = "staging" then + \* Make implicit commit explicit. + record.status := "committed"; + elsif record.status = "committed" then \* Already committed by a recovery process. skip; else @@ -70,48 +119,77 @@ end process; fair process preventer \in PREVENTERS variable - checked_writes = {}; + prevent_epoch = 0; + found_writes = {}; begin - PushRecord: - if record = "pending" then - record := "aborted"; - goto EndRecover; - elsif record = "staging" then - skip; - elsif record = "committed" then - goto EndRecover; - elsif record = "aborted" then - goto EndRecover; - end if; + PreventLoop: + found_writes := {}; - PreventWrites: - while checked_writes /= KEYS do - with key \in KEYS \ checked_writes do - if key \in intent_writes then - checked_writes := checked_writes \union {key} - else - tscache[key] := TRUE; - goto RecoverRecord; - end if; - end with; - end while; - - RecoverRecord: - with prevented = checked_writes /= KEYS do - if record = "pending" then - assert FALSE; - elsif record = "staging" then + PushRecord: + if record.status = "pending" then + \* Transaction not yet staged, abort. + record.status := "aborted"; + goto EndRecover; + elsif record.status = "staging" then + \* Transaction staging, kick off recovery process. + prevent_epoch := record.epoch; + elsif record.status = "committed" then + goto EndRecover; + elsif record.status = "aborted" then + goto EndRecover; + end if; + + PreventWrites: + while found_writes /= KEYS do + with key \in KEYS \ found_writes do + with intent_epoch = intent_writes[key] do + if intent_epoch = prevent_epoch then + \* Intent found. Did not prevent. + found_writes := found_writes \union {key} + else + \* Intent missing. Prevent. + tscache[key] := Max(tscache[key], prevent_epoch); + goto RecoverRecord; + end if; + end with; + end with; + end while; + + RecoverRecord: + with prevented = found_writes /= KEYS do if prevented then - record := "aborted"; + if record.status = "aborted" then + \* Already aborted, nothing to do. + skip; + elsif record.status = "pending" then + assert FALSE; + elsif record.status = "committed" then + \* This must have been at a later epoch. + assert record.epoch > prevent_epoch; + elsif record.status = "staging" then + if record.epoch > prevent_epoch then + \* Try to prevent at higher epoch. + goto PreventLoop; + else + \* Can abort as result of recovery. + record.status := "aborted"; + end if; + end if; else - record := "committed"; + \* The transaction was implicitly committed. + if record.status = "aborted" then + assert FALSE; + elsif record.status = "pending" then + assert FALSE; + elsif record.status = "committed" then + assert record.epoch = prevent_epoch; + elsif record.status = "staging" then + assert record.epoch = prevent_epoch; + \* Can commit as result of recovery. + record.status := "committed"; + end if; end if; - elsif record = "committed" then - assert ~prevented; - elsif record = "aborted" then - assert prevented; - end if; - end with; + end with; EndRecover: skip; @@ -119,146 +197,242 @@ begin end process; end algorithm;*) \* BEGIN TRANSLATION -VARIABLES record, intent_writes, tscache, pc +VARIABLES record, intent_writes, tscache, commit_ack, pc (* define statement *) +RecordStatuses == {"pending", "staging", "committed", "aborted"} +RecordCommitted == record.status = "committed" +RecordAborted == record.status = "aborted" +RecordFinalized == RecordCommitted \/ RecordAborted + TypeInvariant == - /\ record \in STATUSES + /\ record \in [status: RecordStatuses, epoch: 0..MAX_EPOCH] + /\ DOMAIN intent_writes = KEYS + /\ \A k \in KEYS: intent_writes[k] \in 0..MAX_EPOCH + /\ DOMAIN tscache = KEYS + /\ \A k \in KEYS: tscache[k] \in 0..MAX_EPOCH + +TemporalTxnRecordProperties == + + /\ <>[]RecordFinalized + + /\ [](RecordCommitted => []RecordCommitted) + /\ [](RecordAborted => []RecordAborted) + + /\ [][record'.epoch >= record.epoch]_record + +TemporalIntentProperties == + + /\ [][\A k \in KEYS: intent_writes'[k] >= intent_writes[k]]_intent_writes -TemporalProperties == - <>[](record \in {"aborted", "committed"}) +TemporalTSCacheProperties == -VARIABLES to_write, have_staged_record, checked_writes + /\ [][\A k \in KEYS: tscache'[k] >= tscache[k]]_tscache -vars == << record, intent_writes, tscache, pc, to_write, have_staged_record, - checked_writes >> + +AckLeadsToCommit == commit_ack ~> RecordCommitted + +VARIABLES txn_epoch, to_write, have_staged_record, prevent_epoch, + found_writes + +vars == << record, intent_writes, tscache, commit_ack, pc, txn_epoch, + to_write, have_staged_record, prevent_epoch, found_writes >> ProcSet == {"committer"} \cup (PREVENTERS) Init == (* Global variables *) - /\ record = "pending" - /\ intent_writes = {} - /\ tscache = [k \in KEYS |-> FALSE] + /\ record = [status |-> "pending", epoch |-> 0] + /\ intent_writes = [k \in KEYS |-> 0] + /\ tscache = [k \in KEYS |-> 0] + /\ commit_ack = FALSE (* Process committer *) - /\ to_write = KEYS + /\ txn_epoch = 0 + /\ to_write = {} /\ have_staged_record = FALSE (* Process preventer *) - /\ checked_writes = [self \in PREVENTERS |-> {}] - /\ pc = [self \in ProcSet |-> CASE self = "committer" -> "StagingWrites" - [] self \in PREVENTERS -> "PushRecord"] - -StagingWrites == /\ pc["committer"] = "StagingWrites" - /\ IF to_write /= {} \/ ~have_staged_record - THEN /\ \/ /\ to_write /= {} - /\ \E key \in to_write: - IF tscache[key] - THEN /\ record' = "aborted" - /\ pc' = [pc EXCEPT !["committer"] = "EndCommitter"] - /\ UNCHANGED << intent_writes, - to_write >> - ELSE /\ intent_writes' = (intent_writes \union {key}) - /\ to_write' = to_write \ {key} - /\ pc' = [pc EXCEPT !["committer"] = "StagingWrites"] - /\ UNCHANGED record - /\ UNCHANGED have_staged_record - \/ /\ ~have_staged_record - /\ have_staged_record' = TRUE - /\ IF record = "pending" - THEN /\ record' = "staging" - /\ pc' = [pc EXCEPT !["committer"] = "StagingWrites"] - ELSE /\ IF record = "aborted" - THEN /\ pc' = [pc EXCEPT !["committer"] = "EndCommitter"] - ELSE /\ Assert(FALSE, - "Failure of assertion at line 51, column 11.") - /\ pc' = [pc EXCEPT !["committer"] = "StagingWrites"] - /\ UNCHANGED record - /\ UNCHANGED <> - ELSE /\ pc' = [pc EXCEPT !["committer"] = "ExplicitCommit"] - /\ UNCHANGED << record, intent_writes, to_write, - have_staged_record >> - /\ UNCHANGED << tscache, checked_writes >> + /\ prevent_epoch = [self \in PREVENTERS |-> 0] + /\ found_writes = [self \in PREVENTERS |-> {}] + /\ pc = [self \in ProcSet |-> CASE self = "committer" -> "ParallelCommitLoop" + [] self \in PREVENTERS -> "PreventLoop"] + +ParallelCommitLoop == /\ pc["committer"] = "ParallelCommitLoop" + /\ txn_epoch' = txn_epoch + 1 + /\ to_write' = KEYS + /\ have_staged_record' = FALSE + /\ IF txn_epoch' > MAX_EPOCH + THEN /\ pc' = [pc EXCEPT !["committer"] = "EndCommitter"] + ELSE /\ pc' = [pc EXCEPT !["committer"] = "TryStageWrites"] + /\ UNCHANGED << record, intent_writes, tscache, + commit_ack, prevent_epoch, found_writes >> + +TryStageWrites == /\ pc["committer"] = "TryStageWrites" + /\ IF to_write /= {} \/ ~have_staged_record + THEN /\ \/ /\ to_write /= {} + /\ \E key \in to_write: + IF tscache[key] >= txn_epoch + THEN /\ pc' = [pc EXCEPT !["committer"] = "ParallelCommitLoop"] + /\ UNCHANGED << intent_writes, + to_write >> + ELSE /\ intent_writes' = [intent_writes EXCEPT ![key] = txn_epoch] + /\ to_write' = to_write \ {key} + /\ pc' = [pc EXCEPT !["committer"] = "TryStageWrites"] + /\ UNCHANGED <> + \/ /\ ~have_staged_record + /\ have_staged_record' = TRUE + /\ IF record.status = "pending" + THEN /\ record' = [status |-> "staging", epoch |-> txn_epoch] + /\ pc' = [pc EXCEPT !["committer"] = "TryStageWrites"] + ELSE /\ IF record.status = "staging" + THEN /\ Assert(record.epoch < txn_epoch, + "Failure of assertion at line 89, column 13.") + /\ record' = [record EXCEPT !.epoch = txn_epoch] + /\ pc' = [pc EXCEPT !["committer"] = "TryStageWrites"] + ELSE /\ IF record.status = "aborted" + THEN /\ pc' = [pc EXCEPT !["committer"] = "EndCommitter"] + ELSE /\ IF record.status = "committed" + THEN /\ Assert(FALSE, + "Failure of assertion at line 95, column 13.") + ELSE /\ TRUE + /\ pc' = [pc EXCEPT !["committer"] = "TryStageWrites"] + /\ UNCHANGED record + /\ UNCHANGED <> + ELSE /\ pc' = [pc EXCEPT !["committer"] = "AckClient"] + /\ UNCHANGED << record, intent_writes, to_write, + have_staged_record >> + /\ UNCHANGED << tscache, commit_ack, txn_epoch, + prevent_epoch, found_writes >> + +AckClient == /\ pc["committer"] = "AckClient" + /\ commit_ack' = TRUE + /\ pc' = [pc EXCEPT !["committer"] = "ExplicitCommit"] + /\ UNCHANGED << record, intent_writes, tscache, txn_epoch, + to_write, have_staged_record, prevent_epoch, + found_writes >> ExplicitCommit == /\ pc["committer"] = "ExplicitCommit" - /\ IF record = "staging" - THEN /\ record' = "committed" - ELSE /\ IF record = "committed" + /\ IF record.status = "staging" + THEN /\ record' = [record EXCEPT !.status = "committed"] + ELSE /\ IF record.status = "committed" THEN /\ TRUE ELSE /\ Assert(FALSE, - "Failure of assertion at line 63, column 7.") + "Failure of assertion at line 112, column 7.") /\ UNCHANGED record /\ pc' = [pc EXCEPT !["committer"] = "EndCommitter"] - /\ UNCHANGED << intent_writes, tscache, to_write, - have_staged_record, checked_writes >> + /\ UNCHANGED << intent_writes, tscache, commit_ack, + txn_epoch, to_write, have_staged_record, + prevent_epoch, found_writes >> EndCommitter == /\ pc["committer"] = "EndCommitter" /\ TRUE /\ pc' = [pc EXCEPT !["committer"] = "Done"] - /\ UNCHANGED << record, intent_writes, tscache, to_write, - have_staged_record, checked_writes >> + /\ UNCHANGED << record, intent_writes, tscache, commit_ack, + txn_epoch, to_write, have_staged_record, + prevent_epoch, found_writes >> -committer == StagingWrites \/ ExplicitCommit \/ EndCommitter +committer == ParallelCommitLoop \/ TryStageWrites \/ AckClient + \/ ExplicitCommit \/ EndCommitter + +PreventLoop(self) == /\ pc[self] = "PreventLoop" + /\ found_writes' = [found_writes EXCEPT ![self] = {}] + /\ pc' = [pc EXCEPT ![self] = "PushRecord"] + /\ UNCHANGED << record, intent_writes, tscache, + commit_ack, txn_epoch, to_write, + have_staged_record, prevent_epoch >> PushRecord(self) == /\ pc[self] = "PushRecord" - /\ IF record = "pending" - THEN /\ record' = "aborted" + /\ IF record.status = "pending" + THEN /\ record' = [record EXCEPT !.status = "aborted"] /\ pc' = [pc EXCEPT ![self] = "EndRecover"] - ELSE /\ IF record = "staging" - THEN /\ TRUE + /\ UNCHANGED prevent_epoch + ELSE /\ IF record.status = "staging" + THEN /\ prevent_epoch' = [prevent_epoch EXCEPT ![self] = record.epoch] /\ pc' = [pc EXCEPT ![self] = "PreventWrites"] - ELSE /\ IF record = "committed" + ELSE /\ IF record.status = "committed" THEN /\ pc' = [pc EXCEPT ![self] = "EndRecover"] - ELSE /\ IF record = "aborted" + ELSE /\ IF record.status = "aborted" THEN /\ pc' = [pc EXCEPT ![self] = "EndRecover"] ELSE /\ pc' = [pc EXCEPT ![self] = "PreventWrites"] + /\ UNCHANGED prevent_epoch /\ UNCHANGED record - /\ UNCHANGED << intent_writes, tscache, to_write, - have_staged_record, checked_writes >> + /\ UNCHANGED << intent_writes, tscache, commit_ack, + txn_epoch, to_write, have_staged_record, + found_writes >> PreventWrites(self) == /\ pc[self] = "PreventWrites" - /\ IF checked_writes[self] /= KEYS - THEN /\ \E key \in KEYS \ checked_writes[self]: - IF key \in intent_writes - THEN /\ checked_writes' = [checked_writes EXCEPT ![self] = checked_writes[self] \union {key}] - /\ pc' = [pc EXCEPT ![self] = "PreventWrites"] - /\ UNCHANGED tscache - ELSE /\ tscache' = [tscache EXCEPT ![key] = TRUE] - /\ pc' = [pc EXCEPT ![self] = "RecoverRecord"] - /\ UNCHANGED checked_writes + /\ IF found_writes[self] /= KEYS + THEN /\ \E key \in KEYS \ found_writes[self]: + LET intent_epoch == intent_writes[key] IN + IF intent_epoch = prevent_epoch[self] + THEN /\ found_writes' = [found_writes EXCEPT ![self] = found_writes[self] \union {key}] + /\ pc' = [pc EXCEPT ![self] = "PreventWrites"] + /\ UNCHANGED tscache + ELSE /\ tscache' = [tscache EXCEPT ![key] = Max(tscache[key], prevent_epoch[self])] + /\ pc' = [pc EXCEPT ![self] = "RecoverRecord"] + /\ UNCHANGED found_writes ELSE /\ pc' = [pc EXCEPT ![self] = "RecoverRecord"] - /\ UNCHANGED << tscache, checked_writes >> - /\ UNCHANGED << record, intent_writes, to_write, - have_staged_record >> + /\ UNCHANGED << tscache, found_writes >> + /\ UNCHANGED << record, intent_writes, commit_ack, + txn_epoch, to_write, have_staged_record, + prevent_epoch >> RecoverRecord(self) == /\ pc[self] = "RecoverRecord" - /\ LET prevented == checked_writes[self] /= KEYS IN - IF record = "pending" - THEN /\ Assert(FALSE, - "Failure of assertion at line 102, column 9.") - /\ UNCHANGED record - ELSE /\ IF record = "staging" - THEN /\ IF prevented - THEN /\ record' = "aborted" - ELSE /\ record' = "committed" - ELSE /\ IF record = "committed" - THEN /\ Assert(~prevented, - "Failure of assertion at line 110, column 9.") - ELSE /\ IF record = "aborted" - THEN /\ Assert(prevented, - "Failure of assertion at line 112, column 9.") - ELSE /\ TRUE + /\ LET prevented == found_writes[self] /= KEYS IN + IF prevented + THEN /\ IF record.status = "aborted" + THEN /\ TRUE + /\ pc' = [pc EXCEPT ![self] = "EndRecover"] + /\ UNCHANGED record + ELSE /\ IF record.status = "pending" + THEN /\ Assert(FALSE, + "Failure of assertion at line 165, column 13.") + /\ pc' = [pc EXCEPT ![self] = "EndRecover"] + /\ UNCHANGED record + ELSE /\ IF record.status = "committed" + THEN /\ Assert(record.epoch > prevent_epoch[self], + "Failure of assertion at line 168, column 13.") + /\ pc' = [pc EXCEPT ![self] = "EndRecover"] + /\ UNCHANGED record + ELSE /\ IF record.status = "staging" + THEN /\ IF record.epoch > prevent_epoch[self] + THEN /\ pc' = [pc EXCEPT ![self] = "PreventLoop"] + /\ UNCHANGED record + ELSE /\ record' = [record EXCEPT !.status = "aborted"] + /\ pc' = [pc EXCEPT ![self] = "EndRecover"] + ELSE /\ pc' = [pc EXCEPT ![self] = "EndRecover"] + /\ UNCHANGED record + ELSE /\ IF record.status = "aborted" + THEN /\ Assert(FALSE, + "Failure of assertion at line 181, column 13.") /\ UNCHANGED record - /\ pc' = [pc EXCEPT ![self] = "EndRecover"] - /\ UNCHANGED << intent_writes, tscache, to_write, - have_staged_record, checked_writes >> + ELSE /\ IF record.status = "pending" + THEN /\ Assert(FALSE, + "Failure of assertion at line 183, column 13.") + /\ UNCHANGED record + ELSE /\ IF record.status = "committed" + THEN /\ Assert(record.epoch = prevent_epoch[self], + "Failure of assertion at line 185, column 13.") + /\ UNCHANGED record + ELSE /\ IF record.status = "staging" + THEN /\ Assert(record.epoch = prevent_epoch[self], + "Failure of assertion at line 187, column 13.") + /\ record' = [record EXCEPT !.status = "committed"] + ELSE /\ TRUE + /\ UNCHANGED record + /\ pc' = [pc EXCEPT ![self] = "EndRecover"] + /\ UNCHANGED << intent_writes, tscache, commit_ack, + txn_epoch, to_write, have_staged_record, + prevent_epoch, found_writes >> EndRecover(self) == /\ pc[self] = "EndRecover" /\ TRUE /\ pc' = [pc EXCEPT ![self] = "Done"] - /\ UNCHANGED << record, intent_writes, tscache, to_write, - have_staged_record, checked_writes >> + /\ UNCHANGED << record, intent_writes, tscache, commit_ack, + txn_epoch, to_write, have_staged_record, + prevent_epoch, found_writes >> -preventer(self) == PushRecord(self) \/ PreventWrites(self) - \/ RecoverRecord(self) \/ EndRecover(self) +preventer(self) == PreventLoop(self) \/ PushRecord(self) + \/ PreventWrites(self) \/ RecoverRecord(self) + \/ EndRecover(self) Next == committer \/ (\E self \in PREVENTERS: preventer(self)) @@ -276,5 +450,5 @@ Termination == <>(\A self \in ProcSet: pc[self] = "Done") ============================================================================= \* Modification History -\* Last modified Tue May 14 18:50:26 EDT 2019 by nathan +\* Last modified Wed May 15 17:47:28 EDT 2019 by nathan \* Created Mon May 13 10:03:40 EDT 2019 by nathan diff --git a/docs/tla-plus/ParallelCommits/ParallelCommits.toolbox/ParallelCommits___Model_1.launch b/docs/tla-plus/ParallelCommits/ParallelCommits.toolbox/ParallelCommits___Model_1.launch index e8720a106123..e8ef823c943e 100644 --- a/docs/tla-plus/ParallelCommits/ParallelCommits.toolbox/ParallelCommits___Model_1.launch +++ b/docs/tla-plus/ParallelCommits/ParallelCommits.toolbox/ParallelCommits___Model_1.launch @@ -12,27 +12,31 @@ - + - + - + + + + - + +