Running pyflex in ParallelΒΆ

pyasdf can be used to run a function across the data from two ASDF data sets. In most cases it will be some kind of misfit or comparision function. This example runs pyflex to pick windows given a data set of observed and another data set of synthetic data.

It can only be run with MPI:

$ mpirun -n 16 python parallel_pyflex.py
 1import time
 2
 3import pyflex
 4from pyasdf import ASDFDataSet
 5
 6ds = ASDFDataSet("./preprocessed_27s_to_60s.h5")
 7other_ds = ASDFDataSet("./preprocessed_synthetic_27s_to_60s.h5")
 8
 9event = ds.events[0]
10
11
12def weight_function(win):
13    return win.max_cc_value
14
15
16config = pyflex.Config(
17    min_period=27.0,
18    max_period=60.0,
19    stalta_waterlevel=0.11,
20    tshift_acceptance_level=15.0,
21    dlna_acceptance_level=2.5,
22    cc_acceptance_level=0.6,
23    c_0=0.7,
24    c_1=2.0,
25    c_2=0.0,
26    c_3a=1.0,
27    c_3b=2.0,
28    c_4a=3.0,
29    c_4b=10.0,
30    s2n_limit=0.5,
31    max_time_before_first_arrival=-50.0,
32    min_surface_wave_velocity=3.0,
33    window_signal_to_noise_type="energy",
34    window_weight_fct=weight_function,
35)
36
37
38def process(this_station_group, other_station_group):
39    # Make sure everything thats required is there.
40    if (
41        not hasattr(this_station_group, "StationXML")
42        or not hasattr(this_station_group, "preprocessed_27s_to_60s")
43        or not hasattr(
44            other_station_group, "preprocessed_synthetic_27s_to_60s"
45        )
46    ):
47        return
48
49    stationxml = this_station_group.StationXML
50    observed = this_station_group.preprocessed_27s_to_60s
51    synthetic = other_station_group.preprocessed_synthetic_27s_to_60s
52
53    all_windows = []
54
55    for component in ["Z", "R", "T"]:
56        obs = observed.select(component=component)
57        syn = synthetic.select(component=component)
58        if not obs or not syn:
59            continue
60
61        windows = pyflex.select_windows(
62            obs, syn, config, event=event, station=stationxml
63        )
64        print(
65            "Station %s.%s component %s picked %i windows"
66            % (
67                stationxml[0].code,
68                stationxml[0][0].code,
69                component,
70                len(windows),
71            )
72        )
73        if not windows:
74            continue
75        all_windows.append(windows)
76    return all_windows
77
78
79a = time.time()
80results = ds.process_two_files_without_parallel_output(other_ds, process)
81b = time.time()
82
83if ds.mpi.rank == 0:
84    print(results)
85    print(len(results))
86
87print("Time taken:", b - a)
88
89# Important when running with MPI as it might otherwise not be able to finish.
90del ds
91del other_ds