# Generic Python modules.
import os
import random
import typing
from functools import partial
from pathlib import Path
from typing import Callable, List, Union
from tqdm import tqdm
import numpy as np
# Modules to plot and manipulate data.
import h5py
import matplotlib.pyplot as plt
import numpy as np
import obspy
import pyasdf
from numpy.random import random as rand
from obspy import UTCDateTime
# Module specific to Salvus.
import salvus.flow.api
import salvus.flow.simple_config as config
from salvus.flow.simple_config import simulation
import salvus.mesh
from salvus.mesh import unstructured_mesh
from salvus.mesh.unstructured_mesh import UnstructuredMesh
from gradient_helper_functions import (
test_material_gradient,
get_unit_cube_mesh,
)
import parameterization
receiver_type = random.choice(["classic", "block"])
dim = int(os.environ.get("DIM", 2))
ranks = int(os.environ.get("NUM_MPI_RANKS", 4))
model_order = int(os.environ.get("MODEL_ORDER", 4))
n_elem_per_dim = int(os.environ.get("N_ELEM_PER_DIM", 4))
physics = os.environ.get("PHYSICS", "ElasticCompliance")
site_name = os.environ.get("SITE_NAME", "local_f64")
print(f"Running gradient test on site: {site_name}")
deform = True
pr, ps = True, True # Peturb source and receiver locations.
vp, vs, rho = 5.8e3, 4.0e3, 2.6e3 # Material parameters.
ap = parameterization.AcousticRhoVp(vp=vp, rho=rho)
ep = parameterization.ElasticRhoVpVs(vp=vp, vs=vs, rho=rho)
if physics == "ElasticRhoVpVs":
convert_to = parameterization.ElasticRhoVpVs
elif physics == "AcousticRhoVp":
convert_to = parameterization.AcousticRhoVp
elif physics == "ElasticCompliance":
convert_to = parameterization.ElasticCompliance
elif physics == "AcousticCompliance":
convert_to = parameterization.AcousticCompliance
else:
raise ValueError("Unknown physical system.")
acoustic = convert_to.physics() == parameterization.Physics.Acoustic
p = ap if acoustic else ep
final_parameterization = p.convert(convert_to)
print("{:-^80}".format("Gradient Test"))
print(
"{0:<5}{1:<20}{2:<15}{3:<10}{4:<20}{5:<10}".format(
"DIM", "PHYSICS", "MODEL ORDER", "RANKS", "ELEM PER DIM", "REC TYPE"
)
)
print(
"{0:<5}{1:<20}{2:<15}{3:<10}{4:<20}{5:<10}".format(
dim, physics, model_order, ranks, n_elem_per_dim, receiver_type
)
)
Running gradient test on site: local_f64 ---------------------------------Gradient Test---------------------------------- DIM PHYSICS MODEL ORDER RANKS ELEM PER DIM REC TYPE 2 ElasticCompliance 4 4 4 block
simple_mesh
constructors introduced in previous tutorials. To make this notebook as general as possible, we begin by writing a function which will return a mesh appropriate for a given parameterization.mesh = get_unit_cube_mesh(
dim=dim,
model_order=model_order,
n_elem_per_dim=n_elem_per_dim,
par=final_parameterization,
deform=deform,
)
# Randomly perturb receiver locations.
r_fields = "phi" if acoustic else "displacement"
# pval_r = 0.15 * pr * rand(size=None) + 0.425
pval_r = 0.55
r_loc_2d = [(pval_r, pval_r)]
r_loc_3d = [(pval_r, pval_r, pval_r)]
# Randomly perturb source locations.
pval_s = 0.15 # * ps * rand(size=None)
s_loc_2d = [
(0 + pval_s, 0 + pval_s),
(0 + pval_s, 1 - pval_s),
(1 - pval_s, 0 + pval_s),
(1 - pval_s, 1 - pval_s),
]
s_loc_3d = [
(0 + pval_s, 0 + pval_s, 0 + pval_s),
(0 + pval_s, 1 - pval_s, 0 + pval_s),
(1 - pval_s, 0 + pval_s, 0 + pval_s),
(1 - pval_s, 1 - pval_s, 0 + pval_s),
(0 + pval_s, 0 + pval_s, 1 - pval_s),
(0 + pval_s, 1 - pval_s, 1 - pval_s),
(1 - pval_s, 0 + pval_s, 1 - pval_s),
(1 - pval_s, 1 - pval_s, 1 - pval_s),
]
s_loc = s_loc_2d if dim == 2 else s_loc_3d
r_loc = r_loc_2d if dim == 2 else r_loc_3d
# Set up sources and receivers.
src_config = config.source.cartesian
if dim == 2:
src_type = (
partial(src_config.ScalarPoint2D, f=1)
if acoustic
else partial(src_config.VectorPoint2D, fx=1, fy=1)
)
sources = [
src_type(x=x[0], y=x[1], source_time_function=config.stf.Delta())
for x in s_loc_2d
]
receivers = [
config.receiver.cartesian.Point2D(
x=x[0], y=x[1], station_code=f"{i:03d}", fields=[r_fields]
)
for i, x in enumerate(r_loc_2d)
]
else:
src_type = (
partial(src_config.ScalarPoint3D, f=1)
if acoustic
else partial(src_config.VectorPoint3D, fx=1, fy=1, fz=1)
)
sources = [
src_type(
x=x[0], y=x[1], z=x[2], source_time_function=config.stf.Delta()
)
for x in s_loc_3d
]
receivers = [
config.receiver.cartesian.Point3D(
x=x[0], y=x[1], z=x[2], station_code=f"{i:03d}", fields=[r_fields]
)
for i, x in enumerate(r_loc_3d)
]
# Needed for the adjoint source.
w = config.simulation.Waveform(mesh=mesh, sources=sources, receivers=receivers)
# Timing.
ts = 1e-5 if dim == 2 else 5e-6
start_time, end_time, time_step = 0.0, 1e-4, ts / n_elem_per_dim
w.physics.wave_equation.end_time_in_seconds = end_time
w.physics.wave_equation.time_step_in_seconds = time_step
w.physics.wave_equation.start_time_in_seconds = start_time
# For gradient computation.
w.output.volume_data.format = "hdf5"
w.output.volume_data.filename = "output.h5"
w.output.volume_data.fields = ["adjoint-checkpoint"]
w.output.volume_data.sampling_interval_in_time_steps = "auto-for-checkpointing"
w.validate()
mesh.write_h5("model.h5")
w
<salvus.flow.simple_config.simulation.waveform.Waveform object at 0x76b83e1c4a90>
salvus.flow.api.run(
ranks=ranks,
input_file=w,
get_all=True,
site_name=site_name,
overwrite=True,
delete_remote_files=False,
output_folder="fwd_output",
verbosity=0,
)
<salvus.flow.executors.salvus_job.SalvusJob at 0x76b8616fe810>
def compute_misfit(rec_file: Path, adj_src: typing.Union[Path, None] = None):
"""
A function to compute the energy misfit and corresponding adjoin source.
:param rec_file: File containing the receivers from the forward run.
:param adj_src: File which, if provided, will contain the adjoint sources.
:returns: A measure of misft. Will write adjoint sources if adj_src is a valid file path.
"""
misfit = 0.0
adj_out = h5py.File(adj_src, mode="w") if adj_src else None
with pyasdf.ASDFDataSet(rec_file, mode="r") as fh:
for rec in fh.waveforms:
# For this simplified test there should always ever be a single
# tag.
tags = rec.get_waveform_tags()
assert len(tags) == 1, tags
u = rec[tags[0]]
adj = np.empty((u[0].stats.npts, len(u)))
for _i, cmp in enumerate(u):
misfit += time_step * (cmp.data * cmp.data).sum()
adj[:, _i] = -time_step * cmp.data
if adj_out:
if receiver_type == "classic":
dset = adj_out.create_dataset(
name=u[0].stats.station, data=adj
)
dset.attrs["starttime"] = start_time * 1e9
dset.attrs["sampling_rate"] = 1 / time_step
else:
ncmp = 1 if acoustic else dim
stf = np.empty((1, ncmp, adj.shape[0]))
stf[0, :] = adj.T
group = adj_out.create_group("adjoint_sources")
group.create_dataset(name="stf", data=stf)
group.create_dataset(name="coordinates", data=r_loc)
group.attrs["start_time_in_seconds"] = start_time
group.attrs["sampling_rate_in_hertz"] = 1 / time_step
group.attrs["spatial_type"] = (
np.string_("scalar")
if acoustic
else np.string_("vector")
)
# dset = adj_out.create_dataset(name=u[0].stats.station, data=adj)
# dset.attrs["starttime"] = start_time * 1e9
# dset.attrs["sampling_rate"] = 1 / time_step
if adj_out:
adj_out.close()
return misfit * 0.5
misfit_00 = compute_misfit(
Path("fwd_output/receivers.h5"), Path("adjoint_sources.h5")
)
if dim == 2:
adj_srcs = [
src_type(
x=x[0],
y=x[1],
source_time_function=config.stf.Custom(
filename="adjoint_sources.h5", dataset_name=f"{i:03d}"
),
)
for i, x in enumerate(r_loc_2d)
]
else:
adj_srcs = [
src_type(
x=x[0],
y=x[1],
z=x[2],
source_time_function=config.stf.Custom(
filename="adjoint_sources.h5", dataset_name=f"{i:03d}"
),
)
for i, x in enumerate(r_loc_3d)
]
w_adjoint = config.simulation.Waveform(mesh=mesh)
if receiver_type == "classic":
w_adjoint.adjoint.point_source = adj_srcs
else:
w_adjoint.adjoint.point_source_block = {
"filename": "adjoint_sources.h5",
"groups": ["adjoint_sources"],
}
w_adjoint.adjoint.gradient.output_filename = "gradient.h5"
w_adjoint.adjoint.forward_meta_json_filename = "fwd_output/meta.json"
w_adjoint.adjoint.gradient.parameterization = (
final_parameterization.json_string
)
w_adjoint.validate()
salvus.flow.api.run(
ranks=ranks,
get_all=True,
overwrite=True,
site_name=site_name,
delete_remote_files=False,
input_file=w_adjoint,
output_folder="adj_output",
verbosity=0,
)
<salvus.flow.executors.salvus_job.SalvusJob at 0x76b86133b910>
g = None
gradient = UnstructuredMesh.from_h5("adj_output/gradient.h5")
if dim == 2:
g = gradient
g
<salvus.mesh.data_structures.unstructured_mesh.unstructured_mesh.UnstructuredMesh object at 0x76b8614eba50>
# Update model
h = np.logspace(-11, -2, 10)
pars = [x for x, y in final_parameterization.parameters()]
nx = 1
ny = 1
f, a = plt.subplots(ny, nx, figsize=(5, 5))
# Perturb parameters one at a time.
print("{:-^80}".format("Results"))
for _i, p in enumerate(pars):
# first try to get away with just three runs
error = test_material_gradient(
all_h=[h[0], h[3], h[-1]],
model=mesh,
gradient=gradient,
parameter=p,
simulation=w,
misfit_function=compute_misfit,
site_name=site_name,
m0=misfit_00,
ranks=ranks,
quiet=True,
)
try:
assert np.min(error) < 1e-5
assert error[0] > np.min(error)
assert error[-1] > np.min(error)
used_h = [h[0], h[3], h[-1]]
except:
# run again with more step lengths
error = test_material_gradient(
all_h=h,
model=mesh,
gradient=gradient,
parameter=p,
simulation=w,
misfit_function=compute_misfit,
site_name=site_name,
m0=misfit_00,
ranks=ranks,
quiet=True,
)
used_h = h
assert np.min(error) < 1e-5
assert error[0] > np.min(error)
assert error[-1] > np.min(error)
finally:
# Generate a plot
a.set_title(f"Gradient Test ({physics})")
a.loglog(used_h, error, label=p)
a.set_xlabel("$h$")
a.set_ylabel("Relative Error")
print(
"{0:<20}{1:<30}{2:<10}{3:<30}{4:<30}".format(
p, np.min(error), np.argmin(error), error[0], error[-1]
)
)
print("{:-^80}\n".format(""))
a.legend()
plt.savefig(f"[physics].png")
------------------------------------Results------------------------------------- MU 6.302276484692483e-08 1 7.711804538616327e-06 0.13435919728769538 RHO 1.2783736826630788e-08 1 6.111188122235739e-05 0.015867585403164314 LAMBDA 1.5275796598955864e-06 1 0.00260540697651886 0.0007209848516080492 --------------------------------------------------------------------------------