"""
This module provides the DataCoordinator class for reading data from
atomistic codes and organizing data into DataFrames using Pandas.
"""
import os
import re
import io as pio
import fnmatch
from typing import List, Dict, Tuple
import numpy as np
import pandas as pd
import tables
import ase
from ase import io as ase_io
from ase import db as ase_db
from ase.db import core as db_core
from ase.io import lammpsrun as ase_lammpsrun
from ase.calculators import singlepoint
from ase.calculators import calculator as ase_calc
from uf3.util import subsample
class DataCoordinator:
    """
    Handler class for reading data from atomistic codes and organizing data
    into DataFrames using Pandas.

    Loaded DataFrames are stored in ``self.data`` keyed by prefix, and
    ``self.keys`` lists the prefixes in the order they were first loaded.
    """

    def __init__(self,
                 atoms_key='geometry',
                 energy_key='energy',
                 force_key='force',
                 size_key='size',
                 overwrite=False
                 ):
        """
        Args:
            atoms_key (str): column name for geometries, default "geometry".
                Modify when parsed geometries are part of a larger pipeline.
            energy_key (str): column name for energies, default "energy".
            force_key (str): identifier for forces, default "force".
            size_key (str): column name for number of atoms per geometry,
                default "size".
            overwrite (bool): Allow overwriting of existing DataFrame
                with matching key when loading.
        """
        self.atoms_key = atoms_key
        self.energy_key = energy_key
        self.force_key = force_key
        self.size_key = size_key
        self.overwrite = overwrite
        self.data = {}  # prefix -> DataFrame
        self.keys = []  # prefixes in loading order

    @staticmethod
    def from_config(config):
        """Instantiate from configuration dictionary.

        Only the constructor arguments are read from ``config``; any other
        entries are ignored.
        """
        keys = ['atoms_key',
                'energy_key',
                'force_key',
                'size_key',
                'overwrite']
        config = {k: v for k, v in config.items() if k in keys}
        return DataCoordinator(**config)

    def __repr__(self):
        summary = ["DataCoordinator:",
                   ]
        if len(self.keys) == 0:
            summary.append(" Datasets: None")
        else:
            summary.append(f" Datasets: {len(self.keys)} ({self.keys})")
        return "\n".join(summary)

    def __str__(self):
        return self.__repr__()

    def consolidate(self, remove_duplicates=True, keep='first'):
        """Wrapper for io.concat_dataframes()"""
        dataframes = [self.data[k] for k in self.keys]
        df = concat_dataframes(dataframes,
                               remove_duplicates=remove_duplicates,
                               keep=keep)
        return df

    def load_dataframe(self, dataframe, prefix=None):
        """Load existing pd.DataFrame.

        Args:
            dataframe (pd.DataFrame): must contain the atoms, energy and
                size columns configured on this coordinator.
            prefix: key under which to store the DataFrame. A prefix
                already embedded in the first index label ("prefix_n")
                takes priority. If neither is available, the number of
                datasets loaded so far is used and the index is renamed
                to "<prefix>_<original label>".

        Raises:
            RuntimeError: if a required column is missing.
        """
        for key in [self.atoms_key, self.energy_key, self.size_key]:
            if key not in dataframe.columns:
                raise RuntimeError("Missing \"{}\" column.".format(key))
        if len(dataframe) > 0:  # an empty frame has no label to inspect
            name_0 = dataframe.index[0]  # existing prefix takes priority
            if isinstance(name_0, str) and '_' in name_0:
                prefix = '_'.join(name_0.split('_')[:-1])
        if prefix is None:  # no prefix provided
            prefix = len(self.data)
            pattern = '{}_{{}}'.format(prefix)
            dataframe = dataframe.rename(pattern.format)
        if prefix in self.data:
            print('Data already exists with prefix "{}".'.format(prefix),
                  end=' ')
            if self.overwrite is True:
                print('Overwriting...')
                self.data[prefix] = dataframe
            else:
                print('Skipping...')
                return
        else:
            self.data[prefix] = dataframe
        # Register each prefix once so consolidate() never concatenates the
        # same dataset twice.
        if prefix not in self.keys:
            self.keys.append(prefix)

    def dataframe_from_lists(self,
                             geometries,
                             prefix=None,
                             energies=None,
                             forces=None,
                             load=True,
                             **kwargs):
        """Wrapper for io.prepare_dataframe_from_lists()

        Returns the DataFrame when ``load`` is False; otherwise loads it
        into this coordinator and returns None.
        """
        if prefix is None:
            prefix = len(self.data)
        df = prepare_dataframe_from_lists(geometries,
                                          prefix,
                                          energies=energies,
                                          forces=forces,
                                          atoms_key=self.atoms_key,
                                          energy_key=self.energy_key,
                                          force_key=self.force_key,
                                          size_key=self.size_key,
                                          **kwargs)
        if load:
            self.load_dataframe(df, prefix=prefix)
        else:
            return df

    def dataframe_from_trajectory(self,
                                  filename,
                                  prefix=None,
                                  load=True,
                                  energy_key=None,
                                  force_key=None,
                                  **kwargs):
        """Wrapper for io.parse_trajectory()

        ``energy_key`` / ``force_key`` override the coordinator's keys when
        reading the file; the energy column is renamed back to
        ``self.energy_key`` afterwards. Returns the DataFrame when ``load``
        is False; otherwise loads it and returns None.
        """
        if prefix is None:
            prefix = len(self.data)
        if energy_key is None:
            energy_key = self.energy_key
        if force_key is None:
            force_key = self.force_key
        df = parse_trajectory(filename,
                              prefix=prefix,
                              atoms_key=self.atoms_key,
                              energy_key=energy_key,
                              force_key=force_key,
                              size_key=self.size_key,
                              **kwargs)
        if energy_key != self.energy_key:
            df.rename(columns={energy_key: self.energy_key},
                      inplace=True)
        if load:
            self.load_dataframe(df, prefix=prefix)
        else:
            return df

    dataframe_from_xyz = dataframe_from_trajectory
    dataframe_from_vasprun = dataframe_from_trajectory

    def dataframe_from_lammps_run(self,
                                  path,
                                  lammps_aliases,
                                  prefix=None,
                                  column_subs=None,
                                  log_fname="log.lammps",
                                  dump_fname="dump.lammpstrj",
                                  load=True,
                                  **kwargs):
        """Wrapper for io.parse_lammps_outputs()

        ``column_subs`` defaults to {"PotEng": "energy"}. Returns the
        DataFrame when ``load`` is False; otherwise loads it and returns
        None.
        """
        if prefix is None:
            prefix = len(self.data)
        if column_subs is None:
            # Built per call so no dict is shared between invocations.
            column_subs = {"PotEng": "energy"}
        df = parse_lammps_outputs(path,
                                  lammps_aliases,
                                  prefix=prefix,
                                  column_subs=column_subs,
                                  log_fname=log_fname,
                                  dump_fname=dump_fname,
                                  atoms_key=self.atoms_key,
                                  size_key=self.size_key,
                                  **kwargs)
        if load:
            self.load_dataframe(df, prefix=prefix)
        else:
            return df
