import os
import pandas as pd
import numpy as np
import shutil
import stat
import sys
import inspect
import subprocess
from functools import partial
from pathlib import Path
from pkg_resources import resource_filename as resource
from pysumma import Simulation
from string import Template
from typing import List, Dict, Union
def read_template(path):
with open(path, 'r') as f:
OST_FILE= f.read()
return Template(OST_FILE)
resource = partial(resource, __name__)
# Paths to template files
INPT_FILE = resource('meta/ostIn.template')
EXEC_FILE = resource('meta/model_executable.template')
SAVE_FILE = resource('meta/save_parameters.template')
# Templates
INPT_META = read_template(INPT_FILE)
EXEC_META = read_template(EXEC_FILE)
SAVE_META = read_template(SAVE_FILE)
[docs]class Ostrich():
"""
Provides a high level interface to the OSTRICH optimization package.
This class can currently only be used for single-objective optimization
using the DDS algorithm as defined in the template file. Currently the
metrics calculated are KGE, MAE, and MSE as defined in the evaluation
package, though more metrics can be implemmented quite easily.
A basic workflow for this object is:
::
import pysumma as ps
summa_exe = './summa.exe'
ostrich_exe = './ostrich.exe'
file_manager = './file_manager.txt'
python_exe = '/pool0/data/andrbenn/.conda/all/bin/python'
ostrich = ps.Ostrich(ostrich_exe, summa_exe, file_manager, python_path=python_exe)
ostrich.calib_params = [
ps.OstrichParam('paramName', starValue, (minValue, maxValue)),
]
ostrich.obs_data_file = 'obs_data.nc'
ostrich.sim_calib_var = 'sim_varname'
ostrich.obs_calib_var = 'obs_varname'
ostrich.write_config()
ostrich.run()
Attributes
----------
ostrich:
Path to OSTRICH executable
python_path:
Path to Python executable used for the ``run_script``.
Note, you may need to set this if you are running the calibration
from inside a non-default environment (ie from conda/poetry/etc)!
summa:
Path to the SUMMA executable
template:
OSTRICH configuration file template
save_template:
Template for script to save best parameters
run_template:
Template for script to run and evaluate SUMMA
config_path:
Path to location of calibration runs/logs
simulation:
pysumma Simulation object used as template
file_manager:
File manager file for SUMMA simulation
seed:
Random seed for calibration
errval:
Error value for OSTRICH
perturb_val:
Strength of parameter perturbations during calibration
max_iters:
Number of calibration trial runs
cost_function:
Metric to use when ranking calibration runs
maximize:
Whether to maximize the ``cost_function``
simulation_kwargs:
Keyword arguments to pass to the simulation run function
"""
def __init__(self, ostrich_executable, summa_executable, file_manager, python_path=sys.executable):
"""Initialize a new Ostrich object"""
self.default_metrics: np.ndarray = np.array(['KGE', 'MAE', 'MSE', 'RMSE', 'NSE'])
self.ostrich: str = os.path.abspath(ostrich_executable)
self.python_path: str = python_path
self.summa: str = summa_executable
self.template: Template = INPT_META
self.save_template: Template = SAVE_META
self.run_template: Template = EXEC_META
self.config_path: Path = Path(os.path.abspath(file_manager)).parent / 'calibration'
self.simulation = Simulation(summa_executable, file_manager,
config_dir=self.config_path)
self.file_manager = self.simulation.manager
self.run_script: Path = self.config_path / 'run_script.py'
self.save_script: Path = self.config_path / 'save_script.py'
self.metrics_file: Path = self.config_path / 'metrics.txt'
self.metrics_log: Path = self.config_path / 'metrics_log.csv'
self.import_strings: str = ''
self.function_strings: str = ''
self.conversion_function: callable = lambda x: x
self.filter_function: callable = lambda x, y: (x, y)
self.preserve_output: str = 'no'
self.seed: int = 42
self.errval: float = -9999
self.perturb_val: float = 0.2
self.max_iters: int = 100
self.calib_params: List[OstrichParam] = []
self.tied_params: List[OstrichTiedParam] = []
self.cost_function: Union[str, callable] = 'KGE'
self.cost_function_code: str = ''
self.objective_function: str = 'gcop'
self.maximize: bool = True
self.simulation_kwargs: Dict = {}
self.allow_failures: bool = False
self.obs_data_file: str = None
self.sim_calib_vars: List = None
self.obs_calib_vars: List = None
[docs] def run(self, prerun_cmds=[], monitor=True):
"""Start calibration run"""
if len(prerun_cmds):
preprocess_cmd = " && ".join(prerun_cmds) + " && "
else:
preprocess_cmd = ""
cmd = preprocess_cmd + f'cd {str(self.config_path)} && ./ostrich'
self.cmd = cmd
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
if monitor:
self.stdout, self.stderr = self.process.communicate()
if isinstance(self.stdout, bytes):
self.stderr = self.stderr.decode('utf-8', 'ignore')
self.stdout = self.stdout.decode('utf-8', 'ignore')
[docs] def test_runscript(self, prerun_cmds=[], monitor=True):
"""Run a single instance of the underlying runscript"""
if len(prerun_cmds):
preprocess_cmd = " && ".join(prerun_cmds) + " && "
else:
preprocess_cmd = ""
cmd = preprocess_cmd + f'cd {str(self.config_path)} && ./run_script.py'
self.cmd = cmd
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
if monitor:
self.stdout, self.stderr = self.process.communicate()
if isinstance(self.stdout, bytes):
self.stderr = self.stderr.decode('utf-8', 'ignore')
self.stdout = self.stdout.decode('utf-8', 'ignore')
def monitor(self):
if not self.process:
return
else:
self.stdout, self.stderr = self.process.communicate()
if isinstance(self.stdout, bytes):
self.stderr = self.stderr.decode('utf-8', 'ignore')
self.stdout = self.stdout.decode('utf-8', 'ignore')
return self.stdout, self.stderr
[docs] def write_config(self):
"""Writes all necessary files for calibration"""
self.validate()
if not os.path.exists(self.config_path):
os.mkdir(self.config_path)
# Substitue templates and save
self.weightTemplateFile = self.write_weight_template_section()
self.weightValueFile = self.write_weight_value_section()
with open(self.config_path / 'ostIn.txt', 'w') as f:
f.write(self.template.substitute(self.map_vars_to_template))
with open(self.save_script, 'w') as f:
f.write(self.save_template.substitute(self.map_vars_to_save_template))
self.simulation._write_configuration()
with open(self.run_script, 'w') as f:
f.write(self.run_template.substitute(self.map_vars_to_run_template))
shutil.copy(self.ostrich, self.config_path / 'ostrich')
# Make sure we set permissions for execution
st = os.stat(self.config_path / 'ostrich')
os.chmod(self.config_path / 'ostrich', st.st_mode | stat.S_IEXEC)
st = os.stat(self.run_script)
os.chmod(self.run_script, st.st_mode | stat.S_IEXEC)
st = os.stat(self.save_script)
os.chmod(self.save_script, st.st_mode | stat.S_IEXEC)
[docs] def write_weight_template_section(self, file_name=Path('param_mapping.tpl')) -> Path:
"""Write the parameter name mapping for OSTRICH"""
with open(self.config_path / file_name, 'w') as f:
for cp in self.calib_params:
if cp.weightname.endswith('mtp'):
f.write(f'{cp.realname} | {cp.weightname}\n')
for tp in self.tied_params:
if tp.realname.endswith('mtp'):
f.write(f'{tp.realname.replace("_mtp", "")} | {tp.realname}\n')
return Path('.') / file_name
[docs] def write_weight_value_section(self, file_name='param_weights.txt') -> Path:
"""Write the parameter values for OSTRICH"""
with open(self.config_path / file_name, 'w') as f:
f.write('\n'.join([f'{cp.realname} | {cp.value}'
for cp in self.calib_params]) + '\n')
return Path('.') / file_name
def add_tied_param(self, param_name, lower_bound, upper_bound, initial_value=0.5):
self.calib_params.append(OstrichParam(f'{param_name}', initial_value, (0.01, 0.99),
weightname=f'{param_name}_scale'))
self.tied_params.append(OstrichTiedParam(param_name, lower_bound, upper_bound))
@property
def param_section(self) -> str:
"""Write list of calibration parameters"""
return '\n'.join(str(param) for param in self.calib_params)
@property
def tied_param_section(self) -> str:
"""Write list of tied calibration parameters"""
if len(self.tied_params):
return '\n'.join(str(param) for param in self.tied_params)
else:
return '# nothing to do here'
@property
def response_section(self) -> str:
"""Write section of OSTRICH configuration for selecting metric"""
try:
metric_row = np.argwhere(self.cost_function == self.default_metrics)[0][0]
except IndexError:
metric_row = -1
return f"{self.cost_function} {self.metrics_file}; OST_NULL {metric_row} 1 ' '"
@property
def tied_response_section(self) -> str:
"""Write section for determining if we are maximizing or minimizing the metric"""
if self.maximize:
return f'neg{self.cost_function} 1 {self.cost_function} wsum -1.00'
else:
return '# nothing to do here'
def open_metrics_log(self):
columns = ['kge', 'mae', 'mse', 'rmse', 'nse']
if self.cost_function not in columns:
columns.append(self.cost_function)
file = str(self.metrics_log)
if os.path.exists(file):
df = pd.read_csv(file, names=columns)
return df
else:
#TODO: Error handling
return None
def open_parameter_log(self):
file = str(self.config_path) + '/OstModel0.txt'
if os.path.exists(file):
df = pd.read_csv(file, delim_whitespace=True)
return df
else:
#TODO: Error handling
return None
[docs] def validate(self):
"""Try to make sure the configuration is usable"""
# Ensure observation data file exists
assert os.path.isfile(self.obs_data_file), \
(f"Observed file path doesn't exist!"
" You specified {self.obs_data_file}")
# Ensure the filter function has 2 args
filter_fun_args = len(dict(inspect.signature(self.filter_function).parameters))
assert filter_fun_args == 2, \
(f"The filter function must have two inputs so that it can be applied",
" to both the simulated data and the observed data!")
# Ensure the conversion function has 1 args
convert_fun_args = len(dict(inspect.signature(self.conversion_function).parameters))
assert convert_fun_args == 1, \
(f"The conversion function must have only one input so",
" that it can be applied to the observed data!")
# Ensure variables for calibration match up
assert self.sim_calib_vars is not None, "sim_calib_vars cannot be None!"
assert self.obs_calib_vars is not None, "obs_calib_vars cannot be None!"
if type(self.sim_calib_vars) == str:
self.sim_calib_vars = [self.sim_calib_vars]
if type(self.obs_calib_vars) == str:
self.obs_calib_vars = [self.obs_calib_vars]
assert len(self.sim_calib_vars) == len(self.obs_calib_vars), \
(f"The number of simulated calibration variables must match",
f" the number of observed variables to compare against.",
f" You gave {len(self.sim_calib_vars)} and",
f" {len(self.obs_calib_vars)}, respectively")
if callable(self.cost_function):
self.cost_function_code = inspect.getsource(self.cost_function)
self.cost_function = self.cost_function.__name__
@property
def map_vars_to_template(self):
"""For completion of the OSTRICH input template"""
return {'runScript': self.run_script,
'objectiveFun': self.objective_function,
'saveScript': self.save_script,
'preserveOutput': self.preserve_output,
'seed': self.seed,
'errval': self.errval,
'perturbVal': self.perturb_val,
'maxIters': self.max_iters,
'paramSection': self.param_section,
'tiedParamSection': self.tied_param_section,
'responseSection': self.response_section,
'tiedResponseSection': self.tied_response_section,
'costFunction': f'neg{self.cost_function}' if self.maximize else self.cost_function,
'weightTemplateFile': self.weightTemplateFile,
'weightValueFile': self.weightValueFile
}
@property
def map_vars_to_save_template(self):
"""For completion of the parameter saving template"""
return {'pythonPath': self.python_path,
'saveDir': self.config_path.parent / 'best_calibration',
'modelDir': self.config_path}
@property
def map_vars_to_run_template(self):
"""For completion of the model run script template"""
return {
'pythonPath': self.python_path,
'summaExe': self.summa,
'fileManager': self.simulation.manager_path,
'obsDataFile': os.path.abspath(self.obs_data_file),
'simVarList': self.sim_calib_vars,
'obsVarList': self.obs_calib_vars,
'outFile': self.metrics_file,
'metricsLog': self.metrics_log,
'importStrings': self.import_strings,
'functionStrings': self.function_strings,
'costFunctionCode': self.cost_function_code,
'costFunction': self.cost_function,
'maximize': self.maximize,
'conversionFunc': "=".join(inspect.getsource(self.conversion_function).split('=')[1:]),
'filterFunc': "=".join(inspect.getsource(self.filter_function).split('=')[1:]),
'paramMappingFile': self.weightTemplateFile,
'paramWeightFile': self.weightValueFile,
'simulationArgs': self.simulation_kwargs,
'allowFailures': self.allow_failures,
'paramFile': (self.simulation.manager['settingsPath'].value
+ self.simulation.manager['trialParamFile'].value),
}
class OstrichParam():
"""
Definition of a SUMMA parameter to be optimized by OSTRICH
Parameters
----------
realname:
Parameter name as seen by SUMMA
weightname:
Parameter name as seen by OSTRICH
value:
Default value
lower:
Lower bound for parameter value
upper:
Upper bound for parameter value
"""
def __init__(self, name, value, val_range, weightname=''):
self.realname = name
if not weightname:
self.weightname = f'{name}_mtp'
else:
self.weightname = weightname
self.value = value
self.lower, self.upper = val_range
def __str__(self):
return f"{self.weightname} {self.value} {self.lower} {self.upper} none none none free"
class OstrichTiedParam():
def __init__(self, name, lower_param, upper_param):
self.realname = f'{name}_mtp'
self.weightname = f'{name}_scale'
self.lower_param = f'{lower_param}_mtp'
self.upper_param = f'{upper_param}_mtp'
@property
def type_data(self):
"""This corresponds to the equation y = x2 + x1x3 - x1x2"""
if self.lower_param and self.upper_param:
return "ratio 0 -1 1 0 0 1 0 0 0 0 0 0 0 0 0 1 free"
elif self.lower_param:
raise NotImplementedError()
return ""
elif self.upper_param:
raise NotImplementedError()
return ""
def __str__(self):
return f"{self.realname} 3 {self.weightname} {self.lower_param} {self.upper_param} {self.type_data}"