Source code for eqcorrscan.utils.pre_processing

Utilities module whose functions are designed to do the basic processing of
the data using obspy modules (which also rely on scipy and numpy).

    EQcorrscan developers.

    GNU Lesser General Public License, Version 3
import os

import numpy as np
import logging
import copy

from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache, partial
from scipy.signal import iirfilter, sosfilt, zpk2sos

from obspy import Stream, Trace, UTCDateTime
from obspy.core.trace import Stats

Logger = logging.getLogger(__name__)

def _check_daylong(data, threshold=0.5):
    Check data continuity.

    Check to see that the day is more than threshold of zeros, if it is
    then the resampling will hate it.

    :type data: np.array
    :param data: Data from Trace to check if the data are okay.
    :type threshold: float
    :param threshold: Fraction of data to accept as zeros.

    :return quality (simply good or bad)
    :rtype: bool

    .. rubric:: Example

    >>> from obspy import read
    >>> from eqcorrscan.utils.pre_processing import _check_daylong
    >>> # Get the path to the test data
    >>> import eqcorrscan
    >>> import os
    >>> TEST_PATH = os.path.dirname(eqcorrscan.__file__) + '/tests/test_data'
    >>> st = read(TEST_PATH + '/WAV/TEST_/' +
    ...           '2013-09-01-0410-35.DFDPC_024_00')
    >>> _check_daylong(st[0].data)
    >>> zeroed_data = st[0].copy().data
    >>> zeroed_data[0:9100] = np.zeros(9100)
    >>> _check_daylong(zeroed_data)
    return np.nonzero(data)[0].shape[0] >= threshold * data.shape[0]

def _simple_qc(st, max_workers=None, chunksize=1):
    Multithreaded simple QC of data.

    :param st: Stream of data to check
    :type st: obspy.core.Stream
    :param max_workers: Maximum number of threads to use
    :type max_workers: int
    :param chunksize: Number of traces to process per thread
    :type chunksize: int

    :return: dict of { quality} where quality is bool
    qual = dict()
    with ThreadPoolExecutor(max_workers) as executor:
        for tr, _qual in zip(st,
                _check_daylong, ( for tr in st), chunksize=chunksize)):
            qual[] = _qual
    return qual

def _sanitize_length(st, starttime=None, endtime=None, daylong=False):
    Check length and work out start, end, length and trimming criteria

    :param st: Stream to check
    :type st: obspy.core.Stream
    :param starttime: Desired starttime - if None, will be evaluated from data
    :type starttime: obspy.core.UTCDateTime
    :param endtime: DEsired endtime - can be None
    :type endtime: obspy.core.UTCDateTime
    :param daylong: Whether data should be one-day long.
    :type daylong: bool

    :return: obspy.core.Stream, length[float], clip[bool],
    length, clip = None, False

    if daylong:
        length, clip = 86400, True
        # Set the start-time to a day start - cope with
        if starttime is None:
            startdates = []
            for tr in st:
                if abs(tr.stats.starttime - (UTCDateTime(
               + 86400)) <
                    # If the trace starts within 1 sample of the next day,
                    # use the next day as the startdate
                    startdates.append((tr.stats.starttime + 86400).date)
                        f'{} starts within 1 sample of the next day, '
                        f'using this time {(tr.stats.starttime + 86400).date}')
            # Check that all traces start on the same date...
            if not len(set(startdates)) == 1:
                raise NotImplementedError('Traces start on different days')
            starttime = UTCDateTime(startdates[0])
        if starttime is not None and endtime is not None:
            for tr in st:
                    f"Trimming {} between {starttime} and {endtime}")
                tr.trim(starttime, endtime)
                if len( == ((endtime - starttime) *
                                    tr.stats.sampling_rate) + 1:
          "{} is overlength dropping first sample")
                    # TODO: this should adjust the start-time
                    # tr.stats.starttime +=
            length = endtime - starttime
            clip = True
        elif starttime:
            for tr in st:
        elif endtime:
            for tr in st:
    return st, length, clip, starttime

def _get_window(window, npts):
    """ Get window for resampling stabilisation. """
    from scipy.signal import get_window
    return np.fft.ifftshift(get_window(window, npts))

