# -*- coding: utf-8 -*-
"""
.. module:: EDI
:synopsis: Deal with EDI files. The Edi class can read and write an .edi
file, the 'standard format' of magnetotellurics. Each section
of the .edi file is given its own class, so the elements of each
section are attributes for easy access.
.. moduleauthor:: Jared Peacock <jpeacock@usgs.gov>
Updated 2021 to used mt_metadata type metadata and how spectra are read.
"""
# ==============================================================================
# Imports
# ==============================================================================
import numpy as np
from pathlib import Path
from loguru import logger
from mt_metadata.transfer_functions.io.edi.metadata import (
Header,
Information,
DefineMeasurement,
DataSection,
)
from mt_metadata.transfer_functions import tf as metadata
from mt_metadata.transfer_functions.io.tools import (
_validate_str_with_equals,
index_locator,
_validate_edi_lines,
get_nm_elev,
)
from mt_metadata import __version__
# ==============================================================================
# EDI Class
# ==============================================================================
[docs]class EDI(object):
"""
This class is for .edi files, mainly reading and writing. Has been tested
on Winglink and Phoenix output .edi's, which are meant to follow the
archaic EDI format put forward by SEG. Can read impedance, Tipper and/or
spectra data.
The Edi class contains a class for each major section of the .edi file.
Frequency and components are ordered from highest to lowest frequency.
:param fn: full path to .edi file to be read in.
*default* is None. If an .edi file is input, it is
automatically read in and attributes of Edi are filled
:type fn: string or :class:`pathlib.Path`
:Change Latitude: ::
>>> from mt_metadata.transfer_functions.io.edi import EDI
>>> edi_obj = EDI(fn=r"/home/mt/mt01.edi")
>>> # change the latitude
>>> edi_obj.lat = 45.7869
>>> new_edi_fn = edi_obj.write()
"""
def __init__(self, fn=None, **kwargs):
self.logger = logger
self._fn = None
self._edi_lines = None
self.Header = Header()
self.Info = Information()
self.Measurement = DefineMeasurement()
self.Data = DataSection()
self.z = None
self.z_err = None
self.t = None
self.t_err = None
self.frequency = None
self.rotation_angle = None
self.residual_covariance = None
self.signal_inverse_power = None
self.tf = None
self.tf_err = None
self._z_labels = [
["zxxr", "zxxi", "zxx.var"],
["zxyr", "zxyi", "zxy.var"],
["zyxr", "zyxi", "zyx.var"],
["zyyr", "zyyi", "zyy.var"],
]
self._t_labels = [
["txr.exp", "txi.exp", "txvar.exp"],
["tyr.exp", "tyi.exp", "tyvar.exp"],
]
self._accepted_keys = [
"freq",
"zxxr",
"zxxi",
"zxyr",
"zxyi",
"zyxr",
"zyxi",
"zyyr",
"zyyi",
"zxx.var",
"zxy.var",
"zyx.var",
"zyy.var",
"txr.exp",
"txi.exp",
"tyr.exp",
"tyi.exp",
"txvar.exp",
"tyvar.exp",
"rhoxx",
"rhoxy",
"rhoyx",
"rhoyy",
"rhoxx.err",
"rhoxy.err",
"rhoyx.err",
"rhoyy.err",
"phsxx",
"phsxy",
"phsyx",
"phsyy",
"phsxx.err",
"phsxy.err",
"phsyx.err",
"phsyy.err",
"zrot",
"rhorot",
"trot",
]
self._channel_skip_list = [
"filter.name",
"filter.applied",
"time_period.start",
"time_period.end",
"location.elevation",
"location.latitude",
"location.longitude",
"location.x",
"location.y",
"location.z",
"positive.latitude",
"positive.longitude",
"positive.elevation",
"positive.x",
"positive.x2",
"positive.y",
"positive.y2",
"positive.z",
"positive.z2",
"negative.latitude",
"negative.longitude",
"negative.elevation",
"negative.x",
"negative.x2",
"negative.y",
"negative.y2",
"negative.z",
"negative.z2",
"sample_rate",
"data_quality.rating.value",
"data_quality.flag",
]
self._index_dict = {
"zxx": {"ii": 0, "jj": 0, "obj": "z", "err_obj": "z_err"},
"zxy": {"ii": 0, "jj": 1, "obj": "z", "err_obj": "z_err"},
"zyx": {"ii": 1, "jj": 0, "obj": "z", "err_obj": "z_err"},
"zyy": {"ii": 1, "jj": 1, "obj": "z", "err_obj": "z_err"},
"tx": {"ii": 0, "jj": 0, "obj": "t", "err_obj": "t_err"},
"ty": {"ii": 0, "jj": 1, "obj": "t", "err_obj": "t_err"},
"rhoxx": {"ii": 0, "jj": 0, "obj": "z", "err_obj": "z_err"},
"rhoxy": {"ii": 0, "jj": 1, "obj": "z", "err_obj": "z_err"},
"rhoyx": {"ii": 1, "jj": 0, "obj": "z", "err_obj": "z_err"},
"rhoyy": {"ii": 1, "jj": 1, "obj": "z", "err_obj": "z_err"},
}
self._data_header_str = ">!****{0}****!\n"
self._num_format = " 15.6e"
self._block_len = 6
self.fn = fn
for key, value in kwargs.items():
setattr(self, key, value)
def __str__(self):
lines = [f"Station: {self.station}", "-" * 50]
lines.append(f"\tSurvey: {self.survey_metadata.id}")
lines.append(f"\tProject: {self.survey_metadata.project}")
lines.append(
f"\tAcquired by: {self.station_metadata.acquired_by.name}"
)
lines.append(
f"\tAcquired date: {self.station_metadata.time_period.start_date}"
)
lines.append(
f"\tLatitude: {self.station_metadata.location.latitude:.3f}"
)
lines.append(
f"\tLongitude: {self.station_metadata.location.longitude:.3f}"
)
lines.append(
f"\tElevation: {self.station_metadata.location.elevation:.3f}"
)
if self.z is not None:
lines.append("\tImpedance: True")
else:
lines.append("\tImpedance: False")
if self.t is not None:
lines.append("\tTipper: True")
else:
lines.append("\tTipper: False")
if self.frequency is not None:
lines.append(f"\tNumber of periods: {self.frequency.size}")
lines.append(
f"\t\tPeriod Range: {1./self.frequency.max():.5E} -- {1./self.frequency.min():.5E} s"
)
lines.append(
f"\t\tFrequency Range {self.frequency.min():.5E} -- {self.frequency.max():.5E} s"
)
return "\n".join(lines)
def __repr__(self):
return self.__str__()
@property
def fn(self):
return self._fn
@fn.setter
def fn(self, fn):
if fn is not None:
self._fn = Path(fn)
if self._fn.exists():
self.read()
@property
def period(self):
if self.frequency is not None:
return 1.0 / self.frequency
return None
[docs] def read(self, fn=None, get_elevation=False):
"""
Read in an edi file and fill attributes of each section's classes.
Including:
* Header
* Info
* Measurement
* Data
* z, z_err
* t, t_err
.. note:: Automatically detects if data is in spectra format. All
data read in is converted to impedance and Tipper.
:param fn: full path to .edi file to be read in
*default* is None
:type fn: string
:Example: ::
>>> from mt_metadata.transfer_functions.io.edi import EDI
>>> edi_obj = EDI
>>> edi_obj.read(fn=r"/home/mt/mt01.edi")
"""
if fn is not None:
self._fn = Path(fn)
if self.fn is None:
msg = "Must input EDI file to read"
self.logger.error(msg)
raise IOError(msg)
if not self.fn.exists():
msg = f"Cannot find EDI file: {self.fn}"
self.logger.error(msg)
raise IOError(msg)
with open(self.fn, "r") as fid:
self._edi_lines = _validate_edi_lines(fid.readlines())
self.Header.read_header(self._edi_lines)
self.Info.read_info(self._edi_lines)
self.Measurement.read_measurement(self._edi_lines)
self.Data.read_data(self._edi_lines)
self.Data.match_channels(self.Measurement.channel_ids)
self._read_data()
if self.Header.lat is None:
self.Header.lat = self.Measurement.reflat
self.logger.debug(
"Got latitude from reflat for {0}".format(self.Header.dataid)
)
if self.Header.lon is None:
self.Header.lon = self.Measurement.reflon
self.logger.debug(
"Got longitude from reflon for {0}".format(self.Header.dataid)
)
if self.Header.elev is None:
self.Header.elev = self.Measurement.refelev
self.logger.debug(
"Got elevation from refelev for {0}".format(self.Header.dataid)
)
if self.elev in [0, None] and get_elevation:
if self.lat != 0 and self.lon != 0:
self.elev = get_nm_elev(self.lat, self.lon)
def _read_data(self):
"""
Read either impedance or spectra data depending on what the type is
in the data section.
"""
lines = self._edi_lines[self.Data._line_num :]
if self.Data.data_type_in == "spectra":
self.logger.debug("Converting Spectra to Impedance and Tipper")
self.logger.debug(
"Check to make sure input channel list is correct if the data looks incorrect"
)
if self.Data.nchan == 5:
c_list = ["hx", "hy", "hz", "ex", "ey"]
elif self.Data.nchan == 4:
c_list = ["hx", "hy", "ex", "ey"]
elif self.Data.nchan == 6:
c_list = ["hx", "hy", "ex", "ey", "rhx", "rhy"]
elif self.Data.nchan == 7:
c_list = ["hx", "hy", "hz", "ex", "ey", "rhx", "rhy"]
self._read_spectra(lines, comp_list=c_list)
elif self.Data.data_type_in == "z":
self._read_mt(lines)
def _read_mt(self, data_lines):
"""
Read in impedance and tipper data
:param data_lines: list of data lines from the edi file
:type data_lines: list
"""
data_dict = {}
data_find = False
for line in data_lines:
line = line.strip()
if ">" in line and "!" not in line:
line_list = line[1:].strip().split()
if len(line_list) == 0:
continue
key = line_list[0].lower()
if key in self._accepted_keys:
data_find = True
data_dict[key] = []
else:
data_find = False
elif data_find and ">" not in line and "!" not in line:
d_lines = line.strip().split()
for ii, dd in enumerate(d_lines):
# check for empty values and set them to 0, check for any
# other characters sometimes there are ****** for a null
# component
try:
d_lines[ii] = float(dd)
if d_lines[ii] == self.Header.empty:
d_lines[ii] = 0.0
except ValueError:
d_lines[ii] = 0.0
data_dict[key] += d_lines
# put everything into arrays
for key, k_list in data_dict.items():
data_dict[key] = np.array(k_list)
# fill useful arrays
self.frequency = data_dict["freq"]
self.z = np.zeros((self.frequency.size, 2, 2), dtype=complex)
self.z_err = np.zeros((self.frequency.size, 2, 2), dtype=float)
# fill tipper data if there it exists
self.t = np.zeros((self.frequency.size, 1, 2), dtype=complex)
self.t_err = np.zeros((self.frequency.size, 1, 2), dtype=float)
self.data_dict = data_dict
# fill tensors
for key in sorted(self._index_dict.keys(), reverse=True):
index = self._index_dict[key]
ii = index["ii"]
jj = index["jj"]
obj = getattr(self, index["obj"])
error_obj = getattr(self, index["err_obj"])
try:
if key.startswith("z"):
obj[:, ii, jj] = (
data_dict[f"{key}r"] + data_dict[f"{key}i"] * 1j
)
try:
error_key = [
k
for k in data_dict.keys()
if key in k and "var" in k
][0]
error_obj[:, ii, jj] = (
np.abs(data_dict[error_key]) ** 0.5
)
except IndexError:
self.logger.debug(
f"Could not find error information for {key}"
)
elif key.startswith("t"):
obj[:, ii, jj] = (
data_dict[f"{key}r.exp"]
+ data_dict[f"{key}i.exp"] * 1j
)
try:
error_key = [
k
for k in data_dict.keys()
if key in k and "var" in k
][0]
error_obj[:, ii, jj] = (
np.abs(data_dict[error_key]) ** 0.5
)
except IndexError:
self.logger.debug(
f"Could not find error information for {key}"
)
elif key.startswith("r") or key.startswith("p"):
self.logger.debug(
"Reading RHO and PHS to compute impedance"
)
if (self.z[:, ii, jj] == 0).all():
phase = data_dict[f"phs{key[-2:]}"]
z_real = np.sqrt(
(5 * self.frequency * data_dict[key])
/ (np.tan(np.deg2rad(phase)) ** 2 + 1)
)
z_imag = (np.tan(np.deg2rad(phase))) * z_real
if ii == 1 and jj == 0:
if phase.mean() < 90 and phase.mean() > 0:
obj[:, ii, jj] = -1 * (z_real + 1j * z_imag)
else:
obj[:, ii, jj] = z_real + 1j * z_imag
error_obj[:, ii, jj] = np.deg2rad(
data_dict[f"phs{key[-2:]}.err"]
) * np.sqrt(data_dict[key] * (self.frequency * 5))
except KeyError as error:
self.logger.debug(error)
# check for order of frequency, we want high togit low
if self.frequency[0] < self.frequency[1]:
self.logger.debug(
"Ordered arrays to be arranged from high to low frequency"
)
self.frequency = self.frequency[::-1]
self.z = self.z[::-1]
self.z_err = self.z_err[::-1]
try:
self.rotation_angle = np.array(data_dict["zrot"])
except KeyError:
try:
self.rotation_angle = np.array(data_dict["rhorot"])
except KeyError:
self.rotation_angle = np.zeros_like(self.frequency)
def _read_spectra(
self,
data_lines,
comp_list=["hx", "hy", "hz", "ex", "ey", "rhx", "rhy"],
):
"""
Read in spectra data and convert to impedance and Tipper.
Translated from A. Kelbert's EMTF fortran module
:param data_lines: list of lines from edi file
:type data_lines: list
:param comp_list: list of components that correspond to the columns
of the spectra data.
:type comp_list: list
"""
data_dict = {}
avgt_dict = {}
data_find = False
for line in data_lines:
if line.lower().find(">spectra") == 0 and line.find("!") == -1:
line_list = _validate_str_with_equals(line)
data_find = True
# frequency will be the key
try:
key = float(
[
ss.split("=")[1]
for ss in line_list
if ss.lower().find("freq") == 0
][0]
)
data_dict[key] = []
avgt = float(
[
ss.split("=")[1]
for ss in line_list
if ss.lower().find("avgt") == 0
][0]
)
avgt_dict[key] = avgt
except ValueError:
self.logger.debug("did not find frequency key")
elif data_find and line.find(">") == -1 and line.find("!") == -1:
data_dict[key] += [float(ll) for ll in line.strip().split()]
elif line.find(">spectra") == -1:
data_find = False
# get an object that contains the indices for each component
cc = index_locator(comp_list)
self.frequency = np.array(sorted(list(data_dict.keys()), reverse=True))
self.z = np.zeros((self.frequency.size, 2, 2), dtype=complex)
self.t = np.zeros((self.frequency.size, 1, 2), dtype=complex)
self.z_err = np.zeros_like(self.z, dtype=float)
self.t_err = np.zeros_like(self.t, dtype=float)
self.residual_covariance = np.zeros(
(self.frequency.size, cc.n_outputs, cc.n_outputs), dtype=complex
)
self.signal_inverse_power = np.zeros(
(self.frequency.size, cc.n_inputs, cc.n_inputs), dtype=complex
)
self.tf = np.zeros(
(self.frequency.size, cc.n_outputs, cc.n_inputs), dtype=complex
)
self.tf_err = np.zeros_like(self.tf, dtype=float)
for kk, key in enumerate(self.frequency):
# read in spectra as an (n_channel x n_channel) array
spectra_arr = np.reshape(
np.array(data_dict[key]), (len(comp_list), len(comp_list))
)
# compute cross powers
s_arr = np.zeros_like(spectra_arr, dtype=complex)
for ii in range(s_arr.shape[0]):
for jj in range(ii, s_arr.shape[0]):
if ii == jj:
s_arr[ii, jj] = spectra_arr[ii, jj]
else:
# minus sign for complex conjugation
# original spectra data are of form <A,B*>, but we need
# the order <B,A*>...
# this is achieved by complex conjugation of the
# original entries
s_arr[ii, jj] = complex(
spectra_arr[jj, ii], -spectra_arr[ii, jj]
)
# keep complex conjugated entries in the lower
# triangular matrix:
s_arr[jj, ii] = complex(
spectra_arr[jj, ii], spectra_arr[ii, jj]
)
# check for empty values
s_arr[s_arr == 0] = np.nan
s_arr[s_arr == self.Header.empty] = np.nan
# from A. Kelbert's EMTF
# cross spectra matrices
# Note we changed the indices to [ex, ey, hz] from [hz, ex, ey]
# input channels
rh = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex)
rr = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex)
hh = np.zeros((cc.n_inputs, cc.n_inputs), dtype=complex)
# output channels
re = np.zeros((cc.n_inputs, cc.n_outputs), dtype=complex)
he = np.zeros((cc.n_inputs, cc.n_outputs), dtype=complex)
ee = np.zeros((cc.n_outputs, cc.n_outputs), dtype=complex)
# fill in cross powers for input channels
rh[0, 0] = s_arr[cc.rhx, cc.hx]
rh[0, 1] = s_arr[cc.rhx, cc.hy]
rh[1, 0] = s_arr[cc.rhy, cc.hx]
rh[1, 1] = s_arr[cc.rhy, cc.hy]
rr[0, 0] = s_arr[cc.rhx, cc.rhx]
rr[0, 1] = s_arr[cc.rhx, cc.rhy]
rr[1, 0] = s_arr[cc.rhy, cc.rhx]
rr[1, 1] = s_arr[cc.rhy, cc.rhy]
hh[0, 0] = s_arr[cc.hx, cc.hx]
hh[0, 1] = s_arr[cc.hx, cc.hy]
hh[1, 0] = s_arr[cc.hy, cc.hx]
hh[1, 1] = s_arr[cc.hy, cc.hy]
# fill in cross powers for output channels
if cc.has_tipper and cc.has_electric:
re[0, 2] = s_arr[cc.rhx, cc.hz]
re[0, 0] = s_arr[cc.rhx, cc.ex]
re[0, 1] = s_arr[cc.rhx, cc.ey]
re[1, 2] = s_arr[cc.rhy, cc.hz]
re[1, 0] = s_arr[cc.rhy, cc.ex]
re[1, 1] = s_arr[cc.rhy, cc.ey]
he[0, 2] = s_arr[cc.hx, cc.hz]
he[0, 0] = s_arr[cc.hx, cc.ex]
he[0, 1] = s_arr[cc.hx, cc.ey]
he[1, 2] = s_arr[cc.hy, cc.hz]
he[1, 0] = s_arr[cc.hy, cc.ex]
he[1, 1] = s_arr[cc.hy, cc.ey]
ee[2, 2] = s_arr[cc.hz, cc.hz]
ee[2, 0] = s_arr[cc.hz, cc.ex]
ee[2, 1] = s_arr[cc.hz, cc.ey]
ee[0, 2] = s_arr[cc.ex, cc.hz]
ee[0, 0] = s_arr[cc.ex, cc.ex]
ee[0, 1] = s_arr[cc.ex, cc.ey]
ee[1, 2] = s_arr[cc.ey, cc.hz]
ee[1, 0] = s_arr[cc.ey, cc.ex]
ee[1, 1] = s_arr[cc.ey, cc.ey]
elif not cc.has_tipper and cc.has_electric:
re[0, 0] = s_arr[cc.rhx, cc.ex]
re[0, 1] = s_arr[cc.rhx, cc.ey]
re[1, 0] = s_arr[cc.rhy, cc.ex]
re[1, 0] = s_arr[cc.rhy, cc.ey]
he[0, 0] = s_arr[cc.hx, cc.ex]
he[0, 1] = s_arr[cc.hx, cc.ey]
he[0, 1] = s_arr[cc.hy, cc.ex]
he[1, 1] = s_arr[cc.hy, cc.ey]
ee[0, 0] = s_arr[cc.ex, cc.ex]
ee[0, 1] = s_arr[cc.ex, cc.ey]
ee[1, 0] = s_arr[cc.ey, cc.ex]
ee[1, 1] = s_arr[cc.ey, cc.ey]
elif cc.has_tipper and not cc.has_electric:
re[0, 0] = s_arr[cc.rhx, cc.hz]
re[1, 0] = s_arr[cc.rhy, cc.hz]
he[0, 0] = s_arr[cc.hx, cc.hz]
he[1, 0] = s_arr[cc.hy, cc.hz]
ee[0, 0] = s_arr[cc.hz, cc.hz]
# check to make sure the values are legit for accurate results
if abs(np.linalg.det(rh)) < np.finfo(float).eps:
self.logger.warning(
"spectral matrix determinant is too small "
f"{abs(np.linalg.det(rh))} for period {key}. "
"Results may be inaccurate"
)
tfh = np.matmul(np.linalg.inv(rh), re)
tf = tfh.conj().T
sig = np.matmul(
np.linalg.inv(rh), np.matmul(rr, np.linalg.inv(rh.conj().T))
)
res = (
ee
- np.matmul(tf, he)
- np.matmul(he.conj().T, tfh)
+ np.matmul(tf, np.matmul(hh, tfh))
) / avgt_dict[key]
# variance = abs(np.dot(res[0 : cc.n_inputs, :].T, sig))
variance = np.zeros((cc.n_outputs, cc.n_inputs), dtype=complex)
for nn in range(cc.n_outputs):
for mm in range(cc.n_inputs):
variance[nn, mm] = res[nn, nn] * sig[mm, mm]
tf_err = np.sqrt(np.abs(variance))
self.tf[kk, :, :] = tf
self.tf_err[kk, :, :] = np.sqrt(np.abs(variance))
self.signal_inverse_power[kk, :, :] = sig
self.residual_covariance[kk, :, :] = res
if cc.has_tipper and cc.has_electric:
self.z[kk, :, :] = tf[0:2, :]
self.z_err[kk, :, :] = tf_err[0:2, :]
self.t[kk, :, :] = tf[2, :]
self.t_err[kk, :, :] = tf_err[2, :]
self.z_err[np.where(np.nan_to_num(self.z_err) == 0.0)] = 1.0
self.t_err[np.nan_to_num(self.t_err) == 0.0] = 1.0
elif not cc.has_tipper and cc.has_electric:
self.z[kk, :, :] = tf[:, :]
self.z_err[kk, :, :] = tf_err[:, :]
self.z_err[np.where(np.nan_to_num(self.z_err) == 0.0)] = 1.0
elif cc.has_tipper and not cc.has_electric:
self.t[kk, :, :] = tf[:, :]
self.t_err[kk, :, :] = tf_err[:, :]
self.t_err[np.nan_to_num(self.t_err) == 0.0] = 1.0
[docs] def write(
self, new_edi_fn=None, longitude_format="LON", latlon_format="dms"
):
"""
Write a new edi file from either an existing .edi file or from data
input by the user into the attributes of Edi.
:param new_edi_fn: full path to new edi file.
*default* is None, which will write to the same
file as the input .edi with as:
r"/home/mt/mt01_1.edi"
:type new_edi_fn: string
:param longitude_format: whether to write longitude as LON or LONG.
options are 'LON' or 'LONG', default 'LON'
:type longitude_format: string
:param latlon_format: format of latitude and longitude in output edi,
degrees minutes seconds ('dms') or decimal
degrees ('dd')
:type latlon_format: string
:returns: full path to new edi file
:rtype: string
:Example: ::
>>> import mtpy.core.edi as mtedi
>>> edi_obj = mtedi.Edi(fn=r"/home/mt/mt01/edi")
>>> edi_obj.Header.dataid = 'mt01_rr'
>>> n_edi_fn = edi_obj.write_edi_file()
"""
if new_edi_fn is None:
if self.fn is not None:
new_edi_fn = self.fn
else:
new_edi_fn = Path().cwd().joinpath(f"{self.Header.dataid}.edi")
# write lines
extra_lines = []
if self.survey_metadata.summary != None:
extra_lines.append(
f"\tsurvey.summary = {self.survey_metadata.summary}\n"
)
if self.Header.progname != "mt_metadata":
extra_lines.append(
f"\toriginal_program.name={self.Header.progname}\n"
)
if self.Header.progvers != __version__:
extra_lines.append(
f"\toriginal_program.version={self.Header.progvers}\n"
)
if self.Header.progdate != "1980-01-01":
extra_lines.append(
f"\toriginal_program.date={self.Header.progdate}\n"
)
if self.Header.fileby != "1980-01-01":
extra_lines.append(
f"\toriginal_file.date={self.Header.filedate}\n"
)
header_lines = self.Header.write_header(
longitude_format=longitude_format, latlon_format=latlon_format
)
info_lines = self.Info.write_info()
info_lines.insert(1, "".join(extra_lines))
define_lines = self.Measurement.write_measurement(
longitude_format=longitude_format, latlon_format=latlon_format
)
self.Data.nfreq = len(self.frequency)
dsect_lines = self.Data.write_data()
# write out frequencies
freq_lines = [self._data_header_str.format("frequencies".upper())]
freq_lines += self._write_data_block(self.frequency, "freq")
# write out rotation angles
zrot_lines = [
self._data_header_str.format("impedance rotation angles".upper())
]
if self.rotation_angle is None:
self.rotation_angle = np.zeros(self.frequency.size)
elif isinstance(self.rotation_angle, (float, int)):
self.rotation_angle = np.repeat(
self.rotation_angle, self.frequency.size
)
elif len(self.rotation_angle) != self.frequency.size:
raise ValueError(
"rotation angle must be the same length and the number of "
f"frequencies {len(self.rotation_angle)} != {self.frequency.size}"
)
zrot_lines += self._write_data_block(self.rotation_angle, "zrot")
# write out data only impedance and tipper
z_data_lines = [self._data_header_str.format("impedances".upper())]
self.z = np.nan_to_num(self.z)
self.z_err = np.nan_to_num(self.z_err)
self.t = np.nan_to_num(self.t)
self.t_err = np.nan_to_num(self.t_err)
if self.z is not None:
for ii in range(2):
for jj in range(2):
z_lines_real = self._write_data_block(
self.z[:, ii, jj].real, self._z_labels[2 * ii + jj][0]
)
z_lines_imag = self._write_data_block(
self.z[:, ii, jj].imag, self._z_labels[2 * ii + jj][1]
)
z_lines_var = self._write_data_block(
self.z_err[:, ii, jj] ** 2.0,
self._z_labels[2 * ii + jj][2],
)
z_data_lines += z_lines_real
z_data_lines += z_lines_imag
z_data_lines += z_lines_var
if self.t is None:
trot_lines = [""]
t_data_lines = [""]
elif np.all(self.t == 0):
trot_lines = [""]
t_data_lines = [""]
else:
try:
# write out rotation angles
trot_lines = [
self._data_header_str.format(
"tipper rotation angles".upper()
)
]
if isinstance(self.rotation_angle, float):
trot = np.repeat(self.rotation_angle, self.frequency.size)
else:
trot = self.rotation_angle
trot_lines += self._write_data_block(np.array(trot), "trot")
# write out tipper lines
t_data_lines = [self._data_header_str.format("tipper".upper())]
for jj in range(2):
t_lines_real = self._write_data_block(
self.t[:, 0, jj].real, self._t_labels[jj][0]
)
t_lines_imag = self._write_data_block(
self.t[:, 0, jj].imag, self._t_labels[jj][1]
)
t_lines_var = self._write_data_block(
self.t_err[:, 0, jj] ** 2.0, self._t_labels[jj][2]
)
t_data_lines += t_lines_real
t_data_lines += t_lines_imag
t_data_lines += t_lines_var
except AttributeError:
trot_lines = [""]
t_data_lines = [""]
edi_lines = (
header_lines
+ info_lines
+ define_lines
+ dsect_lines
+ freq_lines
+ zrot_lines
+ z_data_lines
+ trot_lines
+ t_data_lines
+ [">END"]
)
with open(new_edi_fn, "w") as fid:
fid.write("".join(edi_lines))
self.fn = new_edi_fn
def _write_data_block(self, data_comp_arr, data_key):
"""
Write a data block
:param data_comp_arr: array of data components
:type data_comp_arr: np.ndarray
:param data_key: the component to write out
:type data_key: string
:returns: list of lines to write to edi file
:rtype: list
"""
if data_key.lower().find("z") >= 0 and data_key.lower() not in [
"zrot",
"trot",
]:
block_lines = [
">{0} ROT=ZROT // {1:.0f}\n".format(
data_key.upper(), data_comp_arr.size
)
]
elif data_key.lower().find("t") >= 0 and data_key.lower() not in [
"zrot",
"trot",
]:
block_lines = [
">{0} ROT=TROT // {1:.0f}\n".format(
data_key.upper(), data_comp_arr.size
)
]
elif data_key.lower() == "freq":
block_lines = [
">{0} // {1:.0f}\n".format(
data_key.upper(), data_comp_arr.size
)
]
elif data_key.lower() in ["zrot", "trot"]:
block_lines = [
">{0} // {1:.0f}\n".format(
data_key.upper(), data_comp_arr.size
)
]
else:
raise ValueError("Cannot write block for {0}".format(data_key))
for d_index, d_comp in enumerate(data_comp_arr, 1):
if d_comp == 0.0 and data_key.lower() not in ["zrot", "trot"]:
d_comp = self.Header.empty
# write the string in the specified format
num_str = "{0:{1}}".format(d_comp, self._num_format)
# check to see if a new line is needed
if d_index % self._block_len == 0:
num_str += "\n"
# at the end of the block add a return
if d_index == data_comp_arr.size:
num_str += "\n"
block_lines.append(num_str)
return block_lines
# -----------------------------------------------------------------------
# set a few important properties
# --> Latitude
@property
def lat(self):
"""latitude in decimal degrees"""
return self.Header.lat
@lat.setter
def lat(self, input_lat):
"""set latitude and make sure it is converted to a float"""
self.Header.lat = input_lat
# --> Longitude
@property
def lon(self):
"""longitude in decimal degrees"""
return self.Header.lon
@lon.setter
def lon(self, input_lon):
"""set latitude and make sure it is converted to a float"""
self.Header.lon = input_lon
# --> Elevation
@property
def elev(self):
"""Elevation in elevation units"""
return self.Header.elev
@elev.setter
def elev(self, input_elev):
"""set elevation and make sure it is converted to a float"""
self.Header.elev = input_elev
# --> station
@property
def station(self):
"""station name"""
if self.Header.dataid is not None:
return self.Header.dataid.replace(r"/", "_")
elif self.Measurement.refloc is not None:
return self.Measurement.refloc.replace('"', "")
elif self.Data.sectid is not None:
return self.Data.sectid
@station.setter
def station(self, new_station):
"""station name"""
if not isinstance(new_station, str):
new_station = f"{new_station}".replace(r"/", "_")
self.Header.dataid = new_station
self.Data.sectid = new_station
@property
def survey_metadata(self):
sm = metadata.Survey()
sm.project = self.Header.project
if sm.project is None:
try:
sm.project = self.Header.prospect
except AttributeError:
pass
sm.id = self.Header.survey
if sm.id is None:
sm.id = "0"
sm.acquired_by.name = self.Header.acqby
sm.geographic_name = self.Header.loc
sm.country = self.Header.country
for key, value in self.Info.info_dict.items():
if key is None:
key = "extra"
key = key.lower()
if key.startswith("survey."):
sm.set_attr_from_name(key.split("survey.")[1], value)
sm.add_station(self.station_metadata)
return sm
@survey_metadata.setter
def survey_metadata(self, survey):
"""
Update metadata from a survey metadata object
:param value: DESCRIPTION
:type value: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
if not isinstance(survey, metadata.Survey):
raise TypeError(
"Input must be a mt_metadata.transfer_function.Survey object"
f" not {type(survey)}"
)
self.Header.survey = survey.id
self.Header.project = survey.project
self.Header.loc = survey.geographic_name
self.Header.country = survey.country
if survey.summary != None:
self.Info.info_list.append(f"survey.summary = {survey.summary}")
@property
def station_metadata(self):
sm = metadata.Station()
sm.add_run(metadata.Run(id=f"{self.station}a"))
sm.id = self.station
sm.data_type = "MT"
sm.channels_recorded = self.Measurement.channels_recorded
# location
sm.location.latitude = self.lat
sm.location.longitude = self.lon
sm.location.elevation = self.elev
sm.location.datum = self.Header.datum
sm.location.declination.value = self.Header.declination.value
sm.orientation.reference_frame = self.Header.coordinate_system.split()[
0
]
# provenance
sm.acquired_by.name = self.Header.acqby
sm.provenance.creation_time = self.Header.filedate
sm.provenance.submitter.author = self.Header.fileby
sm.provenance.software.name = self.Header.fileby
sm.provenance.software.version = self.Header.progvers
sm.transfer_function.processed_date = self.Header.filedate
sm.transfer_function.runs_processed = sm.run_list
sm.transfer_function.id = self.station
# dates
if self.Header.acqdate is not None:
sm.time_period.start = self.Header.acqdate
if self.Header.enddate is not None:
sm.time_period.end = self.Header.enddate
for key, value in self.Info.info_dict.items():
if key is None:
continue
# processing information
for key, value in self.Info.info_dict.items():
if key is None:
continue
key = key.lower()
if "provenance" in key:
sm.set_attr_from_name(key, value)
elif "transfer_function" in key:
key = key.split("transfer_function.")[1]
if "processing_parameters" in key:
param = key.split(".")[-1]
sm.transfer_function.processing_parameters.append(
f"{param}={value}"
)
else:
sm.transfer_function.set_attr_from_name(key, value)
if "runs_processed" in key:
sm.run_list = sm.transfer_function.runs_processed
elif key.startswith("run."):
key = key.split("run.")[1]
comp, key = key.split(".", 1)
try:
ch = getattr(sm.runs[0], comp)
except AttributeError:
ch = None
if ch is None:
if comp in ["ex", "ey"]:
ch = metadata.Electric(component=comp)
sm.runs[0].add_channel(ch)
elif comp in ["hx", "hy", "hz", "rrhx", "rrhy"]:
ch = metadata.Magnetic(component=comp)
sm.runs[0].add_channel(ch)
else:
self.logger.warning(
f"Do not recognize channel {comp}, skipping..."
)
ch.set_attr_from_name(key, value)
elif key.startswith("data_logger"):
sm.runs[0].set_attr_from_name(key, value)
elif key.startswith("station."):
sm.set_attr_from_name(key.split("station.")[1], value)
elif "processing." in key:
key = key.split("processing.")[1]
if key in ["software"]:
sm.transfer_function.software.name = value
elif key in ["tag"]:
if value.count(",") > 0:
sm.transfer_function.remote_references = value.split(
","
)
else:
sm.transfer_function.remote_references = value.split()
elif key in ["processedby", "processed_by"]:
sm.transfer_function.processed_by.author = value
elif key in ["runlist", "run_list"]:
if value.count(",") > 0:
runs = value.split(",")
else:
runs = value.split()
sm.run_list = []
for rr in runs:
if rr not in sm.runs.keys():
sm.add_run(metadata.Run(id=rr))
sm.transfer_function.runs_processed = runs
elif key == "sitename":
sm.geographic_name = value
elif key == "signconvention":
sm.transfer_function.sign_convention = value
elif "mtft" in key or "emtf" in key or "mtedit" in key:
sm.transfer_function.processing_parameters.append(
f"{key}={value}"
)
if self.Header.filedate is not None:
sm.transfer_function.processed_date = self.Header.filedate
# make any extra information in info list into a comment
sm.comments = "\n".join(self.Info.info_list)
# add information to runs
for rr in sm.runs:
if rr.time_period.start == "1980-01-01T00:00:00+00:00":
rr.time_period.start = sm.time_period.start
if rr.time_period.end == "1980-01-01T00:00:00+00:00":
rr.time_period.end = sm.time_period.end
for ch in self.Measurement.channels_recorded:
try:
rr.add_channel(getattr(self, f"{ch}_metadata"))
except AttributeError:
pass
return sm
@station_metadata.setter
def station_metadata(self, sm):
"""
Set EDI metadata from station metadata object
:param sm: Station object to pull metadata from
:type sm: :class:`mt_metadata.transfer_functions.tf.Station`
"""
### fill header information from station
self.Header.acqby = sm.acquired_by.name
self.Header.acqdate = sm.time_period.start
self.Header.coordinate_system = sm.orientation.reference_frame
self.Header.dataid = sm.id
self.Header.declination = sm.location.declination
self.Header.elev = sm.location.elevation
self.Header.fileby = sm.provenance.submitter.author
self.Header.filedate = sm.provenance.creation_time
self.Header.lat = sm.location.latitude
self.Header.lon = sm.location.longitude
self.Header.datum = sm.location.datum
self.Header.units = sm.transfer_function.units
self.Header.enddate = sm.time_period.end
### write notes
# write comments, which would be anything in the info section from an edi
if isinstance(sm.comments, str):
self.Info.info_list += sm.comments.split("\n")
# write transfer function info first
for k, v in sm.transfer_function.to_dict(single=True).items():
if not v in [None]:
if k in ["processing_parameters"]:
for item in v:
self.Info.info_list.append(
f"transfer_function.{item.replace('=', ' = ')}"
)
else:
self.Info.info_list.append(f"transfer_function.{k} = {v}")
# write provenance
for k, v in sm.provenance.to_dict(single=True).items():
if not v in [None, "None", "null", "1980-01-01T00:00:00+00:00"]:
self.Info.info_list.append(f"provenance.{k} = {v}")
# write field notes
for run in sm.runs:
r_dict = run.to_dict(single=True)
for r_key, r_value in r_dict.items():
if r_value in [
None,
"None",
"null",
"1980-01-01T00:00:00+00:00",
]:
continue
self.Info.info_list.append(f"{run.id}.{r_key} = {r_value}")
for ch in run.channels:
ch_dict = ch.to_dict(single=True)
for ch_key, ch_value in ch_dict.items():
if ch_key not in self._channel_skip_list:
if ch_value in [
None,
"None",
"null",
"1980-01-01T00:00:00+00:00",
]:
continue
self.Info.info_list.append(
f"{run.id}.{ch.component}.{ch_key} = {ch_value}"
)
self.Measurement.from_metadata(ch)
### fill measurement
self.Measurement.refelev = sm.location.elevation
self.Measurement.reflat = sm.location.latitude
self.Measurement.reflon = sm.location.longitude
self.Measurement.refloc = sm.id
self.Measurement.maxchan = len(sm.channels_recorded)
def _get_electric_metadata(self, comp):
"""
get electric information from the various metadata
"""
comp = comp.lower()
electric = metadata.Electric(component=comp)
electric.positive.type = "electric"
electric.negative.type = "electric"
if hasattr(self.Measurement, f"meas_{comp}"):
meas = getattr(self.Measurement, f"meas_{comp}")
for attr in [
"negative.x",
"negative.y",
"positive.x2",
"positive.y2",
"measurement_azimuth",
"translated_azimuth",
]:
if electric.get_attr_from_name(attr) is None:
electric.set_attr_from_name(attr, 0)
electric.dipole_length = meas.dipole_length
electric.channel_id = meas.id
electric.measurement_azimuth = meas.azimuth
electric.translated_azimuth = meas.azimuth
electric.component = meas.chtype
electric.channel_number = meas.channel_number
electric.negative.x = meas.x
electric.positive.x2 = meas.x2
electric.negative.y = meas.y
electric.positive.y2 = meas.y2
for k, v in self.Info.info_dict.items():
if k is None:
continue
if f"{comp}." in k:
key = k.split(f"{comp}.")[1].strip()
if key == "manufacturer":
electric.negative.manufacturer = v
electric.positive.manufacturer = v
if key == "type":
electric.negative.type = v
electric.positive.type = v
if (
electric.positive.x2 == 0
and electric.positive.y2 == 0.0
and electric.negative.x == 0
and electric.negative.y == 0.0
):
electric.positive.x2 = electric.dipole_length * np.cos(
np.deg2rad(meas.azimuth)
)
electric.positive.y2 = electric.dipole_length * np.sin(
np.deg2rad(meas.azimuth)
)
for key, value in self.Info.info_dict.items():
if key is None:
continue
elif ".type" in key:
continue
if f".{comp}." in key:
key = key.split(f".{comp}.", 1)[-1]
electric.set_attr_from_name(key, value)
return electric
@property
def ex_metadata(self):
return self._get_electric_metadata("ex")
@property
def ey_metadata(self):
return self._get_electric_metadata("ey")
def _get_magnetic_metadata(self, comp):
"""
get magnetic metadata from the various sources
:param comp: DESCRIPTION
:type comp: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
magnetic = metadata.Magnetic(component=comp)
magnetic.sensor.type = "magnetic"
if hasattr(self.Measurement, f"meas_{comp}"):
meas = getattr(self.Measurement, f"meas_{comp}")
for attr in ["location.x", "location.y", "location.z"]:
if magnetic.get_attr_from_name(attr) is None:
magnetic.set_attr_from_name(attr, 0)
magnetic.measurement_azimuth = meas.azm
magnetic.translated_azimuth = meas.azm
magnetic.component = meas.chtype
magnetic.channel_number = meas.channel_number
magnetic.channel_id = meas.id
magnetic.location.x = meas.x
magnetic.location.y = meas.y
try:
magnetic.sensor.id = meas.meas_magnetic.sensor
except AttributeError:
pass
for k, v in self.Info.info_dict.items():
if k is None:
continue
if f"{comp}." in k:
key = k.split(f"{comp}.")[1].strip()
if key == "manufacturer":
magnetic.sensor.manufacturer = v
if key == "type":
magnetic.sensor.type = v
if key.startswith("sensor."):
magnetic.set_attr_from_name(key, v)
return magnetic
@property
def hx_metadata(self):
return self._get_magnetic_metadata("hx")
@property
def hy_metadata(self):
return self._get_magnetic_metadata("hy")
@property
def hz_metadata(self):
return self._get_magnetic_metadata("hz")
@property
def rrhx_metadata(self):
return self._get_magnetic_metadata("rrhx")
@property
def rrhy_metadata(self):
return self._get_magnetic_metadata("rrhy")