Source code for eqcorrscan.utils.correlate

"""
Correlation functions for multi-channel cross-correlation of seismic data.

Various routines used mostly for testing, including links to a compiled
routine using FFTW, a Numpy fft routine which uses bottleneck for normalisation
and a compiled time-domain routine. These have varying levels of efficiency,
both in terms of overall speed, and in memory usage.  The time-domain is the
most memory efficient but slowest routine (although fast for small cases of
less than a few hundred correlations), the Numpy routine is fast, but memory
inefficient due to a need to store large double-precision arrays for
normalisation.  The fftw compiled routine is faster and more memory efficient
than the Numpy routine.

:copyright:
    EQcorrscan developers.

:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import contextlib
import copy
import ctypes
import os
import logging
from multiprocessing import Pool as ProcessPool, cpu_count
from multiprocessing.pool import ThreadPool

import numpy as np
import math

from packaging import version
from timeit import default_timer

from obspy import UTCDateTime

from eqcorrscan.utils.libnames import _load_cdll
from eqcorrscan.utils import FMF_INSTALLED
from eqcorrscan.utils.pre_processing import _stream_quick_select


Logger = logging.getLogger(__name__)

# This is for building docs on readthedocs, which has an old version of
# scipy - without this, this module cannot be imported, which breaks the docs
# Instead we define a dummy function that returns what it is given.
READ_THE_DOCS = os.environ.get('READTHEDOCS', None) == 'True'
if not READ_THE_DOCS:
    from scipy.fftpack.helper import next_fast_len
else:
    def next_fast_len(a):
        return a

XCOR_FUNCS = {}  # cache of functions for doing cross correlations

# methods added to each xcorr func registered
# these implement the stream interface
XCORR_STREAM_METHODS = ('multithread', 'multiprocess', 'concurrent',
                        'stream_xcorr')

# these implement the array interface
XCOR_ARRAY_METHODS = ('array_xcorr')

# Gain shift for low-variance stabilization
MULTIPLIER = 1e8

# Minimum version for compatible correlations from Fast Matched Filter
MIN_FMF_VERSION = version.parse("1.4.0")


class CorrelationError(Exception):
    """ Error handling for correlation functions. """

    def __init__(self, value):
        """ Raise error.

        .. rubric:: Example

        >>> CorrelationError("Something happened")
        Something happened
        """
        self.value = value

    def __repr__(self):
        """ Print error value.

        .. rubric:: Example

        >>> print(CorrelationError("Error").__repr__())
        Error
        """
        return self.value

    def __str__(self):
        """ Print otherwise

        .. rubric:: Example

        >>> print(CorrelationError("Error"))
        Error
        """
        return self.value


# ------------------ Context manager for switching out default
class _Context:
    """ class for permanently or temporarily changing items in a dict """

    def __init__(self, cache, value_to_switch):
        """
        :type cache: dict
        :param cache: A dict to store values in
        :type value_to_switch: str
        :param value_to_switch:
            The key in cache to switch based on different contexts
        """
        self.cache = cache
        self.value_to_switch = value_to_switch
        self.previous_value = None

    def __call__(self, new_value, *args, **kwargs):
        """ # TODO change docs if this ever becomes general use
        Set a new value for the default xcorr function.

        This function can be called directly to permanently change the
        default normxcorr function or it may be used as a context manager
        to only modify it temporarily.

        :param new_value:
        :return:
        """
        self.previous_value = copy.deepcopy(
            self.cache.get(self.value_to_switch))
        self.cache[self.value_to_switch] = get_array_xcorr(new_value)
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.revert()

    def __repr__(self):
        """ this hides the fact _Context instance are returned after calls """
        name = self.cache[self.value_to_switch].__str__()
        if hasattr(self.cache[self.value_to_switch], '__name__'):
            name = self.cache[self.value_to_switch].__name__
        out_str = ("%s changed to %s" % (self.value_to_switch, name))
        return out_str

    def revert(self):
        """ revert the default xcorr function to previous value """
        # Have to use the previous value as this may contain some custom
        # stream_xcorr functions
        self.cache[self.value_to_switch] = self.previous_value


set_xcorr = _Context(XCOR_FUNCS, 'default')


# ---------------------- generic concurrency functions

@contextlib.contextmanager
def pool_boy(Pool, traces, **kwargs):
    """
    A context manager for handling the setup and cleanup of a pool object.

    :param Pool: any Class (not instance) that implements the multiprocessing
        Pool interface
    :param traces: The number of traces to process
    :type traces: int
    """
    # All parallel processing happens on a per-trace basis, we shouldn't create
    # more workers than there are traces
    n_cores = kwargs.get('cores', cpu_count())
    if n_cores is None:
        n_cores = cpu_count()
    if n_cores > traces:
        n_cores = traces
    pool = Pool(n_cores)
    yield pool
    pool.close()
    pool.join()


def _pool_normxcorr(templates, stream, stack, pool, func, *args, **kwargs):
    chans = [[] for _i in range(len(templates))]
    array_dict_tuple = _get_array_dicts(templates, stream, stack=stack)
    stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple
    # get parameter iterator
    params = ((template_dict[sid], stream_dict[sid], pad_dict[sid])
              for sid in seed_ids)
    # get cc results and used chans into their own lists
    results = [pool.apply_async(func, param) for param in params]
    try:
        xcorrs, tr_chans = zip(*(res.get() for res in results))
    except KeyboardInterrupt as e:  # pragma: no cover
        pool.terminate()
        raise e
    if stack:
        cccsums = np.sum(xcorrs, axis=0)
    else:
        cccsums = np.asarray(xcorrs).swapaxes(0, 1)
    no_chans = np.sum(np.array(tr_chans).astype(int), axis=0)
    for seed_id, tr_chan in zip(seed_ids, tr_chans):
        for chan, state in zip(chans, tr_chan):
            if state:
                chan.append(seed_id)
    if stack:
        cccsums = _zero_invalid_correlation_sums(cccsums, pad_dict, chans)
    chans = [[(seed_id.split('.')[1], seed_id.split('.')[-1].split('_')[0])
              for seed_id in _chans] for _chans in chans]
    return cccsums, no_chans, chans


def _general_multithread(func):
    """ return the general multithreading function using func """

    def multithread(templates, stream, stack=True, *args, **kwargs):
        with pool_boy(ThreadPool, len(stream), **kwargs) as pool:
            return _pool_normxcorr(
                templates, stream, stack=stack, pool=pool, func=func)

    return multithread


def _general_multiprocess(func):
    def multiproc(templates, stream, stack=True, *args, **kwargs):
        with pool_boy(ProcessPool, len(stream), **kwargs) as pool:
            return _pool_normxcorr(
                templates, stream, stack=stack, pool=pool, func=func)

    return multiproc


def _general_serial(func):
    def stream_xcorr(templates, stream, stack=True, *args, **kwargs):
        no_chans = np.zeros(len(templates))
        chans = [[] for _ in range(len(templates))]
        array_dict_tuple = _get_array_dicts(templates, stream, stack=stack)
        stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple
        if stack:
            cccsums = np.zeros([len(templates),
                                len(stream[0]) - len(templates[0][0]) + 1])
        else:
            cccsums = np.zeros([len(templates), len(seed_ids),
                                len(stream[0]) - len(templates[0][0]) + 1])
        for chan_no, seed_id in enumerate(seed_ids):
            tr_cc, tr_chans = func(template_dict[seed_id],
                                   stream_dict[seed_id],
                                   pad_dict[seed_id])
            if stack:
                cccsums = np.sum([cccsums, tr_cc], axis=0)
            else:
                cccsums[:, chan_no] = tr_cc
            no_chans += tr_chans.astype(int)
            for chan, state in zip(chans, tr_chans):
                if state:
                    chan.append(seed_id)
        if stack:
            cccsums = _zero_invalid_correlation_sums(cccsums, pad_dict, chans)
        chans = [[(seed_id.split('.')[1], seed_id.split('.')[-1].split('_')[0])
                  for seed_id in _chans] for _chans in chans]
        return cccsums, no_chans, chans

    return stream_xcorr


[docs] def register_array_xcorr(name, func=None, is_default=False): """ Decorator for registering correlation functions. Each function must have the same interface as numpy_normxcorr, which is *f(templates, stream, pads, *args, **kwargs)* any number of specific kwargs can be used. Register_normxcorr can be used as a decorator (with or without arguments) or as a callable. :param name: The name of the function for quick access, or the callable that will be wrapped when used as a decorator. :type name: str, callable :param func: The function to register :type func: callable, optional :param is_default: True if this function should be marked as default normxcorr :type is_default: bool :return: callable """ valid_methods = set(list(XCOR_ARRAY_METHODS) + list(XCORR_STREAM_METHODS)) cache = {} def register(register_str): """ Register a function as an implementation. :param register_str: The registration designation :type register_str: str """ if register_str not in valid_methods: msg = 'register_name must be in %s' % valid_methods raise ValueError(msg) def _register(func): cache[register_str] = func setattr(cache['func'], register_str, func) return func return _register def wrapper(func, func_name=None): # register the functions in the XCOR fname = func_name or name.__name__ if callable(name) else str(name) XCOR_FUNCS[fname] = func # if is_default: # set function as default # XCOR_FUNCS['default'] = func # attach some attrs, this is a bit of a hack to avoid pickle problems func.register = register cache['func'] = func func.multithread = _general_multithread(func) func.multiprocess = _general_multiprocess(func) func.concurrent = _general_multithread(func) func.stream_xcorr = _general_serial(func) func.array_xcorr = func func.registered = True if is_default: # set function as default XCOR_FUNCS['default'] = copy.deepcopy(func) return func # used as a decorator if callable(name): return wrapper(name) # used as a normal function (called and passed a function) if callable(func): return wrapper(func, func_name=name) # called, then used as a decorator return wrapper
def _zero_invalid_correlation_sums(cccsums, pad_dict, used_seed_ids): """ Zero the end portion of the correlation sum that does not include the full set of channels. :param cccsums: 2D numpy array :type cccsums: np.ndarray :param pad_dict: pad_dict from _get_array_dicts :type pad_dict: dict :param used_seed_ids: The SEED IDs actually used in correlation :type used_seed_ids: list of list of str :return: Valid correlation stack - end will be zero-ed up to maxmimum moveout :rtype: np.ndarray """ # TODO: This is potentially quite a slow way to do this. for i, cccsum in enumerate(cccsums): moveouts = [value[i] for key, value in pad_dict.items() if key in used_seed_ids[i]] max_moveout = 0 if len(moveouts): max_moveout = max(moveouts) if max_moveout: cccsum[-max_moveout:] = 0.0 return cccsums # ------------------ array_xcorr fetching functions def _get_registerd_func(name_or_func): """ get a xcorr function from a str or callable. """ # get the function or register callable if callable(name_or_func): func = register_array_xcorr(name_or_func) else: func = XCOR_FUNCS[name_or_func or 'default'] assert callable(func), 'func is not callable' # ensure func has the added methods if not hasattr(func, 'registered'): func = register_array_xcorr(func) return func
[docs] def get_array_xcorr(name_or_func=None): """ Get an normalized cross correlation function that takes arrays as inputs. See :func:`eqcorrscan.utils.correlate.array_normxcorr` for expected function signature. :param name_or_func: Either a name of a registered xcorr function or a callable that implements the standard array_normxcorr signature. :type name_or_func: str or callable :return: callable wth array_normxcorr interface see also :func:`eqcorrscan.utils.correlate.get_stream_xcorr` """ func = _get_registerd_func(name_or_func) return func.array_xcorr
# ----------------------- registered array_xcorr functions
[docs] @register_array_xcorr('numpy') def numpy_normxcorr(templates, stream, pads, *args, **kwargs): """ Compute the normalized cross-correlation using numpy and bottleneck. :param templates: 2D Array of templates :type templates: np.ndarray :param stream: 1D array of continuous data :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ import bottleneck # Generate a template mask used_chans = ~np.isnan(templates).any(axis=1) # Currently have to use float64 as bottleneck runs into issues with other # types: https://github.com/kwgoodman/bottleneck/issues/164 stream = stream.astype(np.float64) templates = templates.astype(np.float64) template_length = templates.shape[1] stream_length = len(stream) assert stream_length > template_length, "Template must be shorter than " \ "stream" fftshape = next_fast_len(template_length + stream_length - 1) # Set up normalizers stream_mean_array = bottleneck.move_mean( stream, template_length)[template_length - 1:] stream_std_array = bottleneck.move_std( stream, template_length)[template_length - 1:] # because stream_std_array is in denominator or res, nan all 0s stream_std_array[stream_std_array == 0] = np.nan # Normalize and flip the templates norm = ((templates - templates.mean(axis=-1, keepdims=True)) / ( templates.std(axis=-1, keepdims=True) * template_length)) norm_sum = norm.sum(axis=-1, keepdims=True) stream_fft = np.fft.rfft(stream, fftshape) template_fft = np.fft.rfft(np.flip(norm, axis=-1), fftshape, axis=-1) res = np.fft.irfft(template_fft * stream_fft, fftshape)[:, 0:template_length + stream_length - 1] res = ((_centered(res, (templates.shape[0], stream_length - template_length + 1))) - norm_sum * stream_mean_array) / stream_std_array res[np.isnan(res)] = 0.0 for i, pad in enumerate(pads): res[i] = np.append(res[i], np.zeros(pad))[pad:] return res.astype(np.float32), used_chans
def _centered(arr, newshape): """ Hack of scipy.signaltools._centered """ newshape = np.asarray(newshape) currshape = np.array(arr.shape) startind = (currshape - newshape) // 2 endind = startind + newshape myslice = [slice(startind[k], endind[k]) for k in range(len(endind))] return arr[tuple(myslice)]
[docs] @register_array_xcorr('time_domain') def time_multi_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs): """ Compute cross-correlations in the time-domain using C routine. :param templates: 2D Array of templates :type templates: np.ndarray :param stream: 1D array of continuous data :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list :param threaded: Whether to use the threaded routine or not :type threaded: bool :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ used_chans = ~np.isnan(templates).any(axis=1) utilslib = _load_cdll('libutils') argtypes = [ np.ctypeslib.ndpointer(dtype=np.float32, ndim=1, flags='C_CONTIGUOUS'), ctypes.c_int, ctypes.c_int, np.ctypeslib.ndpointer(dtype=np.float32, ndim=1, flags='C_CONTIGUOUS'), ctypes.c_int, np.ctypeslib.ndpointer(dtype=np.float32, ndim=1, flags='C_CONTIGUOUS')] restype = ctypes.c_int if threaded: func = utilslib.multi_normxcorr_time_threaded argtypes.append(ctypes.c_int) else: func = utilslib.multi_normxcorr_time func.argtypes = argtypes func.restype = restype # Need to de-mean everything templates_means = templates.mean(axis=1).astype(np.float32)[:, np.newaxis] stream_mean = stream.mean().astype(np.float32) templates = templates.astype(np.float32) - templates_means stream = stream.astype(np.float32) - stream_mean template_len = templates.shape[1] n_templates = templates.shape[0] image_len = stream.shape[0] ccc_length = image_len - template_len + 1 assert ccc_length > 0, "Template must be shorter than stream" ccc = np.ascontiguousarray( np.empty(ccc_length * n_templates), np.float32) t_array = np.ascontiguousarray(templates.flatten(), np.float32) time_args = [t_array, template_len, n_templates, np.ascontiguousarray(stream, np.float32), image_len, ccc] if threaded: time_args.append(kwargs.get('cores', cpu_count())) func(*time_args) ccc[np.isnan(ccc)] = 0.0 ccc = ccc.reshape((n_templates, image_len - template_len + 1)) for i in range(len(pads)): ccc[i] = np.append(ccc[i], np.zeros(pads[i]))[pads[i]:] templates += templates_means stream += stream_mean return ccc, used_chans
[docs] @register_array_xcorr('fftw', is_default=True) def fftw_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs): """ Normalised cross-correlation using the fftw library. Internally this function used double precision numbers, which is definitely required for seismic data. Cross-correlations are computed as the inverse fft of the dot product of the ffts of the stream and the reversed, normalised, templates. The cross-correlation is then normalised using the running mean and standard deviation (not using the N-1 correction) of the stream and the sums of the normalised templates. This python function wraps the C-library written by C. Chamberlain for this purpose. :param templates: 2D Array of templates :type templates: np.ndarray :param stream: 1D array of continuous data :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list :param threaded: Whether to use the threaded routine or not - note openMP and python multiprocessing don't seem to play nice for this. :type threaded: bool :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ utilslib = _load_cdll('libutils') argtypes = [ np.ctypeslib.ndpointer(dtype=np.float32, ndim=1, flags='C_CONTIGUOUS'), ctypes.c_long, ctypes.c_long, np.ctypeslib.ndpointer(dtype=np.float32, ndim=1, flags='C_CONTIGUOUS'), ctypes.c_long, np.ctypeslib.ndpointer(dtype=np.float32, flags='C_CONTIGUOUS'), ctypes.c_long, np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS')] restype = ctypes.c_int if threaded: func = utilslib.normxcorr_fftw_threaded else: func = utilslib.normxcorr_fftw func.argtypes = argtypes func.restype = restype # Generate a template mask used_chans = ~np.isnan(templates).any(axis=1) template_length = templates.shape[1] stream_length = len(stream) n_templates = templates.shape[0] fftshape = kwargs.get("fft_len") if fftshape is None: # In testing, 2**13 consistently comes out fastest - setting to # default. https://github.com/eqcorrscan/EQcorrscan/pull/285 fftshape = min( 2 ** 13, next_fast_len(template_length + stream_length - 1)) if fftshape < template_length: Logger.warning( "FFT length of {0} is shorter than the template, setting to " "{1}".format( fftshape, next_fast_len(template_length + stream_length - 1))) fftshape = next_fast_len(template_length + stream_length - 1) # Normalize and flip the templates norm = ((templates - templates.mean(axis=-1, keepdims=True)) / ( templates.std(axis=-1, keepdims=True) * template_length)) norm = np.nan_to_num(norm) ccc_length = stream_length - template_length + 1 assert ccc_length > 0, "Template must be shorter than stream" ccc = np.zeros((n_templates, ccc_length), np.float32) used_chans_np = np.ascontiguousarray(used_chans, dtype=np.intc) pads_np = np.ascontiguousarray(pads, dtype=np.intc) variance_warning = np.ascontiguousarray([0], dtype=np.intc) missed_corr = np.ascontiguousarray([0], dtype=np.intc) # Check that stream is non-zero and above variance threshold if not np.all(stream == 0) and np.var(stream) < 1e-8: # Apply gain stream *= MULTIPLIER Logger.warning("Low variance found for, applying gain " "to stabilise correlations") multiplier = MULTIPLIER else: multiplier = 1 ret = func( np.ascontiguousarray(norm.flatten(order='C'), np.float32), template_length, n_templates, np.ascontiguousarray(stream, np.float32), stream_length, np.ascontiguousarray(ccc, np.float32), fftshape, used_chans_np, pads_np, variance_warning, missed_corr) if ret < 0: raise MemoryError() elif ret > 0: Logger.critical('Error in C code (possible normalisation error)') Logger.critical('Maximum ccc %f at %i' % (ccc.max(), ccc.argmax())) Logger.critical('Minimum ccc %f at %i' % (ccc.min(), ccc.argmin())) Logger.critical('Recommend checking your data for spikes, clipping ' 'or artefacts') raise CorrelationError("Internal correlation error") if missed_corr[0]: Logger.warning( "{0} correlations not computed, are there gaps in the " "data? If not, consider increasing gain".format( missed_corr[0])) if variance_warning[0] and variance_warning[0] > template_length: Logger.warning( "Low variance found in {0} positions, check result.".format( variance_warning[0])) # Remove variance correction stream /= multiplier return ccc, used_chans
# The time-domain routine can be sped up massively on large machines (many # threads) using the openMP threaded functions. @time_multi_normxcorr.register('concurrent') def _time_threaded_normxcorr(templates, stream, stack=True, *args, **kwargs): """ Use the threaded time-domain routine for concurrency :type templates: list :param templates: A list of templates, where each one should be an obspy.Stream object containing multiple traces of seismic data and the relevant header information. :type stream: obspy.core.stream.Stream :param stream: A single Stream object to be correlated with the templates. :returns: New list of :class:`numpy.ndarray` objects. These will contain the correlation sums for each template for this day of data. :rtype: list :returns: list of ints as number of channels used for each cross-correlation. :rtype: list :returns: list of list of tuples of station, channel for all cross-correlations. :rtype: list """ no_chans = np.zeros(len(templates)) chans = [[] for _ in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=stack) stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple ccc_length = max( len(stream[0]) - len(templates[0][0]) + 1, len(templates[0][0]) - len(stream[0]) + 1) if stack: cccsums = np.zeros([len(templates), ccc_length]) else: cccsums = np.zeros([len(templates), len(seed_ids), ccc_length]) for chan_no, seed_id in enumerate(seed_ids): tr_cc, tr_chans = time_multi_normxcorr( template_dict[seed_id], stream_dict[seed_id], pad_dict[seed_id], True) if stack: cccsums = np.sum([cccsums, tr_cc], axis=0) else: cccsums[:, chan_no] = tr_cc no_chans += tr_chans.astype(int) for chan, state in zip(chans, tr_chans): if state: chan.append(seed_id) if stack: cccsums = _zero_invalid_correlation_sums(cccsums, pad_dict, chans) chans = [[(seed_id.split('.')[1], seed_id.split('.')[-1].split('_')[0]) for seed_id in _chans] for _chans in chans] return cccsums, no_chans, chans @fftw_normxcorr.register('stream_xcorr') @fftw_normxcorr.register('multithread') @fftw_normxcorr.register('concurrent') def _fftw_stream_xcorr(templates, stream, stack=True, *args, **kwargs): """ Apply fftw normxcorr routine concurrently. :type templates: list :param templates: A list of templates, where each one should be an obspy.Stream object containing multiple traces of seismic data and the relevant header information. :type stream: obspy.core.stream.Stream :param stream: A single Stream object to be correlated with the templates. :returns: New list of :class:`numpy.ndarray` objects. These will contain the correlation sums for each template for this day of data. :rtype: list :returns: list of ints as number of channels used for each cross-correlation. :rtype: list :returns: list of list of tuples of station, channel for all cross-correlations. :rtype: list """ array_dict_tuple = _get_array_dicts(templates, stream, stack=stack) stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple assert set(seed_ids) # number of threads: # default to using inner threads # if `cores` or `cores_outer` passed in then use that # else if OMP_NUM_THREADS set use that # otherwise use all available inner_kwargs = copy.copy(kwargs) num_cores_inner, num_cores_outer = _set_inner_outer_threading( inner_kwargs.pop('cores', None), inner_kwargs.pop("cores_outer", None), len(stream)) chans = [[] for _i in range(len(templates))] cccsums, tr_chans = fftw_multi_normxcorr( template_array=template_dict, stream_array=stream_dict, pad_array=pad_dict, seed_ids=seed_ids, cores_inner=num_cores_inner, cores_outer=num_cores_outer, stack=stack, *args, **inner_kwargs) no_chans = np.sum(np.array(tr_chans).astype(int), axis=0) for seed_id, tr_chan in zip(seed_ids, tr_chans): for chan, state in zip(chans, tr_chan): if state: chan.append(seed_id) if stack: cccsums = _zero_invalid_correlation_sums(cccsums, pad_dict, chans) chans = [[(seed_id.split('.')[1], seed_id.split('.')[-1].split('_')[0]) for seed_id in _chans] for _chans in chans] return cccsums, no_chans, chans def _set_inner_outer_threading(num_cores_inner, num_cores_outer, n_chans): max_threads = int(os.getenv("OMP_NUM_THREADS", cpu_count())) if num_cores_inner is None: num_cores_inner = max_threads num_cores_outer = num_cores_outer or 1 if num_cores_outer > 1: if num_cores_outer > n_chans: Logger.info( "More outer cores than channels, setting to {0}".format( n_chans)) num_cores_outer = n_chans if num_cores_outer * num_cores_inner > max_threads: Logger.info("More threads requested than exist, falling back to " "outer-loop parallelism") num_cores_outer = min(max_threads, num_cores_outer) if 2 * num_cores_outer < max_threads: num_cores_inner = max_threads // num_cores_outer else: num_cores_inner = 1 return num_cores_inner, num_cores_outer
[docs] def fftw_multi_normxcorr( template_array, stream_array, pad_array, seed_ids, cores_inner, cores_outer, stack=True, *args, **kwargs ): """ Use a C loop rather than a Python loop - in some cases this will be fast. :type template_array: dict :param template_array: :type stream_array: dict :param stream_array: :type pad_array: dict :param pad_array: :type seed_ids: list :param seed_ids: rtype: np.ndarray, list :return: 3D Array of cross-correlations and list of used channels. """ utilslib = _load_cdll('libutils') utilslib.multi_normxcorr_fftw.argtypes = [ np.ctypeslib.ndpointer(dtype=np.float32, flags='C_CONTIGUOUS'), ctypes.c_long, ctypes.c_long, ctypes.c_long, np.ctypeslib.ndpointer(dtype=np.float32, flags='C_CONTIGUOUS'), ctypes.c_long, np.ctypeslib.ndpointer(dtype=np.float32, flags='C_CONTIGUOUS'), ctypes.c_long, np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), ctypes.c_int, ctypes.c_int, np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), ctypes.c_int] utilslib.multi_normxcorr_fftw.restype = ctypes.c_int ''' Arguments are: templates (stacked [ch_1-t_1, ch_1-t_2, ..., ch_2-t_1, ch_2-t_2, ...]) number of templates template length number of channels image (stacked [ch_1, ch_2, ..., ch_n]) image length cross-correlations (stacked as per image) fft-length used channels (stacked as per templates) pad array (stacked as per templates) num threads inner num threads outer variance warnings missed correlation warnings (usually due to gaps) stack option ''' # pre processing used_chans = [] template_len = template_array[seed_ids[0]].shape[1] for seed_id in seed_ids: used_chans.append(~np.isnan(template_array[seed_id]).any(axis=1)) template_array[seed_id] = ( (template_array[seed_id] - template_array[seed_id].mean(axis=-1, keepdims=True)) / ( template_array[seed_id].std(axis=-1, keepdims=True) * template_len)) template_array[seed_id] = np.nan_to_num(template_array[seed_id]) n_channels = len(seed_ids) n_templates = template_array[seed_ids[0]].shape[0] image_len = stream_array[seed_ids[0]].shape[0] fft_len = kwargs.get("fft_len") if fft_len is None: # In testing, 2**13 consistently comes out fastest - setting to # default. https://github.com/eqcorrscan/EQcorrscan/pull/285 # But this results in lots of chunks - 2 ** 17 is also good. fft_len = min(2 ** 17, next_fast_len(template_len + image_len - 1)) if fft_len < template_len: Logger.warning( f"FFT length of {fft_len} is shorter than the template, setting to" f" {next_fast_len(template_len + image_len - 1)}") fft_len = next_fast_len(template_len + image_len - 1) template_array = np.ascontiguousarray( [template_array[x] for x in seed_ids], dtype=np.float32) multipliers = {} for x in seed_ids: # Check that stream is non-zero and above variance threshold if not np.all(stream_array[x] == 0) and np.var(stream_array[x]) < 1e-8: # Apply gain stream_array[x] *= MULTIPLIER Logger.warning(f"Low variance found for {x}, applying gain " "to stabilise correlations") multipliers.update({x: MULTIPLIER}) else: multipliers.update({x: 1}) stream_array = np.ascontiguousarray([stream_array[x] for x in seed_ids], dtype=np.float32) ccc_length = image_len - template_len + 1 assert ccc_length > 0, "Template must be shorter than stream" if stack: cccs = np.zeros((n_templates, ccc_length), np.float32) else: cccs = np.zeros( (n_templates, n_channels, ccc_length), dtype=np.float32) used_chans_np = np.ascontiguousarray(used_chans, dtype=np.intc) pad_array_np = np.ascontiguousarray( [pad_array[seed_id] for seed_id in seed_ids], dtype=np.intc) variance_warnings = np.ascontiguousarray( np.zeros(n_channels), dtype=np.intc) missed_correlations = np.ascontiguousarray( np.zeros(n_channels), dtype=np.intc) # call C function ret = utilslib.multi_normxcorr_fftw( template_array, n_templates, template_len, n_channels, stream_array, image_len, cccs, fft_len, used_chans_np, pad_array_np, cores_inner, cores_outer, variance_warnings, missed_correlations, int(stack)) if ret < 0: raise MemoryError("Memory allocation failed in correlation C-code") elif ret > 0: Logger.critical( 'Out-of-range correlation in C-code, see WARNING from C-code.' 'You are STRONGLY RECOMMENDED to check your data for spikes, ' 'clipping or non-physical artifacts') # raise CorrelationError("Internal correlation error") for i, missed_corr in enumerate(missed_correlations): if missed_corr: Logger.debug( f"{missed_corr} correlations not computed on {seed_ids[i]}, " f"are there gaps in the data? If not, consider " "increasing gain") for i, variance_warning in enumerate(variance_warnings): if variance_warning and variance_warning > template_len: Logger.warning( f"Low variance found in {variance_warning} places for " f"{seed_ids[i]}, check result.") # Remove gain for i, x in enumerate(seed_ids): stream_array[i] *= multipliers[x] return cccs, used_chans
# ------------------------------- FastMatchedFilter Wrapper def _run_fmf_xcorr(template_arr, data_arr, weights, pads, arch, step=1): template_arr, data_arr, multipliers = _fmf_stabilisation( template_arr=template_arr, data_arr=data_arr) return _stabalised_fmf( template_arr=template_arr, data_arr=data_arr, weights=weights, pads=pads, arch=arch, multipliers=multipliers, step=step) def _stabalised_fmf(template_arr, data_arr, weights, pads, arch, multipliers, step): if not FMF_INSTALLED: raise ImportError("FastMatchedFilter is not available") import fast_matched_filter if version.parse(fast_matched_filter.__version__) >= MIN_FMF_VERSION: from fast_matched_filter import matched_filter as fmf else: raise ImportError(f"FMF version {fast_matched_filter.__version__} " f"must be >= {MIN_FMF_VERSION}") Logger.info("Handing off to FMF") cccsums = fmf( templates=template_arr, weights=weights, moveouts=pads, data=data_arr, step=step, arch=arch, normalize="full") Logger.info("FMF returned") # Remove gain if np.any(multipliers != 1): data_arr /= multipliers return cccsums def _fmf_stabilisation(template_arr, data_arr): """ FMF doesn't do the internal check that EQC correlations do. """ # Demean tic = default_timer() template_arr -= template_arr.mean(axis=-1, keepdims=True) data_arr -= data_arr.mean(axis=-1, keepdims=True) toc = default_timer() Logger.info(f"Removing mean took {toc - tic:.4f} s") # Stability checking tic = default_timer() # var is fairly slow, var = mean(abs(a - a.mean()) ** 2) - mean is zero, # so we can skip a step stability_issues = np.logical_and( # data_arr.var(axis=1, keepdims=True) < 1e-8, np.mean(np.abs(data_arr) ** 2, axis=1, keepdims=True) < 1e-8, ~np.all(data_arr == 0, axis=1, keepdims=True)) multipliers = np.ones_like(stability_issues, dtype=float) multipliers[stability_issues] = MULTIPLIER if np.any(stability_issues): Logger.warning( f"Low variance found for channels {np.where(stability_issues)}," f"applying gain to stabilise correlations") data_arr *= multipliers toc = default_timer() Logger.info(f"Checking stability took {toc - tic:.4f} s") return template_arr, data_arr, multipliers @register_array_xcorr("fmf") def fmf_xcorr(templates, stream, pads, arch="precise", *args, **kwargs): """ Compute cross-correlations in the time-domain using the FMF routine. :param templates: 2D Array of templates :type templates: np.ndarray :param stream: 1D array of continuous data :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list :param arch: "gpu" or "precise" to run on GPU or CPU respectively type arch: str :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ assert templates.ndim == 2, "Templates must be 2D" assert stream.ndim == 1, "Stream must be 1D" used_chans = ~np.isnan(templates).any(axis=1) # We have to reshape to an extra dimension for FMF ccc = _run_fmf_xcorr( template_arr=templates.reshape( (1, templates.shape[0], templates.shape[1])).swapaxes(0, 1), data_arr=stream.reshape((1, stream.shape[0])), weights=np.ones((1, templates.shape[0])), pads=np.array([pads]), arch=arch.lower()) return ccc, used_chans @fmf_xcorr.register("stream_xcorr") @fmf_xcorr.register("concurrent") def _fmf_gpu(templates, stream, *args, **kwargs): """ Thin wrapper of fmf_multi_xcorr setting arch to gpu. """ from fast_matched_filter import GPU_LOADED if not GPU_LOADED: Logger.warning("FMF reports GPU not loaded, reverting to CPU") return _fmf_cpu(templates=templates, stream=stream, *args, **kwargs) return _fmf_multi_xcorr(templates, stream, arch="gpu") @fmf_xcorr.register("multithread") @fmf_xcorr.register("multiprocess") def _fmf_cpu(templates, stream, *args, **kwargs): """ Thin wrapper of fmf_multi_xcorr setting arch to cpu. """ from fast_matched_filter import CPU_LOADED if not CPU_LOADED: raise NotImplementedError( "FMF reports CPU not loaded - try rebuilding FMF") return _fmf_multi_xcorr(templates, stream, arch="precise") def _fmf_reshape(template_dict, stream_dict, pad_dict, seed_ids): tic = default_timer() # Reshape templates into [templates x traces x time] t_arr = np.array([template_dict[seed_id] for seed_id in seed_ids]).swapaxes(0, 1) # Reshape stream into [traces x time] d_arr = np.array([stream_dict[seed_id] for seed_id in seed_ids]) # Moveouts should be [templates x traces] pads = np.array([pad_dict[seed_id] for seed_id in seed_ids]).swapaxes(0, 1) # Weights should be shaped like pads weights = np.ones_like(pads) toc = default_timer() Logger.info(f"Reshaping for FMF took {toc - tic:.4f} s") return t_arr, d_arr, weights, pads def _fmf_multi_xcorr(templates, stream, *args, **kwargs): """ Apply FastMatchedFilter routine concurrently. :type templates: list :param templates: A list of templates, where each one should be an obspy.Stream object containing multiple traces of seismic data and the relevant header information. :type stream: obspy.core.stream.Stream :param stream: A single Stream object to be correlated with the templates. :returns: New list of :class:`numpy.ndarray` objects. These will contain the correlation sums for each template for this day of data. :rtype: list :returns: list of ints as number of channels used for each cross-correlation. :rtype: list :returns: list of list of tuples of station, channel for all cross-correlations. :rtype: list """ if kwargs.get("stack", False): raise NotImplementedError( "FMF does not support unstacked correlations, use a different " "backend") arch = kwargs.get("arch", "gpu") Logger.info(f"Running FMF targeting the {arch}") chans = [[] for _i in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=True) stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple assert set(seed_ids) t_arr, d_arr, weights, pads = _fmf_reshape( template_dict=template_dict, stream_dict=stream_dict, pad_dict=pad_dict, seed_ids=seed_ids) cccsums = _run_fmf_xcorr( template_arr=t_arr, weights=weights, pads=pads, data_arr=d_arr, step=1, arch=arch) tr_chans = np.array([~np.isnan(template_dict[seed_id]).any(axis=1) for seed_id in seed_ids]) no_chans = np.sum(np.array(tr_chans).astype(int), axis=0) # Note: FMF already returns the zeroed end of correlations - we don't # need to call _get_valid_correlation_sum for seed_id, tr_chan in zip(seed_ids, tr_chans): for chan, state in zip(chans, tr_chan): if state: chan.append((seed_id.split('.')[1], seed_id.split('.')[-1].split('_')[0])) return cccsums, no_chans, chans # ------------------------------- stream_xcorr functions
[docs] def get_stream_xcorr(name_or_func=None, concurrency=None): """ Return a function for performing normalized cross correlation on lists of streams. :param name_or_func: Either a name of a registered function or a callable that implements the standard array_normxcorr signature. :param concurrency: Optional concurrency strategy, options are below. :return: A callable with the interface of stream_normxcorr :Concurrency options: - multithread - use a threadpool for concurrency; - multiprocess - use a process pool for concurrency; - concurrent - use a customized concurrency strategy for the function, if not defined threading will be used. """ func = _get_registerd_func(name_or_func) concur = concurrency or 'stream_xcorr' if not hasattr(func, concur): msg = '%s does not support concurrency %s' % (func.__name__, concur) raise ValueError(msg) return getattr(func, concur)
# --------------------------- stream prep functions def _get_array_dicts(templates, stream, stack, *args, **kwargs): """ prepare templates and stream, return dicts """ tic = default_timer() # Do some reshaping # init empty structures for data storage template_dict = {} stream_dict = {} pad_dict = {} t_starts = [] stream.sort(['network', 'station', 'location', 'channel']) for template in templates: template.sort(['network', 'station', 'location', 'channel']) t_starts.append( UTCDateTime(ns=min([tr.stats.starttime.__dict__['_UTCDateTime__ns'] for tr in template]))) stream_start = min([tr.stats.starttime for tr in stream]) # get seed ids, make sure these are collected on sorted streams seed_ids = [tr.id + '_' + str(i) for i, tr in enumerate(templates[0])] # pull common channels out of streams and templates and put in dicts for i, seed_id in enumerate(seed_ids): temps_with_seed = [template.traces[i].data for template in templates] t_ar = np.array(temps_with_seed).astype(np.float32) template_dict.update({seed_id: t_ar}) stream_channel = _stream_quick_select(stream, seed_id.split('_')[0])[0] # Normalize data to ensure no float overflow stream_data = stream_channel.data / (np.max( np.abs(stream_channel.data)) / 1e5) stream_dict.update( {seed_id: stream_data.astype(np.float32)}) # PROBLEM - DEBUG: if two traces start just before / just after a # "full-sample-time", then stream_offset can become 1, while a value in # pad_list can become 0. 0-1 = -1; which is problematic. stream_offset = int( math.floor(stream_channel.stats.sampling_rate * (stream_channel.stats.starttime - stream_start))) if stack: pad_list = [ int(round( template.traces[i].stats.__dict__['sampling_rate'] * (template.traces[i].stats.starttime.__dict__[ '_UTCDateTime__ns'] - t_starts[j].__dict__['_UTCDateTime__ns']) / 1e9)) - stream_offset for j, template in enumerate(templates)] else: pad_list = [0 for _ in templates] pad_dict.update({seed_id: pad_list}) toc = default_timer() Logger.info(f"Making array dicts for {len(seed_ids)} seed ids " f"took {toc - tic:.4f} s") return stream_dict, template_dict, pad_dict, seed_ids # Remove fmf if it isn't installed if not FMF_INSTALLED: XCOR_FUNCS.pop("fmf") # a dict of built in xcorr functions, used to distinguish from user-defined XCORR_FUNCS_ORIGINAL = copy.copy(XCOR_FUNCS) if __name__ == '__main__': import doctest doctest.testmod()