Source code for mt_metadata.transfer_functions.tf.station

# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 21:30:36 2020

:copyright: 
    Jared Peacock (jpeacock@usgs.gov)

:license: MIT

"""
# =============================================================================
# Imports
# =============================================================================
import numpy as np
from collections import OrderedDict
from mt_metadata.base.helpers import write_lines
from mt_metadata.base import get_schema, Base
from .standards import SCHEMA_FN_PATHS
from mt_metadata.timeseries.standards import (
    SCHEMA_FN_PATHS as TS_SCHEMA_FN_PATHS,
)
from mt_metadata.utils.validators import validate_value_type
from . import (
    Fdsn,
    Orientation,
    Person,
    Provenance,
    Location,
    TimePeriod,
    Run,
    TransferFunction,
)
from mt_metadata.utils.list_dict import ListDict

# =============================================================================
attr_dict = get_schema("station", SCHEMA_FN_PATHS)
attr_dict.add_dict(get_schema("fdsn", TS_SCHEMA_FN_PATHS), "fdsn")
location_dict = get_schema("location", TS_SCHEMA_FN_PATHS)
location_dict.add_dict(
    get_schema("declination", TS_SCHEMA_FN_PATHS), "declination"
)
location_dict.add_dict(
    get_schema("geographic_location", TS_SCHEMA_FN_PATHS),
    None,
)
attr_dict.add_dict(location_dict, "location")
attr_dict.add_dict(
    get_schema("person", TS_SCHEMA_FN_PATHS),
    "acquired_by",
    keys=["author", "comments"],
)
attr_dict.add_dict(get_schema("orientation", TS_SCHEMA_FN_PATHS), "orientation")
attr_dict.add_dict(
    Provenance()._attr_dict,
    "provenance",
)

attr_dict.add_dict(get_schema("time_period", TS_SCHEMA_FN_PATHS), "time_period")
attr_dict.add_dict(TransferFunction()._attr_dict, "transfer_function")
attr_dict.add_dict(get_schema("copyright", TS_SCHEMA_FN_PATHS), None)
attr_dict["release_license"]["required"] = False
attr_dict.add_dict(
    get_schema("citation", TS_SCHEMA_FN_PATHS), None, keys=["doi"]
)
attr_dict["doi"]["required"] = False
# =============================================================================
[docs]class Station(Base): __doc__ = write_lines(attr_dict) def __init__(self, **kwargs): self.fdsn = Fdsn() self._channels_recorded = [] self.orientation = Orientation() self.acquired_by = Person() self.provenance = Provenance() self.location = Location() self.time_period = TimePeriod() self.transfer_function = TransferFunction() self.runs = ListDict() super().__init__(attr_dict=attr_dict, **kwargs) def __add__(self, other): if isinstance(other, Station): self.runs.extend(other.runs) return self else: msg = f"Can only merge Station objects, not {type(other)}" self.logger.error(msg) raise TypeError(msg) def __len__(self): return len(self.runs) @property def channels_recorded(self): """ A list of all channels recorded :return: list of all unique channels recorded for the station :rtype: list """ ch_list = [] for run in self.runs: ch_list += run.channels_recorded_all ch_list = sorted(set([cc for cc in ch_list if cc is not None])) if self._channels_recorded == []: return ch_list elif ch_list == []: return self._channels_recorded elif len(self._channels_recorded) != ch_list: return ch_list @channels_recorded.setter def channels_recorded(self, value): """ set channels_recorded """ if isinstance(value, np.ndarray): value = value.tolist() if value in [None, "None", "none", "NONE", "null"]: return elif isinstance(value, (list, tuple)): self._channels_recorded = value elif isinstance(value, (str)): value = value.split(",") else: raise TypeError( "'channels_recorded' must be set with a list not " f"{type(value)}." )
[docs] def has_run(self, run_id): """ Check to see if the run id already exists :param run_id: run id verbatim :type run_id: string :return: Tru if exists, False if not :rtype: boolean """ if run_id in self.run_list: return True return False
[docs] def run_index(self, run_id): """ Get the index of the run_id :param run_id: run id verbatim :type run_id: string :return: index of the run :rtype: integer """ if self.has_run(run_id): return self.run_list.index(run_id) return None
[docs] def add_run(self, run_obj, update=True): """ Add a run, if one of the same name exists overwrite it. :param run_obj: run object to add :type run_obj: :class:`mt_metadata.timeseries.Run` """ if not isinstance(run_obj, Run): raise TypeError( f"Input must be a mt_metadata.timeseries.Run object not {type(run_obj)}" ) if self.has_run(run_obj.id): self.runs[run_obj.id].update(run_obj) self.logger.debug( f"Station {run_obj.id} already exists, updating metadata" ) else: self.runs.append(run_obj) if update: self.update_time_period()
[docs] def get_run(self, run_id): """ Get a :class:`mt_metadata.timeseries.Run` object from the given id :param run_id: run id verbatim :type run_id: string """ if self.has_run(run_id): return self.runs[run_id] self.logger.warning(f"Could not find {run_id} in runs.") return None
[docs] def remove_run(self, run_id, update=True): """ remove a run from the survey :param run_id: run id verbatim :type run_id: string """ if self.has_run(run_id): self.runs.remove(run_id) if update: self.update_time_period() else: self.logger.warning(f"Could not find {run_id} to remove.")
@property def runs(self): """Return run list""" return self._runs @runs.setter def runs(self, value): """set the run list""" if not isinstance(value, (list, tuple, dict, ListDict, OrderedDict)): msg = ( "input run_list must be an iterable, should be a list or dict " f"not {type(value)}" ) self.logger.error(msg) raise TypeError(msg) fails = [] self._runs = ListDict() if isinstance(value, (dict, ListDict, OrderedDict)): value_list = value.values() elif isinstance(value, (list, tuple)): value_list = value for ii, run in enumerate(value_list): if isinstance(run, (dict, OrderedDict)): r = Run() r.from_dict(run) self._runs.append(r) elif not isinstance(run, Run): msg = f"Item {ii} is not type(Run); type={type(run)}" fails.append(msg) self.logger.error(msg) else: self._runs.append(run) if len(fails) > 0: raise TypeError("\n".join(fails)) self.update_time_period() @property def run_list(self): """Return names of run in survey""" return self.runs.keys() @run_list.setter def run_list(self, value): """Set list of run names""" if not hasattr(value, "__iter__"): msg = ( "input station_list must be an iterable, should be a list " f"not {type(value)}" ) self.logger.error(msg) raise TypeError(msg) value = validate_value_type(value, str, "name_list") for run in value: if not isinstance(run, str): try: run = str(run) except (ValueError, TypeError): msg = f"could not convert {run} to string" self.logger.error(msg) raise ValueError(msg) run = run.replace("'", "").replace('"', "") if run not in self.runs.keys(): self.add_run(Run(id=run))
[docs] def update_time_period(self): """ update time period from run information """ if self.__len__() > 0: start = [] end = [] for run in self.runs: if run.time_period.start != "1980-01-01T00:00:00+00:00": start.append(run.time_period.start) if run.time_period.end != "1980-01-01T00:00:00+00:00": end.append(run.time_period.end) if start: if self.time_period.start == "1980-01-01T00:00:00+00:00": self.time_period.start = min(start) else: if self.time_period.start > min(start): self.time_period.start = min(start) if end: if self.time_period.end == "1980-01-01T00:00:00+00:00": self.time_period.end = max(end) else: if self.time_period.end < max(end): self.time_period.end = max(end)