# Source code for mt_metadata.transfer_functions.io.jfiles.jfile

# -*- coding: utf-8 -*-
"""
.. py:module:: JFile
    :synopsis: Deal with J-files of the format proposed by Alan Jones

.. codeauthor:: Jared Peacock <jpeacock@usgs.gov>

"""

# ==============================================================================
from pathlib import Path
import numpy as np
from loguru import logger

from mt_metadata.transfer_functions.tf import (
    Survey,
    Station,
    Run,
    Electric,
    Magnetic,
)
from mt_metadata.utils.mttime import MTime
from .metadata import Header
from mt_metadata.transfer_functions.io.tools import get_nm_elev


# ==============================================================================
# Class to read j_file
# ==============================================================================
class JFile:
    """
    Read and write J-files (the transfer-function format proposed by
    Alan Jones, written by BIRRP).

    Data are stored as numpy arrays:

    * ``z`` / ``z_err`` -- impedance and error, shape (n_periods, 2, 2)
    * ``t`` / ``t_err`` -- tipper and error, shape (n_periods, 1, 2)
    * ``frequency`` -- frequencies in Hz, shape (n_periods,)

    :param fn: full path to a ``*.j`` file; if given, the file is read
        immediately, defaults to None
    :type fn: str or pathlib.Path, optional
    """

    def __init__(self, fn=None, **kwargs):
        self.logger = logger
        self.header = Header()
        self._jfn = None
        self.fn = fn

        self.z = None
        self.z_err = None
        self.t = None
        self.t_err = None
        self.frequency = None

        # allow any attribute to be overridden by keyword
        for key, value in kwargs.items():
            setattr(self, key, value)

        if self.fn is not None:
            self.read()

    def __str__(self):
        lines = [f"Station: {self.header.station}", "-" * 50]
        lines.append(f"\tSurvey: {self.survey_metadata.id}")
        lines.append(f"\tProject: {self.survey_metadata.project}")
        lines.append(
            f"\tAcquired by: {self.station_metadata.acquired_by.author}"
        )
        lines.append(
            f"\tAcquired date: {self.station_metadata.time_period.start_date}"
        )
        lines.append(
            f"\tLatitude: {self.station_metadata.location.latitude:.3f}"
        )
        lines.append(
            f"\tLongitude: {self.station_metadata.location.longitude:.3f}"
        )
        lines.append(
            f"\tElevation: {self.station_metadata.location.elevation:.3f}"
        )
        if self.z is not None:
            if (self.z == 0).all():
                lines.append("\tImpedance: False")
            else:
                lines.append("\tImpedance: True")
        else:
            lines.append("\tImpedance: False")
        if self.t is not None:
            if (self.t == 0).all():
                lines.append("\tTipper: False")
            else:
                lines.append("\tTipper: True")
        else:
            lines.append("\tTipper: False")

        if self.frequency is not None:
            lines.append(f"\tNumber of periods: {self.frequency.size}")
            lines.append(
                f"\t\tPeriod Range: {1./self.frequency.max():.5E} -- {1./self.frequency.min():.5E} s"
            )
            # BUG FIX: frequencies are in Hz, not seconds ("s")
            lines.append(
                f"\t\tFrequency Range {self.frequency.min():.5E} -- {self.frequency.max():.5E} Hz"
            )
        return "\n".join(lines)

    def __repr__(self):
        lines = []
        lines.append(f"station='{self.header.station}'")
        lines.append(f"latitude={self.header.latitude:.2f}")
        lines.append(f"longitude={self.header.longitude:.2f}")
        lines.append(f"elevation={self.header.elevation:.2f}")
        return f"JFile({', '.join(lines)})"

    @property
    def fn(self):
        """Path to the J-file (``None`` until set)."""
        return self._jfn

    @fn.setter
    def fn(self, value):
        """
        Set the J-file path, validating the ``.j`` extension.

        :param value: path to a ``*.j`` file; ``None`` is ignored
        :type value: str or pathlib.Path
        :raises ValueError: if the file suffix is not ``.j``
        """
        if value is None:
            return
        value = Path(value)
        if value.suffix in [".j"]:
            self._jfn = value
        else:
            msg = f"Input file must be a *.j file not {value.suffix}"
            self.logger.error(msg)
            raise ValueError(msg)

    @property
    def periods(self):
        """Periods in seconds (1 / frequency)."""
        return 1.0 / self.frequency

    def _validate_j_file(self):
        """
        Change the lat, lon, elev lines to something machine readable,
        if they are not.

        :return: cleaned file lines
        :rtype: list of str
        :raises NameError: if the file does not exist
            (NOTE(review): FileNotFoundError would be more conventional,
            but callers may catch NameError -- left unchanged)
        """
        if not self.fn.exists():
            msg = f"Could not find {self.fn}, check path"
            self.logger.error(msg)
            raise NameError(msg)

        with open(self.fn, "r", errors="replace") as fid:
            j_lines = fid.readlines()

        for variable in ["lat", "lon", "elev"]:
            for ii, line in enumerate(j_lines):
                if variable in line.lower():
                    name = line.split("=")[0]
                    try:
                        value = float(line.split("=")[1].strip())
                    except ValueError:
                        # unparseable value is masked as 0.0
                        value = 0.0
                        self.logger.debug(f"Changed {name[1:]} to 0.0")
                    # rewrite the line in a normalized "name = value" form
                    j_lines[ii] = "{0} = {1}\n".format(name, value)
                    break
        return j_lines
