Running pyflex in ParallelΒΆ

pyasdf can be used to run a function across the data from two ASDF data sets. In most cases it will be some kind of misfit or comparision function. This example runs pyflex to pick windows given a data set of observed and another data set of synthetic data.

It can only be run with MPI:

$ mpirun -n 16 python parallel_pyflex.py
 1import time
 2
 3import pyflex
 4from pyasdf import ASDFDataSet
 5
 6ds = ASDFDataSet("./preprocessed_27s_to_60s.h5")
 7other_ds = ASDFDataSet("./preprocessed_synthetic_27s_to_60s.h5")
 8
 9event = ds.events[0]
10
11
12def weight_function(win):
13    return win.max_cc_value
14
15
16config = pyflex.Config(
17    min_period=27.0,
18    max_period=60.0,
19    stalta_waterlevel=0.11,
20    tshift_acceptance_level=15.0,
21    dlna_acceptance_level=2.5,
22    cc_acceptance_level=0.6,
23    c_0=0.7,
24    c_1=2.0,
25    c_2=0.0,
26    c_3a=1.0,
27    c_3b=2.0,
28    c_4a=3.0,
29    c_4b=10.0,
30    s2n_limit=0.5,
31    max_time_before_first_arrival=-50.0,
32    min_surface_wave_velocity=3.0,
33    window_signal_to_noise_type="energy",
34    window_weight_fct=weight_function,
35)
36
37
38def process(this_station_group, other_station_group):
39    # Make sure everything thats required is there.
40    if (
41        not hasattr(this_station_group, "StationXML")
42        or not hasattr(this_station_group, "preprocessed_27s_to_60s")
43        or not hasattr(other_station_group, "preprocessed_synthetic_27s_to_60s")
44    ):
45        return
46
47    stationxml = this_station_group.StationXML
48    observed = this_station_group.preprocessed_27s_to_60s
49    synthetic = other_station_group.preprocessed_synthetic_27s_to_60s
50
51    all_windows = []
52
53    for component in ["Z", "R", "T"]:
54        obs = observed.select(component=component)
55        syn = synthetic.select(component=component)
56        if not obs or not syn:
57            continue
58
59        windows = pyflex.select_windows(
60            obs, syn, config, event=event, station=stationxml
61        )
62        print(
63            "Station %s.%s component %s picked %i windows"
64            % (
65                stationxml[0].code,
66                stationxml[0][0].code,
67                component,
68                len(windows),
69            )
70        )
71        if not windows:
72            continue
73        all_windows.append(windows)
74    return all_windows
75
76
77a = time.time()
78results = ds.process_two_files_without_parallel_output(other_ds, process)
79b = time.time()
80
81if ds.mpi.rank == 0:
82    print(results)
83    print(len(results))
84
85print("Time taken:", b - a)
86
87# Important when running with MPI as it might otherwise not be able to finish.
88del ds
89del other_ds