def concat_dataframes(dataframes: List[pd.DataFrame],
                      remove_duplicates: bool = True,
                      keep: str = 'first'
                      ) -> pd.DataFrame:
    """
    Stack DataFrames row-wise, optionally dropping rows whose index label
    repeats.

    Args:
        dataframes (list): list of DataFrames to merge
        remove_duplicates (bool): drop rows flagged as duplicates.
        keep (str, bool): 'first', 'last', or False; forwarded to
            pandas.Index.duplicated to choose which occurrence is kept.

    Returns:
        df (pandas.DataFrame)
    """
    merged = pd.concat(dataframes)
    is_duplicate = merged.index.duplicated(keep=keep)
    if not np.any(is_duplicate):
        return merged  # nothing to report or remove
    print('Duplicates keys found:', np.sum(is_duplicate))
    if remove_duplicates:
        print('Removing with keep=', keep)
        merged = merged[~is_duplicate]
        print('Unique keys:', len(merged))
    return merged
def prepare_dataframe_from_lists(geometries: List[ase.Atoms],
                                 prefix: str = None,
                                 energies: List[float] = None,
                                 forces: List[np.ndarray] = None,
                                 atoms_key: str = 'geometry',
                                 energy_key: str = 'energy',
                                 force_key: str = 'force',
                                 size_key: str = 'size',
                                 copy: bool = True
                                 ) -> pd.DataFrame:
    """
    Convenience function for arranging data into pandas DataFrame
    with expected column names. Extracts energies and forces from
    provided ase.Atoms objects if unspecified. If specified,
    adds energies and/or forces to the ase.Atoms objects
    via info and arrays attributes. Length of geometries, energies,
    and forces must match.

    Args:
        geometries (list): list of ase.Atoms configurations.
        prefix (str): prefix for DataFrame index.
            e.g. "bulk" -> [bulk_0, bulk_1, bulk_2, ...]
        energies (list or np.ndarray): vector of energy for each geometry.
        forces (list): list of n x 3 arrays of forces for each geometry.
        atoms_key (str): column name for geometries, default "geometry".
            Modify when parsed geometries are part of a larger pipeline.
        energy_key (str): column name for energies, default "energy".
        force_key (str): identifier for forces, default "force".
        size_key (str): column name for number of atoms per geometry,
            default "size".
        copy (bool): copy geometries, energies and forces before modification.

    Returns:
        df (pandas.DataFrame): standard dataframe with columns
            [atoms_key, energy_key, fx, fy, fz]
    """
    force_columns = ('fx', 'fy', 'fz')
    if copy:
        geometries = [geom.copy() for geom in geometries]
    geometries = update_geometries_from_calc(geometries,
                                             energy_key=energy_key,
                                             force_key=force_key)
    # generate dataframe
    default_columns = [atoms_key, energy_key, *force_columns]
    df = pd.DataFrame(columns=default_columns)
    df[atoms_key] = geometries
    provided_scalars = ()
    provided_arrays = ()
    if energies is not None:
        if copy:
            energies = np.array(energies)
        df[energy_key] = energies
        # Use the configured column name; a hard-coded "energy" fails with
        # a KeyError whenever energy_key is customized.
        provided_scalars = (energy_key,)
    if forces is not None:
        if copy:
            forces = [array.copy() for array in forces]
        for axis, column in enumerate(force_columns):
            df[column] = [np.array(array)[:, axis] for array in forces]
        provided_arrays = force_columns
    # If values are provided, push them onto the ase.Atoms objects.
    update_geometries_from_dataframe(df,
                                     scalar_keys=provided_scalars,
                                     array_keys=provided_arrays,
                                     atoms_key=atoms_key)
    # Otherwise, pull energies and forces from objects.
    missing_scalars = (energy_key,) if energies is None else ()
    missing_arrays = force_columns if forces is None else ()
    df = update_dataframe_from_geometries(df,
                                          atoms_key=atoms_key,
                                          size_key=size_key,
                                          scalar_keys=missing_scalars,
                                          array_keys=missing_arrays,
                                          inplace=True)
    if prefix is not None:
        pattern = '{}_{{}}'.format(prefix)
        df = df.rename(pattern.format)
    return df
