# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
import numpy as np
from functools import lru_cache
from .sile import SileSiesta
from ..sile import add_sile, sile_fh_open, SileError
from sisl._common import Opt
from sisl._internal import set_module
import sisl._array as _a
from sisl import Geometry, Atom, SuperCell
from sisl.utils import PropertyDict
from sisl.utils.cmd import *
from sisl.unit.siesta import unit_convert
__all__ = ['outSileSiesta']
Bohr2Ang = unit_convert('Bohr', 'Ang')
def _ensure_atoms(atoms):
""" Ensures that the atoms list is a list with entries (converts `None` to a list). """
if atoms is None:
return [Atom(i) for i in range(150)]
elif len(atoms) == 0:
return [Atom(i) for i in range(150)]
return atoms
@set_module("sisl.io.siesta")
class outSileSiesta(SileSiesta):
""" Output file from Siesta
This enables reading the output quantities from the Siesta output.
"""
_completed = None
def readline(self):
line = super().readline()
if 'Job completed' in line:
self._completed = True
return line
readline.__doc__ = SileSiesta.readline.__doc__
[docs] @sile_fh_open()
def completed(self):
""" True if the full file has been read and "Job completed" was found. """
if self._completed is None:
completed = self.step_to("Job completed")[0]
else:
completed = self._completed
if completed:
self._completed = True
return completed
[docs] @lru_cache(1)
@sile_fh_open(True)
def read_basis(self):
""" Reads the basis as found in the output file
This parses 3 things:
1. At the start of the file there are some initatom output
specifying which species in the calculation.
2. Reading the <basis_specs> entries for the masses
3. Reading the PAO.Basis block output for orbital information
"""
found, line = self.step_to("Species number:")
if not found:
return []
atoms = {}
order = []
while 'Species number:' in line:
ls = line.split()
if ls[3] == 'Atomic':
atoms[ls[7]] = {'Z': int(ls[5]), 'tag': ls[7]}
order.append(ls[7])
else:
atoms[ls[4]] = {'Z': int(ls[7]), 'tag': ls[4]}
order.append(ls[4])
line = self.readline()
# Now go down to basis_specs
found, line = self.step_to("<basis_specs>")
while found:
# =====
self.readline()
# actual line
line = self.readline().split("=")
tag = line[0].split()[0]
atoms[tag]["mass"] = float(line[2].split()[0])
found, line = self.step_to("<basis_specs>", reread=False)
block = []
found, line = self.step_to("%block PAO.Basis")
line = self.readline()
while not line.startswith("%endblock PAO.Basis"):
block.append(line)
line = self.readline()
from .fdf import fdfSileSiesta
atom_orbs = fdfSileSiesta._parse_pao_basis(block)
for atom, orbs in atom_orbs.items():
atoms[atom]["orbitals"] = orbs
return [Atom(**atoms[tag]) for tag in order]
def _r_supercell_outcell(self):
""" Wrapper for reading the unit-cell from the outcoor block """
# Read until outcell is found
found, line = self.step_to("outcell: Unit cell vectors")
if not found:
raise ValueError(f"{self.__class__.__name__}._r_supercell_outcell did not find outcell key")
Ang = 'Ang' in line
# We read the unit-cell vectors (in Ang)
cell = []
line = self.readline()
while len(line.strip()) > 0:
line = line.split()
cell.append([float(x) for x in line[:3]])
line = self.readline()
cell = _a.arrayd(cell)
if not Ang:
cell *= Bohr2Ang
return SuperCell(cell)
def _r_geometry_outcoor(self, line, atoms=None):
""" Wrapper for reading the geometry as in the outcoor output """
atoms_order = _ensure_atoms(atoms)
is_final = 'Relaxed' in line or 'Final (unrelaxed)' in line
# Now we have outcoor
scaled = 'scaled' in line
fractional = 'fractional' in line
Ang = 'Ang' in line
# Read in data
xyz = []
atoms = []
line = self.readline()
while len(line.strip()) > 0:
line = line.split()
if len(line) != 6:
break
xyz.append(line[:3])
atoms.append(atoms_order[int(line[3]) - 1])
line = self.readline()
# in outcoor we know it is always just after
# But not if not variable cell.
# Retrieve the unit-cell (but do not skip file-descriptor position)
# This is because the current unit-cell is not always written.
pos = self.fh.tell()
cell = self._r_supercell_outcell()
if is_final and self.fh.tell() < pos:
# we have wrapped around the file
self.fh.seek(pos, os.SEEK_SET)
xyz = _a.arrayd(xyz)
# Now create the geometry
if scaled:
# The output file for siesta does not
# contain the lattice constant.
# So... :(
raise ValueError("Could not read the lattice-constant for the scaled geometry")
elif fractional:
xyz = xyz.dot(cell.cell)
elif not Ang:
xyz *= Bohr2Ang
return Geometry(xyz, atoms, sc=cell)
def _r_geometry_atomic(self, line, atoms=None):
""" Wrapper for reading the geometry as in the outcoor output """
atoms_order = _ensure_atoms(atoms)
# Now we have outcoor
Ang = 'Ang' in line
# Read in data
xyz = []
atoms = []
line = self.readline()
while len(line.strip()) > 0:
line = line.split()
xyz.append([float(x) for x in line[1:4]])
atoms.append(atoms_order[int(line[4])-1])
line = self.readline()
# Retrieve the unit-cell (but do not skip file-descriptor position)
# This is because the current unit-cell is not always written.
pos = self.fh.tell()
cell = self._r_supercell_outcell()
self.fh.seek(pos, os.SEEK_SET)
# Convert xyz
xyz = _a.arrayd(xyz)
if not Ang:
xyz *= Bohr2Ang
return Geometry(xyz, atoms, sc=cell)
[docs] @sile_fh_open()
def read_geometry(self, last=True, all=False):
""" Reads the geometry from the Siesta output file
Parameters
----------
last: bool, optional
only read the last geometry
all: bool, optional
return a list of all geometries (like an MD)
If `True` `last` is ignored
Returns
-------
geometries: list or Geometry or None
if all is False only one geometry will be returned (or None). Otherwise
a list of geometries corresponding to the MD-runs.
"""
atoms = self.read_basis()
if all:
# force last to be false
last = False
def func_none(*args, **kwargs):
""" Wrapper to return None """
return None
def next_geom():
coord, func = 0, func_none
line = ' '
while coord == 0 and line != '':
line = self.readline()
if 'outcoor' in line and 'coordinates' in line:
coord, func = 1, self._r_geometry_outcoor
elif 'siesta: Atomic coordinates' in line:
coord, func = 2, self._r_geometry_atomic
return coord, func(line, atoms)
# Read until a coordinate block is found
geom0 = None
mds = []
if all or last:
# we need to read through all things!
while True:
coord, geom = next_geom()
if coord == 0:
break
if coord == 2:
geom0 = geom
else:
mds.append(geom)
# Since the user requests only the MD geometries
# we only return those
if last:
if len(mds) > 0:
return mds[-1]
return geom0
return mds
# just read the next geometry we hit
return next_geom()[1]
[docs] @sile_fh_open()
def read_force(self, last=True, all=False, total=False, max=False):
""" Reads the forces from the Siesta output file
Parameters
----------
last: bool, optional
only read the last force
all: bool, optional
return a list of all forces (like an MD)
If `True` `last` is ignored
total: bool, optional
return the total forces instead of the atomic forces.
max: bool, optional
whether only the maximum atomic force should be returned for each step.
Setting it to `True` is equivalent to `max(outSile.read_force())` in case atomic forces
are written in the output file (`WriteForces .true.` in the fdf file)
Note that this is not the same as doing `max(outSile.read_force(total=True))` since
the forces returned in that case are averages on each axis.
Returns
-------
numpy.ndarray or None
returns ``None`` if the forces are not found in the
output, otherwise forces will be returned
The shape of the array will be different depending on the type of forces requested:
- atomic (default): (nMDsteps, nAtoms, 3)
- total: (nMDsteps, 3)
- max: (nMDsteps, )
If `all` is `False`, the first dimension does not exist. In the case of max, the returned value
will therefore be just a float, not an array.
If `total` and `max` are both `True`, they are returned separately as a tuple: ``(total, max)``
"""
if all:
last = False
# Read until forces are found
def next_force():
line = self.readline()
while not 'siesta: Atomic forces' in line:
line = self.readline()
if line == '':
return None
# Now read data
F = []
line = self.readline()
if 'siesta:' in line:
# This is the final summary, we don't need to read it as it does not contain new information
# and also it make break things since max forces are not written there
return None
# First, we encounter the atomic forces
while '---' not in line:
line = line.split()
if not (total or max):
F.append([float(x) for x in line[-3:]])
line = self.readline()
if line == '':
break
line = self.readline()
# Then, the total forces
if total:
F = [float(x) for x in line.split()[-3:]]
line = self.readline()
#And after that we can read the max force
if max and len(line.split()) != 0:
line = self.readline()
maxF = float(line.split()[1])
# In case total is also requested, we are going to store it all in the same variable
# It will be separated later
if total:
F = (*F, maxF)
else:
F = maxF
return _a.arrayd(F)
def return_forces(Fs):
# Handle cases where we can't now if they are found
if Fs is None: return None
Fs = _a.arrayd(Fs)
if max and total:
return (Fs[..., :-1], Fs[..., -1])
elif max and not all:
return Fs.ravel()[0]
return Fs
if all or last:
# list of all forces
Fs = []
while True:
F = next_force()
if F is None:
break
Fs.append(F)
if last:
return return_forces(Fs[-1])
return return_forces(Fs)
return return_forces(next_force())
[docs] @sile_fh_open()
def read_stress(self, key='static', last=True, all=False):
""" Reads the stresses from the Siesta output file
Parameters
----------
key : {'static', 'total'}
which stress to read from the output.
last: bool, optional
only read the last stress
all: bool, optional
return a list of all stresses (like an MD)
If `True` `last` is ignored
Returns
-------
numpy.ndarray or None
returns ``None`` if the stresses are not found in the
output, otherwise stresses will be returned
"""
if all:
last = False
# Read until stress are found
def next_stress():
line = self.readline()
while not ('siesta: Stress tensor' in line and key in line):
line = self.readline()
if line == '':
return None
# Now read data
S = []
for _ in range(3):
line = self.readline().split()
S.append([float(x) for x in line[-3:]])
return _a.arrayd(S)
# list of all stresses
Ss = []
if all or last:
while True:
S = next_stress()
if S is None:
break
Ss.append(S)
if last:
return Ss[-1]
if self.completed() and key == 'static':
return Ss[:-1]
return Ss
return next_stress()
[docs] @sile_fh_open()
def read_moment(self, orbitals=False, quantity='S', last=True, all=False):
""" Reads the moments from the Siesta output file
These will only be present in case of spin-orbit coupling.
Parameters
----------
orbitals: bool, optional
return a table with orbitally resolved
moments.
quantity: {'S', 'L'}, optional
return the spin-moments or the L moments
last: bool, optional
only read the last force
all: bool, optional
return a list of all forces (like an MD)
If `True` `last` is ignored
"""
# Read until outcoor is found
itt = iter(self)
for line in itt:
if 'moments: Atomic' in line:
break
if not 'moments: Atomic' in line:
return None
# The moments are printed in SPECIES list
next(itt) # empty
next(itt) # empty
na = 0
# Loop the species
tbl = []
# Read the species label
while True:
next(itt) # ""
next(itt) # Atom Orb ...
# Loop atoms in this species list
while True:
line = next(itt)
if line.startswith('Species') or \
line.startswith('--'):
break
line = ' '
atom = []
ia = 0
while not line.startswith('--'):
line = next(itt).split()
if ia == 0:
ia = int(line[0])
elif ia != int(line[0]):
raise ValueError("Error in moments formatting.")
# Track maximum number of atoms
na = max(ia, na)
if quantity == 'S':
atom.append([float(x) for x in line[4:7]])
elif quantity == 'L':
atom.append([float(x) for x in line[7:10]])
line = next(itt).split() # Total ...
if not orbitals:
ia = int(line[0])
if quantity == 'S':
atom.append([float(x) for x in line[4:7]])
elif quantity == 'L':
atom.append([float(x) for x in line[8:11]])
tbl.append((ia, atom))
if line.startswith('--'):
break
# Sort according to the atomic index
moments = [] * na
# Insert in the correct atomic
for ia, atom in tbl:
moments[ia-1] = atom
if not all:
return _a.arrayd(moments)
return moments
[docs] @sile_fh_open()
def read_energy(self):
""" Reads the final energy distribution
Currently the energies translated are:
``band``
band structure energy
``kinetic``
electronic kinetic energy
``hartree``
electronic electrostatic Hartree energy
``dftu``
DFT+U energy
``spin_orbit``
spin-orbit energy
``extE``
external field energy
``xc``
exchange-correlation energy
``exchange``
exchange energy
``correlation``
correlation energy
``bulkV``
bulk-bias correction energy
``total``
total energy
``negf``
NEGF energy
``fermi``
Fermi energy
``ion.electron``
ion-electron interaction energy
``ion.ion``
ion-ion interaction energy
``ion.kinetic``
kinetic ion energy
Any unrecognized key gets added *as is*.
Examples
--------
>>> energies = sisl.get_sile("RUN.out").read_energy()
>>> ion_energies = energies.ion
>>> ion_energies.ion # ion-ion interaction energy
>>> ion_energies.kinetic # ion kinetic energy
>>> energies.fermi # fermi energy
Returns
-------
PropertyDict : dictionary like lookup table ionic energies are stored in a nested `PropertyDict` at the key ``ion`` (all energies in eV)
"""
found = self.step_to("siesta: Final energy", reread=False)[0]
out = PropertyDict()
out.ion = PropertyDict()
if not found:
return out
itt = iter(self)
# Read data
line = next(itt)
name_conv = {
"Band Struct.": "band",
"Kinetic": "kinetic",
"Hartree": "hartree",
"Edftu": "dftu",
"Eldau": "dftu",
"Eso": "spin_orbit",
"Ext. field": "extE",
"Exch.-corr.": "xc",
"Exch.": "exchange",
"Corr.": "correlation",
"Ekinion": "ion.kinetic",
"Ion-electron": "ion.electron",
"Ion-ion": "ion.ion",
"Bulk bias": "bulkV",
"Total": "total",
"Fermi": "fermi",
"Enegf": "negf",
}
while len(line.strip()) > 0:
key, val = line.split("=")
key = key.split(":")[1].strip()
key = name_conv.get(key, key)
if key.startswith("ion."):
# sub-nest
out.ion[key[4:]] = float(val)
else:
out[key] = float(val)
line = next(itt)
return out
[docs] def read_data(self, *args, **kwargs):
""" Read specific content in the Siesta out file
The currently implemented things are denoted in
the parameters list.
Note that the returned quantities are in the order
of keywords, so:
>>> read_data(geometry=True, force=True)
<geometry>, <force>
>>> read_data(force=True, geometry=True)
<force>, <geometry>
Parameters
----------
geometry: bool, optional
read geometry, args are passed to `read_geometry`
force: bool, optional
read force, args are passed to `read_force`
stress: bool, optional
read stress, args are passed to `read_stress`
moment: bool, optional
read moment, args are passed to `read_moment` (only for spin-orbit calculations)
energy: bool, optional
read final energies, args are passed to `read_energy`
"""
run = []
# This loops ensures that we preserve the order of arguments
# From Py3.6 and onwards the **kwargs is an OrderedDictionary
for kw in kwargs.keys():
if kw in ['geometry', 'force', 'moment', 'stress', 'energy']:
if kwargs[kw]:
run.append(kw)
# Clean running names
for name in run:
kwargs.pop(name)
val = []
for name in run:
val.append(getattr(self, 'read_{}'.format(name.lower()))(*args, **kwargs))
if len(val) == 0:
return None
elif len(val) == 1:
val = val[0]
return val
[docs] @sile_fh_open()
def read_scf(self, key="scf", iscf=-1, imd=None, as_dataframe=False):
r""" Parse SCF information and return a table of SCF information depending on what is requested
Parameters
----------
key : {'scf', 'ts-scf'}
parse SCF information from Siesta SCF or TranSiesta SCF
iscf : int, optional
which SCF cycle should be stored. If ``-1`` only the final SCF step is stored,
for None *all* SCF cycles are returned. When `iscf` values queried are not found they
will be truncated to the nearest SCF step.
imd: int or None, optional
whether only a particular MD step is queried, if None, all MD steps are
parsed and returned. A negative number wraps for the last MD steps.
as_dataframe: boolean, optional
whether the information should be returned as a `pandas.DataFrame`. The advantage of this
format is that everything is indexed and therefore you know what each value means.You can also
perform operations very easily on a dataframe.
"""
#These are the properties that are written in SIESTA scf
props = ["iscf", "Eharris", "E_KS", "FreeEng", "dDmax", "Ef", "dHmax"]
if not iscf is None:
if iscf == 0:
raise ValueError(f"{self.__class__.__name__}.read_scf requires iscf argument to *not* be 0!")
if not imd is None:
if imd == 0:
raise ValueError(f"{self.__class__.__name__}.read_scf requires imd argument to *not* be 0!")
def reset_d(d, line):
if line.startswith('SCF cycle converged'):
if len(d['data']) > 0:
d['_final_iscf'] = 1
elif line.startswith('SCF cycle continued'):
d['_final_iscf'] = 0
def common_parse(line, d):
if line.startswith('ts-Vha:'):
d['ts-Vha'] = float(line.split()[1])
elif line.startswith('bulk-bias: |v'):
d['bb-v'] = list(map(float, line.split()[-3:]))
if 'bb-vx' not in props:
props.extend(['BB-vx', 'BB-vy', 'BB-vz'])
elif line.startswith('bulk-bias: {q'):
d['bb-q'] = list(map(float, line.split()[-3:]))
if 'bb-q+' not in props:
props.extend(['BB-q+', 'BB-q-', 'BB-q0'])
else:
return False
return True
if key.lower() == 'scf':
def parse_next(line, d):
line = line.strip().replace('*', '0')
reset_d(d, line)
if common_parse(line, d):
pass
elif line.startswith('scf:'):
d['_found_iscf'] = True
if len(line) == 97:
# this should be for Efup/dwn
# but I think this will fail for as_dataframe (TODO)
data = [int(line[5:9]), float(line[9:25]), float(line[25:41]),
float(line[41:57]), float(line[57:67]), float(line[67:77]),
float(line[77:87]), float(line[87:97])]
elif len(line) == 87:
data = [int(line[5:9]), float(line[9:25]), float(line[25:41]),
float(line[41:57]), float(line[57:67]), float(line[67:77]),
float(line[77:87])]
else:
# Populate DATA by splitting
data = line.split()
data = [int(data[1])] + list(map(float, data[2:]))
d['data'] = data
elif key.lower() == 'ts-scf':
props.append("ts-Vha")
def parse_next(line, d):
line = line.strip().replace('*', '0')
reset_d(d, line)
if common_parse(line, d):
pass
elif line.startswith('ts-q:'):
data = line.split()[1:]
try:
d['ts-q'] = list(map(float, data))
except:
# We are probably reading a device list
# ensure that props are appended
if data[-1] not in props:
props.extend(data)
pass
elif line.startswith('ts-scf:'):
d['_found_iscf'] = True
if len(line) == 100:
data = [int(line[8:12]), float(line[12:28]), float(line[28:44]),
float(line[44:60]), float(line[60:70]), float(line[70:80]),
float(line[80:90]), float(line[90:100]), d['ts-Vha']] + d['ts-q']
elif len(line) == 90:
data = [int(line[8:12]), float(line[12:28]), float(line[28:44]),
float(line[44:60]), float(line[60:70]), float(line[70:80]),
float(line[80:90]), d['ts-Vha']] + d['ts-q']
else:
# Populate DATA by splitting
data = line.split()
data = [int(data[1])] + list(map(float, data[2:])) + [d['ts-Vha']] + d['ts-q']
d['data'] = data
# A temporary dictionary to hold information while reading the output file
d = {
'_found_iscf': False,
'_final_iscf': 0,
'data': [],
}
md = []
scf = []
for line in self:
parse_next(line, d)
if d['_found_iscf']:
d['_found_iscf'] = False
data = d['data']
if len(data) == 0:
continue
if iscf is None or iscf < 0:
scf.append(data)
elif data[0] <= iscf:
# this ensures we will retain the latest iscf in
# case the requested iscf is too big
scf = data
if d['_final_iscf'] == 1:
d['_final_iscf'] = 2
elif d['_final_iscf'] == 2:
d['_final_iscf'] = 0
data = d['data']
if len(data) == 0:
# this traps the case where we read ts-scf
# but find the final scf iteration.
# In that case we don't have any data.
scf = []
continue
if len(scf) == 0:
# this traps cases where final_iscf has
# been trickered but we haven't collected anything.
# I.e. if key == scf but ts-scf also exists.
continue
# First figure out which iscf we should store
if iscf is None: # or iscf > 0
# scf is correct
pass
elif iscf < 0:
# truncate to 0
scf = scf[max(len(scf) + iscf, 0)]
# Populate md
md.append(np.array(scf))
# Reset SCF data
scf = []
# In case we wanted a given MD step and it's this one, just stop reading
# We are going to return the last MD (see below)
if imd == len(md):
break
# Define the function that is going to convert the information of a MDstep to a Dataset
if as_dataframe:
import pandas as pd
def MDstep_dataframe(scf):
scf = np.atleast_2d(scf)
return pd.DataFrame(
scf[..., 1:],
index=pd.Index(scf[..., 0].ravel().astype(np.int32),
name="iscf"),
columns=props[1:]
)
# Now we know how many MD steps there are
# We will return stuff based on what the user requested
# For pandas DataFrame this will be dependent
# 1. all MD steps requested => imd == index, iscf == column (regardless of iscf==none|int)
# 2. 1 MD step requested => iscf == index
if imd is None:
if as_dataframe:
if len(md) == 0:
# return an empty dataframe (with imd as index)
return pd.DataFrame(index=pd.Index([], name="imd"),
columns=props)
# Regardless of what the user requests we will always have imd == index
# and iscf a column, a user may easily change this.
df = pd.concat(map(MDstep_dataframe, md),
keys=_a.arangei(1, len(md) + 1), names=["imd"])
if iscf is not None:
df.reset_index("iscf", inplace=True)
return df
if iscf is None:
# since each MD step may be a different number of SCF steps
# we cannot convert to a dense array
return md
return np.array(md)
# correct imd to ensure we check against the final size
imd = min(len(md) - 1, max(len(md) + imd, 0))
if len(md) == 0:
# no data collected
if as_dataframe:
return pd.DataFrame(index=pd.Index([], name="iscf"),
columns=props[1:])
return np.array(md[imd])
if imd > len(md):
raise ValueError(f"{self.__class__.__name__}.read_scf could not find requested MD step ({imd}).")
# If a certain imd was requested, get it
# Remember that if imd is positive, we stopped reading at the moment we reached it
scf = np.array(md[imd])
if as_dataframe:
return MDstep_dataframe(scf)
return scf
[docs] @sile_fh_open()
def read_charge(self, name, iscf=Opt.ANY, imd=Opt.ANY, key_scf="scf", as_dataframe=False):
r"""Read charges calculated in SCF loop or MD loop (or both)
Siesta enables many different modes of writing out charges.
NOTE: currently Mulliken charges are not implemented.
The below table shows a list of different cases that
may be encountered, the letters are referred to in the
return section to indicate what is returned.
+-----------+-----+-----+--------+-------+------------------+
| Case | *A* | *B* | *C* | *D* | *E* |
+-----------+-----+-----+--------+-------+------------------+
| Charge | MD | SCF | MD+SCF | Final | Orbital resolved |
+-----------+-----+-----+--------+-------+------------------+
| Voronoi | + | + | + | + | - |
+-----------+-----+-----+--------+-------+------------------+
| Hirshfeld | + | + | + | + | - |
+-----------+-----+-----+--------+-------+------------------+
| Mulliken | + | + | + | + | + |
+-----------+-----+-----+--------+-------+------------------+
Notes
-----
Errors will be raised if one requests information not present. I.e.
passing an integer or `Opt.ALL` for `iscf` will raise an error if
the SCF charges are not present. For `Opt.ANY` it will return
the most information, effectively SCF will be returned if present.
Currently Mulliken is not implemented, any help in reading this would be
very welcome.
Parameters
----------
name: {"voronoi", "hirshfeld"}
the name of the charges that you want to read
iscf: int or Opt, optional
index (0-based) of the scf iteration you want the charges for.
If the enum specifier `Opt.ANY` or `Opt.ALL` are used, then
the returned quantities depend on what is present.
If ``None/Opt.NONE`` it will not return any SCF charges.
If both `imd` and `iscf` are ``None`` then only the final charges will be returned.
imd: int or Opt, optional
index (0-based) of the md step you want the charges for.
If the enum specifier `Opt.ANY` or `Opt.ALL` are used, then
the returned quantities depend on what is present.
If ``None/Opt.NONE`` it will not return any MD charges.
If both `imd` and `iscf` are ``None`` then only the final charges will be returned.
key_scf : str, optional
the key lookup for the scf iterations (a ":" will automatically be appended)
as_dataframe: boolean, optional
whether charges should be returned as a pandas dataframe.
Returns
-------
numpy.ndarray
if a specific MD+SCF index is requested (or special cases where output is
not complete)
list of numpy.ndarray
if one both `iscf` or `imd` is different from ``None/Opt.NONE``.
pandas.DataFrame
if `as_dataframe` is requested. The dataframe will have multi-indices if multiple
SCF or MD steps are requested.
"""
if not hasattr(self, 'fh'):
with self:
return read_charge(self, name, iscf, imd, key_scf, as_dataframe)
namel = name.lower()
if as_dataframe:
import pandas as pd
def _empty_charge():
# build a fake dataframe with no indices
return pd.DataFrame(index=pd.Index([], name="atom", dtype=np.int32),
dtype=np.float32)
else:
pd = None
def _empty_charge():
# return for single value with nan values
return _a.arrayf([[None]])
# define helper function for reading voronoi+hirshfeld charges
def _voronoi_hirshfeld_charges():
""" Read output from Voronoi/Hirshfeld charges """
nonlocal pd
# Expecting something like this:
# Voronoi Atomic Populations:
# Atom # dQatom Atom pop S Sx Sy Sz Species
# 1 -0.02936 4.02936 0.00000 -0.00000 0.00000 0.00000 C
# Define the function that parses the charges
def _parse_charge(line):
atom_idx, *vals, symbol = line.split()
# assert that this is a proper line
# this should catch cases where the following line of charge output
# is still parseable
atom_idx = int(atom_idx)
return list(map(float, vals))
# first line is the header
header = (self.readline()
.replace("dQatom", "dq") # dQatom in master
.replace(" Qatom", " dq") # Qatom in 4.1
.replace("Atom pop", "e") # not found in 4.1
.split())[2:-1]
# We have found the header, prepare a list to read the charges
atom_charges = []
line = ' '
while line != "":
try:
line = self.readline()
charge_vals = _parse_charge(line)
atom_charges.append(charge_vals)
except:
# We already have the charge values and we reached a line that can't be parsed,
# this means we have reached the end.
break
if pd is None:
# not as_dataframe
return _a.arrayf(atom_charges)
# determine how many columns we have
# this will remove atom indices and species, so only inside
ncols = len(atom_charges[0])
assert ncols == len(header)
# the precision is limited, so no need for double precision
return pd.DataFrame(atom_charges, columns=header, dtype=np.float32,
index=pd.RangeIndex(stop=len(atom_charges), name="atom"))
# define helper function for reading voronoi+hirshfeld charges
def _mulliken_charges():
""" Read output from Mulliken charges """
raise NotImplementedError("Mulliken charges are not implemented currently")
# Check that a known charge has been requested
if namel == "voronoi":
_r_charge = _voronoi_hirshfeld_charges
charge_keys = ["Voronoi Atomic Populations",
"Voronoi Net Atomic Populations"]
elif namel == "hirshfeld":
_r_charge = _voronoi_hirshfeld_charges
charge_keys = ["Hirshfeld Atomic Populations",
"Hirshfeld Net Atomic Populations"]
elif namel == "mulliken":
_r_charge = _mulliken_charges
charge_keys = ["mulliken: Atomic and Orbital Populations"]
else:
raise ValueError(f"{self.__class__.__name__}.read_charge name argument should be one of {known_charges}, got {name}?")
# Ensure the key_scf matches exactly (prepend a space)
key_scf = f" {key_scf.strip()}:"
# Reading charges may be quite time consuming for large MD simulations.
# to see if we finished a MD read, we check for these keys
search_keys = [
# two keys can signal ending SCF
"SCF Convergence", "SCF_NOT_CONV",
"siesta: Final energy",
key_scf,
*charge_keys
]
# adjust the below while loop to take into account any additional
# segments of search_keys
IDX_SCF_END = [0, 1]
IDX_FINAL = [2]
IDX_SCF = [3]
# the rest are charge keys
IDX_CHARGE = list(range(len(search_keys) - len(charge_keys),
len(search_keys)))
# state to figure out where we are
state = PropertyDict()
state.INITIAL = 0
state.MD = 1
state.SCF = 2
state.CHARGE = 3
state.FINAL = 4
# a list of scf_charge
md_charge = []
md_scf_charge = []
scf_charge = []
final_charge = None
# signal that any first reads are INITIAL charges
current_state = state.INITIAL
charge = _empty_charge()
FOUND_SCF = False
FOUND_MD = False
FOUND_FINAL = False
# TODO whalrus
ret = self.step_to(search_keys, case=True, ret_index=True, reread=False)
while ret[0]:
if ret[2] in IDX_SCF_END:
# we finished all SCF iterations
current_state = state.MD
md_scf_charge.append(scf_charge)
scf_charge = []
elif ret[2] in IDX_SCF:
current_state = state.SCF
# collect scf-charges (possibly none)
scf_charge.append(charge)
elif ret[2] in IDX_FINAL:
current_state = state.FINAL
# don't do anything, this is the final charge construct
# regardless of where it comes from.
elif ret[2] in IDX_CHARGE:
FOUND_CHARGE = True
# also read charge
charge = _r_charge()
if state.INITIAL == current_state or state.CHARGE == current_state:
# this signals scf charges
FOUND_SCF = True
# There *could* be 2 steps if we are mixing H,
# this is because it first does
# compute H -> compute DM -> compute H
# in the first iteration, subsequently we only do
# compute compute DM -> compute H
# once we hit ret[2] in IDX_SCF we will append
scf_charge = []
elif state.MD == current_state:
FOUND_MD = True
# we just finished an SCF cycle.
# So any output between SCF ending and
# a new one beginning *must* be that geometries
# charge
# Here `charge` may be NONE signalling
# we don't have charge in MD steps.
md_charge.append(charge)
# reset charge
charge = _empty_charge()
elif state.SCF == current_state:
FOUND_SCF = True
elif state.FINAL == current_state:
FOUND_FINAL = True
# a special state writing out the charges after everything
final_charge = charge
charge = _empty_charge()
scf_charge = []
# we should be done and no other charge reads should be found!
# should we just break?
current_state = state.CHARGE
# step to next entry
ret = self.step_to(search_keys, case=True, ret_index=True, reread=False)
if not any((FOUND_SCF, FOUND_MD, FOUND_FINAL)):
raise SileError(f"{self!s} does not contain any charges ({name})")
# if the scf-charges are not stored, it means that the MD step finalization
# has not been read. So correct
if len(scf_charge) > 0:
assert False, "this test shouldn't reach here"
# we must not have read through the entire MD step
# so this has to be a running simulation
if charge is not None:
scf_charge.append(charge)
charge = _empty_charge()
md_scf_charge.append(scf_charge)
# otherwise there is some *parsing* error, so for now we use assert
assert len(scf_charge) == 0
if as_dataframe:
# convert data to proper data structures
# regardless of user requests. This is an overhead... But probably not that big of a problem.
if FOUND_SCF:
md_scf_charge = pd.concat([pd.concat(iscf,
keys=pd.RangeIndex(1, len(iscf)+1, name="iscf"))
for iscf in md_scf_charge],
keys=pd.RangeIndex(1, len(md_scf_charge)+1, name="imd"))
if FOUND_MD:
md_charge = pd.concat(md_charge, keys=pd.RangeIndex(1, len(md_charge)+1, name="imd"))
else:
if FOUND_SCF:
nan_array = _a.emptyf(md_scf_charge[0][0].shape)
nan_array.fill(np.nan)
def get_md_scf_charge(scf_charge, iscf):
try:
return scf_charge[iscf]
except:
return nan_array
if FOUND_MD:
md_charge = np.stack(md_charge)
# option parsing is a bit *difficult* with flag enums
# So first figure out what is there, and handle this based
# on arguments
def _p(flag, found):
""" Helper routine to do the following:
Returns
-------
is_opt : bool
whether the flag is an `Opt`
flag :
corrected flag
"""
if isinstance(flag, Opt):
# correct flag depending on what `found` is
# If the values have been found we
# change flag to None only if flag == NONE
# If the case has not been found, we
# change flag to None if ANY or NONE is in flags
if found:
# flag is only NONE, then pass none
if not (Opt.NONE ^ flag):
flag = None
else: # not found
# we convert flag to none
# if ANY or NONE in flag
if (Opt.NONE | Opt.ANY) & flag:
flag = None
return isinstance(flag, Opt), flag
opt_imd, imd = _p(imd, FOUND_MD)
opt_iscf, iscf = _p(iscf, FOUND_SCF)
if not (FOUND_SCF or FOUND_MD):
# none of these are found
# we request that user does not request any input
if (opt_iscf or (not iscf is None)) or \
(opt_imd or (not imd is None)):
raise SileError(f"{self!s} does not contain MD/SCF charges")
elif not FOUND_SCF:
if opt_iscf or (not iscf is None):
raise SileError(f"{self!s} does not contain SCF charges")
elif not FOUND_MD:
if opt_imd or (not imd is None):
raise SileError(f"{self!s} does not contain MD charges")
# if either are options they may hold
if opt_imd and opt_iscf:
if FOUND_SCF:
return md_scf_charge
elif FOUND_MD:
return md_charge
elif FOUND_FINAL:
# I think this will never be reached
# If neither are found they will be converted to
# None
return final_charge
raise SileError(f"{str(self)} unknown argument for 'imd' and 'iscf'")
elif opt_imd:
# flag requested imd
if not (imd & (Opt.ANY | Opt.ALL)):
# wrong flag
raise SileError(f"{str(self)} unknown argument for 'imd'")
if FOUND_SCF and iscf is not None:
# this should be handled, i.e. the scf should be taken out
if as_dataframe:
return md_scf_charge.groupby(level=[0, 2]).nth(iscf)
return np.stack(tuple(get_md_scf_charge(x, iscf) for x in md_scf_charge))
elif FOUND_MD and iscf is None:
return md_charge
raise SileError(f"{str(self)} unknown argument for 'imd' and 'iscf', could not find SCF charges")
elif opt_iscf:
# flag requested imd
if not (iscf & (Opt.ANY | Opt.ALL)):
# wrong flag
raise SileError(f"{str(self)} unknown argument for 'iscf'")
if imd is None:
# correct imd
imd = -1
if as_dataframe:
md_scf_charge = md_scf_charge.groupby(level=0)
group = list(md_scf_charge.groups.keys())[imd]
return md_scf_charge.get_group(group).droplevel(0)
return np.stack(md_scf_charge[imd])
elif imd is None and iscf is None:
if FOUND_FINAL:
return final_charge
raise SileError(f"{str(self)} does not contain final charges")
elif imd is None:
# iscf is not None, so pass through as though explicitly passed
imd = -1
elif iscf is None:
# we return the last MD step and the requested scf iteration
if as_dataframe:
return md_charge.groupby(level=1).nth(imd)
return md_charge[imd]
if as_dataframe:
# first select imd
md_scf_charge = md_scf_charge.groupby(level=0)
group = list(md_scf_charge.groups.keys())[imd]
md_scf_charge = md_scf_charge.get_group(group).droplevel(0)
return md_scf_charge.groupby(level=1).nth(iscf)
return md_scf_charge[imd][iscf]
add_sile('out', outSileSiesta, case=False, gzip=True)