
Introduction to SalvusFlow's API

Most user interaction with SalvusFlow should happen through the salvus.flow.api module, which is directly accessible from salvus.namespace.
This tutorial presents a high-level introduction to the most important methods. For the full details, please refer to SalvusFlow's API documentation.

Running Salvus on local or remote machines

The API is used to submit Salvus jobs to run on either local or remote machines. These functions exist in synchronous/blocking and asynchronous/non-blocking variants; we'll explain what this means shortly. Furthermore, there are variants that execute only a single simulation and variants that can run many simulations at once. The latter are potentially a lot more efficient, as they can use the native job array functionality of many job scheduling systems.
  • salvus.flow.api.run_async(): Start/queue a single simulation and immediately return.
  • salvus.flow.api.run(): Start/queue a single simulation, wait for it to finish, copy all the outputs to the local machine, and delete all remote files.
  • salvus.flow.api.run_many_async(): Start/queue many simulations at once and immediately return.
  • salvus.flow.api.run_many(): Start/queue many simulations at once, wait for them to finish, copy all the outputs to the local machine, and delete all remote files.
Note that after importing the salvus namespace, you can directly access the api module from there.
The synchronous variants are easy to understand: the functions run Salvus and wait until everything has completed before they return. This is most useful for small- to medium-scale simulations. The asynchronous variants submit/queue the jobs on the chosen site and then immediately return. They return SalvusJob or SalvusJobArray objects, respectively. These can be queried for the current status and, once the jobs are done, they can also be used to get the output and many other things. This is useful, for example, for long-running/long-queuing jobs, so one can do something else in the meantime.
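To make the blocking/non-blocking distinction concrete, here is a minimal analogy that uses only the Python standard library. It does not call SalvusFlow at all: fake_simulation is a hypothetical stand-in for a long-running job, and the future returned by submit() plays a role loosely comparable to the SalvusJob object returned by the asynchronous variants.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_simulation(duration_in_seconds: float) -> str:
    """Hypothetical stand-in for a long-running simulation."""
    time.sleep(duration_in_seconds)
    return "finished"


# Blocking call: control only returns once the work is done.
result_sync = fake_simulation(0.2)

# Non-blocking call: submit() hands back a handle right away.
with ThreadPoolExecutor(max_workers=1) as pool:
    handle = pool.submit(fake_simulation, 0.2)
    # The work has only just started, so it is not done yet.
    was_done_right_after_submit = handle.done()
    # ... do something else in the meantime ...
    # Asking for the result blocks until the work has completed.
    result_async = handle.result()

print(result_sync, was_done_right_after_submit, result_async)
```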
The run_many...() versions will execute multiple jobs at once. The major limitation here is that (due to how, for example, the Slurm job management system works) all jobs must run on the same number of cores and must also have the same wall time. Thus the run_many...() functions are useful when running many similar jobs at once. Similar jobs are jobs that have a similar number of elements and time steps. This is the case for most survey- or inversion-style studies where one, for example, simulates through the same domain but for many different sources.
On sites which do not have a job queuing system (e.g. local and ssh sites), the jobs are internally run one after the other. On other sites they might run in parallel; the details are up to the job scheduler.
On systems that support it (e.g. Slurm), the jobs will take advantage of the native job array support.
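Because all jobs in one run_many...() call must share the number of cores and the wall time, it can help to sort a heterogeneous set of simulations into compatible batches first. The following is a minimal sketch of that bookkeeping in plain Python. JobSpec and group_into_batches are hypothetical names introduced for illustration and are not part of SalvusFlow.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class JobSpec:
    """Hypothetical description of one simulation's resource needs."""

    name: str
    ranks: int
    wall_time_in_seconds: int


def group_into_batches(specs):
    """Group job names by their (ranks, wall time) requirements."""
    batches = defaultdict(list)
    for spec in specs:
        key = (spec.ranks, spec.wall_time_in_seconds)
        batches[key].append(spec.name)
    return dict(batches)


specs = [
    JobSpec("source_0", ranks=2, wall_time_in_seconds=60),
    JobSpec("source_1", ranks=2, wall_time_in_seconds=60),
    JobSpec("large_run", ranks=8, wall_time_in_seconds=600),
]

# Each batch could then be passed to its own run_many...() call.
batches = group_into_batches(specs)
for (ranks, wall_time), names in batches.items():
    print(f"{ranks} ranks, {wall_time} s: {names}")
```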

Setting up the simulations

We will now set up all the required objects before we demonstrate how to use the various run_...() functions. These are very small simulations that can easily be run on a laptop.
# Import the api as well as the simple config and mesh objects.
import os
import shutil
import salvus.namespace as sn

SALVUS_FLOW_SITE_NAME = os.environ.get("SITE_NAME", "local")

# A simple 2D homogeneous mesh.
mesh = sn.simple_mesh.CartesianHomogeneousIsotropicElastic2D(
    vp=3000, vs=2000, rho=3000, x_max=1000, y_max=1000, max_frequency=5
)

# 5 equally spaced sources.
sources = [
    sn.simple_config.source.cartesian.VectorPoint2D(
        x=x,
        y=300,
        fx=100,
        fy=200,
        source_time_function=sn.simple_config.stf.Ricker(center_frequency=5.0),
    )
    for x in list(range(100, 950, 200))
]

receiver = sn.simple_config.receiver.cartesian.Point2D(
    x=600.0, y=500.0, station_code="000", fields=["velocity"]
)

# We will now construct one simulation object per source.
simulations = []
for src in sources:
    w = sn.simple_config.simulation.Waveform(
        mesh=mesh.create_mesh(), sources=src, receivers=receiver
    )
    w.physics.wave_equation.end_time_in_seconds = 5.0
    simulations.append(w)
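As a quick sanity check, the range expression used in the source loop above can be evaluated on its own. This small snippet uses only built-in Python and confirms that it yields five values with a constant spacing.

```python
# The same range expression as in the source loop above.
x_coordinates = list(range(100, 950, 200))

# Differences between neighbouring values; a single entry means
# the values are equally spaced.
spacings = {b - a for a, b in zip(x_coordinates, x_coordinates[1:])}

print(x_coordinates)  # [100, 300, 500, 700, 900]
print(spacings)  # {200}
```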
With salvus.flow.api.run(), SalvusFlow will run a simulation on the chosen machine, wait until it is done, retrieve the output, and finally delete all remote files. Note the optional overwrite argument: it defaults to False, in which case the call fails if the output folder already exists. This is a very convenient, low-friction way to run simulations and analyze the results.
sn.api.run(
    # We will only run a single simulation here.
    input_file=simulations[0],
    # The site to run on.
    site_name=SALVUS_FLOW_SITE_NAME,
    # Folder to which to copy the output to.
    output_folder="output",
    overwrite=True,
    wall_time_in_seconds=1,
)
SalvusJob `job_2411062239496615_803d6ef8f4` running on `local` with 4 rank(s).
Site information:
  * Salvus version: 2024.1.2
  * Floating point size: 32
-> Current Task: Time loop complete
* Downloaded 21.9 KB of results to `output`.
* Total run time: 0.76 seconds.
* Pure simulation time: 0.40 seconds.
<salvus.flow.executors.salvus_job.SalvusJob at 0x72493a1e4f90>
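The overwrite behaviour described above (fail if the folder exists unless overwrite is set) can be illustrated with a few lines of standard-library Python. This is only a sketch of the general pattern: prepare_output_folder is a hypothetical helper, and the exception type SalvusFlow actually raises is not specified here.

```python
import shutil
import tempfile
from pathlib import Path


def prepare_output_folder(path: Path, overwrite: bool = False) -> Path:
    """Create an empty folder; refuse to clobber unless overwrite=True."""
    if path.exists():
        if not overwrite:
            raise FileExistsError(f"Output folder '{path}' already exists.")
        shutil.rmtree(path)
    path.mkdir(parents=True)
    return path


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "output"

    # The first call succeeds because the folder does not exist yet.
    prepare_output_folder(target)

    # The second call without overwrite fails.
    try:
        prepare_output_folder(target)
        second_call_failed = False
    except FileExistsError:
        second_call_failed = True

    # With overwrite=True the folder is replaced by an empty one.
    prepare_output_folder(target, overwrite=True)
    exists_after_overwrite = target.is_dir()
```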
salvus.flow.api.run_many() will do the same as salvus.flow.api.run() but for many simulations at once. The output folder will afterwards contain a subfolder for each passed simulation object.
sn.api.run_many(
    # Pass a list of simulation objects
    input_files=simulations,
    # The site to run on.
    site_name=SALVUS_FLOW_SITE_NAME,
    # Ranks and wall times have to be specified per job.
    # Both are potentially optional: not all sites require
    # wall times, and if no ranks are given, the default number
    # of ranks given when configuring the site is used.
    ranks_per_job=2,
    wall_time_in_seconds_per_job=60,
    # Folder to which to copy the output to.
    output_folder="output",
    # Overwrite the output folder if it already exists.
    overwrite=True,
)

JobArray job_array_2411062239149283_634926b140 with 5 jobs(s) running on local with 2 rank(s) per job.
Site information:
  * Site type: local
  * Salvus version: 2024.1.2
  * Floating point size: 32

* Downloaded 111.9 KB of results  to `output`.
* Total run time: 5.03 seconds.
<salvus.flow.executors.salvus_job_array.SalvusJobArray at 0x72492c80d050>
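Since the output folder contains one subfolder per simulation, post-processing usually starts by iterating over those subfolders. The sketch below builds a throwaway folder layout and lists it with pathlib. The subfolder names (job_0000, job_0001, ...) are an assumption based on the copy_output() listing shown further down in this tutorial, and list_job_folders is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def list_job_folders(output_folder):
    """Return all subfolders of the output folder in sorted order."""
    return sorted(p for p in Path(output_folder).iterdir() if p.is_dir())


with tempfile.TemporaryDirectory() as tmp:
    # Mimic an output folder with one subfolder per simulation.
    # The naming scheme is an assumption for this illustration.
    for i in range(5):
        (Path(tmp) / f"job_{i:04d}").mkdir()

    names = [p.name for p in list_job_folders(tmp)]

print(names)
```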
The following example demonstrates how to run a single job asynchronously and how to work with the resulting SalvusJob object.
# Launch a job in the background. Note that this function
# will return immediately if there are no immediate errors.
job = sn.api.run_async(
    input_file=simulations[0], site_name=SALVUS_FLOW_SITE_NAME
)

# Query for the current status of the job with `.update_status()`.
print("Current job status:", job.update_status())

# Do something else.
print("Doing something else.")

# Wait for the job to finish. Blocks until the job is done.
job.wait(
    # Optional. Defaults to whatever is specified in
    # the site configuration otherwise.
    poll_interval_in_seconds=2.0,
    # Optional. Wait at max this long before returning.
    timeout_in_seconds=300.0,
)

# Query the status again.
print("Current job status:", job.update_status())
Current job status: JobStatus.running
Doing something else.
After 0.0 seconds: running. Sleeping for 2.0000 seconds before checking job status again.
Current job status: JobStatus.finished
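Conceptually, waiting with a poll interval and a timeout boils down to a simple loop. The following sketch shows that pattern in plain Python; it is not SalvusFlow's implementation. wait_until_finished and the string statuses are hypothetical, and the parameter names merely mirror those used in the call above.

```python
import time


def wait_until_finished(
    get_status, poll_interval_in_seconds, timeout_in_seconds
):
    """Poll get_status() until it reports "finished" or time runs out."""
    start = time.monotonic()
    while True:
        status = get_status()
        if status == "finished":
            return status
        if time.monotonic() - start >= timeout_in_seconds:
            # Give up and report the last known status.
            return status
        time.sleep(poll_interval_in_seconds)


# A fake status source that finishes on the fourth query.
fake_statuses = iter(["pending", "running", "running", "finished"])
final_status = wait_until_finished(
    lambda: next(fake_statuses),
    poll_interval_in_seconds=0.01,
    timeout_in_seconds=5.0,
)

# A job that never finishes: the loop returns after the timeout.
timed_out_status = wait_until_finished(
    lambda: "running",
    poll_interval_in_seconds=0.01,
    timeout_in_seconds=0.05,
)

print(final_status, timed_out_status)
```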
# Get a dictionary with information about all remote output files.
# These are not yet copied to the local machine.
job.get_output_files()
({('output',
   'meta_data',
   'meta_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/meta.json'),
  ('output',
   'meta_data',
   'progress_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/progress.json'),
  ('output',
   'point_data',
   'filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/receivers.h5'),
  'stdout': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stdout'),
  'stderr': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stderr')},
 False)
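The returned dictionary mixes tuple keys (for the nested output entries) with plain string keys (stdout and stderr). If a flat view is more convenient, the keys can be normalized as sketched below. The dictionary here is hand-written with placeholder paths that only mirror the structure shown above, and flatten_key is a hypothetical helper.

```python
from pathlib import PurePosixPath

# Placeholder data mirroring the structure of the output above.
# The paths are made up for this illustration.
output_files = {
    ("output", "meta_data", "meta_json_filename"): PurePosixPath(
        "/remote/run/job/output/meta.json"
    ),
    ("output", "point_data", "filename"): PurePosixPath(
        "/remote/run/job/output/receivers.h5"
    ),
    "stdout": PurePosixPath("/remote/run/job/stdout"),
}


def flatten_key(key):
    """Join tuple keys with dots; leave string keys untouched."""
    return ".".join(key) if isinstance(key, tuple) else key


# Map each flattened key to the file name of the remote path.
flat = {flatten_key(k): v.name for k, v in output_files.items()}
for key, filename in flat.items():
    print(f"{key}: {filename}")
```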
# Copy the output files to the chosen folder. In this case
# it is your responsibility to make sure that the folder does not yet exist.
if os.path.exists("output_folder"):
    shutil.rmtree("output_folder")
job.copy_output(destination="output_folder")
({PosixPath('output_folder/meta.json'): 4932,
  PosixPath('output_folder/progress.json'): 157,
  PosixPath('output_folder/receivers.h5'): 13872,
  PosixPath('output_folder/stdout'): 3131,
  PosixPath('output_folder/stderr'): 0,
  PosixPath('output_folder/job_info.json'): 367},
 False)
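The first element of the returned tuple maps each local path to a number that appears to be the file size in bytes. Under that assumption, a quick summary of what was copied can be computed as follows. The dictionary is copied by hand from the output above rather than produced by a live call.

```python
from pathlib import Path

# Values copied from the output above; assumed to be sizes in bytes.
copied = {
    Path("output_folder/meta.json"): 4932,
    Path("output_folder/progress.json"): 157,
    Path("output_folder/receivers.h5"): 13872,
    Path("output_folder/stdout"): 3131,
    Path("output_folder/stderr"): 0,
    Path("output_folder/job_info.json"): 367,
}

total_bytes = sum(copied.values())
largest_file = max(copied, key=copied.get)

print(f"Copied {len(copied)} files, {total_bytes} bytes in total.")
print(f"Largest file: {largest_file.name}")
```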
The next command deletes all files of the job on the remote machine and removes the job from the internal database.
job.delete()
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/local_submission_template.py ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/PID.txt ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stderr ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stdout ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/input/mesh.h5 ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/input/input.toml ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/input ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/SUCCESS ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/receivers.h5 ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/progress.json ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/meta.json ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/run_job.sh ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c ...
The next example is the same as the previous one, but for many jobs this time around. We'll only use two simulations here to keep the output of some commands in check.
job_array = sn.api.run_many_async(
    # Only use the first two.
    input_files=simulations[:2],
    site_name=SALVUS_FLOW_SITE_NAME,
)

# Query for the current status of the jobs with `.update_status()`.
print("Current status of jobs:", job_array.update_status())

# Do something else.
print("Doing something else.")

Current status of jobs: [<JobStatus.running: 2>, <JobStatus.pending: 1>]
Doing something else.
# Wait for the jobs to finish. Blocks until all jobs are done.
job_array.wait(verbosity=0)

# Query the status again. Should all be finished now.
print("Current status of jobs:", job_array.update_status())
Current status of jobs: [<JobStatus.finished: 3>, <JobStatus.finished: 3>]
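A list of per-job statuses is easy to summarize with standard Python tools. The sketch below uses a stand-in enum so that it runs without SalvusFlow: this JobStatus class is hypothetical and only contains the three members visible in the outputs above, whereas the real enum may have more.

```python
from collections import Counter
from enum import Enum


class JobStatus(Enum):
    """Stand-in with only the members visible in the outputs above."""

    pending = 1
    running = 2
    finished = 3


# Statuses as printed right after submitting the job array.
statuses = [JobStatus.running, JobStatus.pending]

# Count how many jobs are in each state.
counts = Counter(status.name for status in statuses)

# Check whether every job in the array has finished.
all_finished = all(status is JobStatus.finished for status in statuses)

print(dict(counts), all_finished)
```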
You still have access to each individual job.
With the following call you will get a dictionary with information about all remote output files of the first job. These are not yet copied to the local machine.
job_array.jobs[0].get_output_files()
({('output',
   'meta_data',
   'meta_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/meta.json'),
  ('output',
   'meta_data',
   'progress_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/progress.json'),
  ('output',
   'point_data',
   'filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/receivers.h5'),
  'stdout': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stdout'),
  'stderr': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stderr')},
 False)
Now, we want to actually copy the output files of all jobs to the chosen folder. Note that it is the user's responsibility to make sure that the folder does not yet exist.
if os.path.exists("output_folder"):
    shutil.rmtree("output_folder")
job_array.copy_output(destination="output_folder")
[({PosixPath('output_folder/job_0000/meta.json'): 5262,
   PosixPath('output_folder/job_0000/progress.json'): 157,
   PosixPath('output_folder/job_0000/receivers.h5'): 13872,
   PosixPath('output_folder/job_0000/stdout'): 3138,
   PosixPath('output_folder/job_0000/stderr'): 0,
   PosixPath('output_folder/job_0000/job_info.json'): 488},
  False),
 ({PosixPath('output_folder/job_0001/meta.json'): 5262,
   PosixPath('output_folder/job_0001/progress.json'): 157,
   PosixPath('output_folder/job_0001/receivers.h5'): 13872,
   PosixPath('output_folder/job_0001/stdout'): 3138,
   PosixPath('output_folder/job_0001/stderr'): 0,
   PosixPath('output_folder/job_0001/job_info.json'): 488},
  False)]
Now that we have all the files locally, we can safely delete the jobs on the remote machine and remove them from the internal database.
job_array.delete()
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stderr ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stdout ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/input/mesh.h5 ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/input/input.toml ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/input ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/SUCCESS ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/receivers.h5 ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/progress.json ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/meta.json ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/local_submission_template.py ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/PID.txt ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/stderr ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/stdout ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/run_job.sh ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/stderr ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/stdout ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/input/mesh.h5 ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/input/input.toml ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/input ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/SUCCESS ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output/receivers.h5 ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output/progress.json ...
🗑  Deleting file   /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output/meta.json ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a ...
🗑  Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a ...

Retrieve jobs and job arrays from the database

The SalvusJob and SalvusJobArray objects can also be initialized from the database, assuming the job names and site names are known. This is useful for fully asynchronous workflows.
# Launch job.
job = sn.api.run_async(
    input_file=simulations[0], site_name=SALVUS_FLOW_SITE_NAME
)
# Retrieve again from DB.
new_job = sn.api.get_job(
    job_name=job.job_name, site_name=SALVUS_FLOW_SITE_NAME
)
# These two objects refer to the same job.
assert job == new_job

# The same logic holds for job arrays.
job_array = sn.api.run_many_async(
    input_files=simulations[:2], site_name=SALVUS_FLOW_SITE_NAME
)
new_job_array = sn.api.get_job_array(
    job_array_name=job_array.job_array_name, site_name=SALVUS_FLOW_SITE_NAME
)
assert job_array == new_job_array
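For a fully asynchronous workflow that spans several Python sessions, the job name and site name have to be stored somewhere in between. One simple option, sketched here with the standard library only, is a small JSON file. save_job_reference and load_job_reference are hypothetical helpers, and the job name is just an example string taken from an output earlier in this tutorial.

```python
import json
import tempfile
from pathlib import Path


def save_job_reference(path, job_name, site_name):
    """Store the identifiers needed to retrieve a job later."""
    Path(path).write_text(
        json.dumps({"job_name": job_name, "site_name": site_name})
    )


def load_job_reference(path):
    """Load the identifiers written by save_job_reference()."""
    return json.loads(Path(path).read_text())


with tempfile.TemporaryDirectory() as tmp:
    reference_file = Path(tmp) / "job_reference.json"

    # Session 1: remember the job after submitting it.
    save_job_reference(
        reference_file,
        job_name="job_2411062239496615_803d6ef8f4",
        site_name="local",
    )

    # Session 2: read the identifiers back. They could then be
    # passed as job_name and site_name to the get_job() call above.
    reference = load_job_reference(reference_file)

print(reference)
```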