def parse_trajectory(fname: str,
                     scalar_keys: List[str] = (),
                     array_keys: List[str] = (),
                     prefix: str = None,
                     atoms_key: str = "geometry",
                     energy_key: str = "energy",
                     force_key: str = 'force',
                     size_key: str = 'size'):
    """
    Wrapper for ase.io.read, which is compatible with
    many file formats (notably VASP's vasprun.xml and extended xyz).
    If available, force information is written to each ase.Atoms object's
    arrays attribute as separate "fx", "fy", and "fz" entries.

    Args:
        fname (str): filename.
        scalar_keys (list): list of ase.Atoms.info keys to query and
            include as a DataFrame column. e.g. ["config_type"].
        array_keys (list): list of ase.Atoms.arrays keys to query and
            include as a DataFrame column. e.g. ["charge"].
        prefix (str): prefix for DataFrame index.
            e.g. "bulk" -> [bulk_0, bulk_1, bulk_2, ...]
        atoms_key (str): column name for geometries, default "geometry".
            Modify when parsed geometries are part of a larger pipeline.
        energy_key (str): column name for energies, default "energy".
        force_key (str): identifier for forces, default "force".
        size_key (str): column name for number of atoms per geometry,
            default "size".

    Returns:
        df (pandas.DataFrame): standard dataframe with columns
            [atoms_key, energy_key, fx, fy, fz]
    """
    extension = os.path.splitext(fname)[-1]
    kws = ['mysql', 'postgres', 'mariadb']
    if extension in ['.db', '.json'] or any(kw in fname for kw in kws):
        # handle differently to retrieve attached names instead of reindexing
        # An empty database may yield None; treat it as "no geometries".
        geometries = read_database(fname, index=slice(None, None)) or []
        new_index = [geom.info.get('row_name', None) for geom in geometries]
        index_errors = new_index.count(None)
        # NOTE(review): a single unnamed row is tolerated here; confirm
        # whether any missing name should disable the stored index.
        if index_errors > 1:
            new_index = None
    else:  # flexible read function for a variety of filetypes
        geometries = ase_io.read(fname, index=slice(None, None))
        new_index = None
    if not isinstance(geometries, list):
        geometries = [geometries]
    geometries = update_geometries_from_calc(geometries,
                                             energy_key=energy_key,
                                             force_key=force_key)
    # create DataFrame
    default_columns = [atoms_key, energy_key, 'fx', 'fy', 'fz']
    scalar_keys = [p for p in scalar_keys
                   if p not in default_columns]
    array_keys = [p for p in array_keys
                  if p not in default_columns]
    columns = default_columns + scalar_keys + array_keys
    df = pd.DataFrame(columns=columns)
    df[atoms_key] = geometries
    # object-dataframe consistency
    scalar_keys = scalar_keys + [energy_key]
    array_keys = array_keys + ["fx", "fy", "fz"]
    df = update_dataframe_from_geometries(df,
                                          atoms_key=atoms_key,
                                          size_key=size_key,
                                          scalar_keys=scalar_keys,
                                          array_keys=array_keys,
                                          inplace=True)
    if new_index is not None:
        df.index = new_index
        print('Loaded index from file:', fname)
    elif prefix is not None:
        pattern = '{}_{{}}'.format(prefix)
        df = df.rename(pattern.format)
    return df
def read_database(filename: str, index: slice = None, **kwargs):
    """
    Read ase.db-type database file.

    Args:
        filename (str): path to database file.
        index (slice): rows to read. Default (None, None, 1).
            Only the start and stop of the slice are used; the step is
            ignored.
        **kwargs: forwarded to ase.db.connect.

    Returns:
        list of ase.Atoms objects from database; empty if the selection
        contains no rows.
    """
    if index is None:
        index = slice(None, None)
    db = ase_db.connect(filename, serial=True, **kwargs)
    start, stop, _ = index.indices(db.count())
    if start == stop:
        return []  # keep the return type consistent for empty selections
    geometries = []
    for row in db.select(offset=start, limit=stop - start):
        geom = row.toatoms(add_additional_information=True)
        # Flatten the stored key-value pairs into the top level of info.
        key_value_pairs = dict(geom.info['key_value_pairs'])
        del geom.info['key_value_pairs']
        geom.info = {**geom.info, **key_value_pairs}
        geometries.append(geom)
    return geometries
