Source code for eqcorrscan.core.match_filter.helpers.archive_access
"""
Helper functions for access to archives, e.g. Party, Tribe
:copyright:
EQcorrscan developers.
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import contextlib
import os
import shutil
import tempfile
import tarfile
import logging
Logger = logging.getLogger(__name__)
[docs]
@contextlib.contextmanager
def temporary_directory():
""" make a temporary directory, yeild its name, cleanup on exit """
dir_name = tempfile.mkdtemp()
yield dir_name
if os.path.exists(dir_name):
shutil.rmtree(dir_name)
def _par_read(dirname, compressed=True):
"""
Internal write function to read a formatted parameter file.
:type dirname: str
:param dirname: Directory to read the parameter file from.
:type compressed: bool
:param compressed: Whether the directory is compressed or not.
"""
from eqcorrscan.core.match_filter.matched_filter import MatchFilterError
from eqcorrscan.core.match_filter.template import Template
templates = []
if compressed:
arc = tarfile.open(dirname, "r:*")
members = arc.getmembers()
_parfile = [member for member in members
if member.name.split(os.sep)[-1] ==
'template_parameters.csv']
if len(_parfile) == 0:
arc.close()
raise MatchFilterError(
'No template parameter file in archive')
parfile = arc.extractfile(_parfile[0])
else:
parfile = open(dirname + '/' + 'template_parameters.csv', 'r')
for line in parfile:
t_in = Template()
for key_pair in line.rstrip().split(','):
if key_pair.split(':')[0].strip() == 'name':
t_in.__dict__[key_pair.split(':')[0].strip()] = \
key_pair.split(':')[-1].strip()
elif key_pair.split(':')[0].strip() == 'filt_order':
try:
t_in.__dict__[key_pair.split(':')[0].strip()] = \
int(key_pair.split(':')[-1])
except ValueError:
pass
else:
try:
t_in.__dict__[key_pair.split(':')[0].strip()] = \
float(key_pair.split(':')[-1])
except ValueError:
pass
templates.append(t_in)
parfile.close()
if compressed:
arc.close()
return templates
def _resolved(x):
return os.path.realpath(os.path.abspath(x))
def _badpath(path, base):
"""
joinpath will ignore base if path is absolute.
"""
return not _resolved(os.path.join(base, path)).startswith(base)
def _badlink(info, base):
"""
Links are interpreted relative to the directory containing the link
"""
tip = _resolved(os.path.join(base, os.path.dirname(info.name)))
return _badpath(info.linkname, base=tip)
def _safemembers(members):
"""
Check members of a tar archive for safety.
Ensure that they do not contain paths or links outside of where we
need them - this would only happen if the archive wasn't made by
eqcorrscan.
:type members: :class:`tarfile.TarFile`
:param members: an open tarfile.
"""
base = _resolved(".")
for finfo in members:
assert not _badpath(finfo.name, base), \
f"{finfo.name} is blocked (illegal path)"
if finfo.issym() or finfo.islnk():
assert not _badlink(finfo, base), \
f"{finfo.name} is blocked: Link to {finfo.linkname}"
else:
yield finfo
if __name__ == "__main__":
import doctest
doctest.testmod()