import os
import subprocess
import xarray as xr
from .decisions import Decisions
from .file_manager import FileManager
from .output_control import OutputControl
from .local_param_info import LocalParamInfo
from .force_file_list import ForceFileList
[docs]class Simulation():
"""
The Simulation object provides a wrapper for SUMMA simulations.
It can be used to set up, modify, and run SUMMA. The Simulation
object consists of information about how to run SUMMA (the
location of the executable, various command line flags and options)
as well as configuration details which describe the domain to
be simulated.
A standard workflow for running a simulation locally is::
import pysumma as ps
s = ps.Simulation('summa.exe', 'file_manager.txt')
s.start('local')
s.monitor()
assert s._status == 'Success'
print(s.output)
A standard workflow for running a simulation through docker is::
import pysumma as ps
s = ps.Simulation('summa:develop', 'file_manager.txt')
s.start('docker')
s.monitor()
assert s._status == 'Success'
print(s.output)
Parameters
----------
executable:
Path to locally compiled SUMMA executable
or the name of the docker image to run
filemanager:
Path to the file manager for the desired
simulation setup. Can be specified as a
relative path.
"""
library_path = None
process = None
manager: FileManager = None
decisions: Decisions = None
output_control: OutputControl = None
local_param_info: LocalParamInfo = None
basin_param_info: LocalParamInfo = None
force_file_list: ForceFileList = None
local_attributes: xr.Dataset = None
parameter_trial: xr.Dataset = None
def __init__(self, executable, filemanager):
self.executable = executable
self.manager_path = filemanager
self.manager = FileManager(filemanager)
self._status = 'Initialized'
self.decisions = self.manager.decisions
self.output_control = self.manager.output_control
self.parameter_trial = self.manager.parameter_trial
self.force_file_list = self.manager.force_file_list
self.local_param_info = self.manager.local_param_info
self.basin_param_info = self.manager.basin_param_info
self.local_attributes = self.manager.local_attributes
self._status = 'Initialized'
def _gen_summa_cmd(self, run_suffix, processes=1, prerun_cmds=None,
startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress='m'):
"""
Generate the text of the SUMMA run commmand based on the desired
command line arguments.
Returns
-------
run_cmd (string):
A string representation of the SUMMA run command
"""
if prerun_cmds is None:
prerun_cmds = []
prerun_cmds.append('export OMP_NUM_THREADS={}'.format(processes))
summa_run_cmd = "{} -s {} -m {}".format(self.executable,
run_suffix,
self.manager_path)
if startGRU is not None and countGRU is not None:
summa_run_cmd += ' -g {} {}'.format(startGRU, countGRU)
if iHRU is not None:
summa_run_cmd += ' -h {}'.format(iHRU)
if freq_restart is not None:
summa_run_cmd += ' -r {}'.format(freq_restart)
if progress is not None:
summa_run_cmd += ' -p {}'.format(progress)
if prerun_cmds:
preprocess_cmd = " && ".join(prerun_cmds) + " && "
else:
preprocess_cmd = ""
return preprocess_cmd + summa_run_cmd
def _run_local(self, run_suffix, processes=1, prerun_cmds=None,
startGRU=None, countGRU=None, iHRU=None, freq_restart=None,
progress=None):
"""Start a local simulation"""
run_cmd = self.gen_summa_cmd(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart,
progress)
self.process = subprocess.Popen(run_cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
self._status = 'Running'
def _run_docker(self, run_suffix, processes=1,
prerun_cmds=None, startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None):
"""Start a docker simulation"""
run_cmd = self.gen_summa_cmd(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU,
freq_restart, progress)
fman_dir = os.path.dirname(self.manager_path)
settings_path = self.manager.settings_path.value
input_path = self.manager.input_path.value
output_path = self.manager.output_path.value
cmd = ''.join(['docker run -v {}:{}'.format(fman_dir, fman_dir),
' -v {}:{}'.format(settings_path, settings_path),
' -v {}:{}'.format(input_path, input_path),
' -v {}:{}'.format(output_path, output_path),
'/bin/bash -c "',
run_cmd, '"'])
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
self._status = 'Running'
[docs] def start(self, run_option, run_suffix='pysumma_run', processes=1,
prerun_cmds=None, startGRU=None, countGRU=None, iHRU=None,
freq_restart=None, progress=None):
"""
Start a SUMMA simulation. By default does not halt execution
of further python commands, and simply launches the SUMMA
process in the background. Progress can be halted by using
the ``Simulation.monitor()`` method.
Parameters
----------
run_option (string):
The method to use for running SUMMA, can be either
``local`` or ``docker``.
run_suffix (string):
A unique identifier to include as part of the output
file names
processes (string):
The number of processors to use. Note: This only matters
if the SUMMA version being run is compiled with OpenMP
support. Defaults to 1.
prerun_cmds (List[string]):
A list of commands to be run before the SUMMA executable
is invoked. This can be used to set things like library
paths or creation of subdirectories.
startGRU (int):
The GRU index to start the simulation at. Used to run only
specified sections of the domain. Must be co-specified with
``countGRU`` to do anything. Cannot be used with ``iHRU``.
countGRU (int):
Number of GRU to run, starting at ``startGRU``. Used to run
only the specified portion of the domain. Must be co-
specified with ``startGRU`` to do anything. Cannot be used
with ``iHRU``.
iHRU (int):
Index of a single HRU to be run. Cannot be used with
``startGRU`` and ``countGRU``.
freq_restart (string):
How often to write restart files. Can be ``d`` for daily,
``m`` for monthly, and ``y`` for yearly.
progress (string):
How often to record progress. Can be ``d`` for daily,
``m`` for monthly, and ``y`` for yearly.
"""
#TODO: Implement running on hydroshare here
self.run_suffix=run_suffix
self._write_configuration()
if run_option == 'local':
self.run_local(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart, progress)
elif run_option == 'docker':
self.run_docker(run_suffix, processes, prerun_cmds,
startGRU, countGRU, iHRU, freq_restart, progress)
else:
raise NotImplementedError('Invalid runtime given! '
'Valid options: local, docker')
def _write_configuration(self):
"""Write the configuration"""
#TODO: Still need to update for all netcdf writing
self.manager.write()
self.decisions.write()
self.force_file_list.write()
self.local_param_info.write()
self.basin_param_info.write()
self.output_control.write()
def _get_output(self):
"""Find all output files and return a list of their paths"""
new_file_text = 'Created output file:'
assert self._status == 'Success'
out_files = []
for l in self.stdout.split('\n'):
if new_file_text in l:
out_files.append(l.replace(new_file_text, ''))
return out_files
[docs] def execute(self, run_option, run_suffix=None,
preprocess_cmds=[], monitor=False):
"""Run a SUMMA simulation"""
self.start(run_option, run_suffix=run_suffix,
prerun_cmds=preprocess_cmds)
if monitor:
result = self.monitor()
self.process = result
return result
[docs] def monitor(self):
"""
Watch a running simulation until it is done and
collect the run information in the simulation object.
This will halt execution of a started simulation
through the ``Simulation.start()`` method.
"""
if self.process is None:
raise RuntimeError('No simulation running! Use simulation.start '
'or simulation.execute to begin a simulation!')
if self._status in ['Error', 'Success']:
return self._status == 'Success'
self._result = bool(self.process.wait())
try:
self._stderr = self.process.stderr.read().decode('utf-8')
self._stdout = self.process.stdout.read().decode('utf-8')
except UnicodeDecodeError:
self._stderr = self.process.stderr.read()
self._stdout = self.process.stdout.read()
if self._result:
self._status = 'Error'
else:
self._status = 'Success'
try:
self._output = [xr.open_dataset(f) for f in self._get_output()]
if len(self._output) == 1:
self._output = self._output[0]
except Exception:
self._output = None
return self._result
@property
def result(self):
"""
Attribute describing whether a model run was a success or not
"""
if self.process is None:
raise RuntimeError('No simulation started! Use simulation.start '
'or simulation.execute to begin a simulation!')
elif isinstance(self.process, str):
return self._status == 'Success'
else:
return self.monitor()
@property
def stdout(self):
"""
The standard output. This contains a string representation of the
output that a SUMMA simulation would write to screen. Use `print`
to view this so that line breaks are rendered correctly.
"""
if self.process is None:
raise RuntimeError('No simulation started! Use simulation.start '
'or simulation.execute to begin a simulation!')
elif isinstance(self.process, str):
return self._stdout
else:
self.monitor()
return self._stdout
@property
def stderr(self):
"""
The standard error. This contains a string representation of the
output that a SUMMA simulation would write to screen if any
unforseen errors occurred. Use `print` to view this so that
line breaks are rendered correctly.
"""
if self.process is None:
raise RuntimeError('No simulation started! Use simulation.start '
'or simulation.execute to begin a simulation!')
elif isinstance(self.process, str):
return self._stderr
else:
self.monitor()
return self._stderr
@property
def output(self):
"""
An xarray dataset, or list of xarray datasets representing
the output that a simulation has written to disk.
"""
if self.process is None:
raise RuntimeError('No simulation started! Use simulation.start '
'or simulation.execute to begin a simulation!')
elif isinstance(self.process, str):
return self._output
else:
self.monitor()
return self._output
def __repr__(self):
"""Show some information about the simulation setup"""
info = ["Executable path: {}".format(self.executable),
"Simulation status: {}".format(self._status),
"File manager configuration:",
str(self.manager)]
return '\n'.join(info)