Source code for moonpies.thermal_eq

"""
Thermal equillibrium model to compute ballistic sedimentation melting fraction.

:Authors: K. M. Luchsinger, C. J. Tai Udovicic
"""
import warnings
from multiprocessing import Pool, cpu_count
from pathlib import Path
import numpy as np
import pandas as pd
from moonpies import config

CFG = config.Cfg()
G_MOON = CFG.grav_moon  # [m/s^2]
R_MOON = CFG.rad_moon  # [m]


[docs]def make_melt_frac_dfs(mrs, ej_temps, nseed=10, parallel=True, savepath=""): """ Return mean, stdev DataFrame of melt fractions at each mr and ej_temp. Runs nseed times for each mr, ejecta_t combination, randomizing initial positions of ejecta pixels. If savepath is given, save as csvs. Parameters ---------- mrs (array): Array of mixing ratios [target] ej_temps (array): Array of ejecta temperatures nseed (int): Number of seeds to run for each mr, ejecta_t combination parallel (bool): Whether to run in parallel savepath (str): Path to save csv files """ ncpus = cpu_count() if parallel else 1 seeds = np.arange(nseed) melt_frac = get_melt_frac_arr(mrs, ej_temps, seeds, ncpus) mf_mean = np.mean(melt_frac, axis=2) mf_std = np.std(melt_frac, axis=2) df_mean = pd.DataFrame(mf_mean, index=mrs, columns=ej_temps) df_std = pd.DataFrame(mf_std, index=mrs, columns=ej_temps) if savepath: fmean = Path(savepath) / "ballistic_sed_frac_melted_mean.csv" fstd = Path(savepath) / "ballistic_sed_frac_melted_std.csv" df_mean.to_csv(fmean, float_format="%g") df_std.to_csv(fstd, float_format="%g") print(f"Wrote to {fmean} and {fstd}") return df_mean, df_std
[docs]def get_melt_frac_arr(mrs, ejecta_temps, seeds, ncpus=1): """ Return melt fraction array for each mr and ejecta_temp. """ # Sequential triple loop or parallelize on mr if not ncpus or ncpus <= 1: mf = np.zeros((len(mrs), len(ejecta_temps), len(seeds))) for i, mr in enumerate(mrs): for j, ejecta_t in enumerate(ejecta_temps): for k, seed in enumerate(seeds): mf[i, j, k] = melt_frac_teq_1d(mr, ejecta_t, rng=seed) else: with Pool(processes=ncpus) as pool: args = [(mr, ejecta_temps, seeds) for mr in mrs] mf = np.stack(pool.starmap(_parallel_melt_frac, args)) return mf
def _parallel_melt_frac(mr, ejecta_temps, seeds): """Return melt_frac_teq_1d(mr, ej_t, rng=seed) with fixed mr.""" mf = np.zeros((len(ejecta_temps), len(seeds))) for j, ejecta_t in enumerate(ejecta_temps): for k, seed in enumerate(seeds): mf[j, k] = melt_frac_teq_1d(mr, ejecta_t, rng=seed) return mf
[docs]def melt_frac_teq_1d( mr, ejecta_t0, target_t0=45, melt_t=110, n=1000, eq_thresh=1, max_iter=10**5, conv_warning=True, rng=None, ): """ Return fraction heated above melt_t from 1D thermal conduction model. Model initialized as array of length n with target_t0 at all indices. A fraction of random ejecta elements (1-mr) are assigned ejecta_t0. Each element is assumed to be 100 microns wide and time step is 1 msec. Complete melting is assumed to occur at melt_t with no heat loss. The model is stepped through time until one of these conditions is met: - Thermal equilibrium: All elements are within eq_thresh of their neighbors - All melted: All elements are above melt_t - Melting finished: All elements are below melt_t - Timeout: The model reaches max_iter without triggering any of the above Parameters ---------- ejecta_t0 (num): Initial temperature of the ejecta (pre-impact) [K] target_t0 (num): Initial temperature of the target material [K] mr (num): Mixing ratio of target material : ejecta material melt_t (num): Temperature at which elements are considered melted [K] n (num): Length of 1D model [100 micron pixels] eq_thresh (num): Threshold temperature for thermal equilibrium [K] max_iter (num): Max number of iterations to run (100000 approx. 15 sec) conv_warning (bool): Whether to warn if the model does not converge rng (int or np.RandomState): Random number generator or seed Returns ------- melt_fraction (float): Fraction of target material heated above melt_t """ # Conversion from mixing ratio to ejecta fraction is 1/(1+mr) ej_n = int(n * (1 / (1 + mr))) # Ejecta pixels is ejecta frac * n if ej_n == n: # All pixels ejecta so don't need to run model return float(ejecta_t0 >= melt_t) ej_inds = rand_positions(ej_n, n, rng=rng) # Get random ejecta positions # Initialize model temperature array with ejecta pixels inserted randomly temps = np.ones(n) * target_t0 # Init to target temp temps[ej_inds] = ejecta_t0 # Insert ejecta pixels melted = np.zeros(n, dtype=bool) # Init to all not melted melted[ej_inds] = True # Ejecta excluded (set melted but ignore later) i = 0 while ( i < max_iter and not melted.all() # No convergence after max_iter and not (temps < melt_t).all() # All melted and not ( # No more melting (all below melt_t) np.abs(np.diff(temps)) < eq_thresh ).all() # Equilibrium ): # Step through model temps = heat1d_finite_diff(temps) # Track melted pixels each iteration (once melted, pixel stays melted) melted[temps >= melt_t] = True i += 1 if conv_warning and i == max_iter: warnings.warn( f"Timeout: melt_frac_teq_1d(mr={mr:.3g},ejecta_t0={ejecta_t0}," + f"target_t0={target_t0}) quit after {max_iter} iterations" + f" (model_time={max_iter*1e-3:.4g} s)." ) total_melted = np.sum(melted) - ej_n # Count melted, exclude ejecta return total_melted / (n - ej_n) # Frac of target melted
[docs]def heat1d_finite_diff(T, density=1800, dt=0.001, dx=1e-5): """ Return updated T from finite difference 1D heat flow conduction model. See Eq. 2, Onorato et al. (1978). Parameters ---------- T (num or array): temperature [K] density (num): density, [kg/m^3] k (num): conductivity [W/m/K] Cp (num): heat capacity [J/kg/K] T_liq_c (num): liquidus of clasts [K] T_sol_c (num): solidus of clasts [K] dt (num): time step size [s], must match dx dx (num): spatial step size [m], must match dt Returns ------- alpha (num): scaling factor [unitless] """ K = thermal_diffusivity(thermal_conductivity(T), sphc(T), density) c = (K * dt) / dx**2 # coefficient array for finite difference # Boundary conditions (assume closed system, link ends) t0 = c[0] * T[-1] + (1 - 2 * c[0]) * T[0] + c[0] * T[1] tn = c[-1] * T[-2] + (1 - 2 * c[-1]) * T[-1] + c[-1] * T[0] # Compute T at next timestep for main array T[1:-1] = c[1:-1] * T[:-2] + (1 - 2 * c[1:-1]) * T[1:-1] + c[1:-1] * T[2:] # Set boundary conditions T[0] = t0 T[-1] = tn return T
[docs]def thermal_diffusivity(conductivity, cp, density=1800): """Return thermal diffusivity.""" return conductivity / (density * cp)
[docs]def thermal_conductivity(T, Kc=3.4e-3, chi=2.7): """Return thermal conductivity of regolith (eqn A4 Hayne 2017).""" return Kc * (1 + chi * (T / 350) ** 3)
[docs]def sphc(T, c0=-3.6125, c1=2.7431, c2=2.3616e-3, c3=-1.234e-5, c4=8.9093e-9): """Return specific heat capacity [J/kg/K] of regolith.""" return c0 + c1 * T + c2 * T**2 + c3 * T**3 + c4 * T**4
[docs]def rand_positions(ej_n, n=100, rng=None): """ Return ej_n random initial positions in target of length n. Parameters ---------- ej_n (int): Number of ejecta pixels to assign n (int): Length of 1D target [pixels] rng (int or np.RandomState): Random number generator or seed Returns ------- inds (array): Indices of initial positions of ejecta """ rng = _rng(rng) inds = rng.choice(np.arange(n), size=ej_n, replace=False) return inds
def _rng(rng): """Shorthand wrapper for np.random.default_rng.""" return np.random.default_rng(rng) if __name__ == "__main__": # Sample target fraction range [0, 1), convert to mixing ratio # Maximum mixing ratio on Moon ~32 (using Petro and Pieters, 2006) # The following results in mrs of [0, 39] FRACS = np.linspace(0, 1, 40, endpoint=False) # Mixing frac (target/total) MRS = FRACS / (1 - FRACS) # Mixing ratio (target:ejecta) TEMPS = np.linspace(110, 510, 81) # Ejecta temperatures [K] NSEED = 20 # random samples for each combination # Generate melt_frac csvs make_melt_frac_dfs(MRS, TEMPS, NSEED, savepath=CFG.data_path)