Technical Documentation

Worker API

Base API and schedulers

The main worker functions are wrapped in an R6 class called QSys. This provides a standardized API to the lower-level messages that are sent via ZeroMQ.

Scheduler classes are derived from this base class and add the functions required for submitting and cleaning up jobs:

+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
  |- etc.
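
As an illustration of this pattern only (submit_jobs and cleanup are placeholder method names, not the package's actual internals; a real scheduler class will differ), deriving a new scheduler from the base class could look roughly like this:

library(R6)

# sketch only: placeholder method names, not clustermq's actual signatures
SchedulerSketch = R6Class("SchedulerSketch",
    inherit = clustermq:::QSys,  # the base class described above
    public = list(
        submit_jobs = function(n_jobs, ...) {
            # fill the scheduler's submission template and submit n_jobs jobs,
            # each of which starts an R worker that connects back to the master
        },
        cleanup = function(...) {
            # cancel jobs that are still queued or running
        }
    )
)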

A pool of workers can be created using the workers() function, which instantiates an object of the corresponding QSys-derived scheduler class. See ?workers for details.

# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)

# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$finalize())

Worker startup

For workers that are started via a scheduler, we do not know in advance which machine they will run on. This is why every worker is started with the TCP/IP address of the master socket that will distribute the work.

This is achieved by the following R call, which is common to all schedulers:

R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
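
Once the scheduler fills in the template, the actual call might look like the following (the host name and port here are made up for illustration; the address is a ZeroMQ TCP endpoint):

R --no-save --no-restore -e 'clustermq:::worker("tcp://login-node.example.org:6124")'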

On the master’s side, we wait until a worker connects:

# this will block until a worker is ready
msg = w$receive_data()
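
The fields of this first message are described under WORKER_UP in the message specification below; as an illustration only:

# the first message announces that the worker is accepting data; see
# WORKER_UP in the message specification below for its fields
msg$id        # "WORKER_UP"
msg$worker_id # identifier of the connecting worker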

Common data and exports

Workers will start up without any knowledge of what they should process or how. In order to transfer initial data to the worker, we first create and serialize a list object with the following fields:

# create a reusable, serialized ZeroMQ object with the common data on the master
w$set_common_data(fun, const, export, rettype, common_seed, token)
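
As an illustration only (the values below are made up, but the field names correspond to the call above), this could look like:

# illustrative values; the field names match the call above
fn = function(x, y) x * 2 + y      # function the workers will call
w$set_common_data(
    fun = fn,
    const = list(y = 10),          # constant arguments shared by all calls
    export = list(),               # objects to export to the workers
    rettype = "list",              # return type of the results
    common_seed = 42,              # common random seed
    token = "token-1"              # identifies this set of common data
)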

Workers that connect to the master will send a list with a field token. This can be used to check if the worker already received the common data it is supposed to work on.

if (msg$token != <token>)
    w$send_common_data()

Iterated data

If the worker has already received the common data, we can send it a chunk of iterated arguments to work on. These are passed as a list of iterables, e.g. a data.frame with a column for each iterated argument.

It also needs to have a column named ` id ` (the string id surrounded by spaces), which will be used to identify each call.

chunk = data.frame(arg1=1:5, arg2=5:1, ` id `=1:5, check.names=FALSE)
w$send_job_data(chunk)

If the worker has finished processing, it will send a message with a result field, a list containing the finished results:

msg = w$receive_data()
if (!is.null(msg$result)) {
    # store result here, handle errors/warnings if required
}
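
A sketch of what the placeholder comment above could do, based on the result, warnings and errors fields described under WORKER_READY in the message specification below (assuming results was initialized as an empty list and rettype is "list"):

if (inherits(msg$result, "error")) {
    stop(msg$result)                        # processing of the whole chunk failed
} else {
    results[names(msg$result$result)] = msg$result$result   # store by call id
    if (length(msg$result$warnings) > 0)
        warning(paste(unlist(msg$result$warnings), collapse="\n"))
    if (length(msg$result$errors) > 0)
        stop(paste(unlist(msg$result$errors), collapse="\n"))
}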

Custom calls

Apart from sending common and iterated data that the worker will process in chunks, it is also possible to send arbitrary calls that it will evaluate. Such a call needs the following fields:

w$send_call(expr, env=list(...), ref="mycall1")

Main event loop

Putting the above together in an event loop, we get what is essentially implemented in master().

w = workers(3)
on.exit(w$finalize())

while (we have new work to send) {
    msg = w$receive_data()

    if (!is.null(msg$result))
        # handle result

    if (msg$token != <token>)
        w$send_common_data()
    else
        w$send_job_data(...)
}

# if proper cleanup is successful, cancel kill-on-exit
if (w$cleanup())
    on.exit()

A loop of a similar structure can be used to extend clustermq. As an example, this was done by drake using common data and custom calls only (no iterated chunks).

ZeroMQ message specification

Communication between the master (main event loop) and workers (QSys base class) is organised in messages. These are chunks of serialized data with an id field, and other data that is required for this type of message.

Messages per type

Below, each message id is listed together with the additional fields it carries.

Worker

This workflow is handled by the worker() event loop of clustermq (not to be confused with the workers() control). It is the function called in every job or thread to interact with the master(). The event loop is internal, i.e. it is not modifiable and not exported.

WORKER_UP
  • Message ID indicating worker is accepting data
  • Field has to be worker_id when connecting to the master, or empty when connecting to the ssh_proxy
  • The answer is serialized common data (fun, const, and seed) or a redirect (with the URL where the worker can get the data)
WORKER_READY
  • Message ID indicating worker is accepting chunks
  • It may contain the field result with a finished chunk
  • If processing failed, result is an object of type error
  • If successful, result is a list with the following vectors (see the sketch after this list):
    • result is a named rettype with results
    • warnings is a list with warning messages of individual calls
    • errors is a list with error messages of individual calls
WORKER_DONE
  • Message ID indicating worker is shutting down
  • Worker will send this in response to WORKER_STOP
  • Fields are time (from Sys.time()), mem (maximum memory used), and calls (number of processed calls)
WORKER_ERROR
  • Some error occurred in processing flow (not the function calls themselves)
  • Field msg describes the error
  • Master will shut down after receiving this signal
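
For illustration, a WORKER_READY message carrying a successfully finished chunk could look like the following sketch (all values are made up):

# made-up example of a WORKER_READY message with a finished chunk
msg = list(
    id = "WORKER_READY",
    result = list(
        result = list(`1` = 2, `2` = 4),  # named results, one element per call id
        warnings = list(),                # warning messages of individual calls
        errors = list()                   # error messages of individual calls
    )
)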

Master

This workflow is handled by the master() function of clustermq. If you are using Q() or Q_rows(), this is handled under the hood. Workers created outside of these functions can be reused within Q()/Q_rows() without knowing any of the internal message structure.

w = workers(n_jobs, ...)
# w$cleanup() for a clean shutdown at the end

The documentation below shows how a custom control flow can be implemented, e.g. if you want to evaluate arbitrary expressions on the workers instead of defining one function to call with different arguments.

DO_SETUP
  • Message contains common data, like the function to call and its arguments
  • Required fields are: fun, const, export, rettype, common_seed, token
  • Worker will respond with WORKER_READY

# create a reusable, serialized ZeroMQ object with the common data on the master
w$set_common_data()
# send this object to a worker
w$send_common_data()

DO_CHUNK
  • Chunk of iterated arguments for the worker
  • The required field is chunk, a data.frame where each row is a call and each column an argument
  • Row names of chunk are used as call IDs

w$send_job_data()

DO_CALL (new in 0.8.5)
  • Evaluate a specific expression on the worker
  • Required fields are expr (the expression to be evaluated) and env (the list environment to evaluate it in)

w$send_call()

WORKER_WAIT
  • Instruct the worker to wait the number of seconds given in the wait field
  • Worker will respond with WORKER_READY

w$send_wait()

WORKER_STOP
  • Instruct the worker to exit its main event loop
  • This message has no fields

w$send_shutdown_worker()

To disconnect a worker and reset its socket state afterwards:

w$disconnect_worker()

Control flow stages

The convention in the listings below is that messages sent by the worker appear at the top level, with the master's response to each indented underneath.

Batch processing, no proxy

This is the default use case for Q, Q_rows. It will set the common data (DO_SETUP; function to call, constant arguments, exported data, random seed) once and then provide chunks of arguments (DO_CHUNK) as data.frames for batch processing.

  • WORKER_UP
    • DO_SETUP
  • WORKER_READY [repeat]
    • DO_CHUNK [repeat]
  • WORKER_READY
    • WORKER_STOP
  • WORKER_DONE

These stages can be implemented in the following way:

w$set_common_data(...)

while(work remaining or w$workers_running > 0) {
    msg = w$receive_data() # wait until a worker is ready
    if (msg$id == "WORKER_UP") { # treat same as WORKER_READY if no common data
        w$send_common_data()
    } else if (msg$id == "WORKER_READY") {
        if (work remaining)
            w$send_job_data(data.frame with arguments for all calls of this chunk)
        else
            w$send_shutdown_worker()
        # ..handle your result..
    } else if (msg$id == "WORKER_DONE") {
        w$disconnect_worker()
    } else if (msg$id == "WORKER_ERROR") {
        stop("processing error")
    }
}

Evaluating custom expressions

This can be mixed with batch processing, as long as DO_SETUP is called before DO_CHUNK (otherwise it will cause WORKER_ERROR on token mismatch).

  • WORKER_UP
    • DO_SETUP or DO_CALL (e.g. to export commonly used data)
  • WORKER_READY [repeat]
    • DO_CALL [repeat]
  • WORKER_READY
    • WORKER_STOP
  • WORKER_DONE

These stages can be implemented in the following way:

w$set_common_data(...) # optional, if common data required

while(work remaining or w$workers_running > 0) {
    msg = w$receive_data() # wait until a worker is ready
    if (msg$id == "WORKER_UP") { # treat same as WORKER_READY if no common data
        w$send_common_data()
    } else if (msg$id == "WORKER_READY") {
        if (work remaining)
            w$send_call(expr, env)
        else
            w$send_shutdown_worker()
        # ..handle your result..
    } else if (msg$id == "WORKER_DONE") {
        w$disconnect_worker()
    } else if (msg$id == "WORKER_ERROR") {
        stop("processing error")
    }
}