A parallel and asynchronous Reduce for batch systems.
Note that this function only defines the computational jobs.
Each job reduces a certain number of elements on one slave.
The actual computation is started with submitJobs.
Results and partial results can be collected with reduceResultsList, reduceResults or loadResult.
Usage
batchReduce(
fun,
xs,
init = NULL,
chunks = seq_along(xs),
more.args = list(),
reg = getDefaultRegistry()
)
Arguments
- fun
  [function(aggr, x, ...)]
  Function to reduce xs with.
- xs
  [vector]
  Vector to reduce.
- init
  [ANY]
  Initial object for reducing. See Reduce.
- chunks
  [integer(length(xs))]
  Group for each element of xs. Can be generated with chunk.
- more.args
  [list]
  A list of additional arguments passed to fun.
- reg
  [Registry]
  Registry. If not explicitly passed, uses the default registry (see setDefaultRegistry).
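Semantically, each job applies Reduce over the elements of xs that share a group id in chunks, starting from init. The following plain-R sketch (no cluster involved; the hand-built group ids stand in for what chunk would generate) mirrors that behavior:

```r
# Plain-R sketch of what each job computes: one Reduce per chunk.
# Elements of xs sharing a group id are reduced together in a single job.
f = function(aggr, x) aggr + x
xs = 1:100
groups = rep(1:20, each = 5)   # stand-in for chunk(xs, chunk.size = 5)
partials = lapply(split(xs, groups), function(x) Reduce(f, x, 0))
length(partials)               # 20 partial results, one per job
```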
Value
[data.table] with ids of added jobs stored in column “job.id”.
Examples
# define a function to reduce on the slave; we want to sum a vector
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
#> No readable configuration file found
#> Created registry in '/tmp/batchtools-example/reg' using cluster functions 'Interactive'
xs = 1:100
f = function(aggr, x) aggr + x
# sum 5 numbers on each slave process, i.e. 20 jobs
chunks = chunk(xs, chunk.size = 5)
batchReduce(fun = f, xs = xs, init = 0, chunks = chunks, reg = tmp)
#> Adding 20 jobs ...
submitJobs(reg = tmp)
#> Submitting 20 jobs in 20 chunks using cluster functions 'Interactive' ...
waitForJobs(reg = tmp)
#> [1] TRUE
# now reduce one final time on master
reduceResults(fun = function(aggr, job, res) f(aggr, res), reg = tmp)
#> [1] 5050
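The distributed result can be cross-checked serially: reducing each chunk first and then reducing the partial sums gives the same value as a single pass over xs. This is a plain-R sanity check, independent of batchtools:

```r
# Serial sanity check (no batchtools): a two-level reduction equals one pass.
f = function(aggr, x) aggr + x
xs = 1:100
groups = rep(1:20, each = 5)                  # mirrors chunk(xs, chunk.size = 5)
partials = vapply(split(xs, groups), function(x) Reduce(f, x, 0), numeric(1))
Reduce(f, partials, 0) == Reduce(f, xs, 0)    # TRUE, both equal 5050
```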