Source code for uf3.representation.process

"""
This module provides the BasisFeaturizer class for computing
energy and force features, using bspline.BsplineBasis, from DataFrames
containing ase.Atoms configurations.
"""

import os
import warnings
import sqlite3
import numpy as np
import pandas as pd
from uf3.representation import distances
from uf3.representation import angles
from uf3.representation import bspline
from uf3.data import io
from uf3.data import geometry
from uf3.util import parallel


[docs]class BasisFeaturizer: """ -Manage knot-related logic for pair interactions -Generate energy/force features -Arrange features into DataFrame -Process DataFrame into tuples of (x, y, weight) """ def __init__(self, chemical_system, bspline_config, fit_forces=True, prefix='x'): """ Args: chemical_system (uf3.data.composition.ChemicalSystem) bspline_config (uf3.representation.bspline.BsplineConfig) fit_forces (bool): whether to generate force features. prefix (str): prefix for feature columns. """ self.chemical_system = chemical_system self.bspline_config = bspline_config self.fit_forces = fit_forces self.prefix = prefix # generate column labels self.n_features = sum(self.partition_sizes) - len(self.element_list) feature_columns = ['{}_{}'.format(prefix, i) for i in range(self.n_features)] composition_columns = ['n_{}'.format(el) for el in self.element_list] self.columns = ["y"] + composition_columns + feature_columns @property def degree(self): return self.chemical_system.degree @property def element_list(self): return self.chemical_system.element_list @property def interactions_map(self): return self.chemical_system.interactions_map @property def r_min_map(self): return self.bspline_config.r_min_map @property def r_max_map(self): return self.bspline_config.r_max_map @property def resolution_map(self): return self.bspline_config.resolution_map @property def r_cut(self): return self.bspline_config.r_cut @property def knots_map(self): return self.bspline_config.knots_map @property def knot_subintervals(self): return self.bspline_config.knot_subintervals @property def basis_functions(self): return self.bspline_config.basis_functions @property def partition_sizes(self): return self.bspline_config.partition_sizes @property def interaction_hashes(self): return self.chemical_system.interaction_hashes @property def trailing_trim(self): if self.bspline_config.mask_trim: return self.bspline_config.trailing_trim else: return 0
[docs] @staticmethod def from_config(chemical_system, config): """Instantiate from configuration dictionary""" keys = ['knot_spacing', 'prefix', 'fit_forces'] config = {k: v for k, v in config.items() if k in keys} return BasisFeaturizer(chemical_system, **config)
def __repr__(self): summary = ["BasisFeaturizer:", f" Fit forces: {self.fit_forces}", f" Column prefix: {self.prefix}", self.bspline_config.__repr__() ] return "\n".join(summary) def __str__(self): return self.__repr__()
[docs] def evaluate(self, df_data, atoms_key="geometry", energy_key="energy", progress="bar"): """ Process standard dataframe to generate representation features and arrange into processed dataframe. Operates in serial by default. Args: df_data (pd.DataFrame): standard dataframe with columns [atoms_key, energy_key, fx, fy, fz] atoms_key (str) energy_key (str) progress (str, None): style of progress counter. Returns: df_features (pd.DataFrame): processed dataframe with columns [y, {name}_0, ..., {name}_x, n_A, ..., n_Z] corresponding to target vector, pair-distance representation features, and composition (one-body) features. """ eval_map = {} header = df_data.columns column_positions = {} for key in [atoms_key, energy_key, 'fx', 'fy', 'fz']: if key in header: column_positions[key] = header.get_loc(key) + 1 row_gener = parallel.progress_iter(df_data.itertuples(name=None), total=len(df_data), style=progress) for row in row_gener: # iterate over rows without modification. name = row[0] geom = row[column_positions[atoms_key]] energy = None forces = None if energy_key in column_positions: energy = row[column_positions[energy_key]] if 'fx' in column_positions and self.fit_forces: forces = [row[column_positions[component]] for component in ['fx', 'fy', 'fz']] if np.any(np.isnan(forces)): forces = None # invalid forces geom_features = self.evaluate_configuration(geom, name, energy, forces, energy_key) eval_map.update(geom_features) df_features = self.arrange_features_dataframe(eval_map) return df_features
[docs] def arrange_features_dataframe(self, eval_map): """ Args: eval_map (dict): map of energy/force keys to fixed-length feature vectors. If forces and the energy are both provided, the dictionary will contain 3N + 1 entries. Returns: df_features (pd.DataFrame): processed dataframe with columns [y, {name}_0, ..., {name}_x, n_A, ..., n_Z] corresponding to target vector, pair-distance representation features, and composition (one-body) features. """ df_features = pd.DataFrame.from_dict(eval_map, orient='index', columns=self.columns) index = pd.MultiIndex.from_tuples(df_features.index) df_features = df_features.set_index(index) return df_features
[docs] def evaluate_parallel(self, df_data, client, atoms_key="geometry", energy_key="energy", n_jobs=2, shuffle=True, progress="bar"): """ Process standard dataframe to generate representation features and arrange into processed dataframe. Operates in serial by default. Args: df_data (pd.DataFrame): standard dataframe with columns [atoms_key, energy_key, fx, fy, fz] data_coordinator (uf3.data.io.DataCoordinator) n_jobs (int): number of parallel jobs to submit. client (concurrent.futures.Executor, dask.distributed.Client) Returns: df_features (pd.DataFrame): processed dataframe with columns [y, {name}_0, ..., {name}_x, n_A, ..., n_Z] corresponding to target vector, pair-distance representation features, and composition (one-body) features. """ if n_jobs < 2: warnings.warn("Processing in serial.", RuntimeWarning) df_features = self.evaluate(df_data, atoms_key=atoms_key, energy_key=energy_key) return df_features if shuffle: shuffled_idx = np.arange(len(df_data)) np.random.shuffle(shuffled_idx) batches = parallel.split_dataframe(df_data.iloc[shuffled_idx], n_jobs) else: batches = parallel.split_dataframe(df_data, n_jobs) try: batches = [client.scatter(batch) for batch in batches] except AttributeError: pass future_list = parallel.batch_submit(self.evaluate, batches, client, atoms_key=atoms_key, energy_key=energy_key, progress=False) df_features = parallel.gather_and_merge(future_list, client=client, cancel=True, progress=progress) df_features = df_features.loc[df_data.index, :] try: for batch in batches: client.cancel(batch) except AttributeError: pass return df_features
def batched_to_hdf(self, filename, df_data, client, n_jobs=16, batch_size=50, progress="bar", table_template="features_{}", **kwargs): idx_all = np.arange(len(df_data)) idx_splits = idx_all[batch_size::batch_size] idx_batches = np.array_split(idx_all, idx_splits) n_batches = len(idx_batches) idx_magnitude = np.ceil(np.log10(n_batches) + 0.1).astype(int) idx_magnitude = max(idx_magnitude, 3) if os.path.isfile(filename): n_chunks, _, chunk_names, _ = io.analyze_hdf_tables(filename) warnings.warn(f"File already exists: contains {n_chunks} chunks.", RuntimeWarning) else: chunk_names = [] for j, idx_batch in parallel.progress_iter(enumerate(idx_batches), total=len(idx_batches), style=progress): table_name = table_template.format(str(j).rjust(idx_magnitude, "0")) if table_name in chunk_names: continue kwargs["progress"] = False kwargs["n_jobs"] = n_jobs df_features = self.evaluate_parallel(df_data.iloc[idx_batch], client, **kwargs) save_feature_db(df_features, filename, table_name=table_name)
[docs] def evaluate_configuration(self, geom, name=None, energy=None, forces=None, energy_key="energy"): """ Generate feature vector(s) for learning energy and/or forces of one configuration. TODO: refactor to break up into smaller, reusable functions Args: geom (ase.Atoms): configuration of interest. name (str): if specified, keys in returned dictionary are tuples {(name, 'e'), (name, 'fx'), ...{ instead of {'e', 'fx', ...} energy (float): energy of configuration (optional). forces (list, np.ndarray): array containing force components fx, fy, fz for each atom. Expected shape is (n_atoms, 3). energy_key (str): column name for energies, default "energy". Returns: eval_map (dict): map of energy/force keys to fixed-length feature vectors. If forces and the energy are both provided, the dictionary will contain 3N + 1 entries. """ eval_map = {} n_atoms = len(geom) symbols = set(geom.get_chemical_symbols()) # check for undefined elements invalid_set = symbols.difference(self.element_list) if len(invalid_set) > 0: invalid_set = ', '.join(invalid_set) warning_str = "Invalid elements: {}".format(invalid_set) if name is not None: warning_str += " in configuration "+name warnings.warn(warning_str, RuntimeWarning) return dict() if any(geom.pbc): # generate supercell if necessary supercell = geometry.get_supercell(geom, r_cut=self.r_cut) else: supercell = geom if energy is not None: # compute energy features vector_1B = self.chemical_system.get_composition_tuple(geom) vector_2B = self.featurize_energy_2B(geom, supercell) vector = np.concatenate([vector_1B, vector_2B]) if self.degree > 2: vector_3B = self.featurize_energy_3B(geom, supercell) vector = np.concatenate([vector, vector_3B]) if name is not None: key = (name, energy_key) else: key = energy_key eval_map[key] = np.insert(vector, 0, energy) if forces is not None: # compute force features vectors_1B = np.zeros((len(geom), 3, len(self.element_list))) vectors_2B = self.featurize_force_2B(geom, supercell) vectors = np.concatenate([vectors_1B, vectors_2B], axis=2) if self.degree > 2: vectors_3B = self.featurize_force_3B(geom, supercell) vectors = np.concatenate([vectors, vectors_3B], axis=2) for j, component in enumerate(['fx', 'fy', 'fz']): for i in range(n_atoms): vector = vectors[i, j, :] vector = np.insert(vector, 0, forces[j][i]) atom_index = component + '_' + str(i) if name is not None: key = (name, atom_index) else: key = atom_index eval_map[key] = vector return eval_map
[docs] def featurize_energy_2B(self, geom, supercell=None): """ Generate 2B feature vector for learning energy of one configuration. Args: geom (ase.Atoms): unit cell or configuration of interest. supercell (ase.Atoms): optional ase.Atoms output of get_supercell used to account for atoms in periodic images. Returns: vector (np.ndarray): vector of features. """ if supercell is None: supercell = geom pair_tuples = self.interactions_map[2] distances_map = distances.distances_by_interaction(geom, pair_tuples, self.r_min_map, self.r_max_map, supercell=supercell) feature_map = {} for pair in pair_tuples: basis_function = self.basis_functions[pair] features = bspline.evaluate_basis_functions( distances_map[pair], basis_function, trailing_trim=self.trailing_trim) feature_map[pair] = features feature_vector = flatten_by_interactions(feature_map, pair_tuples) return feature_vector
[docs] def featurize_force_2B(self, geom, supercell=None): """ Generate 2B feature vectors for learning forces of one configuration. Args: geom (ase.Atoms): unit cell or configuration of interest. supercell (ase.Atoms): optional ase.Atoms output of get_supercell used to account for atoms in periodic images. Returns: feature_array (np.ndarray): feature vectors arranged in array of shape (n_atoms, n_force_components, n_features). """ if supercell is None: supercell = geom pair_tuples = self.interactions_map[2] deriv_results = distances.derivatives_by_interaction(geom, pair_tuples, self.r_cut, self.r_min_map, self.r_max_map, supercell) distance_map, derivative_map = deriv_results feature_map = {} for pair in pair_tuples: basis_functions = self.basis_functions[pair] knot_sequence = self.knots_map[pair] features = bspline.featurize_force_2B( basis_functions, distance_map[pair], derivative_map[pair], knot_sequence, trailing_trim=self.trailing_trim) feature_map[pair] = features feature_array = flatten_by_interactions(feature_map, pair_tuples) return feature_array
[docs] def featurize_energy_3B(self, geom, supercell=None): """ Generate 3B feature vector for learning energy of one configuration. Args: geom (ase.Atoms): unit cell or configuration of interest. supercell (ase.Atoms): optional ase.Atoms output of get_supercell used to account for atoms in periodic images. Returns: vector (np.ndarray): vector of features. """ if supercell is None: supercell = geom trio_list = self.interactions_map[3] knot_sets = [self.knots_map[trio] for trio in trio_list] basis_functions = [self.basis_functions[trio] for trio in trio_list] hashes = self.interaction_hashes[3] grids = angles.featurize_energy_3b(geom, knot_sets, basis_functions, hashes, supercell=supercell, trailing_trim=self.trailing_trim) vectors = [] for i, trio in enumerate(trio_list): value_grid = grids[i] vector = self.bspline_config.compress_3B(value_grid, trio) vectors.append(vector) vector = np.concatenate(vectors) return vector
[docs] def featurize_force_3B(self, geom, supercell=None): """ Generate 3B feature vectors for learning forces of one configuration. Args: geom (ase.Atoms): unit cell or configuration of interest. supercell (ase.Atoms): optional ase.Atoms output of get_supercell used to account for atoms in periodic images. Returns: feature_array (np.ndarray): feature vectors arranged in array of shape (n_atoms, n_force_components, n_features). """ if supercell is None: supercell = geom trio_list = self.interactions_map[3] knot_sets = [self.knots_map[trio] for trio in trio_list] basis_functions = [self.basis_functions[trio] for trio in trio_list] hashes = self.interaction_hashes[3] grids = angles.featurize_force_3b(geom, knot_sets, basis_functions, hashes, supercell=supercell, trailing_trim=self.trailing_trim) blocks = [] for i, trio in enumerate(trio_list): values = grids[i] block = [[self.bspline_config.compress_3B(grid, trio) for grid in atom_set] for atom_set in values] blocks.append(block) return np.concatenate(blocks, axis=-1)
[docs] def get_training_tuples(self, df_features, kappa, data_coordinator): """ TODO: Remove (deprecated) Weights are generated by normalizing energy and force entries by the respective sample standard deviations as well as the relative number of entries per type. Weights are further modified by kappa, which controls the relative weighting between energy and force errors. A value of 0 corresponds to force-training, while a value of 1 corresponds to energy-training. Args: df_features (pd.DataFrame): dataframe with target vector (y) as the first column and feature vectors (x) as remaining columns. kappa (float): energy-force weighting parameter between 0 and 1. data_coordinator (uf3.data.io.DataCoordinator) Returns: x (np.ndarray): features for machine learning. y (np.ndarray): target vector. w (np.ndarray): weight vector for machine learning. """ energy_key = data_coordinator.energy_key x, y, w = dataframe_to_training_tuples(df_features, kappa=kappa, energy_key=energy_key) return x, y, w
[docs]def save_feature_db(dataframe, filename, table_name='features'): """ Save dataframe with sqlite. Args: dataframe (pd.DataFrame) filename (str) table_name (str): default "features". """ dataframe.to_hdf(filename, table_name, mode="a", format='fixed')
[docs]def load_feature_db(filename, table_name='features'): """ Load dataframe with sqlite. Args: filename (str) table_name (str): default "features". Returns: dataframe (pd.DataFrame) """ dataframe = pd.read_hdf(filename, table_name) return dataframe
[docs]def legacy_load_feature_db(filename, table_name): # deprecated conn = sqlite3.connect(filename) dataframe = pd.read_sql_query("SELECT * FROM {};".format(table_name), conn) dataframe.set_index(keys=['level_0', 'level_1'], inplace=True) conn.close() return dataframe
[docs]def dataframe_to_training_tuples(df_features, kappa=0.5, energy_key='energy'): """ Args: df_features (pd.DataFrame): dataframe with target vector (y) as the first column and feature vectors (x) as remaining columns. kappa (float): energy-force weighting parameter between 0 and 1. energy_key (str): key for energy samples, used to slice df_features into energies and forces for weight generation. TODO: refactor to break up into smaller, reusable functions Returns: x (np.ndarray): features for machine learning. y (np.ndarray): target vector. w (np.ndarray): weight vector for machine learning. """ if kappa < 0 or kappa > 1: raise ValueError("Invalid domain for kappa weighting parameter.") if len(df_features) <= 1: raise ValueError( f"Not enough samples ({len(df_features)} provided)") y_index = df_features.index.get_level_values(-1) energy_mask = (y_index == energy_key) force_mask = np.logical_not(energy_mask) data = df_features.to_numpy() y = data[:, 0] x = data[:, 1:] energy_values = y[energy_mask] force_values = y[force_mask] # compute metrics for weighting energy_std = np.std(energy_values) force_std = np.std(force_values) n_energy = len(energy_values) n_forces = len(force_values) # generate weights based on sample standard deviation and frequency. energy_weights = np.ones(n_energy) / energy_std / n_energy * kappa force_weights = np.ones(n_forces) / force_std / n_forces * (1 - kappa) w = np.zeros(n_energy + n_forces) w[energy_mask] = energy_weights w[force_mask] = force_weights return x, y, w
[docs]def flatten_by_interactions(vector_map, pair_tuples): """ Args: vector_map (dict): map of vector per interaction. pair_tuples (list): list of pair interactions e.g. [(A-A), (A-B), (A-C), (B-B), ...)] Returns: (np.ndarray): concatenated result of joining vector_map in order of occurrence in pair_tuples. """ return np.concatenate([vector_map[pair] for pair in pair_tuples], axis=-1)