Plumber Integration

{mirai} may be used as an asynchronous / distributed backend for {plumber} pipelines.

Example usage is provided below for different types of endpoint.

Example GET Endpoint

The plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.

library(mirai)

# One background daemon hosts the plumber router. Passing SIGINT as `autoexit`
# means the plumber server is interrupted, and so exits cleanly, on teardown.
daemons(n = 1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # does not use dispatcher (suitable when all requests require similar compute)
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        # Read the 'msg' header here, in the router process, and send only that
        # value to the daemon. Passing `req` itself would serialize the entire
        # request environment on every call, and `req` may contain components
        # that cannot be serialized (e.g. the connection behind req$postBody).
        mirai(
          {
            Sys.sleep(1L) # simulate an expensive computation
            list(status = 200L, body = list(time = format(Sys.time()),
                                            msg = msg,
                                            pid = Sys.getpid()))
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          # copy the daemon's result onto the plumber response
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})

The API can be queried using an async HTTP client such as nanonext::ncurl_aio().

Here, all 8 requests are submitted at once. Note that the responses have differing timestamps, because only 4 can be processed at any one time (limited by the number of daemons set).

library(nanonext)
# Fire off all 8 requests without waiting; request i carries header msg = "i".
res <- lapply(
  seq_len(8L),
  \(i) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
)
# Wait for every response to arrive, then print each response body.
res <- lapply(res, call_aio)
for (r in res) {
  print(r$data)
}
#> [1] "{\"time\":[\"2024-02-25 11:46:01\"],\"msg\":[\"1\"],\"pid\":[25940]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:01\"],\"msg\":[\"2\"],\"pid\":[25948]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:01\"],\"msg\":[\"3\"],\"pid\":[25942]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:02\"],\"msg\":[\"4\"],\"pid\":[25940]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:01\"],\"msg\":[\"5\"],\"pid\":[25945]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:02\"],\"msg\":[\"6\"],\"pid\":[25948]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:02\"],\"msg\":[\"7\"],\"pid\":[25945]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:02\"],\"msg\":[\"8\"],\"pid\":[25942]}"

# Reset daemons, tearing down the process that hosts the plumber server.
daemons(n = 0L)
#> [1] 0

Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

It is important to note that, in this case, req$postBody should be accessed in the router process and passed to the ‘mirai’ as an argument. This is because it is retrieved using a connection that is not serializable.

library(mirai)

# One background daemon hosts the plumber router. Passing SIGINT as `autoexit`
# means the plumber server is interrupted, and so exits cleanly, on teardown.
daemons(n = 1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # supplies the %...>% promise pipe
  library(mirai)

  # Dispatcher is enabled, which suits requests whose compute time varies.
  # Four daemons let up to four requests be worked on at the same time.
  daemons(n = 4L, dispatcher = TRUE)

  # Handler for POST /echo. The raw request body is read here, in the router
  # process, and only that string is shipped to a daemon. The JSON is parsed
  # on the daemon side.
  echo_post <- function(req, res) {
    task <- mirai(
      {
        Sys.sleep(1L) # stand-in for an expensive computation
        stamp <- format(Sys.time())
        list(
          status = 200L,
          body = list(
            time = stamp,
            msg = jsonlite::fromJSON(payload)[["msg"]],
            pid = Sys.getpid()
          )
        )
      },
      payload = req$postBody
    )
    # Once the daemon finishes, copy its result onto the plumber response.
    task %...>% (function(out) {
      res$status <- out$status
      res$body <- out$body
    })
  }

  pr() |>
    pr_post("/echo", echo_post) |>
    pr_run(host = "127.0.0.1", port = 8986)
})

Querying the endpoint produces output equivalent to the previous example; only the timestamps and process IDs differ.

library(nanonext)
# Fire off all 8 POST requests without waiting; request i sends {"msg":"i"}.
res <- lapply(
  seq_len(8L),
  \(i) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
)
# Wait for every response to arrive, then print each response body.
res <- lapply(res, call_aio)
for (r in res) {
  print(r$data)
}
#> [1] "{\"time\":[\"2024-02-25 11:46:05\"],\"msg\":[\"1\"],\"pid\":[26224]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:05\"],\"msg\":[\"2\"],\"pid\":[26226]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:05\"],\"msg\":[\"3\"],\"pid\":[26229]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:05\"],\"msg\":[\"4\"],\"pid\":[26235]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:06\"],\"msg\":[\"5\"],\"pid\":[26224]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:06\"],\"msg\":[\"6\"],\"pid\":[26226]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:06\"],\"msg\":[\"7\"],\"pid\":[26235]}"
#> [1] "{\"time\":[\"2024-02-25 11:46:06\"],\"msg\":[\"8\"],\"pid\":[26229]}"

# Reset daemons, tearing down the process that hosts the plumber server.
daemons(n = 0L)
#> [1] 0