Skip to content

Commit

Permalink
Merge pull request apache#114 from hlin09/hlin09
Browse files Browse the repository at this point in the history
Add function cogroup().
shivaram committed Dec 12, 2014

Verified

This commit was signed with the committer’s verified signature.
AgeManning Age Manning
2 parents a457f7f + 47c4bb7 commit 7d81b05
Showing 8 changed files with 148 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ exportClasses("Broadcast")
exportMethods(
"cache",
"checkpoint",
"cogroup",
"collect",
"collectPartition",
"combineByKey",
62 changes: 61 additions & 1 deletion pkg/R/RDD.R
Original file line number Diff line number Diff line change
@@ -1707,4 +1707,64 @@ setMethod("rightOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
})


#' For each key k in several RDDs, return a resulting RDD whose
#' values are a list of values for the key in all RDDs.
#'
#' @param ... Several RDDs.
#' @param numPartitions Number of partitions to create.
#' @return a new RDD containing all pairs of elements with values in a list
#' in all RDDs.
#' @rdname cogroup
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' cogroup(rdd1, rdd2, numPartitions = 2L)
#' # list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
#'}
setGeneric("cogroup",
function(..., numPartitions) { standardGeneric("cogroup") },
signature = "...")

#' @rdname cogroup
#' @aliases cogroup,RDD-method
setMethod("cogroup",
          "RDD",
          function(..., numPartitions) {
            rdds <- list(...)
            rddsLen <- length(rdds)
            # Tag every value with the 1-based index of the RDD it came from,
            # so the grouping step can route each value back into the correct
            # per-RDD bucket. seq_len() is safe when no RDDs are supplied,
            # unlike 1:rddsLen (which would yield c(1, 0)).
            for (i in seq_len(rddsLen)) {
              rdds[[i]] <- lapply(rdds[[i]],
                                  function(x) { list(x[[1]], list(i, x[[2]])) })
              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
              # will not be captured into UDF if getJRDD is not invoked.
              # It should be resolved together with that issue.
              getJRDD(rdds[[i]])  # Capture the closure.
            }
            union.rdd <- Reduce(unionRDD, rdds)
            # For one key, fold the tagged values back into rddsLen lists, one
            # per input RDD; an RDD with no value for the key yields list().
            group.func <- function(vlist) {
              res <- list()
              length(res) <- rddsLen
              for (x in vlist) {
                i <- x[[1]]
                acc <- res[[i]]
                # Create an accumulator lazily on the first value for this RDD.
                if (is.null(acc)) {
                  acc <- SparkR:::initAccumulator()
                }
                SparkR:::addItemToAccumulator(acc, x[[2]])
                res[[i]] <- acc
              }
              lapply(res, function(acc) {
                if (is.null(acc)) {
                  list()
                } else {
                  # Trim to the filled entries only: the accumulator grows its
                  # backing list by doubling, so acc$data may carry trailing
                  # pre-allocated NULL slots that must not leak into results.
                  acc$data[seq_len(acc$counter)]
                }
              })
            }
            # Return the grouped RDD directly rather than binding it to an
            # unused local variable.
            mapValues(groupByKey(union.rdd, numPartitions), group.func)
          })
23 changes: 23 additions & 0 deletions pkg/R/utils.R
Original file line number Diff line number Diff line change
@@ -221,3 +221,26 @@ reserialize <- function(rdd) {
return(ser.rdd)
}
}

# Append an item to a list-backed accumulator in amortized O(1) time.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator is an environment with three fields: size (capacity of the
# backing list), counter (number of items stored) and data (the backing list).
# Because environments have reference semantics, the caller's accumulator is
# updated in place. Doubling the backing list whenever it fills up keeps the
# total copying cost linear in the number of appends.
addItemToAccumulator <- function(acc, item) {
  if (acc$counter == acc$size) {
    # Backing list is full: double its capacity before storing.
    acc$size <- acc$size * 2
    length(acc$data) <- acc$size
  }
  pos <- acc$counter + 1
  acc$counter <- pos
  acc$data[[pos]] <- item
}

