Source code for pysumma.ensemble

from copy import deepcopy
from distributed import Client, get_client
import os
import pandas as pd
import time
from typing import List, Dict
import xarray as xr

from .simulation import Simulation
from .utils import ChainDict, product_dict

# Default number of threads per Dask worker: taken from the OMP_NUM_THREADS
# environment variable when it is set, otherwise 1.
OMP_NUM_THREADS = int(os.getenv('OMP_NUM_THREADS', 1))


class Ensemble(object):
    """
    Ensembles represent multiple SUMMA configurations based on
    changing the decisions or parameters of a given run.

    Attributes
    ----------
    executable:
        Path to the SUMMA executable
    filemanager: (optional)
        Path to the file manager
    configuration:
        Dictionary of runs, along with settings
    num_workers:
        Number of parallel workers to use
    simulations:
        Dictionary of run names and Simulation objects
    submissions:
        List of the futures returned by the Dask client for submitted work
    """

    def __init__(self, executable: str, configuration: dict,
                 filemanager: str=None, num_workers: int=1,
                 threads_per_worker: int=OMP_NUM_THREADS,
                 scheduler: str=None, client: Client=None):
        """
        Create a new Ensemble object. The API mirrors that
        of the Simulation object.

        Parameters
        ----------
        executable:
            Path to the SUMMA executable
        configuration:
            Dictionary mapping run names to their settings
        filemanager:
            File manager shared by every run. When omitted, each run's
            configuration must provide its own ``'file_manager'`` entry.
        num_workers:
            Number of parallel workers to use
        threads_per_worker:
            Threads per worker; only used when a new Client is started here
        scheduler:
            Accepted for API compatibility; not used by this class
        client:
            An existing Dask client to use instead of looking one up
        """
        self._status = 'Initialized'
        self.executable: str = executable
        self.filemanager: str = filemanager
        self.configuration: dict = configuration
        self.num_workers: int = num_workers
        self.simulations: dict = {}
        self.submissions: list = []

        # Try to get a client, and if none exists then start a new one
        if client is not None:
            self._client = client
            self._ensure_workers()
        else:
            try:
                self._client = get_client()
            except ValueError:
                # No client is currently running: start a local one
                self._client = Client(n_workers=self.num_workers,
                                      threads_per_worker=threads_per_worker)
            else:
                self._ensure_workers()
        self._generate_simulation_objects()

    def _ensure_workers(self):
        """
        Start more workers on an existing client if it has fewer than
        ``num_workers``. Clients without an attached cluster object are
        left untouched because there is nothing to scale.
        """
        current = len(self._client.get_worker_logs())
        cluster = getattr(self._client, 'cluster', None)
        if cluster is not None and current < self.num_workers:
            cluster.scale(self.num_workers)

    def _generate_simulation_objects(self):
        """
        Create a mapping of configurations to the simulation objects.

        Raises
        ------
        AssertionError
            If neither the Ensemble nor a run's configuration provides
            a file manager.
        """
        for name, config in self.configuration.items():
            if self.filemanager:
                file_manager = self.filemanager
            else:
                file_manager = config.get('file_manager')
                if file_manager is None:
                    # Raised explicitly (rather than via ``assert``) so the
                    # check survives ``python -O``; the exception type is
                    # kept for backward compatibility.
                    raise AssertionError(
                        "No filemanager found in configuration or Ensemble!")
            # NOTE(review): the third positional argument is passed as
            # False; `_submit` calls `initialize()` later, so this
            # presumably defers initialization -- confirm in Simulation.
            self.simulations[name] = Simulation(
                self.executable, file_manager, False)

    def _generate_coords(self):
        """
        Generate the coordinates that can be used to merge the output
        of the ensemble runs into a single dataset.

        Returns
        -------
        A dictionary with the keys ``'decisions'``, ``'managers'`` and
        ``'parameters'``, each holding a ChainDict filled from the run
        configurations. Only ``'trial_parameters'`` entries feed the
        ``'parameters'`` collection.
        """
        decision_dims = ChainDict()
        manager_dims = ChainDict()
        parameter_dims = ChainDict()
        for conf in self.configuration.values():
            for k, v in conf.get('decisions', {}).items():
                decision_dims[k] = v
            file_manager = conf.get('file_manager', {})
            if isinstance(file_manager, dict):
                for k, v in file_manager.items():
                    manager_dims[k] = v
            elif file_manager is not None:
                # A single file manager value (the form that is handed to
                # Simulation) has no ``.items()``; record it under one key.
                manager_dims['file_manager'] = file_manager
            for k, v in conf.get('trial_parameters', {}).items():
                parameter_dims[k] = v
        return {'decisions': decision_dims,
                'managers': manager_dims,
                'parameters': parameter_dims}

    def merge_output(self):
        """
        Open and merge all of the output datasets from the ensemble
        run into a single dataset.

        Run names are expected to look like ``++part1++part2++``; a part
        of the form ``key=value`` contributes ``float(value)`` to the new
        index, any other part is used verbatim.

        Raises
        ------
        NameError
            If none of the run names contains any ``++``-delimited parts.
        """
        nc = self._generate_coords()
        new_coords = (list(nc.get('decisions', {}))
                      + list(nc.get('parameters', {})))
        run_parts = [tuple(n.split('++')[1:-1]) for n in self.configuration]
        if sum(len(parts) for parts in run_parts) == 0:
            raise NameError("Simulations in the ensemble do not share all"
                            " common decisions! Please use `open_output`"
                            " to retrieve the output of this Ensemble")
        decision_tuples = [
            tuple(float(part.split('=')[-1]) if '=' in part else part
                  for part in parts)
            for parts in run_parts
        ]
        # Temporary labels for the concatenation dimension; they are
        # replaced by the MultiIndex below.
        decision_names = ['++'.join(parts) for parts in run_parts]
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=new_coords)
        out_file_paths = [fi
                          for s in self.simulations.values()
                          for fi in s.get_output_files()]
        full = xr.open_mfdataset(out_file_paths,
                                 concat_dim='run_number',
                                 combine='nested')
        merged = full.assign_coords(run_number=decision_names)
        merged['run_number'] = new_idx
        merged = merged.unstack('run_number')
        return merged

    def open_output(self):
        """
        Open all of the output datasets from the ensemble and
        return as a dictionary of datasets
        """
        return {n: s.output for n, s in self.simulations.items()}

    def _submit_simulation(self, name: str, run_option: str, prerun_cmds):
        """Submit one named simulation to the client and track its future."""
        future = self._client.submit(
            _submit, self.simulations[name], name, run_option,
            prerun_cmds, self.configuration[name])
        self.submissions.append(future)

    def start(self, run_option: str='local', prerun_cmds: list=None):
        """
        Start running the ensemble members.

        Parameters
        ----------
        run_option:
            The run type. Should be either 'local' or 'docker'
        prerun_cmds:
            A list of preprocessing commands to run
        """
        for name in self.simulations:
            self._submit_simulation(name, run_option, prerun_cmds)

    def run(self, run_option: str='local', prerun_cmds=None,
            monitor: bool=True):
        """
        Run the ensemble

        Parameters
        ----------
        run_option:
            Where to run the simulation. Can be ``local`` or ``docker``
        prerun_cmds:
            A list of shell commands to run before running SUMMA
        monitor:
            Whether to halt operation until runs are complete

        Returns
        -------
        The result of ``monitor()`` when ``monitor`` is true, else ``True``.
        """
        self.start(run_option, prerun_cmds)
        if monitor:
            return self.monitor()
        else:
            return True

    def map(self, fun, args, include_sims=True, monitor=True):
        """
        Submit ``fun`` to the client once per ensemble member.

        Parameters
        ----------
        fun:
            Callable to submit. It is called as
            ``fun(simulation, name, *args, {'config': config})`` when
            ``include_sims`` is true, otherwise as
            ``fun(*args, {'config': config})``. Note that the
            configuration is passed as a trailing positional dictionary.
        args:
            Iterable of extra positional arguments for ``fun``
        include_sims:
            Whether to pass the Simulation object and run name to ``fun``
        monitor:
            Whether to halt operation until the submitted work is
            complete. ``monitor()`` stores each gathered result under its
            ``run_suffix`` attribute, so results must provide one.

        Returns
        -------
        The result of ``monitor()`` when ``monitor`` is true, else ``True``.
        """
        for n, s in self.simulations.items():
            config = self.configuration[n]
            if include_sims:
                all_args = (s, n, *args, {'config': config})
            else:
                all_args = (*args, {'config': config})
            self.submissions.append(self._client.submit(
                fun, *all_args))
        if monitor:
            return self.monitor()
        else:
            return True

    def monitor(self):
        """
        Halt computation until submitted simulations are complete
        """
        simulations = self._client.gather(self.submissions)
        # Replace the local objects with the ones returned by the workers
        for s in simulations:
            self.simulations[s.run_suffix] = s

    def summary(self):
        """
        Show the user information about ensemble status

        Returns
        -------
        A dictionary with the keys ``'Success'``, ``'Error'`` and
        ``'Other'``, each holding a list of run names.
        """
        success, error, other = [], [], []
        for n, s in self.simulations.items():
            if s.status == 'Success':
                success.append(n)
            elif s.status == 'Error':
                error.append(n)
            else:
                other.append(n)
        return {'Success': success, 'Error': error, 'Other': other}

    def rerun_failed(self, run_option: str='local', prerun_cmds=None,
                     monitor: bool=True):
        """
        Try to re-run failed simulations.

        Parameters
        ----------
        run_option:
            Where to run the simulation. Can be ``local`` or ``docker``
        prerun_cmds:
            A list of shell commands to run before running SUMMA
        monitor:
            Whether to halt operation until runs are complete

        Returns
        -------
        The result of ``monitor()`` when ``monitor`` is true, else ``True``.
        """
        run_summary = self.summary()
        self.submissions = []
        # The key must match the capitalised keys returned by `summary()`
        for name in run_summary['Error']:
            self.simulations[name].reset()
            self._submit_simulation(name, run_option, prerun_cmds)
        if monitor:
            return self.monitor()
        else:
            return True
