Interaction with SalvusFlow should happen through the salvus.flow.api module, which is directly accessible from salvus.namespace. For details, see SalvusFlow's API documentation.

salvus.flow.api.run_async(): Start/queue a single simulation and immediately return.
salvus.flow.api.run(): Start/queue a single simulation, wait for it to finish, copy all the outputs to the local machine, and delete all remote files.
salvus.flow.api.run_many_async(): Start/queue many simulations at once and immediately return.
salvus.flow.api.run_many(): Start/queue many simulations at once, wait for them to finish, copy all the outputs to the local machine, and delete all remote files.

The asynchronous variants return SalvusJob or SalvusJobArray objects, respectively. These can be queried for the current status and, once done, can also be used to get the output and many other things. This is useful, for example, for long-running or long-queuing jobs, so one can do something else in the meantime.

The run_many...() versions execute multiple jobs at once. The major limitation here is that (due to how, for example, the Slurm job management system works) all jobs must run on the same number of cores and must have the same wall time. Thus the run_many...() functions are useful when running many similar jobs at once. Similar jobs are jobs that have a similar number of elements and time steps. This is the case for most survey- or inversion-style studies where one, for example, simulates through the same domain but for many different sources.

On sites without a job scheduler (local and ssh sites) the jobs are internally run one after the other. On other sites they might run in parallel; the details are up to the job scheduler.

The following sets up a few simulations to demonstrate the run_...() functions. These are very small simulations that can easily be run on a laptop.

# Import the api as well as the simple config and mesh objects.
import os
import shutil
import salvus.namespace as sn
SALVUS_FLOW_SITE_NAME = os.environ.get("SITE_NAME", "local")
# A simple 2D homogeneous mesh.
mesh = sn.simple_mesh.CartesianHomogeneousIsotropicElastic2D(
    vp=3000, vs=2000, rho=3000, x_max=1000, y_max=1000, max_frequency=5
)
# 5 equally spaced sources. The x-coordinate comes from the loop variable.
sources = [
    sn.simple_config.source.cartesian.VectorPoint2D(
        x=x,
        y=300,
        fx=100,
        fy=200,
        source_time_function=sn.simple_config.stf.Ricker(center_frequency=5.0),
    )
    for x in list(range(100, 950, 200))
]
receiver = sn.simple_config.receiver.cartesian.Point2D(
    x=600.0, y=500.0, station_code="000", fields=["velocity"]
)
# We will now construct one simulation object per source.
simulations = []
for src in sources:
    w = sn.simple_config.simulation.Waveform(
        mesh=mesh.create_mesh(), sources=src, receivers=receiver
    )
    w.physics.wave_equation.end_time_in_seconds = 5.0
    simulations.append(w)
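As a quick sanity check on the setup above, the range expression used in the source list can be evaluated on its own. This is plain Python and does not touch Salvus; the variable names are only illustrative.

```python
# x-coordinates produced by the range expression used for the sources.
x_positions = list(range(100, 950, 200))
print("Source x-coordinates:", x_positions)

# Spacing between neighbouring positions.
spacings = [b - a for a, b in zip(x_positions, x_positions[1:])]
print("Spacings:", spacings)

# All positions should lie inside the mesh, which extends to x_max=1000.
inside_domain = all(0 <= x <= 1000 for x in x_positions)
print("All inside the domain:", inside_domain)
```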
With salvus.flow.api.run(), SalvusFlow will run a simulation on the chosen machine, wait until it is done, retrieve the output (note the optional overwrite argument: it defaults to False, in which case the call fails if the folder already exists), and finally delete all remote files. This makes many things very convenient and is a very low-friction way to run simulations and analyze the results.

sn.api.run(
    # We will only run a single simulation here.
    input_file=simulations[0],
    # The site to run on.
    site_name=SALVUS_FLOW_SITE_NAME,
    # Folder to which to copy the output.
    output_folder="output",
    overwrite=True,
    wall_time_in_seconds=1,
)
SalvusJob `job_2411062239496615_803d6ef8f4` running on `local` with 4 rank(s). Site information: * Salvus version: 2024.1.2 * Floating point size: 32 -> Current Task: Time loop complete* Downloaded 21.9 KB of results to `output`. * Total run time: 0.76 seconds. * Pure simulation time: 0.40 seconds.
<salvus.flow.executors.salvus_job.SalvusJob at 0x72493a1e4f90>
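The behaviour of the overwrite argument described above can be pictured with a small stand-alone sketch. The helper `prepare_output_folder` is hypothetical and is not part of Salvus; in particular, the use of `FileExistsError` is an assumption, since the text only says that the call fails.

```python
import shutil
import tempfile
from pathlib import Path


def prepare_output_folder(path: Path, overwrite: bool = False) -> Path:
    """Hypothetical helper mimicking the documented `overwrite` semantics."""
    if path.exists():
        if not overwrite:
            # Assumption: the exact exception raised by Salvus is not documented here.
            raise FileExistsError(f"Output folder '{path}' already exists.")
        shutil.rmtree(path)
    path.mkdir(parents=True)
    return path


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "output"
    prepare_output_folder(out)  # First call creates the folder.
    try:
        prepare_output_folder(out)  # Second call fails: the folder exists.
        failed = False
    except FileExistsError:
        failed = True
    prepare_output_folder(out, overwrite=True)  # Replaces the existing folder.
    recreated = out.is_dir()

print("Second call failed:", failed)
print("Folder recreated with overwrite=True:", recreated)
```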
salvus.flow.api.run_many() will do the same as salvus.flow.api.run() but for many simulations at once. Afterwards, the output folder will contain a subfolder for each passed simulation object.

sn.api.run_many(
    # Pass a list of simulation objects.
    input_files=simulations,
    # The site to run on.
    site_name=SALVUS_FLOW_SITE_NAME,
    # Ranks and wall times have to be specified per job.
    # Both are potentially optional (not all sites require
    # wall times), and if no ranks are given, it will always
    # use the default number of ranks given when configuring the site.
    ranks_per_job=2,
    wall_time_in_seconds_per_job=60,
    # Folder to which to copy the output.
    output_folder="output",
    # Overwrite the output folder if it already exists.
    overwrite=True,
)
)
JobArray job_array_2411062239149283_634926b140 with 5 jobs(s) running on local with 2 rank(s) per job. Site information: * Site type: local * Salvus version: 2024.1.2 * Floating point size: 32
* Downloaded 111.9 KB of results to `output`. * Total run time: 5.03 seconds.
<salvus.flow.executors.salvus_job_array.SalvusJobArray at 0x72492c80d050>
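Because all jobs passed to a single run_many...() call must share the same number of ranks and the same wall time, it can help to sort a heterogeneous set of jobs into compatible batches first. The following is a minimal sketch in plain Python; the job names and the tuple layout are hypothetical and only stand in for real simulation objects.

```python
from collections import defaultdict

# Hypothetical job descriptions: (name, ranks, wall time in seconds).
jobs = [
    ("shot_000", 2, 60),
    ("shot_001", 2, 60),
    ("big_model", 8, 3600),
    ("shot_002", 2, 60),
]

# Group jobs by their (ranks, wall time) requirements. Each group could
# then be submitted with its own run_many...() call.
batches = defaultdict(list)
for name, ranks, wall_time in jobs:
    batches[(ranks, wall_time)].append(name)

for (ranks, wall_time), names in batches.items():
    print(f"{len(names)} job(s) with {ranks} ranks and {wall_time} s wall time: {names}")
```

Each resulting batch satisfies the constraint, so one call per batch would be needed rather than a single call for all jobs.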
salvus.flow.api.run_async() returns a SalvusJob object.

# Launch a job in the background. Note that this function
# will return immediately if there are no immediate errors.
job = sn.api.run_async(
    input_file=simulations[0], site_name=SALVUS_FLOW_SITE_NAME
)
# Query for the current status of the job with `.update_status()`.
print("Current job status:", job.update_status())
# Do something else.
print("Doing something else.")
# Wait for the job to finish. Blocks until the job is done.
job.wait(
    # Optional. Defaults to whatever is specified in
    # the site configuration otherwise.
    poll_interval_in_seconds=2.0,
    # Optional. Wait at most this long before returning.
    timeout_in_seconds=300.0,
)
# Query the status again.
print("Current job status:", job.update_status())
Current job status: JobStatus.running
Doing something else.
After 0.0 seconds: running. Sleeping for 2.0000 seconds before checking job status again.
Current job status: JobStatus.finished
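Conceptually, waiting with a poll interval and a timeout amounts to a loop like the one sketched below. This is not how SalvusFlow implements wait(); `FakeJob` and `wait_for` are hypothetical stand-ins that only reuse the parameter names from the call above so the idea can be run without a Salvus installation.

```python
import time


class FakeJob:
    """Stand-in for a job object; reports 'finished' after a fixed number of polls."""

    def __init__(self, polls_until_finished: int):
        self._remaining = polls_until_finished

    def update_status(self) -> str:
        if self._remaining > 0:
            self._remaining -= 1
            return "running"
        return "finished"


def wait_for(job, poll_interval_in_seconds: float, timeout_in_seconds: float) -> str:
    """Poll `job` until it is finished or the timeout has elapsed."""
    start = time.monotonic()
    while True:
        status = job.update_status()
        if status == "finished":
            return status
        if time.monotonic() - start >= timeout_in_seconds:
            # Timed out: return the last known status instead of blocking further.
            return status
        time.sleep(poll_interval_in_seconds)


# A job that finishes after two polls.
quick_status = wait_for(FakeJob(2), poll_interval_in_seconds=0.01, timeout_in_seconds=1.0)
# A job that would need far longer than the timeout allows.
slow_status = wait_for(FakeJob(1000), poll_interval_in_seconds=0.01, timeout_in_seconds=0.05)
print(quick_status, slow_status)
```

The second call shows why it is worth querying the status again after waiting: a timeout returns control without the job necessarily being done.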
# Get a dictionary with information about all remote output files.
# These are not yet copied to the local machine.
job.get_output_files()
({('output', 'meta_data', 'meta_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/meta.json'), ('output', 'meta_data', 'progress_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/progress.json'), ('output', 'point_data', 'filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/receivers.h5'), 'stdout': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stdout'), 'stderr': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stderr')}, False)
# Copy the output files to the chosen folder. In this case
# it is your responsibility to make sure that the folder does not yet exist.
if os.path.exists("output_folder"):
    shutil.rmtree("output_folder")
job.copy_output(destination="output_folder")
({PosixPath('output_folder/meta.json'): 4932, PosixPath('output_folder/progress.json'): 157, PosixPath('output_folder/receivers.h5'): 13872, PosixPath('output_folder/stdout'): 3131, PosixPath('output_folder/stderr'): 0, PosixPath('output_folder/job_info.json'): 367}, False)
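The value printed above is a tuple whose first element maps each copied file to an integer. The sketch below assumes these integers are file sizes in bytes, which the text does not state explicitly, and leaves the meaning of the second tuple element open. The data is copied from the printed output rather than obtained from a live job.

```python
from pathlib import PurePosixPath

# Return value copied from the printed output above.
result = (
    {
        PurePosixPath("output_folder/meta.json"): 4932,
        PurePosixPath("output_folder/progress.json"): 157,
        PurePosixPath("output_folder/receivers.h5"): 13872,
        PurePosixPath("output_folder/stdout"): 3131,
        PurePosixPath("output_folder/stderr"): 0,
        PurePosixPath("output_folder/job_info.json"): 367,
    },
    False,
)

copied_files, _flag = result

# Assumption: the integer values are sizes in bytes.
total_bytes = sum(copied_files.values())
largest = max(copied_files, key=copied_files.get)

print(f"{len(copied_files)} files, {total_bytes} bytes in total")
print("Largest file:", largest.name)
```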
job.delete()
🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/local_submission_template.py ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/PID.txt ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stderr ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/stdout ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/input/mesh.h5 ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/input/input.toml ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/input ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/SUCCESS ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/receivers.h5 ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/progress.json ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output/meta.json ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/output ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c/run_job.sh ... 
🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_2411062239192248_a1e43e3a9c ...
job_array = sn.api.run_many_async(
    # Only use the first two.
    input_files=simulations[:2],
    site_name=SALVUS_FLOW_SITE_NAME,
)
# Query for the current status of the jobs with `.update_status()`.
print("Current status of jobs:", job_array.update_status())
# Do something else.
print("Doing something else.")
Current status of jobs: [<JobStatus.running: 2>, <JobStatus.pending: 1>]
Doing something else.
# Wait for the jobs to finish. Blocks until all jobs are done.
job_array.wait(verbosity=0)
# Query the status again. Should all be finished now.
print("Current status of jobs:", job_array.update_status())
Current status of jobs: [<JobStatus.finished: 3>, <JobStatus.finished: 3>]
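A list of per-job statuses like the ones printed above is easy to summarize. The sketch below uses a stand-in enum that mirrors the names and values visible in the output; it is not imported from Salvus, and the real class may have additional members.

```python
from collections import Counter
from enum import Enum


class JobStatus(Enum):
    # Stand-in mirroring the names and values printed above; not the Salvus class.
    pending = 1
    running = 2
    finished = 3


# Statuses as in the first query above: one job running, one still pending.
statuses = [JobStatus.running, JobStatus.pending]

counts = Counter(status.name for status in statuses)
all_done = all(status is JobStatus.finished for status in statuses)

print("Status counts:", dict(counts))
print("All jobs finished:", all_done)
```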
job_array.jobs[0].get_output_files()
({('output', 'meta_data', 'meta_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/meta.json'), ('output', 'meta_data', 'progress_json_filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/progress.json'), ('output', 'point_data', 'filename'): PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/receivers.h5'), 'stdout': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stdout'), 'stderr': PurePosixPath('/builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stderr')}, False)
if os.path.exists("output_folder"):
    shutil.rmtree("output_folder")
job_array.copy_output(destination="output_folder")
[({PosixPath('output_folder/job_0000/meta.json'): 5262, PosixPath('output_folder/job_0000/progress.json'): 157, PosixPath('output_folder/job_0000/receivers.h5'): 13872, PosixPath('output_folder/job_0000/stdout'): 3138, PosixPath('output_folder/job_0000/stderr'): 0, PosixPath('output_folder/job_0000/job_info.json'): 488}, False), ({PosixPath('output_folder/job_0001/meta.json'): 5262, PosixPath('output_folder/job_0001/progress.json'): 157, PosixPath('output_folder/job_0001/receivers.h5'): 13872, PosixPath('output_folder/job_0001/stdout'): 3138, PosixPath('output_folder/job_0001/stderr'): 0, PosixPath('output_folder/job_0001/job_info.json'): 488}, False)]
job_array.delete()
🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stderr ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/stdout ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/input/mesh.h5 ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/input/input.toml ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/input ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/SUCCESS ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/receivers.h5 ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/progress.json ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output/meta.json ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a/output ... 
🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_0_of_job_array_2411062239484615_cee29eb07a ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/local_submission_template.py ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/PID.txt ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/stderr ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/stdout ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/run_job.sh ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/stderr ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/stdout ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/input/mesh.h5 ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/input/input.toml ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/input ... 
🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/SUCCESS ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output/receivers.h5 ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output/progress.json ... 🗑 Deleting file /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output/meta.json ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a/output ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a/job_1_of_job_array_2411062239484615_cee29eb07a ... 🗑 Deleting folder /builds/Mondaic/core/TutorialsAndIntegrationTests/SALVUS_INSTALL/SalvusFlow/run/job_array_2411062239484615_cee29eb07a ...
SalvusJob and SalvusJobArray objects can also be initialized from the database, assuming the names and site names are known. This is useful for fully asynchronous workflows.

# Launch job.
job = sn.api.run_async(
    input_file=simulations[0], site_name=SALVUS_FLOW_SITE_NAME
)
# Retrieve again from DB.
new_job = sn.api.get_job(
    job_name=job.job_name, site_name=SALVUS_FLOW_SITE_NAME
)
# These two objects refer to the same job.
assert job == new_job
# The same logic holds for job arrays.
job_array = sn.api.run_many_async(
    input_files=simulations[:2], site_name=SALVUS_FLOW_SITE_NAME
)
new_job_array = sn.api.get_job_array(
    job_array_name=job_array.job_array_name, site_name=SALVUS_FLOW_SITE_NAME
)
assert job_array == new_job_array
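For a fully asynchronous workflow, the job name and site name have to survive between Python sessions. One possible approach, sketched here under the assumption that a small JSON file is an acceptable place to keep them, is to write both to disk right after submission and read them back later. The helper names and the file name are hypothetical; the job name is copied from the output shown earlier.

```python
import json
import tempfile
from pathlib import Path


def save_job_reference(path: Path, job_name: str, site_name: str) -> None:
    """Store the two identifiers needed to look up a job again."""
    path.write_text(json.dumps({"job_name": job_name, "site_name": site_name}))


def load_job_reference(path: Path) -> dict:
    """Read the identifiers back as a dictionary."""
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as tmp:
    ref_file = Path(tmp) / "job_reference.json"
    # In a real session these values would come from `job.job_name`
    # and the site name used at submission.
    save_job_reference(ref_file, "job_2411062239192248_a1e43e3a9c", "local")
    reference = load_job_reference(ref_file)

print(reference)
# A later session could pass these on, e.g. sn.api.get_job(**reference).
```

The dictionary keys match the keyword arguments of the get_job() call shown above, so the stored reference can be unpacked directly.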