Processing Observed Data in Parallel
This fairly complex example takes an ASDF file and, for each chosen period band, produces a new data set processed in that frequency band.
It can be run with MPI. It scales fairly well and will utilize parallel I/O if your machine supports it. Keep in mind that there is a significant start-up cost for Python on each core (special Python builds exist that avoid this if it is really necessary), so don't use too many cores.
$ mpirun -n 64 python process_observed.py
If you don't run it with MPI, it will utilize Python's multiprocessing
module and run on all of the machine's cores. In that case I/O is not parallel
and uses a round-robin scheme in which only one core writes at any single point in time.
$ python process_observed.py
import obspy
from obspy.geodetics import gps2dist_azimuth
import numpy as np

from pyasdf import ASDFDataSet

ds = ASDFDataSet("./observed.h5")

event = ds.events[0]

origin = event.preferred_origin() or event.origins[0]
event_latitude = origin.latitude
event_longitude = origin.longitude

# Figure out these parameters somehow!
starttime = obspy.UTCDateTime("2010-03-11T06:22:19.021324Z")
npts = 5708
sampling_rate = 1.0


# Loop over the period bands. Each band results in its own file. It could
# also be saved to the same file.
for min_period, max_period in [(27.0, 60.0)]:
    f2 = 1.0 / max_period
    f3 = 1.0 / min_period
    f1 = 0.8 * f2
    f4 = 1.2 * f3
    pre_filt = (f1, f2, f3, f4)

    def process_function(st, inv):
        st.detrend("linear")
        st.detrend("demean")
        st.taper(max_percentage=0.05, type="hann")

        st.attach_response(inv)
        st.remove_response(
            output="DISP", pre_filt=pre_filt, zero_mean=False, taper=False
        )

        st.detrend("linear")
        st.detrend("demean")
        st.taper(max_percentage=0.05, type="hann")

        st.interpolate(
            sampling_rate=sampling_rate, starttime=starttime, npts=npts
        )

        # Back azimuth: the direction from the station towards the event.
        station_latitude = inv[0][0].latitude
        station_longitude = inv[0][0].longitude
        _, baz, _ = gps2dist_azimuth(
            station_latitude,
            station_longitude,
            event_latitude,
            event_longitude,
        )

        # Rotate to radial/transverse if both horizontals are present.
        components = [tr.stats.channel[-1] for tr in st]
        if "N" in components and "E" in components:
            st.rotate(method="NE->RT", back_azimuth=baz)

        # Convert to single precision to save space.
        for tr in st:
            tr.data = np.require(tr.data, dtype="float32")

        return st

    tag_name = "preprocessed_%is_to_%is" % (int(min_period), int(max_period))

    tag_map = {"raw_recording": tag_name}

    ds.process(process_function, tag_name + ".h5", tag_map=tag_map)

# Important when running with MPI as it might otherwise not be able to finish.
del ds
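The four corner frequencies passed as ``pre_filt`` to ``remove_response`` define a cosine taper in the frequency domain: the spectrum is fully kept between ``f2`` and ``f3`` and tapered to zero below ``f1`` and above ``f4``. A minimal, self-contained sketch of how the corners and the output tag name are derived for the 27–60 s band used above (numbers mirror the script; no ObsPy or pyasdf required):

```python
# Sketch: derive the pre-filter corners and the tag name for one period band,
# mirroring the computation in the processing script above.
min_period, max_period = 27.0, 60.0

f2 = 1.0 / max_period   # lowest fully-passed frequency
f3 = 1.0 / min_period   # highest fully-passed frequency
f1 = 0.8 * f2           # spectrum is tapered to zero below f1
f4 = 1.2 * f3           # spectrum is tapered to zero above f4
pre_filt = (f1, f2, f3, f4)

tag_name = "preprocessed_%is_to_%is" % (int(min_period), int(max_period))

print(pre_filt)
print(tag_name)  # preprocessed_27s_to_60s
```

The 0.8/1.2 factors are just the margins chosen in this example; any corners with ``f1 < f2 < f3 < f4`` form a valid pre-filter.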