Source code for mt_metadata.transfer_functions.io.edi.metadata.information

# -*- coding: utf-8 -*-
"""
Created on Sat Dec  4 14:13:37 2021

@author: jpeacock
"""
# =============================================================================
# Imports
# =============================================================================
from mt_metadata.base import Base
from mt_metadata.base.helpers import validate_name

# ==============================================================================
# Info object
# ==============================================================================
[docs]class Information(Base): """ Contain, read, and write info section of .edi file not much to really do here, but just keep it in the same format that it is read in as, except if it is in phoenix format then split the two paragraphs up so they are sequential. """ def __init__(self, fn=None, edi_lines=None): self.info_list = [] self.info_dict = {} self._phoenix_col_width = 38 self._phoenix_file = False self._empower_file = False self.phoenix_translation_dict = { "survey": "survey.id", "company": "station.acquired_by.organization", "job": "survey.project", "hardware": "run.data_logger.model", "mtuprog version": "run.data_logger.firmware.version", "xpr weighting": "processing_parameter", "hx sen": "run.hx.sensor.id", "hy sen": "run.hy.sensor.id", "hz sen": "run.hz.sensor.id", "rx sen": "run.rrhx.sensor.id", "ry sen": "run.rrhy.sensor.id", "stn number": "station.id", "mtu-box serial number": "run.data_logger.id", "ex pot resist": "run.ex.contact_resistance.start", "ey pot resist": "run.ey.contact_resistance.start", "ex voltage": ["run.ex.ac.start", "run.ex.dc.start"], "ey voltage": ["run.ey.ac.start", "run.ey.dc.start"], "start-up": "station.time_period.start", "end-time": "station.time_period.end", } self.translation_dict = { "operator": "run.acquired_by.author", "adu_serial": "run.data_logger.id", "e_azimuth": "run.ex.measurement_azimuth", "ex_len": "run.ex.dipole_length", "ey_len": "run.ey.dipole_length", "ex_resistance": "run.ex.contact_resistance.start", "ey_resistance": "run.ey.contact_resistance.start", "h_azimuth": "run.hx.measurement_azimuth", "hx": "run.hx.sensor.id", "hy": "run.hy.sensor.id", "hz": "run.hz.sensor.id", "hx_resistance": "run.hx.h_field_max.start", "hy_resistance": "run.hy.h_field_max.start", "hz_resistance": "run.hz.h_field_max.start", "algorithmname": "transfer_function.software.name", "ndec": "processing_parameter", "nfft": "processing_parameter", "ntype": "processing_parameter", "rrtype": "processing_parameter", "removelargelines": "processing_parameter", "rotmaxe": "processing_parameter", "project": "survey.project", "processedby": "transfer_function.processed_by.name", "processingsoftware": "transfer_function.software.name", "processingtag": "transfer_function.id", "signconvention": "transfer_function.sign_convention", "sitename": "station.geographic_name", "survey": "survey.id", "year": "survey.time_period.start_date", "runlist": "transfer_function.runs_processed", "remotesite": "transfer_function.remote_references", "remoteref": "transfer_function.processing_parameters", } self.empower_translation_dict = { "processingsoftware": "transfer_function.software.name", "sitename": "station.geographic_name", "year": "survey.time_period.start_date", "process_date": "transfer_function.processed_date", "declination": "station.location.declination.value", "tag": "component", "length": "dipole_length", "ac": "ac.end", "dc": "dc.end", "negative_res": "contact_resistance.start", "positive_res": "contact_resistance.end", "sensor_type": "sensor.model", "azimuth": "measured_azimuth", "sensor_serial": "sensor.id", "cal_name": "comments", "saturation": "comments", "instrument_type": "model", } super().__init__(attr_dict={}) def __str__(self): return "".join(self.write_info()) def __repr__(self): return self.__str__()
[docs] def get_info_list(self, edi_lines): """ get a list of lines from the info section :param edi_lines: DESCRIPTION :type edi_lines: TYPE :return: DESCRIPTION :rtype: TYPE """ info_list = [] info_find = False phoenix_list_02 = [] for line in edi_lines: if ">info" in line.lower(): info_find = True if len(line.strip().split()) > 1: info_list.append(line.strip().split()[1]) elif ">" in line[0:2]: # need to check for xml type formating if "<" in line: pass else: if info_find is True: break else: pass elif info_find: if "maxinfo" in line.lower(): continue if ( line.lower().find("run information") >= 0 and line.lower().find("station") >= 0 ): self._phoenix_file = True elif "empower" in line.lower(): self._empower_file = True if self._phoenix_file and len(line) > self._phoenix_col_width: info_list.append(line[0 : self._phoenix_col_width].strip()) phoenix_list_02.append( line[self._phoenix_col_width :].strip() ) else: line = line.strip() if len(line) > 1: if len(line) <= 3 and not line.isalnum(): continue info_list.append(line) info_list += phoenix_list_02 # validate the information list # empower has a specific format that cannot be sorted. if self._empower_file: info_list = self._validate_info_list(info_list, sort=False) else: info_list = self._validate_info_list(info_list, sort=True) return info_list
[docs] def read_info(self, edi_lines): """ read information section of the .edi file """ self.info_dict = {} self.info_list = self.get_info_list(edi_lines) if self._empower_file: self.info_dict, self.info_list = self._read_empower_info( self.info_list ) else: # make info items attributes of Information for ll in self.info_list: l_list = [None, ""] # phoenix has lat an lon information in the notes but separated by # a space instead of an = or : if ( "lat" in ll.lower() or "lon" in ll.lower() or "lng" in ll.lower() ): l_list = ll.split() if len(l_list) == 2: self.info_dict[l_list[0]] = l_list[1] continue elif len(l_list) == 4: self.info_dict[l_list[0]] = l_list[1] self.info_dict[l_list[2]] = l_list[3] continue elif len(l_list) == 6: self.info_dict[l_list[0]] = l_list[1] + l_list[2] self.info_dict[l_list[3]] = l_list[4] + l_list[5] continue # need to check if there is an = or : seperator, which ever # comes first is assumed to be the delimiter l_key, l_value = self._read_line(ll) if l_key: self.info_dict[l_key] = l_value if not self._empower_file: self.parse_info() if self.info_list is None: self.logger.info("Could not read information") return
def _read_empower_info(self, info_list): """ read empower style information block. Structured as Stations Electrics EX EY Magnetics HX HY HZ :param info_list: DESCRIPTION :type info_list: TYPE :return: DESCRIPTION :rtype: TYPE """ ch_key = { "e1": "ex", "e2": "ey", "h1": "hx", "h2": "hy", "h3": "hz", } info_dict = {} comp = None new_list = [] for line in info_list: og_key, l_value = self._read_line(line) l_key = validate_name(og_key.lower()) if l_value in [None]: comp = l_key if comp == "electrics": comp = "data_logger" elif comp in ["ex", "ey", "hx", "hy", "hz"]: comp = f"run.{comp}" continue elif l_value in [""]: continue else: if l_key in [ "station_name", "min_value", "max_value", "detected_sensor_type", "coordinates", "gps_(min_-_max)", "temperature_(min_-_max)", "recording_id", ]: continue try: l_key = self.empower_translation_dict[l_key] except KeyError: new_list.append(f"{l_key} = {l_value}") l_value = l_value.encode("ascii", "ignore").decode().lower() if l_value.count("[") >= 1 and l_value.count("]") >= 1: l_value = l_value.split("[")[0].strip() if l_value.count("%") >= 0: l_value = l_value.replace("%", "") if l_key in ["component"]: l_value = ch_key[l_value] if comp: if comp in ["run.hx", "run.hy", "run.hz"]: if "ac" in l_key or "dc" in l_key: continue if "comments" in l_key: og_key = validate_name(og_key.lower()) try: info_dict[ f"{comp}.{l_key}" ] += f",{og_key}={l_value}" except KeyError: info_dict[f"{comp}.{l_key}"] = f"{og_key}={l_value}" else: info_dict[f"{comp}.{l_key}"] = l_value else: info_dict[l_key.lower()] = l_value return info_dict, new_list def _get_separator(self, line): """ get separator to split line :param line: DESCRIPTION :type line: TYPE :return: DESCRIPTION :rtype: TYPE """ sep = None if line.count(":") > 0 and line.count("=") > 0: if line.find(":") < line.find("="): sep = ":" else: sep = "=" elif line.count(":") >= 1: sep = ":" # colon_find = line.find(":") elif line.count("=") >= 1: sep = "=" return sep def _read_line(self, line): sep = self._get_separator(line) if sep: l_list = line.split(sep, 1) if len(l_list) == 2: l_key = l_list[0].strip() l_value = l_list[1].strip().replace('"', "") if l_value.find("[") > 0 and l_value.find("]") > 0: if l_value.count(",") >= 1: l_sep = "," elif l_value.count(";") >= 1: l_sep = ";" elif l_value.count(":") >= 1: l_sep = ":" else: l_sep = None if l_sep: l_value = ( l_value.replace("[", "") .replace("[", "") .split(l_sep) ) return l_key, l_value else: return l_key, "" else: return line, None
[docs] def write_info(self, info_list=None): """ write out information """ if info_list is not None: self.info_list = self._validate_info_list(info_list) info_lines = [">INFO\n"] for line in sorted(list(set(self.info_list))): info_lines.append(f"{' '*4}{line}\n") return info_lines
def _validate_info_list(self, info_list, sort=True): """ check to make sure the info list input is valid, really just checking for Phoenix format where they put two columns in the file and remove any blank lines and the >info line """ new_info_list = [] # try to remove repeating lines if sort: info_list = sorted(list(set(info_list))) else: info_list = info_list for line in info_list: # get rid of empty lines lt = str(line).strip() if len(lt) > 1: if ">" in line: pass else: new_info_list.append(line.strip()) return new_info_list
[docs] def parse_info(self): """ Try to parse the info section into useful information. :return: DESCRIPTION :rtype: TYPE """ new_dict = {} processing_parameters = [] for key, value in self.info_dict.items(): if key is None: continue try: if self._phoenix_file: new_key = self.phoenix_translation_dict[key.lower()] else: new_key = self.translation_dict[key.lower()] if isinstance(new_key, list): values = value.split(",") if len(values) == len(new_key): for vkey, item in zip(new_key, values): item_value = ( item.lower().split("=")[1].replace("mv", "") ) new_dict[vkey] = item_value else: self.logger.warning(f"Could not parse line {value}") raise KeyError else: if new_key == "processing_parameter": processing_parameters.append(f"{key}={value}") else: if "pot resist" in key.lower(): new_dict[new_key] = value.split()[0] elif key.lower().endswith("sen"): comp = key.lower().split()[0] new_dict[ f"{comp}.sensor.manufacturer" ] = "Phoenix Geophysics" new_dict[f"{comp}.sensor.type"] = "Induction Coil" new_dict[new_key] = value else: new_dict[new_key] = value for item in self.info_list: if key.lower() in item.lower(): self.info_list.remove(item) break except KeyError: new_dict[key] = value if processing_parameters != []: new_dict[ "transfer_function.processing_parameters" ] = processing_parameters self.info_dict = new_dict