Task Queue is implemented in R for asynchronous tasks and is based on a PostgreSQL database. This package is only suitable for parallel computing without any communication among parallel tasks (i.e. embarrassingly parallel workloads).
## Challenges of parallel computing in R

Several R packages can be used for parallel computing (e.g. those listed in the CRAN Task View High-Performance and Parallel Computing with R), but they are not suitable for asynchronous tasks:
- Uneven load among cores/workers. Some workers run faster than others and then wait for the rest to finish.
- Newly available workers cannot be utilised after a parallel task has started.
taskqueue is designed to utilise all available computing resources until all tasks are finished by dynamically allocating tasks to workers.
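The idea can be sketched in plain R with no database involved; `next_task` below is a hypothetical stand-in for taskqueue's database-backed queue, not part of the package. Each worker repeatedly pulls the next idle task instead of receiving a fixed chunk up front:

```r
# Minimal sketch of dynamic task allocation (assumption: taskqueue keeps
# the queue in PostgreSQL; here a plain R list stands in for it).
queue <- as.list(1:10)                     # pending task ids

# Hypothetical helper: hand the next idle task to whichever worker asks first
next_task <- function() {
    if (length(queue) == 0) return(NULL)   # all tasks consumed
    task <- queue[[1]]
    queue <<- queue[-1]
    task
}

# Each worker loops over next_task() until it returns NULL, so a fast
# worker simply processes more tasks instead of idling.
```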
## Installation

Install the development version from GitHub:

```r
devtools::install_github('byzheng/taskqueue')
```

## PostgreSQL installation and configuration
As a large number of concurrent requests might reach the database simultaneously, it is preferable to install the PostgreSQL database on a dedicated server that all workers can connect to. Follow the PostgreSQL website to install and configure PostgreSQL. A database should be created for each user.
On all workers, five environment variables should be added to the `.Renviron` file to connect to the database, i.e.
```
PGHOST=
PGPORT=
PGUSER=
PGPASSWORD=
PGDATABASE=
```
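Before starting any workers it can be worth checking that these variables are actually visible to R. The helper below is not part of taskqueue; it is just a small sketch using base R:

```r
# Hypothetical helper (not part of taskqueue): report which of the five
# PG* variables are still unset in the current R session.
check_pg_env <- function() {
    vars <- c("PGHOST", "PGPORT", "PGUSER", "PGPASSWORD", "PGDATABASE")
    vals <- Sys.getenv(vars)
    vars[vals == ""]                 # names of the missing variables
}

# RPostgres reads the PG* variables by default, so a connection test
# needs no arguments (assuming DBI and RPostgres are installed):
# con <- DBI::dbConnect(RPostgres::Postgres())
# DBI::dbDisconnect(con)
```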
## Resource
A computing resource is defined as a facility/computer which can run multiple jobs/workers.
A new resource can be added with `resource_add` and the following configurations:
- `name`: resource name
- `type`: resource type. Only `slurm` is supported at this stage
- `host`: network name used to access the resource
- `nodename`: obtained by `Sys.info()` on the resource
- `workers`: maximum number of available cores on the resource
- `log_folder`: folder to store log files on the resource
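The `nodename` value can be read with base R by running the following on the resource itself:

```r
# Run on the resource; pass the result to resource_add(nodename = ...)
Sys.info()[["nodename"]]
```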
Currently only slurm is supported, although other resource types are planned.
```r
resource_add(name = "hpc",
             type = "slurm",
             host = "hpc.example.com",
             nodename = "hpc",
             workers = 500,
             log_folder = "/home/user/log_folder/")
```

`log_folder` is important for troubleshooting and is split by project. It is better to place it on a high-speed hard drive because of the frequent I/O from writing log files.
## Project
taskqueue manages tasks by project; each project has its own resources, working directory, runtime requirements and other configurations.

A project with a unique name can be created with the `project_add` function, which also assigns common requirements (e.g. memory):
```r
project_add("test_project", memory = 20)
```

Assign a resource to the project, which can then be used to schedule workers with configurations (e.g. working directory, total runtime, project-specific workers):
```r
project_resource_add(project = "test_project",
                     resource = "hpc")
```

Now tasks can be added into the database through the function `task_add`, e.g.

```r
task_add("test_project", num = 100, clean = TRUE)
```
Now we can develop a function to actually perform the data processing, which

- takes the task `id` as its first argument,
- is expected to return no value,
- saves the final output to the file system and checks whether the task is already finished.
Finally, call `worker` to run the wrapper function with the project name:
```r
library(taskqueue)
fun_test <- function(i) {
    # Skip the task if the output file already exists
    out_file <- sprintf("%s.Rds", i)
    if (file.exists(out_file)) {
        return(invisible(NULL))
    }
    # Perform the actual computing
    # ....
    # Save the final output
    saveRDS(i, out_file)
}
worker("test1", fun_test)
```

After developing and testing the wrapper function, we can save it into a file (e.g. `rcode.R`) and then schedule it to run with the following functions.
- Reset all or `failed`/`working` tasks to `idle` status with `project_reset`
- Start the project to allow workers to consume tasks with `project_start`
- Schedule tasks onto resources (e.g. `worker_slurm` for a slurm cluster)
- Check task status with `task_status`
- Stop the project with `project_stop`
```r
# Reset status for all tasks in a project
project_reset("test1")

# Start project
project_start("test1")

# Schedule tasks on slurm resource `hpc`
worker_slurm("test1", "hpc", "rcode.R", modules = "sqlite/3.43.1")

# Check status of all tasks
task_status("test1")

# Stop the project
project_stop("test1")
```

A task has four statuses:
- `idle`: the task is not running.
- `working`: the task is running on one of the workers.
- `failed`: the task failed for some reason. Check the log folder for troubleshooting.
- `finished`: the task finished without errors.
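The statuses above imply a simple lifecycle. The sketch below is an assumption drawn from this README, not taskqueue internals: a task moves from `idle` to `working` when a worker picks it up, ends in `finished` or `failed`, and `project_reset` moves `failed`/`working` tasks back to `idle`.

```r
# Hypothetical transition table inferred from the statuses above
transitions <- list(
    idle     = "working",                  # a worker picks the task up
    working  = c("finished", "failed",     # normal/abnormal completion
                 "idle"),                  # or project_reset
    failed   = "idle",                     # project_reset
    finished = character(0)                # terminal status
)

can_transition <- function(from, to) to %in% transitions[[from]]
```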