#>=============================================================================
#>IphreeqcPy a python wrapper for Iphreeqc
#>-----------------------------------------------------------------------------
#>
#>Copyright (C) 2016 Ravi Patel
#
#>This program is free software: you can redistribute it and/or modify
#>it under the terms of the GNU Lesser General Public License as
#>published by the Free Software Foundation, version 3
#>This program is distributed in the hope that it will be useful,
#>but WITHOUT ANY WARRANTY; without even the implied warranty of
#>MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#>GNU General Public License for more details.
#>You should have received a copy of the GNU Lesser General Public License
#>along with this program. If not, see <http://www.gnu.org/licenses/>.
#>=============================================================================
import ctypes
import os
import sys
import platform
__version__ = '1.0-beta'
if sys.version_info[0] == 2:
def bytes(str_, encoding):
"""Compatibilty function for Python 3.
"""
return str_
range = xrange
[docs]class IPhreeqc():
"""
Python wrapper for the compiled IPhreeqc.
Parameters
----------
dll_path (optional) string
path for the iphreeqc compiled library .dll for windows or .so for linux
Note
----
IPhreeqc is compiled on installation for Linux GNU compilers are used
and for windows visual studio 10 compilers are used. By default these compiled
files are used.
"""
def __init__(self, dll_path=None):
if not dll_path:
system= platform.system()
if system == 'Windows':
dll_name = 'IPhreeqc.dll'
elif system == 'Linux':
dll_name = 'libiphreeqc.so'
else:
msg = 'Platform %s is not supported.' % sys.platform
raise NotImplementedError(msg)
dll_path = os.path.join(os.path.dirname(__file__), dll_name)
phreeqc = ctypes.cdll.LoadLibrary(dll_path)
c_int = ctypes.c_int
#map iphreeqc methods
method_mapping = [('_AccumulateLine', phreeqc.AccumulateLine,
[c_int, ctypes.c_char_p], c_int),
('_AddError', phreeqc.AddError,
[c_int, ctypes.c_char_p], c_int),
('_AddWarning', phreeqc.AddWarning,
[c_int, ctypes.c_char_p], c_int),
('_ClearAccumulatedLines',
phreeqc.ClearAccumulatedLines, [c_int], c_int),
('_CreateIPhreeqc', phreeqc.CreateIPhreeqc,
[ctypes.c_voidp], c_int),
('_DestroyIPhreeqc', phreeqc.DestroyIPhreeqc,
[c_int], c_int),
('_GetComponent', phreeqc.GetComponent,
[c_int, c_int], ctypes.c_char_p),
('_GetComponentCount', phreeqc.GetComponentCount,
[c_int], c_int),
('_GetErrorString', phreeqc.GetErrorString,
[c_int], ctypes.c_char_p),
('_RunAccumulated', phreeqc.RunAccumulated,
[c_int], c_int),
('_GetSelectedOutputColumnCount',
phreeqc.GetSelectedOutputColumnCount, [c_int],
c_int),
('_GetSelectedOutputRowCount',
phreeqc.GetSelectedOutputRowCount, [c_int], c_int),
('_GetSelectedOutputValue', phreeqc.GetSelectedOutputValue,
[c_int, c_int, c_int, ctypes.POINTER(_VAR)], c_int),
('_LoadDatabase', phreeqc.LoadDatabase,
[c_int, ctypes.c_char_p], c_int),
('_LoadDatabaseString', phreeqc.LoadDatabaseString,
[c_int, ctypes.c_char_p], c_int),
('_RunString', phreeqc.RunString,
[c_int, ctypes.c_char_p], c_int),
('_SetSelectedOutputFileOn',
phreeqc.SetSelectedOutputFileOn, [c_int, c_int],
c_int),
('_SetSelectedOutputFileOff',
phreeqc.SetSelectedOutputFileOn, [c_int, c_int],
c_int),
('_SetDumpFileOn',
phreeqc.SetDumpFileOn, [c_int,c_int],
c_int),
('_SetDumpFileOff',
phreeqc.SetDumpFileOn, [c_int,c_int],
c_int),
('_SetDumpStringOn',
phreeqc.SetDumpStringOn, [c_int,c_int],
c_int),
('_SetDumpStringOff',
phreeqc.SetDumpStringOn, [c_int,c_int],
c_int),
('_GetDumpString',
phreeqc.GetDumpString, [c_int],
ctypes.c_char_p),
]
for name, com_obj, argtypes, restype in method_mapping:
com_obj.argtypes = argtypes
com_obj.restype = restype
setattr(self, name, com_obj)
self.var = _VAR ()
self.phc_error_count = 0
self.phc_warning_count = 0
self.phc_database_error_count = 0
self.id = self.CreateIPhreeqc()
@staticmethod
def _RaisePhreeqcError(error_code):
"""
There was an error, raise an exception.
Parameters
----------
error_code: integer
equal to 0 gives 'ok'
equal to -1 raises 'out of memory'
equal to -2 raises 'bad value'
equal to -3 raises 'invalid argument type'
equal to -4 raises 'invalid row'
equal to -5 raises 'invalid column'
equal to -6 raises 'invalid instance id'
"""
error_types = {0: 'ok', -1: 'out of memory', -2: 'bad value',
-3: 'invalid argument type', -4: 'invalid row',
-5: 'invalid column', -6: 'invalid instance id'}
error_type = error_types[error_code]
if error_type:
raise PhreeqcException(error_type)
def _RaiseStringError(self, errors):
"""
Raise an exception with message from IPhreeqc error.
Parameters
----------
errors: integer
Number of error occured
"""
if errors > 1:
msg = '%s errors occured.\n' % errors
elif errors == 1:
msg = 'An error occured.\n'
else:
msg = 'Wrong error number.'
raise Exception(msg + self.GetErrorString())
[docs] def AccumulateLine(self, line):
"""
Accumlate line(s) for input to phreeqc
"""
errors = self._AccumulateLine(self.id, bytes(line, 'utf-8'))
if errors != 0:
self._RaiseStringError(errors)
[docs] def RunAccumulated(self):
"""
Run the input buffer as defined by calls to :func:`~IphreeqcPy.IPhreeqc.AccumulateLine`
"""
errors = self._RunAccumulated(self.id)
if errors != 0:
self._RaiseStringError(errors)
[docs] def AddError(self, phc_error_msg):
"""
Appends the given error message and increments the error count. Internally
used fo create an error condition
Parameters
----------
phc_error_msg: string
Error message to append
"""
errors = self._AddError(self.id, bytes(phc_error_msg, 'utf-8'))
if errors < 0:
self._RaiseStringError(errors)
else:
self.phc_error_count = errors
[docs] def AddWarning(self, phc_warn_msg):
"""
Appends the given warning message and increments the warning count.
Internally used to create warning condition
Parameters
----------
phc_warn_msg: string
Warning massage to append
"""
errors = self._AddWarning(self.id, bytes(phc_warn_msg, 'utf-8'))
if errors < 0:
self._RaiseStringError(errors)
else:
self.phc_warning_count = errors
[docs] def ClearAccumulatedLines(self):
"""
Clear the accumlated input buffer. Input buffer is accumlated from
calls to :func:`~IphreeqcPy.IPhreeqc.AccumulateLine`
"""
errors = self._ClearAccumulatedLines(self.id)
if errors != 0:
self._RaiseStringError(errors)
@property
def GetSelectedOutputColumnCount(self):
"""
Retrieves the number of columns in the selected output buffer
"""
return self._GetSelectedOutputColumnCount(self.id)
[docs] def CreateIPhreeqc(self):
"""
Create a new IPhreeqc instance
"""
error_code = self._CreateIPhreeqc(ctypes.c_voidp())
if error_code < 0:
self._RaisePhreeqcError(error_code)
id = error_code
return id
[docs] def DestroyIPhreeqc(self):
"""
Release an IPhreeqc instance from memory
"""
error_code = self._DestroyIPhreeqc(self.id)
if error_code < 0:
self._RaisePhreeqcError(error_code)
[docs] def GetComponent(self, index):
"""
Retrieves the given component
Parameters
----------
index: integer
The zero-based index of the component to retrieve
"""
component = self._GetComponent(self.id, index).decode('utf-8')
if not component:
raise IndexError('No component for index %s' % index)
return component
@property
def GetComponentCount(self):
"""
Retrieves the number of component in the current component list
"""
return self._GetComponentCount(self.id)
[docs] def GetComponentList(self):
"""
Get names of all components
Returns
-------
list
list with component names
"""
get_component = self.GetComponent
return [get_component(index) for index in range(self.GetComponentCount)]
[docs] def GetErrorString(self):
"""
Retrieves the error messages from the last call to :func:`~IphreeqcPy.IPhreeqc.RunAccumulated`,
:func:`~IphreeqcPy.IPhreeqc.RunFile`, :func:`~IphreeqcPy.IPhreeqc.RunString`, :func:`~IphreeqcPy.IPhreeqc.LoadDatabase`,
:func:`~IphreeqcPy.IPhreeqc.LoadDatabaseString`
Returns
-------
string
Error string
"""
return self._GetErrorString(self.id).decode('utf-8')
[docs] def GetSelectedOutputValue(self, row, col):
"""
Get one value from selected output at given row and column
Parameters
----------
row: integer
row index
column: integer
column index
Returns
-------
Real
Selected output value
"""
error_code = self._GetSelectedOutputValue(self.id, row, col, self.var)
if error_code != 0:
self._RaisePhreeqcError(error_code)
type_ = self.var.type
value = self.var.value
if type_ == 3:
val = value.double_value
elif type_ == 2:
val = value.long_value
elif type_ == 4:
val = value.string_value.decode('utf-8')
elif type_ == 0:
val = None
if type_ == 1:
self.raise_error(value.error_code)
return val
[docs] def GetSelectedOutputArray(self):
"""
Get all values from selected output
Returns
-------
list
All values of the selected output in multi-list form
"""
nrows = self.GetSelectedOutputRowCount
ncols = self.GetSelectedOutputColumnCount
results = []
for row in range(nrows):
result_row = []
for col in range(ncols):
result_row.append(self.GetSelectedOutputValue(row, col))
results.append(result_row)
return results
[docs] def GetSelectedOutputRow(self, row):
"""
Get all values for one row from selected output
Parameters
----------
row: integer
row index
Returns
-------
list
list of selected output for a given row
"""
if row < 0:
row = self.GetSelectedOutputRowCount + row
ncols = self.GetSelectedOutputColumnCount
results = []
for col in range(ncols):
results.append(self.GetSelectedOutputValue(row, col))
return results
[docs] def GetSelectedOutputCol(self, col):
"""
Get all values for one column from selected output
Parameters
----------
col: integer
column index
Returns
-------
list
list of selected output for a given column
"""
if col < 0:
col = self.GetSelectedOutputColumnCount + col
nrows = self.GetSelectedOutputRowCount
results = []
for row in range(nrows):
results.append(self.GetSelectedOutputValue(row, col))
return results
[docs] def SetSelectedOutputFileOff(self):
"""
Turn on writing to selected output file
"""
self._SetSelectedOutputFileOff(self.id, 0)
[docs] def SetSelectedOutputFileOn(self):
"""
Turn on writing to selected output file
"""
self._SetSelectedOutputFileOn(self.id, 1)
[docs] def LoadDatabase(self, database_name):
"""
Load a database with given file_name
Parameters
----------
database_name: string
path to the database. IphreeqcPy comes with all databases with
phreeqc and cemdata07.To one of this database type the relevant
filename. Filenames are listed below
* alkaline.dat
* cemdata07.dat
* ex15.dat
* llnl.dat
* mcatexch.dat
* minteq.dat
* minteq.V4.dat
* phreeqc.dat
* phreeqcU.dat
* phreeqd.dat
* pitzer.dat
* wateq4f.dat
"""
fpath = os.path.join(os.path.dirname(__file__),'databases',database_name)
if os.path.isfile(fpath):
database_name=fpath
self.phc_database_error_count = self._LoadDatabase(
self.id, bytes(database_name, 'utf-8'))
[docs] def LoadDatabaseString(self, input_string):
"""
Load a datbase from a string
Parameters
----------
input_string: string
input database string
"""
self.phc_database_error_count = self._LoadDatabaseString(
self.id, ctypes.c_char_p(bytes(input_string, 'utf-8')))
@property
def GetSelectedOutputRowCount(self):
"""
Get number of rows in selected output
"""
return self._GetSelectedOutputRowCount(self.id)
[docs] def RunString(self, cmd_string):
"""
Run PHREEQC input from string
Parameters
----------
cmd_string: string
string of phreeqc command to be executed
"""
errors = self._RunString(self.id,
ctypes.c_char_p(bytes(cmd_string, 'utf-8')))
if errors != 0:
self._RaiseStringError(errors)
[docs] def RunFile(self,Filename):
"""
run instructions from a file
Parameters
----------
Filename: string
path to file to run instructions from
"""
input_file = open(Filename,'r')
frun = input_file.read()
errors = self._RunString(self.id,
ctypes.c_char_p(bytes(frun, 'utf-8')))
if errors != 0:
self._RaiseStringError(errors)
[docs] def SetDumpFileOn(self):
"""
Set the dump file switch on to write dump file
"""
errors = self._SetDumpFileOn(self.id,1)
if errors != 0:
self._RaiseStringError(errors)
[docs] def SetDumpFileOff(self):
"""
Set the dump file switch off
"""
errors = self._SetDumpFileOff(self.id,0)
if errors != 0:
self._RaiseStringError(errors)
[docs] def SetDumpStringOn(self):
"""
Set the dump string switch on to get dump string
"""
errors = self._SetDumpStringOn(self.id,1)
if errors != 0:
self._RaiseStringError(errors)
[docs] def SetDumpStringOff(self):
"""
Set the dump string switch off
"""
errors = self._SetDumpStringOff(self.id,0)
if errors != 0:
self._RaiseStringError(errors)
[docs] def GetDumpString(self):
"""
Gives dump string as output
"""
return self._GetDumpString(self.id)
class _VARUNION(ctypes.Union):
# pylint: disable-msg=R0903
# no methods
"""Union with types.
See Var.h in PHREEQC source.
"""
_fields_ = [('long_value', ctypes.c_long),
('double_value', ctypes.c_double),
('string_value', ctypes.c_char_p),
('error_code', ctypes.c_int)]
class _VAR (ctypes.Structure):
# pylint: disable-msg=R0903
# no methods
"""Struct with data type and data values.
See Var.h in PHREEQC source.
"""
_fields_ = [('type', ctypes.c_int),
('value', _VARUNION)]
class PhreeqcException(Exception):
"""Error in Phreeqc call.
"""
pass
def test():
"""
"""
x=IPhreeqc()
x.LoadDatabase('cemdata07.dat')
x.SetDumpStringOn()
x.AccumulateLine(
"""
solution 0-1
-pH 7 charge
-water 1.0
Equilibrium phases 1
portlandite 0 1
save solution 1
save Equilibrium phases 1
save solution 0
selected_output
-file abstracted_model.xls
-totals Ca Si
-temp true
-high_precision true
-equilibrium_phases portlandite
Dump
-all
end
"""
)
x.RunAccumulated()
x.RunString(
"""
use solution 1
use Equilibrium phases 1
use solution 0
Advection
-cells 1
-shifts 10000
-punch_frequency 500
"""
)
print x.GetSelectedOutputArray()
print x.GetDumpString()
return
if __name__ == '__main__':
test()