def parse_lammps_outputs(path: str,
                         lammps_aliases: Dict[int, str],
                         prefix: str = None,
                         column_subs: Dict[str, str] = None,
                         log_fname: str = "log.lammps",
                         dump_fname: str = "dump.lammpstrj",
                         atoms_key: str = "geometry",
                         size_key: str = 'size',
                         log_regex: str = None,
                         ) -> pd.DataFrame:
    """
    Convenience wrapper for parsing both LAMMPS log and dump
    in a run directory.

    Args:
        path (str): path to run directory.
        lammps_aliases (dict): optional map of LAMMPS atom types to species.
        prefix (str): prefix for DataFrame index.
            e.g. "bulk" -> [bulk_0, bulk_1, bulk_2, ...]
        column_subs (dict): column name substitutions for DataFrame.
            Default {"PotEng": "energy"}.
        log_fname: log filename, default "log.lammps".
        dump_fname (str): dump filename, default "dump.lammpstrj".
        atoms_key (str): column name for geometries, default "geometry".
            Modify when parsed geometries are part of a larger pipeline.
        size_key (str): column name for number of atoms per geometry,
            default "size".
        log_regex (str): Regular expression for identifying step information.
            Passed through to parse_lammps_log, which supplies the default
            when None.

    Returns:
        df (pandas.DataFrame): One row per log entry that has a matching
            snapshot in the dump, containing columns from log
            (e.g. Temp, PotEng) and column containing corresponding
            ase.Atoms snapshots.
    """
    if column_subs is None:
        # Built per call so no dict is shared between invocations.
        column_subs = {"PotEng": "energy"}
    log_path = os.path.join(path, log_fname)
    dump_path = os.path.join(path, dump_fname)
    # Parse log file, yielding a DataFrame
    df_log = parse_lammps_log(log_path, log_regex=log_regex)
    df = df_log.rename(columns=column_subs)
    df[atoms_key] = pd.Series(dtype=object)
    col_idx = df.columns.get_loc(atoms_key)
    log_timesteps = df['Step'].values
    # Parse dump file, querying only timesteps appearing in the log
    snapshots = parse_lammps_dump(dump_path,
                                  lammps_aliases,
                                  timesteps=log_timesteps)
    log_idxs = np.arange(len(df))
    intersection_idxs = []
    for timestep, geom in snapshots.items():
        # Match each snapshot with the first unused log row of the same
        # timestep; consumed rows are dropped so repeated timesteps pair
        # up in order.
        i = np.flatnonzero(log_timesteps == timestep)[0]
        log_idx = log_idxs[i]
        log_timesteps = np.delete(log_timesteps, i)
        log_idxs = np.delete(log_idxs, i)
        intersection_idxs.append(log_idx)
        timestep_info = df.iloc[log_idx].to_dict()  # log row
        df.iat[log_idx, col_idx] = geom
        for key, value in timestep_info.items():
            geom.info[key] = value
    # Remove timesteps with no geometry; copy so later column writes do not
    # target a view of the full log.
    df = df.iloc[intersection_idxs].copy()
    if prefix is not None:
        pattern = '{}_{{}}'.format(prefix)
        df = df.rename(pattern.format)
    # object-dataframe consistency
    df = update_dataframe_from_geometries(df,
                                          atoms_key=atoms_key,
                                          size_key=size_key,
                                          scalar_keys=['energy'],
                                          array_keys=['fx', 'fy', 'fz'],
                                          inplace=True)
    return df
def update_dataframe_from_geometries(df: pd.DataFrame,
                                     scalar_keys: List[str] = (),
                                     array_keys: List[str] = (),
                                     atoms_key: str = 'geometry',
                                     size_key: str = 'size',
                                     inplace: bool = True
                                     ) -> pd.DataFrame:
    """
    Intermediate function for object-dataframe consistency.

    Copies the length of each geometry into the ``size_key`` column, the
    requested ``info`` entries into scalar columns and the requested
    ``arrays`` entries into array columns. Missing columns are created;
    entries absent from a geometry leave the cell untouched.

    Args:
        df (pandas.DataFrame): frame whose ``atoms_key`` column holds the
            geometries.
        scalar_keys (list): keys to read from each geometry's info.
        array_keys (list): keys to read from each geometry's arrays.
        atoms_key (str): column name for geometries.
        size_key (str): column name for number of atoms per geometry.
        inplace (bool): modify ``df`` directly instead of a copy.

    Returns:
        df (pandas.DataFrame): the updated frame.
    """
    frame = df if inplace else df.copy()
    atoms_column = frame[atoms_key]

    def _locate(name, dtype):
        # Create the column when absent, then report its position.
        if name not in frame.columns:
            frame[name] = pd.Series(dtype=dtype)
        return frame.columns.get_loc(name)

    scalar_slots = [(key, _locate(key, object)) for key in scalar_keys]
    size_slot = _locate(size_key, int)
    array_slots = [(key, _locate(key, object)) for key in array_keys]

    for row, atoms in enumerate(atoms_column):
        frame.iat[row, size_slot] = len(atoms)
        for key, column in scalar_slots:
            if key in atoms.info:
                frame.iat[row, column] = atoms.info[key]
        for key, column in array_slots:
            if key in atoms.arrays:
                frame.iat[row, column] = atoms.arrays[key]
    return frame
