Source code for mt_metadata.transfer_functions.tf.survey

# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 21:30:36 2020

:copyright: 
    Jared Peacock (jpeacock@usgs.gov)

:license: MIT

"""
# =============================================================================
# Imports
# =============================================================================
from collections import OrderedDict
from mt_metadata.base.helpers import write_lines
from mt_metadata.base import get_schema, Base
from .standards import SCHEMA_FN_PATHS
from mt_metadata.timeseries.standards import (
    SCHEMA_FN_PATHS as TS_SCHEMA_FN_PATHS,
)
from mt_metadata.timeseries import (
    Person,
    Citation,
    Location,
    TimePeriod,
    Fdsn,
    FundingSource,
)
from mt_metadata.timeseries import Station as TSStation
from mt_metadata.timeseries.filters import (
    PoleZeroFilter,
    CoefficientFilter,
    TimeDelayFilter,
    FIRFilter,
    FrequencyResponseTableFilter,
)
from mt_metadata.transfer_functions.tf import Station


from mt_metadata.utils.list_dict import ListDict

# =============================================================================
attr_dict = get_schema("survey", SCHEMA_FN_PATHS)
attr_dict.add_dict(get_schema("fdsn", TS_SCHEMA_FN_PATHS), "fdsn")
attr_dict.add_dict(
    get_schema("person", TS_SCHEMA_FN_PATHS),
    "acquired_by",
    keys=["author", "comments", "organization"],
)
attr_dict.add_dict(
    get_schema("funding_source", TS_SCHEMA_FN_PATHS),
    "funding_source",
)
attr_dict.add_dict(
    get_schema("citation", TS_SCHEMA_FN_PATHS), "citation_dataset"
)
attr_dict.add_dict(
    get_schema("citation", TS_SCHEMA_FN_PATHS), "citation_journal"
)
attr_dict.add_dict(
    get_schema("location", TS_SCHEMA_FN_PATHS),
    "northwest_corner",
    keys=["latitude", "longitude"],
)
attr_dict.add_dict(
    get_schema("location", TS_SCHEMA_FN_PATHS),
    "southeast_corner",
    keys=["latitude", "longitude"],
)
attr_dict.add_dict(
    get_schema("geographic_location", TS_SCHEMA_FN_PATHS),
    None,
    keys=["country", "state"],
)
attr_dict.add_dict(
    get_schema("person", TS_SCHEMA_FN_PATHS),
    "project_lead",
    keys=["author", "email", "organization"],
)
attr_dict["project_lead.email"]["required"] = True
attr_dict["project_lead.organization"]["required"] = True

attr_dict.add_dict(get_schema("copyright", TS_SCHEMA_FN_PATHS), None)
# =============================================================================
[docs]class Survey(Base): __doc__ = write_lines(attr_dict) def __init__(self, **kwargs): self.acquired_by = Person() self.fdsn = Fdsn() self.citation_dataset = Citation() self.citation_journal = Citation() self.northwest_corner = Location() self.project_lead = Person() self.funding_source = FundingSource() self.southeast_corner = Location() self.time_period = TimePeriod() self.stations = ListDict() self.filters = {} super().__init__(attr_dict=attr_dict, **kwargs) def __add__(self, other): if isinstance(other, Survey): self.stations.extend(other.stations) return self else: msg = f"Can only merge Survey objects, not {type(other)}" self.logger.error(msg) raise TypeError(msg) def __len__(self): return len(self.stations) @property def stations(self): """Return station list""" return self._stations @stations.setter def stations(self, value): """set the station list""" if not isinstance(value, (list, tuple, dict, ListDict, OrderedDict)): msg = ( "input station_list must be an iterable, should be a list or dict " f"not {type(value)}" ) self.logger.error(msg) raise TypeError(msg) fails = [] self._stations = ListDict() if isinstance(value, (dict, ListDict, OrderedDict)): value_list = value.values() elif isinstance(value, (list, tuple)): value_list = value for ii, station in enumerate(value_list): if isinstance(station, Station): self._stations.append(station) elif isinstance(station, (dict, OrderedDict)): s = Station() s.from_dict(station) self._stations.append(s) elif isinstance(station, TSStation): s = Station() s.from_dict(station.to_dict()) s.runs = station.runs self._stations.append(s) else: msg = f"Item {ii} is not type(Station); type={type(station)}" fails.append(msg) self.logger.error(msg) if len(fails) > 0: raise TypeError("\n".join(fails)) @property def station_names(self): """Return names of station in survey""" return self.stations.keys() @property def filters(self): """A dictionary of available filters""" return self._filters @filters.setter def filters(self, value): """ Set the filters dictionary :param value: dictionary of filter objects :type value: dictionary """ filters = ListDict() fails = [] if value is None: return if isinstance(value, list): if len(value) > 0: if isinstance(value[0], (dict, OrderedDict, ListDict)): for ff in value: f_type = ff["type"] if f_type is None: msg = "filter type is None do not know how to read the filter" fails.append(msg) self.logger.error(msg) if f_type.lower() in ["zpk"]: f = PoleZeroFilter() elif f_type.lower() in ["coefficient"]: f = CoefficientFilter() elif f_type.lower() in ["time delay"]: f = TimeDelayFilter() elif f_type.lower() in ["fir"]: f = FIRFilter() elif f_type.lower() in ["frequency response table"]: f = FrequencyResponseTableFilter() else: msg = f"filter type {f_type} not supported." fails.append(msg) self.logger.error(msg) f.from_dict(ff) filters[f.name] = f elif not isinstance(value, (dict, OrderedDict, ListDict)): msg = ( "Filters must be a dictionary with keys = names of filters, " f"not {type(value)}" ) self.logger.error(msg) raise TypeError(msg) else: for k, v in value.items(): if not isinstance( v, ( PoleZeroFilter, CoefficientFilter, TimeDelayFilter, FrequencyResponseTableFilter, FIRFilter, ), ): msg = f"Item {k} is not Filter type; type={type(v)}" fails.append(msg) self.logger.error(msg) else: filters[k.lower()] = v if len(fails) > 0: raise TypeError("\n".join(fails)) self._filters = filters @property def filter_names(self): """return a list of filter names""" return list(self.filters.keys())
[docs] def has_station(self, station_id): """ Has station id :param station_id: station id verbatim :type station_id: string :return: True if exists or False if not :rtype: boolean """ if station_id in self.station_names: return True return False
[docs] def station_index(self, station_id): """ Get station index :param station_id: station id verbatim :type station_id: string :return: index value if station is found :rtype: integer """ if self.has_station(station_id): return self.station_names.index(station_id) return None
[docs] def add_station(self, station_obj, update=True): """ Add a station, if has the same name update that object. :param station_obj: station object to add :type station_obj: `:class:`mt_metadata.timeseries.Station` """ if not isinstance(station_obj, Station): if isinstance(station_obj, TSStation): tf_station_metadata = Station() tf_station_metadata.from_dict(station_obj.to_dict()) station_obj = tf_station_metadata raise TypeError( f"Input must be a mt_metadata.timeseries.Station object not {type(station_obj)}" ) if self.has_station(station_obj.id): self.stations[station_obj.id].update(station_obj) self.logger.warning( f"Station {station_obj.id} already exists, updating metadata" ) else: self.stations.append(station_obj) if update: self.update_time_period() self.update_bounding_box()
[docs] def get_station(self, station_id): """ Get a station from the station id :param station_id: station id verbatim :type station_id: string :return: station object :rtype: :class:`mt_metadata.timeseries.Station` """ if self.has_station(station_id): return self.stations[station_id] else: self.logger.warning(f"Could not find station {station_id}") return None
[docs] def remove_station(self, station_id, update=True): """ remove a station from the survey :param station_id: station id verbatim :type station_id: string """ if self.has_station(station_id): self.stations.remove(station_id) if update: self.update_bounding_box() self.update_time_period() else: self.logger.warning(f"Could not find {station_id} to remove.")
[docs] def update_bounding_box(self): """ Update the bounding box of the survey from the station information """ if self.__len__() > 0: lat = [] lon = [] for station in self.stations: lat.append(station.location.latitude) lon.append(station.location.longitude) self.southeast_corner.latitude = min(lat) self.southeast_corner.longitude = max(lon) self.northwest_corner.latitude = max(lat) self.northwest_corner.longitude = min(lon)
[docs] def update_time_period(self): """ Update the start and end time of the survey based on the stations """ if self.__len__() > 0: start = [] end = [] for station in self.stations: if station.time_period.start != "1980-01-01T00:00:00+00:00": start.append(station.time_period.start) if station.time_period.end != "1980-01-01T00:00:00+00:00": end.append(station.time_period.end) if start: if self.time_period.start == "1980-01-01T00:00:00+00:00": self.time_period.start = min(start) else: if self.time_period.start > min(start): self.time_period.start = min(start) if end: if self.time_period.end == "1980-01-01T00:00:00+00:00": self.time_period.end = max(end) else: if self.time_period.end < max(end): self.time_period.end = max(end)