Source code for sisl.io.sile

from __future__ import print_function, division

from os.path import splitext, isfile
import gzip

import numpy as np

from sisl import Geometry
from sisl.utils.misc import str_spec
from ._help import *


# Public used objects
__all__ = [
    'add_sile',
    'get_sile_class',
    'get_sile',
    'get_siles']

__all__ += [
    'BaseSile',
    'Sile',
    'SileCDF',
    'SileBin',
    'SileError',
    ]

# Decorators or sile-specific functions
__all__ += [
    'Sile_fh_open',
    'sile_raise_write',
    'sile_raise_read']

# Global container of all Sile rules
# This list of tuples is formed as
#  [('fdf', fdfSileSiesta, fdfSileSiesta),
#   ('fdf', ncSileSiesta, fdfSileSiesta)]
# [0] is the file-endding
# [1] is the base class that may be queried
# [2] is the actual class the file represents
# This enables one to add several files with the
# same extension and query it based on a sub-class
__sile_rules = []
__siles = []


[docs]def add_sile(ending, cls, case=True, gzip=False, _parent_cls=None): """ Public for attaching lookup tables for allowing users to attach files for the IOSile function call Parameters ---------- ending : str The file-name ending, it can be several file endings (.TBT.nc) cls : `BaseSile` child An object that is associated with the respective file. It must be inherited from a `BaseSile`. case : bool, (True) Whether case sensitivity is applicable for determining file. gzip : bool, (False) Whether files with `.gz` endings can be read. This option should only be given to files with ASCII text output. It will automatically call: >>> add_sile(ending+'.gz',...,gzip=False) to add the gzipped file to the list of possible files. """ global __sile_rules, __siles # Only add pure suffixes... if ending[0] == '.': add_sile(ending[1:], cls, case=case, gzip=gzip, _parent_cls=_parent_cls) return # The parent_obj is the actual class used to construct # the output file if _parent_cls is None: _parent_cls = cls # If it isn't already in the list of # Siles, add it. if cls not in __siles: __siles.append(cls) # These classes will never be added to the # children. It makes no sense to # have bases which are common for all files. # Or does it? # What if a file-extension may both represent a # formatted, or a binary file? # In that case should: # def_cls = [object, BaseSile] def_cls = [object, BaseSile, Sile, SileBin, SileCDF] # First we find the base-class # This base class must not be # BaseSile # Sile # SileBin # SileCDF def get_children(cls): # List all childern children = list(cls.__bases__) for child in children: cchildren = get_children(child) for cchild in cchildren: if cchild not in children: children.append(cchild) remove_cls(children, def_cls) return children def remove_cls(l, clss): for ccls in clss: if ccls in l: l.remove(ccls) # Finally we append the child objects inherited = get_children(cls) # Now we should remove all objects which are descendants from # another object in the list # We also default this base-class to be removed rem = [object, cls] for ccls in inherited: inh = get_children(ccls) for cccls in inh: if cccls in inherited: rem.append(cccls) remove_cls(inherited, rem) # Now, we finally have a list of classes which # are a single sub-class of the actual class. for ccls in inherited: add_sile(ending, ccls, case=case, gzip=gzip, _parent_cls=_parent_cls) # If the gzip is none, we decide whether we can # read gzipped files # In particular, if the cls is a `Sile`, we allow # such reading if not case: add_sile(ending.lower(), cls, gzip=gzip, _parent_cls=_parent_cls) add_sile(ending.upper(), cls, gzip=gzip, _parent_cls=_parent_cls) else: # Add the rule of the sile to the list of rules. __sile_rules.append((ending, cls, _parent_cls)) if gzip: add_sile(ending + '.gz', cls, case=case, _parent_cls=_parent_cls)
[docs]def get_sile_class(file, *args, **kwargs): """ Guess the ``Sile`` class corresponding to the input file and return the class Parameters ---------- file : str the file to be quried for a correct `Sile` object. This file name may contain {<class-name>} which sets `cls` in case `cls` is not set. For instance: water.xyz will return an ``XYZSile``. cls : class In case there are several files with similar file-suffixes you may query the exact base-class that should be chosen. If there are several ``Sile``s with similar file-endings this function returns a random one. """ global __sile_rules, __siles # This ensures that the first argument need not be cls cls = kwargs.pop('cls', None) # Split filename into proper file name and # the specification of the type tmp_file, fcls = str_spec(file) if cls is None and not fcls is None: # cls has not been set, and fcls is found # Figure out if fcls is a valid sile, if not # do nothing (it may be part of the file name) # Which is REALLY obscure... but....) for sile in __siles: if sile.__name__.lower().startswith(fcls.lower()): cls = sile # Make sure that {class-name} is # removed from the file name file = tmp_file break try: # Create list of endings on this file f = file end_list = [] end = '' # Check for files without ending, or that they are directly zipped lext = splitext(f) while len(lext[1]) > 0: end = lext[1] + end if end[0] == '.': end_list.append(end[1:]) else: end_list.append(end) lext = splitext(lext[0]) # We also check the entire file name # (mainly for VASP) end_list.append(f) # Reverse to start by the longest extension # (allows grid.nc extensions, etc.) end_list = list(reversed(end_list)) # First we check for class AND file ending for end in end_list: for suf, base, fobj in __sile_rules: if end != suf: continue if cls is None: return fobj elif cls == base: return fobj # Now we skip the limitation of the suffix, # now only the base-class is necessary. for end in end_list: # Check for object for suf, base, fobj in __sile_rules: if cls == base: return fobj del end_list raise NotImplementedError('sile not implemented: {}'.format(file)) except NotImplementedError as e: pass except Exception as e: import traceback as t t.print_exc() raise e raise NotImplementedError("Sile for file '"+ file + "' could not be found, possibly the file has not been implemented.")
[docs]def get_sile(file, *args, **kwargs): """ Guess the ``Sile`` corresponding to the input file and return an open object of the corresponding ``Sile`` Parameters ---------- file : str the file to be quried for a correct `Sile` object. This file name may contain {<class-name>} which sets `cls` in case `cls` is not set. For instance: water.dat{XYZSile} will read the file water.dat as an `XYZSile`. cls : class In case there are several files with similar file-suffixes you may query the exact base-class that should be chosen. If there are several `Sile`s with similar file-endings this function returns a random one. """ cls = kwargs.pop('cls', None) sile = get_sile_class(file, *args, cls=cls, **kwargs) return sile(str_spec(file)[0], *args, **kwargs)
[docs]def get_siles(attrs=[None]): """ Returns all siles with a specific attribute (or all) Parameters ---------- attrs : list of attribute names limits the returned sile-objects to those that have the given attributes `hasattr(sile, attrs)` """ global __siles if len(attrs) == 1 and attrs[0] is None: return list(__siles) siles = [] for sile in __siles: for attr in attrs: if hasattr(sile, attr): siles.append(sile) break return siles
[docs]class BaseSile(object): """ Base class for the Siles """
[docs] def read(self, *args, **kwargs): """ Generic read method which should be overloaded in child-classes Parameters ---------- **kwargs : keyword arguments will try and search for the attribute `read_<>` and call it with the remaining ``**kwargs`` as arguments. """ for key in kwargs.keys(): # Loop all keys and try to read the quantities if hasattr(self, "read_" + key): func = getattr(self, "read_" + key) # Call read return func(kwargs[key], **kwargs)
[docs] def read_sc(self, *args, **kwargs): """ Deprecated function which is superseeded by `read_supercell` """ if getattr(self, 'read_supercell'): return self.read_supercell(*args, **kwargs) raise ValueError('read_sc is deprecated, please use read_supercell')
[docs] def read_geom(self, *args, **kwargs): """ Deprecated function which is superseeded by `read_geometry` """ if getattr(self, 'read_geometry'): return self.read_geometry(*args, **kwargs) raise ValueError('read_geom is deprecated, please use read_geometry')
[docs] def read_es(self, *args, **kwargs): """ Deprecated function which is superseeded by `read_hamiltonian` """ if getattr(self, 'read_hamiltonian'): return self.read_hamiltonian(*args, **kwargs) raise ValueError('read_es is deprecated, please use read_hamiltonian')
# Options for writing # The default routine for writing _write_default = None # Whether only the default should be used # when issuing a write _write_default_only = False
[docs] def write(self, *args, **kwargs): """ Generic write method which should be overloaded in child-classes Parameters ---------- **kwargs : keyword arguments will try and search for the attribute `write_<>` and call it with the remaining ``**kwargs`` as arguments. """ if self._write_default is not None: self._write_default(*args, **kwargs) if self._write_default_only: return for key in kwargs.keys(): # Loop all keys and try to write the quantities if hasattr(self, "write_" + key): func = getattr(self, "write_" + key) # Call write func(kwargs[key], **kwargs)
[docs] def write_sc(self, *args, **kwargs): """ Deprecated function which is superseeded by `write_supercell` """ if getattr(self, 'write_supercell'): return self.write_supercell(*args, **kwargs) raise ValueError('write_sc is deprecated, please use write_supercell')
[docs] def write_geom(self, *args, **kwargs): """ Deprecated function which is superseeded by `write_geometry` """ if getattr(self, 'write_geometry'): return self.write_geometry(*args, **kwargs) raise ValueError('write_geom is deprecated, please use write_geometry')
[docs] def write_es(self, *args, **kwargs): """ Deprecated function which is superseeded by `write_hamiltonian` """ if getattr(self, 'write_hamiltonian'): return self.write_hamiltonian(*args, **kwargs) raise ValueError('write_es is deprecated, please use write_hamiltonian')
def _setup(self, *args, **kwargs): """ Setup the `Sile` after initialization Inherited method for setting up the sile. This method can be overwritten. """ pass def __setup(self, *args, **kwargs): """ Setup the `Sile` after initialization Inherited method for setting up the sile. This method **must** be overwritten *and* end with ``self._setup()``. """ self._setup(*args, **kwargs) def __getattr__(self, name): """ Override to check the handle """ if name == 'fh': raise AttributeError("The filehandle has not been opened yet...") return getattr(self.fh, name) @classmethod def _ArgumentParser_args_single(cls): """ Default arguments for the Sile """ return {} # Define the custom ArgumentParser
[docs] def ArgumentParser(self, parser=None, *args, **kwargs): """ Returns the arguments that may be available for this Sile Parameters ---------- parser: ArgumentParser the argument parser to add the arguments to. """ raise NotImplementedError("The ArgumentParser of '"+self.__class__.__name__+"' has not been implemented yet.")
[docs] def ArgumentParser_out(self, parser=None, *args, **kwargs): """ Appends additional arguments based on the output of the file Parameters ---------- parser: ArgumentParser the argument parser to add the arguments to. """ pass
def __repr__(self): """ Return a representation of the `Sile` """ return ''.join([self.__class__.__name__, '(', self.file, ')'])
[docs]def Sile_fh_open(func): """ Method decorator for objects to directly implement opening of the file-handle upon entry (if it isn't already). """ def pre_open(self, *args, **kwargs): if hasattr(self, "fh"): return func(self, *args, **kwargs) else: with self: return func(self, *args, **kwargs) return pre_open
[docs]class Sile(BaseSile): """ Class to contain a file with easy access """ def __init__(self, filename, mode='r', comment='#'): self._file = filename self._mode = mode if isinstance(comment, (list, tuple)): self._comment = list(comment) else: self._comment = [comment] self._line = 0 # Initialize self.__setup() @property def file(self): """ File of the current `Sile` """ return self._file def __setup(self): """ Setup the `Sile` after initialization Inherited method for setting up the sile. This method must **never** be overwritten. """ self._setup() def _open(self): if self.file.endswith('gz'): self.fh = gzip.open(self.file) else: self.fh = open(self.file, self._mode) self._line = 0 def __enter__(self): """ Opens the output file and returns it self """ self._open() return self def __exit__(self, type, value, traceback): self.fh.close() # clean-up so that it does not exist delattr(self, 'fh') self._line = 0 return False @staticmethod
[docs] def is_keys(keys): """ Returns true if ``isinstance(keys,(list,np.ndarray))`` """ return isinstance(keys, (list, np.ndarray))
@staticmethod
[docs] def key2case(key, case): """ Converts str/list of keywords to proper case """ if case: return key return key.lower()
@staticmethod
[docs] def keys2case(keys, case): """ Converts str/list of keywords to proper case """ if case: return keys return [k.lower() for k in keys]
@staticmethod
[docs] def line_has_key(line, key, case=True): if case: return line.find(key) >= 0 return line.lower().find(key) >= 0
@staticmethod
[docs] def line_has_keys(line, keys, case=True): found = False if case: for key in keys: found |= line.find(key) >= 0 else: l = line.lower() for key in keys: found |= l.find(key) >= 0 return found
[docs] def readline(self, comment=False): """ Reads the next line of the file """ l = self.fh.readline() self._line += 1 if comment: return l while starts_with_list(l, self._comment): l = self.fh.readline() self._line += 1 return l
[docs] def step_to(self, keywords, case=True, reread=True): """ Steps the file-handle until the keyword is found in the input """ # If keyword is a list, it just matches one of the inputs found = False # The previously read line... line = self._line # Do checking outside line checks if self.is_keys(keywords): line_has = self.line_has_keys keys = self.keys2case(keywords, case) else: line_has = self.line_has_key keys = self.key2case(keywords, case) while not found: l = self.readline() if l == '': break found = line_has(l, keys, case=case) if not found and (l == '' and line > 0) and reread: # We may be in the case where the user request # reading the same twice... # So we need to re-read the file... self.fh.close() # Re-open the file... self._open() # Try and read again while not found and self._line <= line: l = self.readline() if l == '': break found = line_has(l, keys, case=case) # sometimes the line contains information, as a # default we return the line found return found, l
[docs] def step_either(self, keywords, case=True): """ Steps the file-handle until the keyword is found in the input """ # If keyword is a list, it just matches one of the inputs # Do checking outside line checks if self.is_keys(keywords): line_has = self.line_has_key keys = self.keys2case(keywords, case) else: found, l = self.step_to(keywords, case) return found, 0, l # initialize found = False j = -1 while not found: l = self.readline() if l == '': break for i, key in enumerate(keys): found = line_has(l, key, case=case) if found: j = i break # sometimes the line contains information, as a # default we return the line found return found, j, l
def _write(self, *args, **kwargs): """ Wrapper to default the write statements """ self.fh.write(*args, **kwargs)
# Instead of importing netCDF4 on each invocation # of the __enter__ functioon (below), we make # a pass around it _netCDF4 = None def _import_netCDF4(): global _netCDF4 if _netCDF4 is None: import netCDF4 as _netCDF4
[docs]class SileCDF(BaseSile): """ Class to contain a file with easy access The file format for this file is the NetCDF file format """ def __init__(self, filename, mode='r', lvl=0, access=1, _open=True): """ Creates/Opens a SileCDF Opens a SileCDF with `mode` and compression level `lvl`. If `mode` is in read-mode (r) the compression level is ignored. The final `access` parameter sets how the file should be open and subsequently accessed. 0) means direct file access for every variable read 1) means stores certain variables in the object. """ self._file = filename # Open mode self._mode = mode # Save compression internally self._lvl = lvl # Initialize the _data dictionary for access == 1 self._data = dict() if isfile(self.file): self._access = access else: # If it has not been created we should not try # and read anything, no matter what the user says self._access = 0 # The CDF file can easily open the file if _open: global _import_netCDF4 _import_netCDF4() self.__dict__['fh'] = _netCDF4.Dataset(self.file, self._mode, format='NETCDF4') # Must call setup-methods self.__setup() @property def file(self): """ Filename of the current `Sile` """ return self._file def _setup(self, *args, **kwargs): """ Simple setup that needs to be overloaded """ pass def __setup(self): """ Setup `SileCDF` after initialization """ self._setup() @property def _cmp_args(self): """ Returns the compression arguments for the NetCDF file Do >>> nc.createVariable(..., **self._cmp_args) """ return {'zlib': self._lvl > 0, 'complevel': self._lvl} def __enter__(self): """ Opens the output file and returns it self """ # We do the import here return self def __exit__(self, type, value, traceback): return False def _dimension(self, name, tree=None): """ Local method for obtaing the dimension in a certain tree """ return self._dimensions(self, name, tree) @staticmethod def _dimensions(n, name, tree=None): """ Retrieve method to get the NetCDF variable """ if tree is None: return n.dimensions[name] g = n if isinstance(tree, list): for t in tree: g = g.groups[t] else: g = g.groups[tree] return g.dimensions[name] def _variable(self, name, tree=None): """ Local method for obtaining the data from the SileCDF. This method returns the variable as-is. """ if self._access > 0: if name in self._data: return self._data[name] return self._variables(self, name, tree=tree) def _value(self, name, tree=None): """ Local method for obtaining the data from the SileCDF. This method returns the value of the variable. """ return self._variable(name, tree)[:] @staticmethod def _variables(n, name, tree=None): """ Retrieve method to get the NetCDF variable """ if tree is None: return n.variables[name] g = n if isinstance(tree, list): for t in tree: g = g.groups[t] else: g = g.groups[tree] return g.variables[name] @staticmethod def _crt_grp(n, name): if '/' in name: groups = name.split('/') grp = n for group in groups: if len(group) > 0: grp = _crt_grp(grp, group) return grp if name in n.groups: return n.groups[name] return n.createGroup(name) @staticmethod def _crt_dim(n, name, l): if name in n.dimensions: return n.createDimension(name, l) @staticmethod def _crt_var(n, name, *args, **kwargs): if name in n.variables: return n.variables[name] attr = None if 'attr' in kwargs: attr = kwargs.pop('attr') v = n.createVariable(name, *args, **kwargs) if attr is not None: for name in attr: setattr(v, name, attr[name]) return v @classmethod
[docs] def isDimension(cls, obj): """ Return true if ``obj`` is an instance of the NetCDF4 ``Dimension`` type This is just a wrapper for ``isinstance(obj, netCDF4.Dimension)``. """ return isinstance(obj, _netCDF4.Dimension)
@classmethod
[docs] def isVariable(cls, obj): """ Return true if ``obj`` is an instance of the NetCDF4 ``Variable`` type This is just a wrapper for ``isinstance(obj, netCDF4.Variable)``. """ return isinstance(obj, _netCDF4.Variable)
@classmethod
[docs] def isGroup(cls, obj): """ Return true if ``obj`` is an instance of the NetCDF4 ``Group`` type This is just a wrapper for ``isinstance(obj, netCDF4.Group)``. """ return isinstance(obj, _netCDF4.Group)
@classmethod
[docs] def isDataset(cls, obj): """ Return true if ``obj`` is an instance of the NetCDF4 ``Dataset`` type This is just a wrapper for ``isinstance(obj, netCDF4.Dataset)``. """ return isinstance(obj, _netCDF4.Dataset)
isRoot = isDataset
[docs] def iter(self, group=True, dimension=True, variable=True, levels=-1, root=None): """ Iterator on all groups, variables and dimensions. This iterator iterates through all groups, variables and dimensions in the ``Dataset`` The generator sequence will _always_ be: 1. Group 2. Dimensions in group 3. Variables in group As the dimensions are generated before the variables it is possible to copy groups, dimensions, and then variables such that one always ensures correct dependencies in the generation of a new ``SileCDF``. Parameters ---------- group : ``bool`` (`True`) whether the iterator yields `Group` instances dimension : ``bool`` (`True`) whether the iterator yields `Dimension` instances variable : ``bool`` (`True`) whether the iterator yields `Variable` instances levels : ``int`` (`-1`) number of levels to traverse, with respect to ``root`` variable, i.e. number of sub-groups this iterator will return. root : ``str`` (`None`) the base root to start iterating from. Examples -------- Script for looping and checking each instance. >>> for gv in self.iter(): >>> if self.isGroup(gv): >>> # is group >>> elif self.isDimension(gv): >>> # is dimension >>> elif self.isVariable(gv): >>> # is variable """ if root is None: head = self.fh else: head = self.fh[root] # Start by returning the root group if group: yield head if dimension: for dim in head.dimensions.values(): yield dim if variable: for var in head.variables.values(): yield var if levels == 0: # Stop the iterator return for grp in head.groups.values(): for dvg in self.iter(group, dimension, variable, levels=levels-1, root=grp.path): yield dvg
__iter__ = iter
[docs]class SileBin(BaseSile): """ Class to contain a file with easy access The file format for this file is a binary format. """ def __init__(self, filename, mode='r'): """ Creates/Opens a SileBin Opens a SileBin with `mode` (b). If `mode` is in read-mode (r). """ self._file = filename # Open mode self._mode = mode.replace('b', '') + 'b' # Must call setup-methods self.__setup() @property def file(self): """ File of the current `Sile` """ return self._file def _setup(self, *args, **kwargs): """ Simple setup that needs to be overwritten """ pass def __setup(self): """ Setup `SileBin` after initialization """ self._setup() def __enter__(self): """ Opens the output file and returns it self """ return self def __exit__(self, type, value, traceback): return False
[docs]class SileError(IOError): """ Define an error object related to the Sile objects """ def __init__(self, value, obj=None): self.value = value self.obj = obj def __str__(self): s = '' if self.obj: s = self.obj.__name__ + '(' + self.obj.file + ')' return self.value + ' in ' + s
[docs]def sile_raise_write(self): if not ('w' in self._mode or 'a' in self._mode): raise SileError('Writing to a read-only file not possible', self)
[docs]def sile_raise_read(self): if not ('r' in self._mode or 'a' in self._mode): raise SileError('Reading a write-only file not possible', self)