def _submit(s: Simulation, name: str, run_option: str, prerun_cmds: List,
            config: Dict, **kwargs):
    """
    Initialize, configure and run a single simulation.

    The simulation is run with ``run_suffix=name`` and
    ``freq_restart='e'``. Extra keyword arguments are accepted and
    ignored. Returns the simulation with its ``process`` attribute
    cleared.
    """
    s.initialize()
    s.apply_config(config)
    s.run(run_option,
          run_suffix=name,
          prerun_cmds=prerun_cmds,
          freq_restart='e')
    # NOTE(review): presumably cleared so the object can be sent back
    # from the worker -- confirm.
    s.process = None
    return s


def _keyed_product(list_config, section):
    """
    Build ``{'++k1=v1++k2=v2++': {section: {k1: v1, k2: v2}}}`` with one
    entry per dictionary yielded by ``product_dict(**list_config)``.
    """
    runs = {}
    for combo in product_dict(**list_config):
        label = '++'.join('{}={}'.format(k, v) for k, v in combo.items())
        runs['++' + label + '++'] = {section: combo}
    return runs


def decision_product(list_config):
    """
    Create a dictionary of runs based on a simpler list configuration
    of decision options

    Parameters
    ----------
    list_config:
        A dictionary of the sort
        {key1: [list of values], key2: [list of values]}

    Returns
    --------
    A dictionary of the sort:
        {'++value1++value2++': {'decisions': {key1: value1,
                                              key2: value2}},
         ...}
    with one entry per dictionary yielded by ``product_dict``. The
    values are joined directly into the name.
    """
    runs = {}
    for combo in product_dict(**list_config):
        runs['++' + '++'.join(combo.values()) + '++'] = {'decisions': combo}
    return runs


