Asynchronous parallel map of a function over a list or vector using mirai, with optional promises integration. Performs multiple map over 2D lists/vectors, allowing advanced patterns such as map over the rows of a dataframe or matrix.
Usage
mirai_map(.x, .f, ..., .args = list(), .promise = NULL, .compute = "default")
Arguments
- .x
a list or atomic vector. If a 2D list (i.e. list of lists/vectors of the same length), multiple map is performed over a slice of the list. If a matrix, multiple map is performed over the rows of the matrix.
- .f
a function to be applied to each element of
.x
, or across all sub-elements of.x
as the case may be.- ...
(optional) named arguments (name = value pairs) specifying objects referenced, but not defined, in
.f
.- .args
(optional) further constant arguments to
.f
, provided as a list.- .promise
(optional) if supplied, registers a promise against each mirai. Either a function, supplied to the ‘onFulfilled’ argument of
promises::then()
or a list of 2 functions, supplied respectively to ‘onFulfilled’ and ‘onRejected’ forpromises::then()
. Using this argument requires the promises package.- .compute
[default 'default'] character value for the compute profile to use (each compute profile has its own independent set of daemons).
Details
Sends each application of function .f
on an element of
.x
(or each application of .f
on a slice of .x
) for
computation in a separate mirai
call.
This simple and transparent behaviour is designed to make full use of mirai scheduling to minimise overall execution time.
Facilitates recovery from partial failure by returning all ‘miraiError’ / ‘errorValue’ as the case may be, thus allowing only the failures to be re-run.
Note: requires daemons to have previously been set. If not, then one local daemon is set before the function propceeds.
Results
x[]
collects the results of a ‘mirai_map’ x
and
returns a list. This will wait for all asynchronous operations to
complete if still in progress, blocking but user-interruptible.
x[.flat]
collects and flattens map results to a vector, checking
that they are of the same type to avoid coercion. Note: errors if an
‘errorValue’ has been returned or results are of differing type.
x[.progress]
collects map results whilst showing a text progress
indicator.
x[.stop]
collects map results applying early stopping, which stops
at the first failure and cancels remaining operations. Note: operations
already in progress continue to completion, although their results are
not collected.
The options above may be combined in a vector, for example: x[c(.stop, .progress)]
applies early stopping together with a
progress indicator.
Examples
if (interactive()) {
# Only run examples in interactive R sessions
daemons(4, dispatcher = FALSE)
# map with constant args specified via '.args'
mirai_map(1:3, rnorm, .args = list(mean = 20, sd = 2))[]
# flatmap with function definition passed via '...'
mirai_map(1:3, function(x) func(1L, x, x + 1L), func = stats::runif)[.flat]
# sum slices of a list of vectors
(listvec <- list(1:3, c(4, 3, 2)))
mirai_map(listvec, sum)[.flat]
# sum rows of a matrix
(mat <- matrix(1:4, nrow = 2L))
mirai_map(mat, sum)[.flat]
# map over rows of a dataframe
df <- data.frame(a = c("Aa", "Bb"), b = c(1L, 4L))
mirai_map(df, function(...) sprintf("%s: %d", ...))[.flat]
# indexed map over a vector
v <- c("egg", "got", "ten", "nap", "pie")
mirai_map(list(1:length(v), v), sprintf, .args = list(fmt = "%d_%s"))[.flat]
# return a 'mirai_map' object, check for resolution, collect later
mp <- mirai_map(
c(a = 2, b = 3, c = 4),
function(x, y) do(x, as.logical(x %% y)),
do = nanonext::random,
.args = list(y = 2)
)
unresolved(mp)
mp
mp[]
unresolved(mp)
# progress indicator counts up from 0 to 4 seconds
res <- mirai_map(1:4, Sys.sleep)[.progress]
daemons(0)
# generates warning as daemons not set
# stops early when second element returns an error
tryCatch(
mirai_map(list(list(1, "a", 3), 3:1), sum)[.stop],
error = identity
)
# promises example that outputs the results, including errors, to the console
if (requireNamespace("promises", quietly = TRUE)) {
daemons(1, dispatcher = FALSE)
ml <- mirai_map(
1:30,
function(x) {Sys.sleep(0.1); if (x == 30) stop(x) else x},
.promise = list(
function(x) cat(paste(x, "")),
function(x) { cat(conditionMessage(x), "\n"); daemons(0) }
)
)
}
}