Running pyflex in ParallelΒΆ
pyasdf
can be used to run a function across the data from two ASDF data
sets. In most cases it will be some kind of misfit or comparision function.
This example runs pyflex to pick
windows given a data set of observed and another data set of synthetic data.
It can only be run with MPI:
$ mpirun -n 16 python parallel_pyflex.py
1import time
2
3import pyflex
4from pyasdf import ASDFDataSet
5
6ds = ASDFDataSet("./preprocessed_27s_to_60s.h5")
7other_ds = ASDFDataSet("./preprocessed_synthetic_27s_to_60s.h5")
8
9event = ds.events[0]
10
11
12def weight_function(win):
13 return win.max_cc_value
14
15
16config = pyflex.Config(
17 min_period=27.0,
18 max_period=60.0,
19 stalta_waterlevel=0.11,
20 tshift_acceptance_level=15.0,
21 dlna_acceptance_level=2.5,
22 cc_acceptance_level=0.6,
23 c_0=0.7,
24 c_1=2.0,
25 c_2=0.0,
26 c_3a=1.0,
27 c_3b=2.0,
28 c_4a=3.0,
29 c_4b=10.0,
30 s2n_limit=0.5,
31 max_time_before_first_arrival=-50.0,
32 min_surface_wave_velocity=3.0,
33 window_signal_to_noise_type="energy",
34 window_weight_fct=weight_function,
35)
36
37
38def process(this_station_group, other_station_group):
39 # Make sure everything thats required is there.
40 if (
41 not hasattr(this_station_group, "StationXML")
42 or not hasattr(this_station_group, "preprocessed_27s_to_60s")
43 or not hasattr(other_station_group, "preprocessed_synthetic_27s_to_60s")
44 ):
45 return
46
47 stationxml = this_station_group.StationXML
48 observed = this_station_group.preprocessed_27s_to_60s
49 synthetic = other_station_group.preprocessed_synthetic_27s_to_60s
50
51 all_windows = []
52
53 for component in ["Z", "R", "T"]:
54 obs = observed.select(component=component)
55 syn = synthetic.select(component=component)
56 if not obs or not syn:
57 continue
58
59 windows = pyflex.select_windows(
60 obs, syn, config, event=event, station=stationxml
61 )
62 print(
63 "Station %s.%s component %s picked %i windows"
64 % (
65 stationxml[0].code,
66 stationxml[0][0].code,
67 component,
68 len(windows),
69 )
70 )
71 if not windows:
72 continue
73 all_windows.append(windows)
74 return all_windows
75
76
77a = time.time()
78results = ds.process_two_files_without_parallel_output(other_ds, process)
79b = time.time()
80
81if ds.mpi.rank == 0:
82 print(results)
83 print(len(results))
84
85print("Time taken:", b - a)
86
87# Important when running with MPI as it might otherwise not be able to finish.
88del ds
89del other_ds