Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding an mapply-like blockApply() function to the API #11

Open
PeteHaitch opened this issue Mar 12, 2018 · 2 comments
Open

Adding an mapply-like blockApply() function to the API #11

PeteHaitch opened this issue Mar 12, 2018 · 2 comments
Assignees

Comments

@PeteHaitch
Copy link
Collaborator

Hi Hervé,

In a recent email you wrote:

At some point the old block-processing functions (i.e. block_APPLY(), block_MAPPLY(), and
block_APPLY_and_COMBINE()) will go away.

I'd like to request that a mapply-like blockApply() function is part of the exported API. In minfi, we need to iterate over multiple (conformable) array-like objects in parallel for several of the preprocessing routines (e.g., converting the red & green channels from the array to methylated/unmethylated calls).

The return value is a large matrix-like object, so it'd be great if this mapply-like blockApply() also supported the sink or BACKEND argument (as in #10)

@PeteHaitch
Copy link
Collaborator Author

PeteHaitch commented Mar 12, 2018

FYI, this is what I'm currently using, but I'd prefer to retire this for an 'official' solution.

# NOTE: A mapply()-like function for conformable arrays.
# NOTE: Different from DelayedArray:::block_Mapply(); designed to have an API more like
#       DelayedArray::blockArray()
blockMapply <- function(FUN, ..., MoreArgs = NULL, grids = NULL,
                        BPREDO = list(), BPPARAM = bpparam()) {
    FUN <- match.fun(FUN)
    dots <- unname(list(...))
    # Check conformable grids
    if (is.null(grids)) {
        grids <- replicate(length(dots), NULL)
    }
    grids <- mapply(
        FUN = DelayedArray:::.normarg_grid,
        grids,
        dots,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    grids_dims <- lapply(grids, dim)
    all_same_grids_dims <- all(
        vapply(X = grids_dims,
               FUN = function(dim) all(dim == grids_dims[[1L]]),
               FUN.VALUE = logical(1L)))
    if (!all_same_grids_dims) {
        stop("non-conformable grids")
    }
    stopifnot(length(dots) == length(grids))
    nblock <- length(grids[[1]])
    bplapply(seq_len(nblock), function(b) {
        if (DelayedArray:::get_verbose_block_processing()) {
            message("Processing block ", b, "/", nblock, " ... ",
                    appendLF = FALSE)
        }
        viewports <- lapply(grids, function(grid) grid[[b]])
        blocks <- mapply(
            FUN = function(x, grid, viewport) {
                block <- DelayedArray:::extract_block(x, viewport)
                if (!is.array(block)) {
                    block <- DelayedArray:::.as_array_or_matrix(block)
                }
                attr(block, "from_grid") <- grid
                attr(block, "block_id") <- b
                block
            },
            x = dots,
            grid = grids,
            viewport = viewports,
            SIMPLIFY = FALSE,
            USE.NAMES = FALSE)
        block_ans <- do.call(FUN, c(blocks, MoreArgs))
        if (DelayedArray:::get_verbose_block_processing()) {
            message("OK")
        }
        block_ans
    },
    BPREDO = BPREDO,
    BPPARAM = BPPARAM)
}

@PeteHaitch
Copy link
Collaborator Author

PeteHaitch commented Apr 5, 2018

In minfi, I needed a function that allowed me to iterate over multiple DelayedMatrix objects ("inputs") and write to multiple RealizationSink instances ("outputs"). What's more, although all the "inputs" have the same dimensions, and all the "outputs" have the same dimensions, the "inputs" and "outputs" have different dimensions (specifically, the "outputs" have fewer rows than the "inputs" but they all have the same number of rows).

Here is what I cam up with:

# NOTE: blockMapply() with the option to write the blocks to multiple 'sinks'.
#       Useful, for example, to apply a function across column-blocks of
#       multiple DelayedMatrix objects, write these results to disk, and then
#       wrap these in a DelayedMatrix.
# NOTE: `dots_grids`, `sinks_grids`, and `sinks` should all be lists
blockMapplyWithRealization <- function(FUN, ..., MoreArgs = NULL, sinks = NULL,
                                       dots_grids = NULL, sinks_grids = NULL,
                                       BPREDO = list(), BPPARAM = bpparam()) {
    FUN <- match.fun(FUN)
    dots <- unname(list(...))
    # Check valid `sinks`
    stopifnot(is.null(sinks) || is.list(sinks))
    # Check conformable dots_grids and sinks_grids
    if (is.null(dots_grids)) {
        dots_grids <- replicate(length(dots), NULL)
    } else {
        stopifnot(is.list(dots_grids))
    }
    dots_grids <- mapply(
        FUN = DelayedArray:::.normarg_grid,
        dots_grids,
        dots,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    if (is.null(sinks_grids)) {
        sinks_grids <- replicate(length(sinks), NULL)
    } else {
        stopifnot(is.list(sinks_grids))
    }
    sinks_grids <- mapply(
        FUN = DelayedArray:::.normarg_grid,
        sinks_grids,
        sinks,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    grids_dims <- lapply(c(dots_grids, sinks_grids), dim)
    all_same_grids_dims <- all(
        vapply(X = grids_dims,
               FUN = function(dim) all(dim == grids_dims[[1L]]),
               FUN.VALUE = logical(1L)))
    if (!all_same_grids_dims) {
        stop("non-conformable 'dots_grids' and 'sinks_grids'")
    }
    stopifnot(length(dots) == length(dots_grids),
              length(sinks) == length(sinks_grids))

    # Loop over blocks of `dots` and write to `sinks`
    nblock <- length(dots_grids[[1]])
    bplapply(seq_len(nblock), function(b) {
        if (DelayedArray:::get_verbose_block_processing()) {
            message("Processing block ", b, "/", nblock, " ... ",
                    appendLF = FALSE)
        }
        input_viewports <- lapply(dots_grids, function(grid) grid[[b]])
        output_viewports <- lapply(sinks_grids, function(grid) grid[[b]])
        blocks <- mapply(
            FUN = function(x, grid, viewport) {
                block <- DelayedArray:::extract_block(x, viewport)
                if (!is.array(block)) {
                    block <- DelayedArray:::.as_array_or_matrix(block)
                }
                attr(block, "from_grid") <- grid
                attr(block, "block_id") <- b
                block
            },
            x = dots,
            grid = dots_grids,
            viewport = input_viewports,
            SIMPLIFY = FALSE,
            USE.NAMES = FALSE)
        block_ans <- do.call(FUN, c(blocks, MoreArgs))
        if (!is.list(block_ans)) {
            block_ans <- list(block_ans)
        }
        # NOTE: This is the only part different from blockMapply()
        if (!is.null(sinks)) {
            mapply(function(ans, sink, viewport) {
                write_block_to_sink(ans, sink, viewport)
            }, ans = block_ans, sink = sinks, viewport = output_viewports,
            SIMPLIFY = FALSE, USE.NAMES = FALSE)
            block_ans <- NULL
        }
        if (DelayedArray:::get_verbose_block_processing()) {
            message("OK")
        }
        block_ans
    },
    BPREDO = BPREDO,
    BPPARAM = BPPARAM)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants