Source code for eqcorrscan.utils.pre_processing

"""
Utilities module whose functions are designed to do the basic processing of
the data using obspy modules (which also rely on scipy and numpy).

:copyright:
    EQcorrscan developers.

:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import copy
import logging
import os

from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache, partial

import numpy as np
from scipy.signal import iirfilter, sosfilt, zpk2sos

from obspy import Stream, Trace, UTCDateTime
from obspy.core.trace import Stats


Logger = logging.getLogger(__name__)


def _check_daylong(data, threshold=0.5):
    """
    Check data continuity.

    Check that the trace is not dominated by zeros: traces with too high a
    proportion of zeros produce artefacts when resampled and filtered.

    :type data: numpy.ndarray
    :param data: Data from Trace to check.
    :type threshold: float
    :param threshold: Minimum fraction of non-zero samples required for the
        data to be considered good.

    :return: Data quality (True if good, False if bad)
    :rtype: bool

    .. rubric:: Example

    >>> from obspy import read
    >>> from eqcorrscan.utils.pre_processing import _check_daylong
    >>> # Get the path to the test data
    >>> import eqcorrscan
    >>> import os
    >>> import numpy as np
    >>> TEST_PATH = os.path.dirname(eqcorrscan.__file__) + '/tests/test_data'
    >>> st = read(TEST_PATH + '/WAV/TEST_/' +
    ...           '2013-09-01-0410-35.DFDPC_024_00')
    >>> _check_daylong(st[0].data)
    True
    >>> zeroed_data = st[0].copy().data
    >>> zeroed_data[0:9100] = np.zeros(9100)
    >>> _check_daylong(zeroed_data)
    False
    """
    return np.nonzero(data)[0].shape[0] >= threshold * data.shape[0]


def _simple_qc(st, max_workers=None, chunksize=1):
    """
    Multithreaded simple QC of data.

    :param st: Stream of data to check
    :type st: obspy.core.Stream
    :param max_workers: Maximum number of threads to use
    :type max_workers: int
    :param chunksize: Number of traces to process per thread
    :type chunksize: int

    :return: dict of {tr.id: quality} where quality is bool
    """
    qual = dict()
    with ThreadPoolExecutor(max_workers) as executor:
        for tr, _qual in zip(st, executor.map(
                _check_daylong, (tr.data for tr in st), chunksize=chunksize)):
            qual[tr.id] = _qual
    return qual
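

# Editor's illustrative sketch, not part of the original module: shows how
# _simple_qc might be used to drop mostly-zero traces from a stream. The
# helper name and the two-worker setting are arbitrary choices.
def _example_simple_qc():
    """Illustrative only: remove traces that fail the zero-fraction QC."""
    from obspy import read
    st = read()  # obspy's bundled example stream
    qual = _simple_qc(st, max_workers=2, chunksize=1)
    for trace_id, ok in qual.items():
        if not ok:
            # Remove every trace with this id that failed the check
            for tr in st.select(id=trace_id):
                st.remove(tr)
    return st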


def _sanitize_length(st, starttime=None, endtime=None, daylong=False):
    """
    Check length and work out start, end, length and trimming criteria

    :param st: Stream to check
    :type st: obspy.core.Stream
    :param starttime: Desired starttime - if None, will be evaluated from data
    :type starttime: obspy.core.UTCDateTime
    :param endtime: Desired endtime - can be None
    :type endtime: obspy.core.UTCDateTime
    :param daylong: Whether data should be one-day long.
    :type daylong: bool

    :return: obspy.core.Stream, length[float], clip[bool],
        starttime[obspy.core.UTCDateTime]
    """
    length, clip = None, False

    if daylong:
        length, clip = 86400, True
        # Set the start-time to a day start - cope with traces that start
        # within one sample of the next day
        if starttime is None:
            startdates = []
            for tr in st:
                if abs(tr.stats.starttime - (UTCDateTime(
                        tr.stats.starttime.date) + 86400)) < tr.stats.delta:
                    # If the trace starts within 1 sample of the next day,
                    # use the next day as the startdate
                    startdates.append((tr.stats.starttime + 86400).date)
                    Logger.warning(
                        f'{tr.id} starts within 1 sample of the next day, '
                        f'using this time {(tr.stats.starttime + 86400).date}')
                else:
                    startdates.append(tr.stats.starttime.date)
            # Check that all traces start on the same date...
            if not len(set(startdates)) == 1:
                raise NotImplementedError('Traces start on different days')
            starttime = UTCDateTime(startdates[0])
    else:
        if starttime is not None and endtime is not None:
            for tr in st:
                Logger.info(
                    f"Trimming {tr.id} between {starttime} and {endtime}")
                tr.trim(starttime, endtime)
                if len(tr.data) == ((endtime - starttime) *
                                    tr.stats.sampling_rate) + 1:
                    Logger.info(
                        f"{tr.id} is over-length, dropping first sample")
                    tr.data = tr.data[1:len(tr.data)]
                    # TODO: this should adjust the start-time
                    # tr.stats.starttime += tr.stats.delta
            length = endtime - starttime
            clip = True
        elif starttime:
            for tr in st:
                tr.trim(starttime=starttime)
        elif endtime:
            for tr in st:
                tr.trim(endtime=endtime)
    return st, length, clip, starttime
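

# Editor's illustrative sketch, not part of the original module: trims
# obspy's bundled example stream to a 10 s window using _sanitize_length.
# The helper name and the window choice are arbitrary.
def _example_sanitize_length():
    """Illustrative only: trim a stream to a fixed start and end time."""
    from obspy import read
    st = read()  # obspy's bundled example stream, roughly 30 s long
    start = st[0].stats.starttime + 5
    end = start + 10
    st, length, clip, start = _sanitize_length(
        st=st, starttime=start, endtime=end, daylong=False)
    # When both times are given, length is end - start and clip is True
    return st, length, clip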


@lru_cache(maxsize=5)
def _get_window(window, npts):
    """ Get window for resampling stabilisation. """
    from scipy.signal import get_window
    return np.fft.ifftshift(get_window(window, npts))


def multi_process(st, lowcut, highcut, filt_order, samp_rate, parallel=False,
                  num_cores=False, starttime=None, endtime=None,
                  daylong=False, seisan_chan_names=False, fill_gaps=True,
                  ignore_length=False, ignore_bad_data=False):
    """
    Apply standardised processing workflow to data for matched-filtering.

    Steps:

    #. Check length and continuity of data meets user-defined criteria
    #. Fill remaining gaps in data with zeros and record gap positions
    #. Detrend data (using a simple linear detrend to set start and end to 0)
    #. Pad data to length
    #. Resample in the frequency domain
    #. Detrend data (using a simple linear detrend to set start and end to 0)
    #. Zerophase Butterworth filter
    #. Re-check length
    #. Re-apply zero-padding to gap locations recorded in step 2 to remove
       filtering and resampling artefacts

    :param st: Stream to process
    :type st: obspy.core.Stream
    :param lowcut:
        Lowcut of butterworth filter in Hz. If set to None and highcut is
        given a highpass filter will be applied. If both lowcut and highcut
        are given, a bandpass filter will be applied. If lowcut and highcut
        are both None, no filtering will be applied.
    :type lowcut: float
    :param highcut:
        Highcut of butterworth filter in Hz. If set to None and lowcut is
        given a lowpass filter will be applied. If both lowcut and highcut
        are given, a bandpass filter will be applied. If lowcut and highcut
        are both None, no filtering will be applied.
    :type highcut: float
    :param filt_order: Filter order
    :type filt_order: int
    :param samp_rate: Desired sample rate of output data in Hz
    :type samp_rate: float
    :param parallel: Whether to process data in parallel (uses multi-threading)
    :type parallel: bool
    :param num_cores: Maximum number of cores to use for parallel processing
    :type num_cores: int
    :param starttime: Desired starttime of data
    :type starttime: obspy.core.UTCDateTime
    :param endtime: Desired endtime of data
    :type endtime: obspy.core.UTCDateTime
    :param daylong:
        Whether data should be considered to be one-day long. Setting this
        will assume that your data should start as close to the start of a
        day as possible given the sampling.
    :type daylong: bool
    :param seisan_chan_names:
        Whether to convert channel names to two-char seisan channel names
    :type seisan_chan_names: bool
    :param fill_gaps: Whether to fill gaps in the data
    :type fill_gaps: bool
    :param ignore_length: Whether to ignore data that are not long enough.
    :type ignore_length: bool
    :param ignore_bad_data: Whether to ignore data that are excessively gappy
    :type ignore_bad_data: bool

    :return: Processed stream as obspy.core.Stream

    :Note: Works in place on your data, copy before giving to this function if
        you want to reuse your input data.
    """
    if isinstance(st, Trace):
        tracein = True
        st = Stream(st)
    else:
        tracein = False
    # Add sanity check for filter
    if highcut and highcut >= 0.5 * samp_rate:
        raise IOError('Highcut must be lower than the Nyquist')
    if highcut and lowcut:
        assert lowcut < highcut, f"Lowcut: {lowcut} above highcut: {highcut}"
    # Allow datetimes for starttime and endtime
    if starttime and not isinstance(starttime, UTCDateTime):
        starttime = UTCDateTime(starttime)
    if starttime is False:
        starttime = None
    if endtime and not isinstance(endtime, UTCDateTime):
        endtime = UTCDateTime(endtime)
    if endtime is False:
        endtime = None

    # Make sensible choices about workers and chunk sizes
    if parallel:
        if not num_cores:
            # We don't want to over-specify threads, we don't have IO
            # bound tasks
            max_workers = min(len(st), os.cpu_count())
        else:
            max_workers = min(len(st), num_cores)
    else:
        max_workers = 1
    chunksize = len(st) // max_workers

    st, length, clip, starttime = _sanitize_length(
        st=st, starttime=starttime, endtime=endtime, daylong=daylong)

    for tr in st:
        if len(tr.data) == 0:
            st.remove(tr)
            Logger.warning('No data for {0} after trim'.format(tr.id))

    # Do work
    # 0. Enforce double-precision floats for this work
    for tr in st:
        if not tr.data.dtype == np.float64:
            Logger.debug(f"Converting {tr.id} to double precision")
            tr.data = tr.data.astype(np.float64)

    # 1. Fill gaps and keep track of them
    gappy = {tr.id: False for tr in st}
    gaps = dict()
    for i, tr in enumerate(st):
        if isinstance(tr.data, np.ma.MaskedArray):
            gappy[tr.id] = True
            gaps[tr.id], tr = _fill_gaps(tr)
            st[i] = tr

    # 2. Check for zeros and cope with bad data
    # ~ 4x speedup for 50 100 Hz daylong traces on 12 threads
    qual = _simple_qc(st, max_workers=max_workers, chunksize=chunksize)
    for trace_id, _qual in qual.items():
        if not _qual:
            msg = ("Data have more zeros than actual data, please check the "
                   f"raw data set-up and manually sort it: {trace_id}")
            if not ignore_bad_data:
                raise ValueError(msg)
            else:
                # Remove bad traces from the stream
                try:
                    st.remove(st.select(id=trace_id))
                except ValueError:
                    Logger.info(
                        f"{trace_id} not found in {set(tr.id for tr in st)},"
                        f" ignoring")

    # 3. Detrend
    # ~ 2x speedup for 50 100 Hz daylong traces on 12 threads
    st = _multi_detrend(st, max_workers=max_workers, chunksize=chunksize)

    # 4. Check length and pad to length
    padded = {tr.id: (0., 0.) for tr in st}
    if clip:
        st.trim(starttime, starttime + length, nearest_sample=True)
        # Indexing because we are going to overwrite traces
        for i, _ in enumerate(st):
            if float(st[i].stats.npts / st[i].stats.sampling_rate) != length:
                Logger.info(
                    'Data for {0} are not long-enough, will zero pad'.format(
                        st[i].id))
                st[i], padded[st[i].id] = _length_check(
                    st[i], starttime=starttime, length=length,
                    ignore_length=ignore_length,
                    ignore_bad_data=ignore_bad_data)
        # Remove None traces that might be returned from length checking
        st.traces = [tr for tr in st if tr is not None]

    # Check that we actually still have some data
    if not _stream_has_data(st):
        if tracein:
            return st[0]
        return st

    # 5. Resample
    # ~ 3.25x speedup for 50 100 Hz daylong traces on 12 threads
    st = _multi_resample(
        st, sampling_rate=samp_rate, max_workers=max_workers,
        chunksize=chunksize)

    # Detrend again before filtering
    st = _multi_detrend(st, max_workers=max_workers, chunksize=chunksize)

    # 6. Filter
    # ~3.25x speedup for 50 100 Hz daylong traces on 12 threads
    st = _multi_filter(
        st, highcut=highcut, lowcut=lowcut, filt_order=filt_order,
        max_workers=max_workers, chunksize=chunksize)

    # 7. Reapply zeros after processing from 4
    for tr in st:
        # Pads default to (0., 0.), pads should only ever be positive.
        if sum(padded[tr.id]) == 0:
            continue
        Logger.debug("Reapplying zero pads post processing")
        Logger.debug(str(tr))
        pre_pad = np.zeros(int(padded[tr.id][0] * tr.stats.sampling_rate))
        post_pad = np.zeros(int(padded[tr.id][1] * tr.stats.sampling_rate))
        pre_pad_len = len(pre_pad)
        post_pad_len = len(post_pad)
        Logger.debug(
            f"Taking only valid data between {pre_pad_len} and "
            f"{tr.stats.npts - post_pad_len} samples")
        # Re-apply the pads, taking only the data section that was valid
        tr.data = np.concatenate(
            [pre_pad, tr.data[pre_pad_len: len(tr.data) - post_pad_len],
             post_pad])
        Logger.debug(str(tr))

    # 8. Recheck length
    for tr in st:
        if float(tr.stats.npts * tr.stats.delta) != length and clip:
            Logger.info(f'Data for {tr.id} are not of required length, will '
                        f'zero pad')
            # Use obspy's trim function with zero padding
            tr = tr.trim(starttime, starttime + length, pad=True,
                         fill_value=0, nearest_sample=True)
            # If there is one sample too many after this remove the last one
            # by convention
            if len(tr.data) == (length * tr.stats.sampling_rate) + 1:
                tr.data = tr.data[1:len(tr.data)]
            if abs((tr.stats.sampling_rate * length) -
                   tr.stats.npts) > tr.stats.delta:
                raise ValueError('Data are not required length for ' +
                                 tr.stats.station + '.' + tr.stats.channel)

    # 9. Re-insert gaps from 1
    for i, tr in enumerate(st):
        if gappy[tr.id]:
            st[i] = _zero_pad_gaps(tr, gaps[tr.id], fill_gaps=fill_gaps)

    # 10. Clean up
    for tr in st:
        if len(tr.data) == 0:
            st.remove(tr)

    # 11. Account for seisan channel naming
    if seisan_chan_names:
        for tr in st:
            tr.stats.channel = tr.stats.channel[0] + tr.stats.channel[-1]

    if tracein:
        st.merge()
        return st[0]

    return st
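

# Editor's illustrative sketch, not part of the original module: shows one
# way multi_process might be called. The helper name, the 2-8 Hz band,
# 4th-order filter and 20 Hz output rate are arbitrary example values.
def _example_multi_process():
    """Illustrative only: bandpass and downsample obspy's example stream."""
    from obspy import read
    st = read()  # obspy's bundled example stream (100 Hz, ~30 s)
    # multi_process works in place, so copy if the input is needed later
    st_proc = multi_process(
        st=st.copy(), lowcut=2.0, highcut=8.0, filt_order=4,
        samp_rate=20.0, parallel=False)
    return st_proc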


@lru_cache(maxsize=50)
def _empty_trace(
    network, station, location, channel, starttime, sampling_rate
):
    """
    Generate an empty trace with a basic header matching the input trace.

    :param network: Network code
    :type network: str
    :param station: Station code
    :type station: str
    :param location: Location code
    :type location: str
    :param channel: Channel code
    :type channel: str
    :param starttime: Start time of trace as datetime (NOT UTCDateTime)
    :type starttime: datetime.datetime
    :param sampling_rate: Sampling rate of data
    :type sampling_rate: float

    :returns: trace
    """
    bad_trace = Trace(
        data=np.array([]), header={
            "station": station, "channel": channel, "network": network,
            "location": location, "starttime": starttime,
            "sampling_rate": sampling_rate})
    return bad_trace


def _stream_has_data(st):
    return sum(tr.stats.npts for tr in st) > 0


def _length_check(tr, starttime, length, ignore_length, ignore_bad_data):
    """
    Check that a trace meets the length requirements specified.

    Data are padded if needed to meet the length requirement.

    :param tr: Trace to check
    :type tr: obspy.core.Trace
    :param starttime: Desired starttime of data
    :type starttime: obspy.core.UTCDateTime
    :param length: Length in seconds required for data
    :type length: float
    :param ignore_length: Whether to ignore data that do not meet length
        criteria
    :type ignore_length: bool
    :param ignore_bad_data: Whether to ignore data that do not meet gappiness
        criteria
    :type ignore_bad_data: bool

    :return: obspy.core.Trace that meets criteria
    """
    trace_length = tr.stats.endtime - tr.stats.starttime
    if trace_length < 0.8 * length and not ignore_length:
        msg = f"Data for {tr.id} is {trace_length:.2f} seconds "\
              f"long, which is less than 80 percent of the desired "\
              f"length ({length} seconds), will not pad"
        if not ignore_bad_data:
            raise NotImplementedError(msg)
        else:
            Logger.warning(msg)
            return _empty_trace(
                tr.stats.network, tr.stats.station, tr.stats.location,
                tr.stats.channel, tr.stats.starttime.datetime,
                tr.stats.sampling_rate), (0., 0.)
    # trim, then calculate length of any pads required
    pre_pad_secs = tr.stats.starttime - starttime
    post_pad_secs = (starttime + length) - tr.stats.endtime
    if pre_pad_secs > 0 or post_pad_secs > 0:
        pre_pad = np.zeros(int(pre_pad_secs * tr.stats.sampling_rate))
        post_pad = np.zeros(int(post_pad_secs * tr.stats.sampling_rate))
        Logger.debug(str(tr))
        Logger.info(
            f"Padding to length with {pre_pad_secs} s before "
            f"and {post_pad_secs} s at end")
        tr.data = np.concatenate([pre_pad, tr.data, post_pad])
        # Use this rather than the expected pad because of rounding samples
        tr.stats.starttime -= len(pre_pad) * tr.stats.delta
        Logger.debug(str(tr))
    # If there is one sample too many after this remove the first one
    # by convention
    if tr.stats.npts == (length * tr.stats.sampling_rate) + 1:
        tr.data = tr.data[1:len(tr.data)]
    # Cope with time precision.
    if abs((tr.stats.sampling_rate * length) -
           tr.stats.npts) > tr.stats.delta:
        msg = (f"Data sampling-rate * length ({tr.stats.sampling_rate} *"
               f" {length} = {tr.stats.sampling_rate * length}) does not "
               f"match number of samples ({tr.stats.npts}) for {tr.id}")
        if not ignore_bad_data:
            raise ValueError(msg)
        else:
            Logger.warning(msg)
            return _empty_trace(
                tr.stats.network, tr.stats.station, tr.stats.location,
                tr.stats.channel, tr.stats.starttime.datetime,
                tr.stats.sampling_rate), (0., 0.)
    Logger.debug(
        f'I now have {tr.stats.npts} data points after enforcing length')
    return tr, (pre_pad_secs, post_pad_secs)


def _multi_filter(st, highcut, lowcut, filt_order, max_workers=None,
                  chunksize=1):
    """
    Multithreaded zero-phase butterworth filtering of multi-channel data.

    :param st: Stream to filter
    :type st: obspy.core.Stream
    :param highcut: Highcut for butterworth filter in Hz
    :type highcut: float
    :param lowcut: Lowcut for butterworth filter in Hz
    :type lowcut: float
    :param filt_order: Filter order
    :type filt_order: int
    :param max_workers: Maximum number of threads to use
    :type max_workers: int
    :param chunksize: Number of traces to process per thread
    :type chunksize: int

    :return: obspy.core.Stream of filtered data
    """
    if not highcut and not lowcut:
        Logger.warning("No filters applied")
        return st
    # Require that all channels are the same sampling frequency
    samp_rate = set(tr.stats.sampling_rate for tr in st)
    assert len(samp_rate) == 1, "Different sampling rates found"
    samp_rate = samp_rate.pop()

    # Sanity check filter bounds
    if highcut:
        assert highcut * 2 < samp_rate, "Highcut must be below Nyquist"
    if highcut and lowcut:
        assert lowcut < highcut, "Lowcut must be below highcut"

    fe = 0.5 * samp_rate
    if lowcut:
        low = lowcut / fe
    if highcut:
        high = highcut / fe

    # Design filter
    if highcut and lowcut:
        z, p, k = iirfilter(
            filt_order, [low, high], btype='band', ftype='butter',
            output='zpk')
    elif highcut:
        z, p, k = iirfilter(
            filt_order, high, btype='lowpass', ftype='butter',
            output='zpk')
    elif lowcut:
        z, p, k = iirfilter(
            filt_order, low, btype='highpass', ftype='butter',
            output='zpk')
    sos = zpk2sos(z, p, k)

    _filter = partial(_zerophase_filter, sos)

    with ThreadPoolExecutor(max_workers) as executor:
        results = executor.map(
            _filter, (tr.data for tr in st), chunksize=chunksize)
        for r, tr in zip(results, st):
            tr.data = r
    return st


def _zerophase_filter(sos, data):
    """
    Simple zerophase implementation of sosfilt.

    :param sos: Second-order sections of the filter
    :param data: Data to filter

    :return: filtered data
    """
    if len(data) == 0:
        Logger.debug("No data, no filtering")
        return data
    firstpass = sosfilt(sos, data)
    return sosfilt(sos, firstpass[::-1])[::-1]
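

# Editor's illustrative sketch, not part of the original module: designs a
# 4th-order Butterworth bandpass (2-10 Hz at 100 Hz sampling) with the same
# iirfilter/zpk2sos calls used above and applies the forward-backward
# (zero-phase) filter to random noise. All values are arbitrary choices.
def _example_zerophase_filter():
    """Illustrative only: zero-phase bandpass filter random noise."""
    samp_rate = 100.0
    fe = 0.5 * samp_rate
    z, p, k = iirfilter(
        4, [2.0 / fe, 10.0 / fe], btype='band', ftype='butter', output='zpk')
    sos = zpk2sos(z, p, k)
    data = np.random.randn(10000)
    # Forward then reversed pass gives zero phase-shift overall
    return _zerophase_filter(sos, data)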


def _multi_detrend(st, max_workers=None, chunksize=1):
    """
    Multithreaded detrending using simple linear detrend between first and
    last values. Follows obspy "simple" detrend.

    :param st: Stream to detrend
    :type st: obspy.core.Stream
    :param max_workers: Maximum number of threads to use
    :type max_workers: int
    :param chunksize: Number of traces to process per thread
    :type chunksize: int

    :return: obspy.core.Stream of detrended data
    """
    for tr in st:
        tr.data = np.require(tr.data, np.float64)
    with ThreadPoolExecutor(max_workers) as executor:
        results = executor.map(_detrend, (tr.data for tr in st),
                               chunksize=chunksize)
        # Ensure tasks complete
        _ = (r for r in results)
    return st


def _detrend(data):
    """
    Detrend signal simply by subtracting a line through the first and last
    point of the trace.

    :param data: Data to detrend
    :type data: np.ndarray

    :return: Nothing - works in place
    """
    # Work in double-precision.
    data = np.require(data, dtype=np.float64)
    ndat = data.shape[0]
    x1, x2 = data[0], data[-1]
    data -= x1 + np.arange(ndat, dtype=np.float64) * (
        np.float64(x2 - x1) / np.float64(ndat - 1))
    return


def _multi_resample(st, sampling_rate, max_workers=None, chunksize=1):
    """
    Threaded resampling of a stream of data to a consistent sampling-rate.

    :param st: Stream to resample
    :type st: obspy.core.Stream
    :param sampling_rate: Sampling rate to resample to
    :type sampling_rate: float
    :param max_workers: Maximum number of threads to use
    :type max_workers: int
    :param chunksize: Number of traces to process per thread
    :type chunksize: int

    :return: obspy.core.Stream of resampled data
    """
    # Get the windows, and downsampling factors ahead of time
    to_resample = (
        (tr.data, tr.stats.delta,
         tr.stats.sampling_rate / float(sampling_rate), sampling_rate,
         _get_window("hann", tr.stats.npts), tr.id) for tr in st)
    with ThreadPoolExecutor(max_workers) as executor:
        # Unpack tuple using lambda
        results = executor.map(lambda args: _resample(*args), to_resample,
                               chunksize=chunksize)
        for r, tr in zip(results, st):
            tr.data = r
            tr.stats.sampling_rate = sampling_rate
    return st


def _resample(data, delta, factor, sampling_rate, large_w, _id):
    """
    Resample data in the frequency domain - adapted from obspy resample method

    :param data: Data to resample
    :type data: np.ndarray
    :param delta: Sample interval in seconds
    :type delta: float
    :param factor: Factor to resample by
    :type factor: float
    :param sampling_rate: Desired sampling-rate
    :type sampling_rate: float
    :param large_w: Window to apply to spectra to stabilise resampling
    :type large_w: np.ndarray
    :param _id: Trace id string, used only for logging
    :type _id: str

    :return: np.ndarray of resampled data.
    """
    if factor == 1:
        # No resampling needed, don't waste time.
        return data
    # Need to work with numpy objects to release the GIL
    npts = data.shape[0]
    Logger.debug(f"Running resample for {_id} with {npts} data points")
    Logger.debug(f"{_id}: delta={delta}, factor={factor}, "
                 f"sampling_rate out={sampling_rate}")
    Logger.debug(f"Sanity check data for {_id}, start and "
                 f"end: {data[0]} -- {data[-1]}")
    Logger.debug(f"dtype for {_id}: {data.dtype}")
    if data.dtype == np.dtype('float64'):
        _floater = np.float64  # Retain double-precision
    else:
        # Use single-precision where possible to reduce memory
        _floater = np.float32
    data = data.astype(_floater)
    df = _floater(1.0) / (npts * delta)
    num = np.int32(npts / factor)
    d_large_f = _floater(1.0) / num * sampling_rate
    # Forward fft
    x = np.fft.rfft(data)
    # Window
    x *= large_w[:npts // 2 + 1]
    # interpolate
    f = df * np.arange(0, npts // 2 + 1, dtype=np.int32)
    n_large_f = num // 2 + 1
    large_f = d_large_f * np.arange(0, n_large_f, dtype=np.int32)

    # Have to split into real and imaginary parts for interpolation.
    y = np.interp(large_f, f, np.real(x)) + (1j * np.interp(
        large_f, f, np.imag(x)))

    # Try to reduce memory before doing the ifft
    del large_f, f, x
    return np.fft.irfft(y, n=num)[0:num] * (_floater(num) / _floater(npts))
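

# Editor's illustrative sketch, not part of the original module: downsamples
# obspy's bundled example stream (100 Hz) to 50 Hz with the threaded
# frequency-domain resampler above. The target rate is an arbitrary choice.
def _example_multi_resample():
    """Illustrative only: resample a stream to 50 Hz."""
    from obspy import read
    st = read()  # obspy's bundled example stream at 100 Hz
    st = _multi_resample(st, sampling_rate=50.0)
    # Trace data and sampling_rate headers are updated in place
    return st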


def _zero_pad_gaps(tr, gaps, fill_gaps=True):
    """
    Replace padded parts of trace with zeros.

    Will cut around gaps, detrend, then pad the gaps with zeros.

    :type tr: :class:`obspy.core.stream.Trace`
    :param tr: A trace that has had the gaps padded
    :param gaps: List of dict of start-time and end-time as UTCDateTime
        objects
    :type gaps: list
    :param fill_gaps: Whether to fill gaps with zeros, or leave them as gaps
    :type fill_gaps: bool

    :return: :class:`obspy.core.stream.Trace`
    """
    start_in, end_in = (tr.stats.starttime, tr.stats.endtime)
    tr = Stream([tr])  # convert to stream to use cutout method
    for gap in gaps:
        Logger.debug(
            f"Filling gap between {gap['starttime']} and {gap['endtime']}")
        tr.cutout(gap['starttime'], gap['endtime']).merge()
    tr = tr.merge()[0]
    if fill_gaps:
        tr = tr.split()
        tr = tr.detrend()
        tr = tr.merge(fill_value=0)[0]
        # Need to check length - if a gap happened overlapping the end or
        # start of the trace this will be lost.
        if tr.stats.starttime != start_in:
            # pad with zeros
            tr.data = np.concatenate(
                [np.zeros(int(tr.stats.starttime - start_in)), tr.data])
            tr.stats.starttime = start_in
        if tr.stats.endtime != end_in:
            tr.data = np.concatenate(
                [tr.data, np.zeros(int(end_in - tr.stats.endtime))])
    return tr


def _fill_gaps(tr):
    """
    Interpolate through gaps and work out where gaps are.

    :param tr: Gappy trace (e.g. tr.data is np.ma.MaskedArray)
    :type tr: `obspy.core.stream.Trace`

    :return: gaps, trace, where gaps is a list of dict
    """
    tr = tr.split()
    gaps = tr.get_gaps()
    tr = tr.detrend().merge(fill_value=0)[0]
    gaps = [{'starttime': gap[4], 'endtime': gap[5]} for gap in gaps]
    if len(gaps):
        Logger.debug(f"Gaps in {tr.id}: \n\t{gaps}")
    return gaps, tr


def _group_process(filt_order, highcut, lowcut, samp_rate, process_length,
                   parallel, cores, stream, daylong, ignore_length,
                   ignore_bad_data, overlap):
    """
    Process and chunk data.

    :type parallel: bool
    :param parallel: Whether to use parallel processing or not
    :type cores: int
    :param cores: Number of cores to use, can be False to use all available.
    :type stream: :class:`obspy.core.stream.Stream`
    :param stream: Stream to process, will be left intact.
    :type daylong: bool
    :param daylong: Whether to enforce day-length files or not.
    :type ignore_length: bool
    :param ignore_length:
        If using daylong=True, then processing will try to check that the
        data are there for at least 80% of the day, if you don't want this
        check (which will raise an error if too much data are missing) then
        set ignore_length=True. This is not recommended!
    :type ignore_bad_data: bool
    :param ignore_bad_data:
        If False (default), errors will be raised if data are excessively
        gappy or are mostly zeros. If True then no error will be raised, but
        an empty trace will be returned.
    :type overlap: float
    :param overlap: Number of seconds to overlap chunks by.

    :return: list of processed streams.
    """
    processed_streams = []
    kwargs = {
        'filt_order': filt_order, 'highcut': highcut, 'lowcut': lowcut,
        'samp_rate': samp_rate, 'parallel': parallel, 'num_cores': cores,
        'ignore_length': ignore_length, 'ignore_bad_data': ignore_bad_data}
    # Processing always needs to be run to account for gaps - pre-process will
    # check whether filtering and resampling needs to be done.
    starttimes = sorted([tr.stats.starttime for tr in stream])
    endtimes = sorted([tr.stats.endtime for tr in stream])

    if daylong:
        if process_length != 86400:
            Logger.warning(
                f'Processing day-long data, but template was cut from '
                f'{process_length} s long data, will reduce correlations')
        process_length = 86400
        # Check that data all start on the same day, otherwise strange
        # things will happen...
        startdates = [starttime.date for starttime in starttimes]
        if not len(set(startdates)) == 1:
            Logger.warning('Data start on different days, setting to last day')
            starttime = UTCDateTime(startdates[-1])
        else:
            starttime = UTCDateTime(startdates[0])  # Can take any
    else:
        # We want to use shortproc to allow overlaps
        starttime = starttimes[0]
    endtime = endtimes[-1]
    data_len_samps = round((endtime - starttime) * samp_rate) + 1
    assert overlap < process_length, "Overlap must be less than process length"
    chunk_len_samps = (process_length - overlap) * samp_rate
    n_chunks = int(data_len_samps // chunk_len_samps)
    Logger.info(f"Splitting these data in {n_chunks} chunks")
    if n_chunks == 0:
        Logger.error('Data must be process_length or longer, not computing')
        return []

    for i in range(n_chunks):
        kwargs.update(
            {'starttime': starttime + (i * (process_length - overlap))})
        if not daylong:
            _endtime = kwargs['starttime'] + process_length
            kwargs.update({'endtime': _endtime})
        else:
            _endtime = kwargs['starttime'] + 86400

        # This is where data should be copied and only here!
        if n_chunks > 1:
            chunk_stream = _quick_copy_stream(
                stream.slice(starttime=kwargs['starttime'], endtime=_endtime))
            # Reduce memory by removing data that we don't need anymore
            stream.trim(starttime=_endtime - overlap)
        else:
            # If we only have one chunk, lets just use those data!
            chunk_stream = stream.trim(
                starttime=kwargs['starttime'], endtime=_endtime)
        Logger.info(f"Processing chunk {i} between {kwargs['starttime']} "
                    f"and {_endtime}")
        if len(chunk_stream) == 0:
            Logger.warning(
                f"No data between {kwargs['starttime']} and {_endtime}")
            continue
        # Enforce chunk npts
        for tr in chunk_stream:
            Logger.info(
                f"Enforcing {int(process_length * tr.stats.sampling_rate)} "
                f"samples for {tr.id} (had {tr.stats.npts} points)")
            tr.data = tr.data[0:int(
                process_length * tr.stats.sampling_rate)]
        _chunk_stream_lengths = {
            tr.id: tr.stats.endtime - tr.stats.starttime
            for tr in chunk_stream}
        for tr_id, chunk_length in _chunk_stream_lengths.items():
            # Remove traces that are too short.
            if not ignore_length and chunk_length <= .8 * process_length:
                tr = chunk_stream.select(id=tr_id)[0]
                chunk_stream.remove(tr)
                Logger.warning(
                    "Data chunk on {0} starting {1} and ending {2} is "
                    "below 80% of the requested length, will not use"
                    " this.".format(
                        tr.id, tr.stats.starttime, tr.stats.endtime))
        if len(chunk_stream) == 0:
            continue
        Logger.debug(
            f"Processing chunk:\n{chunk_stream.__str__(extended=True)}")
        Logger.info(f"Processing using {kwargs}")
        _processed_stream = multi_process(st=chunk_stream, **kwargs)
        # If data have more zeros, then pre-processing will return a
        # trace of 0 length
        _processed_stream.traces = [
            tr for tr in _processed_stream if tr.stats.npts != 0]
        if len(_processed_stream) == 0:
            Logger.warning(
                f"Data quality insufficient between {kwargs['starttime']}"
                f" and {_endtime}")
            continue
        # Pre-processing does additional checks for zeros - we need to check
        # again whether we actually have something useful from this.
        processed_chunk_stream_lengths = [
            tr.stats.endtime - tr.stats.starttime
            for tr in _processed_stream]
        if min(processed_chunk_stream_lengths) >= .8 * process_length:
            processed_streams.append(_processed_stream)
        else:
            Logger.warning(
                f"Data quality insufficient between {kwargs['starttime']}"
                f" and {_endtime}")
            continue

    if _endtime < stream[0].stats.endtime:
        Logger.warning(
            "Last bit of data between {0} and {1} will go unused "
            "because it is shorter than a chunk of {2} s".format(
                _endtime, stream[0].stats.endtime, process_length))
    return processed_streams


def _quick_copy_trace(trace, deepcopy_data=True):
    """
    Function to quickly copy a trace.

    Sets values in the traces' and trace header's dict directly,
    circumventing obspy's init functions.

    Speedup: from 37 us to 12 us per trace - 3x faster

    :type trace: :class:`obspy.core.trace.Trace`
    :param trace: Trace to quickly copy
    :type deepcopy_data: bool
    :param deepcopy_data:
        Whether to deepcopy trace data (with `deepcopy_data=False` expect up
        to 20 % speedup, but use only when you know that data trace contents
        will not change or affect results). Warning: do not use this option
        to copy traces with processing history or response information.

    :rtype: :class:`obspy.core.trace.Trace`
    :return: trace
    """
    new_trace = Trace()
    for key, value in trace.__dict__.items():
        if key == 'stats':
            new_stats = new_trace.stats
            for key_2, value_2 in value.__dict__.items():
                if isinstance(value_2, UTCDateTime):
                    new_stats.__dict__[key_2] = UTCDateTime(
                        ns=value_2.__dict__['_UTCDateTime__ns'])
                else:
                    new_stats.__dict__[key_2] = value_2
        elif deepcopy_data:
            # data needs to be deepcopied (and anything else, to be safe)
            new_trace.__dict__[key] = copy.deepcopy(value)
        else:
            # No deepcopy, e.g. for NaN-traces with no effect on results
            new_trace.__dict__[key] = value
    return new_trace


def _quick_copy_stream(stream, deepcopy_data=True):
    """
    Function to quickly copy a stream.

    Speedup for simple trace:
        from 112 us to 44 (35) us per 3-trace stream - 2.8x (3.2x) faster

    Warning: use `deepcopy_data=False` (saves extra ~20 % time) only when
    changing the data in the stream later does not change results (e.g., for
    NaN-trace or when data array will not be changed).

    This is what takes longest (1 empty trace, total time to copy 27 us):
        copy header: 18 us (vs create new empty header: 683 ns)
    Two points that can speed up copying / creation:
        1. circumvent trace.__init__ and trace.__set_attr__ by setting values
           directly in trace's __dict__
        2. when setting trace header, circumvent that Stats(header) is called
           when header is already a Stats instance

    :type stream: :class:`obspy.core.stream.Stream`
    :param stream: Stream to quickly copy
    :type deepcopy_data: bool
    :param deepcopy_data:
        Whether to deepcopy data (with `deepcopy_data=False` expect up to
        20 % speedup, but use only when you know that data trace contents
        will not change or affect results).

    :rtype: :class:`obspy.core.stream.Stream`
    :return: stream
    """
    new_traces = list()
    for trace in stream:
        new_traces.append(
            _quick_copy_trace(trace, deepcopy_data=deepcopy_data))
    return Stream(new_traces)


def _stream_quick_select(stream, seed_id):
    """
    4x quicker selection of traces in stream by full Seed-ID.

    Does not support wildcards or selection by
    network/station/location/channel alone.
    """
    net, sta, loc, chan = seed_id.split('.')
    stream = Stream(
        [tr for tr in stream
         if (tr.stats.network == net and
             tr.stats.station == sta and
             tr.stats.location == loc and
             tr.stats.channel == chan)])
    return stream


def _prep_data_for_correlation(stream, templates, template_names=None,
                               force_stream_epoch=True):
    """
    Check that all channels are the same length and that all channels have
    data for both template and stream.

    Works in place on data - will cut to shortest length

    :param stream: Stream to compare data to
    :param templates:
        List of streams that will be forced to have the same channels as
        stream
    :param template_names: List of strings same length as templates
    :type force_stream_epoch: bool
    :param force_stream_epoch:
        Whether to force all channels in stream to cover the same time period

    :return: stream, templates, template_names (if template_names given)
    """
    n_templates = len(templates)
    template_samp_rates = {
        tr.stats.sampling_rate for template in templates for tr in template}
    stream_samp_rates = {tr.stats.sampling_rate for tr in stream}
    samp_rates = template_samp_rates.union(stream_samp_rates)
    assert len(samp_rates) == 1, "Sampling rates differ"
    samp_rate = samp_rates.pop()

    out_stream = Stream()

    named = True
    if template_names is None:
        named = False
        template_names = range(n_templates)

    # Work out shapes.
    stream_start = min([tr.stats.starttime for tr in stream])
    stream_end = max([tr.stats.endtime for tr in stream])
    if force_stream_epoch:
        stream_length = int(samp_rate * (stream_end - stream_start)) + 1
    else:
        stream_length = max([tr.stats.npts for tr in stream])

    template_length = {
        tr.stats.npts for template in templates for tr in template}
    assert len(template_length) == 1, (
        "Template traces not all the same length")
    template_length = template_length.pop()

    stream_ids = {tr.id for tr in stream}

    # Need to ensure that a channel can be in the template multiple times.
    all_template_ids = [
        Counter([tr.id for tr in template]) for template in templates]
    template_ids = {
        stream_id: max(tid.get(stream_id, 0) for tid in all_template_ids)
        for stream_id in stream_ids}
    template_ids = {_id: value for _id, value in template_ids.items() if value}

    seed_ids = sorted(
        [key.split('.') + [i] for key, value in template_ids.items()
         for i in range(value)])
    seed_ids = [('.'.join(seed_id[0:-1]), seed_id[-1]) for seed_id in seed_ids]
    Logger.info(f"Prepping for {len(seed_ids)} channels that share seed-ids "
                f"between templates and stream")
    Logger.debug(f"Shared seed-ids: {seed_ids}")

    for channel_number, seed_id in enumerate(template_ids.keys()):
        stream_data = np.zeros(stream_length, dtype=np.float32)
        stream_channel = stream.select(id=seed_id)
        if len(stream_channel) > 1:
            msg = f"Multiple channels in continuous data for {seed_id}"
            Logger.error(msg)
            raise NotImplementedError(msg)
        stream_channel = stream_channel[0]
        if stream_channel.stats.npts == stream_length:
            stream_data = stream_channel.data
        else:
            Logger.info('Data for {0} is not as long as needed, '
                        'padding'.format(stream_channel.id))
            if force_stream_epoch:
                start_pad = int(samp_rate * (
                    stream_channel.stats.starttime - stream_start))
                end_pad = stream_length - (
                    start_pad + stream_channel.stats.npts)
                # In some cases there will be one sample missing when sampling
                # time-stamps are not set consistently between channels, this
                # results in start_pad and end_pad being len==0
                if start_pad == 0 and end_pad == 0:
                    Logger.debug("Start and end pad are both zero, padding "
                                 "at one end")
                    if (stream_channel.stats.starttime - stream_start) > (
                            stream_end - stream_channel.stats.endtime):
                        start_pad = int(
                            stream_length - stream_channel.stats.npts)
                    else:
                        end_pad = int(
                            stream_length - stream_channel.stats.npts)
                stream_channel.stats.starttime -= (start_pad / samp_rate)
            else:
                start_pad = 0
                end_pad = stream_length - stream_channel.stats.npts
            if end_pad == 0:
                stream_data[start_pad:] = stream_channel.data
            else:
                stream_data[start_pad:-end_pad] = stream_channel.data
        header = stream_channel.stats.copy()
        header.npts = stream_length
        out_stream += Trace(data=stream_data, header=header)

    # Initialize nan template for speed.
    nan_channel = np.full(template_length, np.nan, dtype=np.float32)
    nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS'])
    nan_template = Stream()
    for _seed_id in seed_ids:
        net, sta, loc, chan = _seed_id[0].split('.')
        nan_template += Trace(header=Stats({
            'network': net, 'station': sta,
            'location': loc, 'channel': chan,
            'starttime': UTCDateTime(ns=0),
            'npts': template_length, 'sampling_rate': samp_rate}))

    # Remove templates with no matching channels
    filt = np.ones(len(template_names)).astype(bool)
    for i, template in enumerate(templates):
        trace_ids = {tr.id for tr in template}
        if len(trace_ids.intersection(stream_ids)) == 0:
            filt[i] = 0
    _out = dict(zip(
        [_tn for _tn, _filt in zip(template_names, filt) if _filt],
        [_t for _t, _filt in zip(templates, filt) if _filt]))
    flt_templates = list(_out.values())

    if len(_out) != len(templates):
        Logger.debug("Some templates not used due to no matching channels")

    # Ensure that the templates' earliest traces are kept, even if there is
    # no continuous data for them. If this happens, we need to add a
    # NaN-stream to the continuous data to avoid inconsistent detection times.
    n_template_traces = np.array([len(temp) for temp in flt_templates])
    n_stream_traces = sum([n + 1 for s, n in seed_ids])
    # These checks are not necessary if all templates will get NaN-traces,
    # because the NaN-traces will save the right starttime for the template.
    nan_stream_ids = list()
    if any(n_template_traces > n_stream_traces):
        earliest_templ_trace_ids = set(
            [template.sort(['starttime'])[0].id
             for template in flt_templates])
        for earliest_templ_trace_id in earliest_templ_trace_ids:
            if earliest_templ_trace_id not in template_ids:
                nan_stream_ids.append(earliest_templ_trace_id)
                net, sta, loc, chan = earliest_templ_trace_id.split('.')
                nan_template += Trace(header=Stats({
                    'network': net, 'station': sta,
                    'location': loc, 'channel': chan,
                    'starttime': UTCDateTime(ns=0),
                    'sampling_rate': samp_rate}))
                stream_nan_data = np.full(
                    stream_length, np.nan, dtype=np.float32)
                out_stream += Trace(
                    data=np.ma.masked_array(stream_nan_data, stream_nan_data),
                    header=Stats({
                        'network': net, 'station': sta,
                        'location': loc, 'channel': chan,
                        'starttime': stream_start,
                        'npts': stream_length,
                        'sampling_rate': samp_rate}))
                seed_ids.append((earliest_templ_trace_id, 0))

    incomplete_templates = {
        template_name for template_name, template in _out.items()
        if sorted([tr.id for tr in template]) !=
        [tr.id for tr in nan_template]}

    # Fill out the templates with nan channels
    for template_name in incomplete_templates:
        template = _out[template_name]
        template_starttime = min(tr.stats.starttime for tr in template)
        out_template = _quick_copy_stream(nan_template, deepcopy_data=False)

        # Select traces very quickly: assume that trace order does not change,
        # make dict of trace-ids and list of indices and use indices to select
        stream_trace_id_dict = defaultdict(list)
        for n, tr in enumerate(template.traces):
            stream_trace_id_dict[tr.id].append(n)

        for channel_number, _seed_id in enumerate(seed_ids):
            seed_id, channel_index = _seed_id
            # Select all traces with same seed_id, based on indices for
            # corresponding traces stored in stream_trace_id_dict.
            # Much quicker than:
            # template_channel = template.select(id=seed_id)
            template_channel = Stream([
                template.traces[idx]
                for idx in stream_trace_id_dict[seed_id]])
            if len(template_channel) <= channel_index:
                # out_template[channel_number].data = nan_channel  # quicker:
                out_template.traces[channel_number].__dict__[
                    'data'] = np.copy(nan_channel)
                out_template.traces[channel_number].stats.__dict__[
                    'npts'] = template_length
                out_template.traces[channel_number].stats.__dict__[
                    'starttime'] = template_starttime
                out_template.traces[channel_number].stats.__dict__[
                    'endtime'] = UTCDateTime(ns=int(round(
                        template_starttime.ns
                        + (template_length / samp_rate) * 1e9)))
            else:
                out_template.traces[channel_number] = template_channel.traces[
                    channel_index]
        # If a template-trace matches a NaN-trace in the stream, then set the
        # template-trace to NaN so that this trace does not appear in the
        # channel-list of detections.
        if len(nan_stream_ids) > 0:
            for tr in out_template:
                if tr.id in nan_stream_ids:
                    tr.data = nan_channel
        _out.update({template_name: out_template})

    out_templates = list(_out.values())
    out_template_names = list(_out.keys())

    if named:
        return out_stream, out_templates, out_template_names
    return out_stream, out_templates
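

# Editor's illustrative sketch, not part of the original module: compares
# _quick_copy_stream with copy.deepcopy on obspy's bundled example stream.
# The helper name is arbitrary; the speedups quoted in the docstrings above
# were measured by the EQcorrscan developers and will vary by machine.
def _example_quick_copy():
    """Illustrative only: quick copy of a small stream."""
    from obspy import read
    st = read()  # obspy's bundled example stream
    st_quick = _quick_copy_stream(st)
    st_deep = copy.deepcopy(st)
    # Both copies should hold the same data
    assert all(np.array_equal(a.data, b.data)
               for a, b in zip(st_quick, st_deep))
    return st_quick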


def shortproc(st, lowcut, highcut, filt_order, samp_rate, parallel=False,
              num_cores=False, starttime=None, endtime=None,
              seisan_chan_names=False, fill_gaps=True, ignore_length=False,
              ignore_bad_data=False, fft_threads=1):
    """ Deprecated """
    Logger.warning("shortproc is deprecated after 0.4.4 and will "
                   "be removed in a future version. Use multi_process"
                   " instead")
    st = multi_process(
        st=st, lowcut=lowcut, highcut=highcut, filt_order=filt_order,
        samp_rate=samp_rate, parallel=parallel, num_cores=num_cores,
        starttime=starttime, endtime=endtime, daylong=False,
        seisan_chan_names=seisan_chan_names, fill_gaps=fill_gaps,
        ignore_length=ignore_length, ignore_bad_data=ignore_bad_data)
    return st


def dayproc(st, lowcut, highcut, filt_order, samp_rate, starttime,
            parallel=True, num_cores=False, ignore_length=False,
            seisan_chan_names=False, fill_gaps=True, ignore_bad_data=False,
            fft_threads=1):
    """ Deprecated """
    Logger.warning("dayproc is deprecated after 0.4.4 and will be "
                   "removed in a future version. Use multi_process instead")
    st = multi_process(
        st=st, lowcut=lowcut, highcut=highcut, filt_order=filt_order,
        samp_rate=samp_rate, parallel=parallel, num_cores=num_cores,
        starttime=starttime, endtime=None, daylong=True,
        seisan_chan_names=seisan_chan_names, fill_gaps=fill_gaps,
        ignore_length=ignore_length, ignore_bad_data=ignore_bad_data)
    return st


def process(tr, lowcut, highcut, filt_order, samp_rate, starttime=False,
            clip=False, length=86400, seisan_chan_names=False,
            ignore_length=False, fill_gaps=True, ignore_bad_data=False,
            fft_threads=1):
    """ Deprecated """
    Logger.warning("process is deprecated after 0.4.4 and will be removed "
                   "in a future version. Use multi_process instead")
    if length == 86400:
        daylong = True
    else:
        daylong = False
    endtime = None
    if clip:
        if not starttime:
            starttime = tr.stats.starttime
        elif not isinstance(starttime, UTCDateTime):
            starttime = UTCDateTime(starttime)
        endtime = starttime + length
    st = multi_process(
        st=tr, lowcut=lowcut, highcut=highcut, filt_order=filt_order,
        samp_rate=samp_rate, parallel=False, num_cores=1,
        starttime=starttime, endtime=endtime, daylong=daylong,
        seisan_chan_names=seisan_chan_names, fill_gaps=fill_gaps,
        ignore_length=ignore_length, ignore_bad_data=ignore_bad_data)
    return st


if __name__ == "__main__":
    import doctest
    doctest.testmod()