Skip to content

Concurrency

Paul Louth edited this page Jul 16, 2019 · 25 revisions

One aspect of using immutable data-types like Map, Seq, HashSet, etc. is that in most applications, at some point, you're likely to have some shared reference to one and need to mutate that shared reference. This often requires using synchronisation primitives like lock (which are not composable and are prone to error).

Atom

With a nod to the atom type in Clojure language-ext has two types:

  • Atom<A>
  • Atom<M, A>

These types both wrap a value of A and provides several variants of the method: Swap (and Prelude functions swap) for atomically mutating the wrapped value without locking.

var atom = Atom(Set("A", "B", "C"));

swap(atom, old => old.Add("D"));
swap(atom, old => old.Add("E"));
swap(atom, old => old.Add("F"));

Debug.Assert(atom == Set("A", "B", "C", "D", "E", "F"));

NOTE: The above can also be written:

atom.Swap(old => old.Add("D"));
atom.Swap(old => old.Add("E"));
atom.Swap(old => old.Add("F"));

Atomic update

One thing that must be noted is that if another thread mutates the atom whilst you're running Swap then your update will rollback. Swap will then re-run the provided lambda function with the newly updated value (from the other thread) so that it can get a valid updated value to apply. This means that you must be careful to not have side-effects in the Swap function, or at the very least it needs to be reliably repeatable.

Validation

The Atom constructor can take a Func<A, bool> validation lambda. This is run against the initial value and all subsequent values before being swapped. If the validation lambda returns false for the proposed value then false is returned from Swap and no update takes place.

Events

The Atom types both have a Change event:

    public event AtomChangedEvent<A> Change;

It is fired after each successful atomic update to the wrapped value. If you're using the LanguageExt.Rx extensions then you can also consume the atom.OnChange() observable.

Metadata

The Atom<M, A> type with an M generic argument, take an additional meta-data argument on construction which can be used to pass through an environment or some sort of context for the Swap functions:

var env = new Env();

var atom = Atom(env, Set("A", "B", "C"));
atom.Swap((nenv, old) => old.Add("D"));

Additional arguments

There are also other variants of Swap that can take up to two additional arguments and pass them through to the lambda:

var atom = Atom(Set(1, 2, 3));
atom.Swap(4, 5, (x, y, old) => old.Add(x).Add(y));

Debug.Assert(atom == Set(1, 2, 3, 4, 5));

Accessing the value

The wrapped value can be accessed by calling atom.Value or using the implicit operator conversion to A.

Ref - Atomic updates of multiple values

Atom is all well and good until you need to update multiple values atomically. This is an entirely different class of problem and can't rely on the built-in Interlocked atomic operations. And so language-ext provides another type called Ref<A>. Ref wraps a value of type A just like Atom<A>. But a Ref can only be updated within a transaction.

Internally language-ext uses a Software Transactional Memory (STM) with Multi-Version Concurrency Control (MVCC)

Let's start with the classic example of a bank-account where we need to atomically debit one account and credit another. Either both operations should succeed or none.

First the definition of the Account:

public class Account
{
    public readonly int Balance;

    Account(int balance) =>
        Balance = balance;

    public Account SetBalance(int value) =>
        new Account(value);

    public static Ref<Account> New(int balance) =>
        Ref(new Account(balance), Account.Validate);

    public static bool Validate(Account a) =>
        a.Balance >= 0;
}

Note how I've made the constructor private and then made a static method called New which forces the result to be a Ref. It also provides an optional validation function to the Ref. This will be run before any transaction is committed to ensure we have a valid state.

It isn't necessary to wrap up construction like this, but for something like a bank-account type it's clear we'd always want updates to be atomic, and so this is a really nice way of enforcing the rules of the type.

Now we can write a static class to do our balance transfer:

public static class Transfer
{
    public static Unit Do(Ref<Account> from, Ref<Account> to, int amount) =>
        sync(() =>
        {
            swap(from, a => a.SetBalance(a.Balance - amount));
            swap(to, a => a.SetBalance(a.Balance + amount));
        });
}

NOTE: swap here is a convenience function for updating the value in the Ref. We could do it manually by setting the Value property, but it's not quite as pretty:

from.Value = from.Value.SetBalance(from.Value.Balance - amount);

We can also use the Swap method on Ref:

from.Swap(a => a.SetBalance(a.Balance - amount));

The key here is the sync function. That wraps the transactional operation. When it starts it will take a snapshot of the Ref world. Then all operations are performed in isolation. On completion of the lambda the transaction system will commit the changes. However, if another transaction has run and modified the same values in the meantime then the whole process starts again and the lambda is re-run with the new state of the Ref world.

