"""
This module provides the WeightedLinearModel class for fitting UF potentials
from featurized DataFrames using regularized least squares.
"""
from typing import List, Dict, Collection, Tuple
import os
import numpy as np
import pandas as pd
from uf3.representation import bspline, process
from uf3.data import io
from uf3.util import json_io
from uf3.util import parallel
DEFAULT_REGULARIZER_GRID = dict(ridge_1b=1e-8,
ridge_2b=0,
ridge_3b=0,
curve_2b=1e-8,
curve_3b=1e-8)
[docs]class VarianceRecorder:
"""Convenience class for computing online variance and mean"""
def __init__(self, mean=0, std=0, n=0):
self.mean = mean
self.std = std
self.n = int(n)
[docs] def update(self, batch: Collection) -> Tuple:
"""
Args:
batch (list or np.ndarray): n-dimensional data. For speed purposes,
dimensions are not checked for compatibility so caution
is advised when working with multidimensional data.
Statistics are computed along the first axis.
Returns:
(current mean, current standard deviation, current entry count)
"""
if self.n == 0:
self.mean = np.mean(batch, axis=0)
self.std = np.std(batch, axis=0)
self.n = len(batch)
return self.mean, self.std, self.n
else:
batch_std = np.std(batch, axis=0)
batch_mean = np.mean(batch, axis=0)
m = float(self.n)
n = len(batch)
std = (m / (m + n) * self.std**2
+ n / (m + n) * batch_std**2
+ m * n / (m + n)**2 * (self.mean - batch_mean)**2)
self.std = np.sqrt(std)
self.mean = m / (m + n) * self.mean + n / (m + n) * batch_mean
self.n += n
return self.mean, self.std, self.n
[docs] def update_with_components(self, df, keys=None):
"""Wrapper for dataframe with multiple columns of interest"""
if keys is None:
keys = ["fx", "fy", "fz"]
batch = []
for j, *components in df[keys].itertuples():
if any([component is np.nan for component in components]):
continue
if np.ndim(components) > 1: # if components are not scalars
components = list(np.concatenate(components))
batch.extend(components)
self.update(batch)
return self.mean, self.std, self.n
[docs]class BasicLinearModel:
"""
Base class for linear regression.
"""
def __init__(self,
regularizer: np.ndarray = None):
"""
Args:
regularizer (np.ndarray): regularization matrix.
"""
self.coefficients = None
self.regularizer = regularizer
[docs] def fit(self,
x: np.ndarray,
y: np.ndarray,
ridge_penalty: float = 1e-8,
):
"""
Direct solution to linear least squares with LU decomposition.
Args:
x (np.ndarray): input matrix of shape (n_samples, n_features).
y (np.ndarray): output vector of length n_samples.
ridge_penalty (float): magnitude of ridge penalty. Ignored
if self.regularizer is set at initialization.
"""
gram, ordinate = moore_penrose_components(x, y)
if self.regularizer is None:
regularizer = np.eye(len(gram)) * ridge_penalty
else:
regularizer = self.regularizer
regularizer = np.dot(regularizer.T, regularizer)
coefficients = lu_factorization(gram + regularizer, ordinate)
self.coefficients = coefficients
[docs] def predict(self, x: np.ndarray):
"""
Predict using fit coefficients.
Args:
x (np.ndarray): input matrix of shape (n_samples, n_features).
Returns:
predictions (np.ndarray): vector of predictions.
"""
predictions = np.dot(x, self.coefficients)
return predictions
[docs] def score(self, x, y, weights=None, normalize=True):
"""
Evaluate score (negative error metric).
Args:
x (np.ndarray): input matrix of shape (n_samples, n_features).
y (np.ndarray): output vector of length n_samples.
weights (np.ndarray): sample weights (optional).
normalize (bool): whether to normalize by the std of y.
Returns:
score (float): negative weighted root-mean-square-error.
"""
n_features = len(x[0])
if weights is not None:
w_matrix = np.eye(n_features) * np.sqrt(weights)
x = np.dot(w_matrix, x)
y = np.dot(w_matrix, y)
predictions = self.predict(x)
score = -rmse_metric(y, predictions)
if normalize:
score /= np.std(y)
return score
[docs]class WeightedLinearModel(BasicLinearModel):
"""
Handler class for regularized linear least squares using energies and
forces and basis set provided by bspline.BsplineBasis.
"""
def __init__(self, bspline_config, regularizer=None, **params):
super().__init__(regularizer)
self.bspline_config = bspline_config
if self.regularizer is None:
# initialize regularizer matrix if unspecified.
self.set_params(**params)
[docs] def set_params(self, **params):
"""Set parameters from keyword arguments. Initializes
regularizer with default parameters if unspecified."""
if "bspline_config" in params:
self.bspline_config = params["bspline_config"]
if "regularizer" in params:
self.regularizer = params["regularizer"]
elif self.regularizer is None:
reg_params = {k: v for k, v in params.items()
if isinstance(v, (int, float, np.floating))}
reg = self.bspline_config.get_regularization_matrix(**reg_params)
self.regularizer = reg
@property
def n_feats(self):
return self.bspline_config.n_feats
@property
def frozen_c(self):
return self.bspline_config.frozen_c
@property
def col_idx(self):
return self.bspline_config.col_idx
@property
def mask(self):
return get_freezing_mask(self.n_feats, self.col_idx)
def __repr__(self):
if self.coefficients is None:
fit = "False"
else:
fit = "True"
summary = ["WeightedLinearModel:",
f" Fit: {fit}",
self.bspline_config.__repr__()
]
return "\n".join(summary)
def __str__(self):
return self.__repr__()
[docs] def fit_with_gram(self, gram: np.ndarray, ordinate: np.ndarray):
"""
Intermediate function for direct solution using gram matrix
and ordinate (Moore-penrose inverse).
Args:
gram (np.ndarray): gram matrix (x^T x)
ordinate (np.ndarray: ordinate (x^T y)
"""
regularizer = freeze_regularizer(self.regularizer, self.mask)
regularizer = np.dot(regularizer.T, regularizer)
coefficients = lu_factorization(gram + regularizer, ordinate)
coefficients = revert_frozen_coefficients(coefficients,
self.n_feats,
self.mask,
self.frozen_c,
self.col_idx)
self.coefficients = coefficients
[docs] def fit(self,
x_e: np.ndarray,
y_e: np.ndarray,
x_f: np.ndarray = None,
y_f: np.ndarray = None,
weight: float = 0.5,
batch_size=2500,
):
"""
Direct solution from input-output pairs corresponding to
energies and forces, with option to weigh their respective
contributions.
Args:
x_e (np.ndarray): input matrix of shape (n_samples, n_features).
y_e (np.ndarray): output vector of length n_samples.
x_f (np.ndarray): input matrix corresponding to forces.
y_f (np.ndarray): output vector corresponding to forces.
weight (float): parameter balancing contribution from energies
vs. forces. Higher values favor energies; defaults to 0.5.
batch_size: maximum batch size for gram matrix construction.
"""
x_e, y_e = freeze_columns(x_e,
y_e,
self.mask,
self.frozen_c,
self.col_idx)
gram_e, ord_e = batched_moore_penrose(x_e, y_e, batch_size=batch_size)
if x_f is not None:
energy_weight = 1 / len(y_e) / np.std(y_e)
force_weight = 1 / len(y_f) / np.std(y_f)
x_f, y_f = freeze_columns(x_f,
y_f,
self.mask,
self.frozen_c,
self.col_idx)
gram_f, ord_f = batched_moore_penrose(x_f,
y_f,
batch_size=batch_size)
gram, ordinate = self.combine_weighted_gram(gram_e, gram_f, ord_e,
ord_f, energy_weight,
force_weight, weight)
else:
gram = gram_e
ordinate = ord_e
self.fit_with_gram(gram, ordinate)
[docs] def combine_weighted_gram(self,
gram_e: np.ndarray,
gram_f: np.ndarray,
ord_e: np.ndarray,
ord_f: np.ndarray,
energy_weight: float,
force_weight: float,
weight: float):
"""
Apply weighting to gram matrices and ordinates for energy and
force contributions to the fit.
Args:
gram_e (np.ndarray): gram matrix (x^T x) for energies.
gram_f (np.ndarray): gram matrix (x^T x) for forces.
ord_e (np.ndarray): ordinate (x^T y) for energies.
ord_f (np.ndarray): ordinate (x^T y) for forces.
energy_weight: 1 / (# energies * sqrt(Var(energies)))
force_weight: 1 / (# forces * sqrt(Var(forces)))
weight (float): parameter balancing contribution from energies
vs. forces. Higher values favor energies; defaults to 0.5.
Returns:
gram (np.ndarray): gram matrix (x^T x) for fitting.
ordinate (np.ndarray): ordinate (x^T y) for fitting.
"""
gram = (((weight * energy_weight) ** 2 * gram_e)
+ (((1 - weight) * force_weight) ** 2 * gram_f))
ordinate = (((weight * energy_weight) ** 2 * ord_e)
+ (((1 - weight) * force_weight) ** 2 * ord_f))
return gram, ordinate
[docs] def fit_from_file(self,
filename: str,
subset: Collection,
index: Collection,
weight: float = 0.5,
batch_size=2500,
energy_key="energy",
progress: str = "bar"):
"""
Accumulate inputs and outputs from batched parsing of HDF5 file
and compute direct solution via LU decomposition.
Args:
filename (str): path to HDF5 file.
subset (list): list of indices for training.
index (list): list of keys, i.e. from df_data DataFrame.
weight (float): parameter balancing contribution from energies
vs. forces. Higher values favor energies; defaults to 0.5.
batch_size (int): batch size, in rows, for matrix multiplication
operations in constructing gram matrices.
energy_key (str): column name for energies, default "energy".
progress (str): style for progress indicators.
"""
if not os.path.isfile(filename):
raise FileNotFoundError(filename)
n_tables, _, table_names, _ = io.analyze_hdf_tables(filename)
subset_keys = index[subset]
gram_e, gram_f, ord_e, ord_f = self.initialize_gram_ordinate()
e_variance = VarianceRecorder()
f_variance = VarianceRecorder()
table_iterator = parallel.progress_iter(np.arange(n_tables),
style=progress)
for j in table_iterator:
table_name = table_names[j]
df = process.load_feature_db(filename, table_name)
keys = df.index.unique(level=0).intersection(subset_keys)
if len(keys) == 0:
continue
g_e, g_f, o_e, o_f = self.gram_from_df(df,
keys,
e_variance=e_variance,
f_variance=f_variance,
energy_key=energy_key,
batch_size=batch_size)
gram_e += g_e
gram_f += g_f
ord_e += o_e
ord_f += o_f
energy_weight = 1 / e_variance.n / e_variance.std
force_weight = 1 / f_variance.n / f_variance.std
gram, ordinate = self.combine_weighted_gram(gram_e,
gram_f,
ord_e,
ord_f,
energy_weight,
force_weight,
weight)
self.fit_with_gram(gram, ordinate)
[docs] def initialize_gram_ordinate(self):
"""Initialize empty matrices for gram matrices and ordinates."""
n_columns = self.n_feats - len(self.col_idx)
gram_e = np.zeros((n_columns, n_columns))
ord_e = np.zeros(n_columns)
gram_f = np.zeros((n_columns, n_columns))
ord_f = np.zeros(n_columns)
return gram_e, gram_f, ord_e, ord_f
[docs] def gram_from_df(self,
df: pd.DataFrame,
keys: Collection,
e_variance: VarianceRecorder = None,
f_variance: VarianceRecorder = None,
energy_key: str = "energy",
batch_size: int = 2500):
"""
Extract inputs and outputs from dataframe and compute
moore-penrose components (gram matrices and ordinates).
Args:
df (pd.DataFrame): DataFrame of energy/force features.
keys (list): keys to query from df (e.g. training subset).
e_variance (VarianceRecorder): handler for accumulating
statistics for energies (mean and variance).
f_variance (VarianceRecorder): handler for accumulating
statistics for forces (mean and variance).
energy_key (str): column name for energies, default "energy".
batch_size (int): batch size, in rows, for matrix multiplication
operations in constructing gram matrices.
"""
n_elements = len(self.bspline_config.element_list)
x_e, y_e, x_f, y_f = dataframe_to_tuples(df.loc[keys],
n_elements=n_elements,
energy_key=energy_key)
x_e, y_e = freeze_columns(x_e,
y_e,
self.mask,
self.frozen_c,
self.col_idx)
x_f, y_f = freeze_columns(x_f,
y_f,
self.mask,
self.frozen_c,
self.col_idx)
if e_variance is not None and f_variance is not None:
e_variance.update(y_e)
f_variance.update(y_f)
gram_e, ordinate_e = batched_moore_penrose(x_e,
y_e,
batch_size=batch_size)
gram_f, ordinate_f = batched_moore_penrose(x_f,
y_f,
batch_size=batch_size)
return gram_e, gram_f, ordinate_e, ordinate_f
[docs] def batched_predict(self,
filename: str,
keys: List[str] = None,
table_names: List[str]=None,
score: bool = True):
"""
Extract inputs and outputs from HDF5 file and predict energies/forces.
Args:
filename: path to HDF5 file.
keys (list): keys to query from df (e.g. training subset).
table_names (list): list of table names in HDF5 to read.
score (bool): whether to return root mean square error metrics.
Returns:
y_e (np.ndarray): target values for energies.
p_e (np.ndarray): prediction values for forces.
y_f (np.ndarray): target values for energies.
p_f (np.ndarray): target values for forces.
rmse_e (np.ndarray): RMSE across energy predictions.
rmse_e (np.ndarray): RMSE across force predictions.
"""
n_elements = len(self.bspline_config.element_list)
y_e, p_e, y_f, p_f = batched_prediction(self,
filename,
table_names=table_names,
subset_keys=keys,
n_elements=n_elements)
if score:
rmse_e = rmse_metric(y_e, p_e)
rmse_f = rmse_metric(y_f, p_f)
print(f"RMSE (energy): {rmse_e:.3F}")
print(f"RMSE (forces): {rmse_f:.3F}")
return y_e, p_e, y_f, p_f, rmse_e, rmse_f
else:
return y_e, p_e, y_f, p_f
[docs] def save(self, filename: str):
"""Save model (coefficients and knots map) to file."""
solution = arrange_coefficients(self.coefficients, self.bspline_config)
knots_map = self.bspline_config.knots_map
json_io.dump_interaction_map(dict(solution=solution,
knots=knots_map),
filename=filename,
write=True)
[docs] def dump(self):
"""Arrange coefficients/knots map into dictionary."""
solution = arrange_coefficients(self.coefficients, self.bspline_config)
knots_map = self.bspline_config.knots_map
return dict(solution=solution, knots=knots_map)
[docs] def load(self,
solution: Dict[Tuple[str], np.ndarray] = None,
filename: str = None,
):
"""
Reflatten coefficients (e.g. obtained through arrange_coefficients)
and load into model for prediction.
Args:
solution (dict): dictionary of 1B, 2B, ... terms
organized as interaction: vector entries.
filename (str): filename of json dump containing solution.
"""
if filename is not None:
dump = json_io.load_interaction_map(filename)
solution = dump["solution"]
elif solution is None:
raise ValueError("Neither solution nor filename were provided.")
flattened_coefficients = []
for element in self.bspline_config.element_list:
value = solution[element]
flattened_coefficients.append([value])
for degree in range(2, self.bspline_config.degree + 1):
interactions = self.bspline_config.interactions_map[degree]
for interaction in interactions:
values = solution[interaction]
flattened_coefficients.append(values)
# self-energies, pair interactions & trio interactions
n_interactions = len(self.bspline_config.partition_sizes)
# add self-energy as separate interactions
n_coefficients = sum(self.bspline_config.partition_sizes)
if len(flattened_coefficients) != n_interactions:
error_line = "Incorrect interactions: {} provided, {} expected."
error_line = error_line.format(len(flattened_coefficients),
n_interactions)
raise ValueError(error_line)
flattened_coefficients = np.concatenate(flattened_coefficients)
if len(flattened_coefficients) != n_coefficients:
error_line = "Incorrect coefficients: {} provided, {} expected."
error_line = error_line.format(len(flattened_coefficients),
n_coefficients)
raise ValueError(error_line)
self.coefficients = np.array(flattened_coefficients)
[docs]def dataframe_to_tuples(df_features,
n_elements=None,
energy_key='energy'):
"""
Extract energy/force inputs/outputs from DataFrame.
Args:
df_features (pd.DataFrame): dataframe with target vector (y) as the
first column and feature vectors (x) as remaining columns.
n_elements (int): number of leading columns to consider for size
normalization.
energy_key (str): key for energy samples, used to slice df_features
into energies and forces for weight generation.
Returns:
x (np.ndarray): features for machine learning.
y (np.ndarray): target vector.
w (np.ndarray): weight vector for machine learning.
"""
if len(df_features) <= 1:
raise ValueError(
"Not enough samples ({} provided)".format(len(df_features)))
y_index = df_features.index.get_level_values(-1)
energy_mask = (y_index == energy_key)
force_mask = np.logical_not(energy_mask)
data = df_features.to_numpy()
y = data[:, 0]
x = data[:, 1:]
y_e = y[energy_mask]
y_f = y[force_mask]
if n_elements is not None:
s = np.sum(x[energy_mask, :n_elements], axis=1)
x_e = np.divide(x[energy_mask].T, s).T
y_e = y_e / s
else:
x_e = x[energy_mask]
x_f = x[force_mask]
return x_e, y_e, x_f, y_f
[docs]def moore_penrose_components(x, y):
"""
Compute gram matrix (x^T x) and ordinate (x^T y).
Args:
x (np.ndarray): input matrix of shape (n_samples, n_features).
y (np.ndarray): output vector of length n_samples.
Returns:
a: Gram matrix (X'X)
b: ordinate (X'y)
"""
a = np.dot(x.T, x)
b = np.dot(x.T, y)
return a, b
[docs]def batched_moore_penrose(x, y, batch_size=2500):
"""
Batched evaluation of gram matrix (x^T x) and ordinate (x^T y).
Args:
x (np.ndarray): input matrix of shape (n_samples, n_features).
y (np.ndarray): output vector of length n_samples.
batch_size: maximum batch size, default 2500 rows. This option
should be adjusted based on efficiency/memory tradeoffs.
Returns:
a: Gram matrix (X'X)
b: ordinate (X'y)
"""
n_samples, n_features = np.shape(x)
n_batches = int(n_samples / batch_size)
if n_batches <= 1:
return moore_penrose_components(x, y)
else:
batched_idx = np.array_split(np.arange(len(y)), n_batches)
gram = np.zeros((n_features, n_features))
ordinate = np.zeros(n_features)
for j, batch in enumerate(batched_idx):
x_x, x_y = moore_penrose_components(x[batch], y[batch])
gram += x_x
ordinate += x_y
return gram, ordinate
[docs]def lu_factorization(a, b):
"""
LU factorization for least-squares solution using np.linalg.solve().
Args:
a: coefficients (X) or Gram matrix (X'X)
b: ordinate (X'y)
"""
return np.linalg.solve(a, b)
[docs]def linear_least_squares(x, y):
"""
Solves the linear least-squares problem Ax=y. Regularizer matrix
should be concatenated to x and zero-values padded to y.
Args:
x (np.ndarray): input matrix of shape (n_samples, n_features).
y (np.ndarray): output vector of length n_samples.
Returns:
solution (np.ndarray): coefficients.
"""
a, b = moore_penrose_components(x, y)
return lu_factorization(a, b)
[docs]def weighted_least_squares(x, y, weights=None, regularizer=None):
"""
Solves the linear least-squares problem with optional Tikhonov regularizer
matrix and optional weighting.
TODO: Remove (deprecated)
Args:
x (np.ndarray): input matrix.
y (np.ndarray): output vector.
weights (np.ndarray): sample weights (optional).
regularizer (np.ndarray): Tikhonov regularizer matrix.
Returns:
solution (np.ndarray): coefficients.
predictions (list of np.ndarray): predictions.
"""
x_fit, y_fit = apply_weights(x, y, weights)
n_feats = len(x[0])
if regularizer is not None: # append regularizer
# validate_regularizer(regularizer, n_feats)
reg_zeros = np.zeros(len(regularizer))
x_fit = np.concatenate([x_fit, regularizer])
y_fit = np.concatenate([y_fit, reg_zeros])
solution = linear_least_squares(x_fit, y_fit)
return solution
[docs]def get_freezing_mask(n_feats: int, col_idx: np.ndarray) -> np.ndarray:
"""
Freezing mask is the set difference between the range of feature indices
and the indices to be excluded (col_idx).
Args:
n_feats (int): number of features.
col_idx (list): list of indices to be masked.
Returns:
mask (np.ndarray): set of non-frozen indices.
"""
mask = np.setdiff1d(np.arange(n_feats), col_idx)
return mask
[docs]def freeze_columns(x: np.ndarray,
y: np.ndarray,
mask: np.ndarray,
frozen_c: np.ndarray,
col_idx: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Freeze coefficients of the solution (e.g. forcing to zero) by
simultaneously eliminating columns of the input and their assumed
contribution to the output.
Args:
x (np.ndarray): input matrix.
y (np.ndarray): output vector.
mask (np.ndarray): set of non-frozen indices.
frozen_c (np.ndarray): values of coefficients to be frozen.
col_idx (np.ndarray): indices of coefficients to be frozen.
Returns:
x (np.ndarray): input matrix without frozen columns.
y (np.ndarray): output vector, minus frozen contributions.
"""
x_fixed = x[:, col_idx]
x = x[:, mask]
y = np.subtract(y, np.dot(x_fixed, frozen_c))
return x, y
[docs]def freeze_regularizer(regularizer: np.ndarray,
mask: np.ndarray) -> np.ndarray:
"""Apply freezing mask to regularizer, eliminating masked columns."""
regularizer = regularizer[mask, :][:, mask]
return regularizer
[docs]def revert_frozen_coefficients(solution: np.ndarray,
n_coeff: int,
mask: Collection[bool],
frozen_c: Collection[float],
frozen_idx: Collection[int],) -> np.ndarray:
"""
Reverse freezing operations by arranging learned coefficients
and frozen coefficients using the mask.
Args:
solution: learned solution, excluding frozen coefficients
n_coeff: number of columns in full (unfrozen) solution
mask: indices of remaining columns in x.
frozen_idx: column indices of fixed coefficients.
frozen_c: frozen coefficients.
Returns:
full_solution (np.ndarray)
"""
full_solution = np.zeros(n_coeff)
np.put_along_axis(full_solution, mask, solution, 0)
np.put_along_axis(full_solution, frozen_idx, frozen_c, 0)
return full_solution
[docs]def apply_weighted_gram(gram_matrix: np.ndarray,
weight: float) -> np.ndarray:
"""Deprecated utility function for weighting gram matrix."""
return gram_matrix * weight**2
[docs]def apply_weights(x, y, weights):
"""Deprecated utility function for weighting inputs/outputs."""
if weights is not None:
if len(weights) != len(x):
raise ValueError(
'Number of weights does not match number of samples.')
if not np.all(weights >= 0):
raise ValueError('Negative weights provided.')
w = np.sqrt(weights)
x_fit = np.multiply(x.T, w).T
y_fit = np.multiply(y, w)
else:
x_fit = x
y_fit = y
return x_fit, y_fit
[docs]def validate_regularizer(regularizer: np.ndarray, n_feats: int):
"""
Check for consistency between regularizer matrix and number of features.
Args:
regularizer (np.ndarray): regularizer matrix.
n_feats (int): number of features.
"""
n_row, n_col = regularizer.shape
if n_col != n_feats:
shape_comparison = "N x {0}. Provided: {1} x {2}".format(n_feats,
n_row,
n_col)
raise ValueError(
"Expected regularizer shape: " + shape_comparison)
[docs]def subset_prediction(df: pd.DataFrame,
model: WeightedLinearModel,
subset_keys: Collection = None,
**kwargs
) -> Tuple:
"""
Convenience function for optimization workflow. Read inputs/outputs
from DataFrame and predict using fitted model.
Args:
df (pd.DataFrame): DataFrame of inputs/outputs.
model (WeightedLinearModel): fitted model.
subset_keys (list): list of keys to query from DataFrame.
Returns:
y_e (np.ndarray): target values for energies.
p_e (np.ndarray): prediction values for forces.
y_f (np.ndarray): target values for energies.
p_f (np.ndarray): target values for forces.
"""
if subset_keys is not None:
idx = df.index.unique(level=0).intersection(subset_keys)
if len(idx) == 0:
return list(), list(), list(), list()
df = df.loc[idx]
x_e, y_e, x_f, y_f = dataframe_to_tuples(df,
**kwargs)
p_e = model.predict(x_e)
p_f = model.predict(x_f)
return y_e, p_e, y_f, p_f
[docs]def batched_prediction(model: WeightedLinearModel,
filename: str,
table_names: Collection = None,
subset_keys: Collection = None,
**kwargs):
"""
Convenience function for optimization workflow. Read inputs/outputs
from HDF5 file and predict using fitted model.
Args:
filename (str): path to HDF5 file.
model (WeightedLinearModel): fitted model.
table_names (list): list of table names to query from HDF5 file.
subset_keys (list): list of keys to query from DataFrame.
Returns:
y_e (np.ndarray): target values for energies.
p_e (np.ndarray): prediction values for forces.
y_f (np.ndarray): target values for energies.
p_f (np.ndarray): target values for forces.
"""
if table_names is None:
_, _, table_names, _ = io.analyze_hdf_tables(filename)
df_batches = io.dataframe_batch_loader(filename, table_names)
y_e = []
p_e = []
y_f = []
p_f = []
for df in df_batches:
predictions = subset_prediction(df,
model,
subset_keys=subset_keys,
**kwargs)
y_e.append(predictions[0])
p_e.append(predictions[1])
y_f.append(predictions[2])
p_f.append(predictions[3])
y_e = np.concatenate(y_e)
p_e = np.concatenate(p_e)
y_f = np.concatenate(y_f)
p_f = np.concatenate(p_f)
return y_e, p_e, y_f, p_f
[docs]def rmse_metric(predicted: Collection,
actual: Collection) -> float:
"""
Root-mean-square error metric.
Args:
predicted (list): prediction values.
actual (list): reference values.
Returns:
root-mean-square-error metric.
"""
return np.sqrt(np.mean(np.subtract(predicted, actual) ** 2))
[docs]def mae_metric(predicted, actual):
"""
Mean-absolute error metric.
Args:
predicted (list): prediction values.
actual (list): reference values.
Returns:
mean-absolute error metric.
"""
return np.mean(np.abs(np.subtract(predicted, actual)))
[docs]def arrange_coefficients(coefficients, bspline_config):
"""
Arrange coefficients by degree of interaction.
Args:
coefficients (np.ndarray): Flattened vector of coefficients.
Partitioned by provided bspline_config per degree.
bspline_config (bspline.BSplineBasis)
Returns:
solutions (dict): fit coefficients per degree.
"""
split_indices = np.cumsum(bspline_config.partition_sizes)[:-1]
solutions_list = np.array_split(coefficients,
split_indices)
element_list = bspline_config.element_list
solutions = {element: value[0] for element, value
in zip(element_list, solutions_list[:len(element_list)])}
solutions_list = solutions_list[len(element_list):]
j = 0
for d in range(2, bspline_config.degree + 1):
interactions_map = bspline_config.interactions_map[d]
for interaction in interactions_map:
solutions[interaction] = solutions_list[j]
j += 1
return solutions
[docs]def postprocess_coefficients_2b(coefficients,
core_hardness=2.0,
min_core=2.0,
min_slope=0.1,
rounding_factor=3,
smooth_cutoff=False,
in_place=False):
"""
Postprocess 2B coefficients to enforce repulsive core.
Args:
coefficients (np.ndarray): vector of 2B coefficients.
core_hardness (float): power base factor for hard-core correction.
min_core (float): minimum energy barrier at the lower-bound (eV).
min_slope (float): minimum core slope at peak (eV).
rounding_factor (float): decimal for rounding in extrema search.
smooth_cutoff (bool): whether to fix the last two coefficients to
zero, forcing the second derivative to be zero at the upper bound.
in_place (bool): whether to modify in-place or make a copy.
Returns:
coefficients (np.ndarray): new vector of coefficients.
"""
if not in_place: # apply corrections to a copy
coefficients = np.array(coefficients)
well_idx = find_pair_potential_well(coefficients, rounding_factor)
if well_idx > 1:
# search for maximum left of potential well, rounding to meV (default)
peak_search = np.round(coefficients[:well_idx], rounding_factor)
# bias towards well with imperceptible slope to deal with plateau
peak_search += np.arange(len(peak_search)) * 10**(-2 * rounding_factor)
gradient = np.gradient(peak_search)
peak_idx = np.argmax(peak_search)
if np.all(gradient[:peak_idx] >= 0):
# correction for case where lower-bound is far below
# observations and coefficients are nearly zero.
for i in np.arange(peak_idx)[::-1]:
value = np.abs(coefficients[i + 1]) * core_hardness
value = max(value, min_slope)
coefficients[i] = value
if coefficients[0] < min_core:
# fail-safe hard core by simply fixing the first coefficient
coefficients[0] = min_core
if smooth_cutoff:
coefficients[-2:] = 0
return coefficients
[docs]def find_pair_potential_well(coefficients, rounding_factor):
"""
Identify coefficient index corresponding to possible potential well.
Intermediate function for postprocess_coefficients_2b().
Args:
coefficients: vector of two-body coefficients.
rounding_factor: decimal for rounding in extrema search.
Returns:
well_idx: approximate location of potential well in coefficients
"""
peak_idx = np.argmax(coefficients)
well_idx = np.argmin(coefficients)
if well_idx < peak_idx:
# if well is left of peak, either core may not be well-defined
# or well may not be well-defined
well_search = np.round(coefficients[:peak_idx], rounding_factor)
if np.ptp(well_search) < 10 ** -(rounding_factor - 1):
# no actual well
well_idx = peak_idx + 1
return well_idx