Source code for uf3.regression.least_squares

"""
This module provides the WeightedLinearModel class for fitting UF potentials
from featurized DataFrames using regularized least squares.
"""

from typing import List, Dict, Collection, Tuple
import os
import numpy as np
import pandas as pd
from uf3.representation import bspline, process
from uf3.data import io
from uf3.util import json_io
from uf3.util import parallel

DEFAULT_REGULARIZER_GRID = dict(ridge_1b=1e-8,
                                ridge_2b=0,
                                ridge_3b=0,
                                curve_2b=1e-8,
                                curve_3b=1e-8)


[docs]class VarianceRecorder: """Convenience class for computing online variance and mean""" def __init__(self, mean=0, std=0, n=0): self.mean = mean self.std = std self.n = int(n)
[docs] def update(self, batch: Collection) -> Tuple: """ Args: batch (list or np.ndarray): n-dimensional data. For speed purposes, dimensions are not checked for compatibility so caution is advised when working with multidimensional data. Statistics are computed along the first axis. Returns: (current mean, current standard deviation, current entry count) """ if self.n == 0: self.mean = np.mean(batch, axis=0) self.std = np.std(batch, axis=0) self.n = len(batch) return self.mean, self.std, self.n else: batch_std = np.std(batch, axis=0) batch_mean = np.mean(batch, axis=0) m = float(self.n) n = len(batch) std = (m / (m + n) * self.std**2 + n / (m + n) * batch_std**2 + m * n / (m + n)**2 * (self.mean - batch_mean)**2) self.std = np.sqrt(std) self.mean = m / (m + n) * self.mean + n / (m + n) * batch_mean self.n += n return self.mean, self.std, self.n
[docs] def update_with_components(self, df, keys=None): """Wrapper for dataframe with multiple columns of interest""" if keys is None: keys = ["fx", "fy", "fz"] batch = [] for j, *components in df[keys].itertuples(): if any([component is np.nan for component in components]): continue if np.ndim(components) > 1: # if components are not scalars components = list(np.concatenate(components)) batch.extend(components) self.update(batch) return self.mean, self.std, self.n
[docs]class BasicLinearModel: """ Base class for linear regression. """ def __init__(self, regularizer: np.ndarray = None): """ Args: regularizer (np.ndarray): regularization matrix. """ self.coefficients = None self.regularizer = regularizer
[docs] def fit(self, x: np.ndarray, y: np.ndarray, ridge_penalty: float = 1e-8, ): """ Direct solution to linear least squares with LU decomposition. Args: x (np.ndarray): input matrix of shape (n_samples, n_features). y (np.ndarray): output vector of length n_samples. ridge_penalty (float): magnitude of ridge penalty. Ignored if self.regularizer is set at initialization. """ gram, ordinate = moore_penrose_components(x, y) if self.regularizer is None: regularizer = np.eye(len(gram)) * ridge_penalty else: regularizer = self.regularizer regularizer = np.dot(regularizer.T, regularizer) coefficients = lu_factorization(gram + regularizer, ordinate) self.coefficients = coefficients
[docs] def predict(self, x: np.ndarray): """ Predict using fit coefficients. Args: x (np.ndarray): input matrix of shape (n_samples, n_features). Returns: predictions (np.ndarray): vector of predictions. """ predictions = np.dot(x, self.coefficients) return predictions
[docs] def score(self, x, y, weights=None, normalize=True): """ Evaluate score (negative error metric). Args: x (np.ndarray): input matrix of shape (n_samples, n_features). y (np.ndarray): output vector of length n_samples. weights (np.ndarray): sample weights (optional). normalize (bool): whether to normalize by the std of y. Returns: score (float): negative weighted root-mean-square-error. """ n_features = len(x[0]) if weights is not None: w_matrix = np.eye(n_features) * np.sqrt(weights) x = np.dot(w_matrix, x) y = np.dot(w_matrix, y) predictions = self.predict(x) score = -rmse_metric(y, predictions) if normalize: score /= np.std(y) return score
[docs]class WeightedLinearModel(BasicLinearModel): """ Handler class for regularized linear least squares using energies and forces and basis set provided by bspline.BsplineBasis. """ def __init__(self, bspline_config, regularizer=None, **params): super().__init__(regularizer) self.bspline_config = bspline_config if self.regularizer is None: # initialize regularizer matrix if unspecified. self.set_params(**params)
[docs] def set_params(self, **params): """Set parameters from keyword arguments. Initializes regularizer with default parameters if unspecified.""" if "bspline_config" in params: self.bspline_config = params["bspline_config"] if "regularizer" in params: self.regularizer = params["regularizer"] elif self.regularizer is None: reg_params = {k: v for k, v in params.items() if isinstance(v, (int, float, np.floating))} reg = self.bspline_config.get_regularization_matrix(**reg_params) self.regularizer = reg
@property def n_feats(self): return self.bspline_config.n_feats @property def frozen_c(self): return self.bspline_config.frozen_c @property def col_idx(self): return self.bspline_config.col_idx @property def mask(self): return get_freezing_mask(self.n_feats, self.col_idx) def __repr__(self): if self.coefficients is None: fit = "False" else: fit = "True" summary = ["WeightedLinearModel:", f" Fit: {fit}", self.bspline_config.__repr__() ] return "\n".join(summary) def __str__(self): return self.__repr__()
[docs] def fit_with_gram(self, gram: np.ndarray, ordinate: np.ndarray): """ Intermediate function for direct solution using gram matrix and ordinate (Moore-penrose inverse). Args: gram (np.ndarray): gram matrix (x^T x) ordinate (np.ndarray: ordinate (x^T y) """ regularizer = freeze_regularizer(self.regularizer, self.mask) regularizer = np.dot(regularizer.T, regularizer) coefficients = lu_factorization(gram + regularizer, ordinate) coefficients = revert_frozen_coefficients(coefficients, self.n_feats, self.mask, self.frozen_c, self.col_idx) self.coefficients = coefficients
[docs] def fit(self, x_e: np.ndarray, y_e: np.ndarray, x_f: np.ndarray = None, y_f: np.ndarray = None, weight: float = 0.5, batch_size=2500, ): """ Direct solution from input-output pairs corresponding to energies and forces, with option to weigh their respective contributions. Args: x_e (np.ndarray): input matrix of shape (n_samples, n_features). y_e (np.ndarray): output vector of length n_samples. x_f (np.ndarray): input matrix corresponding to forces. y_f (np.ndarray): output vector corresponding to forces. weight (float): parameter balancing contribution from energies vs. forces. Higher values favor energies; defaults to 0.5. batch_size: maximum batch size for gram matrix construction. """ x_e, y_e = freeze_columns(x_e, y_e, self.mask, self.frozen_c, self.col_idx) gram_e, ord_e = batched_moore_penrose(x_e, y_e, batch_size=batch_size) if x_f is not None: energy_weight = 1 / len(y_e) / np.std(y_e) force_weight = 1 / len(y_f) / np.std(y_f) x_f, y_f = freeze_columns(x_f, y_f, self.mask, self.frozen_c, self.col_idx) gram_f, ord_f = batched_moore_penrose(x_f, y_f, batch_size=batch_size) gram, ordinate = self.combine_weighted_gram(gram_e, gram_f, ord_e, ord_f, energy_weight, force_weight, weight) else: gram = gram_e ordinate = ord_e self.fit_with_gram(gram, ordinate)
[docs] def combine_weighted_gram(self, gram_e: np.ndarray, gram_f: np.ndarray, ord_e: np.ndarray, ord_f: np.ndarray, energy_weight: float, force_weight: float, weight: float): """ Apply weighting to gram matrices and ordinates for energy and force contributions to the fit. Args: gram_e (np.ndarray): gram matrix (x^T x) for energies. gram_f (np.ndarray): gram matrix (x^T x) for forces. ord_e (np.ndarray): ordinate (x^T y) for energies. ord_f (np.ndarray): ordinate (x^T y) for forces. energy_weight: 1 / (# energies * sqrt(Var(energies))) force_weight: 1 / (# forces * sqrt(Var(forces))) weight (float): parameter balancing contribution from energies vs. forces. Higher values favor energies; defaults to 0.5. Returns: gram (np.ndarray): gram matrix (x^T x) for fitting. ordinate (np.ndarray): ordinate (x^T y) for fitting. """ gram = (((weight * energy_weight) ** 2 * gram_e) + (((1 - weight) * force_weight) ** 2 * gram_f)) ordinate = (((weight * energy_weight) ** 2 * ord_e) + (((1 - weight) * force_weight) ** 2 * ord_f)) return gram, ordinate
[docs] def fit_from_file(self, filename: str, subset: Collection, index: Collection, weight: float = 0.5, batch_size=2500, energy_key="energy", progress: str = "bar"): """ Accumulate inputs and outputs from batched parsing of HDF5 file and compute direct solution via LU decomposition. Args: filename (str): path to HDF5 file. subset (list): list of indices for training. index (list): list of keys, i.e. from df_data DataFrame. weight (float): parameter balancing contribution from energies vs. forces. Higher values favor energies; defaults to 0.5. batch_size (int): batch size, in rows, for matrix multiplication operations in constructing gram matrices. energy_key (str): column name for energies, default "energy". progress (str): style for progress indicators. """ if not os.path.isfile(filename): raise FileNotFoundError(filename) n_tables, _, table_names, _ = io.analyze_hdf_tables(filename) subset_keys = index[subset] gram_e, gram_f, ord_e, ord_f = self.initialize_gram_ordinate() e_variance = VarianceRecorder() f_variance = VarianceRecorder() table_iterator = parallel.progress_iter(np.arange(n_tables), style=progress) for j in table_iterator: table_name = table_names[j] df = process.load_feature_db(filename, table_name) keys = df.index.unique(level=0).intersection(subset_keys) if len(keys) == 0: continue g_e, g_f, o_e, o_f = self.gram_from_df(df, keys, e_variance=e_variance, f_variance=f_variance, energy_key=energy_key, batch_size=batch_size) gram_e += g_e gram_f += g_f ord_e += o_e ord_f += o_f energy_weight = 1 / e_variance.n / e_variance.std force_weight = 1 / f_variance.n / f_variance.std gram, ordinate = self.combine_weighted_gram(gram_e, gram_f, ord_e, ord_f, energy_weight, force_weight, weight) self.fit_with_gram(gram, ordinate)
[docs] def initialize_gram_ordinate(self): """Initialize empty matrices for gram matrices and ordinates.""" n_columns = self.n_feats - len(self.col_idx) gram_e = np.zeros((n_columns, n_columns)) ord_e = np.zeros(n_columns) gram_f = np.zeros((n_columns, n_columns)) ord_f = np.zeros(n_columns) return gram_e, gram_f, ord_e, ord_f
[docs] def gram_from_df(self, df: pd.DataFrame, keys: Collection, e_variance: VarianceRecorder = None, f_variance: VarianceRecorder = None, energy_key: str = "energy", batch_size: int = 2500): """ Extract inputs and outputs from dataframe and compute moore-penrose components (gram matrices and ordinates). Args: df (pd.DataFrame): DataFrame of energy/force features. keys (list): keys to query from df (e.g. training subset). e_variance (VarianceRecorder): handler for accumulating statistics for energies (mean and variance). f_variance (VarianceRecorder): handler for accumulating statistics for forces (mean and variance). energy_key (str): column name for energies, default "energy". batch_size (int): batch size, in rows, for matrix multiplication operations in constructing gram matrices. """ n_elements = len(self.bspline_config.element_list) x_e, y_e, x_f, y_f = dataframe_to_tuples(df.loc[keys], n_elements=n_elements, energy_key=energy_key) x_e, y_e = freeze_columns(x_e, y_e, self.mask, self.frozen_c, self.col_idx) x_f, y_f = freeze_columns(x_f, y_f, self.mask, self.frozen_c, self.col_idx) if e_variance is not None and f_variance is not None: e_variance.update(y_e) f_variance.update(y_f) gram_e, ordinate_e = batched_moore_penrose(x_e, y_e, batch_size=batch_size) gram_f, ordinate_f = batched_moore_penrose(x_f, y_f, batch_size=batch_size) return gram_e, gram_f, ordinate_e, ordinate_f
[docs] def batched_predict(self, filename: str, keys: List[str] = None, table_names: List[str]=None, score: bool = True): """ Extract inputs and outputs from HDF5 file and predict energies/forces. Args: filename: path to HDF5 file. keys (list): keys to query from df (e.g. training subset). table_names (list): list of table names in HDF5 to read. score (bool): whether to return root mean square error metrics. Returns: y_e (np.ndarray): target values for energies. p_e (np.ndarray): prediction values for forces. y_f (np.ndarray): target values for energies. p_f (np.ndarray): target values for forces. rmse_e (np.ndarray): RMSE across energy predictions. rmse_e (np.ndarray): RMSE across force predictions. """ n_elements = len(self.bspline_config.element_list) y_e, p_e, y_f, p_f = batched_prediction(self, filename, table_names=table_names, subset_keys=keys, n_elements=n_elements) if score: rmse_e = rmse_metric(y_e, p_e) rmse_f = rmse_metric(y_f, p_f) print(f"RMSE (energy): {rmse_e:.3F}") print(f"RMSE (forces): {rmse_f:.3F}") return y_e, p_e, y_f, p_f, rmse_e, rmse_f else: return y_e, p_e, y_f, p_f
[docs] def save(self, filename: str): """Save model (coefficients and knots map) to file.""" solution = arrange_coefficients(self.coefficients, self.bspline_config) knots_map = self.bspline_config.knots_map json_io.dump_interaction_map(dict(solution=solution, knots=knots_map), filename=filename, write=True)
[docs] def dump(self): """Arrange coefficients/knots map into dictionary.""" solution = arrange_coefficients(self.coefficients, self.bspline_config) knots_map = self.bspline_config.knots_map return dict(solution=solution, knots=knots_map)
[docs] def load(self, solution: Dict[Tuple[str], np.ndarray] = None, filename: str = None, ): """ Reflatten coefficients (e.g. obtained through arrange_coefficients) and load into model for prediction. Args: solution (dict): dictionary of 1B, 2B, ... terms organized as interaction: vector entries. filename (str): filename of json dump containing solution. """ if filename is not None: dump = json_io.load_interaction_map(filename) solution = dump["solution"] elif solution is None: raise ValueError("Neither solution nor filename were provided.") flattened_coefficients = [] for element in self.bspline_config.element_list: value = solution[element] flattened_coefficients.append([value]) for degree in range(2, self.bspline_config.degree + 1): interactions = self.bspline_config.interactions_map[degree] for interaction in interactions: values = solution[interaction] flattened_coefficients.append(values) # self-energies, pair interactions & trio interactions n_interactions = len(self.bspline_config.partition_sizes) # add self-energy as separate interactions n_coefficients = sum(self.bspline_config.partition_sizes) if len(flattened_coefficients) != n_interactions: error_line = "Incorrect interactions: {} provided, {} expected." error_line = error_line.format(len(flattened_coefficients), n_interactions) raise ValueError(error_line) flattened_coefficients = np.concatenate(flattened_coefficients) if len(flattened_coefficients) != n_coefficients: error_line = "Incorrect coefficients: {} provided, {} expected." error_line = error_line.format(len(flattened_coefficients), n_coefficients) raise ValueError(error_line) self.coefficients = np.array(flattened_coefficients)
[docs]def dataframe_to_tuples(df_features, n_elements=None, energy_key='energy'): """ Extract energy/force inputs/outputs from DataFrame. Args: df_features (pd.DataFrame): dataframe with target vector (y) as the first column and feature vectors (x) as remaining columns. n_elements (int): number of leading columns to consider for size normalization. energy_key (str): key for energy samples, used to slice df_features into energies and forces for weight generation. Returns: x (np.ndarray): features for machine learning. y (np.ndarray): target vector. w (np.ndarray): weight vector for machine learning. """ if len(df_features) <= 1: raise ValueError( "Not enough samples ({} provided)".format(len(df_features))) y_index = df_features.index.get_level_values(-1) energy_mask = (y_index == energy_key) force_mask = np.logical_not(energy_mask) data = df_features.to_numpy() y = data[:, 0] x = data[:, 1:] y_e = y[energy_mask] y_f = y[force_mask] if n_elements is not None: s = np.sum(x[energy_mask, :n_elements], axis=1) x_e = np.divide(x[energy_mask].T, s).T y_e = y_e / s else: x_e = x[energy_mask] x_f = x[force_mask] return x_e, y_e, x_f, y_f
[docs]def moore_penrose_components(x, y): """ Compute gram matrix (x^T x) and ordinate (x^T y). Args: x (np.ndarray): input matrix of shape (n_samples, n_features). y (np.ndarray): output vector of length n_samples. Returns: a: Gram matrix (X'X) b: ordinate (X'y) """ a = np.dot(x.T, x) b = np.dot(x.T, y) return a, b
[docs]def batched_moore_penrose(x, y, batch_size=2500): """ Batched evaluation of gram matrix (x^T x) and ordinate (x^T y). Args: x (np.ndarray): input matrix of shape (n_samples, n_features). y (np.ndarray): output vector of length n_samples. batch_size: maximum batch size, default 2500 rows. This option should be adjusted based on efficiency/memory tradeoffs. Returns: a: Gram matrix (X'X) b: ordinate (X'y) """ n_samples, n_features = np.shape(x) n_batches = int(n_samples / batch_size) if n_batches <= 1: return moore_penrose_components(x, y) else: batched_idx = np.array_split(np.arange(len(y)), n_batches) gram = np.zeros((n_features, n_features)) ordinate = np.zeros(n_features) for j, batch in enumerate(batched_idx): x_x, x_y = moore_penrose_components(x[batch], y[batch]) gram += x_x ordinate += x_y return gram, ordinate
[docs]def lu_factorization(a, b): """ LU factorization for least-squares solution using np.linalg.solve(). Args: a: coefficients (X) or Gram matrix (X'X) b: ordinate (X'y) """ return np.linalg.solve(a, b)
[docs]def linear_least_squares(x, y): """ Solves the linear least-squares problem Ax=y. Regularizer matrix should be concatenated to x and zero-values padded to y. Args: x (np.ndarray): input matrix of shape (n_samples, n_features). y (np.ndarray): output vector of length n_samples. Returns: solution (np.ndarray): coefficients. """ a, b = moore_penrose_components(x, y) return lu_factorization(a, b)
[docs]def weighted_least_squares(x, y, weights=None, regularizer=None): """ Solves the linear least-squares problem with optional Tikhonov regularizer matrix and optional weighting. TODO: Remove (deprecated) Args: x (np.ndarray): input matrix. y (np.ndarray): output vector. weights (np.ndarray): sample weights (optional). regularizer (np.ndarray): Tikhonov regularizer matrix. Returns: solution (np.ndarray): coefficients. predictions (list of np.ndarray): predictions. """ x_fit, y_fit = apply_weights(x, y, weights) n_feats = len(x[0]) if regularizer is not None: # append regularizer # validate_regularizer(regularizer, n_feats) reg_zeros = np.zeros(len(regularizer)) x_fit = np.concatenate([x_fit, regularizer]) y_fit = np.concatenate([y_fit, reg_zeros]) solution = linear_least_squares(x_fit, y_fit) return solution
[docs]def get_freezing_mask(n_feats: int, col_idx: np.ndarray) -> np.ndarray: """ Freezing mask is the set difference between the range of feature indices and the indices to be excluded (col_idx). Args: n_feats (int): number of features. col_idx (list): list of indices to be masked. Returns: mask (np.ndarray): set of non-frozen indices. """ mask = np.setdiff1d(np.arange(n_feats), col_idx) return mask
[docs]def freeze_columns(x: np.ndarray, y: np.ndarray, mask: np.ndarray, frozen_c: np.ndarray, col_idx: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """ Freeze coefficients of the solution (e.g. forcing to zero) by simultaneously eliminating columns of the input and their assumed contribution to the output. Args: x (np.ndarray): input matrix. y (np.ndarray): output vector. mask (np.ndarray): set of non-frozen indices. frozen_c (np.ndarray): values of coefficients to be frozen. col_idx (np.ndarray): indices of coefficients to be frozen. Returns: x (np.ndarray): input matrix without frozen columns. y (np.ndarray): output vector, minus frozen contributions. """ x_fixed = x[:, col_idx] x = x[:, mask] y = np.subtract(y, np.dot(x_fixed, frozen_c)) return x, y
[docs]def freeze_regularizer(regularizer: np.ndarray, mask: np.ndarray) -> np.ndarray: """Apply freezing mask to regularizer, eliminating masked columns.""" regularizer = regularizer[mask, :][:, mask] return regularizer
[docs]def revert_frozen_coefficients(solution: np.ndarray, n_coeff: int, mask: Collection[bool], frozen_c: Collection[float], frozen_idx: Collection[int],) -> np.ndarray: """ Reverse freezing operations by arranging learned coefficients and frozen coefficients using the mask. Args: solution: learned solution, excluding frozen coefficients n_coeff: number of columns in full (unfrozen) solution mask: indices of remaining columns in x. frozen_idx: column indices of fixed coefficients. frozen_c: frozen coefficients. Returns: full_solution (np.ndarray) """ full_solution = np.zeros(n_coeff) np.put_along_axis(full_solution, mask, solution, 0) np.put_along_axis(full_solution, frozen_idx, frozen_c, 0) return full_solution
[docs]def apply_weighted_gram(gram_matrix: np.ndarray, weight: float) -> np.ndarray: """Deprecated utility function for weighting gram matrix.""" return gram_matrix * weight**2
[docs]def apply_weights(x, y, weights): """Deprecated utility function for weighting inputs/outputs.""" if weights is not None: if len(weights) != len(x): raise ValueError( 'Number of weights does not match number of samples.') if not np.all(weights >= 0): raise ValueError('Negative weights provided.') w = np.sqrt(weights) x_fit = np.multiply(x.T, w).T y_fit = np.multiply(y, w) else: x_fit = x y_fit = y return x_fit, y_fit
[docs]def validate_regularizer(regularizer: np.ndarray, n_feats: int): """ Check for consistency between regularizer matrix and number of features. Args: regularizer (np.ndarray): regularizer matrix. n_feats (int): number of features. """ n_row, n_col = regularizer.shape if n_col != n_feats: shape_comparison = "N x {0}. Provided: {1} x {2}".format(n_feats, n_row, n_col) raise ValueError( "Expected regularizer shape: " + shape_comparison)
[docs]def subset_prediction(df: pd.DataFrame, model: WeightedLinearModel, subset_keys: Collection = None, **kwargs ) -> Tuple: """ Convenience function for optimization workflow. Read inputs/outputs from DataFrame and predict using fitted model. Args: df (pd.DataFrame): DataFrame of inputs/outputs. model (WeightedLinearModel): fitted model. subset_keys (list): list of keys to query from DataFrame. Returns: y_e (np.ndarray): target values for energies. p_e (np.ndarray): prediction values for forces. y_f (np.ndarray): target values for energies. p_f (np.ndarray): target values for forces. """ if subset_keys is not None: idx = df.index.unique(level=0).intersection(subset_keys) if len(idx) == 0: return list(), list(), list(), list() df = df.loc[idx] x_e, y_e, x_f, y_f = dataframe_to_tuples(df, **kwargs) p_e = model.predict(x_e) p_f = model.predict(x_f) return y_e, p_e, y_f, p_f
[docs]def batched_prediction(model: WeightedLinearModel, filename: str, table_names: Collection = None, subset_keys: Collection = None, **kwargs): """ Convenience function for optimization workflow. Read inputs/outputs from HDF5 file and predict using fitted model. Args: filename (str): path to HDF5 file. model (WeightedLinearModel): fitted model. table_names (list): list of table names to query from HDF5 file. subset_keys (list): list of keys to query from DataFrame. Returns: y_e (np.ndarray): target values for energies. p_e (np.ndarray): prediction values for forces. y_f (np.ndarray): target values for energies. p_f (np.ndarray): target values for forces. """ if table_names is None: _, _, table_names, _ = io.analyze_hdf_tables(filename) df_batches = io.dataframe_batch_loader(filename, table_names) y_e = [] p_e = [] y_f = [] p_f = [] for df in df_batches: predictions = subset_prediction(df, model, subset_keys=subset_keys, **kwargs) y_e.append(predictions[0]) p_e.append(predictions[1]) y_f.append(predictions[2]) p_f.append(predictions[3]) y_e = np.concatenate(y_e) p_e = np.concatenate(p_e) y_f = np.concatenate(y_f) p_f = np.concatenate(p_f) return y_e, p_e, y_f, p_f
[docs]def rmse_metric(predicted: Collection, actual: Collection) -> float: """ Root-mean-square error metric. Args: predicted (list): prediction values. actual (list): reference values. Returns: root-mean-square-error metric. """ return np.sqrt(np.mean(np.subtract(predicted, actual) ** 2))
[docs]def mae_metric(predicted, actual): """ Mean-absolute error metric. Args: predicted (list): prediction values. actual (list): reference values. Returns: mean-absolute error metric. """ return np.mean(np.abs(np.subtract(predicted, actual)))
[docs]def arrange_coefficients(coefficients, bspline_config): """ Arrange coefficients by degree of interaction. Args: coefficients (np.ndarray): Flattened vector of coefficients. Partitioned by provided bspline_config per degree. bspline_config (bspline.BSplineBasis) Returns: solutions (dict): fit coefficients per degree. """ split_indices = np.cumsum(bspline_config.partition_sizes)[:-1] solutions_list = np.array_split(coefficients, split_indices) element_list = bspline_config.element_list solutions = {element: value[0] for element, value in zip(element_list, solutions_list[:len(element_list)])} solutions_list = solutions_list[len(element_list):] j = 0 for d in range(2, bspline_config.degree + 1): interactions_map = bspline_config.interactions_map[d] for interaction in interactions_map: solutions[interaction] = solutions_list[j] j += 1 return solutions
[docs]def postprocess_coefficients_2b(coefficients, core_hardness=2.0, min_core=2.0, min_slope=0.1, rounding_factor=3, smooth_cutoff=False, in_place=False): """ Postprocess 2B coefficients to enforce repulsive core. Args: coefficients (np.ndarray): vector of 2B coefficients. core_hardness (float): power base factor for hard-core correction. min_core (float): minimum energy barrier at the lower-bound (eV). min_slope (float): minimum core slope at peak (eV). rounding_factor (float): decimal for rounding in extrema search. smooth_cutoff (bool): whether to fix the last two coefficients to zero, forcing the second derivative to be zero at the upper bound. in_place (bool): whether to modify in-place or make a copy. Returns: coefficients (np.ndarray): new vector of coefficients. """ if not in_place: # apply corrections to a copy coefficients = np.array(coefficients) well_idx = find_pair_potential_well(coefficients, rounding_factor) if well_idx > 1: # search for maximum left of potential well, rounding to meV (default) peak_search = np.round(coefficients[:well_idx], rounding_factor) # bias towards well with imperceptible slope to deal with plateau peak_search += np.arange(len(peak_search)) * 10**(-2 * rounding_factor) gradient = np.gradient(peak_search) peak_idx = np.argmax(peak_search) if np.all(gradient[:peak_idx] >= 0): # correction for case where lower-bound is far below # observations and coefficients are nearly zero. for i in np.arange(peak_idx)[::-1]: value = np.abs(coefficients[i + 1]) * core_hardness value = max(value, min_slope) coefficients[i] = value if coefficients[0] < min_core: # fail-safe hard core by simply fixing the first coefficient coefficients[0] = min_core if smooth_cutoff: coefficients[-2:] = 0 return coefficients
[docs]def find_pair_potential_well(coefficients, rounding_factor): """ Identify coefficient index corresponding to possible potential well. Intermediate function for postprocess_coefficients_2b(). Args: coefficients: vector of two-body coefficients. rounding_factor: decimal for rounding in extrema search. Returns: well_idx: approximate location of potential well in coefficients """ peak_idx = np.argmax(coefficients) well_idx = np.argmin(coefficients) if well_idx < peak_idx: # if well is left of peak, either core may not be well-defined # or well may not be well-defined well_search = np.round(coefficients[:peak_idx], rounding_factor) if np.ptp(well_search) < 10 ** -(rounding_factor - 1): # no actual well well_idx = peak_idx + 1 return well_idx