Introduction to SalvusFlow's API

Most user interaction with SalvusFlow should happen through the salvus_flow.api module. This tutorial gives a high-level introduction to its most important functions. For full details, please refer to SalvusFlow's API documentation.

Running Salvus on local or remote machines

The API is used to submit Salvus jobs to run on either local or remote machines. These functions exist in synchronous (blocking) and asynchronous (non-blocking) variants; we explain what this means shortly. Furthermore, there are variants that execute only a single simulation and variants that can run many simulations at once. The latter can be considerably more efficient because they can use the native job array functionality of many job scheduling systems.

  • salvus_flow.api.run_async(): Start/queue a single simulation and immediately return.
  • salvus_flow.api.run(): Start/queue a single simulation, wait for it to finish, copy all the outputs to the local machine, and delete all remote files.
  • salvus_flow.api.run_many_async(): Start/queue many simulations at once and immediately return.
  • salvus_flow.api.run_many(): Start/queue many simulations at once, wait for them to finish, copy all the outputs to the local machine, and delete all remote files.

Difference between asynchronous and synchronous execution

The synchronous variants are easy to understand: the functions run Salvus and wait until everything has completed before they return. This is most useful for small- to medium-scale simulations. The asynchronous variants submit/queue the jobs on the chosen site and then immediately return a SalvusJob or SalvusJobArray object, respectively. These objects can be queried for the current status and, once the jobs are done, used to retrieve the output, among other things. This is useful, for example, for long-running or long-queuing jobs, so that one can do something else in the meantime.
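The difference between the two styles can be illustrated without Salvus at all. The sketch below uses Python's standard concurrent.futures module as a stand-in: fake_simulation, run_blocking and run_nonblocking are hypothetical names, and a Future plays the role that a SalvusJob object plays in SalvusFlow. It is only an analogy for the calling pattern, not how SalvusFlow is implemented.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


# Hypothetical stand-in for a simulation: sleeps briefly, then returns a result.
def fake_simulation(name: str) -> str:
    time.sleep(0.2)
    return f"{name}: done"


executor = ThreadPoolExecutor(max_workers=1)


def run_blocking(name: str) -> str:
    """Synchronous style: submit the work, then wait for its result before returning."""
    return executor.submit(fake_simulation, name).result()


def run_nonblocking(name: str) -> Future:
    """Asynchronous style: submit the work and hand back a handle immediately."""
    return executor.submit(fake_simulation, name)


start = time.perf_counter()
handle = run_nonblocking("async job")
returned_after = time.perf_counter() - start  # returns long before the 0.2 s of work is done
print("still running?", not handle.done())
# ... do something else here ...
print(handle.result())  # only this call blocks until the work has finished
print(run_blocking("sync job"))
executor.shutdown()
```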

The run_many...() versions execute multiple jobs at once. The major limitation is that (due to how job management systems such as Slurm work) all jobs must run on the same number of cores and must have the same wall time. Thus the run_many...() functions are useful when running many similar jobs at once, i.e. jobs that have a similar number of elements and time steps. This is the case for most survey- or inversion-style studies where one, for example, simulates through the same domain but for many different sources.
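Because every job in one run_many...() call shares the same ranks and wall time, a mixed set of simulations has to be split into groups first. The following is a minimal sketch of such a grouping, assuming you already know the resource needs of each simulation; JobSpec and group_for_job_arrays are hypothetical helpers, not part of SalvusFlow. Each resulting group could then be passed to a separate run_many() call with matching ranks_per_job and wall_time_in_seconds_per_job.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class JobSpec(NamedTuple):
    """Hypothetical description of one simulation and its resource needs."""

    name: str
    ranks: int
    wall_time_in_seconds: int


def group_for_job_arrays(specs: List[JobSpec]) -> Dict[Tuple[int, int], List[str]]:
    """Group simulations so that each group shares ranks and wall time."""
    groups: Dict[Tuple[int, int], List[str]] = defaultdict(list)
    for spec in specs:
        groups[(spec.ranks, spec.wall_time_in_seconds)].append(spec.name)
    return dict(groups)


specs = [
    JobSpec("shot_00", 2, 30),
    JobSpec("shot_01", 2, 30),
    JobSpec("fine_mesh", 8, 600),
    JobSpec("shot_02", 2, 30),
]
groups = group_for_job_arrays(specs)
for (ranks, wall_time), names in groups.items():
    print(f"{ranks} ranks, {wall_time} s: {names}")
```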

On sites that do not have a job queuing system (e.g. local and ssh sites), the jobs are internally run one after the other. On other sites they might run in parallel; the details are up to the job scheduler.

On systems that support it, such as Slurm, the jobs take advantage of the scheduler's native job array support.
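To see why a job array forces a shared core count and wall time, it helps to look at what a generic Slurm array request contains: the resource lines appear once and apply to every task in the array. The function below renders such a header using the standard sbatch options --array, --ntasks and --time. It is an illustration of Slurm job arrays in general; slurm_array_header is a hypothetical helper, and the scripts SalvusFlow actually generates are not shown in this tutorial and may look different.

```python
def slurm_array_header(n_jobs: int, ranks_per_job: int, wall_time_in_seconds: int) -> str:
    """Render a generic Slurm batch header for a job array (illustration only)."""
    hours, rest = divmod(wall_time_in_seconds, 3600)
    minutes, seconds = divmod(rest, 60)
    lines = [
        "#!/bin/bash",
        # One array with task indices 0..n_jobs-1.
        f"#SBATCH --array=0-{n_jobs - 1}",
        # These resource requests apply to every task in the array.
        f"#SBATCH --ntasks={ranks_per_job}",
        f"#SBATCH --time={hours:02d}:{minutes:02d}:{seconds:02d}",
        # Each task can read its own index from SLURM_ARRAY_TASK_ID.
        'echo "Running task ${SLURM_ARRAY_TASK_ID}"',
    ]
    return "\n".join(lines)


header = slurm_array_header(n_jobs=17, ranks_per_job=2, wall_time_in_seconds=30)
print(header)
```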

Setting up the simulations

We will now set up all the required objects before we demonstrate how to use the various run_...() functions. These are very small simulations that can easily be run on a laptop.

# Import the api as well as the simple config and mesh objects.
import os
import shutil
from salvus_flow import api
from salvus_flow import simple_config as sc
from salvus_mesh import simple_mesh as sm

# A simple 2D homogeneous mesh.
mesh = sm.CartesianHomogeneousIsotropicElastic2D(
    vp=3000, vs=2000, rho=3000, x_max=1000, y_max=1000, max_frequency=5
)

# 17 equally spaced sources at x = 100, 150, ..., 900.
sources = [
    sc.source.cartesian.VectorPoint2D(
        # Use the loop variable so that each source gets its own position.
        x=x,
        y=300,
        fx=100,
        fy=200,
        source_time_function=sc.stf.Ricker(center_frequency=5.0),
    )
    for x in range(100, 950, 50)
]

# We will now construct one simulation object per source.
simulations = []
for src in sources:
    w = sc.simulation.Waveform(
        mesh=mesh.create_mesh(), sources=src, receivers=[]
    )
    w.physics.wave_equation.end_time_in_seconds = 200.0
    simulations.append(w)
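A quick check of the source positions: range() excludes its stop value, so range(100, 950, 50) ends at 900 and yields exactly 17 positions with a constant spacing of 50, all inside the mesh extent set by x_max=1000. The snippet below only reproduces that arithmetic in plain Python.

```python
# The x-coordinates produced by range(100, 950, 50).
xs = list(range(100, 950, 50))
print(len(xs), xs[0], xs[-1])  # 17 100 900

# Differences between neighbouring sources.
spacings = {b - a for a, b in zip(xs, xs[1:])}
print(spacings)  # {50}
```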

Running a single simulation synchronously

With salvus_flow.api.run(), SalvusFlow runs a simulation on the chosen machine, waits until it is done, retrieves the output, and finally deletes all remote files. Note the optional overwrite argument: it defaults to False, in which case the call fails if the output folder already exists. This makes running simulations and analyzing the results very convenient and low-friction.

api.run(
    # We will only run a single simulation here.
    input_file=simulations[0],
    # The site to run on.
    site_name="local",
    # Folder to which to copy the output to.
    output_folder="output",
    overwrite=True,
    wall_time_in_seconds=1,
)
Job `job_1910221509217712_227541e558` running on `local` with 4 rank(s).
Site information:
  * Salvus version: 0.10.5
  * Floating point size: 32
* Downloaded 5.5 KB of results to `output`.
* Total run time: 1.31 seconds.
* Pure simulation time: 0.88 seconds.
<salvus_flow.sites.salvus_job.SalvusJob at 0x7f7c19765208>
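The overwrite behaviour described above can be mimicked with the standard library. The following sketch is a hypothetical helper written to match that description (refuse by default, replace the folder when overwrite=True); it is not SalvusFlow's own code, which may differ in details such as who creates the folder.

```python
import os
import shutil
import tempfile


def prepare_output_folder(path: str, overwrite: bool = False) -> None:
    """Hypothetical helper mirroring the described `overwrite` semantics."""
    if os.path.exists(path):
        if not overwrite:
            raise FileExistsError(f"Output folder {path!r} already exists.")
        # overwrite=True: remove the old folder and its contents.
        shutil.rmtree(path)
    os.makedirs(path)


base = tempfile.mkdtemp()
target = os.path.join(base, "output")

prepare_output_folder(target)  # first call creates the folder

refused = False
try:
    prepare_output_folder(target)  # default overwrite=False refuses
except FileExistsError as err:
    refused = True
    print("refused:", err)

prepare_output_folder(target, overwrite=True)  # replaces the existing folder
replaced_ok = os.path.isdir(target)
shutil.rmtree(base)  # clean up the temporary directory
```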

Running many simulations synchronously

salvus_flow.api.run_many() will do the same as salvus_flow.api.run() but for many simulations at once. The output folder will afterwards contain a subfolder for each passed simulation object.

api.run_many(
    # Pass a list of simulation objects
    input_files=simulations,
    # The site to run on.
    site_name="local",
    # Ranks and wall times have to be specified per job.
    # Both are potentially optional: not all sites require
    # wall times, and if no ranks are given, the default number
    # of ranks from the site configuration is used.
    ranks_per_job=2,
    wall_time_in_seconds_per_job=30,
    # Folder to which to copy the output to.
    output_folder="output",
    # Overwrite the output folder if it already exists.
    overwrite=True,
)
JobArray job_array_1910221509506045_a669fca9c5 with 17 jobs(s) running on local with 2 rank(s) per job.
Site information:
  * Site type: local
  * Salvus version: 0.10.5
  * Floating point size: 32
* Downloaded 97.5 KB of results  to `output`.
* Total run time: 35.48 seconds.
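After run_many() returns, the per-simulation results live in subfolders of the output folder. The sketch below shows one way to list them in order with pathlib. It assumes zero-padded names of the form job_NNNN (the naming visible in the job_array.copy_output() output later in this tutorial) and that the index matches the position in the input list; both are assumptions for run_many(), and job_folders is a hypothetical helper. A throwaway directory stands in for real output.

```python
import tempfile
from pathlib import Path
from typing import List


def job_folders(output_folder: Path) -> List[Path]:
    """Return per-job subfolders sorted by name (assumes job_NNNN naming)."""
    return sorted(
        p for p in output_folder.iterdir() if p.is_dir() and p.name.startswith("job_")
    )


# Build a fake output layout standing in for a real run_many() result.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for i in range(17):
        (root / f"job_{i:04d}").mkdir()
    # Zero-padding makes lexical order equal numeric order,
    # so names[i] would correspond to simulations[i] under the assumption above.
    names = [p.name for p in job_folders(root)]

print(len(names), names[0], names[-1])
```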

Running a single simulation asynchronously

The following example demonstrates how to run a single job asynchronously and how to work with the resulting SalvusJob object.

# Launch a job in the background. Note that this function
# will return immediately if there are no immediate errors.
job = api.run_async(input_file=simulations[0], site_name="local")

# Query for the current status of the job with `.update_status()`.
print("Current job status:", job.update_status())

# Do something else.
print("Doing something else.")

# Wait for the job to finish. Blocks until the job is done.
job.wait(
    # Optional. If not given, the value from the
    # site configuration is used.
    poll_interval_in_seconds=2.0,
    # Optional. Wait at most this long before returning.
    timeout_in_seconds=60.0,
)

# Query the status again.
print("Current job status:", job.update_status())
Current job status: JobStatus.running
Doing something else.
After 0.1 seconds: running. Sleeping for 2.0000 seconds before checking job status again.
Current job status: JobStatus.finished
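The two optional arguments of job.wait() describe a classic polling loop: check the status, sleep for poll_interval_in_seconds, and give up once timeout_in_seconds has elapsed. The following is a generic sketch of that pattern with a fake status source; wait_for and its done_states parameter are hypothetical and only meant to clarify what the parameters control, not to reproduce SalvusFlow's implementation.

```python
import time


def wait_for(get_status, poll_interval_in_seconds=2.0, timeout_in_seconds=None,
             done_states=("finished",)):
    """Poll get_status() until it returns a done state or the timeout elapses.

    Returns the last observed status. Illustrative sketch only.
    """
    start = time.monotonic()
    while True:
        status = get_status()
        if status in done_states:
            return status
        elapsed = time.monotonic() - start
        if timeout_in_seconds is not None and elapsed >= timeout_in_seconds:
            return status  # timed out: report whatever was seen last
        time.sleep(poll_interval_in_seconds)


# Fake job that reports "running" twice before finishing.
states = iter(["running", "running", "finished"])
result = wait_for(lambda: next(states), poll_interval_in_seconds=0.01, timeout_in_seconds=5)
print(result)
```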
# Get a dictionary with information about all remote output files.
# These are not yet copied to the local machine.
job.get_output_files()
{('output',
  'meta-data',
  'meta-json-filename'): PosixPath('/root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/output/meta.json'),
 ('output',
  'meta-data',
  'progress-json-filename'): PosixPath('/root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/output/progress.json'),
 'stdout': PosixPath('/root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/stdout'),
 'stderr': PosixPath('/root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/stderr')}
# Copy the output files to the chosen folder. In this case
# it is your responsibility to make sure that the folder does not yet exist.
if os.path.exists("output_folder"):
    shutil.rmtree("output_folder")
job.copy_output(destination="output_folder")
{PosixPath('output_folder/meta.json'): 2654,
 PosixPath('output_folder/progress.json'): 157,
 PosixPath('output_folder/stdout'): 2825,
 PosixPath('output_folder/stderr'): 0}
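The dictionary returned by copy_output() maps each local file to a number that appears to be its size in bytes. Summing those values (copied by hand from the output above) gives a total consistent with the roughly 5.5 KB that api.run() reported for the same simulation earlier. This is plain arithmetic on the printed values, not an additional SalvusFlow call.

```python
# Values copied from the job.copy_output() result above (assumed to be bytes).
copied = {
    "output_folder/meta.json": 2654,
    "output_folder/progress.json": 157,
    "output_folder/stdout": 2825,
    "output_folder/stderr": 0,
}
total_bytes = sum(copied.values())
print(total_bytes, f"{total_bytes / 1024:.1f} KB")  # 5636 5.5 KB
```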

The next command deletes all files on the remote machine and internally moves the job to the archive job group.

job.delete()
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/PID.txt ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/stdout ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/run_binary.sh ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/input/mesh.h5 ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/input/input.toml ...
🗑  Deleting folder /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/input ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/double_fork.py ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/output/progress.json ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/output/meta.json ...
🗑  Deleting folder /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/output ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/stderr ...
🗑  Deleting file   /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7/SUCCESS ...
🗑  Deleting folder /root/SalvusFlow/run/job_1910221510131748_f5c13c8ef7 ...
The job has been added to the `archive` job group.

Running many simulations asynchronously

This is the same as the previous example, but for many jobs. We only use two simulations here to keep the output of some commands manageable.

job_array = api.run_many_async(
    # Only use the first two.
    input_files=simulations[:2],
    site_name="local",
)

# Query for the current status of the jobs with `.update_status()`.
print("Current status of jobs:", job_array.update_status())

# Do something else.
print("Doing something else.")
Current status of jobs: [<JobStatus.running: 0>, <JobStatus.pending: 5>]
Doing something else.
# Wait for the jobs to finish. Blocks until all jobs are done.
job_array.wait(verbosity=0)

# Query the status again. Should all be finished now.
print("Current status of jobs:", job_array.update_status())
Current status of jobs: [<JobStatus.finished: 2>, <JobStatus.finished: 2>]
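For a job array, update_status() returns one status per job, so a common next step is to summarize the list. The sketch below uses a stand-in enum that copies only the members and values visible in the outputs above (running = 0, finished = 2, pending = 5); the real JobStatus may have more members. JobStatus here and summarize are hypothetical.

```python
from enum import Enum


class JobStatus(Enum):
    """Stand-in mirroring only the members/values shown in the output above."""

    running = 0
    finished = 2
    pending = 5


def summarize(statuses):
    """Count jobs per status name and report whether every job has finished."""
    counts = {}
    for s in statuses:
        counts[s.name] = counts.get(s.name, 0) + 1
    return counts, all(s is JobStatus.finished for s in statuses)


print(summarize([JobStatus.running, JobStatus.pending]))    # ({'running': 1, 'pending': 1}, False)
print(summarize([JobStatus.finished, JobStatus.finished]))  # ({'finished': 2}, True)
```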

You still have access to each individual job.

With the following call you will get a dictionary with information about all remote output files of the first job. These are not yet copied to the local machine.

job_array.jobs[0].get_output_files()
{('output',
  'meta-data',
  'meta-json-filename'): PosixPath('/root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/output/meta.json'),
 ('output',
  'meta-data',
  'progress-json-filename'): PosixPath('/root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/output/progress.json'),
 'stdout': PosixPath('/root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/stdout'),
 'stderr': PosixPath('/root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/stderr')}

Now, we want to actually copy the output files of all jobs to the chosen folder. Note that it is the user's responsibility to make sure that the folder does not yet exist.

if os.path.exists("output_folder"):
    shutil.rmtree("output_folder")
job_array.copy_output(destination="output_folder")
[{PosixPath('output_folder/job_0000/meta.json'): 2889,
  PosixPath('output_folder/job_0000/progress.json'): 157,
  PosixPath('output_folder/job_0000/stdout'): 2827,
  PosixPath('output_folder/job_0000/stderr'): 0},
 {PosixPath('output_folder/job_0001/meta.json'): 2889,
  PosixPath('output_folder/job_0001/progress.json'): 157,
  PosixPath('output_folder/job_0001/stdout'): 2824,
  PosixPath('output_folder/job_0001/stderr'): 0}]

Now that we have all the files locally, we can safely delete the jobs on the remote machine and move all jobs to the archive job group.

job_array.delete()
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/PID.txt ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/stdout ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/input/mesh.h5 ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/input/input.toml ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/input ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/output/progress.json ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/output/meta.json ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/output ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/stderr ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e/SUCCESS ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_0_of_job_array_1910221510028028_218f45738e ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/stdout ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/run_binary.sh ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/double_fork.py ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/stdout ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/input/mesh.h5 ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/input/input.toml ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/input ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/output/progress.json ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/output/meta.json ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/output ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/stderr ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e/SUCCESS ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/job_1_of_job_array_1910221510028028_218f45738e ...
🗑  Deleting file   /root/SalvusFlow/run/job_array_1910221510028028_218f45738e/stderr ...
🗑  Deleting folder /root/SalvusFlow/run/job_array_1910221510028028_218f45738e ...
Jobs have been added to the `archive` job group.

Retrieve jobs and job arrays from the database

The SalvusJob and SalvusJobArray objects can also be re-initialized from the database, provided the job (or job array) name and the site name are known. This is useful for fully asynchronous workflows.

# Launch job.
job = api.run_async(input_file=simulations[0], site_name="local")
# Retrieve again from DB.
new_job = api.get_job(job_name=job.job_name, site_name="local")
# These two objects refer to the same job.
assert job == new_job

# The same logic holds for job arrays.
job_array = api.run_many_async(input_files=simulations[:2], site_name="local")
new_job_array = api.get_job_array(
    job_array_name=job_array.job_array_name, site_name="local"
)
assert job_array == new_job_array
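In a fully asynchronous workflow, the job name and site name have to survive until a later Python session. One simple option, sketched below, is to write them to a small JSON file and read them back before calling api.get_job(job_name=..., site_name=...) as shown above. The file format and the helper names save_job_reference and load_job_reference are hypothetical; the example job name is just a sample string taken from earlier output.

```python
import json
import tempfile
from pathlib import Path


def save_job_reference(path: Path, job_name: str, site_name: str) -> None:
    """Write the two identifiers needed to look a job up again later."""
    path.write_text(json.dumps({"job_name": job_name, "site_name": site_name}))


def load_job_reference(path: Path) -> dict:
    """Read back the identifiers written by save_job_reference()."""
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as tmp:
    ref_file = Path(tmp) / "job.json"
    # In a real session the name would come from job.job_name.
    save_job_reference(ref_file, "job_1910221510131748_f5c13c8ef7", "local")
    ref = load_job_reference(ref_file)

print(ref)
# Later session (sketch): job = api.get_job(job_name=ref["job_name"], site_name=ref["site_name"])
```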