Use case: minimise execution times by performing long-running tasks concurrently in separate processes.
Multiple long computes (model fits etc.) can be performed in parallel on available computing cores.
Use mirai() to evaluate an expression asynchronously in a separate, clean R process.
A ‘mirai’ object is returned immediately.
library(mirai)
m <- mirai(
{
res <- rnorm(n) + m
res / rev(res)
},
m = runif(1),
n = 1e8
)
m
#> < mirai >
#> - $data for evaluated result
Above, all specified name = value pairs are passed through to the ‘mirai’.
The ‘mirai’ yields an ‘unresolved’ logical NA whilst the async operation is ongoing.
m$data
#> 'unresolved' logi NA
Upon completion, the ‘mirai’ resolves automatically to the evaluated result.
m$data |> str()
#> num [1:100000000] 1.556 3.118 0.506 -8.924 0.262 ...
Alternatively, explicitly call and wait for the result using call_mirai().
call_mirai(m)$data |> str()
#> num [1:100000000] 1.556 3.118 0.506 -8.924 0.262 ...
For easy programmatic use of mirai(), ‘.expr’ accepts a pre-constructed language object, and ‘.args’ accepts a list of named arguments. So, the following would be equivalent to the above:
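A minimal sketch of such an equivalent call, reusing the expression and argument values from the first example (the names expr and args are illustrative only):

```r
library(mirai)

# Pre-constructed language object holding the same expression as before
expr <- quote({
  res <- rnorm(n) + m
  res / rev(res)
})

# The same name = value pairs, supplied as a named list instead
args <- list(m = runif(1), n = 1e8)

m <- mirai(.expr = expr, .args = args)

# Wait for the result and inspect its structure
call_mirai(m)$data |> str()
```

Building the expression and arguments as ordinary R objects makes it straightforward to construct ‘mirai’ calls inside other functions.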
Use case: ensure execution flow of the main process is not blocked.
High-frequency real-time data cannot be written to file/database synchronously without disrupting the execution flow.
Cache data in memory and use mirai() to perform periodic write operations concurrently in a separate process.
Below, ‘.args’ is used to pass a list of objects already present in the calling environment to the mirai by name. This is an alternative use of ‘.args’, and may be combined with ... to also pass in name = value pairs.
library(mirai)
x <- rnorm(1e6)
file <- tempfile()
m <- mirai(write.csv(x, file = file), .args = list(x, file))
A ‘mirai’ object is returned immediately.
unresolved() may be used in control flow statements to perform actions which depend on resolution of the ‘mirai’, both before and after.
This means there is no need to actually wait (block) for a ‘mirai’ to resolve, as the example below demonstrates.
# unresolved() queries for resolution itself so no need to use it again within the while loop
while (unresolved(m)) {
cat("while unresolved\n")
Sys.sleep(0.5)
}
#> while unresolved
#> while unresolved
cat("Write complete:", is.null(m$data))
#> Write complete: TRUE
Now actions which depend on the resolution may be processed, for example the next write.
Use case: isolating code that can potentially fail in a separate process to ensure continued uptime.
As part of a data science / machine learning pipeline, iterations of model training may periodically fail for stochastic and uncontrollable reasons (e.g. buggy memory management on graphics cards).
Running each iteration in a ‘mirai’ isolates this potentially-problematic code such that even if it does fail, it does not bring down the entire pipeline.
library(mirai)
run_iteration <- function(i) {
if (runif(1) < 0.1) stop("random error\n", call. = FALSE) # simulates a stochastic error rate
sprintf("iteration %d successful\n", i)
}
for (i in 1:10) {
m <- mirai(run_iteration(i), .args = list(run_iteration, i))
while (is_error_value(call_mirai(m)$data)) {
cat(m$data)
m <- mirai(run_iteration(i), .args = list(run_iteration, i))
}
cat(m$data)
}
#> iteration 1 successful
#> iteration 2 successful
#> iteration 3 successful
#> iteration 4 successful
#> iteration 5 successful
#> iteration 6 successful
#> iteration 7 successful
#> Error: random error
#> iteration 8 successful
#> iteration 9 successful
#> iteration 10 successful
Further, by testing the return value of each ‘mirai’ for errors, error-handling code is then able to automate recovery and re-attempts, as in the above example. Further details on error handling can be found in the section below.
The end result is a resilient and fault-tolerant pipeline that minimises downtime by eliminating interruptions of long computes.
Daemons, or persistent background processes, may be set to receive ‘mirai’ requests.
This is potentially more efficient as new processes no longer need to be created on an ad hoc basis.
Call daemons() specifying the number of daemons to launch.
daemons(6)
#> [1] 6
To view the current status, status() provides the number of active connections along with a matrix of statistics for each daemon.
status()
#> $connections
#> [1] 1
#>
#> $daemons
#> i online instance assigned complete
#> abstract://10c16c2361e2f9595cf3f437 1 1 1 0 0
#> abstract://afa4be647e5ee7ac4026dd13 2 1 1 0 0
#> abstract://53c2a46e5b2db80e33a35646 3 1 1 0 0
#> abstract://8bb76793b0cb0889597470cc 4 1 1 0 0
#> abstract://5eeab946eef2f045786c9e3e 5 1 1 0 0
#> abstract://4b126a8e3b40b61566188db5 6 1 1 0 0
The default dispatcher = TRUE creates a dispatcher() background process that connects to individual daemon processes on the local machine. This ensures that tasks are dispatched efficiently on a first-in first-out (FIFO) basis to daemons for processing. Tasks are queued at the dispatcher and sent to a daemon as soon as it can accept the task for immediate execution.
Dispatcher uses synchronisation primitives from nanonext, waiting upon rather than polling for tasks, which is efficient both in terms of consuming no resources while waiting, and also being fully synchronised with events (having no latency).
daemons(0)
#> [1] 0
Set the number of daemons to zero to reset. This reverts to the default of creating a new background process for each ‘mirai’ request.
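As an illustration of the queueing behaviour described above, the following is a minimal sketch that submits more tasks than there are daemons. The task itself (a short sleep followed by a multiplication) and the variable names are purely illustrative.

```r
library(mirai)

# Two daemons behind the default dispatcher
daemons(2)

# Submit four tasks at once: two can start immediately,
# the others wait at the dispatcher until a daemon is free
tasks <- lapply(1:4, function(i) {
  mirai({
    Sys.sleep(0.2)
    x * 2
  }, x = i)
})

# Collect each result, waiting for resolution where necessary
results <- vapply(tasks, function(m) call_mirai(m)$data, numeric(1))
results
#> [1] 2 4 6 8

# Reset to the default behaviour
daemons(0)
```

All four calls to mirai() return immediately; only the collection step waits.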
Alternatively, specifying dispatcher = FALSE, the background daemons connect directly to the host process.
daemons(6, dispatcher = FALSE)
#> [1] 6
Requesting the status now shows 6 connections, along with the host URL at $daemons.
status()
#> $connections
#> [1] 6
#>
#> $daemons
#> [1] "abstract://6ef57fb5846aa6d1c33a8b2a"
This implementation sends tasks immediately, and ensures that tasks are evenly-distributed amongst daemons. This means that optimal scheduling is not guaranteed as the duration of tasks cannot be known a priori. As an example, tasks could be queued at a daemon behind a long-running task, whilst other daemons are idle having already completed their tasks.
The advantage of this approach is that it is low-level and does not require an additional dispatcher process. It is well-suited to working with similar-length tasks, or where the number of concurrent tasks typically does not exceed available daemons.
everywhere() may be used to evaluate an expression on all connected daemons and persist the resultant state, regardless of a daemon’s ‘cleanup’ setting.
everywhere(library(parallel))
The above keeps the parallel package loaded for all evaluations. Other types of setup task may also be performed such as making a common resource available, etc.
daemons(0)
#> [1] 0
Set the number of daemons to zero to reset.
The daemons interface may also be used to send tasks for computation to remote daemon processes on the network.
Call daemons() specifying ‘url’ as a character string such as ‘tcp://10.75.32.70:5555’, to which daemon processes should connect. Alternatively, use host_url() to automatically construct a valid URL.
IPv6 addresses are also supported and must be enclosed in square brackets [] to avoid confusion with the final colon separating the port. For example, port 5555 on the IPv6 address ::ffff:a6f:50d would be specified as tcp://[::ffff:a6f:50d]:5555.
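The bracketing rule can be sketched with a small helper. Note that make_tcp_url() is a hypothetical function written for this illustration and is not part of mirai; it simply treats any host containing a colon as an IPv6 address.

```r
# Hypothetical helper (not part of mirai): build a tcp:// URL,
# enclosing IPv6 addresses in square brackets
make_tcp_url <- function(host, port) {
  # An IPv6 address contains colons, whereas an IPv4 address or hostname does not
  if (grepl(":", host, fixed = TRUE)) {
    host <- paste0("[", host, "]")
  }
  sprintf("tcp://%s:%d", host, as.integer(port))
}

make_tcp_url("::ffff:a6f:50d", 5555)
#> [1] "tcp://[::ffff:a6f:50d]:5555"

make_tcp_url("10.75.32.70", 5555)
#> [1] "tcp://10.75.32.70:5555"
```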
For options on actually launching the daemons, please see the next section.
The default dispatcher = TRUE creates a background dispatcher() process on the local machine, which listens to a vector of URLs that remote daemon() processes dial in to, with each daemon having its own unique URL.
It is recommended to use a websocket URL starting ws:// instead of TCP in this scenario (used interchangeably with tcp://). A websocket URL supports a path after the port number, which can be made unique for each daemon. In this way a dispatcher can connect to an arbitrary number of daemons over a single port.
Supplying a vector of URLs allows the use of arbitrary port numbers / paths. ‘n’ does not need to be specified if it can be inferred from the length of the ‘url’ vector, for example:
daemons(url = c("ws://10.75.32.70:5566/cpu", "ws://10.75.32.70:5566/gpu", "ws://10.75.32.70:7788/1"))
Alternatively, below a single URL is supplied, along with n = 4 to specify that the dispatcher should listen at 4 URLs. In such a case, an integer sequence is automatically appended to the path /1 through /4 to produce the URLs.
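A minimal sketch of such a call, assuming a websocket URL constructed with host_url() in the same way as in the SSH examples later in this text (port 5555 is an arbitrary choice):

```r
library(mirai)

# A single websocket URL plus n = 4: the dispatcher listens at four URLs,
# formed by appending /1 through /4 to the path
daemons(n = 4, url = host_url(ws = TRUE, port = 5555))
```

The status output that follows lists tcp:// URLs with sequential port numbers, so it was evidently produced with a tcp:// URL rather than with this exact call.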
Requesting status on the host machine:
status()
#> $connections
#> [1] 1
#>
#> $daemons
#> i online instance assigned complete
#> tcp://hostname:5555 1 0 0 0 0
#> tcp://hostname:5556 2 0 0 0 0
#> tcp://hostname:5557 3 0 0 0 0
#> tcp://hostname:5558 4 0 0 0 0
As per the local case, $connections shows the single connection to dispatcher, however $daemons now provides a matrix of statistics for the remote daemons.
i: index number.
online: shows as 1 when there is an active connection, or else 0 if a daemon has yet to connect or has disconnected.
instance: increments by 1 every time there is a new connection at a URL. This counter is designed to track new daemon instances connecting after previous ones have ended (due to time-outs etc.). The count becomes negative immediately after a URL is regenerated by saisei(), but increments again once a new daemon connects.
assigned: shows the cumulative number of tasks assigned to the daemon.
complete: shows the cumulative number of tasks completed by the daemon.
Dispatcher automatically adjusts to the number of daemons actually connected. Hence it is possible to dynamically scale up or down the number of daemons according to requirements (limited to the ‘n’ URLs assigned).
To reset all connections and revert to default behaviour:
daemons(0)
#> [1] 0
Closing the connection causes the dispatcher to exit automatically, and in turn all connected daemons when their respective connections with the dispatcher are terminated.
By specifying dispatcher = FALSE, remote daemons connect directly to the host process. The host listens at a single URL, and distributes tasks to all connected daemons.
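A minimal sketch of this set-up, assuming host_url() is called with no arguments:

```r
library(mirai)

# Listen at a single URL on the host; remote daemons dial in directly,
# with no dispatcher process in between
daemons(url = host_url(), dispatcher = FALSE)
```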
Note that above, calling host_url() without a port value uses the default of ‘0’. This is a wildcard value that will automatically cause a free ephemeral port to be assigned. The actual assigned port is provided in the return value of the call, or it may be queried at any time via status().
The number of daemons connecting to the host URL is not limited and network resources may be added or removed at any time, with tasks automatically distributed to all connected daemons.
$connections will show the actual number of connected daemons.
status()
#> $connections
#> [1] 0
#>
#> $daemons
#> [1] "tcp://hostname:38145"
To reset all connections and revert to default behaviour:
daemons(0)
#> [1] 0
This causes all connected daemons to exit automatically.
To launch remote daemons, supply a remote launch configuration to the ‘remote’ argument of daemons() when setting up daemons, or launch_remote() at any time afterwards.
ssh_config() may be used to generate a remote launch configuration if there is SSH access to the remote machine, or else remote_config() provides a flexible method for generating a configuration involving a custom resource manager / application.
The first example below launches 4 daemons on the machine 10.75.32.90 (using the default SSH port of 22 as this was not specified), connecting back to the dispatcher URLs:
daemons(
n = 4,
url = host_url(ws = TRUE, port = 5555),
remote = ssh_config(remotes = "ssh://10.75.32.90")
)
The second example below launches one daemon on each of 10.75.32.90 and 10.75.32.91 using the custom SSH port of 222:
daemons(
n = 2,
url = host_url(ws = TRUE, port = 5555),
remote = ssh_config(c("ssh://10.75.32.90:222", "ssh://10.75.32.91:222"))
)
In the above examples, as the remote daemons connect back directly, port 5555 on the local machine must be open to incoming connections from the remote addresses.
Use of SSH tunnelling provides a convenient way to launch remote daemons without requiring the remote machine to be able to access the host. Often firewall configurations or security policies may prevent opening a port to accept outside connections.
In these cases SSH tunnelling offers a solution by creating a tunnel once the initial SSH connection is made. For simplicity, this SSH tunnelling implementation uses the same port on both the side of the host and that of the corresponding node. SSH key-based authentication must also already be in place.
Tunnelling requires the hostname for ‘url’ specified when setting up daemons to be either ‘127.0.0.1’ or ‘localhost’. This is because the tunnel is created between 127.0.0.1:port or equivalently localhost:port on each machine. The host listens to its localhost:port and the remotes each dial into localhost:port on their own respective machines.
The below example launches 2 nodes on the remote machine 10.75.32.90 using SSH tunnelling over port 5555 (‘url’ hostname is specified as ‘localhost’):
daemons(
url = "tcp://localhost:5555",
remote = ssh_config(
remotes = c("ssh://10.75.32.90", "ssh://10.75.32.90"),
tunnel = TRUE
)
)
As an alternative to automated launches, calling launch_remote() without specifying ‘remote’ may be used to return the shell commands for deploying daemons manually. The printed return values may be copy / pasted directly to a remote machine.
daemons(n = 2, url = host_url())
#> [1] 2
launch_remote(1:2)
#> [1]
#> Rscript -e "mirai::daemon('tcp://hostname:35849',rs=c(10407,798643379,1234561624,1951080569,289940966,-1499920593,890708900))"
#>
#> [2]
#> Rscript -e "mirai::daemon('tcp://hostname:32937',rs=c(10407,-225380490,-815479122,-2122558094,42719770,-395459946,712117507))"
daemons(0)
#> [1] 0
Note that daemons() should be set up on the host machine before launching daemon() on remote resources, otherwise the daemon instances will exit if a connection is not immediately available. Alternatively, specifying the argument autoexit = FALSE will allow daemons to wait (indefinitely) for a connection to become available.
TLS is available as an option to secure communications from the local machine to remote daemons.
An automatic zero-configuration default is implemented. Simply specify a secure URL of the form wss:// or tls+tcp:// when setting daemons, or use host_url(tls = TRUE), for example:
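A minimal sketch, assuming host_url() accepts the ‘ws’ and ‘tls’ arguments as used elsewhere in this text, with the number of daemons chosen arbitrarily:

```r
library(mirai)

# ws = TRUE together with tls = TRUE yields a secure wss:// URL;
# with no port specified, a free ephemeral port is assigned
daemons(n = 4, url = host_url(ws = TRUE, tls = TRUE))
```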
Single-use keys and certificates are automatically generated and configured, without requiring any further intervention. The private key is always retained on the host machine and never transmitted.
The generated self-signed certificate is available via launch_remote(). This function conveniently constructs the full shell command to launch a daemon, including the correctly specified ‘tls’ argument to daemon().
launch_remote(1)
#> [1]
#> Rscript -e "mirai::daemon('wss://hostname:45873/1',tls=c('-----BEGIN CERTIFICATE-----
#> MIIFNzCCAx+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAzMREwDwYDVQQDDAhrdW1h
#> bW90bzERMA8GA1UECgwITmFub25leHQxCzAJBgNVBAYTAkpQMB4XDTAxMDEwMTAw
#> MDAwMFoXDTMwMTIzMTIzNTk1OVowMzERMA8GA1UEAwwIa3VtYW1vdG8xETAPBgNV
#> BAoMCE5hbm9uZXh0MQswCQYDVQQGEwJKUDCCAiIwDQYJKoZIhvcNAQEBBQADggIP
#> ADCCAgoCggIBANFwy8cTjibC/EoHSzw2hdItjah1tfVYUf32zPy6/I+rls2D+rKR
#> 1dQKBgwyPWCa+Xb4WXqCUyDOMQtDuXRSmUUMeqhSVzwMxUm3sbm4mmvdMLqR82db
#> o4+VsNbVjesp3eLulBMmRh6nzphfKN9AMMjPt94WSSrshzopsrD2CGCB/iMzkex8
#> 7Ht8HyoDXrOhUqT6xgF+MWgLH0Yvs2vnFaFW98tvzxrJXsTlX/zxHWw11JG1JA21
#> AzAUWrKETgSAYxBlD106WH102vptIvcOJKKLgcIRLTFGGUaXv6jusnGhIFJ9TyQt
#> KGErU5y04IhJ24CbhDkhXL631DhJ+M8OAlwd8/PWi3OE68Sqj8RmxqnS9kLeHjjv
#> p5IMk1Z0Bc3nAQbfBAGOS0Mnuaew1sQqbdnpILzs7Puokr0mzjRgyYra04DXN/Tb
#> t4e13TrGGiqg0uaW4CU6VzINaW7o7GxEid6O8V7yw5NXASA3aYWE/0wSAJ3awEK9
#> iaqgZ4XZawknl3YmYGCtXeRRBwqLItilDa9eAhr7X2oeSU+bsj600ZZ83Wul6crA
#> Djh9e242oEB1PX4aeesYyITyslfRCpuIh3zM/8/bSPxg6sq59RAucxvl6RvlVtdJ
#> jDkc/o+/2DOUhQkysUDp3Td1dvfDBvjfdbOJRusBPDT/c3F4qml5oQlZAgMBAAGj
#> VjBUMBIGA1UdEwEB/wQIMAYBAf8CAQAwHQYDVR0OBBYEFO/Ne8NiBWm+M6JzjdPI
#> nrWTGVaOMB8GA1UdIwQYMBaAFO/Ne8NiBWm+M6JzjdPInrWTGVaOMA0GCSqGSIb3
#> DQEBCwUAA4ICAQCTZgv6/Uc5Ig3Zc9/0JJHm8fG45DkJJeud7fdSBXpxLkmm69W6
#> PCmRK3xYP7Eh3hrQYUuPzUjPwcam6Hdvq1SMRYnoyxWteut6J1wDeHRSc8XaMy3U
#> eiC8lgX3mPs5J76GQ2LP1FR5OaTHYaYSlsdG88LRGJeuSnJTUAPYmhz/Lf8PdFrV
#> IcXhd6eL/tmweHQAKRLy5amU8hjdcws2oDClJa3xjEXwYl8SZgHkRAG8HvZhnAGY
#> zcwvedLIGOd9W55/2OhMooDgQvZS83dEdEm9fhlsMC0uLHK8LXZBAxBv2EBnzHiP
#> LMAwVCHgTSy+zagwSoe8f8qdFPYU0LRwcJvjCb06xeZ4MRAGt4MGd7ZG9gMmNI2U
#> p6YiyhAuZrPeGZNNeRTOQYdIh/d/NXgsr4hdglUrCaIg6907puBsJnswbHrN6vFC
#> mck0bxfZaj4AGMsyJNCok3CC/mDA4jJr4jjMV8lupS+VwPUDSGR89t0JFi3kfwSE
#> vvwk9eus8ZCdJkLzTN6Gh6ffslGfygrQ4VmHwLLKumgMWGDsk8JTDTMP6+czjDkA
#> /j7YQDoOwwWg2AJYbXl3vmvz7qqLoFUMuMrH62vpn5TA9alsezigq979utD+ajiP
#> ea2oKDFXNXutPr+2LaUB4EO97YH8n9/gh0r39iP2nk+CBcmdU6jeX7u/oA==
#> -----END CERTIFICATE-----
#> ',''),rs=c(10407,781801285,1904108354,-1333668389,-978362528,-1800978463,-938634194))"
The printed value may be deployed directly on a remote machine.
As an alternative to the zero-configuration default, a certificate may also be generated via a Certificate Signing Request (CSR) to a Certificate Authority (CA), which may be a public CA or a CA internal to an organisation.
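The certificate received should be a block of text between the markers -----BEGIN CERTIFICATE----- and -----END CERTIFICATE-----. Make sure to request the certificate in the PEM format. If only available in other formats, the TLS library used should usually provide conversion utilities.
The private key should likewise be a block of text between the markers -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY-----.
When setting daemons, the TLS certificate and private key should be provided to the ‘tls’ argument of daemons().
If the certificate and private key have been imported as character strings cert and key respectively, then the ‘tls’ argument may be specified as the character vector c(cert, key).
When launching daemons, the certificate chain to the CA should be supplied to the ‘tls’ argument of daemon() or launch_remote().
The certificate chain should comprise multiple certificates, each between -----BEGIN CERTIFICATE----- and -----END CERTIFICATE----- markers. The first one should be the newly-generated TLS certificate, the same supplied to daemons(), and the final one should be a CA root certificate.
If these are concatenated together as a single character string certchain, then the character vector comprising this and an empty character string c(certchain, "") may be supplied to the relevant ‘tls’ argument.
The daemons() interface also allows the specification of compute profiles for managing tasks with heterogeneous compute requirements.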
Simply specify the argument .compute when calling daemons() with a profile name (which is ‘default’ for the default profile). The daemons settings are saved under the named profile.
To create a ‘mirai’ task using a specific compute profile, specify the ‘.compute’ argument to mirai(), which defaults to the ‘default’ compute profile.
Similarly, functions such as status(), launch_local() or launch_remote() should be specified with the desired ‘.compute’ argument.
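The following is a minimal sketch of two compute profiles used side by side. The profile names ‘cpu’ and ‘gpu’ are arbitrary labels chosen for illustration, and both profiles simply use one local daemon each.

```r
library(mirai)

# One local daemon under each of two named profiles
daemons(1, .compute = "cpu")
daemons(1, .compute = "gpu")

# Each task is sent to the daemon belonging to the named profile
m_cpu <- mirai(Sys.getpid(), .compute = "cpu")
m_gpu <- mirai(Sys.getpid(), .compute = "gpu")

pid_cpu <- call_mirai(m_cpu)$data
pid_gpu <- call_mirai(m_gpu)$data

# The process IDs differ, as the tasks ran in separate daemon processes
pid_cpu != pid_gpu

# Status is queried, and daemons are reset, per profile
status(.compute = "cpu")
daemons(0, .compute = "cpu")
daemons(0, .compute = "gpu")
```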
If execution in a mirai fails, the error message is returned as a character string of class ‘miraiError’ and ‘errorValue’ to facilitate debugging. is_mirai_error() may be used to test for mirai execution errors.
m1 <- mirai(stop("occurred with a custom message", call. = FALSE))
call_mirai(m1)$data
#> 'miraiError' chr Error: occurred with a custom message
m2 <- mirai(mirai::mirai())
call_mirai(m2)$data
#> 'miraiError' chr Error in mirai::mirai(): missing expression, perhaps wrap in {}?
is_mirai_error(m2$data)
#> [1] TRUE
is_error_value(m2$data)
#> [1] TRUE
If a daemon instance is sent a user interrupt, the mirai will resolve to an empty character string of class ‘miraiInterrupt’ and ‘errorValue’. is_mirai_interrupt() may be used to test for such interrupts.
is_mirai_interrupt(m2$data)
#> [1] FALSE
If execution of a mirai surpasses the timeout set via the ‘.timeout’ argument, the mirai will resolve to an ‘errorValue’. This can, amongst other things, guard against mirai processes that have the potential to hang and never return.
m3 <- mirai(nanonext::msleep(1000), .timeout = 500)
call_mirai(m3)$data
#> 'errorValue' int 5 | Timed out
is_mirai_error(m3$data)
#> [1] FALSE
is_mirai_interrupt(m3$data)
#> [1] FALSE
is_error_value(m3$data)
#> [1] TRUE
is_error_value() tests for all mirai execution errors, user interrupts and timeouts.
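The three test functions can be combined to classify a resolved value. The sketch below uses a hypothetical helper, describe_result(), which is not part of mirai; the order of the checks matters, because is_error_value() is also TRUE for errors and interrupts.

```r
library(mirai)

# Hypothetical helper (not part of mirai): classify the resolved value of a 'mirai'
describe_result <- function(x) {
  if (is_mirai_error(x)) {
    "error"
  } else if (is_mirai_interrupt(x)) {
    "interrupt"
  } else if (is_error_value(x)) {
    # Remaining 'errorValue' cases, such as timeouts
    "other error value"
  } else {
    "success"
  }
}

m_ok      <- mirai(1 + 1)
m_error   <- mirai(stop("failed"))
m_timeout <- mirai(Sys.sleep(2), .timeout = 200)

describe_result(call_mirai(m_ok)$data)
#> [1] "success"
describe_result(call_mirai(m_error)$data)
#> [1] "error"
describe_result(call_mirai(m_timeout)$data)
#> [1] "other error value"
```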
Custom serialization functions may be registered to handle external pointer type reference objects.
This is demonstrated below through an example using tensors from the torch package.
Register the serialization and unserialization functions as a list supplied to the ‘refhook’ argument of serialization().
Set up daemons - this may be done before or after setting serialization().
Use everywhere() to make the ‘torch’ package available on all daemons.
library(torch)
serialization(refhook = list(torch:::torch_serialize, torch::torch_load))
#> [ mirai ] serialization functions registered
daemons(1)
#> [1] 1
everywhere(library(torch))
Next, a convolutional neural network is created, using an example from the ‘torch’ nn_module() function.
This model is then initialized in a ‘mirai’, passing it a set of parameters.
The returned model contains many tensor elements. The initial weights of the first convolutional layer are shown below as an example.
model <- nn_module(
initialize = function(in_size, out_size) {
self$conv1 <- nn_conv2d(in_size, out_size, 5)
self$conv2 <- nn_conv2d(in_size, out_size, 5)
},
forward = function(x) {
x <- self$conv1(x)
x <- nnf_relu(x)
x <- self$conv2(x)
x <- nnf_relu(x)
x
}
)
params <- list(in_size = 1, out_size = 20)
m <- mirai(do.call(model, params), .args = list(model, params))
call_mirai(m)$data
#> An `nn_module` containing 1,040 parameters.
#>
#> ── Modules ─────────────────────────────────────────────────────────────────────────────────────────────────────────
#> • conv1: <nn_conv2d> #520 parameters
#> • conv2: <nn_conv2d> #520 parameters
m$data$parameters$conv1.weight
#> torch_tensor
#> (1,1,.,.) =
#> -0.1547 -0.0463 -0.1194 0.1927 0.1373
#> -0.1053 -0.0197 0.0712 -0.1002 0.1691
#> 0.1674 -0.0518 -0.0091 -0.1342 -0.0194
#> 0.0066 -0.1761 -0.0011 0.1351 -0.1369
#> -0.0512 -0.1049 -0.0576 0.0602 0.0147
#>
#> (2,1,.,.) =
#> -0.0157 -0.0703 0.0258 0.1334 0.0585
#> 0.1185 0.1422 0.1206 -0.1038 0.0965
#> 0.0522 -0.1032 0.0371 -0.1417 0.1959
#> 0.0152 -0.1782 -0.1098 -0.0174 -0.1722
#> 0.1262 -0.1976 -0.1075 0.0125 0.0472
#>
#> (3,1,.,.) =
#> 0.1714 -0.1572 -0.0498 0.0131 0.1972
#> -0.0246 -0.1428 -0.0165 0.0767 0.1636
#> 0.1680 -0.0441 0.0450 -0.0327 -0.0071
#> 0.0442 0.0465 -0.0389 -0.0721 -0.1658
#> -0.0559 -0.1380 0.1621 -0.0671 0.1313
#>
#> (4,1,.,.) =
#> 0.1626 0.0884 0.0327 -0.0343 -0.0225
#> 0.1842 0.0755 0.1634 -0.1781 -0.1676
#> -0.0352 0.0195 0.1289 -0.1686 -0.1826
#> 0.0696 -0.0850 -0.0042 0.1297 0.1798
#> -0.0648 -0.0289 -0.0564 -0.1230 0.1566
#>
#> (5,1,.,.) =
#> -0.0987 -0.1350 -0.0552 0.0528 -0.1221
#> ... [the output was truncated (use n=-1 to disable)]
#> [ CPUFloatType{20,1,5,5} ][ requires_grad = TRUE ]
The model parameters can then be passed to an optimiser, with that also initialized within a ‘mirai’ process.
optim <- mirai(optim_rmsprop(params = params), params = m$data$parameters)
call_mirai(optim)$data
#> <optim_rmsprop>
#> Inherits from: <torch_optimizer>
#> Public:
#> add_param_group: function (param_group)
#> clone: function (deep = FALSE)
#> defaults: list
#> initialize: function (params, lr = 0.01, alpha = 0.99, eps = 1e-08, weight_decay = 0,
#> load_state_dict: function (state_dict, ..., .refer_to_state_dict = FALSE)
#> param_groups: list
#> state: State, R6
#> state_dict: function ()
#> step: function (closure = NULL)
#> zero_grad: function ()
#> Private:
#> step_helper: function (closure, loop_fun)
daemons(0)
#> [1] 0
It can be seen that tensors and objects containing tensors can be passed seamlessly between host and daemon processes in the same way as other R objects. Indeed, the idea is that they should not have to be treated any differently.
This implementation uses the serialization methods from the ‘torch’ package directly, and has been conceived to be fast and efficient, minimising data copies where possible.
This package contains experimental functions and S3 methods using mirai as an alternative communications backend for R. These were developed to fulfil a request by R Core at R Project Sprint 2023.
make_cluster() creates a cluster of type ‘miraiCluster’, which may be used as a cluster object for any function in the parallel base package such as parallel::clusterApply(), parallel::parLapply() or the load-balanced version parallel::parLapplyLB().
Optionally, specify ‘remote’ to launch remote daemons using a configuration generated by remote_config() or ssh_config(). Otherwise, the shell commands for manual deployment of nodes on remote resources are printed in interactive sessions.
launch_remote() may also be called on a ‘miraiCluster’ to return the shell commands for deployment of remote nodes.
status() may be called on a ‘miraiCluster’ to check the number of connected nodes at any time.
cl <- make_cluster(4)
cl
#> < miraiCluster >
#> - cluster ID: `1`
#> - nodes: 4
#> - active: TRUE
parallel::parLapply(cl, 1:5, rnorm)
#> [[1]]
#> [1] -1.999523
#>
#> [[2]]
#> [1] -0.1418244 -0.7273119
#>
#> [[3]]
#> [1] 0.8664890 -0.3563103 -0.1666014
#>
#> [[4]]
#> [1] -1.9063231 -0.9560182 0.4853687 1.2031169
#>
#> [[5]]
#> [1] -0.1793877 -0.7776470 0.4795629 -0.6257594 0.6307579
status(cl)
#> $connections
#> [1] 4
#>
#> $daemons
#> [1] "abstract://f9b5e42866cec9d6b4954673"
stop_cluster(cl)
cl <- make_cluster(n = 2, url = host_url())
#> Shell commands for deployment on nodes:
#>
#> [1]
#> Rscript -e "mirai::daemon('tcp://hostname:32869',rs=c(10407,1826702598,-437546033,-1477058620,-603642891,-1763296206,-1013006837))"
#>
#> [2]
#> Rscript -e "mirai::daemon('tcp://hostname:32869',rs=c(10407,-1261748477,588492272,629851003,-2139045830,-367846132,1791820173))"
stop_cluster(cl)
Created clusters are fully compatible with parallel cluster types.
As an example, they may be registered by package doParallel for use with the foreach package.
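A minimal sketch of such a registration, assuming the doParallel and foreach packages are installed (the computation itself is illustrative only):

```r
library(mirai)
library(doParallel)
library(foreach)

# Create a 'miraiCluster' with two local nodes and register it as the
# parallel backend for foreach
cl <- make_cluster(2)
registerDoParallel(cl)

# Each iteration is evaluated on one of the cluster nodes
res <- foreach(i = 1:4, .combine = c) %dopar% sqrt(i)
res

stop_cluster(cl)
```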