Running pyflex in ParallelΒΆ
pyasdf
can be used to run a function across the data from two ASDF data
sets. In most cases it will be some kind of misfit or comparision function.
This example runs pyflex to pick
windows given a data set of observed and another data set of synthetic data.
It can only be run with MPI:
$ mpirun -n 16 python parallel_pyflex.py
1import time
2
3import pyflex
4from pyasdf import ASDFDataSet
5
6ds = ASDFDataSet("./preprocessed_27s_to_60s.h5")
7other_ds = ASDFDataSet("./preprocessed_synthetic_27s_to_60s.h5")
8
9event = ds.events[0]
10
11
12def weight_function(win):
13 return win.max_cc_value
14
15
16config = pyflex.Config(
17 min_period=27.0,
18 max_period=60.0,
19 stalta_waterlevel=0.11,
20 tshift_acceptance_level=15.0,
21 dlna_acceptance_level=2.5,
22 cc_acceptance_level=0.6,
23 c_0=0.7,
24 c_1=2.0,
25 c_2=0.0,
26 c_3a=1.0,
27 c_3b=2.0,
28 c_4a=3.0,
29 c_4b=10.0,
30 s2n_limit=0.5,
31 max_time_before_first_arrival=-50.0,
32 min_surface_wave_velocity=3.0,
33 window_signal_to_noise_type="energy",
34 window_weight_fct=weight_function,
35)
36
37
38def process(this_station_group, other_station_group):
39 # Make sure everything thats required is there.
40 if (
41 not hasattr(this_station_group, "StationXML")
42 or not hasattr(this_station_group, "preprocessed_27s_to_60s")
43 or not hasattr(
44 other_station_group, "preprocessed_synthetic_27s_to_60s"
45 )
46 ):
47 return
48
49 stationxml = this_station_group.StationXML
50 observed = this_station_group.preprocessed_27s_to_60s
51 synthetic = other_station_group.preprocessed_synthetic_27s_to_60s
52
53 all_windows = []
54
55 for component in ["Z", "R", "T"]:
56 obs = observed.select(component=component)
57 syn = synthetic.select(component=component)
58 if not obs or not syn:
59 continue
60
61 windows = pyflex.select_windows(
62 obs, syn, config, event=event, station=stationxml
63 )
64 print(
65 "Station %s.%s component %s picked %i windows"
66 % (
67 stationxml[0].code,
68 stationxml[0][0].code,
69 component,
70 len(windows),
71 )
72 )
73 if not windows:
74 continue
75 all_windows.append(windows)
76 return all_windows
77
78
79a = time.time()
80results = ds.process_two_files_without_parallel_output(other_ds, process)
81b = time.time()
82
83if ds.mpi.rank == 0:
84 print(results)
85 print(len(results))
86
87print("Time taken:", b - a)
88
89# Important when running with MPI as it might otherwise not be able to finish.
90del ds
91del other_ds