def update_geometries_from_calc(geometries: List[ase.Atoms],
                                energy_key: str = 'energy',
                                force_key: str = 'force'
                                ) -> List[ase.Atoms]:
    """
    Query attached calculators for energy and forces.

    The energy is stored in each geometry's info under ``energy_key``.
    Forces come from the calculator or, failing that, from the
    ``force_key`` array, and are stored as separate "fx", "fy" and "fz"
    arrays. Geometries lacking either quantity are left as they are.

    Args:
        geometries (list): ase.Atoms objects, modified in place.
        energy_key (str): info key to receive the energy.
        force_key (str): arrays key used as the fallback force source.

    Returns:
        geometries (list): the same list that was passed in.
    """
    unavailable = (ase_calc.PropertyNotImplementedError, AttributeError)
    for atoms in geometries:
        try:
            atoms.info[energy_key] = atoms.calc.get_potential_energy()
        except unavailable:
            pass  # no energy
        try:
            forces = atoms.calc.get_forces()
        except unavailable:
            if force_key not in atoms.arrays:
                continue  # no forces
            forces = atoms.arrays[force_key]
        try:
            for axis, name in enumerate(('fx', 'fy', 'fz')):
                atoms.new_array(name, forces[:, axis])
        except (ValueError, RuntimeError):
            # shape mismatch, or the array already exists
            continue
    return geometries
def update_geometries_from_dataframe(df: pd.DataFrame,
                                     scalar_keys: List[str] = (),
                                     array_keys: List[str] = (),
                                     atoms_key: str = 'geometry',
                                     inplace: bool = True
                                     ) -> List[ase.Atoms]:
    """
    Intermediate function for object-dataframe consistency.

    Writes DataFrame cells onto the geometries: scalar columns go to each
    geometry's info, array columns are added with new_array. Arrays that
    new_array rejects (ValueError or RuntimeError) are skipped.

    Args:
        df (pandas.DataFrame): frame whose ``atoms_key`` column holds the
            geometries.
        scalar_keys (list): columns to copy into info.
        array_keys (list): columns to copy into arrays.
        atoms_key (str): column name for geometries.
        inplace (bool): modify the stored geometries; otherwise work on
            copies.

    Returns:
        The geometries that were updated: the DataFrame column when
        ``inplace`` is True, a list of copies otherwise.
    """
    targets = df[atoms_key]
    if not inplace:
        targets = [atoms.copy() for atoms in targets]
    scalar_slots = [(key, df.columns.get_loc(key)) for key in scalar_keys]
    array_slots = [(key, df.columns.get_loc(key)) for key in array_keys]
    for row, atoms in enumerate(targets):
        for key, column in scalar_slots:
            atoms.info[key] = df.iat[row, column]
        for key, column in array_slots:
            try:
                atoms.new_array(key, df.iat[row, column])
            except (ValueError, RuntimeError):
                # shape mismatch, or the array already exists
                pass
    return targets
def df_from_tsv_text(text: str) -> pd.DataFrame:
    """
    Convenience function for converting whitespace-separated values (text)
    into DataFrame.

    Args:
        text (str): table whose first line is a header that includes an
            "id" column.

    Returns:
        df (pandas.DataFrame): indexed by "id" and sorted by it.
    """
    buffer = pio.StringIO(text)  # pandas expects file buffer
    # sep=r'\s+' replaces the deprecated delim_whitespace=True.
    df = pd.read_csv(buffer, sep=r'\s+')
    df = df.set_index("id").sort_index()
    return df
def atoms_from_df(df: pd.DataFrame,
                  element_key: str = 'element',
                  lammps_aliases: Dict[int, str] = None,
                  info: Dict[str, float] = None,
                  **atom_kwargs
                  ) -> ase.Atoms:
    """
    Build an ase.Atoms object from a per-atom table. The table must
    provide at least the columns x, y, z and [element_key].

    Args:
        df (pandas.DataFrame): DataFrame of interest.
        element_key (str): column name corresponding to species.
        lammps_aliases (dict): optional map of aliases to species
            e.g. for LAMMPS atom types. Unmapped entries pass through.
        info (dict): optional dictionary of scalars, attached as the
            object's info.
        **atom_kwargs: arguments to pass to ase.Atoms, e.g. cell and pbc.

    Returns:
        atoms (ase.Atoms)
    """
    if not info:
        info = {}
    if not lammps_aliases:
        lammps_aliases = {}
    coordinates = df[['x', 'y', 'z']].to_numpy()
    # substitute aliases, keeping any label without a mapping
    symbols = [lammps_aliases.get(label, label) for label in df[element_key]]
    atoms = ase.Atoms(symbols, positions=coordinates, **atom_kwargs)
    # Every remaining column, e.g. fx or per-atom quantities, becomes an
    # array entry.
    required = {'x', 'y', 'z', element_key}
    for column in set(df.columns) - required:
        atoms.new_array(column, df[column].values)
    atoms.info = info
    return atoms
