Source code for eqcorrscan.core.match_filter.tribe

"""
Functions for network matched-filter detection of seismic data.

Designed to cross-correlate templates generated by template_gen function
with data and output the detections.

:copyright:
    EQcorrscan developers.

:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import copy
import getpass
import glob
import os
import pickle
import shutil
import tarfile
import tempfile
import traceback
import uuid
import logging

from multiprocessing import Process, Queue, cpu_count
from queue import Empty

from obspy import Catalog, Stream, read, read_events
from obspy.core.event import Comment, CreationInfo

from eqcorrscan.core import template_gen
from eqcorrscan.core.match_filter.template import (
    Template, quick_group_templates)
from eqcorrscan.core.match_filter.party import Party
from eqcorrscan.core.match_filter.family import Family
from eqcorrscan.core.match_filter.helpers import (
    _safemembers, _par_read, get_waveform_client,
    _remove_duplicates, _moveout)
from eqcorrscan.core.match_filter.helpers.tribe import _wildcard_fill

from eqcorrscan.utils.pre_processing import (
    _quick_copy_stream, _prep_data_for_correlation)


Logger = logging.getLogger(__name__)


class Tribe(object):
    """
    Holder for multiple templates.

    :type templates: List of Template
    :param templates: The templates within the Tribe.
    """
    _timeout = 1

    def __init__(self, templates=None):
        self.templates = []
        if isinstance(templates, Template):
            templates = [templates]
        if templates:
            self.templates.extend(templates)
        # Managers for Processes and Queues to be killed on errors
        self._processes = dict()
        self._queues = dict()
        # Assign unique ids
        self.__unique_ids()

    def __repr__(self):
        """
        Print information about the tribe.

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='a')])
        >>> print(tribe)
        Tribe of 1 templates
        """
        return 'Tribe of %i templates' % self.__len__()

    def __add__(self, other):
        """
        Add two Tribes or a Tribe and a Template together. '+'

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='a')])
        >>> tribe_ab = tribe + Tribe(templates=[Template(name='b')])
        >>> print(tribe_ab)
        Tribe of 2 templates
        >>> tribe_abc = tribe_ab + Template(name='c')
        >>> print(tribe_abc)
        Tribe of 3 templates
        """
        return self.copy().__iadd__(other)

    def __iadd__(self, other):
        """
        Add in place: '+='

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='a')])
        >>> tribe += Tribe(templates=[Template(name='b')])
        >>> print(tribe)
        Tribe of 2 templates
        >>> tribe += Template(name='c')
        >>> print(tribe)
        Tribe of 3 templates
        """
        assert isinstance(other, (Tribe, Template)), \
            "Must be either Template or Tribe"
        self.__unique_ids(other)
        if isinstance(other, Tribe):
            self.templates += other.templates
        elif isinstance(other, Template):
            self.templates.append(other)
        return self

    def __eq__(self, other):
        """
        Test for equality. Rich comparison operator '=='

        .. rubric:: Example

        >>> tribe_a = Tribe(templates=[Template(name='a')])
        >>> tribe_b = Tribe(templates=[Template(name='b')])
        >>> tribe_a == tribe_b
        False
        >>> tribe_a == tribe_a
        True
        """
        if self.sort().templates != other.sort().templates:
            return False
        return True

    def __ne__(self, other):
        """
        Test for inequality. Rich comparison operator '!='

        .. rubric:: Example

        >>> tribe_a = Tribe(templates=[Template(name='a')])
        >>> tribe_b = Tribe(templates=[Template(name='b')])
        >>> tribe_a != tribe_b
        True
        >>> tribe_a != tribe_a
        False
        """
        return not self.__eq__(other)

    def __len__(self):
        """
        Number of Templates in Tribe. len(tribe)

        .. rubric:: Example

        >>> tribe_a = Tribe(templates=[Template(name='a')])
        >>> len(tribe_a)
        1
        """
        return len(self.templates)

    def __iter__(self):
        """
        Iterator for the Tribe.
        """
        return list(self.templates).__iter__()

    def __getitem__(self, index):
        """
        Support slicing to get Templates from Tribe.

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='a'), Template(name='b'),
        ...                          Template(name='c')])
        >>> tribe[1] # doctest: +NORMALIZE_WHITESPACE
        Template b:
         0 channels;
         lowcut: None Hz;
         highcut: None Hz;
         sampling rate None Hz;
         filter order: None;
         process length: None s
        >>> tribe[0:2]
        Tribe of 2 templates
        >>> tribe["d"]
        []
        """
        if isinstance(index, slice):
            return self.__class__(templates=self.templates.__getitem__(index))
        elif isinstance(index, int):
            return self.templates.__getitem__(index)
        else:
            _index = [i for i, t in enumerate(self.templates)
                      if t.name == index]
            try:
                return self.templates.__getitem__(_index[0])
            except IndexError:
                Logger.warning('Template: %s not in tribe' % index)
                return []

    def __unique_ids(self, other=None):
        """ Check that template names are unique. """
        template_names = [t.name for t in self.templates]
        if other:
            assert isinstance(other, (Template, Tribe)), (
                "Can only test against tribes or templates")
            if isinstance(other, Template):
                template_names.append(other.name)
            else:
                template_names.extend([t.name for t in other])

        unique_names = set(template_names)
        if len(unique_names) < len(template_names):
            non_unique_names = [name for name in unique_names
                                if template_names.count(name) > 1]
            raise NotImplementedError(
                "Multiple templates found with the same name. Template names "
                "must be unique. Non-unique templates: "
                f"{', '.join(non_unique_names)}")
        return

    @property
    def _stream_dir(self):
        """ Location for temporary streams """
        return f".streams_{os.getpid()}"
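
    # Editor's note (sketch, not part of the library API): template names
    # double as file names when tribes are written to disk or pickled to a
    # temporary template database, so adding a template whose name already
    # exists raises NotImplementedError. Names here are hypothetical:
    #
    #   >>> tribe = Tribe(templates=[Template(name='a')])
    #   >>> tribe += Template(name='a')
    #   Traceback (most recent call last):
    #   NotImplementedError: Multiple templates found with the same name. ...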

    def sort(self):
        """
        Sort the tribe, sorts by template name.

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='c'), Template(name='b'),
        ...                          Template(name='a')])
        >>> tribe.sort()
        Tribe of 3 templates
        >>> tribe[0] # doctest: +NORMALIZE_WHITESPACE
        Template a:
         0 channels;
         lowcut: None Hz;
         highcut: None Hz;
         sampling rate None Hz;
         filter order: None;
         process length: None s
        """
        self.templates = sorted(self.templates, key=lambda x: x.name)
        return self

    def select(self, template_name):
        """
        Select a particular template from the tribe.

        :type template_name: str
        :param template_name: Template name to look-up
        :return: Template

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='c'), Template(name='b'),
        ...                          Template(name='a')])
        >>> tribe.select('b') # doctest: +NORMALIZE_WHITESPACE
        Template b:
         0 channels;
         lowcut: None Hz;
         highcut: None Hz;
         sampling rate None Hz;
         filter order: None;
         process length: None s
        """
        return [t for t in self.templates if t.name == template_name][0]

    def remove(self, template):
        """
        Remove a template from the tribe.

        :type template: :class:`eqcorrscan.core.match_filter.Template`
        :param template: Template to remove from tribe

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='c'), Template(name='b'),
        ...                          Template(name='a')])
        >>> tribe.remove(tribe.templates[0])
        Tribe of 2 templates
        """
        self.templates = [t for t in self.templates if t != template]
        return self

    def copy(self):
        """
        Copy the Tribe.

        .. rubric:: Example

        >>> tribe_a = Tribe(templates=[Template(name='a')])
        >>> tribe_b = tribe_a.copy()
        >>> tribe_a == tribe_b
        True
        """
        # We can't copy processes, so we need to just copy the templates
        other = Tribe(copy.deepcopy(self.templates))
        return other

    def write(self, filename, compress=True, catalog_format="QUAKEML"):
        """
        Write the tribe to a file using tar archive formatting.

        :type filename: str
        :param filename:
            Filename to write to, if it exists it will be appended to.
        :type compress: bool
        :param compress:
            Whether to compress the tar archive or not, if False then will
            just be files in a folder.
        :type catalog_format: str
        :param catalog_format:
            What format to write the detection-catalog with. Only Nordic,
            SC3ML, QUAKEML are supported. Note that not all information is
            written for all formats (QUAKEML is the most complete, but is
            slow for IO).

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='c', st=read())])
        >>> tribe.write('test_tribe')
        Tribe of 1 templates
        >>> tribe.write(
        ...     "this_wont_work.bob",
        ...     catalog_format="BOB") # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        TypeError: BOB is not supported
        """
        from eqcorrscan.core.match_filter import CAT_EXT_MAP

        if catalog_format not in CAT_EXT_MAP.keys():
            raise TypeError("{0} is not supported".format(catalog_format))
        dirname, ext = os.path.splitext(filename)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)
        self._par_write(dirname)
        tribe_cat = Catalog()
        for t in self.templates:
            if t.event is not None:
                # Check that the name in the comment matches the template name
                for comment in t.event.comments:
                    if not comment.text:
                        comment.text = "eqcorrscan_template_{0}".format(t.name)
                    elif comment.text.startswith("eqcorrscan_template_"):
                        comment.text = "eqcorrscan_template_{0}".format(t.name)
                tribe_cat.append(t.event)
        if len(tribe_cat) > 0:
            tribe_cat.write(
                os.path.join(dirname, 'tribe_cat.{0}'.format(
                    CAT_EXT_MAP[catalog_format])), format=catalog_format)
        for template in self.templates:
            template.st.write(
                os.path.join(dirname, '{0}.ms'.format(template.name)),
                format='MSEED')
        if compress:
            if not filename.endswith(".tgz"):
                Logger.info("Appending '.tgz' to filename.")
                filename += ".tgz"
            with tarfile.open(filename, "w:gz") as tar:
                tar.add(dirname, arcname=os.path.basename(dirname))
            shutil.rmtree(dirname)
        return self

    def _par_write(self, dirname):
        """
        Internal write function to write a formatted parameter file.

        :type dirname: str
        :param dirname: Directory to write the parameter file to.
        """
        filename = dirname + '/' + 'template_parameters.csv'
        with open(filename, 'w') as parfile:
            for template in self.templates:
                for key in template.__dict__.keys():
                    if key not in ['st', 'event']:
                        parfile.write(
                            key + ': ' + str(template.__dict__[key]) + ', ')
                parfile.write('\n')
        return self

    def _temporary_template_db(self, template_dir: str = None) -> dict:
        """
        Write a temporary template database of pickled templates to disk.

        :param template_dir:
            Directory to write to - if None will make a temporary directory.
        """
        # We use template names for filenames - check that these are unique
        self.__unique_ids()
        # Make sure that the template directory exists, or make a tempdir
        if template_dir:
            if not os.path.isdir(template_dir):
                os.makedirs(template_dir)
        else:
            template_dir = tempfile.mkdtemp()
        template_files = dict()
        for template in self.templates:
            t_file = os.path.join(template_dir, f"{template.name}.pkl")
            with open(t_file, "wb") as f:
                pickle.dump(template, f)
            template_files.update({template.name: t_file})
        return template_files
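
    # Editor's sketch of how the temporary template database is consumed by
    # the concurrent workflow (directory and template names are illustrative
    # only): each template is pickled to "<template_dir>/<name>.pkl" and the
    # returned dict maps template names to those file paths, e.g.
    #
    #   >>> files = tribe._temporary_template_db(".template_db_example")
    #   >>> files
    #   {'2018_01_01t00_00_00': '.template_db_example/2018_01_01t00_00_00.pkl'}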

    def read(self, filename):
        """
        Read a tribe of templates from a tar formatted file.

        :type filename: str
        :param filename: File to read templates from.

        .. rubric:: Example

        >>> tribe = Tribe(templates=[Template(name='c', st=read())])
        >>> tribe.write('test_tribe')
        Tribe of 1 templates
        >>> tribe_back = Tribe().read('test_tribe.tgz')
        >>> tribe_back == tribe
        True
        >>> # This can also read pickled templates
        >>> import pickle
        >>> with open("test_tribe.pkl", "wb") as f:
        ...     pickle.dump(tribe, f)
        >>> tribe_back = Tribe().read("test_tribe.pkl")
        >>> tribe_back == tribe
        True
        """
        if filename.endswith(".pkl"):
            with open(filename, "rb") as f:
                self.__iadd__(pickle.load(f))
            return self
        with tarfile.open(filename, "r:*") as arc:
            temp_dir = tempfile.mkdtemp()
            arc.extractall(path=temp_dir, members=_safemembers(arc))
            tribe_dir = glob.glob(temp_dir + os.sep + '*')[0]
            self._read_from_folder(dirname=tribe_dir)
        shutil.rmtree(temp_dir)
        # Assign unique ids
        self.__unique_ids()
        return self

    def _read_from_folder(self, dirname):
        """
        Internal folder reader.

        :type dirname: str
        :param dirname: Folder to read from.
        """
        templates = _par_read(dirname=dirname, compressed=False)
        t_files = glob.glob(dirname + os.sep + '*.ms')
        tribe_cat_file = glob.glob(os.path.join(dirname, "tribe_cat.*"))
        if len(tribe_cat_file) != 0:
            tribe_cat = read_events(tribe_cat_file[0])
        else:
            tribe_cat = Catalog()
        previous_template_names = [t.name for t in self.templates]
        for template in templates:
            if template.name in previous_template_names:
                # Don't read in for templates that we already have.
                continue
            for event in tribe_cat:
                for comment in event.comments:
                    if comment.text == 'eqcorrscan_template_' + template.name:
                        template.event = event
            t_file = [t for t in t_files
                      if t.split(os.sep)[-1] == template.name + '.ms']
            if len(t_file) == 0:
                Logger.error('No waveform for template: ' + template.name)
                continue
            elif len(t_file) > 1:
                Logger.warning('Multiple waveforms found, using: ' + t_file[0])
            template.st = read(t_file[0])
        # Remove templates that do not have streams
        templates = [t for t in templates if t.st is not None]
        self.templates.extend(templates)
        return

    def construct(self, method, lowcut, highcut, samp_rate, filt_order,
                  length, prepick, swin="all", process_len=86400,
                  all_horiz=False, delayed=True, plot=False, plotdir=None,
                  min_snr=None, parallel=False, num_cores=False,
                  skip_short_chans=False, save_progress=False, **kwargs):
        """
        Generate a Tribe of Templates.

        :type method: str
        :param method:
            Method of Tribe generation. Possible options are: `from_client`,
            `from_meta_file`. See below on the additional required arguments
            for each method.
        :type lowcut: float
        :param lowcut: Low cut (Hz), if set to None will not apply a lowcut
        :type highcut: float
        :param highcut: High cut (Hz), if set to None will not apply a highcut.
        :type samp_rate: float
        :param samp_rate: New sampling rate in Hz.
        :type filt_order: int
        :param filt_order: Filter level (number of corners).
        :type length: float
        :param length: Length of template waveform in seconds.
        :type prepick: float
        :param prepick: Pre-pick time in seconds
        :type swin: str
        :param swin:
            P, S, P_all, S_all or all, defaults to all: see note in
            :func:`eqcorrscan.core.template_gen.template_gen`
        :type process_len: int
        :param process_len: Length of data in seconds to download and process.
        :type all_horiz: bool
        :param all_horiz:
            To use both horizontal channels even if there is only a pick on
            one of them. Defaults to False.
        :type delayed: bool
        :param delayed:
            If True, each channel will begin relative to its own pick-time,
            if set to False, each channel will begin at the same time.
        :type plot: bool
        :param plot: Plot templates or not.
        :type plotdir: str
        :param plotdir:
            The path to save plots to. If `plotdir=None` (default) then the
            figure will be shown on screen.
        :type min_snr: float
        :param min_snr:
            Minimum signal-to-noise ratio for a channel to be included in the
            template, where signal-to-noise ratio is calculated as the ratio
            of the maximum amplitude in the template window to the rms
            amplitude in the whole window given.
        :type parallel: bool
        :param parallel: Whether to process data in parallel or not.
        :type num_cores: int
        :param num_cores:
            Number of cores to try and use, if False and parallel=True,
            will use either all your cores, or as many traces as in the data
            (whichever is smaller).
        :type save_progress: bool
        :param save_progress:
            Whether to save the resulting template set at every data step or
            not. Useful for long-running processes.
        :type skip_short_chans: bool
        :param skip_short_chans:
            Whether to ignore channels that have insufficient length data or
            not. Useful when the quality of data is not known, e.g. when
            downloading old, possibly triggered data from a datacentre.

        .. note::
            *Method specific arguments:*

            - `from_client` requires:
                :param str client_id:
                    string passable by obspy to generate Client, or any object
                    with a `get_waveforms` method, including a Client instance.
                :param `obspy.core.event.Catalog` catalog:
                    Catalog of events to generate template for
                :param float data_pad: Pad length for data-downloads in seconds
            - `from_meta_file` requires:
                :param str meta_file:
                    Path to obspy-readable event file, or an obspy Catalog
                :param `obspy.core.stream.Stream` st:
                    Stream containing waveform data for template. Note that
                    this should be the same length of stream as you will use
                    for the continuous detection, e.g. if you detect in
                    day-long files, give this a day-long file!
                :param bool process:
                    Whether to process the data or not, defaults to True.

        .. Note::
            Method: `from_sac` is not supported by Tribe.construct and must
            use Template.construct.

        .. Note:: Templates will be named according to their start-time.
        """
        templates, catalog, process_lengths = template_gen.template_gen(
            method=method, lowcut=lowcut, highcut=highcut, length=length,
            filt_order=filt_order, samp_rate=samp_rate, prepick=prepick,
            return_event=True, save_progress=save_progress, swin=swin,
            process_len=process_len, all_horiz=all_horiz, plotdir=plotdir,
            delayed=delayed, plot=plot, min_snr=min_snr, parallel=parallel,
            num_cores=num_cores, skip_short_chans=skip_short_chans,
            **kwargs)
        for template, event, process_len in zip(templates, catalog,
                                                process_lengths):
            t = Template()
            if len(template) == 0:
                Logger.error('Empty Template')
                continue
            t.st = template
            t.name = template.sort(['starttime'])[0]. \
                stats.starttime.strftime('%Y_%m_%dt%H_%M_%S')
            t.lowcut = lowcut
            t.highcut = highcut
            t.filt_order = filt_order
            t.samp_rate = samp_rate
            t.process_length = process_len
            t.prepick = prepick
            event.comments.append(Comment(
                text="eqcorrscan_template_" + t.name,
                creation_info=CreationInfo(agency='eqcorrscan',
                                           author=getpass.getuser())))
            t.event = event
            self.templates.append(t)
        return self

    def _template_channel_ids(self, wildcards: bool = False):
        template_channel_ids = set()
        for template in self.templates:
            for tr in template.st:
                # Cope with missing info and convert to wildcards
                net, sta, loc, chan = tr.id.split('.')
                if wildcards:
                    net, sta, loc, chan = _wildcard_fill(
                        net, sta, loc, chan)
                template_channel_ids.add((net, sta, loc, chan))
        return template_channel_ids
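
    # Editor's note (assumption about _wildcard_fill behaviour, seed-id is
    # illustrative): with wildcards=True, missing or blank seed-id elements
    # are expected to be replaced by obspy-style wildcards, so a trace with id
    # "NZ.WEL..HHZ" (empty location code) would contribute something like
    # ("NZ", "WEL", "*", "HHZ") - convenient for building get_waveforms_bulk
    # requests in client_detect.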

    def cluster(self, method, **kwargs):
        """
        Cluster the tribe.

        Cluster templates within a tribe: returns multiple tribes each of
        which could be stacked.

        :type method: str
        :param method:
            Method of stacking, see :mod:`eqcorrscan.utils.clustering`

        :return: List of tribes.
        """
        from eqcorrscan.utils import clustering

        tribes = []
        func = getattr(clustering, method)
        if method in ['space_cluster', 'space_time_cluster']:
            cat = Catalog([t.event for t in self.templates])
            groups = func(cat, **kwargs)
            for group in groups:
                new_tribe = Tribe()
                for event in group:
                    new_tribe.templates.extend(
                        [t for t in self.templates if t.event == event])
                tribes.append(new_tribe)
        return tribes
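
    # Usage sketch for Tribe.cluster (editor's illustration; the keyword
    # arguments are assumptions passed straight through to
    # eqcorrscan.utils.clustering.space_cluster):
    #
    #   >>> sub_tribes = tribe.cluster(
    #   ...     method="space_cluster", d_thresh=5.0, show=False)
    #   >>> # each element of sub_tribes is itself a Tribe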

    def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
               plotdir=None, daylong=False, parallel_process=True,
               xcorr_func=None, concurrency=None, cores=None,
               concurrent_processing=False, ignore_length=False,
               ignore_bad_data=False, group_size=None, overlap="calculate",
               full_peaks=False, save_progress=False, process_cores=None,
               pre_processed=False, check_processing=True, **kwargs):
        """
        Detect using a Tribe of templates within a continuous stream.

        :type stream: `Queue` or `obspy.core.stream.Stream`
        :param stream:
            Queue of streams of continuous data to detect within using the
            Templates, or just the continuous data itself.
        :type threshold: float
        :param threshold:
            Threshold level, if using `threshold_type='MAD'` then this will be
            the multiple of the median absolute deviation.
        :type threshold_type: str
        :param threshold_type:
            The type of threshold to be used, can be MAD, absolute or
            av_chan_corr. See Note on thresholding below.
        :type trig_int: float
        :param trig_int:
            Minimum gap between detections from one template in seconds.
            If multiple detections occur within trig_int of one-another, the
            one with the highest cross-correlation sum will be selected.
        :type plot: bool
        :param plot: Turn plotting on or off.
        :type plotdir: str
        :param plotdir:
            The path to save plots to. If `plotdir=None` (default) then the
            figure will be shown on screen.
        :type daylong: bool
        :param daylong:
            Set to True to use the
            :func:`eqcorrscan.utils.pre_processing.dayproc` routine, which
            performs additional checks and is more efficient for day-long data
            than other methods.
        :type parallel_process: bool
        :param parallel_process:
        :type xcorr_func: str or callable
        :param xcorr_func:
            A str of a registered xcorr function or a callable for
            implementing a custom xcorr function. For more information see:
            :func:`eqcorrscan.utils.correlate.register_array_xcorr`
        :type concurrency: str
        :param concurrency:
            The type of concurrency to apply to the xcorr function. Options
            are 'multithread', 'multiprocess', 'concurrent'. For more details
            see :func:`eqcorrscan.utils.correlate.get_stream_xcorr`
        :type cores: int
        :param cores: Number of workers for processing and detection.
        :type concurrent_processing: bool
        :param concurrent_processing:
            Whether to process steps in the detection workflow concurrently
            or not. See https://github.com/eqcorrscan/EQcorrscan/pull/544
            for benchmarking.
        :type ignore_length: bool
        :param ignore_length:
            If using daylong=True, then dayproc will try to check that the
            data are there for at least 80% of the day, if you don't want this
            check (which will raise an error if too much data are missing)
            then set ignore_length=True. This is not recommended!
        :type ignore_bad_data: bool
        :param ignore_bad_data:
            If False (default), errors will be raised if data are excessively
            gappy or are mostly zeros. If True then no error will be raised,
            but an empty trace will be returned (and not used in detection).
        :type group_size: int
        :param group_size:
            Maximum number of templates to run at once, use to reduce memory
            consumption, if unset will use all templates.
        :type overlap: float
        :param overlap:
            Either None, "calculate" or a float of number of seconds to
            overlap detection streams by. This is to counter the effects of
            the delay-and-stack in calculating cross-correlation sums. Setting
            overlap = "calculate" will work out the appropriate overlap based
            on the maximum lags within templates.
        :type full_peaks: bool
        :param full_peaks: See `eqcorrscan.utils.findpeaks.find_peaks2_short`
        :type save_progress: bool
        :param save_progress:
            Whether to save the resulting party at every data step or not.
            Useful for long-running processes.
        :type process_cores: int
        :param process_cores:
            Number of processes to use for pre-processing (if different to
            `cores`).
        :type pre_processed: bool
        :param pre_processed:
            Whether the stream has been pre-processed or not to match the
            templates.
        :type check_processing: bool
        :param check_processing:
            Whether to check that all templates were processed the same.

        :return:
            :class:`eqcorrscan.core.match_filter.Party` of Families of
            detections.

        .. Note::
            When using the "fftw" correlation backend the length of the fft
            can be set. See :mod:`eqcorrscan.utils.correlate` for more info.

        .. Note::
            `stream` must not be pre-processed. If your data contain gaps you
            should *NOT* fill those gaps before using this method. The
            pre-process functions (called within) will fill the gaps
            internally prior to processing, process the data, then re-fill
            the gaps with zeros to ensure correlations are not incorrectly
            calculated within gaps. If your data have gaps you should pass a
            merged stream without the `fill_value` argument
            (e.g.: `stream = stream.merge()`).

        .. note::
            **Data overlap:**

            Internally this routine shifts and trims the data according to the
            offsets in the template (e.g. if trace 2 starts 2 seconds after
            trace 1 in the template then the continuous data will be shifted
            by 2 seconds to align peak correlations prior to summing).
            Because of this, detections at the start and end of continuous
            data streams **may be missed**. The maximum time-period that
            might be missing detections is the maximum offset in the template.

            To work around this, if you are conducting matched-filter
            detections through long-duration continuous data, we suggest
            using some overlap (a few seconds, on the order of the maximum
            offset in the templates) in the continuous data. You will then
            need to post-process the detections (which should be done anyway
            to remove duplicates). See below note for how the `overlap`
            argument affects data internally if `stream` is longer than the
            processing length.

        .. Note::
            If `stream` is longer than processing length, this routine will
            ensure that data overlap between loops, which will lead to no
            missed detections at data start-stop points (see above note).
            This will result in end-time not being strictly honoured, so
            detections may occur after the end-time set. This is because data
            must be run in the correct process-length.

        .. note::
            **Thresholding:**

            **MAD** threshold is calculated as the:

            .. math::

                threshold {\\times} (median(abs(cccsum)))

            where :math:`cccsum` is the cross-correlation sum for a given
            template.

            **absolute** threshold is a true absolute threshold based on the
            cccsum value.

            **av_chan_corr** is based on the mean values of single-channel
            cross-correlations assuming all data are present as required for
            the template, e.g:

            .. math::

                av\_chan\_corr\_thresh=threshold \\times (cccsum /
                len(template))

            where :math:`template` is a single template from the input and the
            length is the number of channels within this template.
        """
        # Check that template names are unique
        self.__unique_ids()
        # We should not need to copy the stream, it is copied in chunks by
        # _group_process

        # Argument handling
        if overlap is None:
            overlap = 0.0
        elif not isinstance(overlap, float) and str(overlap) == "calculate":
            overlap = max(
                _moveout(template.st) for template in self.templates)
        elif not isinstance(overlap, float):
            raise NotImplementedError(
                "%s is not a recognised overlap type" % str(overlap))
        assert overlap < self.templates[0].process_length, (
            f"Overlap {overlap} must be less than process length "
            f"{self.templates[0].process_length}")

        # Copy because we need to muck around with them.
        inner_kwargs = copy.copy(kwargs)

        plot_format = inner_kwargs.pop("plot_format", "png")
        export_cccsums = inner_kwargs.pop('export_cccsums', False)
        peak_cores = inner_kwargs.pop(
            'peak_cores', process_cores) or cpu_count()
        groups = inner_kwargs.pop("groups", None)

        if peak_cores == 1:
            parallel = False
        else:
            parallel = True

        if check_processing:
            assert len(quick_group_templates(self.templates)) == 1, (
                "Inconsistent template processing parameters found, this is "
                "no longer supported. \nSplit your tribe using "
                "eqcorrscan.core.match_filter.template.quick_group_templates "
                "and re-run for each grouped tribe")
        sampling_rate = self.templates[0].samp_rate
        # Used for sanity checking seed id overlap
        template_ids = set(
            tr.id for template in self.templates for tr in template.st)

        args = (stream, template_ids, pre_processed, parallel_process,
                process_cores, daylong, ignore_length, overlap,
                ignore_bad_data, group_size, groups, sampling_rate, threshold,
                threshold_type, save_progress, xcorr_func, concurrency, cores,
                export_cccsums, parallel, peak_cores, trig_int, full_peaks,
                plot, plotdir, plot_format,)

        if concurrent_processing:
            party = self._detect_concurrent(*args, **inner_kwargs)
        else:
            party = self._detect_serial(*args, **inner_kwargs)

        Logger.info("Ensuring all templates are in party")
        additional_families = []
        for template in self.templates:
            if template.name in party._template_dict.keys():
                continue
            additional_families.append(
                Family(template=template, detections=[]))
        party.families.extend(additional_families)

        # Post-process
        if len(party) > 0:
            Logger.info("Removing duplicates")
            party = _remove_duplicates(party)
        return party

    def _detect_serial(
        self, stream, template_ids, pre_processed, parallel_process,
        process_cores, daylong, ignore_length, overlap, ignore_bad_data,
        group_size, groups, sampling_rate, threshold, threshold_type,
        save_progress, xcorr_func, concurrency, cores, export_cccsums,
        parallel, peak_cores, trig_int, full_peaks, plot, plotdir,
        plot_format, **kwargs
    ):
        """ Internal serial detect workflow. """
        from eqcorrscan.core.match_filter.helpers.tribe import (
            _pre_process, _group, _corr_and_peaks, _detect, _make_party)

        party = Party()
        assert isinstance(stream, Stream), (
            f"Serial detection requires stream to be a stream, not"
            f" a {type(stream)}")

        # We need to copy data here to keep the users input safe.
        st_chunks = _pre_process(
            st=stream.copy(), template_ids=template_ids,
            pre_processed=pre_processed,
            filt_order=self.templates[0].filt_order,
            highcut=self.templates[0].highcut,
            lowcut=self.templates[0].lowcut,
            samp_rate=self.templates[0].samp_rate,
            process_length=self.templates[0].process_length,
            parallel=parallel_process, cores=process_cores, daylong=daylong,
            ignore_length=ignore_length, ignore_bad_data=ignore_bad_data,
            overlap=overlap, **kwargs)

        chunk_files = []
        for st_chunk in st_chunks:
            starttime = st_chunk[0].stats.starttime
            delta = st_chunk[0].stats.delta
            template_groups = _group(
                sids={tr.id for tr in st_chunk},
                templates=self.templates, group_size=group_size,
                groups=groups)
            for i, template_group in enumerate(template_groups):
                templates = [_quick_copy_stream(t.st) for t in template_group]
                template_names = [t.name for t in template_group]
                Logger.info(
                    f"Prepping {len(templates)} templates for correlation")

                # We need to copy the stream here.
                _st, templates, template_names = _prep_data_for_correlation(
                    stream=_quick_copy_stream(st_chunk), templates=templates,
                    template_names=template_names)

                all_peaks, thresholds, no_chans, chans = _corr_and_peaks(
                    templates=templates, template_names=template_names,
                    stream=_st, xcorr_func=xcorr_func,
                    concurrency=concurrency, cores=cores, i=i,
                    export_cccsums=export_cccsums, parallel=parallel,
                    peak_cores=peak_cores, threshold=threshold,
                    threshold_type=threshold_type, trig_int=trig_int,
                    sampling_rate=sampling_rate, full_peaks=full_peaks,
                    plot=plot, plotdir=plotdir, plot_format=plot_format,
                    prepped=False, **kwargs)

                detections = _detect(
                    template_names=template_names,
                    all_peaks=all_peaks, starttime=starttime,
                    delta=delta, no_chans=no_chans,
                    chans=chans, thresholds=thresholds)

                chunk_file = _make_party(
                    detections=detections, threshold=threshold,
                    threshold_type=threshold_type,
                    templates=self.templates, chunk_start=starttime,
                    chunk_id=i, save_progress=save_progress)
                chunk_files.append(chunk_file)
        # Rebuild
        for _chunk_file in chunk_files:
            Logger.info(f"Adding party from {_chunk_file} to party")
            with open(_chunk_file, "rb") as _f:
                party += pickle.load(_f)
            if not save_progress:
                try:
                    os.remove(_chunk_file)
                except FileNotFoundError:
                    pass
            Logger.info(f"Added party from {_chunk_file}, party now "
                        f"contains {len(party)} detections")

        if os.path.isdir(self._stream_dir):
            shutil.rmtree(self._stream_dir)
        return party

    def _detect_concurrent(
        self, stream, template_ids, pre_processed, parallel_process,
        process_cores, daylong, ignore_length, overlap, ignore_bad_data,
        group_size, groups, sampling_rate, threshold, threshold_type,
        save_progress, xcorr_func, concurrency, cores, export_cccsums,
        parallel, peak_cores, trig_int, full_peaks, plot, plotdir,
        plot_format, **kwargs
    ):
        """ Internal concurrent detect workflow. """
        from eqcorrscan.core.match_filter.helpers.processes import (
            _pre_processor, _prepper, _make_detections, _check_for_poison,
            _get_and_check, Poison)
        from eqcorrscan.core.match_filter.helpers.tribe import _corr_and_peaks

        if isinstance(stream, Stream):
            Logger.info("Copying stream to keep your original data safe")
            st_queue = Queue(maxsize=2)
            st_queue.put(stream.copy())
            # Close off queue
            st_queue.put(None)
            stream = st_queue
        else:
            # Note that if a queue has been passed we do not try to keep
            # data safe
            Logger.warning("Streams in queue will be edited in-place, you "
                           "should not re-use them")

        # To reduce load copying templates between processes we dump them to
        # disk and pass the dictionary of files
        template_dir = f".template_db_{uuid.uuid4()}"
        template_db = self._temporary_template_db(template_dir)

        # Set up processes and queues
        poison_queue = kwargs.get('poison_queue', Queue())

        if not pre_processed:
            processed_stream_queue = Queue(maxsize=1)
        else:
            processed_stream_queue = stream

        # Prepped queue contains templates and stream (and extras)
        prepped_queue = Queue(maxsize=1)
        # Output queues
        peaks_queue = Queue()
        party_file_queue = Queue()

        # Set up processes
        if not pre_processed:
            pre_processor_process = Process(
                target=_pre_processor,
                kwargs=dict(
                    input_stream_queue=stream,
                    temp_stream_dir=self._stream_dir,
                    template_ids=template_ids,
                    pre_processed=pre_processed,
                    filt_order=self.templates[0].filt_order,
                    highcut=self.templates[0].highcut,
                    lowcut=self.templates[0].lowcut,
                    samp_rate=self.templates[0].samp_rate,
                    process_length=self.templates[0].process_length,
                    parallel=parallel_process,
                    cores=process_cores,
                    daylong=daylong,
                    ignore_length=ignore_length,
                    overlap=overlap,
                    ignore_bad_data=ignore_bad_data,
                    output_filename_queue=processed_stream_queue,
                    poison_queue=poison_queue,
                ),
                name="ProcessProcess"
            )

        prepper_process = Process(
            target=_prepper,
            kwargs=dict(
                input_stream_filename_queue=processed_stream_queue,
                group_size=group_size,
                groups=groups,
                templates=template_db,
                output_queue=prepped_queue,
                poison_queue=poison_queue,
                xcorr_func=xcorr_func,
            ),
            name="PrepProcess"
        )
        detector_process = Process(
            target=_make_detections,
            kwargs=dict(
                input_queue=peaks_queue,
                delta=1 / sampling_rate,
                templates=template_db,
                threshold=threshold,
                threshold_type=threshold_type,
                save_progress=save_progress,
                output_queue=party_file_queue,
                poison_queue=poison_queue,
            ),
            name="DetectProcess"
        )

        # Cope with old tribes
        if not hasattr(self, '_processes'):
            self._processes = dict()
        if not hasattr(self, '_queues'):
            self._queues = dict()

        # Put these processes into the namespace
        self._processes.update({
            "prepper": prepper_process,
            "detector": detector_process,
        })
        self._queues.update({
            "poison": poison_queue,
            "stream": stream,
            "prepped": prepped_queue,
            "peaks": peaks_queue,
            "party_file": party_file_queue,
        })

        if not pre_processed:
            Logger.info("Starting preprocessor")
            self._queues.update({"processed_stream": processed_stream_queue})
            self._processes.update({"pre-processor": pre_processor_process})
            pre_processor_process.start()

        # Start your engines!
        prepper_process.start()
        detector_process.start()

        # Loop over input streams and template groups
        while True:
            killed = _check_for_poison(poison_queue)
            if killed:
                Logger.info("Killed in main loop")
                break
            try:
                to_corr = _get_and_check(prepped_queue, poison_queue)
                if to_corr is None:
                    Logger.info("Ran out of streams, exiting correlation")
                    break
                elif isinstance(to_corr, Poison):
                    Logger.info("Killed in main loop")
                    break
                starttime, i, stream, template_names, templates, \
                    *extras = to_corr
                inner_kwargs = copy.copy(kwargs)  # We will mangle them

                # Correlation specific handling to reduce single-threaded time
                if xcorr_func == "fmf":
                    weights, pads, chans, no_chans = extras
                    inner_kwargs.update({
                        'weights': weights, 'pads': pads,
                        "no_chans": no_chans, "chans": chans,
                        "prepped": True})
                elif xcorr_func in (None, 'fftw'):
                    pads, seed_ids = extras
                    inner_kwargs.update({
                        "pads": pads, "seed_ids": seed_ids, "prepped": True})

                Logger.info(f"Got stream of {len(stream)} channels")
                Logger.info(f"Starting correlation from {starttime}")

                all_peaks, thresholds, no_chans, chans = _corr_and_peaks(
                    templates=templates, template_names=template_names,
                    stream=stream, xcorr_func=xcorr_func,
                    concurrency=concurrency, cores=cores, i=i,
                    export_cccsums=export_cccsums, parallel=parallel,
                    peak_cores=peak_cores, threshold=threshold,
                    threshold_type=threshold_type, trig_int=trig_int,
                    sampling_rate=sampling_rate, full_peaks=full_peaks,
                    plot=plot, plotdir=plotdir, plot_format=plot_format,
                    **inner_kwargs
                )

                peaks_queue.put(
                    (starttime, all_peaks, thresholds, no_chans, chans,
                     template_names))
            except Exception as e:
                Logger.error(
                    f"Caught exception in correlator:\n {e}")
                traceback.print_tb(e.__traceback__)
                poison_queue.put(e)
                break  # We need to break in Main
            i += 1
        Logger.debug("Putting None into peaks queue.")
        peaks_queue.put(None)

        # Get the party back
        Logger.info("Collecting party")
        party = Party()
        while True:
            killed = _check_for_poison(poison_queue)
            if killed:
                Logger.error("Killed")
                break
            pf = _get_and_check(party_file_queue, poison_queue)
            if pf is None:
                # Fin - Queue has been finished with.
                break
            if isinstance(pf, Poison):
                Logger.error("Killed while checking for party")
                killed = True
                break
            with open(pf, "rb") as f:
                party += pickle.load(f)
            if not save_progress:
                try:
                    os.remove(pf)
                except FileNotFoundError:
                    pass

        # Check for exceptions
        if killed:
            internal_error = poison_queue.get()
            if isinstance(internal_error, Poison):
                internal_error = internal_error.value
            Logger.error(f"Raising error {internal_error} in main process")
            # Now we can raise the error
            if internal_error:
                # Clean the template db
                if os.path.isdir(template_dir):
                    shutil.rmtree(template_dir)
                if os.path.isdir(self._stream_dir):
                    shutil.rmtree(self._stream_dir)
                self._on_error(internal_error)

        # Shut down the processes and close the queues
        shutdown = kwargs.get("shutdown", True)
        # Allow client_detect to take control
        if shutdown:
            Logger.info("Shutting down")
            self._close_queues()
            self._close_processes()
        if os.path.isdir(template_dir):
            shutil.rmtree(template_dir)
        if os.path.isdir(self._stream_dir):
            shutil.rmtree(self._stream_dir)
        return party
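
    # Editor's note on the concurrent workflow above: it chains worker
    # processes (optional pre-processor -> prepper -> correlation in the main
    # process -> detection builder) linked by queues, with a shared "poison"
    # queue used to propagate exceptions back to the main process. From user
    # code it is enabled via Tribe.detect (values illustrative):
    #
    #   >>> party = tribe.detect(
    #   ...     stream=st, threshold=8.0, threshold_type="MAD", trig_int=6.0,
    #   ...     concurrent_processing=True)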

    def client_detect(self, client, starttime, endtime, threshold,
                      threshold_type, trig_int, plot=False, plotdir=None,
                      min_gap=None, daylong=False, parallel_process=True,
                      xcorr_func=None, concurrency=None, cores=None,
                      concurrent_processing=False, ignore_length=False,
                      ignore_bad_data=False, group_size=None,
                      return_stream=False, full_peaks=False,
                      save_progress=False, process_cores=None, retries=3,
                      check_processing=True, **kwargs):
        """
        Detect using a Tribe of templates within a continuous stream.

        :type client: `obspy.clients.*.Client`
        :param client:
            Any obspy client (or client-like object) with a dataselect
            service.
        :type starttime: :class:`obspy.core.UTCDateTime`
        :param starttime: Start-time for detections.
        :type endtime: :class:`obspy.core.UTCDateTime`
        :param endtime: End-time for detections
        :type threshold: float
        :param threshold:
            Threshold level, if using `threshold_type='MAD'` then this will be
            the multiple of the median absolute deviation.
        :type threshold_type: str
        :param threshold_type:
            The type of threshold to be used, can be MAD, absolute or
            av_chan_corr. See Note on thresholding below.
        :type trig_int: float
        :param trig_int:
            Minimum gap between detections from one template in seconds.
            If multiple detections occur within trig_int of one-another, the
            one with the highest cross-correlation sum will be selected.
        :type plot: bool
        :param plot: Turn plotting on or off.
        :type plotdir: str
        :param plotdir:
            The path to save plots to. If `plotdir=None` (default) then the
            figure will be shown on screen.
        :type min_gap: float
        :param min_gap:
            Minimum gap allowed in data - use to remove traces with known
            issues
        :type daylong: bool
        :param daylong:
            Set to True to use the
            :func:`eqcorrscan.utils.pre_processing.dayproc` routine, which
            performs additional checks and is more efficient for day-long data
            than other methods.
        :type parallel_process: bool
        :param parallel_process:
        :type xcorr_func: str or callable
        :param xcorr_func:
            A str of a registered xcorr function or a callable for
            implementing a custom xcorr function. For more information see:
            :func:`eqcorrscan.utils.correlate.register_array_xcorr`
        :type concurrency: str
        :param concurrency:
            The type of concurrency to apply to the xcorr function. Options
            are 'multithread', 'multiprocess', 'concurrent'. For more details
            see :func:`eqcorrscan.utils.correlate.get_stream_xcorr`
        :type cores: int
        :param cores: Number of workers for processing and detection.
        :type concurrent_processing: bool
        :param concurrent_processing:
            Whether to process steps in the detection workflow concurrently
            or not. See https://github.com/eqcorrscan/EQcorrscan/pull/544
            for benchmarking.
        :type ignore_length: bool
        :param ignore_length:
            If using daylong=True, then dayproc will try to check that the
            data are there for at least 80% of the day, if you don't want this
            check (which will raise an error if too much data are missing)
            then set ignore_length=True. This is not recommended!
        :type ignore_bad_data: bool
        :param ignore_bad_data:
            If False (default), errors will be raised if data are excessively
            gappy or are mostly zeros. If True then no error will be raised,
            but an empty trace will be returned (and not used in detection).
        :type group_size: int
        :param group_size:
            Maximum number of templates to run at once, use to reduce memory
            consumption, if unset will use all templates.
        :type full_peaks: bool
        :param full_peaks: See `eqcorrscan.utils.findpeaks.find_peaks2_short`
        :type save_progress: bool
        :param save_progress:
            Whether to save the resulting party at every data step or not.
            Useful for long-running processes.
        :type process_cores: int
        :param process_cores:
            Number of processes to use for pre-processing (if different to
            `cores`).
        :type return_stream: bool
        :param return_stream:
            Whether to also output the stream downloaded, useful if you plan
            to use the stream for something else, e.g. lag_calc.
        :type retries: int
        :param retries:
            Number of attempts allowed for downloading - allows for transient
            server issues.

        :return:
            :class:`eqcorrscan.core.match_filter.Party` of Families of
            detections.

        .. Note::
            When using the "fftw" correlation backend the length of the fft
            can be set. See :mod:`eqcorrscan.utils.correlate` for more info.

        .. Note::
            Ensures that data overlap between loops, which will lead to no
            missed detections at data start-stop points (see note for the
            :meth:`eqcorrscan.core.match_filter.Tribe.detect` method).
            This will result in end-time not being strictly honoured, so
            detections may occur after the end-time set. This is because data
            must be run in the correct process-length.

        .. warning::
            Plotting within the match-filter routine uses the Agg backend
            with interactive plotting turned off. This is because the
            function is designed to work in bulk. If you wish to turn
            interactive plotting on you must import matplotlib in your script
            first; when you then import match_filter you will get the warning
            that this call to matplotlib has no effect, which means that
            match_filter has not changed the plotting behaviour.

        .. note::
            **Thresholding:**

            **MAD** threshold is calculated as the:

            .. math::

                threshold {\\times} (median(abs(cccsum)))

            where :math:`cccsum` is the cross-correlation sum for a given
            template.

            **absolute** threshold is a true absolute threshold based on the
            cccsum value.

            **av_chan_corr** is based on the mean values of single-channel
            cross-correlations assuming all data are present as required for
            the template, e.g:

            .. math::

                av\_chan\_corr\_thresh=threshold \\times (cccsum /
                len(template))

            where :math:`template` is a single template from the input and the
            length is the number of channels within this template.
        """
        from eqcorrscan.core.match_filter.helpers.tribe import _download_st
        from eqcorrscan.core.match_filter.helpers.processes import (
            _get_detection_stream)

        # This uses get_waveforms_bulk to get data - not all client types have
        # this, so we check and monkey patch here.
        if not hasattr(client, "get_waveforms_bulk"):
            assert hasattr(client, "get_waveforms"), (
                f"client {client} must have at least a get_waveforms method")
            Logger.info(f"Client {client} does not have a get_waveforms_bulk "
                        "method, monkey-patching this")
            client = get_waveform_client(client)

        if check_processing:
            assert len(quick_group_templates(self.templates)) == 1, (
                "Inconsistent template processing parameters found, this is "
                "no longer supported. Split your tribe using "
                "eqcorrscan.core.match_filter.template.quick_group_templates "
                "and re-run for each group")

        groups = kwargs.get("groups", None)

        # Hard-coded buffer for downloading data, often data downloaded is
        # not the correct length
        buff = 300
        data_length = max([t.process_length for t in self.templates])

        # Calculate overlap
        overlap = max(_moveout(template.st) for template in self.templates)
        assert overlap < data_length, (
            f"Overlap of {overlap} s is larger than the length of data to "
            f"be downloaded: {data_length} s - this won't work.")

        # Work out start and end times of chunks to download
        chunk_start, time_chunks = starttime, []
        while chunk_start < endtime:
            time_chunks.append((chunk_start, chunk_start + data_length + 20))
            chunk_start += data_length - overlap

        full_stream_dir = None
        if return_stream:
            full_stream_dir = tempfile.mkdtemp()

        poison_queue = Queue()

        detector_kwargs = dict(
            threshold=threshold, threshold_type=threshold_type,
            trig_int=trig_int, plot=plot, plotdir=plotdir,
            daylong=daylong, parallel_process=parallel_process,
            xcorr_func=xcorr_func, concurrency=concurrency, cores=cores,
            ignore_length=ignore_length, ignore_bad_data=ignore_bad_data,
            group_size=group_size, overlap=None, full_peaks=full_peaks,
            process_cores=process_cores, save_progress=save_progress,
            return_stream=return_stream, check_processing=False,
            poison_queue=poison_queue, shutdown=False,
            concurrent_processing=concurrent_processing, groups=groups)

        if not concurrent_processing:
            Logger.warning(
                "Using concurrent_processing=True can be faster if "
                "downloading your data takes a long time. See "
                "https://github.com/eqcorrscan/EQcorrscan/pull/544 "
                "for benchmarks.")
            party = Party()
            if return_stream:
                full_st = Stream()
            for _starttime, _endtime in time_chunks:
                st = _download_st(
                    starttime=_starttime, endtime=_endtime,
                    buff=buff, min_gap=min_gap,
                    template_channel_ids=self._template_channel_ids(),
                    client=client, retries=retries)
                if len(st) == 0:
                    Logger.warning(f"No suitable data between {_starttime} "
                                   f"and {_endtime}, skipping")
                    continue
                party += self.detect(stream=st, pre_processed=False,
                                     **detector_kwargs)
                if return_stream:
                    full_st += st
            if return_stream:
                return party, full_st
            return party

        # Get data in advance
        time_queue = Queue()
        stream_queue = Queue(maxsize=1)

        downloader = Process(
            target=_get_detection_stream,
            kwargs=dict(
                input_time_queue=time_queue,
                client=client,
                retries=retries,
                min_gap=min_gap,
                buff=buff,
                output_filename_queue=stream_queue,
                poison_queue=poison_queue,
                temp_stream_dir=self._stream_dir,
                full_stream_dir=full_stream_dir,
                pre_process=True, parallel_process=parallel_process,
                process_cores=process_cores, daylong=daylong,
                overlap=0.0, ignore_length=ignore_length,
                ignore_bad_data=ignore_bad_data,
                filt_order=self.templates[0].filt_order,
                highcut=self.templates[0].highcut,
                lowcut=self.templates[0].lowcut,
                samp_rate=self.templates[0].samp_rate,
                process_length=self.templates[0].process_length,
                template_channel_ids=self._template_channel_ids(),
            ),
            name="DownloadProcess"
        )

        # Cope with old tribes
        if not hasattr(self, '_processes'):
            self._processes = dict()
        if not hasattr(self, '_queues'):
            self._queues = dict()

        # Put processes and queues into shared state
        self._processes.update({
            "downloader": downloader,
        })
        self._queues.update({
            "times": time_queue,
            "poison": poison_queue,
            "stream": stream_queue,
        })

        # Fill time queue
        for time_chunk in time_chunks:
            time_queue.put(time_chunk)
        # Close off queue
        time_queue.put(None)

        # Start up processes
        downloader.start()

        party = self.detect(
            stream=stream_queue, pre_processed=True, **detector_kwargs)

        # Close and join processes
        self._close_processes()
        self._close_queues()

        if return_stream:
            full_st = read(os.path.join(full_stream_dir, "*"))
            shutil.rmtree(full_stream_dir)
            return party, full_st
        return party
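
    # Minimal usage sketch for Tribe.client_detect (editor's illustration;
    # the client, time window and threshold values are assumptions and a
    # network connection is required):
    #
    #   >>> from obspy import UTCDateTime
    #   >>> from obspy.clients.fdsn import Client
    #   >>> party = tribe.client_detect(
    #   ...     client=Client("GEONET"), starttime=UTCDateTime(2018, 1, 1),
    #   ...     endtime=UTCDateTime(2018, 1, 2), threshold=8.0,
    #   ...     threshold_type="MAD", trig_int=6.0)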

    def _close_processes(
        self,
        terminate: bool = False,
        processes: dict = None
    ):
        processes = processes or self._processes
        for p_name, p in processes.items():
            if terminate:
                Logger.warning(f"Terminating {p_name}")
                p.terminate()
                continue
            try:
                Logger.info(f"Joining {p_name}")
                p.join(timeout=self._timeout)
            except Exception as e:
                Logger.error(f"Failed to join due to {e}: terminating")
                p.terminate()
            Logger.info(f"Closing {p_name}")
            try:
                p.close()
            except Exception as e:
                Logger.error(
                    f"Failed to close {p_name} due to {e}, terminating")
                p.terminate()
        Logger.info("Finished closing processes")
        return

    def _close_queues(self, queues: dict = None):
        queues = queues or self._queues
        for q_name, q in queues.items():
            if q._closed:
                continue
            Logger.info(f"Emptying {q_name}")
            while True:
                try:
                    q.get_nowait()
                except Empty:
                    break
            Logger.info(f"Closing {q_name}")
            q.close()
        Logger.info("Finished closing queues")
        return

    def _on_error(self, error):
        """ Gracefully close all child processes and queues and raise error """
        self._close_processes(terminate=True)
        self._close_queues()
        Logger.info("Admin complete, raising error")
        raise error

def read_tribe(fname):
    """
    Read a Tribe of templates from a tar archive.

    :param fname: Filename to read from
    :return: :class:`eqcorrscan.core.match_filter.Tribe`
    """
    tribe = Tribe()
    tribe.read(filename=fname)
    return tribe
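
# Editor's usage sketch for the module-level convenience function (the file
# name is illustrative only):
#
#   >>> tribe = read_tribe("my_tribe.tgz")
#   >>> print(tribe)
#   Tribe of ... templates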
if __name__ == "__main__": import doctest doctest.testmod() # List files to be removed after doctest cleanup = ['test_tribe.tgz', "test_tribe.pkl"] for f in cleanup: if os.path.isfile(f): os.remove(f) elif os.path.isdir(f): shutil.rmtree(f)