Source code for eqcorrscan.utils.findpeaks

"""Functions to find peaks in data above a certain threshold.

:copyright:
    EQcorrscan developers.

:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import ctypes
import random
import logging
import warnings

import numpy as np

from typing import List, Union

from multiprocessing import Pool, cpu_count
from concurrent.futures import ThreadPoolExecutor
from scipy import ndimage
from obspy import UTCDateTime
from obspy.core.event import Catalog, Event

from eqcorrscan.utils.correlate import pool_boy
from eqcorrscan.utils.libnames import _load_cdll
from eqcorrscan.utils.clustering import dist_mat_km


Logger = logging.getLogger(__name__)


def is_prime(number):
    """
    Function to test primality of a number. Function lifted from online
    resource:
        http://www.codeproject.com/Articles/691200/Primality-test-algorithms-Prime-test-The-fastest-w

    This function is distributed under a separate licence:
        This article, along with any associated source code and files, is \
        licensed under The Code Project Open License (CPOL)

    :type number: int
    :param number: Integer to test for primality

    :returns: bool

    >>> is_prime(4)
    False
    >>> is_prime(3)
    True
    """
    ''' if number != 1 '''
    if number > 1:
        ''' repeat the test few times '''
        for time in range(3):
            ''' Draw a RANDOM number in range of number ( Z_number )  '''
            randomNumber = random.randint(2, number - 1)
            ''' Test if a^(n-1) = 1 mod n '''
            if pow(randomNumber, number - 1, number) != 1:
                return False
        return True
    else:
        ''' case number == 1 '''
        return False


[docs] def find_peaks_compiled(arr, thresh, trig_int, full_peaks=False): """ Determine peaks in an array of data above a certain threshold. :type arr: numpy.ndarray :param arr: 1-D numpy array is required :type thresh: float :param thresh: The threshold below which will be considered noise and peaks will not be found in. :type trig_int: int :param trig_int: The minimum difference in samples between triggers, if multiple peaks within this window this code will find the highest. :type full_peaks: bool :param full_peaks: If True, will decluster within data-sections above the threshold, rather than just taking the peak within that section. This will take more time. This defaults to False for match_filter. :return: peaks: Lists of tuples of peak values and locations. :rtype: list """ if not np.any(np.abs(arr) > thresh): # Fast fail return [] if not full_peaks: peak_vals, peak_indices = _find_peaks_c(array=arr, threshold=thresh) else: peak_vals = arr peak_indices = np.arange(arr.shape[0]) peaks = [] if len(peak_vals) > 0: peaks = decluster( peaks=np.array(peak_vals), index=np.array(peak_indices), trig_int=trig_int + 1, threshold=thresh) peaks = sorted(peaks, key=lambda peak: peak[1], reverse=False) return peaks
[docs] def find_peaks2_short(arr, thresh, trig_int, full_peaks=False): """ Determine peaks in an array of data above a certain threshold. Uses a mask to remove data below threshold and finds peaks in what is left. :type arr: numpy.ndarray :param arr: 1-D numpy array is required :type thresh: float :param thresh: The threshold below which will be considered noise and peaks will not be found in. :type trig_int: int :param trig_int: The minimum difference in samples between triggers, if multiple peaks within this window this code will find the highest. :type full_peaks: bool :param full_peaks: If True will by decluster within data-sections above the threshold, rather than just taking the peak within that section. This will take more time. This defaults to False for match_filter. :return: peaks: Lists of tuples of peak values and locations. :rtype: list >>> import numpy as np >>> arr = np.random.randn(100) >>> threshold = 10 >>> arr[40] = 20 >>> arr[60] = 100 >>> find_peaks2_short(arr, threshold, 3) [(20.0, 40), (100.0, 60)] """ # Set everything below the threshold to zero image = np.copy(arr) Logger.debug("Threshold: {0}\tMax: {1}".format(thresh, max(image))) image[np.abs(image) < thresh] = 0 if len(image[np.abs(image) > thresh]) == 0: Logger.debug("No values over threshold {0}".format(thresh)) return [] if np.all(np.abs(arr) > thresh): Logger.debug("All values above threshold, running full peak finding") full_peaks = True Logger.debug('Found {0} samples above the threshold'.format( len(image[image > thresh]))) initial_peaks = [] # Find the peaks labeled_image, number_of_objects = ndimage.label(image) peak_slices = ndimage.find_objects(labeled_image) for peak_slice in peak_slices: window = arr[peak_slice[0].start: peak_slice[0].stop] if peak_slice[0].stop - peak_slice[0].start >= trig_int and full_peaks: window_peaks, window_peak_indexes = ([], []) for i in np.arange(peak_slice[0].start, peak_slice[0].stop): if i == peak_slice[0].start: prev_value = 0 else: prev_value = arr[i - 1] if i == peak_slice[0].stop - 1: next_value = 0 else: next_value = arr[i + 1] # Check for consistent sign - either both greater or # both smaller. if (next_value - arr[i]) * (prev_value - arr[i]) > 0: window_peaks.append(arr[i]) window_peak_indexes.append(i) peaks = decluster( peaks=np.array(window_peaks), trig_int=trig_int + 1, index=np.array(window_peak_indexes)) else: peaks = [(window[np.argmax(abs(window))], int(peak_slice[0].start + np.argmax(abs(window))))] initial_peaks.extend(peaks) peaks = decluster(peaks=np.array(list(zip(*initial_peaks))[0]), index=np.array(list(zip(*initial_peaks))[1]), trig_int=trig_int + 1) if initial_peaks: peaks = sorted(peaks, key=lambda time: time[1], reverse=False) # Convert peaks to python types peaks = [(p[0].item(), p[1].item()) for p in peaks] return peaks else: Logger.info('No peaks for you!') return []
[docs] def multi_find_peaks(arr, thresh, trig_int, parallel=True, full_peaks=False, cores=None, internal_func=find_peaks_compiled): """ Wrapper for find-peaks for multiple arrays. :type arr: numpy.ndarray :param arr: 2-D numpy array is required :type thresh: list :param thresh: The threshold below which will be considered noise and peaks will not be found in. One threshold per array. :type trig_int: int :param trig_int: The minimum difference in samples between triggers, if multiple peaks within this window this code will find the highest. :type parallel: bool :param parallel: Whether to compute in parallel or not - will use multiprocessing if not using the compiled internal_func :type full_peaks: bool :param full_peaks: See `eqcorrscan.utils.findpeaks.find_peaks2_short` :type cores: int :param cores: Maximum number of processes to spin up for parallel peak-finding :type internal_func: callable :param internal_func: Function to use for peak finding - defaults to the compiled version. :returns: List of list of tuples of (peak, index) in same order as input arrays """ peaks = [] if not parallel or cores == 1: for sub_arr, arr_thresh in zip(arr, thresh): peaks.append(internal_func( arr=sub_arr, thresh=arr_thresh, trig_int=trig_int, full_peaks=full_peaks)) else: if cores is None: cores = min(arr.shape[0], cpu_count()) if internal_func.__name__ != 'find_peaks_compiled': with pool_boy(Pool=Pool, traces=arr.shape[0], cores=cores) as pool: params = ((sub_arr, arr_thresh, trig_int, full_peaks) for sub_arr, arr_thresh in zip(arr, thresh)) results = [ pool.apply_async(internal_func, param) for param in params] peaks = [res.get() for res in results] else: to_run = ((arr[i], thresh[i], trig_int) for i in range(len(thresh))) with ThreadPoolExecutor(cores) as executor: results = executor.map( lambda args: find_peaks_compiled(*args), to_run) peaks = [r for r in results] return peaks
def decluster_distance_time(peaks, index, trig_int, catalog, hypocentral_separation, threshold=0, num_threads=None): """ Decluster based on time between peaks, and distance between events. Peaks, index and catalog must all be sorted the same way, e.g. peak[i] corresponds to index[i] and catalog[i]. Peaks that are within the time threshold of one-another, but correspond to events separated by more than the hypocentral_separation threshold will not be removed. :type peaks: np.array :param peaks: array of peak values :type index: np.ndarray :param index: locations of peaks :type trig_int: int :param trig_int: Minimum trigger interval in samples :type catalog: obspy.core.event.Catalog :param catalog: Catalog of events with origins to use to measure inter-event distances :type hypocentral_separation: float :param hypocentral_separation: Maximum inter-event distance to decluster over in km :type threshold: float :param threshold: Minimum absolute peak value to retain it :type num_threads: int :param num_threads: Number of threads to use for distance matrix calculation. :return: list of tuples of (value, sample) """ utilslib = _load_cdll("libutils") length = peaks.shape[0] trig_int = int(trig_int) for var in [index.max(), trig_int]: if var == ctypes.c_long(var).value: long_type = ctypes.c_long func = utilslib.decluster_dist_time elif var == ctypes.c_longlong(var).value: long_type = ctypes.c_longlong func = utilslib.decluster_dist_time_ll else: raise OverflowError("Maximum index larger than internal long long") func.argtypes = [ np.ctypeslib.ndpointer(dtype=np.float32, shape=(length,), flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=long_type, shape=(length,), flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.float32, shape=(length * length,), flags='C_CONTIGUOUS'), long_type, ctypes.c_float, long_type, ctypes.c_float, np.ctypeslib.ndpointer(dtype=np.uint32, shape=(length,), flags='C_CONTIGUOUS')] func.restype = ctypes.c_int sorted_inds = np.abs(peaks).argsort() # Sort everything in the same way. arr = peaks[sorted_inds[::-1]] inds = index[sorted_inds[::-1]] sorted_events = [catalog[i] for i in sorted_inds[::-1]] distance_matrix = dist_mat_km( catalog=sorted_events, num_threads=num_threads) arr = np.ascontiguousarray(arr, dtype=np.float32) inds = np.ascontiguousarray(inds, dtype=long_type) distance_matrix = np.ascontiguousarray( distance_matrix.flatten(order="C"), dtype=np.float32) out = np.zeros(len(arr), dtype=np.uint32) ret = func( arr, inds, distance_matrix, long_type(length), np.float32(threshold), long_type(trig_int), hypocentral_separation, out) if ret != 0: raise MemoryError("Issue with c-routine, returned %i" % ret) peaks_out = list(zip(arr[out.astype(bool)], inds[out.astype(bool)])) return peaks_out
[docs] def decluster(peaks, index, trig_int, threshold=0): """ Decluster peaks based on an enforced minimum separation. :type peaks: np.array :param peaks: array of peak values :type index: np.ndarray :param index: locations of peaks :type trig_int: int :param trig_int: Minimum trigger interval in samples :type threshold: float :param threshold: Minimum absolute peak value to retain it. :return: list of tuples of (value, sample) """ utilslib = _load_cdll("libutils") length = peaks.shape[0] trig_int = int(trig_int) for var in [index.max(), trig_int]: if var == ctypes.c_long(var).value: long_type = ctypes.c_long func = utilslib.decluster elif var == ctypes.c_longlong(var).value: long_type = ctypes.c_longlong func = utilslib.decluster_ll else: raise OverflowError("Maximum index larger than internal long long") func.argtypes = [ np.ctypeslib.ndpointer(dtype=np.float32, shape=(length,), flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=long_type, shape=(length,), flags='C_CONTIGUOUS'), long_type, ctypes.c_float, long_type, np.ctypeslib.ndpointer(dtype=np.uint32, shape=(length,), flags='C_CONTIGUOUS')] func.restype = ctypes.c_int sorted_inds = np.abs(peaks).argsort() arr = peaks[sorted_inds[::-1]] inds = index[sorted_inds[::-1]] arr = np.ascontiguousarray(arr, dtype=np.float32) inds = np.ascontiguousarray(inds, dtype=long_type) out = np.zeros(len(arr), dtype=np.uint32) ret = func( arr, inds, long_type(length), np.float32(threshold), long_type(trig_int), out) if ret != 0: raise MemoryError("Issue with c-routine, returned %i" % ret) peaks_out = list(zip(arr[out.astype(bool)], inds[out.astype(bool)])) return peaks_out
def _find_peaks_c(array, threshold): """ Use a C func to find peaks in the array. """ utilslib = _load_cdll("libutils") length = array.shape[0] utilslib.find_peaks.argtypes = [ np.ctypeslib.ndpointer(dtype=np.float32, shape=(length, ), flags='C_CONTIGUOUS'), ctypes.c_long, ctypes.c_float, np.ctypeslib.ndpointer(dtype=np.uint32, shape=(length, ), flags='C_CONTIGUOUS')] utilslib.find_peaks.restype = ctypes.c_int arr = np.ascontiguousarray(array, np.float32) out = np.ascontiguousarray(np.zeros((length, ), dtype=np.uint32)) ret = utilslib.find_peaks(arr, ctypes.c_long(length), threshold, out) if ret != 0: raise MemoryError("Internal error") peaks_locations = np.nonzero(out) return array[peaks_locations], peaks_locations[0]
[docs] def coin_trig(peaks, stachans, samp_rate, moveout, min_trig, trig_int): """ Find network coincidence triggers within peaks of detection statistics. Useful for finding network detections from sets of detections on individual stations. :type peaks: list :param peaks: List of lists of tuples of (peak, index) for each \ station-channel. Index should be in samples. :type stachans: list :param stachans: List of tuples of (station, channel) in the order of \ peaks. :type samp_rate: float :param samp_rate: Sampling rate in Hz :type moveout: float :param moveout: Allowable network moveout in seconds. :type min_trig: int :param min_trig: Minimum station-channels required to declare a trigger. :type trig_int: float :param trig_int: Minimum allowable time between network triggers in seconds. :return: List of tuples of (peak, index), for the earliest detected station. :rtype: list >>> peaks = [[(0.5, 100), (0.3, 800)], [(0.4, 120), (0.7, 850)]] >>> triggers = coin_trig(peaks, [('a', 'Z'), ('b', 'Z')], 10, 3, 2, 1) >>> print(triggers) [(0.45, 100)] """ triggers = [] for stachan, _peaks in zip(stachans, peaks): for peak in _peaks: trigger = (peak[1], peak[0], '.'.join(stachan)) triggers.append(trigger) coincidence_triggers = [] for i, master in enumerate(triggers): slaves = triggers[i + 1:] coincidence = 1 trig_time = master[0] trig_val = master[1] for slave in slaves: if abs(slave[0] - master[0]) <= (moveout * samp_rate) and \ slave[2] != master[2]: coincidence += 1 if slave[0] < master[0]: trig_time = slave[0] trig_val += slave[1] if coincidence >= min_trig: coincidence_triggers.append((trig_val / coincidence, trig_time)) # Sort by trigger-value, largest to smallest - remove duplicate detections if coincidence_triggers: coincidence_triggers.sort(key=lambda tup: tup[0], reverse=True) output = [coincidence_triggers[0]] for coincidence_trigger in coincidence_triggers[1:]: add = True for peak in output: # If the event occurs within the trig_int time then do not add # it, and break out of the inner loop. if abs(coincidence_trigger[1] - peak[1]) < (trig_int * samp_rate): add = False break if add: output.append((coincidence_trigger[0], coincidence_trigger[1])) output.sort(key=lambda tup: tup[1]) return output else: return []
# ##################### Declustering based on pick-time ####################### def _pick_time( event: Event, station: str, phase_hint: str, zero_time: UTCDateTime ) -> float: """ Get the average pick time for a station for an event, relative to a reference time. :param event: Event to read picks from :station: Station to get picks for :phase_hint: Phase hint to get picks for :zero_time: Reference time to compare to :returns: Pick time relative to zero-time in seconds. """ picks = [p for p in event.picks if p.waveform_id.station_code == station and p.phase_hint == phase_hint] if len(picks) == 0: return np.nan pick_times = [p.time - zero_time for p in picks] if len(pick_times) == 1: return pick_times[0] return sum(pick_times) / len(pick_times) def average_pick_time_diff_matrix( catalog: Union[Catalog, List[Event]], compiled: bool = True ) -> np.ndarray: """ Compute the average pick time difference matrix for events vs events :param catalog: Catalog of events to compare pick times for :compiled: Whether to use internal compiled code (faster) or not. :returns: Pick time difference matrix - square. """ # Make arrays of pick times for each phase and each station sta_phases = list( {f"{p.waveform_id.station_code}_{p.phase_hint}" for ev in catalog for p in ev.picks if p.phase_hint in "PS"}) # Using a list for sta_phases to retain order pick_arrays = dict() zero_time = (catalog[0].preferred_origin() or catalog[0].origins[-1]).time # This bit is not very fast, but gets the arrays in C shape for sta_ph in sta_phases: pick_arrays.update({ sta_ph: np.array( [_pick_time(ev, *sta_ph.split('_'), zero_time=zero_time) for ev in catalog])}) if compiled: return _compute_matrix_c( pick_arrays=pick_arrays, n_events=len(catalog)) else: return _compute_matrix( pick_arrays=pick_arrays, n_events=len(catalog)) def _compute_matrix(pick_arrays: dict, n_events: int) -> np.ndarray: out = np.zeros((n_events, n_events)) for i in range(n_events): # Matrix is symmetric, so we can just do upper or lower half. for j in range(i + 1, n_events): # We get occasional RuntimeWarnings about mean of empty slice # when no stations are shared. nan is returned for this case. with warnings.catch_warnings(): warnings.simplefilter("ignore") pick_delta = np.nanmean( [pick_arrays[sta_ph][i] - pick_arrays[sta_ph][j] for sta_ph in pick_arrays.keys()]) out[i, j] = pick_delta out -= out.T return np.nan_to_num(out, nan=np.inf) def _compute_matrix_c(pick_arrays: dict, n_events: int) -> np.ndarray: out = np.ascontiguousarray( np.zeros(n_events * n_events), np.float64) pick_array = np.ascontiguousarray( np.concatenate([value for value in pick_arrays.values()]), np.float64) utilslib = _load_cdll("libutils") utilslib.average_pick_time_diff_matrix.argtypes = [ ctypes.c_int, ctypes.c_int, np.ctypeslib.ndpointer(dtype=np.float64, shape=(len(pick_array), ), flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.float64, shape=(len(out), ), flags='C_CONTIGUOUS')] utilslib.average_pick_time_diff_matrix.restype = ctypes.c_int ret = utilslib.average_pick_time_diff_matrix( ctypes.c_int(len(pick_arrays.keys())), ctypes.c_int(n_events), pick_array, out) if ret != 0: raise Exception("Error in internal C code.") out = out.reshape(n_events, n_events) out -= out.T return np.nan_to_num(out, nan=np.inf) def get_det_val(event: Event) -> Union[float, None]: """ Get the detection value for an event. Prefers sum of pick correlations. :param event: Event to get detection value for. :returns: Detection value. """ det_val = get_comment_val(value_name="detect_val", event=event) pick_sum = 0 for pick in event.picks: pick_val = get_comment_val("cc_max", event=pick) if pick_val: pick_sum += pick_val if det_val: if pick_sum > det_val: return pick_sum else: return det_val elif pick_sum: return pick_sum return None def get_comment_val(value_name: str, event: Event) -> Union[float, None]: """ Get value from EQcorrscan-style comment. :param value_name: Comment string before value :param event: Event to search comments for :returns: Comment value. """ value = None for comment in event.comments: if value_name in comment.text: if "=" in comment.text: # Should be a number try: value = float(comment.text.split('=')[-1]) except ValueError: # Leave as a string break except Exception as e: Logger.exception( f"Could not get {value_name} {comment.text} " f"due to {e}") else: break elif ":" in comment.text: # Should be a string try: value = comment.text.split(": ")[-1] except Exception as e: Logger.exception( f"Could not get {value_name} from {comment.text} " f"due to {e}") else: break return value def decluster_pick_times( catalog: Union[Catalog, List[Event]], trig_int: float ) -> Catalog: """ Decluster a catalog based on average pick-time similarity. If two events have (on average) pick-times within trig_int seconds of each other, then the event with the highest detection value, or number of picks (if detection values are not found) will be retained and the other event discarded. Useful for after lag-calc to remove events that have different template locations, but stem from similar pick-times. :param catalog: Catalog of events to decluster :trig_int: Minimum inter-event time in seconds :returns: Declustered catalog. """ detect_vals = np.array([abs(get_det_val(ev) or len(ev.picks)) for ev in catalog]) # Sort catalog in order of detect_vals - from largest to smallest order = np.argsort(detect_vals)[::-1] # detect_vals = detect_vals[order] if isinstance(catalog, Catalog): events = catalog.events else: events = catalog events = [events[ind] for ind in order] # This is obviously very slow at the moment pick_matrix = average_pick_time_diff_matrix(catalog=events) # We only care about the absolute value (+/- trig_int) pick_matrix = np.abs(pick_matrix) drop_events = np.zeros(len(events), dtype=bool) # Loop through from highest detection value to lowest - this way we keep # the best detections for i, ev in enumerate(events): # If we are dropping this event then carry on if drop_events[i]: continue # Find any events within trig int and set their drop status to True drop_mask = pick_matrix[i] <= trig_int # Make sure we retain the "core" event in this loop drop_mask[i] = False drop_events[drop_mask] = True # _dropped = [ev for j, ev in _dropped(events) if drop_mask[j]] # Get the events that we want to keep! return Catalog([ev for i, ev in enumerate(events) if not drop_events[i]]) if __name__ == "__main__": import doctest doctest.testmod()