def parse_lammps_log(fname: str, log_regex: str = None) -> pd.DataFrame:
    """
    Parse lammps log file into pd.DataFrame.

    Args:
        fname (str): filename of log file.
        log_regex (str): Regular expression for identifying step information.
            Defaults to "\\n(Step[^\\n]+\\n[^A-Z]+)(?:Loop time)"

    Returns:
        df_log (pandas.DataFrame): all matched blocks concatenated, with
            fully duplicated rows removed.

    Raises:
        ValueError: if no block in the file matches ``log_regex``.
    """
    log_regex = log_regex or '\n(Step[^\n]+\n[^A-Z]+)(?:Loop time)'
    with open(fname, 'r') as f:
        text = f.read()
    # sep=r'\s+' replaces the deprecated delim_whitespace=True.
    log_blocks = [pd.read_csv(pio.StringIO(text_block), sep=r'\s+')
                  for text_block in re.findall(log_regex, text)]
    if not log_blocks:
        # pd.concat would raise ValueError too, but without naming the file.
        raise ValueError(
            'No step information found in "{}".'.format(fname))
    df_log = pd.concat(log_blocks, ignore_index=True)
    df_log = df_log[~df_log.duplicated()]
    return df_log
def parse_lammps_dump(fname: str,
                      lammps_aliases: Dict[int, str],
                      timesteps: List[int] = None
                      ) -> pd.Series:
    """
    Read LAMMPS text dump file. Expects the following per-atom columns
    in the dump: id type x y z
    Other items, such as fx and custom computes,
    are added via ase.Atoms.new_array().

    Compatible with large files because the function reads line-by-line
    and, optionally, saves only specified timesteps. Snapshots that were
    not requested are skipped without being converted to ase.Atoms.

    TODO: refactor to break up into smaller, reusable functions

    Args:
        fname (str): filename of dump file.
        lammps_aliases (dict): map of LAMMPS type to species.
        timesteps (list, np.ndarray): Optional subset of timesteps to parse.
            Note: function expects timesteps to match dump chronologically.
            This behavior is intended to accommodate LAMMPS runs with
            reset_timestep commands.

    Returns:
        snapshots (pandas.Series): Map of timestep to ase.Atoms, allowing
            repeated entries in case of reset_timestep.
    """
    parse_subset = (timesteps is not None)
    # Requested timesteps still to be found; shrinks as matches are made.
    remaining = np.array(timesteps)
    snapshot_index = []
    snapshot_contents = []
    atom_lines = []
    timestep = None
    cell = None
    pbc = None
    cell_displacement = None
    with open(fname, 'r') as f:
        while True:
            line = f.readline()
            if "ITEM: TIMESTEP" in line or not line:
                # A new header (or end of file) closes the previous snapshot.
                if timestep is not None:
                    wanted = (not parse_subset) or (timestep in remaining)
                    if wanted:  # consolidate atom data
                        df = df_from_tsv_text('\n'.join(atom_lines))
                        atoms = atoms_from_df(df,
                                              cell=cell,
                                              pbc=pbc,
                                              celldisp=cell_displacement,
                                              element_key='type',
                                              lammps_aliases=lammps_aliases)
                        snapshot_index.append(timestep)
                        snapshot_contents.append(atoms)
                    if wanted and parse_subset:
                        # delete first occurrence of matching timestep
                        idx = np.flatnonzero(remaining == timestep)[0]
                        remaining = np.delete(remaining, idx)
                        if len(remaining) == 0:
                            # finish early if all requested have been
                            # parsed. May not trigger if a requested
                            # timestep is absent from the dump.
                            break
                if not line:
                    break
                timestep = int(f.readline())
                atom_lines = []  # reset timestep data
            elif "ITEM: NUMBER OF ATOMS" in line:
                f.readline()  # atom count is consumed but not needed
            elif "ITEM: BOX BOUNDS" in line:  # cell data
                conditions = line.replace("ITEM: BOX BOUNDS ", "").split()
                a_line = f.readline().split()
                b_line = f.readline().split()
                c_line = f.readline().split()
                cell_data = np.array([a_line, b_line, c_line])
                cell_data = cell_data.astype(float)
                cell_bounds = cell_data[:, :2].reshape(6, 1).flatten()
                if len(conditions) < 3:  # nonperiodic
                    pbc = (False, False, False)
                    off_diag = (0.0, 0.0, 0.0)
                elif len(conditions) == 3:  # orthogonal simulation cell
                    pbc = [('p' in condition.lower())
                           for condition in conditions]
                    off_diag = (0.0, 0.0, 0.0)
                else:  # triclinic simulation cell
                    # the first three entries name the tilt factors
                    pbc = [('p' in condition.lower())
                           for condition in conditions[3:]]
                    off_diag = cell_data[:, 2]
                c_data = ase_lammpsrun.construct_cell(cell_bounds, off_diag)
                cell, cell_displacement = c_data
            elif "ITEM: ATOMS" in line:  # header
                atom_lines.append(line.replace("ITEM: ATOMS ", ""))
            else:  # atom data
                atom_lines.append(line)
    snapshots = pd.Series(index=snapshot_index,
                          data=snapshot_contents)
    return snapshots
