A high-level interface for running embarrassingly parallel tasks on HPC
clusters. Combines project creation, task addition, and worker scheduling
into a single function call, similar to lapply.
Usage
tq_apply(
  n,
  fun,
  project,
  resource,
  memory = 10,
  hour = 24,
  account = NULL,
  working_dir = getwd(),
  ...
)
Arguments
- n
Integer specifying the number of tasks to run. Your function will be called with arguments 1, 2, ..., n.
- fun
Function to execute for each task. Must accept the task ID as its first argument. Should save results to disk.
- project
Character string for project name. Will be created if it doesn't exist, updated if it does.
- resource
Character string for resource name. Must already exist (created via resource_add).
- memory
Memory requirement in GB for each task. Default is 10 GB.
- hour
Maximum runtime in hours for worker jobs. Default is 24 hours.
- account
Optional character string for SLURM account/allocation. Default is NULL.
- working_dir
Working directory on the cluster where tasks execute. Default is the current directory (getwd()).
- ...
Additional arguments passed to fun for every task.
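To make the calling convention concrete, the sketch below emulates it locally in plain R: the function is called once per task ID from 1 to n, and the extra `...` arguments are repeated on every call. `dry_run` and `record_call` are hypothetical helpers written for this illustration and are not part of taskqueue. On the cluster the calls are made by workers, not by a loop in your session.

```r
# Hypothetical local stand-in for the calling convention described above.
# It is NOT how taskqueue schedules work; it only shows which arguments
# `fun` receives for each task.
dry_run <- function(n, fun, ...) {
  for (i in seq_len(n)) {
    fun(i, ...)  # task ID first, then the shared extra arguments
  }
  invisible(NULL)
}

# Example worker: records what it was called with instead of doing real work.
calls <- list()
record_call <- function(i, param) {
  calls[[length(calls) + 1]] <<- list(id = i, param = param)
}

dry_run(3, record_call, param = 5)
```

A dry run of this kind on one or two task IDs can catch argument mistakes before any jobs are submitted.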
Details
This function automates the standard taskqueue workflow:
Creates or updates the project with specified memory
Assigns the resource to the project
Adds n tasks (cleaning any existing tasks)
Resets all tasks to idle status
Schedules workers on the SLURM cluster
Equivalent to manually calling:
project_add(project, memory = memory)
project_resource_add(project, resource, working_dir, account, hour, n)
task_add(project, n, clean = TRUE)
project_reset(project)
worker_slurm(project, resource, fun = fun, ...)
Before using tq_apply:
Initialize database: db_init()
Create resource: resource_add(...)
Configure .Renviron with database credentials
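As a quick pre-flight check, the sketch below reports which credential variables are missing from the current R session. It assumes the credentials are supplied through the standard PostgreSQL (libpq) environment variables; that is an assumption made for illustration, so check the taskqueue documentation for the names your installation expects. `missing_env` is a hypothetical helper, not a taskqueue function.

```r
# Assumption: the database connection is configured through the standard
# PostgreSQL (libpq) environment variables. Adjust the names if your
# setup uses different ones.
required_vars <- c("PGHOST", "PGPORT", "PGUSER", "PGPASSWORD", "PGDATABASE")

# Return the names of variables that are unset or empty in this session.
missing_env <- function(vars) {
  values <- Sys.getenv(vars, unset = "")
  vars[!nzchar(values)]
}

not_set <- missing_env(required_vars)
if (length(not_set) > 0) {
  message("Not set in this session: ", paste(not_set, collapse = ", "))
}
```

Because .Renviron is read at startup, restart R after editing it before running a check like this.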
Your worker function should:
Take task ID as first argument
Save results to files (not return values)
Be idempotent (check if output exists)
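One way to meet these three requirements is sketched below. The worker writes to a temporary file and renames it once the save has finished, so an interrupted job is less likely to leave a partial file that a later run would mistake for finished output. `compute`, `safe_worker`, and the `out_dir` argument are hypothetical names chosen for this illustration.

```r
# `compute` is a hypothetical placeholder for the real per-task work.
compute <- function(i, scale) i * scale

safe_worker <- function(i, scale, out_dir = "results") {
  dir.create(out_dir, showWarnings = FALSE, recursive = TRUE)
  out_file <- file.path(out_dir, sprintf("task_%04d.Rds", i))

  # Idempotency check: skip tasks whose output already exists.
  if (file.exists(out_file)) return(invisible(FALSE))

  # Write under a temporary name first, then rename, so the final name
  # only appears after the result has been saved completely.
  tmp_file <- paste0(out_file, ".tmp")
  saveRDS(compute(i, scale), tmp_file)
  file.rename(tmp_file, out_file)
  invisible(TRUE)
}
```

The function returns `TRUE` when it did the work and `FALSE` when it skipped the task, which is convenient for local testing; the result itself is still written to disk and not returned.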
Examples
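if (FALSE) { # \dontrun{
# Simple example: an idempotent worker that saves its result to disk.
# run_simulation() stands in for your own simulation code.
my_simulation <- function(i, param) {
  out_file <- sprintf("results/sim_%04d.Rds", i)
  # Skip tasks that have already produced output
  if (file.exists(out_file)) return(invisible(NULL))
  # Make sure the output directory exists before saving
  dir.create(dirname(out_file), showWarnings = FALSE, recursive = TRUE)
  result <- run_simulation(i, param)
  saveRDS(result, out_file)
}

# Run 100 simulations on HPC
tq_apply(
  n = 100,
  fun = my_simulation,
  project = "my_study",
  resource = "hpc",
  memory = 16,
  hour = 6,
  param = 5
)

# Monitor progress
project_status("my_study")
task_status("my_study")
} # }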