The SandCastle Tutorial


Photo by Emily Studer on Unsplash

The SandCastle tutorial is a trivial application of lemmings in a “sandbox”. It can be done without a job scheduler installed, and without a real solver.

You will run a mock solver called twicer in a recursive workflow using a fake job scheduler sandbox. Once you are done with it, you can use this test case to explore what can be done with a workflow, all of it without actually running on a cluster.

This tutorial is part of the lemmings test suite. We will create the files here step-by-step, but you can also fetch them ready-made from tests/sandbox/test_sandcastle/.

The mock solver twicer

Twicer is a small program that loads a solution, computes on its data, and dumps a new solution.

If the initial solution twicer_sol.yml is:

time: 1.0
data:
  - 1.
  - 5.
  - 7.
  - 3.  

the final solution is:

time: 2.0
data:
  - 2.
  - 10.
  - 14.
  - 6.  

Twicer uses the following input file twicer_in.yml:

init_sol: TEMP_0001/twicer_sol.yml
out_path: TEMP_0002 

Here init_sol is where to find the initial solution, while out_path is the folder where the new solution is stored.

The solver twicer is defined by the python script twicer.py:

""" Mock up code twicer for lemmings tests """
import os
import yaml

### Main actions
def main(init_file, sol_path):
    """Main function of twicer
    
    init_file (str): initial solution path to read
    sol_path (str): final solution folder
    """
    
    print("Twicer execution...")
    # Load data
    print(f" - reading data from {init_file} ...")
    sol = _load_data(init_file)
    
    print(" - twicing...")
    # Increase time
    sol["time"]+=1
    # Double the content of data
    sol["data"] = [2*dat for dat in sol["data"]]

    # create output folder if missing
    if not os.path.isdir(sol_path):
        os.mkdir(sol_path)
    # dump solution
    out_file = sol_path+"/twicer_sol.yml"
    print(f" - dumping data to {out_file} ...")
    _dump_data(sol, out_file)
    print("Execution complete")


### I/O using yaml
def _load_data(filename):
    """Load a yaml file as dict"""
    with open(filename, "r") as fin:
        data = yaml.load(fin, Loader=yaml.SafeLoader)    
    return data


def _dump_data(data, filename):
    """Write dict to a yaml file"""
    with open(filename, "w") as fout:
        yaml.dump(data, fout, Dumper=yaml.SafeDumper)

## Main function call
if __name__ == "__main__":
    # If called directly from the terminal
    input_params = _load_data("twicer_in.yml")
    init_file = input_params["init_sol"]
    sol_path = input_params["out_path"]
    main(init_file, sol_path)

In a folder, create TEMP_0001/twicer_sol.yml, twicer_in.yml and twicer.py.
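
If you prefer to script the setup, the two data files can be written with a few lines of Python (a sketch; the contents match the examples above, and twicer.py itself is the script listed just before):

```python
import os
import yaml

# Initial solution: time 1.0 and some data to be doubled
os.makedirs("TEMP_0001", exist_ok=True)
with open("TEMP_0001/twicer_sol.yml", "w") as fout:
    yaml.dump({"time": 1.0, "data": [1.0, 5.0, 7.0, 3.0]}, fout)

# Input file pointing twicer at TEMP_0001, with output in TEMP_0002
with open("twicer_in.yml", "w") as fout:
    yaml.dump(
        {"init_sol": "TEMP_0001/twicer_sol.yml", "out_path": "TEMP_0002"},
        fout,
    )
```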

In the aforementioned folder, execute twicer with python twicer.py. It should simply change the content of your working directory by creating the folder TEMP_0002 with the final solution inside:

> tree
 .
├── TEMP_0001
│   └── twicer_sol.yml
└── twicer_in.yml

> python twicer.py
Twicer execution...
 - reading data from TEMP_0001/twicer_sol.yml ...
 - twicing...
 - dumping data to TEMP_0002/twicer_sol.yml ...
Execution complete

>tree
 .
├── TEMP_0001
│   └── twicer_sol.yml
├── TEMP_0002
│   └── twicer_sol.yml
└── twicer_in.yml

Create a workflow for twicer

Our aim: build a workflow that re-submits a twicer job until the time in the solution reaches 4 seconds. We need a recursive loop able to:

  1. Create a new run, according to what is on disk

  2. Test if the last run reached an end condition

The lemmings Job loop is often presented as follows:

          +--------------+ True   +-----------+                     +------------+True  +-------------+
Start----->Check on Start+------->|Prepare Run+--->Job submission--->Check on end+------>After End Job+------>Happy
          +-----+--------+    ^   +-----------+                     +------+-----+      +-------------+        End
                |             |                                            |
                |False        |                                            |False
                |             |                   +-------------+          |
                v             |                   |  Prior to   |          |
              Early           +-------------------+new iteration<----------+
               End                                +-------------+

The boxes are all the places where we can inject our custom code. However, the two main ones are Prepare Run, just before the submission, and Check on end, which decides whether the loop must stop. The graph therefore simplifies to:

                +-----------+                     +------------+True  
Start---------->|Prepare Run+--->Job submission---->Check on end+----------->Happy
            ^   +-----------+                     +------+-----+             End
            |                                            |
            |                                            |False
            |                                            |
            |                                            |
            +--------------------------------------------+                          

The Prepare run code

This action, performed just before the submission, should set up the run. To keep the workflow as robust and testable as possible, we will base our actions solely on what is on the disk. This avoids passing information from one job to another, which is feasible but hard to debug.

The following code searches the disk for the last solution folder, and creates a new input file accordingly:

def prepare_run():
    """
    Refresh the input file
    """
    last_id = last_folder_id()

    input_twicer =  {
        "init_sol": _name_folder(last_id)+ "/twicer_sol.yml",
        "out_path": _name_folder(last_id+1)
    }
    with open("./twicer_in.yml", "w") as fout:
        yaml.dump(input_twicer,fout)

def last_folder_id(wdir = None):
    """Find the ID of the last folder of twicer
    
    Example
    .
    ├── TEMP_0001
    │   └── twicer_sol.yml
    ├── TEMP_0002
    │   └── twicer_sol.yml
    ├── TEMP_0003
    │   └── twicer_sol.yml
    ├── TEMP_0004
    │   └── twicer_sol.yml
    will return 4
    
    """
    if wdir is None:
        wdir = os.getcwd()
    # note: glob() searches relative to the current working directory
    folds = glob("*/twicer_sol.yml")
    max_id = 0
    for fold in folds:
        max_id = max(_folder_id(fold), max_id)

    return max_id

def _name_folder(id):
    """Return a folder name from its integer id

    example: 55 -> TEMP_0055"""

    return f"TEMP_{id:04d}"

def _folder_id(folder_):
    """Return the integer id from a solution file path

    example: TEMP_0055/twicer_sol.yml -> 55"""
    path_ = PurePath(folder_)
    id_ = int(str(path_.parents[0]).split("_")[-1])
    return id_

The Check on end code

This function should return False until a stopping condition is reached. The following function reads the output solution, and stops the chain once the simulated time reaches 4 seconds.

def check_on_end():
    """
    What to do after post job
    """
    if my_completion(4) < 1.:
        return False
    else:
        return True

def my_completion(simtime_target):
    """Evaluate the completion of the workflow"""
    last_id = last_folder_id()
    with open(_name_folder(last_id) + "/twicer_sol.yml", "r") as fin:
        sol = yaml.load(fin, Loader=yaml.SafeLoader)
    return sol["time"]/simtime_target

The limit here is hardcoded for now. See the end of this tutorial for how to make it a parameter.
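
To see the numbers at work: the initial solution starts at time 1.0 and each twicer run adds one second, so with a target of 4 the chain resubmits until the third run. A standalone sketch of this loop logic:

```python
def my_completion(sim_time, simtime_target):
    """Fraction of the target simulation time reached (standalone version)."""
    return sim_time / simtime_target

time = 1.0   # time of the initial solution TEMP_0001
runs = 0
while my_completion(time, 4) < 1.0:  # check_on_end() False -> resubmit
    time += 1.0                      # one more twicer run
    runs += 1
print(runs, time)  # 3 runs, stopping at time 4.0
```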

Gathering this into a single workflow

We combine these two functions into a lemmings workflow, called sandcastle.py as follows:

"""
Lemmings script for tutorial with twicer
"""
import os
from glob import glob
from pathlib import PurePath
import yaml
from lemmings.chain.lemmingjob_base import LemmingJobBase

class LemmingJob(LemmingJobBase):
    """
    A lemming job always follows the same pattern.

              +--------------+ True   +-----------+                     +------------+True +-------------+
    Start----->Check on Start+------->|Prepare Run+--->Job submission--->Check on end+----->After End Job+------>Happy
              +-----+--------+    ^   +-----------+                     +------+-----+     +-------------+        End
                    |             |                                            |
                    |False        |                                            |False
                    |             |                   +-------------+          |
                    v             |                   |  Prior to   |          |
                  Early           +-------------------+new iteration<----------+
                   End                                +-------------+
    """
    def prepare_run(self):
        """
        Refresh the input file
        """
        last_id = last_folder_id()

        input_twicer =  {
            "init_sol": _name_folder(last_id)+ "/twicer_sol.yml",
            "out_path": _name_folder(last_id+1)
        }
        with open("./twicer_in.yml", "w") as fout:
            yaml.dump(input_twicer,fout)

    def check_on_end(self):
        """
        What to do after post job
        """
        if my_completion(4) < 1.:
            return False
        else:
            return True

def my_completion(simtime_target):
    """Evaluate the completion of the workflow"""
    last_id = last_folder_id()
    with open(_name_folder(last_id)+ "/twicer_sol.yml", "r") as fin:
        sol = yaml.load(fin, Loader=yaml.SafeLoader)

    return sol["time"]/simtime_target

def last_folder_id(wdir = None):
    """Find the ID of the last folder"""
    if wdir is None:
        wdir = os.getcwd()
    # note: glob() searches relative to the current working directory
    folds = glob("*/twicer_sol.yml")
    max_id = 0
    for fold in folds:
        max_id = max(_folder_id(fold), max_id)
    return max_id

def _name_folder(id):
    """Return a folder name from its integer id
    example: 55 -> TEMP_0055 """
    return f"TEMP_{id:04d}"

def _folder_id(folder_):
    """Return the integer id from a solution file path
    example: TEMP_0055/twicer_sol.yml -> 55"""
    path_ = PurePath(folder_)
    id_ = int(str(path_.parents[0]).split("_")[-1])
    return id_

This workflow needs an input file of the same name, sandcastle.yml:

#---Required Settings--#
exec: |                      # your environment, slurm exec, module etc (written in the batch)
      echo "hello world exec"
      python twicer.py
exec_pj: |                  # exec for the PJ (written in the batch)
      echo "hello world exec_pj"
job_queue: long         # name of the job queue based on your '{machine}.yml'
pjob_queue: short        # name of the post-job queue based on your '{machine}.yml'
cpu_limit: null            # The maximal CPU limit for your Lemmings chain [Hours]

# #---Optional Settings--#
job_prefix: twicer              # Add a prefix to the run_name

These are the compulsory inputs for any lemmings job. exec and exec_pj are lines appended to the end of your batch submission files; we add python twicer.py to the first. Note that you can include executions in the post-job too. job_queue (respectively pjob_queue) is the name of the queue on which the commands defined in exec (respectively exec_pj) are run. Finally, cpu_limit is the maximum CPU hours that can be used by this workflow, and job_prefix is a prefix added to all your jobs.
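
A small sanity-check sketch (not part of lemmings, just an illustration) can verify that an input file carries all five compulsory keys before launching:

```python
import yaml

# The five compulsory keys described above
REQUIRED = ("exec", "exec_pj", "job_queue", "pjob_queue", "cpu_limit")

def missing_keys(yaml_text):
    """Return the compulsory lemmings keys absent from an input file."""
    params = yaml.safe_load(yaml_text) or {}
    return [key for key in REQUIRED if key not in params]

# A truncated input, for illustration
print(missing_keys("job_queue: long\npjob_queue: short\n"))
# -> ['exec', 'exec_pj', 'cpu_limit']
```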

Additionally, lemmings needs a machine file detailing the job-submission commands and the characteristics of the machine on which the job runs. In our case, the machine file is sandbox.yml:

commands:
  submit: lem_sandbox submit
  get_cpu_time: lem_sandbox acct -LEMMING-JOBID-
  dependency: "-a"
  cancel: lem_sandbox cancel
queues:
  long:
    wall_time: 00:00:30
    # core_nb is a parameter REQUIRED BY LEMMINGS in order to correctly
    # compute the elapsed CPU time during the chain
    core_nb: 1                 # core_nb = nodes*ntasks-per-node !!!
    header: |
            #!/bin/bash
            #SBX queue=long-LEMMING-WALL-TIME-
            #SBX job_name=-LEMMING-JOB_NAME-

            -EXEC-
  short:
    wall_time: 00:00:10
    # core_nb is a parameter REQUIRED BY LEMMINGS in order to correctly
    # compute the elapsed CPU time during the chain
    core_nb: 1                 # core_nb = nodes*ntasks-per-node !!!
    header: |
            #!/bin/bash
            #SBX queue=short-LEMMING-WALL-TIME-
            #SBX job_name=-LEMMING-JOB_NAME-

            -EXEC_PJ-

commands lists the commands to submit a job (submit), to get the consumed CPU hours (get_cpu_time), the flag to declare a job dependency (dependency), and the command to cancel a job (cancel). queues details each queue by name (long, short) and, for each one, the maximal wall time of the queue (wall_time), the number of cores used (core_nb), and the header of the submission script (header).
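
The -LEMMING-WALL-TIME-, -LEMMING-JOB_NAME- and -EXEC- tags in the headers are placeholders that lemmings fills in at submission time. Conceptually, the substitution behaves like this naive sketch (illustrative only, not lemmings' actual code; the exact formatting of the substituted values is up to lemmings):

```python
def fill_header(header, wall_time, job_name, exec_lines):
    """Naive placeholder substitution on a queue header (illustrative sketch)."""
    return (header.replace("-LEMMING-WALL-TIME-", wall_time)
                  .replace("-LEMMING-JOB_NAME-", job_name)
                  .replace("-EXEC-", exec_lines))

header = """#!/bin/bash
#SBX queue=long-LEMMING-WALL-TIME-
#SBX job_name=-LEMMING-JOB_NAME-

-EXEC-
"""
batch = fill_header(header, " wall_time=00:00:30", "twicer_0001",
                    'echo "hello world exec"\npython twicer.py')
print(batch)
```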

Testing the workflow

Before testing, refer to the sandbox help page to set up your sandbox job scheduler.

Your test folder should look like this:

.
├── TEMP_0001
│   └── twicer_sol.yml
├── sandbox.yml
├── sandcastle.py
├── sandcastle.yml
└── twicer.py

Start lemmings

When you are ready, open two terminals. In the first one, start the sandbox:

>lem_sandbox start -d 120
INFO - Starting sandbox...
INFO - Freq: 3s
INFO - Max duration: 120s
INFO - Daemon spawn.
INFO - Daemon spawn.
(...)

In the second one, start your lemming job:

>lemmings run --machine-file sandbox.yml --inputfile sandcastle.yml --job-prefix twicer sandcastle.py
INFO - 
##############################
Starting Lemmings 0.8.0...
##############################

INFO -     Job name     :twicer_JEXA93
INFO -     Loop         :1
INFO -     Status       :start
INFO -     Worflow path :/Users/dauptain/TEST/test_workflow_single/sandcastle.py
INFO -     Imput path   :/Users/dauptain/TEST/test_workflow_single/sandcastle.yml
INFO -     Machine path :/Users/dauptain/TEST/test_workflow_single/sandbox.yml
INFO -     Farming mode :False
INFO -     Lemmings START
INFO -          Check on startTrue (False -> Exit)
INFO -          Prior to job
INFO -     Lemmings SPAWN
INFO -          Prepare run
INFO -          Submit batch 95575 
INFO -          Submit batch post job 95578

Monitor during the run

You can then monitor what happens using lemmings status until completion:

>lemmings status
Status for chain twicer_JEXA93 
+------+---------------+----------------+-----------+--------------+--------+---------+
| Loop | Solution path | Job end status |  progress | CPU time (h) | job ID | pjob ID |
+------+---------------+----------------+-----------+--------------+--------+---------+
|  0   |   Submitted   |   Submitted    | Submitted |  Submitted   | 95575  |  95578  |
+------+---------------+----------------+-----------+--------------+--------+---------+
>lemmings status
Status for chain twicer_JEXA93 
+------+---------------+-----------------+-----------+--------------+--------+---------+
| Loop | Solution path |  Job end status |  progress | CPU time (h) | job ID | pjob ID |
+------+---------------+-----------------+-----------+--------------+--------+---------+
|  0   |       ./      | ended, continue |     NA    |    0.001     | 95575  |  95578  |
|  1   |   Submitted   |    Submitted    | Submitted |  Submitted   | 95590  |  95591  |
+------+---------------+-----------------+-----------+--------------+--------+---------+
Status for chain twicer_JEXA93 
+------+---------------+-----------------+-----------+--------------+--------+---------+
| Loop | Solution path |  Job end status |  progress | CPU time (h) | job ID | pjob ID |
+------+---------------+-----------------+-----------+--------------+--------+---------+
|  0   |       ./      | ended, continue |     NA    |    0.001     | 95575  |  95578  |
|  1   |       ./      | ended, continue |     NA    |    0.002     | 95590  |  95591  |
|  2   |   Submitted   |    Submitted    | Submitted |  Submitted   | 95611  |  95612  |
+------+---------------+-----------------+-----------+--------------+--------+---------+

Switch back to the sandbox terminal, and you will see the succession of jobs:

(...)
INFO - Daemon spawn.
INFO - Daemon spawn.
INFO - start job twicer_JEXA93_95575
INFO - Command source  ~/Python_envs/dev_opentea/bin/activate ; export LEMMINGS_MACHINE=None ; echo "hello world exec" ; python twicer.py
INFO - Daemon spawn.
INFO - stop job twicer_JEXA93_95575
INFO - start job twicer_JEXA93_95578
INFO - Command source  ~/Python_envs/dev_opentea/bin/activate ; export LEMMINGS_MACHINE=None ; echo "hello world exec_pj" ; lemmings run sandcastle -s post_job --yaml=/Users/dauptain/TEST/test_workflow/sandcastle.yml --machine-file=sandbox.yml
INFO - Daemon spawn.
INFO - stop job twicer_JEXA93_95578
INFO - start job twicer_JEXA93_95590
INFO - Command source  ~/Python_envs/dev_opentea/bin/activate ; export LEMMINGS_MACHINE=None ; echo "hello world exec" ; python twicer.py
INFO - Daemon spawn.
INFO - stop job twicer_JEXA93_95590
INFO - start job twicer_JEXA93_95591
INFO - Command source  ~/Python_envs/dev_opentea/bin/activate ; export LEMMINGS_MACHINE=None ; echo "hello world exec_pj" ; lemmings run sandcastle -s post_job --yaml=/Users/dauptain/TEST/test_workflow/sandcastle.yml --machine-file=sandbox.yml
INFO - Daemon spawn.
INFO - stop job twicer_JEXA93_95591
INFO - start job twicer_JEXA93_95611
INFO - Command source  ~/Python_envs/dev_opentea/bin/activate ; export LEMMINGS_MACHINE=None ; echo "hello world exec" ; python twicer.py
INFO - Daemon spawn.
INFO - stop job twicer_JEXA93_95611
INFO - start job twicer_JEXA93_95612
INFO - Command source  ~/Python_envs/dev_opentea/bin/activate ; export LEMMINGS_MACHINE=None ; echo "hello world exec_pj" ; lemmings run sandcastle -s post_job --yaml=/Users/dauptain/TEST/test_workflow/sandcastle.yml --machine-file=sandbox.yml
INFO - Daemon spawn.
INFO - stop job twicer_JEXA93_95612
INFO - Daemon spawn.
INFO - Daemon spawn.
INFO - Daemon spawn.
(...)

Look at the result

In your folder you will have something like:

.
├── TEMP_0001
│   └── twicer_sol.yml
├── TEMP_0002
│   └── twicer_sol.yml
├── TEMP_0003
│   └── twicer_sol.yml
├── TEMP_0004
│   └── twicer_sol.yml
├── batch_job
├── batch_pjob
├── database.yml
├── sandbox.yml
├── sandcastle.py
├── sandcastle.yml
├── twicer.py
└── twicer_JEXA93
    └── twicer_JEXA93.log

The three additional runs TEMP_0002-3-4 were created as expected. database.yml is what lemmings uses to keep track of your run; you can remove it (or not).

Lemmings also saves the last status of the job in ./twicer_JEXA93/twicer_JEXA93.log:

Lemmings Version : 0.7.0

+------+---------------------+--------+---------+-------+-----------------------+
| Loop |       datetime      | job_id | pjob_id | dtsum |      end_cpu_time     |
+------+---------------------+--------+---------+-------+-----------------------+
|  0   | 2022-05-25 14:29:03 | 95575  |  95578  |  None | 0.0008333333333333334 |
|  1   | 2022-05-25 14:29:09 | 95590  |  95591  |  None | 0.0016666666666666668 |
|  2   | 2022-05-25 14:29:16 | 95611  |  95612  |  None |         0.0025        |
+------+---------------------+--------+---------+-------+-----------------------+

We see from the timestamps that the whole process took only a few seconds.

Replay

You can clean up your working directory with:

rm -rf twicer_* TEMP_0002 TEMP_0003 TEMP_0004 batch_* database.yml

Improve the workflow

Here are some additions to the workflow that could prove useful for a real application.

Add a custom parameter in the input file

Your current workflow uses a hardcoded time limit. We will now read the time limit from the input file.

We add the limit simulation_end_time in the yaml input:

#---Required Settings--#
exec: |                      # your environment, slurm exec, module etc (written in the batch)
      echo "hello world exec"
      python twicer.py
exec_pj: |                  # exec for the PJ (written in the batch)
      echo "hello world exec_pj"
job_queue: long         # name of the job queue based on your '{machine}.yml'
pjob_queue: short        # name of the post-job queue based on your '{machine}.yml'
cpu_limit: 1            # The maximal CPU limit for your Lemmings chain [Hours]

# #---Optional Settings--#
job_prefix: twicer              # Add a prefix to the run_name

# #---Custom Settings--#
custom_params:
    simulation_end_time: 7    #final end time desired

The workflow will read this value through self:

def check_on_end(self):
    """
    What to do after post job
    """
    limit = self.user.custom_params["simulation_end_time"]
    if my_completion(limit) < 1.:
        return False
    else:
        return True

Here self.user.custom_params gives you access to everything you put in the custom_params section of your YAML input. Warning: custom_params is a reserved field in the YAML workflow file; do not use another name or storage location.

Prevent job submission if workflow was already completed

In its current state, even a workflow that has already reached its target will submit a run, and only stop after that first run. We can use the Check on Start step to add a little bit of moderation to the workflow.

def check_on_start(self):
    """
    Verify if the condition is already satisfied before launching a lemmings chain.
    """
    limit = self.user.custom_params["simulation_end_time"]
    if my_completion(limit) < 1.:
        return True
    else:
        print("This workflow already reached completion!")
        return False

Note that the logic is the opposite of check_on_end(): if check_on_start() returns False, the workflow is interrupted before any submission.
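
The mirrored logic of the two hooks can be summarized in a few lines (a sketch; both are driven by the same completion ratio):

```python
def hooks(completion):
    """Return (check_on_start, check_on_end) for a given completion ratio."""
    check_on_start = completion < 1.0   # True -> launch the chain
    check_on_end = completion >= 1.0    # True -> happy end, stop looping
    return check_on_start, check_on_end

print(hooks(0.5))   # (True, False): work remains, start and keep looping
print(hooks(1.25))  # (False, True): already complete, nothing to do
```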

We get at the end :

>lemmings run --machine-file sandbox.yml --inputfile sandcastle.yml --job-prefix twicer sandcastle.py
Overriding the $LEMMINGS_MACHINE by a user defined {machine}.yml
Starting chain named twicer_MOLE48
Your CPU limit is 1 [hours]. Confirm it typing the same value: 1
This workflow already reached completion!

Provide progression information to lemmings

By default, lemmings status returns the following table:

>lemmings status
Status for chain twicer_GADI44 
+------+---------------+-----------------+-----------+--------------+--------+---------+
| Loop | Solution path |  Job end status |  progress | CPU time (h) | job ID | pjob ID |
+------+---------------+-----------------+-----------+--------------+--------+---------+
|  0   |       ./      | ended, continue |     NA    |    0.001     | 17826  |  17827  |
|  1   |       ./      | ended, continue |     NA    |    0.002     | 17836  |  17837  |
|  2   |       ./      | ended, continue |     NA    |    0.002     | 17854  |  17855  |
|  3   |       ./      | ended, continue |     NA    |    0.003     | 17870  |  17871  |
|  4   |       ./      | ended, continue |     NA    |    0.004     | 17885  |  17886  |
|  5   |   Submitted   |    Submitted    | Submitted |  Submitted   | 17901  |  17902  |
+------+---------------+-----------------+-----------+--------------+--------+---------+

The progress column can be used to report a customized assessment of the workflow completion, using the method set_progress_var() at the end of each run:

    def check_on_end(self):
        """
        What to do after post job
        """
        limit = self.user.custom_params["simulation_end_time"]

        completion = my_completion(limit)
        self.set_progress_var(completion)
        if completion < 1.:
            return False
        else:
            return True

With this the status will look like:

>lemmings status
Status for chain twicer_PUBU75 
+------+---------------+-----------------+-----------+--------------+--------+---------+
| Loop | Solution path |  Job end status |  progress | CPU time (h) | job ID | pjob ID |
+------+---------------+-----------------+-----------+--------------+--------+---------+
|  0   |       ./      | ended, continue |   0.2857  |    0.001     | 27784  |  27785  |
|  1   |       ./      | ended, continue |   0.4286  |    0.002     | 27797  |  27798  |
|  2   |       ./      | ended, continue |   0.5714  |    0.003     | 27810  |  27811  |
|  3   |       ./      | ended, continue |   0.7143  |    0.004     | 27823  |  27825  |
|  4   |   Submitted   |    Submitted    | Submitted |  Submitted   | 27835  |  27836  |
+------+---------------+-----------------+-----------+--------------+--------+---------+

Takeaways

Here are some takeaways from this tutorial

  • If you want to learn how to do things with Lemmings, try out first using the sandbox and a tiny example: your feedback will be orders of magnitude faster than with any real job scheduler.

  • In 90% of cases, the basic recursive loop of lemmings only requires customizing prepare_run() and check_on_end(). The other hooks are icing on the cake.

  • A workflow can easily run endlessly, and sneakily eat all your CPU allocation. If you aim for production runs, adding some warnings and sanity checks is a good idea.

  • If your workflow needs some information, get it from what is on disk by reading your files. This is more reliable in the long run than storing it in a persistent database (which is harder to re-read, test and debug).

  • Do not think of your workflow as runs 1, 2, 3, 4 but as N+1, N+2, N+3, with N read from what is on disk.

That’s all folks!