Plumber Integration
mirai may be used as an asynchronous backend for plumber pipelines.
Example usage is provided below for different types of endpoint.
Example GET Endpoint
The plumber router code is run in a daemon process itself so that it does not block the interactive process.
The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.
library(mirai)

# Launch a single daemon to host the plumber router so that the interactive
# session is not blocked. SIGINT is supplied as the autoexit signal so that the
# plumber server is interrupted and exits cleanly when finished.
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # more efficient not to use dispatcher if all requests are similar length
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  # Handler for GET /echo: hands the slow work to a mirai and, via the promise
  # pipe, copies the status and body it produces onto the plumber response.
  echo_get <- function(req, res) {
    task <- mirai(
      {
        Sys.sleep(1L) # simulate expensive computation
        list(
          status = 200L,
          body = list(
            time = format(Sys.time()),
            msg = msg,
            pid = Sys.getpid()
          )
        )
      },
      # the 'msg' request header is read here and passed to the mirai by name
      msg = req[["HEADERS"]][["msg"]]
    )
    task %...>% (function(out) {
      res$status <- out$status
      res$body <- out$body
    })
  }

  pr() |>
    pr_get("/echo", echo_get) |>
    pr_run(host = "127.0.0.1", port = 8985)
})
The API can be queried using an async HTTP client such as nanonext::ncurl_aio().
Here, all 8 requests are submitted at once, but we note that the responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set).
library(nanonext)

# Submit 8 async GET requests to the /echo endpoint, each carrying its own
# index (as a string) in the 'msg' request header.
res <- lapply(seq_len(8L), \(i) {
  ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
})

# Collect the results of all 8 requests (printed below)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"1\"],\"pid\":[30785]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"2\"],\"pid\":[30777]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"3\"],\"pid\":[30779]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"4\"],\"pid\":[30785]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"5\"],\"pid\":[30782]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"6\"],\"pid\":[30782]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"7\"],\"pid\":[30777]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"8\"],\"pid\":[30779]}"
# Reset to zero daemons. The daemon hosting the router was launched with
# autoexit = tools::SIGINT, so presumably this is the point at which the plumber
# server is interrupted and exits -- NOTE(review): confirm against mirai docs.
daemons(0)
#> [1] 0
Example POST Endpoint
Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.
Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as this is retrieved using a connection that is not serializable.
library(mirai)

# Launch a single daemon to host the plumber router. SIGINT is supplied as the
# autoexit signal so that the plumber server is interrupted and exits cleanly
# when finished.
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher - suitable when requests take differing times to complete
  daemons(4L) # handles 4 requests simultaneously

  # Handler for POST /echo: hands the slow work to a mirai and, via the promise
  # pipe, copies the status and body it produces onto the plumber response.
  echo_post <- function(req, res) {
    task <- mirai(
      {
        Sys.sleep(1L) # simulate expensive computation
        list(
          status = 200L,
          body = list(
            time = format(Sys.time()),
            msg = jsonlite::fromJSON(data)[["msg"]],
            pid = Sys.getpid()
          )
        )
      },
      # the post body is accessed here, in the router process, and passed to
      # the mirai as an argument rather than being read inside it
      data = req$postBody
    )
    task %...>% (function(out) {
      res$status <- out$status
      res$body <- out$body
    })
  }

  pr() |>
    pr_post("/echo", echo_post) |>
    pr_run(host = "127.0.0.1", port = 8986)
})
Querying the endpoint produces the same set of outputs as the previous example.
library(nanonext)

# Submit 8 async POST requests to the /echo endpoint, each sending a small JSON
# document containing its own index as the 'msg' field.
res <- lapply(seq_len(8L), \(i) {
  ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
})

# Collect the results of all 8 requests (printed below)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"1\"],\"pid\":[31046]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"2\"],\"pid\":[31048]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"3\"],\"pid\":[31054]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"4\"],\"pid\":[31051]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"5\"],\"pid\":[31046]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"6\"],\"pid\":[31048]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"7\"],\"pid\":[31051]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"8\"],\"pid\":[31054]}"
# Reset to zero daemons. The daemon hosting the router was launched with
# autoexit = tools::SIGINT, so presumably this is the point at which the plumber
# server is interrupted and exits -- NOTE(review): confirm against mirai docs.
daemons(0)
#> [1] 0