# -*- coding: utf-8 -*-
"""
Created on Thu Dec 24 12:02:12 2020
:copyright:
Jared Peacock (jpeacock@usgs.gov)
:license: MIT
"""
# =============================================================================
# Imports
# =============================================================================
from pathlib import Path
from copy import deepcopy
from collections import OrderedDict
from collections.abc import MutableMapping
from operator import itemgetter
import json
from mt_metadata.utils import validators
from mt_metadata.utils.exceptions import MTSchemaError
from mt_metadata import REQUIRED_KEYS
from mt_metadata.base.helpers import NumpyEncoder
from loguru import logger
# =============================================================================
# base dictionary
# =============================================================================
class BaseDict(MutableMapping):
    """
    BaseDict is a convenience class that can help the metadata dictionaries
    act like classes so you can access variables by .name or [name]

    .. note:: If the attribute has a . in the name then you will not be able
              to access that attribute by class.name.name  You will get an
              attribute error.  You need to access the attribute like a
              dictionary class['name.name']

    You can add an attribute by:

    >>> b = BaseDict()
    >>> b.update({name: value_dict})

    Or you can add a whole dictionary:

    >>> b.add_dict(ATTR_DICT['run'])

    All attributes have a descriptive dictionary of the form:

    >>> {'type': data type, 'required': [True | False],
    >>> ... 'style': 'string style', 'units': attribute units}

    * **type** --> the data type [ str | int | float | bool ]
    * **required** --> required in the standards [ True | False ]
    * **style** --> style of the string
    * **units** --> units of the attribute, must be a string

    Entries are stored directly in the instance ``__dict__``.
    """

    def __init__(self, *args, **kwargs):
        # Route everything through update() so each value passes through
        # __setitem__ and is therefore validated.
        self.update(dict(*args, **kwargs))

    def __setitem__(self, key, value):
        """Validate ``value`` as an attribute description and store it."""
        self.__dict__[key] = validators.validate_value_dict(value)

    def __getitem__(self, key):
        """
        Return the description stored under ``key``.

        A missing key does not raise; a default "user defined" description
        is returned instead (and a debug message is logged).

        .. note:: Because ``KeyError`` is never raised here, the mixin
                  methods of ``collections.abc.Mapping`` that rely on it
                  (``key in obj``, ``obj.get(key, default)``) treat every
                  key as present.  Use ``key in obj.keys()`` or iterate to
                  test for real membership.
        """
        try:
            return self.__dict__[key]
        except KeyError as error:
            msg = (
                f"{error}, {key} is not in dictionary yet. "
                "Returning default schema dictionary."
            )
            logger.debug(msg)
            return {
                "type": "string",
                "required": False,
                "style": "free form",
                "units": None,
                "options": None,
                "description": "user defined",
                "example": None,
                "default": None,
            }

    def __delitem__(self, key):
        """Remove ``key``; a missing key is logged rather than raised."""
        try:
            del self.__dict__[key]
        except KeyError:
            msg = "Key: {0} does not exist".format(key)
            logger.info(msg)

    def __iter__(self):
        return iter(self.__dict__)

    def __len__(self):
        return len(self.__dict__)

    # The final two methods aren't required, but nice for demo purposes:
    def __str__(self):
        """returns simple dict representation of the mapping"""
        lines = []
        for key, value in sorted(self.__dict__.items(), key=itemgetter(0)):
            # entries keyed "logger" are left out of the printed summary
            if key in ["logger"]:
                continue
            lines.append("{0}:".format(key))
            lines.extend(
                "\t{0}: {1}".format(name, info) for name, info in value.items()
            )
        return "\n".join(lines)

    def __repr__(self):
        """echoes class, id, & reproducible representation in the REPL"""
        return "{}, BaseDict({})".format(
            super(BaseDict, self).__repr__(), self.__dict__
        )

    @property
    def name(self):
        """First key of the mapping, or ``None`` when the mapping is empty."""
        # Indexing an empty key list raises IndexError (not KeyError), so ask
        # the iterator for its first item with an explicit fallback instead.
        return next(iter(self), None)

    def add_dict(self, add_dict, name=None, keys=None):
        """
        Add a dictionary to. If name is input it is added to the keys of
        the input dictionary

        :param add_dict: dictionary to add
        :type add_dict: dictionary, or MutableMapping
        :param name: name to add to keys
        :type name: string or None
        :param keys: if given, only these keys of ``add_dict`` are added
        :type keys: container of keys or None
        :raises TypeError: if ``add_dict`` is not a dict or MutableMapping

        :Example: ::

            >>> s_obj = Standards()
            >>> run_dict = s_obj.run_dict
            >>> run_dict.add_dict(s_obj.declination_dict, 'declination')

        """
        if not isinstance(add_dict, (dict, MutableMapping)):
            msg = "add_dict takes only a dictionary not type {0}".format(
                type(add_dict)
            )
            logger.error(msg)
            raise TypeError(msg)

        if keys:
            add_dict = {
                key: value for key, value in add_dict.items() if key in keys
            }

        if name:
            add_dict = {
                "{0}.{1}".format(name, key): value
                for key, value in add_dict.items()
            }

        # Pass the mapping positionally: keyword expansion (**add_dict) would
        # fail on any key that is not a string.
        self.update(add_dict)

    def copy(self):
        """Return a deep copy of this object."""
        return deepcopy(self)

    def to_latex(self, max_entries=7, first_table_len=7):
        """
        Build LaTeX table source describing every entry, sorted by key.

        :param max_entries: number of entries in each table after the first
        :type max_entries: int
        :param first_table_len: number of entries in the first table
        :type first_table_len: int
        :return: list of strings: the preamble lines, header, joined entries
                 and closing lines of the first table, followed by one
                 newline-joined string for each additional table
        :rtype: list
        :raises ValueError: if ``max_entries`` is smaller than 1

        """
        if max_entries < 1:
            raise ValueError("max_entries must be a positive integer")

        # Raw strings with single backslashes so LaTeX receives real commands
        # (a doubled backslash is a LaTeX line break followed by plain text).
        beginning = [
            r"\clearpage",
            r"\newpage",
            r"\begin{table}[h!]",
            r"\caption*{{Attributes for {0} Category}}".format(self.name),
            r"\begin{tabular}{p{.305\textwidth}p{.47\textwidth}p{.2\textwidth}}",
        ]
        end = [r"\end{tabular}", r"\label{tab:}", r"\end{table}"]
        header = [
            " & ".join(
                [
                    r"\textbf{Metadata Key}",
                    r"\textbf{Description}",
                    r"\textbf{Example}",
                ]
            )
            + r" \\ \toprule"
        ]
        columns = [
            "required",
            "units",
            "type",
            "style",
            "description",
            "example",
        ]
        no_options = (None, "none", "None", [])

        lines = []
        for name, v_dict in sorted(self.items(), key=itemgetter(0)):
            # Work on a shallow copy so the stored description is not
            # lengthened every time this method is called.
            row = dict(v_dict)
            if row["options"] not in no_options:
                row["description"] = "{0}. Options: {1}".format(
                    row["description"], row["options"]
                )
            entry = r"\entry{{{0}}}".format(name) + "".join(
                "{{{0}}}".format(row[column]) for column in columns
            )
            # a bare underscore is an error in LaTeX text mode
            lines.append(entry.replace("_", r"\_"))

        all_lines = (
            beginning + header + ["\n".join(lines[0:first_table_len])] + end
        )

        # Remaining entries go into follow-on tables of max_entries each;
        # stepping over the real remainder never yields an empty table and
        # never drops trailing entries.
        for start in range(first_table_len, len(lines), max_entries):
            stable = beginning + header + lines[start : start + max_entries] + end
            all_lines.append("\n".join(stable))

        return all_lines

    def from_csv(self, csv_fn):
        """
        Read a CSV schema file and add its contents to this object.

        The first line is the header; every following line describes one
        attribute.  Nothing is returned, the object is updated in place.

        :param csv_fn: csv file to read metadata standards from
        :type csv_fn: pathlib.Path or string
        :raises MTSchemaError: if the file does not exist or is empty

        :Example: ::

            >>> run_dict = BaseDict()
            >>> run_dict.from_csv(get_level_fn('run'))

        """
        csv_fn = Path(csv_fn)
        if not csv_fn.exists():
            msg = f"Schema file {csv_fn} does not exist."
            logger.error(msg)
            raise MTSchemaError(msg)

        with open(csv_fn, "r") as fid:
            logger.debug(f"Reading schema CSV {csv_fn}")
            lines = fid.readlines()

        if not lines:
            msg = f"Schema file {csv_fn} is empty."
            logger.error(msg)
            raise MTSchemaError(msg)

        header = validators.validate_header(
            [ss.strip().lower() for ss in lines[0].strip().split(",")],
            attribute=True,
        )
        attribute_dict = {}
        for line in lines[1:]:
            # skip blank lines
            if len(line) < 2:
                continue
            # Limit the number of splits so that commas inside the last
            # column stay part of that column.
            line_dict = {
                key: ss.strip()
                for key, ss in zip(
                    header, line.strip().split(",", len(header) - 1)
                )
            }
            key_name = validators.validate_attribute(line_dict.pop("attribute"))
            attribute_dict[key_name] = validators.validate_value_dict(line_dict)

        self.update(attribute_dict)

    def to_csv(self, csv_fn):
        """
        write dictionary to csv file

        One header line built from ``REQUIRED_KEYS`` is followed by one line
        per key, sorted by key.

        :param csv_fn: file to write
        :type csv_fn: pathlib.Path or string
        :return: path of the file that was written
        :rtype: pathlib.Path

        """
        if not isinstance(csv_fn, Path):
            csv_fn = Path(csv_fn)

        lines = [",".join(REQUIRED_KEYS)]
        # the first required key is filled by the dictionary key itself
        for key in sorted(self):
            line = [key]
            for rkey in REQUIRED_KEYS[1:]:
                value = self[key][rkey]
                if isinstance(value, (list, tuple)):
                    if len(value) == 0:
                        line.append("None")
                    else:
                        # commas would split the column, so separate the
                        # items with "|" and drop the quotes around them
                        line.append(
                            '"{0}"'.format(value)
                            .replace(",", "|")
                            .replace("'", "")
                        )
                else:
                    line.append("{0}".format(value))
            lines.append(",".join(line))

        with csv_fn.open("w") as fid:
            fid.write("\n".join(lines))
        logger.info("Wrote dictionary to {0}".format(csv_fn))
        return csv_fn

    def to_json(self, json_fn, indent=" " * 4):
        """
        Write schema standards to json

        :param json_fn: full path to json file
        :type json_fn: string or Path
        :param indent: indentation passed to ``json.dump``
        :type indent: string
        :return: full path to json file
        :rtype: Path

        """
        json_fn = Path(json_fn)

        json_dict = {k: v for k, v in self.items() if k not in ["logger"]}
        with open(json_fn, "w") as fid:
            json.dump(json_dict, fid, cls=NumpyEncoder, indent=indent)

        return json_fn

    def from_json(self, json_fn):
        """
        Read schema standards from json and add them to this object.

        Nothing is returned, the object is updated in place.

        :param json_fn: full path to json file
        :type json_fn: string or Path
        :raises MTSchemaError: if the file does not exist

        """
        json_fn = Path(json_fn)
        if not json_fn.exists():
            msg = f"JSON schema file {json_fn} does not exist"
            logger.error(msg)
            # the exception has to be raised, not merely constructed
            raise MTSchemaError(msg)

        with open(json_fn, "r") as fid:
            json_dict = json.load(fid)

        valid_dict = {
            k: validators.validate_value_dict(v) for k, v in json_dict.items()
        }
        self.update(valid_dict)
