# -*- coding: utf-8 -*-
"""
Created on Thu Sep 28 12:34:23 2017
@author: jrpeacock
Translated from code by B. Murphy.
"""
# ==============================================================================
# Imports
# ==============================================================================
from pathlib import Path
from loguru import logger
import numpy as np
import xarray as xr
from mt_metadata.transfer_functions.tf import (
Survey,
Station,
Run,
Electric,
Magnetic,
)
from mt_metadata.transfer_functions.io.tools import get_nm_elev
from .metadata import Channel
from mt_metadata.utils.list_dict import ListDict
from mt_metadata import DEFAULT_CHANNEL_NOMENCLATURE
# ==============================================================================
PERIOD_FORMAT = ".10g"
[docs]class ZMMError(Exception):
pass
[docs]class ZMM(ZMMHeader):
"""
Container for Egberts zrr format.
"""
def __init__(self, fn=None, **kwargs):
super().__init__()
self.fn = fn
self._header_count = 0
self.transfer_functions = None
self.sigma_e = None
self.sigma_s = None
self.periods = None
self.dataset = None
self.decimation_dict = {}
self.channel_nomenclature = DEFAULT_CHANNEL_NOMENCLATURE
self._transfer_function = self._initialize_transfer_function()
for key in list(kwargs.keys()):
setattr(self, key, kwargs[key])
if self.fn is not None:
self.read()
def __str__(self):
lines = [f"Station: {self.station}", "-" * 50]
lines.append(f"\tSurvey: {self.survey_metadata.id}")
lines.append(f"\tProject: {self.survey_metadata.project}")
lines.append(
f"\tAcquired by: {self.station_metadata.acquired_by.author}"
)
lines.append(
f"\tAcquired date: {self.station_metadata.time_period.start_date}"
)
lines.append(f"\tLatitude: {self.latitude:.3f}")
lines.append(f"\tLongitude: {self.longitude:.3f}")
lines.append(f"\tElevation: {self.elevation:.3f}")
if "ex" in self.output_channels:
lines.append("\tImpedance: True")
else:
lines.append("\tImpedance: False")
if "hz" in self.output_channels:
lines.append("\tTipper: True")
else:
lines.append("\tTipper: False")
if self.periods is not None:
lines.append(f"\tNumber of periods: {self.periods.size}")
lines.append(
f"\t\tPeriod Range: {self.periods.min():.5E} -- {self.periods.max():.5E} s"
)
lines.append(
f"\t\tFrequency Range {1./self.periods.max():.5E} -- {1./self.periods.min():.5E} s"
)
return "\n".join(lines)
def __repr__(self):
lines = []
lines.append(f"station='{self.station}'")
lines.append(f"latitude={self.latitude:.2f}")
lines.append(f"longitude={self.longitude:.2f}")
lines.append(f"elevation={self.elevation:.2f}")
return f"MT( {(', ').join(lines)} )"
def __eq__(self, other):
"""
compare equals
:param other: DESCRIPTION
:type other: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
if not isinstance(other, ZMM):
raise TypeError(f"Cannot compare type {type(other)} with ZMM.")
is_equal = True
if self.station_metadata != other.station_metadata:
is_equal = False
if not self.dataset.equals(other.dataset):
is_equal = False
self.logger.info("Datasets are not equal")
print(self.dataset.fillna(0) != other.dataset.fillna(0).all())
return is_equal
@property
def channel_nomenclature(self):
return self._channel_nomenclature
@channel_nomenclature.setter
def channel_nomenclature(self, ch_dict):
"""
channel dictionary
"""
if not isinstance(ch_dict, dict):
raise TypeError(
"Channel_nomenclature must be a dictionary with keys "
"['ex', 'ey', 'hx', 'hy', 'hz']."
)
self._channel_nomenclature = ch_dict
# unpack channel nomenclature dict
for comp in DEFAULT_CHANNEL_NOMENCLATURE.keys():
try:
setattr(self, f"_{comp}", ch_dict[comp])
except KeyError:
setattr(self, f"_{comp}", comp)
ch_dict[comp] = comp
self._ex_ey = [self._ex, self._ey]
self._hx_hy = [self._hx, self._hy]
self._ex_ey_hz = [self._ex, self._ey, self._hz]
@property
def _ch_input_dict(self):
return {
"isp": self._hx_hy,
"res": self._ex_ey_hz,
"tf": self._hx_hy,
"tf_error": self._hx_hy,
"all": [self._ex, self._ey, self._hz, self._hx, self._hy],
}
@property
def _ch_output_dict(self):
return {
"isp": self._hx_hy,
"res": self._ex_ey_hz,
"tf": self._ex_ey_hz,
"tf_error": self._ex_ey_hz,
"all": [self._ex, self._ey, self._hz, self._hx, self._hy],
}
def _initialize_transfer_function(self, periods=[1]):
"""
create an empty x array for the data. For now this accommodates
a single processed station.
:return: DESCRIPTION
:rtype: TYPE
"""
# create an empty array for the transfer function
tf = xr.DataArray(
data=0.0 + 0j,
dims=["period", "output", "input"],
coords={
"period": periods,
"output": self._ch_output_dict["all"],
"input": self._ch_input_dict["all"],
},
name="transfer_function",
)
tf_err = xr.DataArray(
data=0.0,
dims=["period", "output", "input"],
coords={
"period": periods,
"output": self._ch_output_dict["all"],
"input": self._ch_input_dict["all"],
},
name="error",
)
inv_signal_power = xr.DataArray(
data=0.0 + 0j,
dims=["period", "output", "input"],
coords={
"period": periods,
"output": self._ch_output_dict["all"],
"input": self._ch_input_dict["all"],
},
name="inverse_signal_power",
)
residual_covariance = xr.DataArray(
data=0.0 + 0j,
dims=["period", "output", "input"],
coords={
"period": periods,
"output": self._ch_output_dict["all"],
"input": self._ch_input_dict["all"],
},
name="residual_covariance",
)
# will need to add in covariance in some fashion
return xr.Dataset(
{
tf.name: tf,
tf_err.name: tf_err,
inv_signal_power.name: inv_signal_power,
residual_covariance.name: residual_covariance,
},
coords={
"period": periods,
"output": self._ch_output_dict["all"],
"input": self._ch_input_dict["all"],
},
)
@property
def frequencies(self):
if self.periods is None:
return None
return 1.0 / self.periods
[docs] def initialize_arrays(self):
"""
make initial arrays based on number of frequencies and channels
"""
if self.num_freq is None:
return
self.periods = np.zeros(self.num_freq)
self.transfer_functions = np.zeros(
(self.num_freq, self.num_channels - 2, 2), dtype=np.complex64
)
# residual covariance -- square matrix with dimension as number of
# predicted channels
self.sigma_e = np.zeros(
(self.num_freq, self.num_channels - 2, self.num_channels - 2),
dtype=np.complex64,
)
# inverse coherent signal power -- square matrix, with dimension as the
# number of predictor channels
# since EMTF and this code assume N predictors is 2,
# this dimension is hard-coded
self.sigma_s = np.zeros((self.num_freq, 2, 2), dtype=np.complex64)
[docs] def read(self, fn=None, get_elevation=False):
"""
Read in Egbert zrr/zmm file
:param fn: full path to zmm/zrr file
:type fn: string or pathlib.Path
"""
if fn is not None:
self.fn = fn
self.read_header()
self.channel_nomenclature = self.channel_dict
self.initialize_arrays()
self._transfer_function = self._initialize_transfer_function()
self.dataset = self._initialize_transfer_function()
### read each data block and fill the appropriate array
for ii, period_block in enumerate(self._get_period_blocks()):
data_block = self._read_period_block(period_block)
self.periods[ii] = data_block["period"]
self._fill_tf_array_from_block(data_block["tf"], ii)
self._fill_sig_array_from_block(data_block["sig"], ii)
self._fill_res_array_from_block(data_block["res"], ii)
self._fill_dataset()
self.station_metadata.id = self.station
self.station_metadata.data_type = "MT"
self.station_metadata.channels_recorded = self.channels_recorded
# provenance
self.station_metadata.provenance.software.name = "EMTF"
self.station_metadata.provenance.software.version = "1"
self.station_metadata.transfer_function.runs_processed = (
self.station_metadata.run_list
)
self.station_metadata.transfer_function.software.name = "EMTF"
self.station_metadata.transfer_function.software.version = "1"
self.station_metadata.runs[0].sample_rate = np.median(
np.array(
[d["sample_rate"] for k, d in self.decimation_dict.items()]
)
)
# add information to runs
for rr in self.station_metadata.runs:
if self.transfer_functions.shape[1] >= 2:
rr.ex = self.ex_metadata
rr.ey = self.ey_metadata
rr.hx = self.hx_metadata
rr.hy = self.hy_metadata
if self.hz is not None:
rr.hz = self.hz_metadata
if self.elevation in [0, None] and get_elevation:
if self.latitude != 0 and self.longitude != 0:
self.elevation = get_nm_elev(
self.latitude,
self.longitude,
)
[docs] def write(self, fn, decimation_levels=None):
"""
write a zmm file
decimation_levels should be a dictionary with keys
* decimation_level
values will be a dictionary with keys
* frequency_band, value = (min, max)
* n_points, value = int
* sampling_freq, value = float
"""
if fn is not None:
self.fn = fn
lines = self.write_header()
lines += [
"",
] # add 1 space separating header from data
for p in self.dataset.period.data:
a = self.dataset.sel(period=p)
try:
dec_dict = self.decimation_dict[f"{p:{PERIOD_FORMAT}}"]
except KeyError:
dec_dict = {
"level": 0,
"bands": (0, 0),
"npts": 0,
"sample_rate": self.station_metadata.runs[0].sample_rate,
}
lines += [
(
f"period : {p:^18.5f} "
f"decimation level {dec_dict['level']:^8d}"
f"freq. band from {dec_dict['bands'][0]:>5d} to {dec_dict['bands'][1]:>5d}"
)
]
lines += [
f"number of data point {dec_dict['npts']} sampling freq. {dec_dict['sample_rate']} Hz"
]
# write tf
lines += [" Transfer Functions"]
for c_out in self.output_channels:
line = ""
for c_in in self.input_channels:
c_in_name = self.channel_nomenclature[c_in]
c_out_name = self.channel_nomenclature[c_out]
tf_element = a.transfer_function.loc[
dict(output=c_out_name, input=c_in_name)
].data
line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}"
lines += [line]
# write signal power
lines += [" Inverse Coherent Signal Power Matrix"]
for ii, c_out in enumerate(self.input_channels):
line = ""
for c_in in self.input_channels[: ii + 1]:
c_in_name = self.channel_nomenclature[c_in]
c_out_name = self.channel_nomenclature[c_out]
tf_element = a.inverse_signal_power.loc[
dict(output=c_out_name, input=c_in_name)
].data
line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}"
lines += [line]
# write residual covariance
lines += [" Residual Covariance"]
for ii, c_out in enumerate(self.output_channels):
line = ""
for c_in in self.output_channels[: ii + 1]:
c_in_name = self.channel_nomenclature[c_in]
c_out_name = self.channel_nomenclature[c_out]
tf_element = a.residual_covariance.loc[
dict(output=c_out_name, input=c_in_name)
].data
line += f"{tf_element.real:>12.4E}{tf_element.imag:>12.4E}"
lines += [line]
with open(self.fn, "w") as fid:
fid.write("\n".join(lines))
return self.fn
def _get_period_blocks(self):
"""
split file into period blocks
"""
with open(self.fn, "r") as fid:
fn_str = fid.read()
period_strings = fn_str.lower().split("period")
period_blocks = []
for per in period_strings:
period_blocks.append(per.split("\n"))
return period_blocks[1:]
def _read_period_block(self, period_block):
"""
read block:
period : 0.01587 decimation level 1 freq. band from 46 to 80
number of data point 951173 sampling freq. 0.004 Hz
Transfer Functions
0.1474E+00 -0.2049E-01 0.1618E+02 0.1107E+02
-0.1639E+02 -0.1100E+02 0.5559E-01 0.1249E-01
Inverse Coherent Signal Power Matrix
0.2426E+03 -0.2980E-06
0.9004E+02 -0.2567E+01 0.1114E+03 0.1192E-06
Residual Covaraince
0.8051E-05 0.0000E+00
-0.2231E-05 -0.2863E-06 0.8866E-05 0.0000E+00
"""
period = float(period_block[0].strip().split(":")[1].split()[0].strip())
level = int(
period_block[0].strip().split("level")[1].split()[0].strip()
)
bands = (
int(period_block[0].strip().split("from")[1].split()[0].strip()),
int(period_block[0].strip().split("to")[1].split()[0].strip()),
)
npts = int(period_block[1].strip().split("point")[1].split()[0].strip())
sr = float(period_block[1].strip().split("freq.")[1].split()[0].strip())
self.decimation_dict[f"{period:{PERIOD_FORMAT}}"] = {
"level": level,
"bands": bands,
"npts": npts,
"sample_rate": sr,
}
data_dict = {"period": period, "tf": [], "sig": [], "res": []}
key = "tf"
for line in period_block[2:]:
if "transfer" in line.lower():
key = "tf"
continue
elif "signal" in line.lower():
key = "sig"
continue
elif "residual" in line.lower():
key = "res"
continue
line_list = [float(xx) for xx in line.strip().split()]
values = [
complex(line_list[ii], line_list[ii + 1])
for ii in range(0, len(line_list), 2)
]
data_dict[key].append(values)
return data_dict
def _flatten_list(self, x_list):
"""
flatten = lambda l: [item for sublist in l for item in sublist]
Returns
-------
None.
"""
flat_list = [item for sublist in x_list for item in sublist]
return flat_list
def _fill_tf_array_from_block(self, tf_block, index):
"""
fill tf arrays from data blocks
"""
tf_block = self._flatten_list(tf_block)
for kk, jj in enumerate(range(0, len(tf_block), 2)):
self.transfer_functions[index, kk, 0] = tf_block[jj]
self.transfer_functions[index, kk, 1] = tf_block[jj + 1]
def _fill_sig_array_from_block(self, sig_block, index):
"""
fill signal array
"""
sig_block = self._flatten_list(sig_block)
self.sigma_s[index, 0, 0] = sig_block[0]
self.sigma_s[index, 1, 0] = sig_block[1]
self.sigma_s[index, 0, 1] = sig_block[1].conjugate()
self.sigma_s[index, 1, 1] = sig_block[2]
def _fill_res_array_from_block(self, res_block, index):
"""
fill residual covariance array
"""
for jj in range(self.num_channels - 2):
values = res_block[jj]
for kk in range(jj + 1):
if jj == kk:
self.sigma_e[index, jj, kk] = values[kk]
else:
self.sigma_e[index, jj, kk] = values[kk]
self.sigma_e[index, kk, jj] = values[kk].conjugate()
def _fill_dataset(self):
"""
fill the dataset
:return: DESCRIPTION
:rtype: TYPE
"""
self.dataset = self._initialize_transfer_function(periods=self.periods)
self.dataset.transfer_function.loc[
dict(input=self.input_channels, output=self.output_channels)
] = self.transfer_functions
self.dataset.inverse_signal_power.loc[
dict(input=self.input_channels, output=self.input_channels)
] = self.sigma_s
self.dataset.residual_covariance.loc[
dict(input=self.output_channels, output=self.output_channels)
] = self.sigma_e
[docs] def calculate_impedance(self, angle=0.0):
"""
calculate the impedances from the transfer functions
"""
# check to see if there are actually electric fields in the TFs
if not hasattr(self, "ex") or not hasattr(self, "ey"):
msg = (
"Cannot return apparent resistivity and phase "
"data because these TFs do not contain electric "
"fields as a predicted channel."
)
self.logger.error(msg)
raise ZMMError(msg)
# transform the TFs first...
# build transformation matrix for predictor channels
# (horizontal magnetic fields)
hx_index = self.hx.index
hy_index = self.hy.index
u = np.eye(2, 2)
u[hx_index, hx_index] = np.cos(np.deg2rad(self.hx.azimuth - angle))
u[hx_index, hy_index] = np.sin(np.deg2rad(self.hx.azimuth - angle))
u[hy_index, hx_index] = np.cos(np.deg2rad(self.hy.azimuth - angle))
u[hy_index, hy_index] = np.sin(np.deg2rad(self.hy.azimuth - angle))
u = np.linalg.inv(u)
# build transformation matrix for predicted channels (electric fields)
ex_index = self.ex.index
ey_index = self.ey.index
v = np.eye(
self.transfer_functions.shape[1], self.transfer_functions.shape[1]
)
v[ex_index - 2, ex_index - 2] = np.cos(
np.deg2rad(self.ex.azimuth - angle)
)
v[ey_index - 2, ex_index - 2] = np.sin(
np.deg2rad(self.ex.azimuth - angle)
)
v[ex_index - 2, ey_index - 2] = np.cos(
np.deg2rad(self.ey.azimuth - angle)
)
v[ey_index - 2, ey_index - 2] = np.sin(
np.deg2rad(self.ey.azimuth - angle)
)
# matrix multiplication...
rotated_transfer_functions = np.matmul(
v, np.matmul(self.transfer_functions, u.T)
)
rotated_sigma_s = np.matmul(u, np.matmul(self.sigma_s, u.T))
rotated_sigma_e = np.matmul(v, np.matmul(self.sigma_e, v.T))
# now pull out the impedance tensor
z = np.zeros((self.num_freq, 2, 2), dtype=np.complex64)
z[:, 0, 0] = rotated_transfer_functions[
:, ex_index - 2, hx_index
] # Zxx
z[:, 0, 1] = rotated_transfer_functions[
:, ex_index - 2, hy_index
] # Zxy
z[:, 1, 0] = rotated_transfer_functions[
:, ey_index - 2, hx_index
] # Zyx
z[:, 1, 1] = rotated_transfer_functions[
:, ey_index - 2, hy_index
] # Zyy
# and the variance information
var = np.zeros((self.num_freq, 2, 2))
var[:, 0, 0] = np.real(
rotated_sigma_e[:, ex_index - 2, ex_index - 2]
* rotated_sigma_s[:, hx_index, hx_index]
)
var[:, 0, 1] = np.real(
rotated_sigma_e[:, ex_index - 2, ex_index - 2]
* rotated_sigma_s[:, hy_index, hy_index]
)
var[:, 1, 0] = np.real(
rotated_sigma_e[:, ey_index - 2, ey_index - 2]
* rotated_sigma_s[:, hx_index, hx_index]
)
var[:, 1, 1] = np.real(
rotated_sigma_e[:, ey_index - 2, ey_index - 2]
* rotated_sigma_s[:, hy_index, hy_index]
)
error = np.sqrt(var)
return z, error
[docs] def calculate_tippers(self, angle=0.0):
"""
calculate induction vectors
"""
# check to see if there is a vertical magnetic field in the TFs
if self.hz is None:
raise ZMMError(
"Cannot return tipper data because the TFs do not "
"contain the vertical magnetic field as a "
"predicted channel."
)
# transform the TFs first...
# build transformation matrix for predictor channels
# (horizontal magnetic fields)
hx_index = self.hx.index
hy_index = self.hy.index
u = np.eye(2, 2)
u[hx_index, hx_index] = np.cos(np.deg2rad(self.hx.azimuth - angle))
u[hx_index, hy_index] = np.sin(np.deg2rad(self.hx.azimuth - angle))
u[hy_index, hx_index] = np.cos(np.deg2rad(self.hy.azimuth - angle))
u[hy_index, hy_index] = np.sin(np.deg2rad(self.hy.azimuth - angle))
u = np.linalg.inv(u)
# don't need to transform predicated channels (assuming no tilt in Hz)
hz_index = self.hz.index
v = np.eye(
self.transfer_functions.shape[1], self.transfer_functions.shape[1]
)
# matrix multiplication...
rotated_transfer_functions = np.matmul(
v, np.matmul(self.transfer_functions, u.T)
)
rotated_sigma_s = np.matmul(u, np.matmul(self.sigma_s, u.T))
rotated_sigma_e = np.matmul(v, np.matmul(self.sigma_e, v.T))
# now pull out tipper information
tipper = np.zeros((self.num_freq, 2), dtype=np.complex64)
tipper[:, 0] = rotated_transfer_functions[
:, hz_index - 2, hx_index
] # Tx
tipper[:, 1] = rotated_transfer_functions[
:, hz_index - 2, hy_index
] # Ty
# and the variance/error information
var = np.zeros((self.num_freq, 2))
var[:, 0] = np.real(
rotated_sigma_e[:, hz_index - 2, hz_index - 2]
* rotated_sigma_s[:, hx_index, hx_index]
) # Tx
var[:, 1] = np.real(
rotated_sigma_e[:, hz_index - 2, hz_index - 2]
* rotated_sigma_s[:, hy_index, hy_index]
) # Ty
error = np.sqrt(var)
tipper = tipper.reshape((self.num_freq, 1, 2))
error = error.reshape((self.num_freq, 1, 2))
return tipper, error
@property
def survey_metadata(self):
sm = Survey()
sm.add_station(self.station_metadata)
return sm
def _get_electric_metadata(self, comp):
"""
get electric information from the various metadata
"""
comp = comp.lower()
electric = Electric()
electric.positive.type = "electric"
electric.negative.type = "electric"
if hasattr(self, comp):
meas = getattr(self, comp)
electric.measurement_azimuth = meas.azimuth
electric.measurement_tilt = meas.tilt
electric.component = comp
electric.channel_number = meas.number
electric.channel_id = meas.number
return electric
@property
def ex_metadata(self):
return self._get_electric_metadata("ex")
@property
def ey_metadata(self):
return self._get_electric_metadata("ey")
def _get_magnetic_metadata(self, comp):
"""
get magnetic metadata from the various sources
:param comp: DESCRIPTION
:type comp: TYPE
:return: DESCRIPTION
:rtype: TYPE
"""
comp = comp.lower()
magnetic = Magnetic()
if hasattr(self, comp):
meas = getattr(self, comp)
magnetic.measurement_azimuth = meas.azimuth
magnetic.measurement_tilt = meas.tilt
magnetic.component = comp
magnetic.channel_number = meas.number
magnetic.channel_id = meas.number
return magnetic
@property
def hx_metadata(self):
return self._get_magnetic_metadata("hx")
@property
def hy_metadata(self):
return self._get_magnetic_metadata("hy")
@property
def hz_metadata(self):
return self._get_magnetic_metadata("hz")