Source code for mt_metadata.timeseries.stationxml.xml_inventory_mt_experiment

# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 09:27:10 2021

:copyright:
    Jared Peacock (jpeacock@usgs.gov)

:license: MIT

"""
# =============================================================================
# Imports
# =============================================================================
from pathlib import Path
from copy import deepcopy
from loguru import logger

from mt_metadata import timeseries as metadata
from mt_metadata.timeseries.stationxml import (
    XMLNetworkMTSurvey,
    XMLStationMTStation,
    XMLChannelMTChannel,
)

from obspy.core import inventory
from obspy import read_inventory

# =============================================================================


[docs]class XMLInventoryMTExperiment: """ Read the full files and put the elements in the appropriate locations. """ def __init__(self): self.logger = logger self.network_translator = XMLNetworkMTSurvey() self.station_translator = XMLStationMTStation() self.channel_translator = XMLChannelMTChannel()
[docs] def xml_to_mt(self, inventory_object=None, stationxml_fn=None, mt_fn=None): """ Read in a StationXML using Obspy :class:`obspy.core.inventory.Inventory` and convert to an MT :class:`mt_metadata.timeseries.Experiment` :param inventory_object: inventory object or StationXML file name :type inventory_object: :class:`obspy.core.inventory.Inventory` :param stationxml_fn: full path to StationXML file :type stationxml_fn: Path or string :param mt_fn: full path to MT file :type mt_fn: Path or string :return: DESCRIPTION :rtype: TYPE """ if stationxml_fn: if isinstance(stationxml_fn, Path): stationxml_fn = stationxml_fn.as_posix() inventory_object = read_inventory(stationxml_fn) if not inventory_object: msg = "Must provide either an inventory object or StationXML file path" self.logger.error(msg) raise ValueError(msg) mt_experiment = metadata.Experiment() for xml_network in inventory_object.networks: mt_survey = self.network_translator.xml_to_mt(xml_network) for xml_station in xml_network.stations: mt_station = self.station_translator.xml_to_mt(xml_station) for xml_channel in xml_station: mt_channel, mt_filters = self.channel_translator.xml_to_mt( xml_channel, mt_survey.filters ) mt_survey.filters.update(mt_filters) # if there is a run list match channel to runs if self.channel_translator.run_list: for run_id in sorted(self.channel_translator.run_list): # need to make a copy of the channel otherwise # the properties are constantly overwritten as we loop # through the runs run_channel = deepcopy(mt_channel) mt_run = mt_station.get_run(run_id) # need to set the start and end time to the run run_channel.time_period.start = ( mt_run.time_period.start ) run_channel.time_period.end = mt_run.time_period.end mt_run.add_channel(run_channel) # if there are runs already try to match by start, end, sample_rate # initialized runs have a sample rate of 0. This could be an # issue in the future. elif mt_station.runs: for mt_run in mt_station.runs: if ( mt_run.sample_rate == mt_channel.sample_rate or mt_run.sample_rate == 0 ): # match assuming the runs have the correct start # and end times. if ( mt_run.time_period.start >= mt_channel.time_period.start ) and ( mt_run.time_period.end <= mt_channel.time_period.end ): mt_run.channels.append(mt_channel) mt_run.sample_rate = mt_channel.sample_rate # make a new run with generic information else: mt_run = metadata.Run( id=f"{len(mt_station.runs)+1:03d}" ) mt_run.time_period.start = mt_channel.time_period.start mt_run.time_period.end = mt_channel.time_period.end mt_run.sample_rate = mt_channel.sample_rate mt_run.channels.append(mt_channel) mt_station.runs.append(mt_run) mt_station.update_time_period() mt_survey.stations.append(mt_station) if xml_network.stations: mt_survey.update_bounding_box() mt_survey.update_time_period() # need to check if the network/survey already exists, the files # from make_mth5_from_iris have multiples of the same network if mt_survey.id in mt_experiment.surveys.keys(): mt_experiment.surveys[mt_survey.id].stations.update( mt_survey.stations ) else: mt_experiment.surveys.append(mt_survey) if mt_fn: mt_experiment.to_xml(fn=mt_fn) return mt_experiment
[docs] def mt_to_xml( self, mt_experiment, mt_fn=None, stationxml_fn=None, ns_dict=None ): """ Convert from MT :class:`mt_metadata.timeseries.Experiment` to :class:`obspy.core.inventory.Inventory` :param mt_experiment: DESCRIPTION :type mt_experiment: TYPE :param mt_fn: DESCRIPTION, defaults to None :type mt_fn: TYPE, optional :param stationxml_fn: DESCRIPTION, defaults to None :type stationxml_fn: TYPE, optional :param ns_dict: DESCRIPTION, defaults to None :type ns_dict: TYPE, optional :raises ValueError: DESCRIPTION :return: DESCRIPTION :rtype: TYPE """ if mt_fn: mt_experiment = metadata.Experiment() mt_experiment.from_xml(mt_fn) if not mt_experiment: msg = "Must provide either an experiment object or file path" self.logger.error(msg) raise ValueError(msg) xml_inventory = inventory.Inventory() for mt_survey in mt_experiment.surveys: xml_network = self.network_translator.mt_to_xml(mt_survey) for mt_station in mt_survey.stations: xml_station = self.station_translator.mt_to_xml(mt_station) if mt_survey.country is not None: xml_station.site.country = ",".join( [str(country) for country in mt_survey.country] ) for mt_run in mt_station.runs: xml_station = self.add_run( xml_station, mt_run, mt_survey.filters ) xml_network.stations.append(xml_station) xml_inventory.networks.append(xml_network) if stationxml_fn: if isinstance(stationxml_fn, Path): stationxml_fn = stationxml_fn.as_posix() xml_inventory.write(stationxml_fn, "stationxml", nsmap=ns_dict) return xml_inventory
[docs] def add_run(self, xml_station, mt_run, filters_dict): """ Check to see if channel information already exists in the channel list of an xml station. .. todo:: Need to make sure the times are updated :param xml_station: DESCRIPTION :type xml_station: TYPE :param xml_channel: DESCRIPTION :type xml_channel: TYPE :return: DESCRIPTION :rtype: TYPE """ for mt_channel in mt_run.channels: xml_channel = self.channel_translator.mt_to_xml( mt_channel, filters_dict ) existing_channels = xml_station.select( channel=xml_channel.code ).channels if existing_channels: find = False start_list = [c.start_date for c in existing_channels] existing_channel = existing_channels[ start_list.index(max(start_list)) ] # should only compare the last channel # for existing_channel in existing_channels: run_list = [c.value for c in existing_channel.comments] # Compare channel metadata if matches just add run.id if its # not already there. self.logger.debug( f"Comparing {xml_channel.code} to {existing_channel.code}" ) if self.compare_xml_channel(xml_channel, existing_channel): find = True self.logger.debug( f"Matched {xml_channel.code}={existing_channel.code}" ) if not mt_run.id in run_list: self.logger.debug( f"adding run id {mt_run.id} to {run_list}" ) existing_channel.comments.append( inventory.Comment(mt_run.id, subject="mt.run.id") ) if xml_channel.start_date < existing_channel.start_date: self.logger.debug("Changed starting time") existing_channel.start_date = xml_channel.start_date if xml_channel.end_date > existing_channel.end_date: self.logger.debug("Changed ending time") existing_channel.end_date = xml_channel.end_date # continue if not find: self.logger.debug( f"xxx Unmatched {xml_channel.code}!={existing_channel.code}" ) run_list = [c.value for c in xml_channel.comments] if not mt_run.id in run_list: self.logger.debug( f"adding run id {mt_run.id} to {run_list}" ) xml_channel.comments.append( inventory.Comment(mt_run.id, subject="mt.run.id") ) xml_station.channels.append(xml_channel) else: self.logger.debug( f"no existing channels for {xml_channel.code}" ) run_list = [c.value for c in xml_channel.comments] if not mt_run.id in run_list: self.logger.debug( f"adding run id {mt_run.id} to {run_list}" ) xml_channel.comments.append( inventory.Comment(mt_run.id, subject="mt.run.id") ) xml_station.channels.append(xml_channel) return xml_station
[docs] def compare_xml_channel(self, xml_channel_01, xml_channel_02): """ Compare xml channels to see if a new epoch needs to be made or not. :param xml_channel_01: DESCRIPTION :type xml_channel_01: TYPE :param xml_channel_02: DESCRIPTION :type xml_channel_02: TYPE :return: DESCRIPTION :rtype: TYPE """ if xml_channel_01.code != xml_channel_02.code: self.logger.debug(f"{xml_channel_01.code} != {xml_channel_02.code}") return False if xml_channel_01.sample_rate != xml_channel_02.sample_rate: self.logger.debug( f"{xml_channel_01.sample_rate} != {xml_channel_02.sample_rate}" ) return False if xml_channel_01.sensor != xml_channel_02.sensor: self.logger.debug( f"{xml_channel_01.sensor} != {xml_channel_02.sensor}" ) return False if round(xml_channel_01.latitude, 3) != round( xml_channel_02.latitude, 3 ): self.logger.debug( f"{round(xml_channel_01.latitude, 3)} != {round(xml_channel_02.latitude, 3)}" ) return False if round(xml_channel_01.longitude, 3) != round( xml_channel_02.longitude, 3 ): self.logger.debug( f"{round(xml_channel_01.longitude, 3)} != {round(xml_channel_02.longitude, 3)}" ) return False if round(xml_channel_01.azimuth, 2) != round(xml_channel_02.azimuth, 2): self.logger.debug( f"{round(xml_channel_01.azimuth, 2)} != {round(xml_channel_02.azimuth, 2)}" ) return False if round(xml_channel_01.dip, 2) != round(xml_channel_02.dip, 2): self.logger.debug( f"{round(xml_channel_01.dip, 2)} != {round(xml_channel_02.dip, 2)}" ) return False return True