Skip to contents

Plumber Integration

mirai may be used as an asynchronous backend for plumber pipelines.

Example usage is provided below for different types of endpoint.

Example GET Endpoint

The plumber router code is run in a daemon process itself so that it does not block the interactive process.

The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.

library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # more efficient not to use dispatcher if all requests are similar length
  daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously

  pr() |>
    pr_get(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L)
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()), msg = msg, pid = Sys.getpid()
              )
            )
          },
          msg = req[["HEADERS"]][["msg"]]
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8985)
})

The API can be queried using an async HTTP client such as nanonext::ncurl_aio().

Here, all 8 requests are submitted at once, but we note that that responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set).

library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8985/echo",
    headers = c(msg = as.character(i))
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"1\"],\"pid\":[30785]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"2\"],\"pid\":[30777]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"3\"],\"pid\":[30779]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"4\"],\"pid\":[30785]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2024-12-01 10:05:03\"],\"msg\":[\"5\"],\"pid\":[30782]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"6\"],\"pid\":[30782]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"7\"],\"pid\":[30777]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2024-12-01 10:05:04\"],\"msg\":[\"8\"],\"pid\":[30779]}"

daemons(0)
#> [1] 0

Example POST Endpoint

Below is a demonstration of the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.

Note that req$postBody should always be accessed in the router process and passed in as an argument to the ‘mirai’, as this is retrieved using a connection that is not serializable.

library(mirai)

# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1

m <- mirai({
  library(plumber)
  library(promises) # to provide the promise pipe
  library(mirai)

  # uses dispatcher - suitable when requests take differing times to complete
  daemons(4L) # handles 4 requests simultaneously

  pr() |>
    pr_post(
      "/echo",
      function(req, res) {
        mirai(
          {
            Sys.sleep(1L) # simulate expensive computation
            list(
              status = 200L,
              body = list(
                time = format(Sys.time()),
                msg = jsonlite::fromJSON(data)[["msg"]],
                pid = Sys.getpid()
              )
            )
          },
          data = req$postBody
        ) %...>% (function(x) {
          res$status <- x$status
          res$body <- x$body
        })
      }
    ) |>
    pr_run(host = "127.0.0.1", port = 8986)
})

Querying the endpoint produces the same set of outputs as the previous example.

library(nanonext)
res <- lapply(
  1:8,
  function(i) ncurl_aio(
    "http://127.0.0.1:8986/echo",
    method = "POST",
    data = sprintf('{"msg":"%d"}', i)
  )
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"1\"],\"pid\":[31046]}"
#> 
#> [[2]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"2\"],\"pid\":[31048]}"
#> 
#> [[3]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"3\"],\"pid\":[31054]}"
#> 
#> [[4]]
#> [1] "{\"time\":[\"2024-12-01 10:05:08\"],\"msg\":[\"4\"],\"pid\":[31051]}"
#> 
#> [[5]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"5\"],\"pid\":[31046]}"
#> 
#> [[6]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"6\"],\"pid\":[31048]}"
#> 
#> [[7]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"7\"],\"pid\":[31051]}"
#> 
#> [[8]]
#> [1] "{\"time\":[\"2024-12-01 10:05:09\"],\"msg\":[\"8\"],\"pid\":[31054]}"

daemons(0)
#> [1] 0