def parameter_product(list_config):
    """
    Create a dictionary of runs based on a simpler list configuration
    of parameter values

    Parameters
    ----------
    list_config:
        A dictionary of the sort
        {key1: [list of values], key2: [list of values]}

    Returns
    --------
    A dictionary of the sort:
        {'++key1=value1++key2=value2++': {'parameters': {key1: value1,
                                                         key2: value2}},
         ...}
    """
    return _keyed_product(list_config, 'parameters')


def attribute_product(list_config):
    """
    Create a dictionary of runs based on a simpler list configuration
    of attribute values

    Parameters
    ----------
    list_config:
        A dictionary of the sort
        {key1: [list of values], key2: [list of values]}

    Returns
    --------
    A dictionary of the sort:
        {'++key1=value1++key2=value2++': {'attributes': {key1: value1,
                                                         key2: value2}},
         ...}
    """
    return _keyed_product(list_config, 'attributes')


def trial_parameter_product(list_config):
    """
    Create a dictionary of runs based on a simpler list configuration
    of trial parameter values

    Parameters
    ----------
    list_config:
        A dictionary of the sort
        {key1: [list of values], key2: [list of values]}

    Returns
    --------
    A dictionary of the sort:
        {'++key1=value1++key2=value2++': {'trial_parameters':
                                              {key1: value1,
                                               key2: value2}},
         ...}
    """
    return _keyed_product(list_config, 'trial_parameters')


