Source code for pystrata.output

# The MIT License (MIT)
#
# Copyright (c) 2016-2021 Albert Kottke
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import collections

import matplotlib.pyplot as plt
import numpy as np
import scipy.integrate
from matplotlib.colors import LogNorm, TwoSlopeNorm
from scipy.interpolate import interp1d

try:
    import pandas as pd
except ImportError:
    pd = None

import pykooh

from .motion import GRAVITY, TimeSeriesMotion, WaveField


def plot_amplification_evolv(
    calc,
    metric="accel_tf",
    depths=None,
    freqs=None,
    normalized=False,
    wave_field_out="within",
    diverging_cmap=True,
    include_vs_profile=False,
    ax=None,
    **kwds,
):
    # Default plotting kwds. Combine both set of plot keywords and prefer the provided
    kwds = {
        "cmap": "RdBu" if diverging_cmap else "magma_r",
        "shading": "gouraud",
        "norm": TwoSlopeNorm(vmin=0, vcenter=1) if diverging_cmap else LogNorm(),
    } | kwds

    if freqs is None:
        freqs = np.logspace(-1, 2, num=301)

    osc_damping = 0.05 if "osc_damping" not in kwds else kwds["osc_damping"]

    ln_freqs = np.log(freqs)
    ln_freqs_mot = np.log(calc.motion.freqs)

    def get_amp(metric, depth):
        loc_output = calc.profile.location(wave_field_out, depth=depth)
        if metric == "accel_tf":
            y = np.abs(calc.calc_accel_tf(calc.loc_input, loc_output))
            # Interpolate the specific frequencies
            y = np.interp(ln_freqs, ln_freqs_mot, y)
        elif metric == "site_amp":
            if get_amp.in_ars is None:
                get_amp.in_ars = calc.motion.calc_osc_accels(freqs, osc_damping)

            out_ars = calc.motion.calc_osc_accels(
                freqs, osc_damping, calc.calc_accel_tf(calc.loc_input, loc_output)
            )
            y = out_ars / get_amp.in_ars
        else:
            raise NotImplementedError

        return y

    # Initialize static variable
    get_amp.in_ars = None

    if depths is None:
        depths = np.linspace(0, calc.profile[-1].depth)

    if ax is None:
        fig, ax = plt.subplots()

    amps = np.array([get_amp(metric, d) for d in depths])
    if normalized:
        amps /= amps[-1, :]

    cf = ax.pcolormesh(freqs, depths, amps, **kwds)

    cb = plt.colorbar(cf, ax=ax)
    cb.set_label("|TF|" if metric == "accel_tf" else "Site Ampl.")

    ax.set(
        xlabel="Frequency (Hz)",
        xscale="log",
        ylabel="Depth (m)",
        yscale="linear",
        ylim=(depths[0], 0),
    )

    return ax


