import os
import copy
import shutil
import subprocess
import numpy as np
import pandas as pd
import xarray as xr
from glob import glob
from pathlib import Path
from typing import List
from .decisions import Decisions
from .file_manager import FileManager
from .output_control import OutputControl
from .global_params import GlobalParams
from .force_file_list import ForcingList, ForcingOption
[docs]class Simulation():
"""
The simulation object provides a wrapper for SUMMA simulations
Attributes
----------
stdout:
Store standard output of the run
stderr:
Handle to the process during runtime
manager_path:
Path to the file manager
config_path:
Path to where configuration will be written
status:
Current status of the simulation
manager:
File manager object (populated after calling ``initialize``)
decisions:
Decisions object (populated after calling ``initialize``)
output_control:
OutputControl object (populated after calling ``initialize``)
trial_params:
Spatially distributed parameters (populated after calling ``initialize``)
force_file_list:
ForcingList object (populated after calling ``initialize``)
global_hru_params:
GlobalParams object for hru (populated after calling ``initialize``)
global_gru_params:
GlobalParams object for gru (populated after calling ``initialize``)
local_attributes:
LocalAttributes object (populated after calling ``initialize``)
initial_conditions:
InitialConditions object (populated after calling ``initialize``)
"""
def __init__(self, executable, filemanager, initialize=True, config_dir='.pysumma'):
"""Initialize a new simulation object"""
self.stdout = None
self.stderr = None
self.process = None
self.executable = executable
self.manager_path = Path(os.path.abspath(os.path.realpath(filemanager)))
self.config_path = self.manager_path.parent / config_dir
self.status = 'Uninitialized'
if initialize:
self.initialize()
[docs] def initialize(self):
"""
Initialize reads in all of the relevant files. This may not
be desired on instantiation, so the ``initialize`` parameter
can be set in the constructor. Calling this will also create
a backup of the configuration that can be restored via the
``reset`` method.
"""
self.manager = FileManager(
self.manager_path.parent, self.manager_path.name)
self.status = 'Initialized'
self.decisions = self.manager.decisions
self.output_control = self.manager.output_control
self.trial_params = self.manager.trial_params
self.force_file_list = self.manager.force_file_list
self.global_hru_params = self.manager.global_hru_params
self.global_gru_params = self.manager.global_gru_params
self.local_attributes = self.manager.local_attributes
self.initial_conditions = self.manager.initial_conditions
self.genparm = self.manager.genparm
self.mptable = self.manager.mptable
self.soilparm = self.manager.soilparm
self.vegparm = self.manager.vegparm
self.create_backup()
self.status = 'Initialized'
[docs] def apply_config(self, config: dict):
"""
Change the settings of the simulation based on a configuration
dictionary.
Parameters
----------
config:
A dictionary where keys represent the type of change and
the values represent the changes to be applied. A representative
example might be:
::
config = {
'file_manager': '/home/user/cool_setup/file_manager_new.txt',
'decisions': {'snowLayers': 'CLM_2010'},
'parameters': {'albedoDecayRate': 1e-6},
'trial_parameters': {'theta_mp': 0.4},
'attributes': {'mHeight': 15}
}
"""
if 'file_manager' in config:
self.manager_path = Path(os.path.abspath(config['file_manager']))
for k, v in config.get('decisions', {}).items():
self.decisions.set_option(k, v)
for k, v in config.get('parameters', {}).items():
self.global_hru_params.set_option(k, v)
for k, v in config.get('output_control', {}).items():
self.output_control.set_option(k, **v)
for k, v in config.get('attributes', {}).items():
self.assign_attributes(k, v)
for k, v in config.get('trial_parameters', {}).items():
self.assign_trial_params(k, v)
if self.decisions['snowLayers'] == 'CLM_2010':
self.validate_layer_params(self.global_hru_params)
[docs] def assign_attributes(self, name, data):
"""
Assign new data to the ``local_attributes`` dataset.
Parameters
----------
name:
The name (or key) of the attribute to modify
data:
The data to change the attribute to. The shape
must match the shape in the local attributes file
"""
required_shape = self.local_attributes[name].shape
try:
self.local_attributes[name].values = np.array(data).reshape(required_shape)
except ValueError as e:
raise ValueError('The shape of the provided replacement data does',
' not match the shape of the original data.', e)
except KeyError as e:
raise KeyError(f'The key {name} does not exist in this attribute',
'file. See the documentation at https://summa.readthedocs.',
'io/en/latest/input_output/SUMMA_input/#attribute-and-',
'parameter-files for more information', e)
[docs] def assign_trial_params(self, name: str, data: np.array, dim='hru', create=True):
"""
Assign new data to the ``spatial_params`` dataset.
Parameters
----------
name:
The name (or key) of the attribute to modify
data:
The data to change the parameter to. The shape
must match the shape in the parameter trial file
"""
# Create the variable if we need
if create and name not in self.trial_params.variables:
self.trial_params[name] = self.trial_params[dim].astype(float).copy()
required_shape = self.trial_params[name].shape
try:
self.trial_params[name].values = np.array(data).reshape(required_shape)
except ValueError as e:
raise ValueError('The shape of the provided replacement data does',
' not match the shape of the original data.', e)
except KeyError as e:
raise KeyError(f'The key {name} does not exist in this trial parameter',
'file. See the documentation at https://summa.readthedocs.',
'io/en/latest/input_output/SUMMA_input/#attribute-and-',
'parameter-files for more information', e)
[docs] def assign_forcing_file(self, name: str, data: xr.Dataset):
"""
Assign a new forcing dataset, writing out the data and updating
the forcing file list and file manager
Parameters
----------
name: str
The name of the new forcing file dataset
data: xr.Dataset
The new forcing dataset
"""
# Write out the new forcing data & update the force file list
if not name.endswith('.nc'):
name = f'{name}.nc'
current_ffile_dirpath = self.force_file_list.prefix
new_force_file =os.sep.join([current_ffile_dirpath, name])
data.to_netcdf(new_force_file)
new_force_file_list = f'{name[0:-3]}_force_list.txt'
# Update the file manager
new_file_manager = f'{str(self.manager_path)[0:-4]}_{name[0:-3]}.txt'
fm = self.manager
with open(f'{fm["settingsPath"].value}/{new_force_file_list}', 'w') as f:
f.write(f"'{name}'")
fm['forcingListFile'].value = new_force_file_list
fm.file_name = new_file_manager
fm.write()
# Update the simulation object with the new file manager
self.manager_path = Path(os.path.abspath(os.path.realpath(fm.file_name)))
self.initialize()
def create_backup(self):
self.backup = {}
self.backup['manager'] = copy.deepcopy(self.manager)
self.backup['manager_path'] = copy.deepcopy(self.manager_path)
[docs] def reset(self):
"""Restores the original settings of the Simulation"""
self.manager = copy.deepcopy(self.backup['manager'])
self.manager_path = copy.deepcopy(self.backup['manager_path'])
self.config_path = self.manager_path.parent / '.pysumma'
self.decisions = self.manager.decisions
self.output_control = self.manager.output_control
self.trial_params = self.manager.trial_params
self.force_file_list = self.manager.force_file_list
self.global_hru_params = self.manager.global_hru_params
self.global_gru_params = self.manager.global_gru_params
self.local_attributes = self.manager.local_attributes
self.initial_conditions = self.manager.initial_conditions
self.genparm = self.manager.genparm
self.mptable = self.manager.mptable
self.soilparm = self.manager.soilparm
self.vegparm = self.manager.vegparm
[docs] def validate_layer_params(self, params):
"""Ensure that the layer parameters are valid"""
for i in range(1, 5):
assert params[f'zmaxLayer{i}_upper'] \
<= params[f'zmaxLayer{i}_lower'], i
assert params[f'zmaxLayer{i}_upper'] / params[f'zminLayer{i}'] \
>= 2.5, i
assert params[f'zmaxLayer{i}_upper'] / params[f'zminLayer{i+1}'] \
>= 2.5, i
def _gen_summa_cmd(self, run_suffix, processes=1, prerun_cmds=[],
startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress='m'):
prerun_cmds.append('export OMP_NUM_THREADS={}'.format(processes))
summa_run_cmd = "{} -s {} -m {}".format(self.executable,
run_suffix, self.manager_path)
if startGRU is not None and countGRU is not None:
summa_run_cmd += ' -g {} {}'.format(startGRU, countGRU)
if iHRU is not None:
summa_run_cmd += ' -h {}'.format(iHRU)
if freq_restart is not None:
summa_run_cmd += ' -r {}'.format(freq_restart)
if progress is not None:
summa_run_cmd += ' -p {}'.format(progress)
if len(prerun_cmds):
preprocess_cmd = " && ".join(prerun_cmds) + " && "
else:
preprocess_cmd = ""
return preprocess_cmd + summa_run_cmd
def _run_local(self, run_suffix, processes=1, prerun_cmds=None,
startGRU=None, countGRU=None, iHRU=None, freq_restart=None,
progress=None):
"""Start a local simulation"""
run_cmd = self._gen_summa_cmd(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart,
progress)
self.process = subprocess.Popen(run_cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
self.status = 'Running'
def _run_docker(self, run_suffix, processes=1,
prerun_cmds=None, startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None):
"""Start a docker simulation"""
run_cmd = self._gen_summa_cmd(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU,
freq_restart, progress)
run_cmd = run_cmd.replace(self.executable, '/code/bin/summa.exe')
fman_dir = os.path.dirname(self.manager_path)
settings_path = self.manager['settingsPath'].value
input_path = self.manager['forcingPath'].value
output_path = self.manager['outputPath'].value
cmd = ''.join(['docker run -v {}:{}'.format(fman_dir, fman_dir),
' -v {}:{}'.format(input_path, input_path),
' -v {}:{} '.format(output_path, output_path),
" --entrypoint '/bin/bash' ",
self.executable,
' -c "',
run_cmd, '"'])
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
self.status = 'Running'
[docs] def start(self, run_option='local', run_suffix='pysumma_run', processes=1,
prerun_cmds=[], startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None, **kwargs):
"""
Run a SUMMA simulation without halting. Most likely this should
not be used. Use the ``run`` method for most common use cases.
"""
#TODO: Implement running on hydroshare here
if not prerun_cmds:
prerun_cmds = []
self.run_suffix = run_suffix
self._write_configuration(name=run_suffix)
if run_option == 'local':
self._run_local(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart, progress)
elif run_option == 'docker':
self._run_docker(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart, progress)
else:
raise NotImplementedError('Invalid runtime given! '
'Valid options: local, docker')
[docs] def run(self, run_option='local', run_suffix='pysumma_run', processes=1,
prerun_cmds=None, startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None, **kwargs):
"""
Run a SUMMA simulation and halt until completion or error.
Parameters
----------
run_option:
Method to run SUMMA, must be one of either local or docker
run_suffix:
Name to append to the output files for this SUMMA run
processes:
Number of openmp processes to use for this run. For this
to do anything SUMMA must be compiled with openmp support
prerun_cmds:
A list of commands to execute before running SUMMA. May be
necessary to set environment variables or do any preprocessing
startGRU:
GRU index to start the simulation on (must also set ``countGRU``
if this argument is set)
countGRU:
Number of GRU to run, starting at ``startGRU`` (must also set
``startGRU`` if this argument is set)
iHRU:
Index of HRU to run (cannot be used with ``startGRU`` and
``countGRU``)
freq_restart:
Frequency to write restart files. Options include
``[y, m, d, never]`` for yearly, monthly, and daily restart
files. Defaults to ``never``
progress:
Frequency to write stdout progress. Note this is not printed
during runtime via pysumma, but can be checked after completion.
Options include ``[m, d, h, never]`` for monthly, daily, and
hourly output.
"""
self.start(run_option, run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart, progress, **kwargs)
self.monitor()
[docs] def monitor(self):
'''Halt execution until Simulation either finishes or errors'''
# Simulation already run
if self.status in ['Error', 'Success']:
return self.status
if self.process is None:
raise RuntimeError('No simulation started! Use simulation.start '
'or simulation.execute to begin a simulation!')
self.stdout, self.stderr = self.process.communicate()
if isinstance(self.stdout, bytes):
self.stderr = self.stderr.decode('utf-8', 'ignore')
self.stdout = self.stdout.decode('utf-8', 'ignore')
SUCCESS_MSG = 'FORTRAN STOP: finished simulation successfully.'
if SUCCESS_MSG not in self.stdout:
self.status = 'Error'
else:
self.status = 'Success'
try:
self._output = [xr.open_dataset(f) for f in self.get_output_files()]
if len(self._output) == 1:
self._output = self._output[0]
except Exception:
self._output = None
return self.status
def spinup(self, run_option='local', period='1Y', niters=10, run_suffix='pysumma_spinup', **kwargs):
# open forcings
spinup_force_name = f'{run_suffix}.nc'
with xr.open_mfdataset(self.force_file_list.forcing_paths) as ds:
start_date = pd.to_datetime(ds['time'].values[0])
end_date = pd.to_datetime(start_date + pd.Timedelta(period))
forcings = ds.sel(time=slice(start_date, end_date)).load()
spinup_force_path = self.manager['forcingPath'].value + spinup_force_name
forcings.to_netcdf(spinup_force_path)
self.force_file_list.options = [ForcingOption(spinup_force_path)]
self.manager['simStartTime'] = start_date
self.manager['simEndTime'] = end_date
ymdh = str(self.manager['simEndTime'].value).replace(' ', '-').replace('-', '').split(':')[0]
for n in range(niters):
self.run(run_option, run_suffix=run_suffix, freq_restart='e', **kwargs)
out_dir = self.manager['outputPath'].value
prefix = self.manager['outFilePrefix'].value
restart_file_name = f'{prefix}_restart_{ymdh}_{run_suffix}*nc'
restart_file_path = glob(f'{out_dir}{restart_file_name}')[0]
restart_file_name = restart_file_path.split(os.path.sep)[-1]
self.reset()
self.manager['simStartTime'] = start_date
self.manager['simEndTime'] = end_date
restart_dest = os.path.dirname(self.manager['initConditionFile'].value) + f'/{restart_file_name}'
shutil.copy(restart_file_path, self.manager['settingsPath'].value + restart_dest)
[os.remove(f) for f in self.get_output_files()]
self.manager['initConditionFile'] = restart_dest
with xr.open_dataset(self.manager['settingsPath'].value + restart_dest) as ds:
self.initial_conditions = ds.load()
def _write_configuration(self, name=''):
import shutil
self.config_path = self.config_path / name
self.config_path.mkdir(parents=True, exist_ok=True)
manager_path = str(self.manager_path.parent)
settings_path = os.path.abspath(os.path.realpath(str(self.manager['settingsPath'].value)))
settings_path = Path(settings_path.replace(manager_path, str(self.config_path)))
self.manager_path = self.config_path / self.manager.file_name
self.manager['settingsPath'] = str(settings_path) + os.sep
self.manager.write(path=self.config_path)
self.decisions.write(path=settings_path)
self.force_file_list.write(path=settings_path)
self.global_hru_params.write(path=settings_path)
self.global_gru_params.write(path=settings_path)
self.output_control.write(path=settings_path)
self.local_attributes.to_netcdf(settings_path / self.manager['attributeFile'].value)
self.trial_params.to_netcdf(settings_path / self.manager['trialParamFile'].value)
self.initial_conditions.to_netcdf(settings_path / self.manager['initConditionFile'].value)
with open(settings_path / self.manager['generalTableFile'].value, 'w+') as f:
f.writelines(self.genparm)
with open(settings_path / self.manager['noahmpTableFile'].value, 'w+') as f:
f.writelines(self.mptable)
with open(settings_path / self.manager['soilTableFile'].value, 'w+') as f:
f.writelines(self.soilparm)
with open(settings_path / self.manager['vegTableFile'].value, 'w+') as f:
f.writelines(self.vegparm)
def get_forcing_data_list(self) -> List[xr.Dataset]:
return self.force_file_list.open_forcing_data()
[docs] def get_output_files(self) -> List[str]:
"""Find output files given the ``stdout`` generated from a run"""
new_file_text = 'Created output file:'
out_files = []
for l in self.stdout.split('\n'):
if new_file_text in l:
out_files.append(
l.split(';')[0].replace(new_file_text, '').strip())
return out_files
@property
def output(self):
"""Get the output as an xarray dataset"""
if self.status == 'Success':
return self._output
elif self.status == 'Error':
raise RuntimeError('There was an error during the simulation!'
' Print the `stdout` and `stderr` for more'
' information.')
else:
raise RuntimeError('No simulation started! Use simulation.start '
'or simulation.execute to begin a simulation!')
def __repr__(self):
repr = []
repr.append("Executable path: {}".format(self.executable))
repr.append("Simulation status: {}".format(self.status))
try:
repr.append("File manager configuration:")
repr.append(str(self.manager))
except:
repr.append("Use Simulation.initialize() to "
"read input files for more information")
return '\n'.join(repr)