Running pyflex in ParallelΒΆ

pyasdf can be used to run a function across the data from two ASDF data sets. In most cases it will be some kind of misfit or comparision function. This example runs pyflex to pick windows given a data set of observed and another data set of synthetic data.

It can only be run with MPI:

$ mpirun -n 16 python parallel_pyflex.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import pyflex
from pyasdf import ASDFDataSet

ds = ASDFDataSet("./preprocessed_27s_to_60s.h5")
other_ds = ASDFDataSet("./preprocessed_synthetic_27s_to_60s.h5")

event = ds.events[0]


def weight_function(win):
    return win.max_cc_value

config = pyflex.Config(
    min_period=27.0, max_period=60.0,
    stalta_waterlevel=0.11,
    tshift_acceptance_level=15.0,
    dlna_acceptance_level=2.5,
    cc_acceptance_level=0.6,
    c_0=0.7, c_1=2.0, c_2=0.0, c_3a=1.0,
    c_3b=2.0, c_4a=3.0, c_4b=10.0,
    s2n_limit=0.5,
    max_time_before_first_arrival=-50.0,
    min_surface_wave_velocity=3.0,
    window_signal_to_noise_type="energy",
    window_weight_fct=weight_function)


def process(this_station_group, other_station_group):
    # Make sure everything thats required is there.
    if not hasattr(this_station_group, "StationXML") or \
            not hasattr(this_station_group, "preprocessed_27s_to_60s") or \
            not hasattr(other_station_group,
                        "preprocessed_synthetic_27s_to_60s"):
        return

    stationxml = this_station_group.StationXML
    observed = this_station_group.preprocessed_27s_to_60s
    synthetic = other_station_group.preprocessed_synthetic_27s_to_60s

    all_windows = []

    for component in ["Z", "R", "T"]:
        obs = observed.select(component=component)
        syn = synthetic.select(component=component)
        if not obs or not syn:
            continue

        windows = pyflex.select_windows(
            obs, syn, config, event=event, station=stationxml)
        print("Station %s.%s component %s picked %i windows" % (
            stationxml[0].code, stationxml[0][0].code, component,
            len(windows)))
        if not windows:
            continue
        all_windows.append(windows)
    return all_windows

import time
a = time.time()
results = ds.process_two_files_without_parallel_output(other_ds, process)
b = time.time()

if ds.mpi.rank == 0:
    print(results)
    print len(results)

print "Time taken:", b - a

# Important when running with MPI as it might otherwise not be able to finish.
del ds
del other_ds