# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 20:41:16 2020
:copyright:
Jared Peacock (jpeacock@usgs.gov)
:license: MIT
"""
# =============================================================================
# Imports
# =============================================================================
from copy import deepcopy
from collections import OrderedDict
from operator import itemgetter
from pathlib import Path
from loguru import logger
import json
import pandas as pd
import numpy as np
from mt_metadata.utils.validators import (
validate_attribute,
validate_value_type,
)
from mt_metadata.utils.exceptions import MTSchemaError
from . import helpers
from mt_metadata.base.helpers import write_lines
attr_dict = {}
# =============================================================================
# Base class that everything else will inherit
# =============================================================================
[docs]class Base:
__doc__ = write_lines(attr_dict)
def __init__(self, attr_dict={}, **kwargs):
self._changed = False
self._class_name = validate_attribute(self.__class__.__name__)
self.logger = logger
self._debug = False
self._set_attr_dict(attr_dict)
for name, value in kwargs.items():
self.set_attr_from_name(name, value)
def _set_attr_dict(self, attr_dict):
"""
Set attribute dictionary and variables.
should have the proper keys.
:param attr_dict: attribute dictionary
:type attr_dict: dict
"""
self._attr_dict = attr_dict
for key, value_dict in attr_dict.items():
self.set_attr_from_name(key, value_dict["default"])
def __str__(self):
"""
:return: table describing attributes
:rtype: string
"""
meta_dict = self.to_dict()[self._class_name.lower()]
lines = [f"{self._class_name}:"]
for name, value in meta_dict.items():
lines.append(f"\t{name} = {value}")
return "\n".join(lines)
def __repr__(self):
return self.to_json()
def __eq__(self, other):
if other in [None]:
return False
elif isinstance(other, (Base, dict, str, pd.Series)):
home_dict = self.to_dict(single=True, required=False)
if isinstance(other, Base):
other_dict = other.to_dict(single=True, required=False)
elif isinstance(other, dict):
other_dict = other
elif isinstance(other, str):
if other.lower() in ["none", "null", "unknown"]:
return False
other_dict = OrderedDict(
sorted(json.loads(other).items(), key=itemgetter(0))
)
elif isinstance(other, pd.Series):
other_dict = OrderedDict(
sorted(other.to_dict().items(), key=itemgetter(0))
)
else:
raise ValueError(
f"Cannot compare {self._class_name} with {type(other)}"
)
fail = False
for key, value in home_dict.items():
try:
other_value = other_dict[key]
if isinstance(value, np.ndarray):
if value.size != other_value.size:
msg = f"Array sizes for {key} differ: {value.size} != {other_value.size}"
self.logger.info(msg)
fail=True
continue
if not (value == other_value).all():
msg = f"{key}: {value} != {other_value}"
self.logger.info(msg)
fail = True
elif isinstance(value, (float, int, complex)):
if not np.isclose(value, other_value):
msg = f"{key}: {value} != {other_value}"
self.logger.info(msg)
fail = True
else:
if value != other_value:
msg = f"{key}: {value} != {other_value}"
self.logger.info(msg)
fail = True
except KeyError:
msg = "Cannot find {0} in other".format(key)
self.logger.info(msg)
if fail:
return False
else:
return True
else:
return False
def __ne__(self, other):
return not self.__eq__(other)
def __len__(self):
return len(self.get_attribute_list())
[docs] def update(self, other, match=[]):
"""
Update attribute values from another like element, skipping None
:param other: other Base object
:type other: :class:`mt_metadata.base.metadata.Base`
"""
if not isinstance(other, type(self)):
self.logger.warning(
f"Cannot update {type(self)} with {type(other)}"
)
for k in match:
if self.get_attr_from_name(k) != other.get_attr_from_name(k):
msg = (
f"{k} is not equal {self.get_attr_from_name(k)} != "
f"{other.get_attr_from_name(k)}"
)
self.logger.error(msg)
raise ValueError(msg)
for k, v in other.to_dict(single=True).items():
if hasattr(v, "size"):
if v.size > 0:
self.set_attr_from_name(k, v)
else:
if v not in [None, 0.0, [], "", "1980-01-01T00:00:00+00:00"]:
self.set_attr_from_name(k, v)
@property
def changed(self):
return self._changed
@changed.setter
def changed(self, value):
self._changed = value
def __deepcopy__(self, memodict={}):
"""
Need to skip copying the logger
need to copy properties as well.
:return: Deep copy
:rtype: :class:`mt_metadata.base.metadata.Base`
"""
copied = type(self)()
for key in self.to_dict(single=True, required=False).keys():
try:
copied.set_attr_from_name(
key, deepcopy(self.get_attr_from_name(key), memodict)
)
# Need the TypeError for objects that have no __reduce__ method
# like H5 references.
except (AttributeError, TypeError) as error:
self.logger.debug(error)
continue
# need to copy and properties
for key in self.__dict__.keys():
if key.startswith("_"):
test_property = getattr(self.__class__, key[1:], None)
if isinstance(test_property, property):
value = getattr(self, key[1:])
if hasattr(value, "copy"):
setattr(copied, key[1:], value.copy())
else:
setattr(copied, key[1:], value)
return copied
[docs] def copy(self):
"""
Copy object
"""
return self.__deepcopy__()
[docs] def get_attribute_list(self):
"""
return a list of the attributes
"""
return sorted(list(self._attr_dict.keys()))
def _validate_name(self, name):
"""
validate the name to conform to the standards
name must be:
* all lower case {a-z; 1-9}
* must start with a letter
* categories are separated by '.'
* words separated by '_'
{object}.{name_name}
'/' will be replaced with '.'
converted to all lower case
:param name: name name
:type name: string
:return: valid name name
:rtype: string
"""
return validate_attribute(name)
def _validate_type(self, value, v_type, style=None):
"""
validate type from standards
"""
try:
return validate_value_type(value, v_type, style=style)
except MTSchemaError as error:
self.logger.exception(error)
raise MTSchemaError(error)
def _validate_option(self, name, value, option_list):
"""
validate the given attribute name agains possible options and check
for aliases
:param name: DESCRIPTION
:type name: TYPE
:param option_list: DESCRIPTION
:type option_list: TYPE
:param alias_list: DESCRIPTION
:type alias_list: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
if value is None:
return True, False, None
options = [ss.lower() for ss in option_list]
other_possible = False
if "other" in options:
other_possible = True
if value.lower() in options:
return True, other_possible, None
elif value.lower() not in options and other_possible:
msg = (
f"Value '{value}' not found for metadata field '{name}' in options list {option_list}, but other options"
+ f" are allowed. Allowing {option_list} to be set to {value}."
)
return True, other_possible, msg
return False, other_possible, f"Value '{value}' for metadata field '{name}' not found in options list {option_list}"
def __setattr__(self, name, value):
"""
set attribute based on metadata standards
Something here doesnt allow other objects to be set as attributes
"""
# skip these attribute because they are validated in the property
# setter.
skip_list = [
"latitude",
"longitude",
"elevation",
"start_date",
"end_date",
"start",
"end",
"applied",
"logger",
"changed",
"hdf5_reference",
"obspy_mapping",
"surveys",
"filters",
"stations",
"runs",
"channels",
"channels_recorded_all",
"channels_recorded_electric"
"channels_recorded_magnetic"
"channels_recorded_auxiliary",
"electrode",
"estimates_list",
"input_channels",
"output_channels",
"data_types_list",
"info_list",
"info_dict",
"filters_list",
"fn",
]
if name in skip_list:
super().__setattr__(name, value)
return
if not name.startswith("_"):
# test if the attribute is a property first, if it is, then
# it will have its own defined setter, so use that one and
# skip validation.
try:
test_property = getattr(self.__class__, name, None)
if isinstance(test_property, property):
self.logger.debug(
f"Identified {name} as property, using fset"
)
test_property.fset(self, value)
return
except AttributeError:
pass
if hasattr(self, "_attr_dict") and not name.startswith("_"):
self.logger.debug(f"Setting {name} to {value}")
try:
v_dict = self._attr_dict[name]
v_type = self._get_standard_type(name)
value = self._validate_type(value, v_type, v_dict["style"])
# check options
if v_dict["style"] == "controlled vocabulary":
options = v_dict["options"]
accept, other, msg = self._validate_option(name, value, options)
if not accept:
self.logger.error(msg.format(value, options))
raise MTSchemaError(msg.format(value, options))
if other and not accept:
self.logger.warning(msg.format(value, options, name))
super().__setattr__(name, value)
except KeyError as error:
raise KeyError(error)
else:
super().__setattr__(name, value)
def _get_standard_type(self, name):
"""
helper function to get the standard type for the given name
"""
name = self._validate_name(name)
try:
standards = self._attr_dict[name]
if isinstance(standards, type(logger)):
return None
return standards["type"]
except KeyError:
if name[0] != "_":
msg = (
f"{name} is not defined in the standards. "
" Should add attribute information with "
"add_base_attribute if the attribute is going to "
"propogate via to_dict, to_json, to_series"
)
self.logger.info(msg)
return None
[docs] def get_attr_from_name(self, name):
"""
Access attribute from the given name.
The name can contain the name of an object which must be separated
by a '.' for e.g. {object_name}.{name} --> location.latitude
.. note:: this is a helper function for names with '.' in the name for
easier getting when reading from dictionary.
:param name: name of attribute to get.
:type name: string
:return: attribute value
:rtype: type is defined by the attribute name
:Example:
>>> b = Base(**{'category.test_attr':10})
>>> b.get_attr_from_name('category.test_attr')
10
"""
name = self._validate_name(name)
v_type = self._get_standard_type(name)
if "." in name:
value, prop = helpers.recursive_split_getattr(self, name)
if prop:
return value
else:
value = getattr(self, name)
try:
if isinstance(getattr(type(self), name), property):
return value
except AttributeError:
pass
if hasattr(value, "to_dict"):
return value
return self._validate_type(value, v_type)
[docs] def set_attr_from_name(self, name, value):
"""
Helper function to set attribute from the given name.
The name can contain the name of an object which must be separated
by a '.' for e.g. {object_name}.{name} --> location.latitude
.. note:: this is a helper function for names with '.' in the name for
easier getting when reading from dictionary.
:param name: name of attribute to get.
:type name: string
:param value: attribute value
:type value: type is defined by the attribute name
:Example:
>>> b = Base(**{'category.test_attr':10})
>>> b.set_attr_from_name('category.test_attr', '10')
>>> print(b.category.test_attr)
'10'
"""
if "." in name:
try:
helpers.recursive_split_setattr(self, name, value)
except AttributeError as error:
msg = (
"{0} is not in the current standards. "
+ "To properly add the attribute use "
+ "add_base_attribute."
)
self.logger.error(msg.format(name))
raise AttributeError(error)
else:
setattr(self, name, value)
[docs] def add_base_attribute(self, name, value, value_dict):
"""
Add an attribute to _attr_dict so it will be included in the
output dictionary
:param name: name of attribute
:type name: string
:param value: value of the new attribute
:type value: described in value_dict
:param value_dict: dictionary describing the attribute, must have keys
['type', 'required', 'style', 'units', 'alias', 'description',
'options', 'example']
:type name: string
* type --> the data type [ str | int | float | bool ]
* required --> required in the standards [ True | False ]
* style --> style of the string
* units --> units of the attribute, must be a string
* alias --> other possible names for the attribute
* options --> if only a few options are accepted, separated by | or
comma.b [ option_01 | option_02 | other ]. 'other' means other options
available but not yet defined.
* example --> an example of the attribute
:Example:
>>> extra = {'type': str,
>>> ... 'style': 'controlled vocabulary',
>>> ... 'required': False,
>>> ... 'units': celsius,
>>> ... 'description': 'local temperature',
>>> ... 'alias': ['temp'],
>>> ... 'options': [ 'ambient', 'air', 'other'],
>>> ... 'example': 'ambient'}
>>> r = Run()
>>> r.add_base_attribute('temperature', 'ambient', extra)
"""
name = self._validate_name(name)
self._attr_dict.update({name: value_dict})
self.set_attr_from_name(name, value)
[docs] def to_dict(self, nested=False, single=False, required=True):
"""
make a dictionary from attributes, makes dictionary from _attr_list.
:param nested: make the returned dictionary nested
:type nested: [ True | False ] , default is False
:param single: return just metadata dictionary -> meta_dict[class_name]
:type single: [ True | False ], default is False
:param required: return just the required elements and any elements with
non-None values
"""
meta_dict = {}
for name in list(self._attr_dict.keys()):
try:
value = self.get_attr_from_name(name)
if hasattr(value, "to_dict"):
value = value.to_dict(nested=nested, required=required)
elif isinstance(value, dict):
for key, obj in value.items():
if hasattr(obj, "to_dict"):
value[key] = obj.to_dict(
nested=nested, required=required
)
else:
value[key] = obj
elif isinstance(value, list):
v_list = []
for obj in value:
if hasattr(obj, "to_dict"):
v_list.append(
obj.to_dict(nested=nested, required=required)
)
else:
v_list.append(obj)
value = v_list
except AttributeError as error:
self.logger.debug(error)
value = None
if required:
if isinstance(value, (np.ndarray)):
if name == "zeros" or name == "poles":
meta_dict[name] = value
elif value.all() != 0:
meta_dict[name] = value
elif hasattr(value, "size"):
if value.size > 0:
meta_dict[name] = value
elif (
value
not in [None, "1980-01-01T00:00:00+00:00", "1980", [], ""]
or self._attr_dict[name]["required"]
):
meta_dict[name] = value
else:
meta_dict[name] = value
if nested:
meta_dict = helpers.structure_dict(meta_dict)
meta_dict = {
self._class_name.lower(): OrderedDict(
sorted(meta_dict.items(), key=itemgetter(0))
)
}
if single:
meta_dict = meta_dict[list(meta_dict.keys())[0]]
return meta_dict
[docs] def from_dict(self, meta_dict, skip_none=False):
"""
fill attributes from a dictionary
:param meta_dict: dictionary with keys equal to metadata.
:type meta_dict: dictionary
"""
if not isinstance(meta_dict, (dict, OrderedDict)):
msg = f"Input must be a dictionary not {type(meta_dict)}"
self.logger.error(msg)
raise MTSchemaError(msg)
keys = list(meta_dict.keys())
if len(keys) == 1:
class_name = keys[0]
if class_name.lower() != self._class_name.lower():
msg = (
"name of input dictionary is not the same as class type "
f"input = {class_name}, class type = {self._class_name}"
)
self.logger.debug(msg, class_name, self._class_name)
meta_dict = helpers.flatten_dict(meta_dict[class_name])
else:
self.logger.debug(
f"Assuming input dictionary is of type {self._class_name}",
)
meta_dict = helpers.flatten_dict(meta_dict)
# set attributes by key.
for name, value in meta_dict.items():
if skip_none:
if value in [
None,
"None",
"none",
"NONE",
"null",
"Null",
"NULL",
"1980-01-01T00:00:00+00:00",
]:
continue
self.set_attr_from_name(name, value)
[docs] def to_json(self, nested=False, indent=" " * 4, required=True):
"""
Write a json string from a given object, taking into account other
class objects contained within the given object.
:param nested: make the returned json nested
:type nested: [ True | False ] , default is False
"""
return json.dumps(
self.to_dict(nested=nested, required=required),
cls=helpers.NumpyEncoder,
indent=indent,
)
[docs] def from_json(self, json_str):
"""
read in a json string and update attributes of an object
:param json_str: json string or file path
:type json_str: string or :class:`pathlib.Path`
"""
if isinstance(json_str, str):
try:
json_path = Path(json_str)
if json_path.exists():
with open(json_path, "r") as fid:
json_dict = json.load(fid)
except OSError:
pass
json_dict = json.loads(json_str)
elif isinstance(json_str, Path):
if json_str.exists():
with open(json_str, "r") as fid:
json_dict = json.load(fid)
elif not isinstance(json_str, (str, Path)):
msg = f"Input must be valid JSON string not {type(json_str)}"
self.logger.error(msg)
raise MTSchemaError(msg)
self.from_dict(json_dict)
[docs] def from_series(self, pd_series):
"""
Fill attributes from a Pandas series
.. note:: Currently, the series must be single layered with key names
separated by dots. (location.latitude)
:param pd_series: Series containing metadata information
:type pd_series: pandas.Series
.. todo:: Force types in series
"""
if not isinstance(pd_series, pd.Series):
msg = f"Input must be a Pandas.Series not type {type(pd_series)}"
self.logger.error(msg)
raise MTSchemaError(msg)
for key, value in pd_series.items():
self.set_attr_from_name(key, value)
[docs] def to_series(self, required=True):
"""
Convert attribute list to a pandas.Series
.. note:: this is a flattened version of the metadata
:return: pandas.Series
:rtype: pandas.Series
"""
return pd.Series(self.to_dict(single=True, required=required))
[docs] def to_xml(self, string=False, required=True):
"""
make an xml element for the attribute that will add types and
units.
:param string: output a string instead of an XML element
:type string: [ True | False ], default is False
:return: XML element or string
"""
element = helpers.dict_to_xml(
self.to_dict(nested=True, required=required), self._attr_dict
)
if not string:
return element
else:
return helpers.element_to_string(element)
[docs] def from_xml(self, xml_element):
"""
:param xml_element: XML element
:type xml_element: etree.Element
:return: Fills attributes accordingly
"""
self.from_dict(helpers.element_to_dict(xml_element))