def read_vasp_pressure(path: str) -> float:
    """Utility for reading external pressure from the PSTRESS tag.
    Used for extracting energy from VASP enthalpy (H = E + PV).

    INCAR, OUTCAR and vasprun.xml in ``path`` are searched in that order;
    the first PSTRESS line with a readable number wins.

    Args:
        path (str): directory containing the VASP files.

    Returns:
        external_pressure (float): the PSTRESS value (given in kbar)
            converted to eV per cubic angstrom, or 0.0 if no PSTRESS
            entry is found.
    """
    # The number is taken directly after the tag, so the sign and exponent
    # are kept and digits in trailing comments are ignored.
    pattern = re.compile(
        r'PSTRESS[^-+.\d]*([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)')
    for basename in ("INCAR", "OUTCAR", "vasprun.xml"):
        fname = os.path.join(path, basename)
        if not os.path.isfile(fname):
            continue
        with open(fname, "r") as f:
            for line in f:
                match = pattern.search(line)
                if match is None:
                    continue
                pstress = float(match.group(1))
                # 1 kbar = 1e-22 J/A^3; divide by e to obtain eV/A^3.
                return pstress * 1e-22 / (1.602176634e-19)
    return 0.0
def identify_paths(experiment_path: str = ".",
                   filename: str = None,
                   filename_pattern: str = None
                   ) -> List[str]:
    """
    Generate list of paths to files according to filename_pattern,
    searching recursively from experiment_path.

    Args:
        experiment_path (str): directory in which to search, recursively.
            Default: "."
        filename (str): single filename. Used as given if it exists;
            otherwise it is looked up inside experiment_path.
        filename_pattern (str): glob pattern e.g. "*.xyz" to search.

    Returns:
        data_paths (list): paths that exist, the single file first.
    """
    data_paths = []
    if filename is not None:
        nested_path = os.path.join(experiment_path, filename)
        if os.path.isfile(filename):
            data_paths.append(filename)
        elif os.path.isfile(nested_path):
            # Return the path that was actually found; the bare filename
            # would not resolve from the working directory.
            data_paths.append(nested_path)
    if filename_pattern is not None:
        for directory, _, files in os.walk(experiment_path):
            for candidate in files:
                if fnmatch.fnmatch(candidate, filename_pattern):
                    data_paths.append(os.path.join(directory, candidate))
    return data_paths
def parse_with_subsampling(data_paths: List[str],
                           data_coordinator: DataCoordinator,
                           max_samples: int = 100,
                           min_diff: float = 1e-3,
                           vasp_pressure: bool = False,
                           lammps_log: str = None,
                           lammps_aliases: Dict[int, str] = None,
                           verbose: bool = True):
    """
    Parse each path, subsample its entries by energy per atom and load the
    result into ``data_coordinator``. Paths that raise ValueError while
    parsing, or that yield no entries, are skipped.

    TODO: refactor to break up into smaller, reusable functions

    Args:
        data_paths (list)
        data_coordinator (DataCoordinator)
        max_samples (int): maximum number of samples taken per provided path.
            Default: 100. Values <= 0 keep every entry.
        min_diff (float): minimum energy difference between consecutive samples
            in eV. Default: 1e-3
        vasp_pressure (bool): whether to search for pressure and apply an
            energy correction of Pressure * Volume term (H = E + PV).
            Ignored when lammps_log is given.
        lammps_log (str): optional name of lammps log, if applicable.
        lammps_aliases (dict): map of LAMMPS type to species.
        verbose (bool, int): verbosity level.
    """
    common_prefix = os.path.commonprefix(data_paths)
    common_path = os.path.dirname(common_prefix)
    counter = 0
    atoms_key = data_coordinator.atoms_key
    energy_key = data_coordinator.energy_key
    size_key = data_coordinator.size_key
    for data_path in data_paths:
        # Name each dataset after its path relative to the shared directory.
        prefix = data_path[len(common_path):]
        prefix = prefix.replace("/", "-")
        if prefix.startswith("-"):
            prefix = prefix[1:]
        try:
            if lammps_log is not None:
                vasp_pressure = False
                lammps_path, dump_fname = os.path.split(data_path)
                df = data_coordinator.dataframe_from_lammps_run(
                    lammps_path, lammps_aliases, prefix=prefix, load=False,
                    log_fname=lammps_log, dump_fname=dump_fname,
                    column_subs={"TotEng": energy_key})
            else:
                df = data_coordinator.dataframe_from_trajectory(data_path,
                                                                prefix=prefix,
                                                                load=False)
        except ValueError:
            continue
        if len(df) == 0:
            continue
        energy_list = df[energy_key].values / df[size_key].values
        if max_samples > 0:
            subsamples = subsample.farthest_point_sampling(
                energy_list,
                max_samples=max_samples,
                min_diff=min_diff)
        else:
            subsamples = np.arange(len(energy_list))
        if verbose >= 2:
            print("{}/{} samples taken from {}.".format(len(subsamples),
                                                        len(energy_list),
                                                        prefix))
        counter += len(subsamples)
        if verbose >= 1:
            print("Total: {} samples parsed.".format(counter))
        # Copy so the energy correction below does not write into a slice.
        df = df.iloc[np.sort(subsamples)].copy()
        if vasp_pressure:
            vasp_path = os.path.dirname(data_path)
            external_pressure = read_vasp_pressure(vasp_path)
            if external_pressure != 0:
                # Use the coordinator's column names rather than the
                # defaults, so custom keys work.
                volumes = [geom.get_volume() for geom in df[atoms_key].values]
                corrections = np.multiply(volumes, external_pressure)
                df[energy_key] = np.subtract(df[energy_key], corrections)
                if verbose >= 1:
                    # NOTE(review): the value printed is whatever
                    # read_vasp_pressure returns; confirm it is in kbar.
                    line = "External pressure correction: {} kbar."
                    print(line.format(external_pressure))
        data_coordinator.load_dataframe(df, prefix=prefix)
