Source code for molsystem.atoms

# -*- coding: utf-8 -*-

"""A dictionary-like object for holding atoms

For efficiency the atom-data is stored as arrays -- numpy if possible, python
lists otherwise, along with metadata to define the attributes ("columns").

In some ways this a bit like pandas; however, we also need more control than a
simple pandas dataframe provides....
"""

import collections.abc
from itertools import zip_longest
import logging
from typing import Any, Dict, TypeVar

import numpy
import pandas

from molsystem.column import _Column as Column
from molsystem.table import _Table as Table

System_tp = TypeVar("System_tp", "System", None)
Atoms_tp = TypeVar("Atoms_tp", "_Atoms", str, None)

logger = logging.getLogger(__name__)


[docs]def grouped(iterable, n): "s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1), (s2n,...s3n-1), ..." return zip_longest(*[iter(iterable)] * n)
class _Atoms(collections.abc.MutableMapping): """The Atoms class holds arrays of attributes describing atoms. In order to handle changes in bonding (and numbers of atoms) the system class has several tables covering the atoms and bonds. See the main documentation for a more detailed description. The key tables are: configuration -- a list of configurations in e.g. a trajectory configuration_subset -- connnects configurations with subset template -- a simple list of templates templateatom -- holds the atoms in a template, if present. templatebond -- holds the bonds between the template atoms, if present subset -- an instantiation of a template, connected with one or more configurations of the system. subset_atom -- connects the subset to the atoms, and optionally connects the atom to a template atom. atom -- the nonvarying part of the description of the atoms coordinates -- the varying part of the description of atoms The description of the atoms is split into two parts: the identity of the atom in the 'atom' table; and the coordinates and other varying attributes in the table 'coordinates' as indicated above. This class handles the situation where the number and type of atoms changes from configuration to configuration, as it would for example in a grand canonical simulation or simulating surface deposition. The configuration if either given explicitly or the "current configuration" is used. The current configuration is set and stored in the system object. Given this configuration, this class provides the illusion that there is a single set of atoms in a single table containing both the identity of the atoms and their coordinates. Underneath, this is handled by a subset "all", which is a special subset that contains all of the atoms in the system. This subset is defined by one or more templates "all" which are instantiated in the subset table and connected to configurations via the configuration_subset table. If the number of atoms in the system is fixed and there are no bonds or they don't change (the case in classical MD), there will be one template "all" instantiated as one subset. All configurations will point to this one subset. If there are no bonds in the system, then this is all that is needed. However if there are bonds, the atoms involved in bonds need to be in the templateatom table and the bonds entered in the templatebond table. If the bonds change over time, then for each bond configuration there needs to be a distinct "all" template, with the appropriate templateatoms and templatebonds. This template is instantiated as a new subset and connected to one or more configurations. If the number of atoms changes over time, each new set of atoms also requires a new template and subset connected to the atoms in that configuration. A subset may persist over a number of configurations, but each time the atoms change a new subset is required. If both the atoms and the bonds change, then a corresponding subset is needed each time either changes. """ def __init__( self, system: System_tp, atom_tablename='atom', coordinates_tablename='coordinates', ) -> None: self._system = system self._atom_tablename = atom_tablename self._coordinates_tablename = coordinates_tablename self._atom_table = Table(system, self._atom_tablename) self._coordinates_table = Table(system, self._coordinates_tablename) def __enter__(self) -> Any: self.system.__enter__() return self def __exit__(self, etype, value, traceback) -> None: self.system.__exit__(etype, value, traceback) def __getitem__(self, key) -> Any: """Allow [] to access the data!""" return self.get_column(key) def __setitem__(self, key, value) -> None: """Allow x[key] access to the data""" column = self[key] column[0:] = value def __delitem__(self, key) -> None: """Allow deletion of keys""" if key in self._atom_table.attributes: del self._atom_table[key] elif key in self._coordinates_table.attributes: del self._coordinates_table[key] def __iter__(self) -> iter: """Allow iteration over the object""" return iter([*self.attributes.keys()]) def __len__(self) -> int: """The len() command""" return len(self.attributes) def __repr__(self) -> str: """The string representation of this object""" df = self.to_dataframe() return repr(df) def __str__(self) -> str: """The pretty string representation of this object""" df = self.to_dataframe() return str(df) def __contains__(self, item) -> bool: """Return a boolean indicating if a key exists.""" # Normal the tablename is used as an identifier, so is quoted with ". # Here we need it as a string literal so strip any quotes from it. tmp_item = item.strip('"') return tmp_item in self.attributes def __eq__(self, other) -> Any: """Return a boolean if this object is equal to another""" if self._atom_table != other._atom_table: return False if self._coordinates_table != other._coordinates_table: return False return True @property def system(self): """The system that we belong to.""" return self._system @property def db(self): """The database connection.""" return self.system.db @property def cursor(self): """The database connection.""" return self.system.cursor @property def table(self) -> str: """The name of this table.""" return '"' + self._atom_tablename + '"' @property def current_configuration(self): """The configuration of the system being used""" return self.system.current_configuration @current_configuration.setter def current_configuration(self, value): self.system.current_configuration = value @property def n_rows(self) -> int: """The number of rows in the table.""" self.cursor.execute(f'SELECT COUNT(*) FROM {self.table}') result = self.cursor.fetchone()[0] return result @property def attributes(self) -> Dict[str, Any]: """The definitions of the attributes. Combine the attributes of the atom and coordinates tables to make it look like a single larger table. """ result = self._atom_table.attributes for key, value in self._coordinates_table.attributes.items(): if key != 'atom': # atom key links the tables together, so ignore result[key] = value return result @property def coordinate_system(self): """The type of coordinates: 'fractional' or 'Cartesian'""" return self._system.coordinate_system @coordinate_system.setter def coordinate_system(self, value): self._system.coordinate_system = value @property def version(self): return self.system.version def add_attribute( self, name: str, coltype: str = 'float', default: Any = None, notnull: bool = False, index: bool = False, pk: bool = False, references: str = None, on_delete: str = 'cascade', on_update: str = 'cascade', values: Any = None, configuration_dependent: bool = False ) -> None: """Adds a new attribute. If the default value is None, you must always provide values wherever needed, for example when adding a row. Parameters ---------- name : str The name of the attribute. coltype : str The type of the attribute (column). Must be one of 'int', 'float', 'str' or 'byte' default : int, float, str or byte The default value for the attribute if no value is given. notnull : bool = False Whether the value must be non-null index : bool = False Whether to create an index on the column pk : bool = False Whether the column is the primry keys references : str = None If not null, the column is a foreign key for this table. on_delete : str = 'cascade' How to handle deletions of a foregin keys on_update : str = 'cascade' How to handle updates of a foregin key values : Any Either a single value or a list of values length 'nrows' of values to fill the column. configuration_dependent : bool = False Whether the attribute belongs with the coordinates (True) or atoms (False) Returns ------- None """ if configuration_dependent: self._coordinates_table.add_attribute( name, coltype=coltype, default=default, notnull=notnull, index=index, pk=pk, references=references, on_delete=on_delete, on_update=on_update, values=values ) else: self._atom_table.add_attribute( name, coltype=coltype, default=default, notnull=notnull, index=index, pk=pk, references=references, on_delete=on_delete, on_update=on_update, values=values ) def append(self, configuration=None, **kwargs: Dict[str, Any]) -> None: """Append one or more atoms The keys give the field for the data. If an existing field is not mentioned, then the default value is used, unless the default is None, in which case an error is thrown. It is an error if there is not a field corrresponding to a key. """ # Need to handle the elements specially. Can give atomic numbers, # or symbols. By construction the references to elements are identical # to their atomic numbers. if 'symbol' in kwargs: symbols = kwargs.pop('symbol') kwargs['atno'] = self.to_atnos(symbols) # How many new rows there are n_rows, lengths = self._get_n_rows(**kwargs) # Fill in the atom table data = {} for column in self._atom_table.attributes: if column != 'id' and column in kwargs: data[column] = kwargs.pop(column) if len(data) == 0: data['atno'] = [None] * n_rows ids = self._atom_table.append(n=n_rows, **data) # Now append to the coordinates table if configuration is None: configuration = self.current_configuration data = {'configuration': configuration, 'atom': ids} for column in self._coordinates_table.attributes: if column != 'id' and column in kwargs: data[column] = kwargs.pop(column) self._coordinates_table.append(n=n_rows, **data) # And to the subset 'all' subset_atom = self.system['subset_atom'] if isinstance(configuration, int): config = configuration else: config = configuration[0] subset_atom.append(subset=self.system.all_subset(config), atom=ids) return ids def atoms( self, *args, subset=None, configuration=None, template_order=False ): """Return an iterator over the atoms. Parameters ---------- args : [str] Added selection criteria for the SQL, one word at a time. subset : int = None Get the atoms for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. template_order : bool = False If True, and there are template atoms associated with the atoms, return rows in the order of the template. Returns ------- sqlite3.Cursor A cursor that returns sqlite3.Row objects for the atoms. """ if subset is None: subset = self.system.all_subset(configuration) # If we are asked to use template order, see if all the atoms # have associated template atoms. if template_order: self.cursor.execute( 'SELECT COUNT(*) FROM subset_atom WHERE subset = ?' ' AND templateatom IS NULL', (subset,) ) n = self.cursor.fetchone()[0] if n > 0: raise RuntimeError( 'Not all of the atoms are defined in the template for ' f'subset {subset} - {n} are not' ) atom_tbl = self._atom_tablename coord_tbl = self._coordinates_tablename atom_columns = [*self._atom_table.attributes] coord_columns = [*self._coordinates_table.attributes] coord_columns.remove('atom') columns = [f'at.{x}' for x in atom_columns] columns += [f'co.{x}' for x in coord_columns] column_defs = ', '.join(columns) sql = ( f'SELECT {column_defs}' f' FROM {atom_tbl} as at, {coord_tbl} as co, subset_atom as sa' ' WHERE at.id == sa.atom AND sa.subset = ? AND co.atom = at.id' ) if len(args) == 0: if template_order: sql += " ORDER BY templateatom" return self.db.execute(sql, (subset,)) parameters = [subset] for col, op, value in grouped(args, 3): if op == '==': op = '=' sql += f' AND "{col}" {op} ?' parameters.append(value) if template_order: sql += " ORDER BY templateatom" return self.db.execute(sql, parameters) def atom_ids(self, subset=None, configuration=None, template_order=False): """The ids of the atoms the subset or configuration. Parameters ---------- subset : int = None Get the atoms for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. template_order : bool = False If True, and there are template atoms associated with the atoms, return rows in the order of the template. Returns ------- [int] The ids of the requested atoms. """ if subset is None: subset = self.system.all_subset(configuration) if template_order: return [ x[0] for x in self.db.execute( "SELECT atom, templateatom FROM subset_atom " "WHERE subset = ? ORDER BY templateatom", (subset,) ) ] else: return [ x[0] for x in self.db.execute( "SELECT atom FROM subset_atom WHERE subset = ?", (subset,) ) ] def atomic_numbers( self, subset: int = None, configuration: int = None, template_order: bool = False ) -> [float]: """The atomic numbers of the atoms in the subset or configuration. Parameters ---------- subset : int = None Get the values for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. template_order : bool = False If True, and there are template atoms associated with the atoms, return rows in the order of the template. Returns ------- [int] The atomic numbers. """ column = self.get_column( 'atno', subset=subset, configuration=configuration, template_order=template_order ) return [*column] def atomic_masses( self, subset: int = None, configuration: int = None, template_order: bool = False ) -> [float]: """The atomic masses of the atoms in the subset or configuration. Parameters ---------- subset : int = None Get the values for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. template_order : bool = False If True, and there are template atoms associated with the atoms, return rows in the order of the template. Returns ------- [int] The atomic numbers. """ if 'mass' in self: column = self.get_column( 'mass', subset=subset, configuration=configuration, template_order=template_order ) return [*column] else: atnos = self.atomic_numbers( subset=subset, configuration=configuration, template_order=template_order ) return self._system.default_masses(atnos=atnos) def coordinates( self, subset=None, configuration=None, template_order=False, fractionals=True, in_cell=False, as_array=False ): """Return the coordinates optionally translated back into the principal unit cell. Parameters ---------- subset : int = None Get the atoms for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. template_order : bool = False If True, and there are template atoms associated with the atoms, return rows in the order of the template. fractionals : bool = True Return the coordinates as fractional coordinates for periodic systems. Non-periodic systems always use Cartesian coordinates. in_cell : bool, str = False Whether to translate the atoms into the unit cell, and if so whether to do so by molecule or just atoms. as_array : bool = False Whether to return the results as a numpy array or as a list of lists (the default). Returns ------- abc : [N][float*3] The coordinates, either Cartesian or fractional """ if configuration is None: configuration = self.current_configuration xyz = [] for row in self.atoms( subset=subset, configuration=configuration, template_order=template_order ): xyz.append([row['x'], row['y'], row['z']]) periodicity = self.system.periodicity if periodicity == 0: if as_array: return numpy.array(xyz) else: return xyz cell = self.system['cell'].cell(configuration) if isinstance(in_cell, str) and 'molecule' in in_cell: # Need fractionals... if self.system.coordinate_system == 'Cartesian': UVW = cell.to_fractionals(xyz, as_array=True) elif not isinstance(xyz, numpy.ndarray): UVW = numpy.array(xyz) else: UVW = xyz molecules = self.system.find_molecules( configuration=configuration, as_indices=True ) for indices in molecules: indices = numpy.array([i - 1 for i in indices]) uvw_mol = numpy.take(UVW, indices, axis=0) center = numpy.average(uvw_mol, axis=0) delta = numpy.floor(center) uvw_mol -= delta numpy.put_along_axis( UVW, numpy.expand_dims(indices, axis=1), uvw_mol, axis=0 ) if fractionals: if as_array: return UVW else: return UVW.tolist() else: return cell.to_cartesians(UVW, as_array=as_array) elif in_cell: # Need fractionals... if self.system.coordinate_system == 'Cartesian': UVW = cell.to_fractionals(xyz, as_array=True) elif not isinstance(xyz, numpy.ndarray): UVW = numpy.array(xyz) else: UVW = xyz delta = numpy.floor(UVW) UVW -= delta if fractionals: if as_array: return UVW else: return UVW.tolist() else: return cell.to_cartesians(UVW, as_array=as_array) else: if fractionals: if self.system.coordinate_system == 'Cartesian': return cell.to_fractionals(xyz, as_array=as_array) elif as_array: return numpy.array(xyz) else: return xyz else: if self.system.coordinate_system == 'fractional': return cell.to_cartesians(xyz, as_array=as_array) elif as_array: return numpy.array(xyz) else: return xyz def set_coordinates( self, xyz, subset=None, configuration=None, template_order=False, fractionals=True ): """Set the coordinates to new values. Parameters ---------- subset : int = None Set the atoms for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. template_order : bool = False If True, and there are template atoms associated with the atoms, the coordinates are in the order of the template. fractionals : bool = True The coordinates are fractional coordinates for periodic systems. Ignored for non-periodic systems. Returns ------- None """ if configuration is None: configuration = self.current_configuration as_array = isinstance(xyz, numpy.ndarray) x_column = self.get_column( 'x', subset=subset, configuration=configuration, template_order=template_order ) y_column = self.get_column( 'y', subset=subset, configuration=configuration, template_order=template_order ) z_column = self.get_column( 'z', subset=subset, configuration=configuration, template_order=template_order ) xs = [] ys = [] zs = [] periodicity = self.system.periodicity coord_system = self.system.coordinate_system if ( periodicity == 0 or (coord_system == 'Cartesian' and not fractionals) or (coord_system == 'fractional' and fractionals) ): if as_array: for x, y, z in xyz.tolist(): xs.append(x) ys.append(y) zs.append(z) else: for x, y, z in xyz: xs.append(x) ys.append(y) zs.append(z) else: cell = self.system['cell'].cell(configuration) if coord_system == 'fractional': # Convert coordinates to fractionals for x, y, z in cell.to_fractionals(xyz): xs.append(x) ys.append(y) zs.append(z) else: for x, y, z in cell.to_cartesians(xyz): xs.append(x) ys.append(y) zs.append(z) x_column[0:] = xs y_column[0:] = ys z_column[0:] = zs def get_column( self, key: str, subset: int = None, configuration: int = None, template_order: bool = False ) -> Any: """Get a Column object with the requested data Parameters ---------- key : str The attribute to get. subset : int = None Get the values for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. Returns ------- Column A Column object containing the data. """ if subset is None: subset = self.system.all_subset(configuration) if key in self._atom_table.attributes: sql = ( f'SELECT at.rowid, at.{key}, sa.templateatom' f' FROM {self._atom_tablename} as at, subset_atom as sa' f' WHERE at.id = sa.atom AND sa.subset = {subset}' ) if template_order: sql += " ORDER BY sa.templateatom" return Column(self._atom_table, key, sql=sql) elif key in self._coordinates_table.attributes: sql = ( f'SELECT co.rowid, co.{key}, sa.templateatom' f' FROM {self._atom_tablename} as at,' f' {self._coordinates_tablename} as co,' ' subset_atom as sa' f' WHERE co.atom = at.id AND at.id = sa.atom' f' AND sa.subset = {subset}' ) if template_order: sql += " ORDER BY sa.templateatom" return Column(self._coordinates_table, key, sql=sql) else: raise KeyError(f"'{key}' not in atoms") def to_atnos(self, symbols): """Convert element symbols to atomic numbers.""" return self._system.to_atnos(symbols) def _get_n_rows(self, **kwargs): """Get the total number of rows represented in the arguments.""" n_rows = None lengths = {} for key, value in kwargs.items(): if key not in self: raise KeyError(f'"{key}" is not an attribute of the atoms.') length = self.length_of_values(value) lengths[key] = length if n_rows is None: n_rows = 1 if length == 0 else length if length > 1 and length != n_rows: if n_rows == 1: n_rows = length else: raise IndexError( 'key "{}" has the wrong number of values, ' .format(key) + '{}. Should be 1 or the number of atoms ({}).' .format(length, n_rows) ) return n_rows, lengths def length_of_values(self, values: Any) -> int: """Return the length of the values argument. Parameters ---------- values : Any The values, which might be a string, single value, list, tuple, etc. Returns ------- length : int The length of the values. 0 indicates a scalar """ if isinstance(values, str): return 0 else: try: return len(values) except TypeError: return 0 def n_atoms(self, subset=None, configuration=None) -> int: """The number of atoms in a subset or configuration Parameters ---------- subset : int = None Get the atoms for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. Returns ------- int Number of atoms """ if subset is None: subset = self.system.all_subset(configuration) self.cursor.execute( "SELECT COUNT(*) FROM subset_atom WHERE subset = ?", (subset,) ) return self.cursor.fetchone()[0] def remove(self, atoms=None, subset=None, configuration=None) -> int: """Delete the atoms listed, or in a subset or configuration Parameters ---------- atoms : [int] = None The list of atoms to delete subset : int = None Get the atoms for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. Returns ------- None """ if atoms is not None: # Delete the listed atoms and coordinates subset = self.system.all_subset(configuration) # Need to handle bonds first self.system.bonds.remove(atoms=atoms, subset=subset) parameters = [(i,) for i in atoms] # Coordinates self.db.executemany( "DELETE FROM coordinates WHERE atom = ?", parameters ) # Atoms self.db.executemany("DELETE FROM atom WHERE id = ?", parameters) # Subset-Atoms self.db.executemany( "DELETE FROM subset_atom WHERE atom = ?", parameters ) return if subset is None: subset = self.system.all_subset(configuration) # Bonds only if removing all atoms, i.e. subset all self.system.bonds.remove(subset=subset) # Coordinates self.db.execute( "DELETE FROM coordinates" " WHERE atom IN (" " SELECT atom FROM subset_atom WHERE subset = ?" ")", (subset,) ) # Atoms self.db.execute( "DELETE FROM atom" " WHERE id IN (" " SELECT atom FROM subset_atom WHERE subset = ?" ")", (subset,) ) # Subset-Atoms self.db.execute("DELETE FROM subset_atom WHERE subset = ?", (subset,)) def symbols(self, subset=None, configuration: int = None) -> [str]: """The element symbols for the atoms in a subset or configuration Parameters ---------- subset : int = None Get the values for the subset. Defaults to the 'all/all' subset for the configuration given. configuration : int = None The configuration of interest. Defaults to the current configuration. Not used if the subset is given. Returns ------- [str] The element symbols """ return self._system.to_symbols( self.atomic_numbers(subset=subset, configuration=configuration) ) def to_dataframe(self): """Return the contents of the table as a Pandas Dataframe.""" data = {} rows = self.atoms() for row in rows: data[row[0]] = row[1:] columns = [x[0] for x in rows.description[1:]] df = pandas.DataFrame.from_dict(data, orient='index', columns=columns) return df