Source code for eqcorrscan.utils.catalog_to_dd

"""
Functions to generate hypoDD input files from catalogs.

:copyright:
    EQcorrscan developers.

:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import numpy as np
import logging
from collections import namedtuple, defaultdict, Counter
from multiprocessing import cpu_count, Pool, shared_memory

from obspy import UTCDateTime, Stream
from obspy.core.event import (
    Catalog, Event, Origin, Magnitude, Pick, WaveformStreamID, Arrival,
    OriginQuality)
from eqcorrscan.utils.clustering import dist_mat_km
from eqcorrscan.core.lag_calc import _concatenate_and_correlate, _xcorr_interp
from eqcorrscan.utils.correlate import pool_boy

Logger = logging.getLogger(__name__)

SeedPickID = namedtuple("SeedPickID", ["seed_id", "phase_hint"])


# Some hypoDD specific event holders - classes were faster than named-tuples

class SparseEvent(object):
    def __init__(self, resource_id, picks, origin_time):
        self.resource_id = resource_id
        self.picks = picks
        self.origin_time = origin_time

    def __repr__(self):
        return ("SparseEvent(resource_id={0}, origin_time={1}, picks=[{2} "
                "picks])".format(
                    self.resource_id, self.origin_time, len(self.picks)))


class SparsePick(object):
    def __init__(self, tt, time, time_weight, seed_id, phase_hint,
                 waveform_id):
        self.tt = tt
        self.time = time
        self.time_weight = time_weight
        self.seed_id = seed_id
        self.phase_hint = phase_hint
        self.waveform_id = waveform_id

    def __repr__(self):
        return ("SparsePick(seed_id={0}, phase_hint={1}, tt={2:.2f}, "
                "time_weight={3})".format(
                    self.seed_id, self.phase_hint, self.tt, self.time_weight))

    @property
    def station(self):
        return self.seed_id.split('.')[1]

    @property
    def channel(self):
        return self.seed_id.split('.')[-1]


# Generic helpers

class _DTObs(object):
    """ Holder for phase observations """

    def __init__(self, station, tt1, tt2, weight, phase):
        self.station = station
        assert len(self.station) <= 7, "Station must be <= five characters"
        self.tt1 = tt1
        self.tt2 = tt2
        self.weight = weight
        self.phase = phase
        assert self.phase in "PS", "Only P or S phases accepted"

    def __repr__(self):
        return ("_DTObs(station={0}, tt1={1:.2f}, tt2={2:.2f}, weight={3:.2f},"
                " phase={4})".format(self.station, self.tt1, self.tt2,
                                     self.weight, self.phase))

    @property
    def ct_string(self):
        return "{sta:<7s} {tt1:7.3f} {tt2:7.3f} {weight:6.4f} {phase}".format(
            sta=self.station, tt1=self.tt1, tt2=self.tt2, weight=self.weight,
            phase=self.phase)

    @property
    def cc_string(self):
        return "{sta:<7s} {dt:7.3f} {weight:6.4f} {phase}".format(
            sta=self.station, dt=self.tt1 - self.tt2, weight=self.weight,
            phase=self.phase)


class _EventPair(object):
    """ Holder for event paid observations. """

    def __init__(self, event_id_1, event_id_2, obs=None):
        self.event_id_1 = event_id_1
        self.event_id_2 = event_id_2
        self.obs = obs or list()

    def __repr__(self):
        return ("_EventPair(event_id_1={0}, event_id_2={1}, obs=[{2} "
                "observations])".format(self.event_id_1, self.event_id_2,
                                        len(self.obs)))

    @property
    def _header(self):
        return "# {event_id_1:>9d} {event_id_2:>9d}".format(
            event_id_1=self.event_id_1, event_id_2=self.event_id_2)

    @property
    def ct_string(self):
        parts = [self._header]
        parts.extend([obs.ct_string for obs in self.obs])
        return '\n'.join(parts)

    @property
    def cc_string(self):
        parts = [self._header + " 0.0"]  # No origin time correction
        parts.extend([obs.cc_string for obs in self.obs])
        return '\n'.join(parts)


def _generate_event_id_mapper(catalog, event_id_mapper=None):
    """
    Generate or fill an event_id_mapper mapping event resource id to integer id
    """
    event_id_mapper = event_id_mapper or dict()
    try:
        largest_event_id = max(event_id_mapper.values())
    except ValueError:
        largest_event_id = 0
    for event in catalog:
        if str(event.resource_id) not in event_id_mapper.keys():
            event_id = largest_event_id + 1
            largest_event_id = event_id
            event_id_mapper.update({str(event.resource_id): event_id})
    return event_id_mapper


def _make_sparse_event(event):
    """ Make a sparse event with just the info hypoDD needs. """
    origin_time = (event.preferred_origin() or event.origins[0]).time
    time_weight_dict = {
        arr.pick_id: arr.time_weight or 1.0 for origin in event.origins
        for arr in origin.arrivals}
    sparse_event = SparseEvent(
        resource_id=event.resource_id.id,
        origin_time=origin_time,
        picks=[SparsePick(
            tt=pick.time - origin_time,
            time=pick.time,
            seed_id=pick.waveform_id.get_seed_string(),
            phase_hint=pick.phase_hint[0],  # Only use P or S hints.
            time_weight=time_weight_dict.get(pick.resource_id, 1.0),
            waveform_id=pick.waveform_id)
            for pick in event.picks])
    return sparse_event


def _prepare_stream(stream, event, extract_len, pre_pick, seed_pick_ids=None):
    """
    Slice stream around picks

    returns a dictionary of traces keyed by phase_hint.
    """
    seed_pick_ids = seed_pick_ids or {
        SeedPickID(pick.waveform_id.get_seed_string(), pick.phase_hint[0])
        for pick in event.picks if pick.phase_hint.startswith(("P", "S"))}
    stream_sliced = defaultdict(Stream)
    for seed_pick_id in seed_pick_ids:
        pick = [pick for pick in event.picks
                if pick.waveform_id.get_seed_string() == seed_pick_id.seed_id
                and pick.phase_hint[0] == seed_pick_id.phase_hint]
        if len(pick) > 1:
            Logger.warning(
                "Multiple picks for {seed_id}, phase-hint {phase_hint}, using "
                "the earliest".format(
                    seed_id=seed_pick_id.seed_id,
                    phase_hint=seed_pick_id.phase_hint))
            pick = sorted(pick, key=lambda p: p.time)
        elif len(pick) == 0:
            continue
        pick = pick[0]
        tr = stream.select(id=seed_pick_id.seed_id).merge()
        if len(tr) == 0:
            continue
        else:
            tr = tr[0]
        Logger.debug(
            f"Trimming trace on {tr.id} between {tr.stats.starttime} - "
            f"{tr.stats.endtime} to {pick.time - pre_pick} - "
            f"{(pick.time - pre_pick) + extract_len}")
        tr = stream.select(id=seed_pick_id.seed_id).slice(
            starttime=pick.time - pre_pick,
            endtime=(pick.time - pre_pick) + extract_len).merge()
        if len(tr) == 0:
            continue
        if len(tr) > 1:
            Logger.error("Multiple traces for {seed_id}".format(
                seed_id=seed_pick_id.seed_id))
            continue
        tr = tr[0]

        # If there is one sample too many after this remove the first one
        # by convention
        n_samples_intended = extract_len * tr.stats.sampling_rate
        if len(tr.data) == n_samples_intended + 1:
            tr.data = tr.data[1:len(tr.data)]
        # if tr.stats.endtime - tr.stats.starttime != extract_len:
        if tr.stats.npts < n_samples_intended:
            Logger.warning(
                "Insufficient data ({rlen} s) for {tr_id}, discarding. Check "
                "that your traces are at least of length {length} s, with a "
                "pre_pick time of at least {prepick} s!".format(
                    rlen=tr.stats.endtime - tr.stats.starttime,
                    tr_id=tr.id, length=extract_len, prepick=pre_pick))
            continue
        stream_sliced.update(
            {seed_pick_id.phase_hint:
             stream_sliced[seed_pick_id.phase_hint] + tr})
    return stream_sliced


# Time calculators

def _compute_dt_correlations(catalog, master, min_link, event_id_mapper,
                             stream_dict, min_cc, extract_len, pre_pick,
                             shift_len, interpolate, max_workers=1,
                             shm_data_shape=None, shm_dtype=None,
                             weight_by_square=True, **kwargs):
    """ Compute cross-correlation delay times. """
    max_workers = max_workers or 1
    Logger.info(
        f"Correlating {str(master.resource_id)} with {len(catalog)} events")
    differential_times_dict = dict()
    # Assign trace data from shared memory
    for (key, stream) in stream_dict.items():
        for tr in stream:
            if len(tr.data) == 0 and hasattr(tr, 'shared_memory_name'):
                shm = shared_memory.SharedMemory(name=tr.shared_memory_name)
                # Reconstructing numpy data array
                sm_data = np.ndarray(
                    shm_data_shape, dtype=shm_dtype, buffer=shm.buf)
                tr.data = np.zeros_like(sm_data)
                # Copy data into process memory
                tr.data[:] = sm_data[:]

    master_stream = _prepare_stream(
        stream=stream_dict[str(master.resource_id)], event=master,
        extract_len=extract_len, pre_pick=pre_pick)
    available_seed_ids = {tr.id for st in master_stream.values() for tr in st}
    Logger.debug(f"The channels provided are: {available_seed_ids}")
    master_seed_ids = {
        SeedPickID(pick.waveform_id.get_seed_string(), pick.phase_hint[0])
        for pick in master.picks if
        pick.phase_hint[0] in "PS" and
        pick.waveform_id.get_seed_string() in available_seed_ids}
    Logger.debug(f"Using channels: {master_seed_ids}")
    # Dictionary of travel-times for master keyed by {station}_{phase_hint}
    master_tts = dict()
    try:
        master_origin_time = (
            master.preferred_origin() or master.origins[0]).time
    except AttributeError:  # In case it's a SparseEvent
        master_origin_time = master.origin_time
    for pick in master.picks:
        if pick.phase_hint[0] not in "PS":
            continue
        tt1 = pick.time - master_origin_time
        master_tts.update({
            "{0}_{1}".format(
                pick.waveform_id.station_code, pick.phase_hint[0]): tt1})

    matched_length = extract_len + (2 * shift_len)
    matched_pre_pick = pre_pick + shift_len
    # We will use this to maintain order
    event_dict = {str(event.resource_id): event for event in catalog}
    event_ids = set(event_dict.keys())
    # Check for overlap
    _stream_event_ids = set(stream_dict.keys())
    if len(event_ids.difference(_stream_event_ids)):
        Logger.warning(
            f"Missing streams for {event_ids.difference(_stream_event_ids)}")
        # Just use the event ids that we actually have streams for!
        event_ids = event_ids.intersection(_stream_event_ids)
    # Reorder event_ids according to original order
    event_ids = [key for key in event_dict.keys() if key in event_ids]

    if max_workers > 1:
        with pool_boy(Pool, len(event_ids), cores=max_workers) as pool:
            results = [pool.apply_async(
                _prepare_stream,
                args=(stream_dict[event_id], event_dict[event_id],
                      matched_length, matched_pre_pick),
                kwds=dict(seed_pick_ids=master_seed_ids))
                        for event_id in event_ids]
        matched_streams = {id_res[0]: id_res[1].get()
                           for id_res in zip(event_ids, results)}
    else:
        matched_streams = {
            event_id: _prepare_stream(
                stream=stream_dict[event_id], event=event_dict[event_id],
                extract_len=matched_length, pre_pick=matched_pre_pick,
                seed_pick_ids=master_seed_ids)
            for event_id in event_ids}

    sampling_rates = {tr.stats.sampling_rate for st in master_stream.values()
                      for tr in st}
    for phase_hint in master_stream.keys():  # Loop over P and S separately
        for sampling_rate in sampling_rates:  # Loop over separate samp rates
            delta = 1.0 / sampling_rate
            _master_stream = master_stream[phase_hint].select(
                sampling_rate=sampling_rate)
            if len(_master_stream) == 0:
                continue
            _matched_streams = dict()
            for key, value in matched_streams.items():
                _st = value[phase_hint].select(sampling_rate=sampling_rate)
                if len(_st) > 0:
                    _matched_streams.update({key: _st})
            if len(_matched_streams) == 0:
                Logger.info("No matching data for {0}, {1} phase".format(
                    str(master.resource_id), phase_hint))
                continue
            # Check lengths
            master_length = [tr.stats.npts for tr in _master_stream]
            if len(set(master_length)) > 1:
                Logger.warning("Multiple lengths found - check that you "
                               "are providing sufficient data")
            master_length = Counter(master_length).most_common(1)[0][0]
            _master_stream = _master_stream.select(npts=master_length)
            matched_length = Counter(
                (tr.stats.npts for st in _matched_streams.values()
                 for tr in st))
            if len(matched_length) > 1:
                Logger.warning("Multiple lengths of stream found - taking "
                               "the most common. Check that you are "
                               "providing sufficient data")
            matched_length = matched_length.most_common(1)[0][0]
            if matched_length < master_length:
                Logger.error("Matched streams are shorter than the master, "
                             "will not correlate")
                continue
            # Remove empty streams and generate an ordered list of event_ids
            used_event_ids, used_matched_streams = [], []
            for event_id, _matched_stream in _matched_streams.items():
                _matched_stream = _matched_stream.select(npts=matched_length)
                if len(_matched_stream) > 0:
                    used_event_ids.append(event_id)
                    used_matched_streams.append(_matched_stream)
            # Check that there are matching seed ids.
            master_seed_ids = set(tr.id for tr in _master_stream)
            matched_seed_ids = set(
                tr.id for st in used_matched_streams for tr in st)
            if not matched_seed_ids.issubset(master_seed_ids):
                Logger.warning(
                    "After checking length there are no matched traces: "
                    f"master: {master_seed_ids}, matched: {matched_seed_ids}")
                continue
            # Do the correlations
            Logger.debug(
                f"Correlating channels: {[tr.id for tr in _master_stream]}")
            ccc_out, used_chans = _concatenate_and_correlate(
                template=_master_stream, streams=used_matched_streams,
                cores=max_workers)
            # Convert ccc_out to pick-time
            for i, used_event_id in enumerate(used_event_ids):
                for j, chan in enumerate(used_chans[i]):
                    if not chan.used:
                        continue
                    correlation = ccc_out[i][j]
                    if interpolate:
                        shift, cc_max = _xcorr_interp(correlation, dt=delta,
                                                      **kwargs)
                    else:
                        cc_max = np.amax(correlation)
                        shift = np.argmax(correlation) * delta
                    if cc_max < min_cc:
                        continue
                    shift -= shift_len
                    pick = [p for p in event_dict[used_event_id].picks
                            if p.phase_hint[0] == phase_hint
                            and p.waveform_id.station_code == chan.channel[0]
                            and p.waveform_id.channel_code == chan.channel[1]]
                    pick = sorted(pick, key=lambda p: p.time)[0]
                    try:
                        tt2 = pick.time - (
                                event_dict[used_event_id].preferred_origin() or
                                event_dict[used_event_id].origins[0]).time
                    except AttributeError:
                        tt2 = pick.time - event_dict[used_event_id].origin_time
                    tt2 += shift
                    diff_time = differential_times_dict.get(
                        used_event_id, None)
                    if diff_time is None:
                        diff_time = _EventPair(
                            event_id_1=event_id_mapper[
                                str(master.resource_id)],
                            event_id_2=event_id_mapper[used_event_id])
                    weight = cc_max
                    if weight_by_square:
                        weight **= 2
                    diff_time.obs.append(
                        _DTObs(station=chan.channel[0],
                               tt1=master_tts["{0}_{1}".format(
                                   chan.channel[0], phase_hint)],
                               tt2=tt2, weight=weight,
                               phase=phase_hint[0]))
                    differential_times_dict.update({used_event_id: diff_time})
    # Threshold on min_link
    differential_times = [dt for dt in differential_times_dict.values()
                          if len(dt.obs) >= min_link]
    return differential_times


def _compute_dt(sparse_catalog, master, min_link, event_id_mapper):
    """
    Inner function to compute differential times between a catalog and a
    master event.
    """
    return [_make_event_pair(
        sparse_event=event, master=master, event_id_mapper=event_id_mapper,
        min_link=min_link) for event in sparse_catalog]


def _make_event_pair(sparse_event, master, event_id_mapper, min_link):
    """
    Make an event pair for a given event and master event.
    """
    differential_times = _EventPair(
        event_id_1=event_id_mapper[master.resource_id],
        event_id_2=event_id_mapper[sparse_event.resource_id])
    for master_pick in master.picks:
        if master_pick.phase_hint and \
                master_pick.phase_hint not in "PS":  # pragma: no cover
            continue
        matched_picks = [p for p in sparse_event.picks
                         if p.station == master_pick.station
                         and p.phase_hint == master_pick.phase_hint]
        for matched_pick in matched_picks:
            differential_times.obs.append(
                _DTObs(station=master_pick.station,
                       tt1=master_pick.tt, tt2=matched_pick.tt,
                       weight=(master_pick.time_weight +
                               matched_pick.time_weight) / 2.0,
                       phase=master_pick.phase_hint))
    if len(differential_times.obs) >= min_link:
        return differential_times
    return


def _prep_horiz_picks(catalog, stream_dict, event_id_mapper):
    """
    Fill in horizontal picks for the alternate horizontal channel for events in
    catalog.
    """
    # keep user input safe
    catalog = catalog.copy()
    for event in catalog:
        event_S_picks = [
            pick for pick in event.picks if pick.phase_hint.upper().startswith(
                'S') and pick.waveform_id.get_seed_string()[-1] in 'EN12XY']
        st = stream_dict[str(event.resource_id)]
        st = Stream([tr for tr in st if tr.stats.channel[-1] in 'EN12XY'])
        for tr in st:
            tr_picks = [
                pick for pick in event_S_picks
                if tr.id == pick.waveform_id.get_seed_string()]
            if len(tr_picks) > 0:
                continue
            else:
                tr_picks = [
                    pick for pick in event_S_picks
                    if tr.id[0:-1] == pick.waveform_id.get_seed_string()[0:-1]]
                new_wav_id = WaveformStreamID(network_code=tr.stats.network,
                                              station_code=tr.stats.station,
                                              location_code=tr.stats.location,
                                              channel_code=tr.stats.channel)
                for pick in tr_picks:
                    new_pick = SparsePick(tt=pick.tt, time=pick.time,
                                          time_weight=pick.time_weight,
                                          seed_id=new_wav_id.get_seed_string(),
                                          phase_hint=pick.phase_hint,
                                          waveform_id=new_wav_id)
                    event.picks.append(new_pick)
    return catalog


def stream_dict_to_shared_mem(stream_dict):
    """
    Move the data of streams from a dict of (key, obspy.stream) into shared
    memory so that the data can be retrieved by multiple processes in parallel.
    This can help speed up parallel execution because the initiation of each
    worker process becomes cheaper (less data to transfer). For now this only
    puts the numpy array in trace.data into shared memory (because it's easy).

    :type stream_dict: dict of (key, `obspy.stream`)
    :param stream_dict: dict of streams that should be moved to shared memory

    :returns: stream_dict, shm_name_list, shm_data_shapes, shm_data_dtypes

    :rtype: dict
    :return: Dictionary streams that were moved to shared memory
    :rtype: list
    :return: List of names to the shared memory address for each trace.
    :rtype: list
    :return:
        List of numpy-array shaped for each trace-data array in shared memory.
    :rtype: list
    :return: List of data types for each trace-data-array in shared memory.

    """
    shm_name_list = []
    shm_data_shapes = []
    shm_data_dtypes = []
    shm_references = []
    for (key, stream) in stream_dict.items():
        for tr in stream:
            data_array = tr.data
            # Let SharedMemory create suitable filename itself:
            shm = shared_memory.SharedMemory(
                create=True, size=data_array.nbytes)
            shm_name_list.append(shm.name)
            shm_references.append(shm)
            # Now create a NumPy array backed by shared memory
            shm_data_shape = data_array.shape
            shm_data_dtype = data_array.dtype
            shared_data_array = np.ndarray(
                shm_data_shape, dtype=shm_data_dtype, buffer=shm.buf)
            # Copy the original data into shared memory
            shared_data_array[:] = data_array[:]
            # tr.data = shared_data_array
            tr.data = np.array([])
            tr.shared_memory_name = shm.name
            shm_data_shapes.append(shm_data_shape)
            shm_data_dtypes.append(shm_data_dtype)
    shm_data_shapes = list(set(shm_data_shapes))
    shm_data_dtypes = list(set(shm_data_dtypes))
    return (stream_dict, shm_name_list, shm_references, shm_data_shapes,
            shm_data_dtypes)


[docs] def compute_differential_times(catalog, correlation, stream_dict=None, event_id_mapper=None, max_sep=8., min_link=8, min_cc=None, extract_len=None, pre_pick=None, shift_len=None, interpolate=False, all_horiz=False, max_workers=None, max_trace_workers=1, use_shared_memory=False, weight_by_square=True, *args, **kwargs): """ Generate groups of differential times for a catalog. :type catalog: `obspy.core.event.Catalog` :param catalog: Catalog of events to get differential times for :type correlation: bool :param correlation: If True will generate cross-correlation derived differential-times for a dt.cc file. If false, will generate catalog times for a dt.ct file. :type stream_dict: dict :param stream_dict: Dictionary of streams keyed by event-id (the event.resource_id.id, NOT the hypoDD event-id) :type event_id_mapper: dict :param event_id_mapper: Dictionary mapping event resource id to an integer event id for hypoDD. If this is None, or missing events then the dictionary will be updated to include appropriate event-ids. This should be of the form {event.resource_id.id: integer_id} :type max_sep: float :param max_sep: Maximum hypocentral separation in km to link events :type min_link: int :param min_link: Minimum shared phase observations to link events :type min_cc: float :param min_cc: Threshold to include cross-correlation results. :type extract_len: float :param extract_len: Length in seconds to extract around the pick :type pre_pick: float :param pre_pick: Time before the pick to start the correlation window :type shift_len: float :param shift_len: Time (+/-) to allow pick to vary in seconds. e.g. if shift_len is set to 1s, the pick will be allowed to shift between pick_time - 1 and pick_time + 1. :type interpolate: bool :param interpolate: Whether to interpolate correlations or not. Allows subsample accuracy :type max_workers: int :param max_workers: Maximum number of workers for parallel correlation of events. If None then all threads will be used. :type max_trace_workers: int :param max_trace_workers: Maximum number of workers for parallel correlation of traces insted of events. If None then all threads will be used (but can only be used when max_workers = 1). :type use_shared_memory: bool :param use_shared_memory: Whether to move trace data arrays into shared memory for computing trace correlations. Can speed up total execution time by ~20 % for hypodd-correlations with a lot of clustered seismicity. :type weight_by_square: bool :param weight_by_square: Whether to compute correlation weights as the square of the maximum correlation (True), or the maximum correlation (False). :rtype: dict :return: Dictionary of differential times keyed by event id. :rtype: dict :return: Dictionary of event_id_mapper .. note:: The arguments min_cc, stream_dict, extract_len, pre_pick, shift_len and interpolate are only required if correlation=True. Two parallelization strategies are available for correlating waveforms: parallelization across events (default) or across each event's traces (when max_workers = 1 and max_traces_workers > 1). The former is often quicker for short traces because it generally loads the CPU better for multiple events and may require more memory, but the latter can be quicker for few events with many or very long traces and requires less memory. .. note:: Differential times are computed as travel-time for event 1 minus travel-time for event 2 (tt1 - tt2). """ include_master = kwargs.get("include_master", False) correlation_kwargs = dict( min_cc=min_cc, stream_dict=stream_dict, extract_len=extract_len, pre_pick=pre_pick, shift_len=shift_len, interpolate=interpolate, max_workers=max_workers, weight_by_square=weight_by_square) for key, value in kwargs.items(): correlation_kwargs.update({key: value}) if correlation: for arg, name in correlation_kwargs.items(): assert arg is not None, "{0} is required for correlation".format( name) # Ensure all events have locations and picks. event_id_mapper = _generate_event_id_mapper( catalog=catalog, event_id_mapper=event_id_mapper) distances = dist_mat_km(catalog) distance_filter = distances <= max_sep if not include_master: np.fill_diagonal(distance_filter, 0) # Do not match events to themselves - this is the default, # only included for testing # Reformat catalog to sparse catalog sparse_catalog = [_make_sparse_event(ev) for ev in catalog] if all_horiz: sparse_catalog = _prep_horiz_picks(sparse_catalog, stream_dict, event_id_mapper) additional_args = dict(min_link=min_link, event_id_mapper=event_id_mapper) if correlation: differential_times = {} additional_args.update(correlation_kwargs) n = len(sparse_catalog) if max_workers == 1: # If desired, parallelize over traces instead of events: max_trace_workers = max_trace_workers or cpu_count() additional_args.update(dict(max_workers=max_trace_workers)) for i, master in enumerate(sparse_catalog): master_id = str(master.resource_id) sub_catalog = [ev for j, ev in enumerate(sparse_catalog) if distance_filter[i][j]] if master_id not in additional_args["stream_dict"].keys(): Logger.warning( f"{master_id} not in waveforms, skipping") continue differential_times.update({ master_id: _compute_dt_correlations( sub_catalog, master, **additional_args)}) Logger.info( f"Completed correlations for core event {i} of {n}") else: sub_catalogs = ([ev for i, ev in enumerate(sparse_catalog) if master_filter[i]] for master_filter in distance_filter) # Move trace data into shared memory if use_shared_memory: (shm_stream_dict, shm_name_list, shm_references, shm_data_shapes, shm_dtypes) = ( stream_dict_to_shared_mem(stream_dict)) if len(shm_data_shapes) == 1 and len(shm_dtypes) == 1: shm_data_shape = shm_data_shapes[0] shm_dtype = shm_dtypes[0] additional_args.update({'stream_dict': shm_stream_dict}) additional_args.update({'shm_data_shape': shm_data_shape}) additional_args.update({'shm_dtype': shm_dtype}) else: use_shared_memory = False with pool_boy(Pool, n, cores=max_workers) as pool: # Parallelize over events instead of traces additional_args.update(dict(max_workers=1)) results = [ pool.apply_async( _compute_dt_correlations, args=(sub_catalog, master), kwds=additional_args) for sub_catalog, master in zip(sub_catalogs, sparse_catalog) if str(master.resource_id) in additional_args[ "stream_dict"].keys()] Logger.info('Submitted asynchronous jobs to workers.') differential_times = { master.resource_id: result.get() for master, result in zip(sparse_catalog, results) if str(master.resource_id) in additional_args[ "stream_dict"].keys()} Logger.debug('Got results from workers.') # Destroy shared memory if use_shared_memory: for shm_name in shm_name_list: shm = shared_memory.SharedMemory(name=shm_name) shm.close() shm.unlink() else: sub_catalogs = ([ev for i, ev in enumerate(sparse_catalog) if master_filter[i]] for master_filter in distance_filter) max_workers = max_workers or cpu_count() if max_workers > 1: with pool_boy( Pool, len(sparse_catalog), cores=max_workers) as pool: results = [pool.apply_async( _compute_dt, args=(sub_catalog, master), kwds=additional_args) for master, sub_catalog in zip( sparse_catalog, sub_catalogs)] differential_times = { master.resource_id: result.get() for master, result in zip(sparse_catalog, results)} else: differential_times = { master.resource_id: _compute_dt( sub_catalog, master, **additional_args) for master, sub_catalog in zip(sparse_catalog, sub_catalogs)} # Remove Nones for key, value in differential_times.items(): differential_times.update({key: [v for v in value if v is not None]}) return differential_times, event_id_mapper
# dt.ct functions
[docs] def write_catalog(catalog, event_id_mapper=None, max_sep=8, min_link=8, max_workers=None): """ Generate a dt.ct file for hypoDD for a series of events. :type catalog: `obspy.core.event.Catalog` :param catalog: Catalog of events :type event_id_mapper: dict :param event_id_mapper: Dictionary mapping event resource id to an integer event id for hypoDD. If this is None, or missing events then the dictionary will be updated to include appropriate event-ids. This should be of the form {event.resource_id.id: integer_id} :type max_sep: float :param max_sep: Maximum separation between event pairs in km :type min_link: int :param min_link: Minimum links for an event to be paired, e.g. minimum number of picks from the same station and channel (and phase) that are shared between two events for them to be paired. :type max_workers: int :param max_workers: Maximum number of workers for parallel processing. If None then all threads will be used. :returns: event_id_mapper .. note:: Differential times are computed as travel-time for event 1 minus travel-time for event 2 (tt1 - tt2). """ differential_times, event_id_mapper = compute_differential_times( catalog=catalog, correlation=False, event_id_mapper=event_id_mapper, max_sep=max_sep, min_link=min_link, max_workers=max_workers) with open("dt.ct", "w") as f: for master_id, linked_events in differential_times.items(): for linked_event in linked_events: f.write(linked_event.ct_string) f.write("\n") return event_id_mapper
# dt.cc functions def _meta_filter_stream(event_id, stream_dict, lowcut, highcut): return _filter_stream( event_id=event_id, st=stream_dict[event_id], lowcut=lowcut, highcut=highcut) def _filter_stream(event_id, st, lowcut, highcut): if lowcut is not None and highcut is not None: st_out = st.copy().detrend().filter( "bandpass", freqmin=lowcut, freqmax=highcut, corners=4, zerophase=True) elif lowcut is None and highcut is not None: st_out = st.copy().detrend().filter( "lowpass", freq=highcut, corners=4, zerophase=True) elif lowcut is not None and highcut is None: st_out = st.copy().detrend().filter( "highpass", freq=lowcut, corners=4, zerophase=True) else: st_out = st # Don't need to copy if we aren't doing anything. return {event_id: st_out}
[docs] def write_correlations(catalog, stream_dict, extract_len, pre_pick, shift_len, event_id_mapper=None, lowcut=1.0, highcut=10.0, max_sep=8, min_link=8, min_cc=0.0, interpolate=False, all_horiz=False, max_workers=None, parallel_process=False, weight_by_square=True, *args, **kwargs): """ Write a dt.cc file for hypoDD input for a given list of events. Takes an input list of events and computes pick refinements by correlation. Outputs a single files, dt.cc file with weights as the square of the cross-correlation. :type catalog: `obspy.core.event.Catalog` :param catalog: Catalog of events to get differential times for :type event_id_mapper: dict :param event_id_mapper: Dictionary mapping event resource id to an integer event id for hypoDD. If this is None, or missing events then the dictionary will be updated to include appropriate event-ids. This should be of the form {event.resource_id.id: integer_id} :type stream_dict: dict :param stream_dict: Dictionary of streams keyed by event-id (the event.resource_id.id, NOT the hypoDD event-id) :type extract_len: float :param extract_len: Length in seconds to extract around the pick :type pre_pick: float :param pre_pick: Time before the pick to start the correlation window :type shift_len: float :param shift_len: Time to allow pick to vary in seconds :type lowcut: float :param lowcut: Lowcut in Hz - set to None to apply no lowcut :type highcut: float :param highcut: Highcut in Hz - set to None to apply no highcut :type max_sep: float :param max_sep: Maximum separation between event pairs in km :type min_link: int :param min_link: Minimum links for an event to be paired :type min_cc: float :param min_cc: Threshold to include cross-correlation results. :type interpolate: bool :param interpolate: Whether to interpolate correlations or not. Allows subsample accuracy :type max_workers: int :param max_workers: Maximum number of workers for parallel processing. If None then all threads will be used. :type parallel_process: bool :param parallel_process: Whether to process streams in parallel or not. Experimental, may use too much memory. :type weight_by_square: bool :param weight_by_square: Whether to compute correlation weights as the square of the maximum correlation (True), or the maximum correlation (False). :rtype: dict :returns: event_id_mapper .. note:: You can provide processed waveforms, or let this function filter your data for you. Filtering is undertaken by detrending and bandpassing with a 8th order zerophase butterworth filter. .. note:: Differential times are computed as travel-time for event 1 minus travel-time for event 2 (tt1 - tt2). """ # Depreciated argument cc_thresh = kwargs.get("cc_thresh", None) if cc_thresh: min_cc = cc_thresh Logger.warning("cc_thresh is depreciated, use min_cc instead") max_workers = max_workers or cpu_count() processed_stream_dict = stream_dict # Process the streams if not (lowcut is None and highcut is None): processed_stream_dict = dict() if parallel_process: max_process_workers = int(max(np.array( [max_workers, kwargs.get('max_trace_workers')], dtype=np.float64))) with pool_boy( Pool, len(stream_dict), cores=max_process_workers) as pool: results = [pool.apply_async( _meta_filter_stream, (key, stream_dict, lowcut, highcut)) for key in stream_dict.keys()] for result in results: processed_stream_dict.update(result.get()) else: for key in stream_dict.keys(): processed_stream_dict.update(_meta_filter_stream( stream_dict=stream_dict, lowcut=lowcut, highcut=highcut, event_id=key)) correlation_times, event_id_mapper = compute_differential_times( catalog=catalog, correlation=True, event_id_mapper=event_id_mapper, max_sep=max_sep, min_link=min_link, max_workers=max_workers, stream_dict=processed_stream_dict, min_cc=min_cc, extract_len=extract_len, pre_pick=pre_pick, shift_len=shift_len, interpolate=interpolate, all_horiz=all_horiz, weight_by_square=weight_by_square, **kwargs) with open("dt.cc", "w") as f: for master_id, linked_events in correlation_times.items(): for linked_event in linked_events: f.write(linked_event.cc_string) f.write("\n") return event_id_mapper
# Phase-file functions def _hypodd_phase_pick_str(pick, sparse_event): """ Make a hypodd phase.dat style pick string. """ pick_str = "{station:5s} {tt:7.4f} {weight:5.3f} {phase:1s}".format( station=pick.waveform_id.station_code, tt=pick.tt, weight=pick.weight, phase=pick.phase_hint[0].upper()) return pick_str def _hypodd_phase_str(event, event_id_mapper): """ Form a phase-string for hypoDD from an obspy event. """ try: origin = event.preferred_origin() or event.origins[0] except (IndexError, AttributeError): Logger.error("No origin for event {0}".format(event.resource_id.id)) return try: magnitude = (event.preferred_magnitude() or event.magnitudes[0]).mag except (IndexError, AttributeError): Logger.warning("No magnitude") magnitude = 0.0 try: time_error = origin.quality['standard_error'] or 0.0 except (TypeError, AttributeError): Logger.warning('No time residual in header') time_error = 0.0 z_err = (origin.depth_errors.uncertainty or 0.0) / 1000. # Note that these should be in degrees, but GeoNet uses meters. x_err = (origin.longitude_errors.uncertainty or 0.0) / 1000. y_err = (origin.latitude_errors.uncertainty or 0.0) / 1000. x_err = max(x_err, y_err) event_str = [( "# {year:4d} {month:02d} {day:02d} {hour:02d} {minute:02d} " "{second:02d} {latitude:10.6f} {longitude: 10.6f} {depth:10.6f} " "{magnitude:4.2f} {err_h:4.2f} {err_z:4.2f} {t_rms:4.2f} " "{event_id:9d}".format( year=origin.time.year, month=origin.time.month, day=origin.time.day, hour=origin.time.hour, minute=origin.time.minute, second=origin.time.second, latitude=origin.latitude, longitude=origin.longitude, depth=origin.depth / 1000., magnitude=magnitude, err_h=x_err, err_z=z_err, t_rms=time_error, event_id=event_id_mapper[event.resource_id.id]))] sparse_event = _make_sparse_event(event) for pick in sparse_event.picks: if pick.phase_hint[0] not in "PS": continue event_str.append( "{station:5s} {tt:7.2f} {weight:5.3f} {phase_hint:1s}".format( station=pick.station, tt=pick.tt, weight=pick.time_weight, phase_hint=pick.phase_hint[0].upper())) return "\n".join(event_str)
[docs] def write_phase(catalog, event_id_mapper=None): """ Write a phase.dat formatted file from an obspy catalog. :type catalog: `obspy.core.event.Catalog` :param catalog: Catalog of events :type event_id_mapper: dict :param event_id_mapper: Dictionary mapping event resource id to an integer event id for hypoDD. If this is None, or missing events then the dictionary will be updated to include appropriate event-ids. This should be of the form {event.resource_id.id: integer_id} :return: event_id_mapper """ event_id_mapper = _generate_event_id_mapper(catalog, event_id_mapper) phase_strings = "\n".join( [_hypodd_phase_str(event, event_id_mapper) for event in catalog]) with open("phase.dat", "w") as f: f.write(phase_strings) return event_id_mapper
[docs] def read_phase(ph_file): """ Read hypoDD phase files into Obspy catalog class. :type ph_file: str :param ph_file: Phase file to read event info from. :returns: Catalog of events from file. :rtype: :class:`obspy.core.event.Catalog` >>> from obspy.core.event.catalog import Catalog >>> # Get the path to the test data >>> import eqcorrscan >>> import os >>> TEST_PATH = os.path.dirname(eqcorrscan.__file__) + '/tests/test_data' >>> catalog = read_phase(TEST_PATH + '/tunnel.phase') >>> isinstance(catalog, Catalog) True """ ph_catalog = Catalog() f = open(ph_file, 'r') # Topline of each event is marked by # in position 0 for line in f: if line[0] == '#': if 'event_text' not in locals(): event_text = {'header': line.rstrip(), 'picks': []} else: ph_catalog.append(_phase_to_event(event_text)) event_text = {'header': line.rstrip(), 'picks': []} else: event_text['picks'].append(line.rstrip()) ph_catalog.append(_phase_to_event(event_text)) f.close() return ph_catalog
def _phase_to_event(event_text): """ Function to convert the text for one event in hypoDD phase format to \ event object. :type event_text: dict :param event_text: dict of two elements, header and picks, header is a \ str, picks is a list of str. :returns: obspy.core.event.Event """ ph_event = Event() # Extract info from header line # YR, MO, DY, HR, MN, SC, LAT, LON, DEP, MAG, EH, EZ, RMS, ID header = event_text['header'].split() ph_event.origins.append(Origin()) ph_event.origins[0].time = \ UTCDateTime(year=int(header[1]), month=int(header[2]), day=int(header[3]), hour=int(header[4]), minute=int(header[5]), second=int(header[6].split('.')[0]), microsecond=int(float(('0.' + header[6].split('.')[1])) * 1000000)) ph_event.origins[0].latitude = float(header[7]) ph_event.origins[0].longitude = float(header[8]) ph_event.origins[0].depth = float(header[9]) * 1000 ph_event.origins[0].quality = OriginQuality( standard_error=float(header[13])) ph_event.magnitudes.append(Magnitude()) ph_event.magnitudes[0].mag = float(header[10]) ph_event.magnitudes[0].magnitude_type = 'M' # Extract arrival info from picks! for i, pick_line in enumerate(event_text['picks']): pick = pick_line.split() _waveform_id = WaveformStreamID(station_code=pick[0]) pick_time = ph_event.origins[0].time + float(pick[1]) ph_event.picks.append(Pick(waveform_id=_waveform_id, phase_hint=pick[3], time=pick_time)) ph_event.origins[0].arrivals.append(Arrival(phase=ph_event.picks[i], pick_id=ph_event.picks[i]. resource_id)) ph_event.origins[0].arrivals[i].time_weight = float(pick[2]) return ph_event # Event file functions def _hypodd_event_str(event, event_id): """ Make an event.dat style string for an event. :type event_id: int :param event_id: Must be an integer. """ assert isinstance(event_id, int) try: origin = event.preferred_origin() or event.origins[0] except (IndexError, AttributeError): Logger.error("No origin for event {0}".format(event.resource_id.id)) return try: magnitude = (event.preferred_magnitude() or event.magnitudes[0]).mag except (IndexError, AttributeError): Logger.warning("No magnitude") magnitude = 0.0 try: time_error = origin.quality['standard_error'] or 0.0 except (TypeError, AttributeError): Logger.warning('No time residual in header') time_error = 0.0 z_err = (origin.depth_errors.uncertainty or 0.0) / 1000. # Note that these should be in degrees, but GeoNet uses meters. x_err = (origin.longitude_errors.uncertainty or 0.0) / 1000. y_err = (origin.latitude_errors.uncertainty or 0.0) / 1000. x_err = max(x_err, y_err) event_str = ( "{year:4d}{month:02d}{day:02d} {hour:2d}{minute:02d}" "{seconds:02d}{microseconds:02d} {latitude:8.4f} {longitude:9.4f}" " {depth:8.4f} {magnitude:5.2f} {x_err:5.2f} {z_err:5.2f} " "{time_err:5.2f} {event_id:9d}".format( year=origin.time.year, month=origin.time.month, day=origin.time.day, hour=origin.time.hour, minute=origin.time.minute, seconds=origin.time.second, microseconds=round(origin.time.microsecond / 1e4), latitude=origin.latitude, longitude=origin.longitude, depth=origin.depth / 1000., magnitude=magnitude, x_err=x_err, z_err=z_err, time_err=time_error, event_id=event_id)) return event_str
[docs] def write_event(catalog, event_id_mapper=None): """ Write obspy.core.event.Catalog to a hypoDD format event.dat file. :type catalog: obspy.core.event.Catalog :param catalog: A catalog of obspy events. :type event_id_mapper: dict :param event_id_mapper: Dictionary mapping event resource id to an integer event id for hypoDD. If this is None, or missing events then the dictionary will be updated to include appropriate event-ids. This should be of the form {event.resource_id.id: integer_id} :returns: dictionary of event-id mappings. """ event_id_mapper = _generate_event_id_mapper( catalog=catalog, event_id_mapper=event_id_mapper) event_strings = [ _hypodd_event_str(event, event_id_mapper[event.resource_id.id]) for event in catalog] event_strings = "\n".join(event_strings) with open("event.dat", "w") as f: f.write(event_strings) return event_id_mapper
# Station.dat functions
[docs] def write_station(inventory, use_elevation=False, filename="station.dat"): """ Write a hypoDD formatted station file. :type inventory: obspy.core.Inventory :param inventory: Inventory of stations to write - should include channels if use_elevation=True to incorporate channel depths. :type use_elevation: bool :param use_elevation: Whether to write elevations (requires hypoDD >= 2) :type filename: str :param filename: File to write stations to. """ station_strings = [] formatter = "{sta:<7s} {lat:>9.5f} {lon:>10.5f}" if use_elevation: formatter = " ".join([formatter, "{elev:>5.0f}"]) for network in inventory: for station in network: parts = dict(sta=station.code, lat=station.latitude, lon=station.longitude) if use_elevation: channel_depths = {chan.depth for chan in station} if len(channel_depths) == 0: Logger.warning("No channels provided, using 0 depth.") depth = 0.0 else: depth = channel_depths.pop() if len(channel_depths) > 1: Logger.warning( f"Multiple depths for {station.code}, using {depth}") parts.update(dict(elev=station.elevation - depth)) station_strings.append(formatter.format(**parts)) with open(filename, "w") as f: f.write("\n".join(station_strings))
if __name__ == '__main__': import doctest doctest.testmod()