Asynchronous parallel / distributed map of a function over a list or vector using mirai, with optional promises integration.
Usage
mirai_map(.x, .f, ..., .args = list(), .promise = NULL, .compute = "default")
Arguments
- .x
a list or atomic vector.
- .f
a function to be applied to each element of
.x
.- ...
(optional) named arguments (name = value pairs) specifying objects referenced, but not defined, in
.f
, or an environment containing such objects.- .args
(optional) further constant arguments to
.f
, provided as a list.- .promise
(optional) if supplied, registers a promise against each mirai. Either a function, supplied to the ‘onFulfilled’ argument of
promises::then()
or a list of 2 functions, supplied respectively to ‘onFulfilled’ and ‘onRejected’ forpromises::then()
. Using this argument requires the promises package.- .compute
[default 'default'] character value for the compute profile to use (each compute profile has its own independent set of daemons).
Details
Sends each application of function .f
on an element of
.x
for computation in a separate mirai
call.
This simple and transparent behaviour is designed to make full use of mirai scheduling to minimise overall execution time.
Facilitates recovery from partial failure by returning all ‘miraiError’ / ‘errorValue’ as the case may be, thus allowing only the failures to be re-run. If using dispatcher, ‘retry’ should be specified as FALSE to ensure crashes are returned as errors.
Note: requires daemons to have previously been set. If not, then one local daemon is set before the function propceeds.
Results
x[]
collects the results of a mirai_map x
. This will wait
for all asynchronous operations to complete if still in progress,
blocking but user-interruptible.
x[.progress]
collects the results whilst showing a text progress
indicator.
x[.stop]
collects the results applying early stopping, which stops
at the first failure and aborts all remaining in-progress operations.
x[c(.stop, .progress)]
combines early stopping with a progress
indicator.
Examples
if (interactive()) {
# Only run examples in interactive R sessions
daemons(4, dispatcher = FALSE)
res <- mirai_map(1:3, rnorm, .args = list(mean = 20, sd = 2))[]
res
mp <- mirai_map(
c(a = 2, b = 3, c = 4),
function(x) do(x, as.logical(x %% 2)),
do = nanonext::random
)
unresolved(mp)
mp
mp[]
unresolved(mp)
# progress indicator counts up to 4 seconds
res <- mirai_map(1:4, Sys.sleep)[.progress]
daemons(0)
# generates warning as daemons not set
# stops early when second element returns an error
tryCatch(
mirai_map(list(a = 1, b = "a", c = 3), sum)[.stop],
error = identity
)
# promises example that outputs the results, including errors, to the console
if (requireNamespace("promises", quietly = TRUE)) {
daemons(1, dispatcher = FALSE)
ml <- mirai_map(
1:30,
function(x) {Sys.sleep(0.1); if (x == 30) stop(x) else x},
.promise = list(
function(x) cat(paste(x, "")),
function(x) { cat(conditionMessage(x), "\n"); daemons(0) }
)
)
}
}