Moju Kapu モジュカプ

How mirai and crew Are Powering the Next Generation of Parallel Computing in R

Charlie Gao and Will Landau

Hibiki AI Limited, Eli Lilly and Company

2024-07-09

moju-kapu (モジュカプ) is shorthand for modular encapsulation (モジュラーカプセル化)

  • Effective stand-alone tools vs. entire integrated systems
  • Natural limits of a package
  • Interfaces for developers as well as end-users
  • Layered engineering approach

mirai

mirai - the moju kapu

  • Motivation: production-grade parallel computing for R
  • Modularity: interfaces with and enhances base R / Shiny etc.
  • Encapsulation: developer interface for 3rd party extensions such as crew

mirai

Parallel & distributed computing for R


The 新幹線 Shinkansen (bullet train)


mirai - The Shinkansen of Parallel Computing

  1. Fast
  • 100x faster
future::value(future::future(1))
#> [1] 1
mirai::mirai(1)[]
#> [1] 1

[Images: toy train vs. Shinkansen]

mirai - The Shinkansen of Parallel Computing

  1. Fast
  2. Reliable
  • 100x faster
  • WYSIWYG concept


mirai - The Shinkansen of Parallel Computing

  1. Fast
  2. Reliable
  • 100x faster
  • WYSIWYG concept
model$accuracy
#> [1] 0.95
f <- future::future(model$accuracy)

Error in getGlobalsAndPackages(expr, envir = envir, tweak = tweakExpression, : The total size of the 1 globals exported for future expression (‘model$accuracy’) is 762.94 MiB.. This exceeds the maximum allowed size of 500.00 MiB (option ‘future.globals.maxSize’). There is one global: ‘model’ (762.94 MiB of class ‘list’)

mirai - The Shinkansen of Parallel Computing

  1. Fast
  2. Reliable
  • 100x faster
  • WYSIWYG concept
model$accuracy
#> [1] 0.95
m <- mirai::mirai(x, x = model$accuracy)
m[]
#> [1] 0.95

mirai - The Shinkansen of Parallel Computing

  1. Fast
  2. Reliable
  3. Scalable
  • 100x faster
  • WYSIWYG concept
  • one million promises

[Images: Tokyo Metro, Shanghai maglev]

Modularity: Interfaces with and Enhances

  • parallel: an alternative communications backend for base R
  • Shiny / Plumber: async backend (supports Shiny ExtendedTask)
  • Arrow: host ADBC database connections
  • torch: seamless handling of Torch tensors
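As an illustration of the Shiny integration, here is a minimal, hypothetical app outline (the input and output names are invented) showing a mirai backing a Shiny ExtendedTask; mirai objects are promise-compatible, so the task function can simply return one:

```r
library(shiny)
library(bslib)
library(mirai)

daemons(2)  # background processes to run the slow work

ui <- page_fluid(
  input_task_button("go", "Compute"),
  textOutput("result")
)

server <- function(input, output, session) {
  # The ExtendedTask's function returns a mirai, which Shiny awaits
  # as a promise without blocking this or any other session.
  task <- ExtendedTask$new(function(x) {
    mirai({ Sys.sleep(2); x * 2 }, x = x)
  }) |> bind_task_button("go")

  observeEvent(input$go, task$invoke(runif(1)))
  output$result <- renderText(task$result())
}

shinyApp(ui, server)
```

The key design point is that the long computation leaves the R session free: the daemon evaluates the expression while the Shiny process keeps serving reactivity.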

crew: an encapsulation of mirai

Motivation for crew

  • targets is a pipeline tool for reproducible computation at scale in R
  • Manages large workflows in statistics and data science:
    • Bayesian data analysis
    • Machine learning
    • Simulation of clinical trials
    • Genomic data analysis

Challenges

  1. Scale out parallel workers to meet demand
  2. Scale in parallel workers to conserve resources
  3. Tailor itself to arbitrary distributed computing environments

A targets pipeline

A worker is an R process that runs tasks

Add workers to meet demand

Reuse workers for subsequent tasks

Discard workers no longer needed

Beginnings: mirai and crew

  • crew needed a backend to communicate with parallel workers over a local network connection
  • Originally considered Redis, but mirai is ideal:
    • Does not depend on Redis server
    • Can send larger data objects over the network
  • crew began using mirai in February 2023

How mirai supports crew

  • crew launches workers, mirai sends tasks to workers
  • mirai supports modular building blocks for crew:
    • mirai::daemon(url = "..."): turns any R process into a worker on the network
    • mirai::saisei(): rotates websocket connections
    • Down-scaling workers: maximum idle time, maximum number of tasks
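A rough sketch of those building blocks in use, under stated assumptions (example URL; `idletime` taken in milliseconds). The `daemon()` call belongs in a separate R process, so this is a two-process outline rather than a single runnable script:

```r
library(mirai)

# Host process: listen for workers at a websocket URL (example address).
daemons(url = "ws://127.0.0.1:5555")

# In any other R process on the network, dial in as a worker, with
# down-scaling controls such as a maximum idle time and task limit:
#   mirai::daemon("ws://127.0.0.1:5555", idletime = 30000, maxtasks = 100)

# Rotate the websocket URL at instance 1 so a replacement worker
# connects on a fresh address:
saisei(i = 1)

daemons(0)  # reset when finished
```

crew layers its launchers on top of exactly this pattern: it decides when to start a worker process, and mirai handles the connection, task delivery, and expiry.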

Moju Kapu design of crew

  • Encapsulation: centralized R6 “controller” interface
  • Modularity: plugins for different computing environments

Encapsulation: R6 classes to wrap mirai



# Start a new controller.
x <- crew::crew_controller_local(
  workers = 10,
  seconds_idle = 30
)

# Submit many parallel tasks.
x$walk(
  rnorm(1, mean),
  iterate = list(mean = seq_len(1000))
)

# Optional: wait for all tasks.
x$wait(mode = "all")

# Collect results so far.
str(unlist(x$collect()$result))
#> num [1:1000] 3.2 4.1 2.31 ...

Modularity: crew plugins

crew_controller_local()


crew_controller_slurm(
  slurm_memory_gigabytes_per_cpu = 16,
  script_lines = "module load R/4.4.0"
)
crew_controller_aws_batch(
  aws_batch_job_definition = "your_def",
  aws_batch_job_queue = "your_queue"
)
your_custom_controller(...)

Users can write crew plugins

custom_launcher_class <- R6::R6Class(
  classname = "custom_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_worker = function(call, name, launcher, worker, instance) {

      # 1. Reserve compute for R to run, e.g. start a job on a cluster.
      # 2. Make that job start an R process.
      # 3. Make that R process run the code in `call`.

    },
    terminate_worker = function(handle) {

      # Terminate a worker.

    }
  )
)

targets accepts any crew controller

tar_option_set(
  controller = crew_controller_aws_batch(
    workers = 3,
    seconds_idle = 60,
    aws_batch_job_definition = "your_def",
    aws_batch_job_queue = "your_queue",
    aws_batch_region = "us-east-2"
  )
)

Thank you

mirai: The Shinkansen of parallel computing

crew: Bringing mirai to distributed data science workloads

The obvious choice for long-distance travel…

get to your destination faster!

Appendix: Supporting Slides

mirai - 100x Faster

Benchmarking:

library(mirai)
daemons(6)
#> [1] 6

library(future)
plan("multisession", workers = 6)

mirai(1)[]
#> [1] 1
value(future(1))
#> [1] 1

bench::mark(mirai(1)[], value(future(1)), relative = TRUE)
#> # A tibble: 2 × 6
#>   expression         min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>       <dbl>  <dbl>     <dbl>     <dbl>    <dbl>
#> 1 mirai(1)[]          1      1       148.        1       NaN
#> 2 value(future(1))  186.   159.        1       152.      Inf

Created on 2024-06-27 with reprex v2.1.0

mirai - WYSIWYG Concept

Production usage requires ‘correctness’ over ‘convenience’

Compare and contrast:

library(mirai)

res <- list(model = double(1e8),
            acc = 0.95)

m <- mirai(2 * x, x = res$acc)
m[]
#> [1] 1.9
library(future)

res <- list(model = double(1e8),
            acc = 0.95)

f <- future(2 * res$acc)
#> Error in getGlobalsAndPackages(
#> expr, envir = envir, tweak =
#> tweakExpression, : The total
#> size of the 1 globals exported
#> for future expression ('2 *
#> res$acc') is 762.94 MiB.. This
#> exceeds the maximum allowed
#> size of 500.00 MiB (option
#> 'future.globals.maxSize').
#> There is one global: 'res'
#> (762.94 MiB of class 'list')

mirai - One Million Promises

library(mirai)
daemons(8, dispatcher = FALSE)
#> [1] 8

r <- 0
start <- Sys.time()
m <- mirai_map(1:1000000, \(x) x, .promise = \(x) r <<- r + x)
Sys.time() - start
#> Time difference of 6.14953 mins

later::run_now()
r
#> [1] 500000500000

Created on 2024-06-27 with reprex v2.1.0
Running on an Intel i7 Gen 11 notebook with 16GB RAM.