Source code for pysumma.simulation

import os
import subprocess
import xarray as xr

from .decisions import Decisions
from .file_manager import FileManager
from .output_control import OutputControl
from .local_param_info import LocalParamInfo
from .force_file_list import ForceFileList


class Simulation():
    """
    The Simulation object provides a wrapper for SUMMA simulations.
    It can be used to set up, modify, and run SUMMA. The Simulation
    object consists of information about how to run SUMMA (the
    location of the executable, various command line flags and options)
    as well as configuration details which describe the domain to be
    simulated.

    A standard workflow for running a simulation locally is::

        import pysumma as ps
        s = ps.Simulation('summa.exe', 'file_manager.txt')
        s.start('local')
        s.monitor()
        assert s._status == 'Success'
        print(s.output)

    A standard workflow for running a simulation through docker is::

        import pysumma as ps
        s = ps.Simulation('summa:develop', 'file_manager.txt')
        s.start('docker')
        s.monitor()
        assert s._status == 'Success'
        print(s.output)

    Parameters
    ----------
    executable:
        Path to locally compiled SUMMA executable
        or the name of the docker image to run
    filemanager:
        Path to the file manager for the desired
        simulation setup. Can be specified as a
        relative path.
    """

    library_path = None
    # Handle of the launched SUMMA process; None until ``start`` is called.
    process = None
    manager: FileManager = None
    decisions: Decisions = None
    output_control: OutputControl = None
    local_param_info: LocalParamInfo = None
    basin_param_info: LocalParamInfo = None
    force_file_list: ForceFileList = None
    local_attributes: xr.Dataset = None
    parameter_trial: xr.Dataset = None

    def __init__(self, executable, filemanager):
        """Load the file manager and expose its configuration objects."""
        self.executable = executable
        self.manager_path = filemanager
        self.manager = FileManager(filemanager)
        self.decisions = self.manager.decisions
        self.output_control = self.manager.output_control
        self.parameter_trial = self.manager.parameter_trial
        self.force_file_list = self.manager.force_file_list
        self.local_param_info = self.manager.local_param_info
        self.basin_param_info = self.manager.basin_param_info
        self.local_attributes = self.manager.local_attributes
        # Run bookkeeping, filled in by ``monitor`` once a run has finished.
        self._result = None
        self._stdout = None
        self._stderr = None
        self._output = None
        self._status = 'Initialized'

    def _gen_summa_cmd(self, run_suffix, processes=1, prerun_cmds=None,
                       startGRU=None, countGRU=None, iHRU=None,
                       freq_restart=None, progress='m'):
        """
        Generate the text of the SUMMA run command based on the desired
        command line arguments.

        Returns
        -------
        run_cmd (string):
            A string representation of the SUMMA run command
        """
        # Work on a copy so the caller's list is never mutated.
        setup_cmds = list(prerun_cmds) if prerun_cmds is not None else []
        setup_cmds.append('export OMP_NUM_THREADS={}'.format(processes))

        summa_run_cmd = "{} -s {} -m {}".format(self.executable,
                                                run_suffix,
                                                self.manager_path)
        # The GRU range flag is only emitted when both values are given.
        if startGRU is not None and countGRU is not None:
            summa_run_cmd += ' -g {} {}'.format(startGRU, countGRU)
        if iHRU is not None:
            summa_run_cmd += ' -h {}'.format(iHRU)
        if freq_restart is not None:
            summa_run_cmd += ' -r {}'.format(freq_restart)
        if progress is not None:
            summa_run_cmd += ' -p {}'.format(progress)

        # setup_cmds always holds at least the OMP_NUM_THREADS export.
        return " && ".join(setup_cmds) + " && " + summa_run_cmd

    def _run_local(self, run_suffix, processes=1, prerun_cmds=None,
                   startGRU=None, countGRU=None, iHRU=None,
                   freq_restart=None, progress=None):
        """Start a local simulation"""
        run_cmd = self._gen_summa_cmd(run_suffix, processes, prerun_cmds,
                                      startGRU, countGRU, iHRU,
                                      freq_restart, progress)
        # shell=True is required: the command chains ``export``/``&&``.
        self.process = subprocess.Popen(run_cmd, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE, shell=True)
        self._status = 'Running'

    def _run_docker(self, run_suffix, processes=1, prerun_cmds=None,
                    startGRU=None, countGRU=None, iHRU=None,
                    freq_restart=None, progress=None):
        """Start a docker simulation"""
        run_cmd = self._gen_summa_cmd(run_suffix, processes, prerun_cmds,
                                      startGRU, countGRU, iHRU,
                                      freq_restart, progress)

        # Mount every directory the run touches at the same path inside
        # the container so the file manager's paths stay valid.
        fman_dir = os.path.dirname(self.manager_path)
        settings_path = self.manager.settings_path.value
        input_path = self.manager.input_path.value
        output_path = self.manager.output_path.value
        # NOTE(review): no image name is passed to ``docker run`` and the
        # in-container command still invokes ``self.executable``; confirm
        # the intended image/executable split before relying on this.
        cmd = ''.join(['docker run -v {}:{}'.format(fman_dir, fman_dir),
                       ' -v {}:{}'.format(settings_path, settings_path),
                       ' -v {}:{}'.format(input_path, input_path),
                       ' -v {}:{}'.format(output_path, output_path),
                       ' /bin/bash -c "', run_cmd, '"'])
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE, shell=True)
        self._status = 'Running'

    def start(self, run_option, run_suffix='pysumma_run', processes=1,
              prerun_cmds=None, startGRU=None, countGRU=None, iHRU=None,
              freq_restart=None, progress=None):
        """
        Start a SUMMA simulation. By default does not halt execution of
        further python commands, and simply launches the SUMMA process in
        the background. Progress can be halted by using the
        ``Simulation.monitor()`` method.

        Parameters
        ----------
        run_option (string):
            The method to use for running SUMMA, can be either ``local``
            or ``docker``.
        run_suffix (string):
            A unique identifier to include as part of the output file names
        processes (int):
            The number of processors to use. Note: This only matters if
            the SUMMA version being run is compiled with OpenMP support.
            Defaults to 1.
        prerun_cmds (List[string]):
            A list of commands to be run before the SUMMA executable is
            invoked. This can be used to set things like library paths
            or creation of subdirectories.
        startGRU (int):
            The GRU index to start the simulation at. Used to run only
            specified sections of the domain. Must be co-specified with
            ``countGRU`` to do anything. Cannot be used with ``iHRU``.
        countGRU (int):
            Number of GRU to run, starting at ``startGRU``. Used to run
            only the specified portion of the domain. Must be
            co-specified with ``startGRU`` to do anything. Cannot be used
            with ``iHRU``.
        iHRU (int):
            Index of a single HRU to be run. Cannot be used with
            ``startGRU`` and ``countGRU``.
        freq_restart (string):
            How often to write restart files. Can be ``d`` for daily,
            ``m`` for monthly, and ``y`` for yearly.
        progress (string):
            How often to record progress. Can be ``d`` for daily, ``m``
            for monthly, and ``y`` for yearly.

        Raises
        ------
        NotImplementedError:
            If ``run_option`` is neither ``local`` nor ``docker``.
        """
        # TODO: Implement running on hydroshare here
        if run_option == 'local':
            launcher = self._run_local
        elif run_option == 'docker':
            launcher = self._run_docker
        else:
            # Reject bad options before any configuration file is written.
            raise NotImplementedError('Invalid runtime given! '
                                      'Valid options: local, docker')
        self.run_suffix = run_suffix
        self._write_configuration()
        launcher(run_suffix, processes, prerun_cmds,
                 startGRU, countGRU, iHRU, freq_restart, progress)

    def _write_configuration(self):
        """Write the configuration"""
        # TODO: Still need to update for all netcdf writing
        self.manager.write()
        self.decisions.write()
        self.force_file_list.write()
        self.local_param_info.write()
        self.basin_param_info.write()
        self.output_control.write()

    def _get_output(self):
        """
        Find all output files and return a list of their paths.

        Paths are taken from the lines of the run's standard output that
        contain ``Created output file:``.

        Raises
        ------
        RuntimeError:
            If the simulation has not finished successfully.
        """
        new_file_text = 'Created output file:'
        if self._status != 'Success':
            raise RuntimeError('Output files are only available after a '
                               'successful simulation!')
        # Strip so the whitespace following the marker is not kept as
        # part of the file path.
        return [line.replace(new_file_text, '').strip()
                for line in self.stdout.split('\n')
                if new_file_text in line]

    def execute(self, run_option, run_suffix=None,
                preprocess_cmds=None, monitor=False):
        """
        Run a SUMMA simulation.

        Parameters
        ----------
        run_option (string):
            ``local`` or ``docker``, see ``Simulation.start``.
        run_suffix (string):
            Identifier for the output file names. When ``None`` the
            default of ``Simulation.start`` (``pysumma_run``) is used.
        preprocess_cmds (List[string]):
            Commands to run before SUMMA is invoked.
        monitor (bool):
            If True, block until the run has finished.

        Returns
        -------
        The value of ``Simulation.monitor()`` when ``monitor`` is True,
        otherwise ``None``.
        """
        if run_suffix is None:
            run_suffix = 'pysumma_run'
        self.start(run_option, run_suffix=run_suffix,
                   prerun_cmds=preprocess_cmds)
        if monitor:
            return self.monitor()
        return None

    def monitor(self):
        """
        Watch a running simulation until it is done and collect the
        run information in the simulation object. This will halt
        execution of a started simulation through the
        ``Simulation.start()`` method.

        Returns
        -------
        bool:
            True if the simulation finished successfully (exit code 0),
            False otherwise.

        Raises
        ------
        RuntimeError:
            If no simulation has been started.
        """
        if self.process is None:
            raise RuntimeError('No simulation running! Use simulation.start '
                               'or simulation.execute to begin a simulation!')
        if self._status in ['Error', 'Success']:
            return self._status == 'Success'

        # communicate() drains both pipes while waiting; wait() alone can
        # deadlock once the child fills a pipe buffer.
        raw_stdout, raw_stderr = self.process.communicate()
        # _result keeps its historical meaning: True for a non-zero exit.
        self._result = bool(self.process.returncode)
        self._stdout = self._decode_stream(raw_stdout)
        self._stderr = self._decode_stream(raw_stderr)
        self._status = 'Error' if self._result else 'Success'

        if self._status == 'Success':
            try:
                datasets = [xr.open_dataset(f) for f in self._get_output()]
                self._output = (datasets[0] if len(datasets) == 1
                                else datasets)
            except Exception:
                # Best effort: a finished run stays usable even when its
                # output files cannot be opened.
                self._output = None
        else:
            self._output = None
        return self._status == 'Success'

    @staticmethod
    def _decode_stream(raw):
        """Decode captured process output, replacing undecodable bytes."""
        if raw is None:
            return ''
        return raw.decode('utf-8', errors='replace')

    def _ensure_finished(self):
        """Raise if nothing was started, otherwise wait for the run."""
        if self.process is None:
            raise RuntimeError('No simulation started! Use simulation.start '
                               'or simulation.execute to begin a simulation!')
        # A string ``process`` is treated as an already-collected run
        # (presumably set by external code; nothing visible here does so).
        if not isinstance(self.process, str):
            self.monitor()

    @property
    def result(self):
        """
        Attribute describing whether a model run was a success or not
        """
        self._ensure_finished()
        return self._status == 'Success'

    @property
    def stdout(self):
        """
        The standard output. This contains a string representation of the
        output that a SUMMA simulation would write to screen. Use `print`
        to view this so that line breaks are rendered correctly.
        """
        self._ensure_finished()
        return self._stdout

    @property
    def stderr(self):
        """
        The standard error. This contains a string representation of the
        output that a SUMMA simulation would write to screen if any
        unforeseen errors occurred. Use `print` to view this so that line
        breaks are rendered correctly.
        """
        self._ensure_finished()
        return self._stderr

    @property
    def output(self):
        """
        An xarray dataset, or list of xarray datasets representing the
        output that a simulation has written to disk.
        """
        self._ensure_finished()
        return self._output

    def __repr__(self):
        """Show some information about the simulation setup"""
        info = ["Executable path: {}".format(self.executable),
                "Simulation status: {}".format(self._status),
                "File manager configuration:",
                str(self.manager)]
        return '\n'.join(info)