[docs] def read(self, fn=None, get_elevation=False): """ Read data from a j file :param fn: full path to j-file to read, defaults to None :type fn: string or pathlib.Path, optional """ # read data z_index_dict = { "zxx": (0, 0), "zxy": (0, 1), "zyx": (1, 0), "zyy": (1, 1), } t_index_dict = {"tzx": (0, 0), "tzy": (0, 1)} if fn is not None: self.fn = fn self.logger.debug(f"Reading {self.fn}") j_line_list = self._validate_j_file() self.header.read_header(j_line_list) self.header.read_metadata(j_line_list) data_lines = [ j_line for j_line in j_line_list if not ">" in j_line and not "#" in j_line ][:] self.header.station = data_lines[0].strip() # sometimes birrp outputs some missing periods, so the best way to deal with # this that I could come up with was to get things into dictionaries with # key words that are the period values, then fill in Z and T from there # leaving any missing values as 0 # make empty dictionary that have keys as the component z_dict = dict([(z_key, {}) for z_key in list(z_index_dict.keys())]) t_dict = dict([(t_key, {}) for t_key in list(t_index_dict.keys())]) for d_line in data_lines[1:]: # check to see if we are at the beginning of a component block, if so # set the dictionary key to that value if "z" in d_line.lower(): d_key = d_line.strip().split()[0].lower() # if we are at the number of periods line, skip it elif ( len(d_line.strip().split()) == 1 and "r" not in d_line.lower() ): continue elif "r" in d_line.lower(): break # get the numbers into the correct dictionary with a key as period and # for now we will leave the numbers as a list, which we will parse later else: # split the line up into each number d_list = d_line.strip().split() # make a copy of the list to be sure we don't rewrite any values, # not sure if this is necessary at the moment d_value_list = list(d_list) for d_index, d_value in enumerate(d_list): # check to see if the column number can be converted into a float # if it can't, then it will be set to 0, which is assumed to 
be # a masked number when writing to an .edi file try: d_value = float(d_value) # need to check for masked points represented by # birrp as -999, apparently if d_value == -999 or np.isnan(d_value): d_value_list[d_index] = 0.0 else: d_value_list[d_index] = d_value except ValueError: d_value_list[d_index] = 0.0 # put the numbers in the correct dictionary as: # key = period, value = [real, imaginary, error] if d_key in list(z_index_dict.keys()): z_dict[d_key][d_value_list[0]] = d_value_list[1:4] elif d_key in list(t_index_dict.keys()): t_dict[d_key][d_value_list[0]] = d_value_list[1:4] # --> now we need to get the set of periods for all components # check to see if there is any tipper data output all_periods = [] for z_key in list(z_index_dict.keys()): for f_key in list(z_dict[z_key].keys()): all_periods.append(f_key) if len(list(t_dict["tzx"].keys())) == 0: self.logger.debug(f"Could not find any Tipper data in {self.fn}") find_tipper = False else: for t_key in list(t_index_dict.keys()): for f_key in list(t_dict[t_key].keys()): all_periods.append(f_key) find_tipper = True all_periods = np.array(sorted(list(set(all_periods)))) all_periods = all_periods[np.nonzero(all_periods)] num_per = len(all_periods) # fill arrays using the period key from all_periods self.z = np.zeros((num_per, 2, 2), dtype=complex) self.z_err = np.zeros((num_per, 2, 2), dtype=float) self.t = np.zeros((num_per, 1, 2), dtype=complex) self.t_err = np.zeros((num_per, 1, 2), dtype=float) for p_index, per in enumerate(all_periods): for z_key in sorted(z_index_dict.keys()): kk = z_index_dict[z_key][0] ll = z_index_dict[z_key][1] try: z_value = ( z_dict[z_key][per][0] + 1j * z_dict[z_key][per][1] ) self.z[p_index, kk, ll] = z_value self.z_err[p_index, kk, ll] = z_dict[z_key][per][2] except KeyError: self.logger.debug(f"No value found for period {per:.4g}") self.logger.debug(f"For component {z_key}") if find_tipper is True: for t_key in sorted(t_index_dict.keys()): kk = t_index_dict[t_key][0] ll = 
t_index_dict[t_key][1] try: t_value = ( t_dict[t_key][per][0] + 1j * t_dict[t_key][per][1] ) self.t[p_index, kk, ll] = t_value self.t_err[p_index, kk, ll] = t_dict[t_key][per][2] except KeyError: self.logger.debug( f"No value found for period {per:.4g}" ) self.logger.debug(f"For component {t_key}") # put the results into mtpy objects self.frequency = 1.0 / all_periods self.z[np.where(self.z == np.inf)] = 0 + 0j self.t[np.where(self.t == np.inf)] = 0 + 0j self.z_err[np.where(self.z_err == np.inf)] = 10**6 self.t_err[np.where(self.t_err == np.inf)] = 10**6 if self.header.elevation == 0 and get_elevation: if self.header.latitude != 0 and self.header.longitude != 0: self.header.elevation = get_nm_elev( self.header.latitude, self.header.longitude )
@property def station_metadata(self): sm = Station() r1 = Run(id="001") if self.header.birrp_parameters.deltat < 0: r1.sample_rate = abs(self.header.birrp_parameters.deltat) else: r1.sample_rate = 1.0 / (self.header.birrp_parameters.deltat) if not np.all(self.z == 0): for ii, comp in enumerate(["ex", "ey", "hx", "hy"], 1): if comp.startswith("e"): ch = Electric(component=comp, channel_id=ii) elif comp.startswith("h"): ch = Magnetic(component=comp, channel_id=ii) r1.add_channel(ch) if not np.all(self.t == 0): ch = Magnetic(component="hz", channel_id=5) r1.add_channel(ch) sm.runs.append(r1) sm.id = self.header.station sm.data_type = "MT" sm.location.latitude = self.header.latitude sm.location.longitude = self.header.longitude sm.location.elevation = self.header.elevation # provenance sm.provenance.software.name = "BIRRP" sm.provenance.software.version = "5" sm.transfer_function.id = self.header.station sm.transfer_function.processed_date = MTime( self.fn.stat().st_ctime ).iso_str sm.transfer_function.runs_processed = sm.run_list # add birrp parameters for key, value in self.header.birrp_parameters.to_dict( single=True ).items(): sm.transfer_function.processing_parameters.append( f"{key} = {value}" ) return sm @property def survey_metadata(self): sm = Survey() sm.add_station(self.station_metadata) return sm