# -*- coding: utf-8 -*-
"""A dictionary-like object for holding systems
"""
import collections.abc
import logging
from pathlib import Path
import shutil
import sqlite3
import tempfile
import pathvalidate
from molsystem.system import _System
logger = logging.getLogger(__name__)
[docs]class Systems(collections.abc.MutableMapping):
def __init__(self):
"""Initialize the systems with no systems."""
self._systems = {}
def __getitem__(self, key):
"""Allow [] access to the dictionary of systems"""
return self._systems[key]['system']
def __setitem__(self, key, value):
"""Allow x[key] access to the data"""
raise NotImplementedError(f"Table '{key}' cannot be created yet")
def __delitem__(self, key):
"""Allow deletion of keys"""
if key in self:
data = self._systems[key]
del data['system']
if data['temporary']:
if 'tempdir' in data:
shutil.rmtree(data['tempdir'])
else:
data['path'].unlink()
del self._systems[key]
else:
raise KeyError(
f"Trying to delete system '{key}', which does not exist."
)
def __iter__(self):
"""Allow iteration over the object"""
return iter(self._systems)
def __len__(self):
"""The len() command"""
return len(self._systems)
def __repr__(self):
"""The string representation of this object"""
return repr(self._systems)
def __str__(self):
"""The pretty string representation of this object"""
return str(self._systems)
def __contains__(self, key):
"""Return a boolean indicating if a system exists."""
return key in self._systems
def __eq__(self, other):
"""Return a boolean if this object is equal to another"""
raise NotImplementedError()
[docs] def list(self):
"""Return a list of the systems."""
return [*self._systems]
[docs] def create_system(self, name, filename=None, temporary=False, force=False):
"""Create a system with a given name, and optionally a filename."""
if name in self:
raise KeyError(f"System '{name}' already exists.")
self._systems[name] = data = {}
data['temporary'] = temporary
if temporary:
tmp = pathvalidate.sanitize_filename(name + '.db', platform='auto')
data['tempdir'] = Path(tempfile.mkdtemp())
path = data['tempdir'] / tmp
elif filename is None:
tmp = pathvalidate.sanitize_filename(name + '.db', platform='auto')
path = Path(tmp)
else:
tmp = pathvalidate.sanitize_filename(filename, platform='auto')
path = Path(tmp)
path = path.expanduser().resolve()
if path.exists():
if force:
path.unlink()
else:
raise RuntimeError(f"File '{path}' exists!")
filename = str(path)
system = _System(self, nickname=name, filename=filename)
data['system'] = system
data['path'] = path
return system
[docs] def copy_system(
self, other, name=None, filename=None, temporary=False, force=False
):
"""Create a copy of a system, optionally with a given name and
filename."""
if name is None:
tmp_name = other.name
name = tmp_name + '_copy'
i = 1
while name in self:
i += 1
name = f'{tmp_name}_copy_{i}'
elif name in self:
raise KeyError(f"System '{name}' already exists.")
self._systems[name] = data = {}
data['temporary'] = temporary
if temporary:
tmp = pathvalidate.sanitize_filename(name + '.db', platform='auto')
data['tempdir'] = Path(tempfile.mkdtemp())
path = data['tempdir'] / tmp
elif filename is None:
tmp = pathvalidate.sanitize_filename(name + '.db', platform='auto')
path = Path(tmp)
# Get simple absolute path
path = path.expanduser().resolve()
if path.exists():
if force:
path.unlink()
else:
raise RuntimeError(f"File '{path}' exists!")
filename = str(path)
# Copy the database over
db = sqlite3.connect(filename)
other.db.commit()
other.db.backup(db)
db.close()
# and open it
system = _System(self, nickname=name, filename=filename)
data['system'] = system
data['path'] = path
return system
[docs] def open_system(self, filename, name=None, temporary=False):
"""Open an existing system with a given name."""
path = Path(filename)
if name is None:
name = path.with_suffix('').name
if name in self:
raise RuntimeError(f"System '{name}' already exists!")
path = path.expanduser().resolve()
if not path.exists():
raise RuntimeError(f"File '{path}' does not exist!")
self._systems[name] = data = {}
data['temporary'] = temporary
filename = str(path)
# and open it
system = _System(self, nickname=name, filename=filename)
data['system'] = system
data['path'] = path
return system
[docs] def overwrite(self, system, other):
"""Overwrite a system with the contents of another."""
system.db.commit()
system.cursor.close()
system.db.close()
path = self._systems[system.nickname]['path']
path.unlink()
system._db = sqlite3.connect(path)
system._db.row_factory = sqlite3.Row
system._db.execute('PRAGMA foreign_keys = ON')
system._cursor = system._db.cursor()
other.db.commit()
other.db.backup(system._db)