Plumber Integration

mirai may be used as an asynchronous / distributed backend for plumber pipelines.

Example usage is provided below for different types of endpoint.

Example GET Endpoint

The plumber router code is itself run in a daemon process so that it does not block the interactive session.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the ID of the process it runs on.

library(mirai)

# important to supply SIGINT so the plumber server is interrupted and exits cleanly when torn down
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # does not use dispatcher (suitable when all requests require similar compute)
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(status = 200L, body = list(time = format(Sys.time()),
                                            msg = msg,
                                            pid = Sys.getpid()))
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})
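
Since pr_run() blocks, the daemon stays busy serving the API and the mirai ‘m’ remains unresolved for as long as the server runs. A quick check that the server launched, as a minimal sketch using mirai’s unresolved():

unresolved(m) # expected to be TRUE while the plumber server is running
# had the router code errored instead (e.g. the port was already in use),
# the mirai would resolve and m$data would contain a 'miraiError' to inspect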

The API can be queried using an async HTTP client such as nanonext::ncurl_aio().

Here, all 8 requests are submitted at once, but note that the responses have differing timestamps, as only 4 can be processed at any one time (limited by the number of daemons set).

library(nanonext)
res <- lapply(1:8,
              function(i) ncurl_aio("http://127.0.0.1:8985/echo",
                                    headers = c(msg = as.character(i))))
res <- lapply(res, call_aio)
for (r in res) print(r$data)
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"1\"],\"pid\":[36065]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"2\"],\"pid\":[36062]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"3\"],\"pid\":[36068]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:11\"],\"msg\":[\"4\"],\"pid\":[36060]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"5\"],\"pid\":[36065]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"6\"],\"pid\":[36062]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"7\"],\"pid\":[36068]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:12\"],\"msg\":[\"8\"],\"pid\":[36060]}"
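
Any other asynchronous or parallel HTTP client can be used in the same way. As a hedged sketch (assuming the httr2 package, version 1.0.0 or later for req_perform_parallel()), the equivalent queries could be made as follows:

library(httr2)
reqs <- lapply(1:8, function(i)
  request("http://127.0.0.1:8985/echo") |> req_headers(msg = as.character(i)))
# the requests are performed concurrently, so again arrive at the server together
resps <- req_perform_parallel(reqs)
lapply(resps, resp_body_string)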

daemons(0)
#> [1] 0

Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as it is retrieved using a connection, which is not serializable and hence cannot be used from within the daemon process.

library(mirai)

# important to supply SIGINT so the plumber server is interrupted and exits cleanly when torn down
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher (suitable for requests with differing compute lengths)
  daemons(4L, dispatcher = TRUE) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(status = 200L,
                 body = list(time = format(Sys.time()),
                             msg = jsonlite::fromJSON(data)[["msg"]],
                             pid = Sys.getpid()))
          },
          data = req$postBody # the body is accessed here, in the router process
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})

Querying the endpoint produces the same set of outputs as the previous example.

library(nanonext)
res <- lapply(1:8,
              function(i) ncurl_aio("http://127.0.0.1:8986/echo",
                                    method = "POST",
                                    data = sprintf('{"msg":"%d"}', i)))
res <- lapply(res, call_aio)
for (r in res) print(r$data)
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"1\"],\"pid\":[36338]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"2\"],\"pid\":[36343]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"3\"],\"pid\":[36340]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:15\"],\"msg\":[\"4\"],\"pid\":[36346]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"5\"],\"pid\":[36346]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"6\"],\"pid\":[36338]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"7\"],\"pid\":[36343]}"
#> [1] "{\"time\":[\"2024-04-23 17:15:16\"],\"msg\":[\"8\"],\"pid\":[36340]}"

daemons(0)
#> [1] 0