Processing Observed Data in Parallel

This fairly complex example takes an ASDF file and produces a new, processed data set for each chosen frequency band.

It can be run with MPI. It scales fairly well and will utilize parallel I/O if your machine supports it. Please keep in mind that there is a significant start-up cost for Python on each core (special Python builds exist that avoid this, should it really become necessary), so don't use too many cores.

$ mpirun -n 64 python process_observed.py

If you don't run it with MPI, it will utilize Python's multiprocessing module and run on all of the machine's cores. In that case I/O is not parallel: writes follow a round-robin scheme in which only one core writes at any single point in time.

$ python process_observed.py
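The round-robin write scheme mentioned above can be sketched as follows. This is a simplified, hypothetical illustration (not pyasdf's actual implementation): with `n_workers` processes, the worker whose rank equals `turn % n_workers` is the only one writing during that turn, so exactly one worker writes at any point in time.

```python
# Simplified sketch of round-robin writing (hypothetical illustration,
# not pyasdf's actual implementation).  Each item is written by exactly
# one worker, and only one worker writes at any point in time.
def round_robin_writer(rank, n_workers, items):
    """Yield the items that worker `rank` is responsible for writing."""
    for turn, item in enumerate(items):
        if turn % n_workers == rank:
            yield item

# With 4 workers and 10 items, worker 1 writes items 1, 5 and 9.
print(list(round_robin_writer(1, 4, range(10))))  # [1, 5, 9]
```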
import obspy
from obspy.geodetics import gps2dist_azimuth
import numpy as np

from pyasdf import ASDFDataSet

ds = ASDFDataSet("./observed.h5")

event = ds.events[0]

origin = event.preferred_origin() or event.origins[0]
event_latitude = origin.latitude
event_longitude = origin.longitude

# Figure out these parameters somehow!
starttime = obspy.UTCDateTime("2010-03-11T06:22:19.021324Z")
npts = 5708
sampling_rate = 1.0


# Loop over the period bands. Each band results in its own file; the
# results could also be saved to a single file.
for min_period, max_period in [(27.0, 60.0)]:
    f2 = 1.0 / max_period
    f3 = 1.0 / min_period
    f1 = 0.8 * f2
    f4 = 1.2 * f3
    pre_filt = (f1, f2, f3, f4)

    def process_function(st, inv):
        st.detrend("linear")
        st.detrend("demean")
        st.taper(max_percentage=0.05, type="hann")

        st.attach_response(inv)
        st.remove_response(
            output="DISP", pre_filt=pre_filt, zero_mean=False, taper=False
        )

        st.detrend("linear")
        st.detrend("demean")
        st.taper(max_percentage=0.05, type="hann")

        st.interpolate(
            sampling_rate=sampling_rate, starttime=starttime, npts=npts
        )

        station_latitude = inv[0][0].latitude
        station_longitude = inv[0][0].longitude
        _, baz, _ = gps2dist_azimuth(
            station_latitude,
            station_longitude,
            event_latitude,
            event_longitude,
        )

        components = [tr.stats.channel[-1] for tr in st]
        if "N" in components and "E" in components:
            st.rotate(method="NE->RT", back_azimuth=baz)

        # Convert to single precision to save space.
        for tr in st:
            tr.data = np.require(tr.data, dtype="float32")

        return st

    tag_name = "preprocessed_%is_to_%is" % (int(min_period), int(max_period))

    tag_map = {"raw_recording": tag_name}

    ds.process(process_function, tag_name + ".h5", tag_map=tag_map)

# Important when running with MPI as it might otherwise not be able to finish.
del ds
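The four corner frequencies passed to `remove_response` above can be checked in isolation. This quick standalone sketch recomputes them for the 27 s to 60 s band: the passband lies between f2 and f3, and the 20 % margins at f1 and f4 give the frequency-domain taper room to roll off smoothly rather than cutting the spectrum sharply.

```python
# Recompute the response-removal pre-filter corners for the 27 s - 60 s
# band.  The passband is flat between f2 and f3; f1 and f4 sit 20 %
# outside the corners and bound the taper on either side.
min_period, max_period = 27.0, 60.0
f2 = 1.0 / max_period  # low corner of the passband
f3 = 1.0 / min_period  # high corner of the passband
f1 = 0.8 * f2          # taper starts 20 % below f2
f4 = 1.2 * f3          # taper ends 20 % above f3
pre_filt = (f1, f2, f3, f4)
print(pre_filt)
```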