def get_schema_fn(schema_element, paths):
    """
    Get the correct file name for the given schema element from the provided
    list of valid file names

    :param schema_element: name of the schema element to get filename for
    :type schema_element: string
    :param paths: candidate schema files; the first one whose stem equals
                  ``schema_element`` is returned
    :type paths: iterable of :class:`pathlib.Path`
    :return: correct file name for given element
    :rtype: :class:`pathlib.Path`
    :raises MTSchemaError: if no path matches, including when ``paths`` is
                           empty

    """
    # Materialize once so any iterable is accepted and the error message
    # below can safely look at the first entry.
    paths = list(paths)
    for fn in paths:
        if schema_element == fn.stem:
            return fn

    if paths:
        msg = f"Could not find schema element {schema_element}.json in {paths[0].parent}."
    else:
        # without this branch an empty list surfaced as a bare IndexError
        msg = (
            f"Could not find schema element {schema_element}.json: "
            "no schema paths were provided."
        )
    raise MTSchemaError(msg)
def get_schema(schema_element, paths):
    """
    Load the schema of one element into a
    :class:`mt_metadata.schema_base.BaseDict`

    The file is chosen from ``paths`` with :func:`get_schema_fn` and read
    with :meth:`BaseDict.from_json`.

    :param schema_element: name of the schema element to load
    :type schema_element: string
    :param paths: candidate schema files handed to :func:`get_schema_fn`
    :return: dictionary that describes the standards for the element
    :rtype: :class:`mt_metadata.schema_base.BaseDict`

    """
    schema = BaseDict()
    schema.from_json(get_schema_fn(schema_element, paths))
    return schema