Allowing custom allocators for CpuStorage #419

Open
cmacmackin opened this issue Feb 28, 2020 · 4 comments

@cmacmackin

This isn't so much a feature request, more an attempt to gauge the level of interest in working on something like this (plus an offer to help with it).

I am currently thinking through how to implement something like Fortran Coarrays for parallel programming in Nim.* Ideally these could build on top of Arraymancer tensors (although they'd be in an independent library). However, in order to achieve the "Partitioned Global Address Space" (PGAS) memory model, I would need to be able to allocate tensor memory using specialised routines provided by low-level communication libraries written in C.

So my question is whether there would be interest in providing some means for the user to specify a custom memory allocator for CpuStorage. I haven't completely thought through how to achieve this yet. Perhaps Tensor could be made generic on the storage type as well as the element type, although that might look rather messy for those just wanting to use the default allocator.
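To make the "generic on the storage type" idea a bit more concrete, here is a rough, hedged sketch; none of these names exist in Arraymancer, and PgasStorage is purely hypothetical:

type
  DefaultStorage*[T] = ref object
    ## What the default allocator would populate.
    data*: seq[T]

  PgasStorage*[T] = ref object
    ## Hypothetical storage whose buffer comes from a communication library.
    data*: ptr UncheckedArray[T]
    len*: int

  Tensor*[T; Storage] = object
    ## Generic over both the element type and the storage type.
    shape*, strides*: seq[int]
    offset*: int
    storage*: Storage

# Users of the default allocator would then need something like
# Tensor[float64, DefaultStorage[float64]] (or an alias hiding the second
# parameter), which is the "messy" part mentioned above.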

This could have uses well beyond a coarray library. It would allow experimentation with different types of allocators (as has been suggested in #112) and could allow the user to choose the one most appropriate for their task.

*While libraries already exist for multithreading (shared-memory parallelism), I don't know of any which provide for distributed-memory parallelism. Shared memory can currently only scale to on the order of 100 processors, but some tasks require thousands of cores and must therefore use distributed-memory parallel technologies. Of these, the most widely used is MPI, but this is a difficult programming model to work with. Partitioned Global Address Space (PGAS) models, such as Fortran's coarrays, are conceptually and syntactically much simpler.

@mratsim
Owner

mratsim commented Mar 1, 2020

External allocators

Instead of allowing external allocators, my plan is to have tensors just be a ptr + length view (or an openarray, if we get openarrays as values, nim-lang/RFCs#178).

The idea is that in many cases, users will have buffers in vector, matrix or even nd-array format and we would be able to zero-copy between Arraymancer and Numpy, PyTorch, Tensorflow, ...

This would also enable custom allocators.

The work is actually done but was blocked by a showstopper bug 2 months ago (which has since been resolved):

type
  RawImmutableView*[T] = distinct ptr UncheckedArray[T]
  RawMutableView*[T] = distinct ptr UncheckedArray[T]

  Metadata* = DynamicStackArray[int]

  Tensor*[T] = object                      # Total stack: 128 bytes = 2 cache-lines
    shape*: Metadata                       # 56 bytes
    strides*: Metadata                     # 56 bytes
    offset*: int                           # 8 bytes
    storage*: CpuStorage[T]                # 8 bytes

  CpuStorage*{.shallow.}[T] = ref object   # Total heap: 25 bytes = 1 cache-line
    when supportsCopyMem(T):
      raw_buffer*: ptr UncheckedArray[T]   # 8 bytes
      memalloc*: pointer                   # 8 bytes
      isMemOwner*: bool                    # 1 byte
    else: # Tensors of strings, other ref types or non-trivial destructors
      raw_buffer*: seq[T]                  # 8 bytes (16 for seq v2 backed by destructors?)

And an example allocator:

proc finalizer[T](storage: CpuStorage[T]) =
  static: assert T.supportsCopyMem, "Tensors of seq, strings, ref types and types with non-trivial destructors cannot be finalized by this proc"
  if storage.isMemOwner and not storage.memalloc.isNil:
    storage.memalloc.deallocShared()

proc allocCpuStorage*[T](storage: var CpuStorage[T], size: int) =
  ## Allocate aligned memory to hold `size` elements of type T.
  ## If T does not support copyMem, it is also zero-initialized.
  ## I.e. Tensors of seq, strings, ref types or types with non-trivial destructors
  ## are always zero-initialized. This prevents potential GC issues.
  when T.supportsCopyMem:
    new(storage, finalizer[T])
    {.noSideEffect.}:
      storage.memalloc = allocShared(sizeof(T) * size + LASER_MEM_ALIGN - 1)
      storage.isMemOwner = true
      storage.raw_buffer = align_raw_data(T, storage.memalloc)
  else: # Always 0-initialize Tensors of seq, strings, ref types and types with non-trivial destructors
    new(storage)
    newSeq[T](storage.raw_buffer, size)
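For illustration, a hedged sketch (fromExternalBuffer is a hypothetical name, not an existing proc) of how a buffer allocated by an external library could be attached to this CpuStorage layout without transferring ownership:

proc fromExternalBuffer*[T](storage: var CpuStorage[T], extBuffer: pointer) =
  ## Wrap a buffer allocated elsewhere (for example by a communication
  ## library such as OpenSHMEM). The storage does not own the memory:
  ## no finalizer is attached and the external allocator remains
  ## responsible for freeing it.
  static: assert T.supportsCopyMem, "Only trivially copyable element types can wrap external buffers"
  new(storage)                     # no finalizer, we don't own the memory
  storage.memalloc = extBuffer
  storage.isMemOwner = false
  storage.raw_buffer = cast[ptr UncheckedArray[T]](extBuffer)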

Coarrays / PGAS / Distributed computing

I'm not familiar with Fortran coarrays. I did find a couple of libraries with PGAS support while implementing Weave (https://github.com/mratsim/weave).

Unfortunately I don't have access to a cluster, but Weave was built from the ground up with an architecture that supports distributed computing. It uses channels and message-passing even at the single-node level (and it's faster than OpenMP / Intel TBB).
I.e. adding distributed computing capabilities to Weave should be much easier than in any other framework; you could for example use MPI channels, see mratsim/weave#73

@cmacmackin
Author

Your approach to external allocators looks like exactly what I need. However, how would a user supply a custom allocator when constructing an array?

I've taken a look at weave and what you have done is very impressive (although some of the theoretical background goes over my head). However, if I understand correctly, currently data parallelism is achieved by sending pointers to the captured variables to each thread. Clearly this would not work for distributed memory, as that location in memory on the remote processor would not contain the same data. Instead it would be necessary to actually perform copy-by-values. For large tasks this almost certainly would not be efficient; most of the memory could be kept permanently on the remote processor, with only small amounts needing to be communicated.

As an example, consider the heat-diffusion problem you use for benchmarking. You create a separate task for each row in the grid of data. This works just fine with shared memory, as threads can always read the heat values from adjacent rows when evolving the system forward in time. With distributed memory, it becomes necessary to perform a (potentially expensive) remote communication when reading from adjacent rows located on separate processors. Creating a very large number of tasks would thus be unsuitable (unless some very clever macro-magic can be used to optimise the grouping of tasks/data). Copying the whole grid to each task/processor would also likely be too expensive.

The conventional way to solve this with MPI is to store a subsection of the grid on each processor and to communicate only the shared boundaries to adjacent processors. This requires one processor to send data and another to receive it, with care taken to avoid deadlocks or inefficient use of blocking communication.

Coarrays (and other forms of PGAS) simplify things by making the communication "one-sided". Either a processor "gets" some data from a remote one, or "puts" some data on it, without having to worry about the other processor making a corresponding call. Various types of barriers can be used to ensure processors remain sufficiently in sync. An example of coarrays in use can be found here:
https://github.com/sourceryinstitute/Scientific-Software-Design/blob/master/ssdSource/chapter12/burgers_caf_v5/periodic_2nd_order.F90#L180
Some peculiarities of Fortran to be aware of:

  • component access and calls to methods are indicated by a percent sign instead of the usual dot
  • both procedure calls and array indexing use parentheses
  • coarray indexing (i.e., references to data on another processor) uses square brackets

As far as I could tell, none of the libraries you referenced use a PGAS model, although HPX uses something they call an "Active Global Address Space", the details of which I confess I'm a bit hazy on. It appears that the library handles a lot of the decisions about which processors to store data on, what to communicate, what to mirror, etc.

In any case, the framework I was planning to use is called OpenSHMEM (OpenMPI provides a free implementation). This is one of the backends which has been used to implement coarrays in Fortran and its functionality closely mirrors what I'd like to implement. I don't think I'll have time tonight, but I'll try to provide an example of what your heat diffusion benchmark in weave would look like if written to use coarrays as I would like to implement them in Nim.
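In the meantime, here is a rough, hedged sketch of what such one-sided primitives might look like in Nim; all of the names (Coarray, get, put, syncAll) are hypothetical, and a real implementation would back them with OpenSHMEM calls rather than the local placeholder bodies shown:

type
  Coarray*[T] = object
    ## One locally-owned partition of a globally addressable array.
    localData*: seq[T]

proc get*[T](ca: Coarray[T], image, idx: int): T =
  ## One-sided read: fetch element `idx` from the partition owned by
  ## processor `image`, without that processor making a matching call.
  ## (Placeholder body; a real version would call shmem_get or similar.)
  ca.localData[idx]

proc put*[T](ca: var Coarray[T], image, idx: int, value: T) =
  ## One-sided write: store `value` into element `idx` on processor `image`.
  ## (Placeholder body; a real version would call shmem_put or similar.)
  ca.localData[idx] = value

proc syncAll*() =
  ## Barrier: every processor waits here until all have arrived.
  discard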

@mratsim
Owner

mratsim commented Mar 4, 2020

Allocator interface

Your approach to external allocators looks like exactly what I need. However, how would a user supply a custom allocator when constructing an array?

Ideally I would reuse what @Araq had in mind for custom allocators for sequences in Nim, but AFAIK that has been scrapped so I have to think of something.

Currently (assuming the branch is merged) you would allocate a buffer and then myTensor.storage.raw_buffer = myCustomAllocatedBuffer

I have to think about what kind of interface I want to offer for custom allocators in Arraymancer routines. Passing an allocator or a context to each proc would be very tedious and/or strange (reshaping might allocate, for example), but using a global might cause issues when we want to mix local tensors and distributed ones.

If you have suggestions of libraries that have good mixed local + distributed APIs I'm interested, otherwise I will look into Tensorflow (which AFAIK has 3+ ways of doing the same thing ...) and PyTorch.

Weave - distributed sync

I've taken a look at weave and what you have done is very impressive (although some of the theoretical background goes over my head). However, if I understand correctly, currently data parallelism is achieved by sending pointers to the captured variables to each thread. Clearly this would not work for distributed memory, as that location in memory on the remote processor would not contain the same data. Instead it would be necessary to actually perform copy-by-values. For large tasks this almost certainly would not be efficient; most of the memory could be kept permanently on the remote processor, with only small amounts needing to be communicated.

The distributed communication library would have to handle copies (it could be one internal to Weave, or MPI). Reduction operations which require synchronization for in-place updates would also need specific handling; I'm not sure how MPI deals with them. Note that Weave has an experimental generic reduction scheme that guarantees ordering and avoids requiring locks and/or atomics, and so can serve as a good basis for distributed reduction: https://github.com/mratsim/weave/blob/052ae40a/weave/parallel_reduce.nim#L240-L260

  block:
    proc sumReduce(n: int): int =
      var waitableSum: Flowvar[int]

      # expandMacros:
      parallelReduceImpl i in 0 .. n, stride = 1:
        reduce(waitableSum):
          prologue:
            var localSum = 0
          fold:
            localSum += i
          merge(remoteSum):
            localSum += sync(remoteSum)
          return localSum

      result = sync(waitableSum)

    init(Weave)
    let sum1M = sumReduce(1000000)
    echo "Sum reduce(0..1000000): ", sum1M
    doAssert sum1M == 500_000_500_000
    exit(Weave)

To be clear, Weave is a scheduler. You give it tasks and constraints, implicit (this task will spawn other tasks) or explicit (pledges/data dependencies, more below), and it will schedule them on the hardware to take the best advantage of the parallelism available and maximize your throughput. The core is built agnostic to the data synchronization scheme; it just requires that there are channels to send tasks and steal requests between workers, and that when a task is scheduled the data it requires is present (but movement of data can be triggered by sending a task to a thief).

Weave: as a base to build distributed parallelism models

Programming models like Legion's dependent partitioning or DACE, which make optimizing memory access on distributed and/or heterogeneous architectures a first-class concern, could generate the algorithm and copy schedule (what to copy, where, and what the dependencies are before triggering a dependent task) for Weave to execute.
Weave can expose distributed data-dependency primitives (a generalization of the Pledge: https://github.com/mratsim/weave/blob/052ae40a/weave/parallel_tasks.nim#L246-L308):

  block: # Delayed computation

    proc echoA(pA: Pledge) =
      echo "Display A, sleep 1s, create parallel streams 1 and 2"
      sleep(1000)
      pA.fulfill()

    proc echoB1(pB1: Pledge) =
      echo "Display B1, sleep 1s"
      sleep(1000)
      pB1.fulfill()

    proc echoB2() =
      echo "Display B2, exit stream"

    proc echoC1() =
      echo "Display C1, exit stream"

    proc main() =
      echo "Sanity check 3: Dataflow parallelism"
      init(Weave)
      let pA = newPledge()
      let pB1 = newPledge()
      spawnDelayed pB1, echoC1()
      spawnDelayed pA, echoB2()
      spawnDelayed pA, echoB1(pB1)
      spawn echoA(pA)
      exit(Weave)

    main()

  block: # Delayed computation with multiple dependencies

    proc echoA(pA: Pledge) =
      echo "Display A, sleep 1s, create parallel streams 1 and 2"
      sleep(1000)
      pA.fulfill()

    proc echoB1(pB1: Pledge) =
      echo "Display B1, sleep 1s"
      sleep(1000)
      pB1.fulfill()

    proc echoB2(pB2: Pledge) =
      echo "Display B2, no sleep"
      pB2.fulfill()

    proc echoC12() =
      echo "Display C12, exit stream"

    proc main() =
      echo "Sanity check 4: Dataflow parallelism with multiple dependencies"
      init(Weave)
      let pA = newPledge()
      let pB1 = newPledge()
      let pB2 = newPledge()
      spawnDelayed pB1, pB2, echoC12()
      spawnDelayed pA, echoB2(pB2)
      spawnDelayed pA, echoB1(pB1)
      spawn echoA(pA)
      exit(Weave)

    main()

This way, distributed computations are only triggered when their data is ready, and they can notify when they are done with their own computation.
This pledge scheme also works for fine-grained control over ready array ranges, so you can trigger fine-grained work at the array-cell level (https://github.com/mratsim/weave/blob/052ae40a/weave/parallel_for.nim#L408-L437), though it's a bit expensive to track large arrays with over a billion elements.

Heat benchmark

Regarding the heat benchmark, it's the well-known parallel heat benchmark from Cilk (1996), so it probably already exists in Fortran somewhere.

Conclusion

If you have primitives that:

  • can abstract away the distributed memory allocation, addressing and copies so that it works like shared-memory arrays
  • have a distributed channel

I should be able to add those to Weave to have a very nice mixed shared-memory and distributed parallelism model.

That said, that still doesn't solve Arraymancer's allocator API woes ;).

@cmacmackin
Author

Custom allocators for seqs (and strings, for that matter) would certainly be appreciated, as they would allow coarrays to work with these built-in types. Barring that, would it be possible to have tensors/storage retain procedure pointers to the allocators/deallocators which were passed at creation (with some sort of standard default value)? Whenever a new allocation/deallocation needs to be done in another routine, Arraymancer could just use the routines associated with the first tensor argument to that procedure. The same principle could apply when creating a tensor result.

So, something like the following:

import math, arraymancer

const
    x = @[1, 2, 3, 4, 5]
    y = @[1, 2, 3, 4, 5]

var
    vandermonde: seq[seq[int]]
    row: seq[int]

vandermonde = newSeq[seq[int]]()

for i, xx in x:
    row = newSeq[int]()
    vandermonde.add(row)
    for j, yy in y:
        vandermonde[i].add(xx^yy)

# possibly optionally include a reallocator?
let foo = vandermonde.toTensor(myCustomAllocator, myCustomDeallocator)
# bar would use whatever the default allocator is (e.g., aligned memory)
let bar = vandermonde[1..3].toTensor()
# baz would use the allocators of the first argument (e.g., myCustomAllocator, etc.)
let baz = foo[3..5, _] + bar

A slight variation on this would be to define an object type which contains pointers to these procedures and to pass the object instead. This would reduce the number of arguments and ensure that consistent allocators/deallocators are always provided.
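For concreteness, a hedged sketch of that variation; the Allocator type, the coarray* procs and the toTensor overload accepting the bundle are all hypothetical, not existing Arraymancer API:

type
  Allocator* = object
    ## Bundles the routines a storage backend needs; a default instance
    ## would wrap the current aligned shared-memory allocation.
    alloc*: proc (size: int): pointer {.nimcall.}
    dealloc*: proc (p: pointer) {.nimcall.}

proc coarrayAlloc(size: int): pointer {.nimcall.} =
  ## Placeholder standing in for e.g. an OpenSHMEM symmetric allocation.
  allocShared(size)

proc coarrayDealloc(p: pointer) {.nimcall.} =
  deallocShared(p)

let coarrayAllocator = Allocator(alloc: coarrayAlloc, dealloc: coarrayDealloc)

# A tensor constructor could then take the whole bundle as a single argument:
# let foo = vandermonde.toTensor(coarrayAllocator)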