def file_manager_product(list_config):
    """
    Create a dictionary of runs based on a simpler list configuration
    of file managers

    Parameters
    ----------
    list_config:
        A dictionary of the sort
        {key1: [list of values], key2: [list of values]}

    Returns
    --------
    A dictionary of the sort:
        {'++key1=value1++key2=value2++': {'file_manager': {key1: value1,
                                                           key2: value2}},
         ...}
    """
    return _keyed_product(list_config, 'file_manager')


def total_product(dec_conf={}, param_conf={}, attr_conf={}, fman_conf={},
                  param_trial_conf={}, sequential_keys=False):
    """
    Combines multiple types of model changes into a single
    configuration for the Ensemble object.

    Parameters
    ----------
    dec_conf, param_conf, attr_conf, fman_conf, param_trial_conf:
        Dictionaries of the sort {key: [list of values]} for decisions,
        parameters, attributes, file managers and trial parameters.
        None of them is modified: ``dec_conf`` is deep-copied and the
        others are only read.
    sequential_keys:
        If true, runs are named ``run_<i>`` instead of by their settings

    Returns
    -------
    A dictionary mapping each run name to a dictionary with the keys
    ``'decisions'``, ``'parameters'``, ``'attributes'`` and
    ``'trial_parameters'``, plus ``'file_manager'`` when a key from
    ``fman_conf`` is present.
    """
    combined = deepcopy(dec_conf)
    for extra in (param_conf, attr_conf, param_trial_conf, fman_conf):
        combined.update(extra)

    def _name_part(key, value):
        # Parameter-like settings are written as key=value; everything
        # else contributes its value with path separators replaced.
        if key in param_conf or key in attr_conf or key in param_trial_conf:
            return '{}={}'.format(key, value)
        return value.replace('/', '_').replace('\\', '_')

    config = {}
    for index, combo in enumerate(product_dict(**combined)):
        name = ('++'
                + '++'.join(_name_part(k, v) for k, v in combo.items())
                + '++')
        if sequential_keys:
            name = f'run_{index}'
        entry = {'decisions': {},
                 'parameters': {},
                 'attributes': {},
                 'trial_parameters': {}}
        config[name] = entry
        for k, v in combo.items():
            if k in dec_conf:
                entry['decisions'][k] = v
            elif k in param_conf:
                entry['parameters'][k] = v
            elif k in attr_conf:
                entry['attributes'][k] = v
            elif k in param_trial_conf:
                entry['trial_parameters'][k] = v
            elif k in fman_conf:
                entry['file_manager'] = v
    return config