# Create a fresh accumulator for addItemToAccumulator(): an environment
# (chosen for its reference semantics) holding a one-slot backing list,
# a capacity of 1 and an item count of 0.
initAccumulator <- function() {
  acc <- new.env()
  acc$size <- 1
  acc$counter <- 0
  acc$data <- list(NULL)
  acc
}
16 changes: 16 additions & 0 deletions pkg/inst/tests/test_binary_function.R
Original file line number Diff line number Diff line change
@@ -25,3 +25,19 @@ test_that("union on two RDDs", {

unlink(fileName)
})

# Verifies that cogroup() groups values per key into one sub-list per input
# RDD, in input order.
test_that("cogroup on two RDDs", {
# Numeric keys: key 1 appears in both RDDs, key 2 only in rdd1.
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
actual <- collect(cogroup.rdd)
# Each key maps to a list of two lists: values from rdd1, then values from
# rdd2; an RDD with no value for the key contributes an empty list().
expect_equal(actual,
list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))

# String keys, with a duplicate key inside a single RDD ("a" twice in rdd1).
rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
actual <- collect(cogroup.rdd)
expect_equal(actual,
list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3)))))
})
14 changes: 0 additions & 14 deletions pkg/inst/worker/serialize.R
Original file line number Diff line number Diff line change
@@ -69,17 +69,3 @@ writeEnvironment <- function(con, e, keyValPairsSerialized = TRUE) {
}
}

# Fast append to list by using an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator should have three fields: size, counter and data.
# This function amortizes the allocation cost by doubling
# the size of the list every time it fills up.
# NOTE: the accumulator is an environment, so it is mutated in place.
addItemToAccumulator <- function(acc, item) {
if(acc$counter == acc$size) {
acc$size <- acc$size * 2
length(acc$data) <- acc$size
}
acc$counter <- acc$counter + 1
acc$data[[acc$counter]] <- item
}
7 changes: 2 additions & 5 deletions pkg/inst/worker/worker.R
Original file line number Diff line number Diff line change
@@ -108,12 +108,9 @@ if (isEmpty != 0) {
acc <- res[[bucket]]
# Create a new accumulator
if (is.null(acc)) {
acc <- new.env()
acc$counter <- 0
acc$data <- list(NULL)
acc$size <- 1
acc <- SparkR:::initAccumulator()
}
addItemToAccumulator(acc, tuple)
SparkR:::addItemToAccumulator(acc, tuple)
res[[bucket]] <- acc
}
invisible(lapply(data, hashTupleToEnvir))
35 changes: 35 additions & 0 deletions pkg/man/cogroup.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{cogroup}
\alias{cogroup}
\alias{cogroup,RDD-method}
\title{For each key k in several RDDs, return a resulting RDD whose
values are a list of values for the key in all RDDs.}
\usage{
cogroup(..., numPartitions)

\S4method{cogroup}{RDD}(..., numPartitions)
}
\arguments{
\item{...}{Several RDDs.}

\item{numPartitions}{Number of partitions to create.}
}
\value{
a new RDD containing all pairs of elements with values in a list
in all RDDs.
}
\description{
For each key k in several RDDs, return a resulting RDD whose
values are a list of values for the key in all RDDs.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup(rdd1, rdd2, numPartitions = 2L)
# list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
}
}

15 changes: 10 additions & 5 deletions pkg/man/foreach.Rd
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
\alias{foreach,RDD,function-method}
\alias{foreachPartition}
\alias{foreachPartition,RDD,function-method}
\title{Applies a function to all elements in an RDD, and force it evaluated.}
\title{Applies a function to all elements in an RDD, and force evaluation.}
\usage{
foreach(rdd, func)

@@ -24,21 +24,26 @@ foreachPartition(rdd, func)

\item{func}{The function to be applied to partitions.}
}
\value{
invisible NULL.

invisible NULL.
}
\description{
Applies a function to all elements in an RDD, and force it evaluated.
Applies a function to all elements in an RDD, and force evaluation.

Applies a function to each partition in an RDD, and force it evaluated.
Applies a function to each partition in an RDD, and force evaluation.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10)
foreach(rdd, print)
foreach(rdd, function(x) { save(x, file=...) })
}
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10)
foreachPartition(rdd, function(part) { lapply(part, print); NULL })
foreachPartition(rdd, function(part) { save(part, file=...); NULL })
}
}

0 comments on commit 7d81b05

Please sign in to comment.