def cache_data(data_coordinator: DataCoordinator,
               filename: str,
               energy_key: str = 'energy',
               serial: bool = False):
    """
    Save dataframe from data_coordinator as ase Database.

    Each geometry is written with a single-point calculator holding its
    energy and forces, its index label as ``row_name`` and its simple
    scalar info entries as key-value pairs. An existing file is appended
    to.

    Args:
        data_coordinator (DataCoordinator)
        filename (str)
        energy_key (str): info key for energies, default "energy".
        serial (bool): passed to ase.db.connect.
    """
    append = os.path.isfile(filename)
    df_data = data_coordinator.consolidate()
    geometries = df_data[data_coordinator.atoms_key]
    storable_types = (int, float, str, np.floating)
    with ase_db.connect(filename, append=append, serial=serial) as database:
        # Series.items() replaces iteritems(), which pandas 2.0 removed.
        for name, geom in geometries.items():
            energy = geom.info[energy_key]
            forces = np.vstack([geom.arrays['fx'],
                                geom.arrays['fy'],
                                geom.arrays['fz']]).T
            # Only plain scalars that do not clash with reserved names are
            # kept as key-value pairs.
            geom_info = {k: v for k, v in geom.info.items()
                         if (isinstance(v, storable_types)
                             and k not in db_core.reserved_keys)}
            geom = geom.copy()
            calc = singlepoint.SinglePointCalculator(geom,
                                                     energy=energy,
                                                     forces=forces)
            geom.calc = calc
            database.write(geom,
                           id=None,
                           key_value_pairs=geom_info,
                           row_name=name)
def analyze_hdf_tables(filename: str) -> Tuple[int, int, List, Dict]:
    """
    Read hdf5 file and analyze table names and lengths.

    Args:
        filename (str): path to HDF5 file.

    Returns:
        n_chunks (int): number of nodes directly under the root.
        n_entries (int): sum of the "axis0" row counts over all nodes.
        chunk_names (list): node names, sorted.
        chunk_lengths (dict): node name -> "axis0" row count.
    """
    chunk_lengths = {}
    with tables.open_file(filename, mode="r") as h5file:
        names = [node._v_name for node in h5file.list_nodes("/")]
        for name in names:
            axis = h5file.get_node("/" + name, "axis0")
            chunk_lengths[name] = axis.nrows
    n_entries = int(np.sum(list(chunk_lengths.values())))
    return len(chunk_lengths), n_entries, sorted(names), chunk_lengths
def dataframe_batch_loader(filename: str, table_names: List) -> pd.DataFrame:
    """
    Generator that reads one DataFrame per table name from an HDF5 file,
    e.g. using the names reported by io.analyze_hdf_tables. Each table is
    read only when the generator reaches it.

    Args:
        filename (str): path to HDF5 file.
        table_names (list): list of table names in HDF5 to read.

    Yields:
        df (pandas.DataFrame): contents of the next table.
    """
    pending = iter(table_names)
    for name in pending:
        yield pd.read_hdf(filename, key=name)
def resolve_name_conflict(path: str) -> int:
    """Simple renaming by incrementing an integer preceding file extension.

    If ``path`` exists, it is renamed to the first free
    "<stem>.<i><ext>" with i = 0, 1, 2, ...

    Args:
        path (str): file that is about to be written.

    Returns:
        The integer used in the backup name, or None when ``path`` does
        not exist and nothing was renamed.
    """
    if not os.path.isfile(path):
        return None  # nothing to move out of the way
    stem, ext = os.path.splitext(path)  # invariant across attempts
    i = 0
    while True:
        backup_path = stem + "." + str(i) + ext
        if not os.path.isfile(backup_path):
            os.rename(path, backup_path)
            return i
        i += 1