sync covers the ACI of ACID:

  • Atomic - means that every change to Refs made within a transaction occurs or none do
  • Consistent - means that each new value can be checked with a validator function before allowing the transaction to commit
  • Isolated - means that no transaction sees the effects of any other transaction while it is running.

There are two isolation modes, the default being Isolation.Snapshot

  • Isolation.Snapshot - This is the slightly weaker isolation mode. A snapshot transaction will only fail if there are write conflicts between two transactions. This is usually good enough, but can lead to the problem of:
    • Transaction 1: read ref A to get the value to write in ref B
    • Transaction 2: read ref B to get the value to write in ref A
    • These two will never have a write conflict, but the reads conflict with the writes
  • Isolation.Serialisable - This is the strictest isolation mode and solves the problem mentioned above. However, you should only use this if you have the problem above, as it's more likely to encounter conflicts and have to re-run the transaction.

This is a very good video on Multi-Version Concurrency Control and the two isolation modes listed above

Multiple atomic reads

Sometimes you're not looking to modify a value, but you want a consistent snapshot of multiple items. One thing to note is:

  • Ref reads never block other readers or writers
  • Ref writes never block other readers or writers

And so we can very quickly get a snapshot of a number of Ref values. If we have a couple of Ref<Account> values:

Ref<Account> accountA = Account.New(100);
Ref<Account> accountB = Account.New(400);

var (balanceA, balanceB) = sync(() => (accountA.Value, accountB.Value));

This will always give you a consistent read. You can read from the Value property outside of a sync, it will just return the latest version. But reading the Value properties from multiple Ref values outside of a sync could lead to an inconsistent view.

It is also possible to set the Value property with yourRef.Value = ..., but it must be done within a sync otherwise an exception will be thrown.

BIG BIG NOTE: The types that Ref wraps should be immutable. Mutable objects can be modified outside of the STM system, breaking this whole concept. If you do this, you're on your own!

commute- Relaxing consistency and side-effects

There are some concurrency use cases where you can afford to be a bit more relaxed to gain some performance. For example, imagine you were keeping a transaction log through the day. You might be relaxed about the order of transactions in the log if you knew that the final balance was always correct. Basically if you receive two deposits of £100 and £50 you won’t mind too much if they are logged as £100 then £50 or £50 then £100 because addition is commutative.

Let's add a Deposit method to our Account type:

public class Account
{
    public readonly int Balance;

    Account(int balance) =>
        Balance = balance;

    public Account SetBalance(int value) =>
        new Account(value);

    public Account Deposit(int value) =>
        new Account(Balance + value);

    public static Ref<Account> New(int balance) =>
        Ref(new Account(balance), Account.Validate);

    public static bool Validate(Account a) =>
        a.Balance >= 0;

    public override string ToString() =>
        Balance.ToString();
}

Now, we can write a simple function to deposit:

static void LogDeposit(Ref<Account> account, int amount) =>
    sync(() => commute(account, a => a.Deposit(amount)));

Then a little test to prove our assumptions:

var bank = Account.New(0);

LogDeposit(bank, 100);
LogDeposit(bank, 50);

Assert.True(bank.Value.Balance == 150);

var bank2 = Account.New(0);

LogDeposit(bank2, 50);
LogDeposit(bank2, 100);

Assert.True(bank2.Value.Balance == 150);

The key function is commute (or Commute if you're calling the method from a Ref<A>). It has the following properties:

  • Must be run from within a sync transaction
  • We provide it with a function to run against the Ref, also provided
  • The result will go into the transaction's state
  • When the transaction is committed: the Func is run again with the live Ref value, not the Ref value from the transaction snapshot/state.

The thing to note about commute is that when the function is called it sets the in-transaction value of the Ref, but the actual change is applied at commit time, by running the function passed to commute again with the latest Ref value. This means that the value you calculate in your transaction might not be the value that is finally committed to the Ref. This needs careful thought. You need to be sure that your processing doesn’t rely on seeing the latest value of the Ref.

Although the operation is atomic, it clearly has a weaker consistency story than swap. We need to make sure of the following for this to work out for us:

  • The bound Ref value is immutable
  • The operation performed on the Ref is commutative

If this isn't the case, then you essentially have a last writer wins situation and you're on your own.

Conclusion

I hope the Atom and Ref types finds some use, I know I've bumped up against this issue many times in the past and have either ended up manually building synchronisation primitives or fallen back to using the ugly ConcurrentDictionary or similar.

Concurrency source code