Source code for mt_metadata.base.helpers

# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 20:37:52 2020

:copyright: 
    Jared Peacock (jpeacock@usgs.gov)

:license: MIT

"""
# =============================================================================
# Imports
# =============================================================================
import textwrap
import logging
import json
import numpy as np

from collections.abc import MutableMapping
from collections import OrderedDict, defaultdict
from xml.etree import cElementTree as et
from xml.dom import minidom
from operator import itemgetter

# from mt_metadata.utils.units import get_unit_object


filter_descriptions = {
    "zpk": "poles and zeros filter",
    "coefficient": "coefficient filter",
    "time delay": "time delay filter",
    "fir": "finite impaulse response filter",
    "fap": "frequency amplitude phase lookup table",
    "frequency response table": "frequency amplitude phase lookup table",
}

# =============================================================================
# write doc strings
# =============================================================================


[docs]def wrap_description(description, column_width): """ split a description into separate lines """ d_lines = textwrap.wrap(description, column_width) if len(d_lines) < 11: d_lines += [""] * (11 - len(d_lines)) return d_lines
[docs]def validate_c1(attr_dict, c1): """ :param attr_dict: DESCRIPTION :type attr_dict: TYPE :param c1: DESCRIPTION :type c1: TYPE :return: DESCRIPTION :rtype: TYPE """ try: max_c1 = max([len(key) for key in attr_dict.keys()]) if max_c1 > (c1 - 4): c1 = max_c1 + 6 except ValueError: pass return c1
[docs]def write_lines(attr_dict, c1=45, c2=45, c3=15): """ Takes the attribute dictionary from the json and parses it into a table Returns a string representation of this table. This overwrites the doc. :param attr_dict: attribute dictionary :type attr_dict: dict :param c1: cloumn 1 width, defaults to 45 :type c1: integer, optional :param c2: column 2 width, defaults to 45 :type c2: integer, optional :param c3: column 3 width, defaults to 15 :type c3: integer, optional :return: doc string :rtype: string """ c1 = validate_c1(attr_dict, c1) line = " | {0:<{1}}| {2:<{3}} | {4:<{5}}|" hline = " +{0}+{1}+{2}+".format( "-" * (c1 + 1), "-" * (c2 + 2), "-" * (c3 + 1) ) mline = " +{0}+{1}+{2}+".format( "=" * (c1 + 1), "=" * (c2 + 2), "=" * (c3 + 1) ) lines = [ hline, line.format( "**Metadata Key**", c1, "**Description**", c2, "**Example**", c3 ), mline, ] for key, entry in attr_dict.items(): if isinstance(entry, logging.Logger): continue d_lines = wrap_description(entry["description"], c2) e_lines = wrap_description(entry["example"], c3) # line 1 is with the entry lines.append( line.format(f"**{key}**", c1, d_lines[0], c2, e_lines[0], c3) ) # line 2 skip an entry in the lines.append(line.format("", c1, d_lines[1], c2, e_lines[1], c3)) # line 3 required lines.append( line.format( f"Required: {entry['required']}", c1, d_lines[2], c2, e_lines[2], c3, ) ) # line 4 blank lines.append(line.format("", c1, d_lines[3], c2, e_lines[3], c3)) # line 5 units lines.append( line.format( f"Units: {entry['units']}", c1, d_lines[4], c2, e_lines[4], c3 ) ) # line 6 blank lines.append(line.format("", c1, d_lines[5], c2, e_lines[5], c3)) # line 7 type lines.append( line.format( f"Type: {entry['type']}", c1, d_lines[6], c2, e_lines[6], c3 ) ) # line 8 blank lines.append(line.format("", c1, d_lines[7], c2, e_lines[7], c3)) # line 9 type lines.append( line.format( f"Style: {entry['style']}", c1, d_lines[8], c2, e_lines[8], c3 ) ) # line 10 blank lines.append(line.format("", c1, d_lines[9], c2, e_lines[9], c3)) default = [entry["default"]] + [""] * 5 if len(str(entry["default"])) > c1 - 15: default = [""] + wrap_description(entry["default"], c1) # line 9 type lines.append( line.format( f"**Default**: {default[0]}", c1, d_lines[8], c2, e_lines[8], c3, ) ) # line 10 blank lines.append( line.format(default[1], c1, d_lines[9], c2, e_lines[9], c3) ) # line 9 type lines.append( line.format(default[2], c1, d_lines[10], c2, e_lines[10], c3) ) # line 10 blank if len(d_lines) > 11: lines.append(line.format(default[3], c1, d_lines[11], c2, "", c3)) for index, d_line in enumerate(d_lines[12:], 4): try: lines.append( line.format(default[index], c1, d_line, c2, "", c3) ) except IndexError: lines.append(line.format("", c1, d_line, c2, "", c3)) # long default value if len(default) > 7: lines.append(line.format(default[3], c1, "", c2, "", c3)) for index, d_line in enumerate(default[4:], 12): try: lines.append( line.format(d_line, c1, d_lines[index], c2, "", c3) ) except IndexError: lines.append(line.format(d_line, c1, "", c2, "", c3)) lines.append(hline) return "\n".join(lines)
[docs]def write_block(key, attr_dict, c1=45, c2=45, c3=15): """ :param key: key to write from attr dict :type key: string :param attr_dict: attribute dictionary :type attr_dict: dict :param c1: column 1 width, defaults to 45 :type c1: int, optional :param c2: column 2 width, defaults to 45 :type c2: int, optional :param c3: column 3 width, defaults to 15 :type c3: int, optional :return: list of lines :rtype: list """ if len(key) > c1 - 4: c1 = len(key) + 6 line = " | {0:<{1}}| {2:<{3}} | {4:<{5}}|" hline = " +{0}+{1}+{2}+".format( "-" * (c1 + 1), "-" * (c2 + 2), "-" * (c3 + 1) ) mline = " +{0}+{1}+{2}+".format( "=" * (c1 + 1), "=" * (c2 + 2), "=" * (c3 + 1) ) section = f":navy:`{key}`" lines = [ section, "~" * len(section), "", ".. container::", "", " .. table::", " :class: tight-table", f" :widths: {c1} {c2} {c3}", "", hline, line.format( f"**{key}**", c1, "**Description**", c2, "**Example**", c3 ), mline, ] d_lines = wrap_description(attr_dict["description"], c2) e_lines = wrap_description(attr_dict["example"], c3) # line 1 is with the entry lines.append( line.format( f"**Required**: {attr_dict['required']}", c1, d_lines[0], c2, e_lines[0], c3, ) ) # line 2 skip an entry in the lines.append(line.format("", c1, d_lines[1], c2, e_lines[1], c3)) # line 3 required lines.append( line.format( f"**Units**: {attr_dict['units']}", c1, d_lines[2], c2, e_lines[2], c3, ) ) # line 4 blank lines.append(line.format("", c1, d_lines[3], c2, e_lines[3], c3)) # line 5 units lines.append( line.format( f"**Type**: {attr_dict['type']}", c1, d_lines[4], c2, e_lines[4], c3, ) ) # line 6 blank lines.append(line.format("", c1, d_lines[5], c2, e_lines[5], c3)) # line 7 type lines.append( line.format( f"**Style**: {attr_dict['style']}", c1, d_lines[6], c2, e_lines[6], c3, ) ) # line 8 blank lines.append(line.format("", c1, d_lines[7], c2, e_lines[7], c3)) default = [attr_dict["default"]] + [""] * 5 if len(str(attr_dict["default"])) > c1 - 15: default = [""] + wrap_description(attr_dict["default"], c1) # line 9 type lines.append( line.format( f"**Default**: {default[0]}", c1, d_lines[8], c2, e_lines[8], c3, ) ) # line 10 blank lines.append(line.format(default[1], c1, d_lines[9], c2, e_lines[9], c3)) # line 9 type lines.append(line.format(default[2], c1, d_lines[10], c2, e_lines[10], c3)) # line 10 blank if len(d_lines) > 11: lines.append(line.format(default[3], c1, d_lines[11], c2, "", c3)) for index, d_line in enumerate(d_lines[12:], 4): try: lines.append( line.format(default[index], c1, d_line, c2, "", c3) ) except IndexError: lines.append(line.format("", c1, d_line, c2, "", c3)) # long default value if len(default) > 7: lines.append(line.format(default[3], c1, "", c2, "", c3)) for index, d_line in enumerate(default[4:], 12): try: lines.append( line.format(d_line, c1, d_lines[index], c2, "", c3) ) except IndexError: lines.append(line.format(d_line, c1, "", c2, "", c3)) lines.append(hline) lines.append("") return lines
# code to convert ini_dict to flattened dictionary # default seperater '_'
[docs]def flatten_dict(meta_dict, parent_key=None, sep="."): """ :param meta_dict: DESCRIPTION :type meta_dict: TYPE :param parent_key: DESCRIPTION, defaults to None :type parent_key: TYPE, optional :param sep: DESCRIPTION, defaults to '.' :type sep: TYPE, optional :return: DESCRIPTION :rtype: TYPE """ items = [] for key, value in meta_dict.items(): if parent_key: new_key = f"{parent_key}{sep}{key}" else: new_key = key if isinstance(value, MutableMapping): items.extend(flatten_dict(value, new_key, sep=sep).items()) else: items.append((new_key, value)) return dict(items)
[docs]def flatten_list(x_list): """ Flatten a nested list flatten = lambda l: [item for sublist in l for item in sublist] Returns ------- None. """ flat_list = [item for sublist in x_list for item in sublist] return flat_list
[docs]def recursive_split_dict(key, value, remainder, sep="."): """ recursively split a dictionary :param key: DESCRIPTION :type key: TYPE :param value: DESCRIPTION :type value: TYPE :param remainder: DESCRIPTION :type remainder: TYPE :return: DESCRIPTION :rtype: TYPE """ key, *other = key.split(sep, 1) if other: recursive_split_dict(other[0], value, remainder.setdefault(key, {})) else: remainder[key] = value
[docs]def recursive_split_getattr(base_object, name, sep="."): key, *other = name.split(sep, 1) if other: base_object = getattr(base_object, key) value, prop = recursive_split_getattr(base_object, other[0]) else: value = getattr(base_object, key) try: if isinstance(getattr(type(base_object), key), property): prop = True except AttributeError: prop = False return value, prop
[docs]def recursive_split_setattr(base_object, name, value, sep="."): key, *other = name.split(sep, 1) if other: base_object = getattr(base_object, key) recursive_split_setattr(base_object, other[0], value) else: setattr(base_object, key, value)
[docs]def structure_dict(meta_dict, sep="."): """ :param meta_dict: DESCRIPTION :type meta_dict: TYPE :param sep: DESCRIPTION, defaults to '.' :type sep: TYPE, optional :return: DESCRIPTION :rtype: TYPE """ structured_dict = {} for key, value in meta_dict.items(): recursive_split_dict(key, value, structured_dict, sep=sep) return structured_dict
[docs]def get_units(name, attr_dict): """ """ try: units = attr_dict[name]["units"] if not isinstance(units, str): units = "{0}".format(units) except KeyError: units = None if units in [None, "None", "none"]: return None return units
[docs]def get_type(name, attr_dict): """ """ try: v_type = attr_dict[name]["type"] if v_type in ["string", str, "str", "String"]: v_type = None except KeyError: v_type = None return v_type
[docs]def recursive_split_xml(element, item, base, name, attr_dict=None): """ """ key = None if isinstance(item, dict): for key, value in item.items(): attr_name = ".".join([base, key]) sub_element = et.SubElement(element, key) recursive_split_xml(sub_element, value, attr_name, key, attr_dict) elif isinstance(item, (tuple, list)): for ii in item: sub_element = et.SubElement(element, "item") recursive_split_xml(sub_element, ii, base, name, attr_dict) elif isinstance(item, str): element.text = item elif isinstance(item, (float, int, type(None))): element.text = str(item) else: # if the value is an hdf5 reference make it a string if "reference" in str(type(item)).lower(): element.text = str(item) else: raise ValueError("Value cannot be {0}".format(type(item))) if attr_dict: units = get_units(base, attr_dict) if units: element.set("units", str(units)) # v_type = get_type(base, attr_dict) # if v_type: # element.set("type", v_type) return element, name
[docs]def dict_to_xml(meta_dict, attr_dict=None): """ Assumes dictionary is structured {class:{attribute_dict}} :param meta_dict: DESCRIPTION :type meta_dict: TYPE :return: DESCRIPTION :rtype: TYPE """ class_name = list(meta_dict.keys())[0] root = et.Element(class_name) for key, value in meta_dict[class_name].items(): element = et.SubElement(root, key) recursive_split_xml(element, value, key, key, attr_dict) return root
[docs]def element_to_dict(element): """ .. todo:: Add way to read in attritues like units and validate them. :param element: DESCRIPTION :type element: TYPE :return: DESCRIPTION :rtype: TYPE """ meta_dict = {element.tag: {} if element.attrib else None} children = list(element) if children: child_dict = defaultdict(list) for dc in map(element_to_dict, children): for k, v in dc.items(): child_dict[k].append(v) meta_dict = { element.tag: { k: v[0] if len(v) == 1 else v for k, v in child_dict.items() } } if "item" in meta_dict[element.tag].keys(): meta_dict[element.tag] = meta_dict[element.tag]["item"] # going to skip attributes for now, later can check them against # standards, neet to skip units and type if element.attrib: pop_units = False pop_type = False for k, v in element.attrib.items(): if k in ["units"]: if "type" in element.attrib.keys(): pop_type = True if len(element.attrib.keys()) <= 2: pop_units = True continue if k in ["type"]: if len(element.attrib.keys()) <= 1: if v in [ "float", "string", "integer", "boolean", "list", "tuple", ]: pop_type = True continue meta_dict[element.tag][k] = v if pop_units: element.attrib.pop("units") if pop_type: element.attrib.pop("type") if element.text: text = element.text.strip() if children or element.attrib: if text: if len(element.attrib.keys()) > 0: meta_dict[element.tag]["value"] = text else: meta_dict[element.tag] = text else: meta_dict[element.tag] = text return OrderedDict(sorted(meta_dict.items(), key=itemgetter(0)))
[docs]def element_to_string(element): return ( minidom.parseString(et.tostring(element).decode()) .toprettyxml( indent=" ", encoding="UTF-8", ) .decode() )
# ============================================================================= # Helper function to be sure everything is encoded properly # =============================================================================
[docs]class NumpyEncoder(json.JSONEncoder): """ Need to encode numpy ints and floats for json to work """
[docs] def default(self, obj): if isinstance( obj, ( np.int_, np.intc, np.intp, np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64, ), ): return int(obj) elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)): return float(obj) elif isinstance(obj, (np.ndarray)): if obj.dtype == complex: return {"real": obj.real.tolist(), "imag": obj.imag.tolist()} else: return obj.tolist() # For now turn references into a generic string elif "h5" in str(type(obj)): return str(obj) return json.JSONEncoder.default(self, obj)
[docs]def validate_name(name, pattern=None): """ Validate name :param name: DESCRIPTION :type name: TYPE :param pattern: DESCRIPTION, defaults to None :type pattern: TYPE, optional :return: DESCRIPTION :rtype: TYPE """ if name is None: return "unknown" return name.replace(" ", "_")