Massively Parallel Jobs for Research Workflows
This section is a continuation of the previous section on dispatching jobs to Saturn Cloud. The previous section covered dispatching a single command to Saturn Cloud. However, there are also times when you may want to create hundreds or thousands of runs in Saturn Cloud. Creating a job per run not only makes the Saturn Cloud UI cluttered and difficult to use, it also makes it hard to retrieve results and understand which runs have succeeded and which have failed.
In addition, with massively parallel jobs you need to expect failures. When you dispatch a single job, the probability of an intermittent failure is generally low - these are usually caused by hardware failures, network issues with data stores, AWS request rate limiting, etc. However, once you start dispatching millions of jobs, the probability of having to deal with a handful of failures even under ideal conditions goes up dramatically. Massively parallel jobs in Saturn Cloud are designed around these constraints.
Specifying runs
Runs can be specified in JSON or YAML (YAML is much slower if you have thousands of runs):
nprocs: 4
remote_output_path: sfs://internal/hugo/intercom-backfill-2024-03-24/
runs:
  - cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-12-31 --end-dt 2024-01-02 --enrich --first-seen
    remote_output_path: sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-01
  - cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-01 --end-dt 2024-01-03 --enrich --first-seen
    remote_output_path: sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-02
  - cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-02 --end-dt 2024-01-04 --enrich --first-seen
    remote_output_path: sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-03
  - cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-03 --end-dt 2024-01-05 --enrich --first-seen
    remote_output_path: sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-04
  - cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-04 --end-dt 2024-01-06 --enrich --first-seen
    remote_output_path: sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-05
For example, the above is a batch for a backfill script that populates Intercom (our support chat tool) with information about our users so that we can better assist them with technical issues.
We expect the following parameters:
- nprocs: The number of runs to run in parallel per machine. This is determined by the machine you intend to run on and the resource constraints of your run. For example, if I have a run that takes approximately 1 GB of memory and can consume a single CPU, and I am running on a c5.24xlarge, which has 96 vCPUs and 192 GB of memory, I may choose to set nprocs to 96 to maximize CPU utilization. However, if my run takes 8 GB of memory, it may be safer to set nprocs to something less than 24 (192 / 8) so I won't consume all the memory on the machine.
- remote_output_path: a networked storage location where all results of runs will be stored.
- runs: a list of all runs we want to execute. This example only has five runs to keep things concise, but in practice you will have many more.
Each run expects the following parameters:
- remote_output_path: this is the same parameter name as in the previous section - it should be a subdirectory of the global remote_output_path and will house the results for this specific run
- cmd: the command you would like to execute
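Since you will typically have far more runs than you would want to write by hand, it is usually easiest to generate this file with a short script. Below is a minimal sketch that produces a batch specification like the one above for a range of dates; the script path and output prefix are copied from the example, but the generation code itself is illustrative and not part of the sc tooling (it assumes PyYAML is installed).

import yaml  # assumes PyYAML is installed
from datetime import date, timedelta

prefix = "sfs://internal/hugo/intercom-backfill-2024-03-24"
script = "operations/pipelines/hosted/enrich_user_data.py"

runs = []
day = date(2024, 1, 1)
while day <= date(2024, 1, 5):
    runs.append({
        "cmd": (
            f"python {script} "
            f"--start-dt {day - timedelta(days=1)} --end-dt {day + timedelta(days=1)} "
            "--enrich --first-seen"
        ),
        "remote_output_path": f"{prefix}/{day}",
    })
    day += timedelta(days=1)

batch_spec = {"nprocs": 4, "remote_output_path": f"{prefix}/", "runs": runs}
with open("/tmp/batch.yaml", "w") as f:
    yaml.safe_dump(batch_spec, f, sort_keys=False, width=250)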
Batching Runs
The next step is to group runs together into batches for efficiency. Batches are useful to reduce the impact of per-job overhead. For example, if it takes 5 minutes to spin up a new machine and I only allocate 5 minutes of work to it, then half the time of the job is wasted. At the other extreme, packing all of the runs into a single batch means I cannot leverage multiple machines to do the work. You can tune this parameter; if the batch size is unset, we default to 3 * nprocs.
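To make the arithmetic concrete, here is a minimal sketch of the batching idea: chunk the list of runs into fixed-size batches, defaulting to 3 * nprocs. The helper below is illustrative only, not the actual sc implementation.

def split_into_batches(runs, nprocs, batch_size=None):
    # Default batch size mirrors the documented default of 3 * nprocs.
    batch_size = batch_size or 3 * nprocs
    return [runs[i:i + batch_size] for i in range(0, len(runs), batch_size)]

# 168 runs with nprocs=4 gives a default batch size of 12, i.e. 14 batches.
batches = split_into_batches(list(range(168)), nprocs=4)
assert len(batches) == 14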
$ sc split /tmp/recipe.yaml /tmp/batch.yaml ~/commands --sync ~/workspace/saturn-operations
The above command takes a job recipe (/tmp/recipe.yaml) along with a batch specification (/tmp/batch.yaml) and writes the resulting batches into ~/commands. It also synchronizes my local ~/workspace/saturn-operations source directory with the remote job.
In my example, my batch.yaml file has 168 runs. I'm planning on executing 4 commands at a time, and so the default batch size is 12. This results in 14 batches, which all look like the following:
{
  "nprocs": 4,
  "remote_output_path": "sfs://internal/hugo/intercom-backfill-2024-03-24/",
  "runs": [
    {
      "remote_output_path": "sfs://internal/hugo/intercom-backfill-2024-03-24/2023-09-01",
      "cmd": "python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-08-31 --end-dt 2023-09-02 --enrich --first-seen",
      "local_results_dir": "/tmp/819b537dd41140e2a0bbd59b877486f3/"
    },
    ... # runs omitted from the output for brevity
    {
      "remote_output_path": "sfs://internal/hugo/intercom-backfill-2024-03-24/2023-09-12",
      "cmd": "python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-09-11 --end-dt 2023-09-13 --enrich --first-seen",
      "local_results_dir": "/tmp/21ba080f724d4696b0ee6aff3a12da2e/"
    }
  ]
}
The above command modified my original recipe and:
- modified the start script to download ~/workspace/saturn-operations
- modified the start script to download ~/workspace/commands (where the batches are written)
- modified the command to execute the batches
Here is the updated command section of the recipe:
spec:
  command:
    - sc batch /home/jovyan/commands3/0.json
    - sc batch /home/jovyan/commands3/1.json
    - sc batch /home/jovyan/commands3/2.json
    - sc batch /home/jovyan/commands3/3.json
    - sc batch /home/jovyan/commands3/4.json
    - sc batch /home/jovyan/commands3/5.json
    - sc batch /home/jovyan/commands3/6.json
    - sc batch /home/jovyan/commands3/7.json
    - sc batch /home/jovyan/commands3/8.json
    - sc batch /home/jovyan/commands3/9.json
    - sc batch /home/jovyan/commands3/10.json
    - sc batch /home/jovyan/commands3/11.json
    - sc batch /home/jovyan/commands3/12.json
    - sc batch /home/jovyan/commands3/13.json
sc batch is a command that takes a batch file like the one above, executes it on the machine, ensures that no more than 4 runs (nprocs) are running at any given time, and ensures that stdout, stderr, and any results are saved to the remote_output_path for each run.
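Conceptually, you can think of sc batch as doing something like the sketch below: read the batch file, keep at most nprocs runs in flight, and record stdout, stderr, and the exit code for each run. This is only a rough mental model, not the actual implementation - in particular, the upload of results to the run's remote_output_path is omitted here.

import json
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def execute_run(run):
    # Run one command, capturing stdout/stderr and the exit code locally.
    out_dir = Path(run["local_results_dir"])
    out_dir.mkdir(parents=True, exist_ok=True)
    with open(out_dir / "stdout", "w") as out, open(out_dir / "stderr", "w") as err:
        proc = subprocess.run(run["cmd"], shell=True, stdout=out, stderr=err)
    (out_dir / "status_code").write_text(str(proc.returncode))
    return proc.returncode

def execute_batch(batch_path):
    # Keep at most nprocs runs in flight at any given time.
    batch = json.loads(Path(batch_path).read_text())
    with ThreadPoolExecutor(max_workers=batch["nprocs"]) as pool:
        return list(pool.map(execute_run, batch["runs"]))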
Hardware Selections
This section of the recipe determines the hardware used for runs:
spec:
  instance_type: r6axlarge
  scale: 2
This means that we will consume at most 2 r6axlarge instances until all workloads are complete.
$ sc options
The above command will list all available instance sizes.
Run output
The output for every run is stored in the remote_output_path, specifically:
- stdout: the standard output of your command
- stderr: the standard error of your command
- status_code: the unix status code of your command; 0 means it completed successfully
- results: optional - any result files your job has written (more on this later)
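For a single run this means the remote storage for that run would contain a layout roughly like the following (illustrative, using the first run path from the example above):

sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-01/
    stdout
    stderr
    status_code
    results/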
Results
The sc batch command will populate an environment variable, SATURN_RUN_LOCAL_RESULTS_DIR. Anything your job writes to that directory will be copied to ${remote_output_path}/results.
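For example, inside your run's command you might write a result file like this (a minimal sketch; the filename and contents are arbitrary placeholders):

import json
import os
from pathlib import Path

# sc batch populates SATURN_RUN_LOCAL_RESULTS_DIR for each run.
results_dir = Path(os.environ["SATURN_RUN_LOCAL_RESULTS_DIR"])
results_dir.mkdir(parents=True, exist_ok=True)

# Anything written here is copied to ${remote_output_path}/results.
summary = {"rows_enriched": 12345}  # placeholder payload for illustration
(results_dir / "summary.json").write_text(json.dumps(summary))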
Failures
As mentioned earlier, as the number of runs increases, having to deal with failures is almost guaranteed. As a result, the batching infrastructure makes it easy to skip completed runs and re-try failed runs. You could identify completed/failed runs yourself by reading all the individual status_code files. However, the split command has a few options to make this easier. By default, sc split will schedule all work from your batch.json or batch.yaml file. However, if you pass --skip-completed it will automatically ignore everything that has completed successfully, and if you pass --skip-failures it will automatically skip everything that has failed.
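For example, to re-dispatch only the runs that have not yet completed successfully, a command along these lines should work (reusing the paths from the earlier split command):

$ sc split /tmp/recipe.yaml /tmp/batch.yaml ~/commands --skip-completed --sync ~/workspace/saturn-operations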
Batch output
$ sc summarize-batch /tmp/batch.yaml
The above command will display status information for runs, based on remote storage. The output is as follows:
status status_code remote_output_path cmd
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-01 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-12-31 --end-dt 2024-01-02 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-02 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-01 --end-dt 2024-01-03 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-03 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-02 --end-dt 2024-01-04 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-04 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-03 --end-dt 2024-01-05 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-05 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-04 --end-dt 2024-01-06 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-06 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-05 --end-dt 2024-01-07 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-07 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-06 --end-dt 2024-01-08 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-08 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-07 --end-dt 2024-01-09 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-09 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-08 --end-dt 2024-01-10 --enrich --first-seen
This output makes it easy to grab any runs that failed and re-run them locally.
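If you want to do that programmatically, a quick sketch like the following can pull the failed commands out of a saved summary (it assumes you redirected the output above to summary.txt and relies on the four whitespace-separated columns shown):

failed_cmds = []
with open("summary.txt") as f:
    for line in f:
        # Columns: status, status_code, remote_output_path, cmd (cmd may contain spaces).
        parts = line.split(None, 3)
        if len(parts) < 4 or parts[0] == "status":
            continue  # skip the header, divider, and blank lines
        if parts[0] != "completed" or parts[1] != "0":
            failed_cmds.append(parts[3].strip())

for cmd in failed_cmds:
    print(cmd)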