[docs] class OutputCollection(collections.abc.Collection):
[docs] def __init__(self, outputs): super().__init__() self.outputs = outputs
def __iter__(self): return iter(self.outputs) def __contains__(self, value): return value in self.outputs def __len__(self): return len(self.outputs) def __getitem__(self, key): return self.outputs[key] def __call__(self, calc, name=None): # Save results for o in self: o(calc, name=name) def reset(self): for o in self: o.reset()
def append_arrays(many, single): """Append an array to another padding with NaNs for constant length. Parameters ---------- many : array_like of rank (j, k) values appended to a copy of this array. This may be a 1-D or 2-D array. single : array_like of rank l values to append. This should be a 1-D array. Returns ------- append : :class:`numpy.ndarray` 2-D array with rank (j + 1, max(k, l)) with missing values padded with :class:`numpy.nan` """ assert np.ndim(single) == 1 # Check if the values need to be padded to for equal length diff = single.shape[0] - many.shape[0] if diff < 0: single = np.pad(single, (0, -diff), "constant", constant_values=np.nan) elif diff > 0: # Need different padding based on if many is 1d or 2d. padding = ((0, diff), (0, 0)) if len(many.shape) > 1 else (0, diff) many = np.pad(many, padding, "constant", constant_values=np.nan) else: # No padding needed pass return np.c_[many, single] class Output: _const_ref = False xscale = "log" yscale = "log" drawstyle = "default" def __init__(self, refs=None): self._refs = np.asarray([] if refs is None else refs) self._values = None self._names = [] def __call__(self, calc, name=None): if name is None: if self.values is None: i = 1 elif len(self.values.shape) == 1: i = 2 else: i = self.values.shape[1] + 1 name = "r%d" % i self._names.append(name) @property def refs(self): return self._refs @property def values(self): return self._values @property def names(self): return self._names def reset(self): self._values = None self._names = [] if not self._const_ref: self._refs = np.array([]) def iter_results(self): shared_ref = len(self.refs.shape) == 1 for i, name in enumerate(self.names): refs = self.refs if shared_ref else self.refs[:, i] values = self.values if len(self.values.shape) == 1 else self.values[:, i] yield name, refs, values def _add_refs(self, refs): refs = np.asarray(refs) if len(self._refs) == 0: self._refs = np.array(refs) else: self._refs = append_arrays(self._refs, refs) def _add_values(self, values): values = np.asarray(values) if self._values is None: self._values = values else: self._values = append_arrays(self._values, values) def calc_stats(self, as_dataframe=False): ln_values = np.log(self.values) median = np.exp(np.nanmean(ln_values, axis=1)) ln_std = np.nanstd(ln_values, axis=1) stats = {"ref": self.refs, "median": median, "ln_std": ln_std} if as_dataframe and pd: stats = pd.DataFrame(stats).set_index("ref") stats.index.name = self.ref_name return stats def to_dataframe(self): if not pd: raise RuntimeError("Install `pandas` library.") if isinstance(self.names[0], tuple): columns = pd.MultiIndex.from_tuples(self.names) else: columns = self.names df = pd.DataFrame(self.values, index=self.refs, columns=columns) return df @staticmethod def _get_xy(refs, values): return refs, values def plot(self, ax=None, style="indiv"): assert style in ["stats", "indiv"] if ax is None: fig, ax = plt.subplots() if style == "stats" and len(self.values.shape) > 1 and self.values.shape[1] < 3: raise RuntimeError("Unable to plot stats for less than 3 values.") if style == "stats": kwds = {"color": "C0", "alpha": 0.6, "lw": 0.8, "drawstyle": self.drawstyle} elif style == "indiv": kwds = {"lw": 1.0, "drawstyle": self.drawstyle} else: raise NotImplementedError("Valid options are: stats, indiv.") # Add the data x, y = self._get_xy(self.refs, self.values) lines = ax.plot(x, y, **kwds) if style == "stats": lines[0].set_label("Realization") else: for layer, name in zip(lines, self.names): layer.set_label(name) if style == "stats": stats = self.calc_stats() ax.plot( *self._get_xy(stats["ref"], stats["median"]), color="C1", lw=2, label="Median", ) ax.set( xlabel=self.xlabel, xscale=self.xscale, ylabel=self.ylabel, yscale=self.yscale, ) if len(lines) > 1: ax.legend() return ax class OutputLocation: def __init__(self, wave_field, depth=None, index=None): self._depth = depth self._index = index if not isinstance(wave_field, WaveField): wave_field = WaveField[wave_field] self._wave_field = wave_field @property def depth(self): return self._depth @property def index(self): return self._index @property def wave_field(self): return self._wave_field def __call__(self, profile): """Lookup the location with the profile.""" return profile.location(self.wave_field, depth=self.depth, index=self.index) class LocationBasedOutput(Output): def __init__(self, ref, location): super().__init__(ref) self._location = location @property def location(self): return self._location def __call__(self, calc, name=None): raise NotImplementedError def _get_location(self, calc): """Locate location within the profile.""" return self._location(calc.profile)
[docs] class TimeSeriesOutput(LocationBasedOutput): xlabel = "Time (sec)" xscale = "linear" ylabel = NotImplemented yscale = "linear" ref_name = "time"
[docs] def __init__(self, location): super().__init__(None, location)
@property def times(self): return self.refs def __call__(self, calc, name=None): if not isinstance(calc.motion, TimeSeriesMotion): raise NotImplementedError Output.__call__(self, calc, name) # Compute the response loc = self._get_location(calc) tf = self._get_trans_func(calc, loc) values = calc.motion.calc_time_series(tf) values = self._modify_values(calc, loc, values) self._add_values(values) # Add the reference refs = calc.motion.time_step * np.arange(len(values)) self._add_refs(refs) def _get_trans_func(self, calc, location): raise NotImplementedError def _modify_values(self, calc, location, values): return values def to_dataframe(self): raise NotImplementedError
class AccelerationTSOutput(TimeSeriesOutput): ylabel = "Acceleration (g)" def _get_trans_func(self, calc, location): return calc.calc_accel_tf(calc.loc_input, location) class AriasIntensityTSOutput(AccelerationTSOutput): ylabel = "Arias Intensity (m/s)" def _modify_values(self, calc, location, values): time_step = calc.motion.time_step values = scipy.integrate.cumulative_trapezoid(values**2, dx=time_step) values *= GRAVITY * np.pi / 2 return values class StrainTSOutput(TimeSeriesOutput): def __init__(self, location, in_percent=False): super().__init__(location) self._in_percent = in_percent assert self.location.wave_field == WaveField.within def _get_trans_func(self, calc, location): return calc.calc_strain_tf(calc.loc_input, location) def _modify_values(self, calc, location, values): if self._in_percent: # Convert to percent values *= 100.0 return values @property def ylabel(self): suffix = "(%)" if self._in_percent else "(dec)" return "Shear Strain " + suffix class StressTSOutput(TimeSeriesOutput): def __init__(self, location, damped=False, normalized=False): super().__init__(location) self._damped = damped self._normalized = normalized assert self.location.wave_field == WaveField.within @property def damped(self): return self._damped @property def ylabel(self): if self._normalized: ylabel = "Stress Ratio (τ/σ`ᵥ)" else: ylabel = "Stress (τ)" return ylabel def _get_trans_func(self, calc, location): tf = calc.calc_stress_tf(calc.loc_input, location, self.damped) if self._normalized: # Correct by effective stress at depth tf /= location.stress_vert(effective=True) return tf class FourierAmplitudeSpectrumOutput(LocationBasedOutput): _const_ref = True xlabel = "Frequency (Hz)" ylabel = "Fourier Ampl. (cm/s)" ref_name = "freq" # Make None the default, so that the default will be not applying smoothing. # This is to make calling this output "cleaner". def __init__(self, freqs, location, ko_bandwidth=None): super().__init__(freqs, location) self._ko_bandwidth = ko_bandwidth @property def freqs(self): return self._refs @property def ko_bandwidth(self): return self._ko_bandwidth def __call__(self, calc, name=None): Output.__call__(self, calc, name) loc = self._get_location(calc) tf = calc.calc_accel_tf(calc.loc_input, loc) # Only return the absolute value fas = np.abs(tf * calc.motion.fourier_amps) # Interpolate to the specified frequencies if self._ko_bandwidth is None: fas = np.interp(self.freqs, calc.motion.freqs, fas) else: fas = pykooh.smooth( self.freqs, calc.motion.freqs, fas, self.ko_bandwidth, ) self._add_values(fas)
[docs] class ResponseSpectrumOutput(LocationBasedOutput): _const_ref = True xlabel = "Frequency (Hz)" ref_name = "freq"
[docs] def __init__(self, freqs, location, osc_damping): super().__init__(freqs, location) self._osc_damping = osc_damping
@property def freqs(self): return self._refs @property def periods(self): return 1.0 / np.asarray(self._refs) @property def osc_damping(self): return self._osc_damping @property def ylabel(self): return f"{100 * self.osc_damping:g}%-Damped, Spec. Accel. (g)" def __call__(self, calc, name=None): Output.__call__(self, calc, name) loc = self._get_location(calc) tf = calc.calc_accel_tf(calc.loc_input, loc) ars = calc.motion.calc_osc_accels(self.freqs, self.osc_damping, tf) self._add_values(ars)
class RatioBasedOutput(Output): _const_ref = True def __init__(self, refs, location_in, location_out): super().__init__(refs) self._location_in = location_in self._location_out = location_out @property def location_in(self): return self._location_in @property def location_out(self): return self._location_out def __call__(self, calc, name=None): raise NotImplementedError def _get_locations(self, calc): """Locate locations within the profile.""" return (self._location_in(calc.profile), self._location_out(calc.profile))
[docs] class AccelTransferFunctionOutput(RatioBasedOutput): xlabel = "Frequency (Hz)" ylabel = "Accel. Transfer Func." ref_name = "freq"
[docs] def __init__( self, refs, location_in, location_out, ko_bandwidth=None, absolute=True ): super().__init__(refs, location_in, location_out) self._ko_bandwidth = ko_bandwidth self._absolute = absolute
def __call__(self, calc, name=None): Output.__call__(self, calc, name) # Locate position within the profile loc_in, loc_out = self._get_locations(calc) # Compute the response if self._absolute: tf = np.abs(calc.calc_accel_tf(loc_in, loc_out)) else: tf = calc.calc_accel_tf(loc_in, loc_out) if self._ko_bandwidth is None: tf = np.interp(self.freqs, calc.motion.freqs, tf) else: tf = pykooh.smooth(self.freqs, calc.motion.freqs, tf, self._ko_bandwidth) self._add_values(tf) @property def freqs(self): return self._refs
class ResponseSpectrumRatioOutput(RatioBasedOutput): xlabel = "Frequency (Hz)" ref_name = "freq" def __init__(self, freqs, location_in, location_out, osc_damping): super().__init__(freqs, location_in, location_out) self._osc_damping = osc_damping @property def freqs(self): return self._refs @property def periods(self): return 1.0 / np.asarray(self._refs) @property def osc_damping(self): return self._osc_damping @property def ylabel(self): return f"{100 * self.osc_damping:g}%-Damped, Resp. Spectral Ratio" def __call__(self, calc, name=None): Output.__call__(self, calc, name) loc_in, loc_out = self._get_locations(calc) in_ars = calc.motion.calc_osc_accels( self.freqs, self.osc_damping, calc.calc_accel_tf(calc.loc_input, loc_in) ) out_ars = calc.motion.calc_osc_accels( self.freqs, self.osc_damping, calc.calc_accel_tf(calc.loc_input, loc_out) ) ratio = out_ars / in_ars self._add_values(ratio) class ProfileBasedOutput(Output): ylabel = "Depth (m)" yscale = "linear" drawstyle = "steps-post" ref_name = "depth" def __init__(self): super().__init__() def __call__(self, calc, name=None): Output.__call__(self, calc, name) depths = np.r_[0, calc.profile.depth_mid[:-1]] self._add_refs(depths) def calc_stats(self, as_dataframe=False, ref=None): if ref is None: ref = np.linspace(0, np.nanmax(self.refs) * 1.05, num=512) n = self.values.shape[1] if self.values.ndim > 1 else 1 with np.errstate(divide="ignore", invalid="ignore"): # Ignore zeros in the data ln_values = np.array([self._ln_interp(i, ref) for i in range(n)]).T median = np.exp(np.nanmean(ln_values, axis=1)) ln_std = np.nanstd(ln_values, axis=1) stats = {"ref": ref, "median": median, "ln_std": ln_std} if as_dataframe and pd: stats = pd.DataFrame(stats).set_index("ref") stats.index.name = self.ref_name return stats @staticmethod def _get_xy(refs, values): return values, refs def plot(self, ax=None, style="stats"): ax = Output.plot(self, ax, style) ax.invert_yaxis() return ax def _ln_interp(self, i, ref): """Interpolate the values in log-y space.""" _ref = self.refs[:, i] if self.refs.ndim > 1 else self.refs # Only select points with valid entries mask = np.isfinite(_ref) _ref = _ref[mask] _ln_values = np.log( self.values[mask, i] if self.values.ndim > 1 else self.values[mask] ) if np.any(mask): f = interp1d( _ref, _ln_values, kind="next", fill_value=(_ln_values[0], _ln_values[-1]), bounds_error=False, ) _ln_interped = f(ref) else: nans = np.empty_like(ref) nans[:] = np.nan _ln_interped = np.array(nans) return _ln_interped def to_dataframe(self, ref=None): if not pd: raise RuntimeError("Install `pandas` library.") if ref is None: ref = np.linspace(0, np.nanmax(self.refs)) if isinstance(self.names[0], tuple): columns = pd.MultiIndex.from_tuples(self.names) else: columns = self.names # Ignore zeros in the data n = self.values.shape[1] if self.values.ndim > 1 else 1 values = np.exp(np.array([self._ln_interp(i, ref) for i in range(n)])).T df = pd.DataFrame(values, index=ref, columns=columns) return df class MaxStrainProfile(ProfileBasedOutput): xlabel = "Max. Strain (dec)" def __init__(self): super().__init__() def __call__(self, calc, name=None): ProfileBasedOutput.__call__(self, calc, name) values = [0] + [layer.strain_max for layer in calc.profile[:-1]] self._add_values(values) class DampingProfile(ProfileBasedOutput): xlabel = "Damping (dec)" def __call__(self, calc, name=None): Output.__call__(self, calc, name) # Add depth at top of layer self._add_refs(calc.profile.depth) values = [layer.damping for layer in calc.profile[:-1]] # Bring the first mid-layer value to the surface values.insert(0, values[0]) self._add_values(values) class ShearModReducProfile(ProfileBasedOutput): xlabel = "G/Gmax" def __call__(self, calc, name=None): Output.__call__(self, calc, name) # Add depth at top of layer self._add_refs(calc.profile.depth) values = [layer.shear_mod_reduc for layer in calc.profile[:-1]] # Bring the first mid-layer value to the surface values.insert(0, values[0]) self._add_values(values) class InitialVelProfile(ProfileBasedOutput): xlabel = "Initial Velocity (m/s)" def __init__(self): super().__init__() def __call__(self, calc, name=None): Output.__call__(self, calc, name) # Add depth at top of layer self._add_refs(calc.profile.depth) values = [layer.initial_shear_vel for layer in calc.profile[:-1]] values.insert(0, values[0]) self._add_values(values) class CompatVelProfile(ProfileBasedOutput): xlabel = "Strain-Compatible Velocity (m/s)" def __init__(self): super().__init__() def __call__(self, calc, name=None): Output.__call__(self, calc, name) # Add depth at top of layer self._add_refs(calc.profile.depth) values = [np.min(layer.shear_vel) for layer in calc.profile[:-1]] values.insert(0, values[0]) self._add_values(values) class CyclicStressRatioProfile(ProfileBasedOutput): # From Idriss and Boulanger (2008, pg. 70): # The 0.65 is a constant used to represent the reference stress # level. While being somewhat arbitrary it was selected in the # beginning of the development of liquefaction procedures in 1966 # and has been in use ever since. _stress_level = 0.65 def __init__(self): super().__init__() def __call__(self, calc, name=None): ProfileBasedOutput.__call__(self, calc, name) values = [ layer.stress_shear_max / layer.stress_vert(layer.thickness / 2, True) for layer in calc.profile[:-1] ] # Repeat the first value for the surface values = self._stress_level * np.array([values[0]] + values) self._add_values(values)
[docs] class MaxAccelProfile(ProfileBasedOutput): xlabel = "Max. Accel. (g)" def __call__(self, calc, name=None): Output.__call__(self, calc, name) # Add depth at top of layer depths = calc.profile.depth values = [self._calc_accel(calc, depth) for depth in depths] self._add_refs(depths) self._add_values(values) def _calc_accel(self, calc, depth): return calc.motion.calc_peak( calc.calc_accel_tf( calc.loc_input, calc.profile.location("within", depth=depth) ) )