[docs] def multi_process(st, lowcut, highcut, filt_order, samp_rate, parallel=False, num_cores=False, starttime=None, endtime=None, daylong=False, seisan_chan_names=False, fill_gaps=True, ignore_length=False, ignore_bad_data=False): """ Apply standardised processing workflow to data for matched-filtering Steps: #. Check length and continuity of data meets user-defined criteria #. Fill remaining gaps in data with zeros and record gap positions #. Detrend data (using a simple linear detrend to set start and end to 0) #. Pad data to length #. Resample in the frequency domain #. Detrend data (using a simple linear detrend to set start and end to 0) #. Zerophase Butterworth filter #. Re-check length #. Re-apply zero-padding to gap locations recording in step 2 to remove filtering and resampling artefacts :param st: Stream to process :type st: obspy.core.Stream :param lowcut: Lowcut of butterworth filter in Hz. If set to None and highcut is given a highpass filter will be applied. If both lowcut and highcut are given, a bandpass filter will be applied. If lowcut and highcut are both None, no filtering will be applied. :type lowcut: float :param highcut: Highcut of butterworth filter in Hz. If set to None and lowcut is given a lowpass filter will be applied. If both lowcut and highcut are given, a bandpass filter will be applied. If lowcut and highcut are both None, no filtering will be applied. :type highcut: float :param filt_order: Filter order :type filt_order: int :param samp_rate: Desired sample rate of output data in Hz :type samp_rate: float :param parallel: Whether to process data in parallel (uses multi-threading) :type parallel: bool :param num_cores: Maximum number of cores to use for parallel processing :type num_cores: int :param starttime: Desired starttime of data :type starttime: obspy.core.UTCDateTime :param endtime: Desired endtime of data :type endtime: obspy.core.UTCDateTime :param daylong: Whether data should be considered to be one-day long. Setting this will assume that your data should start as close to the start of a day as possible given the sampling. :type daylong: bool :param seisan_chan_names: Whether to convert channel names to two-char seisan channel names :type seisan_chan_names: bool :param fill_gaps: Whether to fill-gaps in the data :type fill_gaps: bool :param ignore_length: Whether to ignore data that are not long enough. :type ignore_length: bool :param ignore_bad_data: Whether to ignore data that are excessively gappy :type ignore_bad_data: bool :return: Processed stream as obspy.core.Stream :Note: Works in place on your data, copy before giving to this function if you want to reuse your input data. """ if isinstance(st, Trace): tracein = True st = Stream(st) else: tracein = False # Add sanity check for filter if highcut and highcut >= 0.5 * samp_rate: raise IOError('Highcut must be lower than the Nyquist') if highcut and lowcut: assert lowcut < highcut, f"Lowcut: {lowcut} above highcut: {highcut}" # Allow datetimes for starttime and endtime if starttime and not isinstance(starttime, UTCDateTime): starttime = UTCDateTime(starttime) if starttime is False: starttime = None if endtime and not isinstance(endtime, UTCDateTime): endtime = UTCDateTime(endtime) if endtime is False: endtime = None # Make sensible choices about workers and chunk sizes if parallel: if not num_cores: # We don't want to over-specify threads, we don't have IO # bound tasks max_workers = min(len(st), os.cpu_count()) else: max_workers = min(len(st), num_cores) else: max_workers = 1 chunksize = len(st) // max_workers st, length, clip, starttime = _sanitize_length( st=st, starttime=starttime, endtime=endtime, daylong=daylong) for tr in st: if len( == 0: st.remove(tr) Logger.warning('No data for {0} after trim'.format( # Do work # 0. Enforce double-preccision floats for this work for tr in st: if not == np.float64: Logger.debug(f"Converting {} to double precision") = # 1. Fill gaps and keep track of them gappy = { False for tr in st} gaps = dict() for i, tr in enumerate(st): if isinstance(, gappy[] = True gaps[], tr = _fill_gaps(tr) st[i] = tr # 2. Check for zeros and cope with bad data # ~ 4x speedup for 50 100 Hz daylong traces on 12 threads qual = _simple_qc(st, max_workers=max_workers, chunksize=chunksize) for trace_id, _qual in qual.items(): if not _qual: msg = ("Data have more zeros than actual data, please check the " f"raw data set-up and manually sort it: {}") if not ignore_bad_data: raise ValueError(msg) else: # Remove bad traces from the stream try: st.remove( except ValueError: f"{trace_id} not found in {set( for tr in st)}," f" ignoring") # 3. Detrend # ~ 2x speedup for 50 100 Hz daylong traces on 12 threads st = _multi_detrend(st, max_workers=max_workers, chunksize=chunksize) # 4. Check length and pad to length padded = { (0., 0.) for tr in st} if clip: st.trim(starttime, starttime + length, nearest_sample=True) # Indexing because we are going to overwrite traces for i, _ in enumerate(st): if float(st[i].stats.npts / st[i].stats.sampling_rate) != length: 'Data for {0} are not long-enough, will zero pad'.format( st[i].id)) st[i], padded[st[i].id] = _length_check( st[i], starttime=starttime, length=length, ignore_length=ignore_length, ignore_bad_data=ignore_bad_data) # Remove None traces that might be returned from length checking st.traces = [tr for tr in st if tr is not None] # Check that we actually still have some data if not _stream_has_data(st): if tracein: return st[0] return st # 5. Resample # ~ 3.25x speedup for 50 100 Hz daylong traces on 12 threads st = _multi_resample( st, sampling_rate=samp_rate, max_workers=max_workers, chunksize=chunksize) # Detrend again before filtering st = _multi_detrend(st, max_workers=max_workers, chunksize=chunksize) # 6. Filter # ~3.25x speedup for 50 100 Hz daylong traces on 12 threads st = _multi_filter( st, highcut=highcut, lowcut=lowcut, filt_order=filt_order, max_workers=max_workers, chunksize=chunksize) # 7. Reapply zeros after processing from 4 for tr in st: # Pads default to (0., 0.), pads should only ever be positive. if sum(padded[]) == 0: continue Logger.debug("Reapplying zero pads post processing") Logger.debug(str(tr)) pre_pad = np.zeros(int(padded[][0] * tr.stats.sampling_rate)) post_pad = np.zeros(int(padded[][1] * tr.stats.sampling_rate)) pre_pad_len = len(pre_pad) post_pad_len = len(post_pad) Logger.debug( f"Taking only valid data between {pre_pad_len} and " f"{tr.stats.npts - post_pad_len} samples") # Re-apply the pads, taking only the data section that was valid = np.concatenate( [pre_pad,[pre_pad_len: len( - post_pad_len], post_pad]) Logger.debug(str(tr)) # 8. Recheck length for tr in st: if float(tr.stats.npts * != length and clip:'Data for {} are not of required length, will ' f'zero pad') # Use obspy's trim function with zero padding tr = tr.trim(starttime, starttime + length, pad=True, fill_value=0, nearest_sample=True) # If there is one sample too many after this remove the last one # by convention if len( == (length * tr.stats.sampling_rate) + 1: =[1:len(] if abs((tr.stats.sampling_rate * length) - tr.stats.npts) > raise ValueError('Data are not required length for ' + tr.stats.station + '.' + # 9. Re-insert gaps from 1 for i, tr in enumerate(st): if gappy[]: st[i] = _zero_pad_gaps(tr, gaps[], fill_gaps=fill_gaps) # 10. Clean up for tr in st: if len( == 0: st.remove(tr) # 11. Account for seisan channel naming if seisan_chan_names: for tr in st: =[0] +[-1] if tracein: st.merge() return st[0] return st
@lru_cache(maxsize=50) def _empty_trace( network, station, location, channel, starttime, sampling_rate ): """ Generate an empty trace with a basic header matching the input trace :param network: Network code :type network: str :param station: Station code :type station: str :param location: Location code :type location: str :param channel: Channel code :type channel: str :param starttime: Start time of trace as datetime (NOT UTCDateTime) :type starttime: datetime.datetime :param sampling_rate: Sampling rate of data :type sampling_rate: float :returns: trace """ bad_trace = Trace( data=np.array([]), header={ "station": station, "channel": channel, "network": network, "location": location, "starttime": starttime, "sampling_rate": sampling_rate}) return bad_trace def _stream_has_data(st): return sum(tr.stats.npts for tr in st) > 0 def _length_check(tr, starttime, length, ignore_length, ignore_bad_data): """ Check that a trace meets the length requirements specified. Data are padded if needed to meet the length requirement. :param tr: Trace to check :type tr: obspy.core.Trace :param starttime: Desired starttime of data :type starttime: obspy.core.UTCDateTime :param length: Length in seconds required for data :type length: float :param ignore_length: Whether to ignore data that do not meet length criteria :type ignore_length: bool :param ignore_bad_data: Whether to ignore data that do not meet gappiness criteria :type ignore_bad_data: bool :return: obspy.core.Trace that meets criteria """ trace_length = tr.stats.endtime - tr.stats.starttime if trace_length < 0.8 * length and not ignore_length: msg = f"Data for {} is {trace_length:.2f} seconds "\ f"long, which is less than 80 percent of the desired "\ f"length ({length} seconds), will not pad" if not ignore_bad_data: raise NotImplementedError(msg) else: Logger.warning(msg) return _empty_trace(, tr.stats.station, tr.stats.location,, tr.stats.starttime.datetime, tr.stats.sampling_rate), (0., 0.) # trim, then calculate length of any pads required pre_pad_secs = tr.stats.starttime - starttime post_pad_secs = (starttime + length) - tr.stats.endtime if pre_pad_secs > 0 or post_pad_secs > 0: pre_pad = np.zeros(int(pre_pad_secs * tr.stats.sampling_rate)) post_pad = np.zeros(int(post_pad_secs * tr.stats.sampling_rate)) Logger.debug(str(tr)) f"Padding to length with {pre_pad_secs} s before " f"and {post_pad_secs} s at end") = np.concatenate([pre_pad,, post_pad]) # Use this rather than the expected pad because of rounding samples tr.stats.starttime -= len(pre_pad) * Logger.debug(str(tr)) # If there is one sample too many after this remove the first one # by convention if tr.stats.npts == (length * tr.stats.sampling_rate) + 1: =[1:len(] # Cope with time precision. if abs((tr.stats.sampling_rate * length) - tr.stats.npts) > msg = (f"Data sampling-rate * length ({tr.stats.sampling_rate} *" f" {length} = {tr.stats.sampling_rate * length}) does not " f"match number of samples ({tr.stats.npts}) for {}") if not ignore_bad_data: raise ValueError(msg) else: Logger.warning(msg) return _empty_trace(, tr.stats.station, tr.stats.location,, tr.stats.starttime.datetime, tr.stats.sampling_rate), (0., 0.) Logger.debug( f'I now have {tr.stats.npts} data points after enforcing length') return tr, (pre_pad_secs, post_pad_secs) def _multi_filter(st, highcut, lowcut, filt_order, max_workers=None, chunksize=1): """ Multithreaded zero-phase butterworth filtering of multi-channel data. :param st: Stream to filter :type st: obspy.core.Stream :param highcut: Highcut for butterworth filter in Hz :type highcut: float :param lowcut: Lowcut for butterworth filter in Hz :type lowcut: float :param filt_order: Filter order :type filt_order: int :param max_workers: Maximum number of threads to use :type max_workers: int :param chunksize: Number of traces to process per thread :type chunksize: int :return: obspy.core.Stream of filtered data """ if not highcut and not lowcut: Logger.warning("No filters applied") return st # Require that all channels are the same sampling frequency samp_rate = set(tr.stats.sampling_rate for tr in st) assert len(samp_rate) == 1, "Different sampling rates found" samp_rate = samp_rate.pop() # Sanity check filter bounds if highcut: assert highcut * 2 < samp_rate, "Highcut must be below Nyquist" if highcut and lowcut: assert lowcut < highcut, "Lowcut must be below highcut" fe = 0.5 * samp_rate if lowcut: low = lowcut / fe if highcut: high = highcut / fe # Design filter if highcut and lowcut: z, p, k = iirfilter( filt_order, [low, high], btype='band', ftype='butter', output='zpk') elif highcut: z, p, k = iirfilter( filt_order, high, btype='lowpass', ftype='butter', output='zpk') elif lowcut: z, p, k = iirfilter( filt_order, low, btype='highpass', ftype='butter', output='zpk') sos = zpk2sos(z, p, k) _filter = partial(_zerophase_filter, sos) with ThreadPoolExecutor(max_workers) as executor: results = _filter, ( for tr in st), chunksize=chunksize) for r, tr in zip(results, st): = r return st def _zerophase_filter(sos, data): """ Simple zerophase implementation of sosfilt. :param sos: Second-order-series of filters :param data: Data to filter :return: filtered data """ if len(data) == 0: Logger.debug("No data, no filtering") return data firstpass = sosfilt(sos, data) return sosfilt(sos, firstpass[::-1])[::-1] def _multi_detrend(st, max_workers=None, chunksize=1): """ Multithreaded detrending using simple linear detrend between first and last values. Follows obspy "simple" detrend. :param st: Stream to detrend :type st: obspy.core.Stream :param max_workers: Maximum number of threads to use :type max_workers: int :param chunksize: Number of traces to process per thread :type chunksize: int :return: obspy.core.Stream of detrended data """ for tr in st: = np.require(, np.float64) with ThreadPoolExecutor(max_workers) as executor: results =, ( for tr in st), chunksize=chunksize) # Ensure tasks complete _ = (r for r in results) return st def _detrend(data): """ Detrend signal simply by subtracting a line through the first and last point of the trace :param data: Data to detrend :type data: np.ndarray. :return: Nothing - works in place """ # Work in double-precision. data = np.require(data, dtype=np.float64) ndat = data.shape[0] x1, x2 = data[0], data[-1] data -= x1 + np.arange(ndat, dtype=np.float64) * ( np.float64(x2 - x1) / np.float64(ndat - 1)) return def _multi_resample(st, sampling_rate, max_workers=None, chunksize=1): """ Threaded resampling of a stream of data to a consistent sampling-rate :param st: Stream to resample :type st: obspy.core.Stream :param sampling_rate: Sampling rate to resample to :type sampling_rate: float :param max_workers: Maximum number of threads to use :type max_workers: int :param chunksize: Number of traces to process per thread :type chunksize: int :return: obspy.core.Stream of resampled data """ # Get the windows, and downsampling factors ahead of time to_resample = ( (,, tr.stats.sampling_rate / float(sampling_rate), sampling_rate, _get_window("hann", tr.stats.npts), for tr in st) with ThreadPoolExecutor(max_workers) as executor: # Unpack tuple using lambda results = args: _resample(*args), to_resample, chunksize=chunksize) for r, tr in zip(results, st): = r tr.stats.sampling_rate = sampling_rate return st def _resample(data, delta, factor, sampling_rate, large_w, _id): """ Resample data in the frequency domain - adapted from obspy resample method :param data: Data to resample :type data: np.ndarray :param delta: Sample interval in seconds :type delta: float :param factor: Factor to resample by :type factor: float :param sampling_rate: Desired sampling-rate :type sampling_rate: float :param large_w: Window to apply to spectra to stabilise resampling :type large_w: np.ndarray :return: np.ndarray of resampled data. """ if factor == 1: # No resampling needed, don't waste time. return data # Need to work with numpy objects to release the GIL npts = data.shape[0] Logger.debug(f"Running resample for {_id} with {npts} data points") Logger.debug(f"{_id}: delta={delta}, factor={factor}, " f"sampling_rate out={sampling_rate}") Logger.debug(f"Sanity check data for {_id}, start and " f"end: {data[0]} -- {data[-1]}") Logger.debug(f"dtype for {_id}: {data.dtype}") if data.dtype == np.dtype('float64'): _floater = np.float64 # Retain double-precision else: _floater = np.float32 # Use single-precision where possible to reduce memory data = data.astype(_floater) df = _floater(1.0) / (npts * delta) num = np.int32(npts / factor) d_large_f = _floater(1.0) / num * sampling_rate # Forward fft x = np.fft.rfft(data) # Window x *= large_w[:npts // 2 + 1] # interpolate f = df * np.arange(0, npts // 2 + 1, dtype=np.int32) n_large_f = num // 2 + 1 large_f = d_large_f * np.arange(0, n_large_f, dtype=np.int32) # Have to split into real and imaginary parts for interpolation. y = np.interp(large_f, f, np.real(x)) + (1j * np.interp( large_f, f, np.imag(x))) # Try to reduce memory before doing the ifft del large_f, f, x return np.fft.irfft(y, n=num)[0:num] * (_floater(num) / _floater(npts)) def _zero_pad_gaps(tr, gaps, fill_gaps=True): """ Replace padded parts of trace with zeros. Will cut around gaps, detrend, then pad the gaps with zeros. :type tr: :class:`` :param tr: A trace that has had the gaps padded :param gaps: List of dict of start-time and end-time as UTCDateTime objects :type gaps: list :param fill_gaps: Whether to fill gaps with zeros, or leave them as gaps :type fill_gaps: bool :return: :class:`` """ start_in, end_in = (tr.stats.starttime, tr.stats.endtime) tr = Stream([tr]) # convert to stream to use cutout method for gap in gaps: Logger.debug( f"Filling gap between {gap['starttime']} and {gap['endtime']}") tr.cutout(gap['starttime'], gap['endtime']).merge() tr = tr.merge()[0] if fill_gaps: tr = tr.split() tr = tr.detrend() tr = tr.merge(fill_value=0)[0] # Need to check length - if a gap happened overlapping the end or start # of the trace this will be lost. if tr.stats.starttime != start_in: # pad with zeros = np.concatenate( [np.zeros(int(tr.stats.starttime - start_in)),]) tr.stats.starttime = start_in if tr.stats.endtime != end_in: = np.concatenate( [, np.zeros(int(end_in - tr.stats.endtime))]) return tr def _fill_gaps(tr): """ Interpolate through gaps and work-out where gaps are. :param tr: Gappy trace (e.g. is :type tr: `` :return: gaps, trace, where gaps is a list of dict """ tr = tr.split() gaps = tr.get_gaps() tr = tr.detrend().merge(fill_value=0)[0] gaps = [{'starttime': gap[4], 'endtime': gap[5]} for gap in gaps] if len(gaps): Logger.debug(f"Gaps in {}: \n\t{gaps}") return gaps, tr def _group_process(filt_order, highcut, lowcut, samp_rate, process_length, parallel, cores, stream, daylong, ignore_length, ignore_bad_data, overlap): """ Process and chunk data. :type parallel: bool :param parallel: Whether to use parallel processing or not :type cores: int :param cores: Number of cores to use, can be False to use all available. :type stream: :class:`` :param stream: Stream to process, will be left intact. :type daylong: bool :param daylong: Whether to enforce day-length files or not. :type ignore_length: bool :param ignore_length: If using daylong=True, then processing will try check that the data are there for at least 80% of the day, if you don't want this check (which will raise an error if too much data are missing) then set ignore_length=True. This is not recommended! :type ignore_bad_data: bool :param ignore_bad_data: If False (default), errors will be raised if data are excessively gappy or are mostly zeros. If True then no error will be raised, but an empty trace will be returned. :type overlap: float :param overlap: Number of seconds to overlap chunks by. :return: list of processed streams. """ processed_streams = [] kwargs = { 'filt_order': filt_order, 'highcut': highcut, 'lowcut': lowcut, 'samp_rate': samp_rate, 'parallel': parallel, 'num_cores': cores, 'ignore_length': ignore_length, 'ignore_bad_data': ignore_bad_data} # Processing always needs to be run to account for gaps - pre-process will # check whether filtering and resampling needs to be done. starttimes = sorted([tr.stats.starttime for tr in stream]) endtimes = sorted([tr.stats.endtime for tr in stream]) if daylong: if process_length != 86400: Logger.warning( f'Processing day-long data, but template was cut from ' f'{process_length} s long data, will reduce correlations') process_length = 86400 # Check that data all start on the same day, otherwise strange # things will happen... startdates = [ for starttime in starttimes] if not len(set(startdates)) == 1: Logger.warning('Data start on different days, setting to last day') starttime = UTCDateTime(startdates[-1]) else: starttime = UTCDateTime(startdates[0]) # Can take any else: # We want to use shortproc to allow overlaps starttime = starttimes[0] endtime = endtimes[-1] data_len_samps = round((endtime - starttime) * samp_rate) + 1 assert overlap < process_length, "Overlap must be less than process length" chunk_len_samps = (process_length - overlap) * samp_rate n_chunks = int(data_len_samps // chunk_len_samps)"Splitting these data in {n_chunks} chunks") if n_chunks == 0: Logger.error('Data must be process_length or longer, not computing') return [] for i in range(n_chunks): kwargs.update( {'starttime': starttime + (i * (process_length - overlap))}) if not daylong: _endtime = kwargs['starttime'] + process_length kwargs.update({'endtime': _endtime}) else: _endtime = kwargs['starttime'] + 86400 # This is where data should be copied and only here! if n_chunks > 1: chunk_stream = _quick_copy_stream( stream.slice(starttime=kwargs['starttime'], endtime=_endtime)) # Reduce memory by removing data that we don't need anymore stream.trim(starttime=_endtime - overlap) else: # If we only have one chunk, lets just use those data! chunk_stream = stream.trim( starttime=kwargs['starttime'], endtime=_endtime)"Processing chunk {i} between {kwargs['starttime']} " f"and {_endtime}") if len(chunk_stream) == 0: Logger.warning( f"No data between {kwargs['starttime']} and {_endtime}") continue # Enforce chunk npts for tr in chunk_stream: f"Enforcing {int(process_length * tr.stats.sampling_rate)} " f"samples for {} (had {tr.stats.npts} points)") =[0:int( process_length * tr.stats.sampling_rate)] _chunk_stream_lengths = { tr.stats.endtime - tr.stats.starttime for tr in chunk_stream} for tr_id, chunk_length in _chunk_stream_lengths.items(): # Remove traces that are too short. if not ignore_length and chunk_length <= .8 * process_length: tr =[0] chunk_stream.remove(tr) Logger.warning( "Data chunk on {0} starting {1} and ending {2} is " "below 80% of the requested length, will not use" " this.".format(, tr.stats.starttime, tr.stats.endtime)) if len(chunk_stream) == 0: continue Logger.debug( f"Processing chunk:\n{chunk_stream.__str__(extended=True)}")"Processing using {kwargs}") _processed_stream = multi_process(st=chunk_stream, **kwargs) # If data have more zeros then pre-processing will return a # trace of 0 length _processed_stream.traces = [ tr for tr in _processed_stream if tr.stats.npts != 0] if len(_processed_stream) == 0: Logger.warning( f"Data quality insufficient between {kwargs['starttime']}" f" and {_endtime}") continue # Pre-processing does additional checks for zeros - we need to check # again whether we actually have something useful from this. processed_chunk_stream_lengths = [ tr.stats.endtime - tr.stats.starttime for tr in _processed_stream] if min(processed_chunk_stream_lengths) >= .8 * process_length: processed_streams.append(_processed_stream) else: Logger.warning( f"Data quality insufficient between {kwargs['starttime']}" f" and {_endtime}") continue if _endtime < stream[0].stats.endtime: Logger.warning( "Last bit of data between {0} and {1} will go unused " "because it is shorter than a chunk of {2} s".format( _endtime, stream[0].stats.endtime, process_length)) return processed_streams def _quick_copy_trace(trace, deepcopy_data=True): """ Function to quickly copy a trace. Sets values in the traces' and trace header's dict directly, circumventing obspy's init functions. Speedup: from 37 us to 12 us per trace - 3x faster :type trace: :class:`obspy.core.trace.Trace` :param trace: Stream to quickly copy :type deepcopy_data: bool :param deepcopy_data: Whether to deepcopy trace data (with `deepcopy_data=False` expect up to 20 % speedup, but use only when you know that data trace contents will not change or affect results). Warning: do not use this option to copy traces with processing history or response information. :rtype: :class:`obspy.core.trace.Trace` return: trace """ new_trace = Trace() for key, value in trace.__dict__.items(): if key == 'stats': new_stats = new_trace.stats for key_2, value_2 in value.__dict__.items(): if isinstance(value_2, UTCDateTime): new_stats.__dict__[key_2] = UTCDateTime( ns=value_2.__dict__['_UTCDateTime__ns']) else: new_stats.__dict__[key_2] = value_2 elif deepcopy_data: # data needs to be deepcopied (and anything else, to be safe) new_trace.__dict__[key] = copy.deepcopy(value) else: # No deepcopy, e.g. for NaN-traces with no effect on results new_trace.__dict__[key] = value return new_trace def _quick_copy_stream(stream, deepcopy_data=True): """ Function to quickly copy a stream. Speedup for simple trace: from 112 us to 44 (35) us per 3-trace stream - 2.8x (3.2x) faster Warning: use `deepcopy_data=False` (saves extra ~20 % time) only when the changing the data in the stream later does not change results (e.g., for NaN-trace or when data array will not be changed). This is what takes longest (1 empty trace, total time to copy 27 us): copy header: 18 us (vs create new empty header: 683 ns) Two points that can speed up copying / creation: 1. circumvent trace.__init__ and trace.__set_attr__ by setting value directly in trace's __dict__ 2. when setting trace header, circumvent that Stats(header) is called when header is already a Stats instance :type stream: :class:`` :param stream: Stream to quickly copy :type deepcopy_data: bool :param deepcopy_data: Whether to deepcopy data (with `deepcopy_data=False` expect up to 20 % speedup, but use only when you know that data trace contents will not change or affect results). :rtype: :class:`` return: stream """ new_traces = list() for trace in stream: new_traces.append( _quick_copy_trace(trace, deepcopy_data=deepcopy_data)) return Stream(new_traces) def _stream_quick_select(stream, seed_id): """ 4x quicker selection of traces in stream by full Seed-ID. Does not support wildcards or selection by network/station/location/channel alone. """ net, sta, loc, chan = seed_id.split('.') stream = Stream( [tr for tr in stream if ( == net and tr.stats.station == sta and tr.stats.location == loc and == chan)]) return stream def _prep_data_for_correlation(stream, templates, template_names=None, force_stream_epoch=True): """ Check that all channels are the same length and that all channels have data for both template and stream. Works in place on data - will cut to shortest length :param stream: Stream to compare data to :param templates: List of streams that will be forced to have the same channels as stream :param template_names: List of strings same length as templates :type force_stream_epoch: bool :param force_stream_epoch: Whether to force all channels in stream to cover the same time period :return: stream, templates, template_names (if template_names given) """ n_templates = len(templates) template_samp_rates = { tr.stats.sampling_rate for template in templates for tr in template} stream_samp_rates = {tr.stats.sampling_rate for tr in stream} samp_rates = template_samp_rates.union(stream_samp_rates) assert len(samp_rates) == 1, "Sampling rates differ" samp_rate = samp_rates.pop() out_stream = Stream() named = True if template_names is None: named = False template_names = range(n_templates) # Work out shapes. stream_start = min([tr.stats.starttime for tr in stream]) stream_end = max([tr.stats.endtime for tr in stream]) if force_stream_epoch: stream_length = int(samp_rate * (stream_end - stream_start)) + 1 else: stream_length = max([tr.stats.npts for tr in stream]) template_length = { tr.stats.npts for template in templates for tr in template} assert len(template_length) == 1, "Template traces not all the same length" template_length = template_length.pop() stream_ids = { for tr in stream} # Need to ensure that a channel can be in the template multiple times. all_template_ids = [ Counter([ for tr in template]) for template in templates] template_ids = { stream_id: max(tid.get(stream_id, 0) for tid in all_template_ids) for stream_id in stream_ids} template_ids = {_id: value for _id, value in template_ids.items() if value} seed_ids = sorted( [key.split('.') + [i] for key, value in template_ids.items() for i in range(value)]) seed_ids = [('.'.join(seed_id[0:-1]), seed_id[-1]) for seed_id in seed_ids]"Prepping for {len(seed_ids)} channels that share seed-ids " f"between templates and stream") Logger.debug(f"Shared seed-ids: {seed_ids}") for channel_number, seed_id in enumerate(template_ids.keys()): stream_data = np.zeros(stream_length, dtype=np.float32) stream_channel = if len(stream_channel) > 1: msg = f"Multiple channels in continuous data for {seed_id}" Logger.error(msg) raise NotImplementedError(msg) stream_channel = stream_channel[0] if stream_channel.stats.npts == stream_length: stream_data = else:'Data for {0} is not as long as needed, ' 'padding'.format( if force_stream_epoch: start_pad = int(samp_rate * ( stream_channel.stats.starttime - stream_start)) end_pad = stream_length - ( start_pad + stream_channel.stats.npts) # In some cases there will be one sample missing when sampling # time-stamps are not set consistently between channels, this # results in start_pad and end_pad being len==0 if start_pad == 0 and end_pad == 0: Logger.debug("Start and end pad are both zero, padding " "at one end") if (stream_channel.stats.starttime - stream_start) > ( stream_end - stream_channel.stats.endtime): start_pad = int( stream_length - stream_channel.stats.npts) else: end_pad = int( stream_length - stream_channel.stats.npts) stream_channel.stats.starttime -= (start_pad / samp_rate) else: start_pad = 0 end_pad = stream_length - stream_channel.stats.npts if end_pad == 0: stream_data[start_pad:] = else: stream_data[start_pad:-end_pad] = header = stream_channel.stats.copy() header.npts = stream_length out_stream += Trace(data=stream_data, header=header) # Initialize nan template for speed. nan_channel = np.full(template_length, np.nan, dtype=np.float32) nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS']) nan_template = Stream() for _seed_id in seed_ids: net, sta, loc, chan = _seed_id[0].split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, 'channel': chan, 'starttime': UTCDateTime(ns=0), 'npts': template_length, 'sampling_rate': samp_rate})) # Remove templates with no matching channels filt = np.ones(len(template_names)).astype(bool) for i, template in enumerate(templates): trace_ids = { for tr in template} if len(trace_ids.intersection(stream_ids)) == 0: filt[i] = 0 _out = dict(zip( [_tn for _tn, _filt in zip(template_names, filt) if _filt], [_t for _t, _filt in zip(templates, filt) if _filt])) flt_templates = list(_out.values()) if len(_out) != len(templates): Logger.debug("Some templates not used due to no matching channels") # Ensure that the templates' earliest traces are kept, even if there is no # continuous data for them. If this happens, we need to add a NaN-stream to # the continuous data to avoid inconsistent detection times. n_template_traces = np.array([len(temp) for temp in flt_templates]) n_stream_traces = sum([n+1 for s, n in seed_ids]) # These checks are not necessary if all templates will get NaN-traces, # because the NaN-traces will save the right starttime for the template. nan_stream_ids = list() if any(n_template_traces > n_stream_traces): earliest_templ_trace_ids = set( [template.sort(['starttime'])[0].id for template in flt_templates]) for earliest_templ_trace_id in earliest_templ_trace_ids: if earliest_templ_trace_id not in template_ids: nan_stream_ids.append(earliest_templ_trace_id) net, sta, loc, chan = earliest_templ_trace_id.split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, 'channel': chan, 'starttime': UTCDateTime(ns=0), 'sampling_rate': samp_rate})) stream_nan_data = np.full( stream_length, np.nan, dtype=np.float32) out_stream += Trace(, stream_nan_data), header=Stats({ 'network': net, 'station': sta, 'location': loc, 'channel': chan, 'starttime': stream_start, 'npts': stream_length, 'sampling_rate': samp_rate})) seed_ids.append((earliest_templ_trace_id, 0)) incomplete_templates = { template_name for template_name, template in _out.items() if sorted([ for tr in template]) != [ for tr in nan_template]} # Fill out the templates with nan channels for template_name in incomplete_templates: template = _out[template_name] template_starttime = min(tr.stats.starttime for tr in template) out_template = _quick_copy_stream(nan_template, deepcopy_data=False) # Select traces very quickly: assume that trace order does not change, # make dict of trace-ids and list of indices and use indices to select stream_trace_id_dict = defaultdict(list) for n, tr in enumerate(template.traces): stream_trace_id_dict[].append(n) for channel_number, _seed_id in enumerate(seed_ids): seed_id, channel_index = _seed_id # Select all traces with same seed_id, based on indices for # corresponding traces stored in stream_trace_id_dict # Much quicker than: template_channel = template_channel = Stream([ template.traces[idx] for idx in stream_trace_id_dict[seed_id]]) if len(template_channel) <= channel_index: # out_template[channel_number].data = nan_channel # quicker: out_template.traces[channel_number].__dict__[ 'data'] = np.copy(nan_channel) out_template.traces[channel_number].stats.__dict__[ 'npts'] = template_length out_template.traces[channel_number].stats.__dict__[ 'starttime'] = template_starttime out_template.traces[channel_number].stats.__dict__[ 'endtime'] = UTCDateTime(ns=int( round(template_starttime.ns + (template_length / samp_rate) * 1e9))) else: out_template.traces[channel_number] = template_channel.traces[ channel_index] # If a template-trace matches a NaN-trace in the stream , then set # template-trace to NaN so that this trace does not appear in channel- # list of detections. if len(nan_stream_ids) > 0: for tr in out_template: if in nan_stream_ids: = nan_channel _out.update({template_name: out_template}) out_templates = list(_out.values()) out_template_names = list(_out.keys()) if named: return out_stream, out_templates, out_template_names return out_stream, out_templates def shortproc(st, lowcut, highcut, filt_order, samp_rate, parallel=False, num_cores=False, starttime=None, endtime=None, seisan_chan_names=False, fill_gaps=True, ignore_length=False, ignore_bad_data=False, fft_threads=1): """ Deprecated """ Logger.warning("Shortproc is depreciated after 0.4.4 and will " "be removed in a future version. Use multi_process" " instead") st = multi_process( st=st, lowcut=lowcut, highcut=highcut, filt_order=filt_order, samp_rate=samp_rate, parallel=parallel, num_cores=num_cores, starttime=starttime, endtime=endtime, daylong=False, seisan_chan_names=seisan_chan_names, fill_gaps=fill_gaps, ignore_length=ignore_length, ignore_bad_data=ignore_bad_data) return st def dayproc(st, lowcut, highcut, filt_order, samp_rate, starttime, parallel=True, num_cores=False, ignore_length=False, seisan_chan_names=False, fill_gaps=True, ignore_bad_data=False, fft_threads=1): """ Deprecated """ Logger.warning("dayproc is depreciated after 0.4.4 and will be " "removed in a future version. Use multi_process instead") st = multi_process( st=st, lowcut=lowcut, highcut=highcut, filt_order=filt_order, samp_rate=samp_rate, parallel=parallel, num_cores=num_cores, starttime=starttime, endtime=None, daylong=True, seisan_chan_names=seisan_chan_names, fill_gaps=fill_gaps, ignore_length=ignore_length, ignore_bad_data=ignore_bad_data) return st def process(tr, lowcut, highcut, filt_order, samp_rate, starttime=False, clip=False, length=86400, seisan_chan_names=False, ignore_length=False, fill_gaps=True, ignore_bad_data=False, fft_threads=1): """ Deprecated """ Logger.warning("process is depreciated after 0.4.4 and will be removed " "in a future version. Use multi_process instead") if length == 86400: daylong = True else: daylong = False endtime = None if clip: if not starttime: starttime = tr.stats.starttime elif not isinstance(starttime, UTCDateTime): starttime = UTCDateTime(starttime) endtime = starttime + length st = multi_process( st=tr, lowcut=lowcut, highcut=highcut, filt_order=filt_order, samp_rate=samp_rate, parallel=False, num_cores=1, starttime=starttime, endtime=endtime, daylong=daylong, seisan_chan_names=seisan_chan_names, fill_gaps=fill_gaps, ignore_length=ignore_length, ignore_bad_data=ignore_bad_data) return st if __name__ == "__main__": import doctest doctest.testmod()