# Source code for pysumma.ensemble

from copy import deepcopy
from distributed import Client
import pandas as pd
import time
import xarray as xr

from .simulation import Simulation
from .utils import ChainDict, product_dict


class Ensemble(object):
    '''
    Ensembles represent multiple SUMMA configurations based on
    changing the decisions file or parameter values. At its core,
    the Ensemble class is a thin wrapper around multiple Simulation
    objects, whose execution is managed by a dask ``Client``.
    A standard workflow for running an ensemble is::

        import pysumma as ps
        param_options = {
            'fixedThermalCond_snow': np.arange(0.35, 0.85, 0.05)
        }
        config = ps.ensemble.parameter_product(param_options)

        # initialize and start runs
        e = ps.Ensemble(summa_exe, file_manager,
                        config, num_workers=len(config))
        e.start('local', arg_list=['export OMP_NUM_THREADS=1',
                                   'export LD_LIBRARY_PATH=/opt/local/lib'])
        e.monitor()

        # check status
        for i, (n, s) in enumerate(e.simulations.items()):
            assert s._status == 'Success'

        # open output
        ds = e.merge_output()

    Parameters
    ----------
    executable (string):
        Path to locally compiled SUMMA executable or the name
        of the docker image to run
    filemanager (string):
        Path to the file manager for the desired simulation setup.
        Can be specified as a relative path
    '''

    # Path to the SUMMA executable or docker image name.
    executable: str = None
    # NOTE(review): ``simulations`` and ``submissions`` were previously
    # mutable class-level defaults ({} and []), which are shared across
    # every Ensemble instance. They are now created per-instance in
    # ``__init__``; only the annotations remain at class level.
    simulations: dict
    submissions: list

    def __init__(self, executable: str, filemanager: str,
                 configuration: dict, num_workers: int = 1):
        """
        Create the dask client and instantiate one Simulation per
        entry of ``configuration``.

        Parameters
        ----------
        executable:
            SUMMA executable path or docker image name.
        filemanager:
            Path to the file manager describing the base setup.
        configuration:
            Mapping of run name -> config dict with optional
            'decisions', 'file_manager', 'force_files' and
            'parameters' sub-dicts (see ``decision_product`` /
            ``parameter_product`` / ``total_product``).
        num_workers:
            Number of dask workers to run simulations on.
        """
        self._status = 'Initialized'
        self.executable = executable
        self.filemanager = filemanager
        self.configuration = configuration
        self.num_workers = num_workers
        # BUGFIX: per-instance containers (were shared class attributes).
        self.simulations = {}
        self.submissions = []
        # One extra worker keeps the client responsive while runs execute.
        self._client = Client(n_workers=num_workers + 1,
                              threads_per_worker=1)
        self._generate_simulation_objects()

    def _generate_simulation_objects(self):
        """Method for internally creating all of the Simulation objects"""
        for name, config in self.configuration.items():
            s = Simulation(self.executable, self.filemanager)
            for k, v in config.get('decisions', {}).items():
                s.decisions.set_option(k, v)
            for k, v in config.get('file_manager', {}).items():
                s.manager.set_option(k, v)
            # force_files entries are applied through the manager as well
            for k, v in config.get('force_files', {}).items():
                s.manager.set_option(k, v)
            for k, v in config.get('parameters', {}).items():
                s.local_param_info.set_option(k, v)
            self.simulations[name] = s

    def _generate_coords(self):
        """
        Method for internally creating the coordinates for
        merging the output of all of the simulations
        """
        decision_dims = ChainDict()
        manager_dims = ChainDict()
        parameter_dims = ChainDict()
        for name, conf in self.configuration.items():
            for k, v in conf.get('decisions', {}).items():
                decision_dims[k] = v
            for k, v in conf.get('file_manager', {}).items():
                manager_dims[k] = v
            for k, v in conf.get('parameters', {}).items():
                parameter_dims[k] = v
        return {'decisions': decision_dims,
                'managers': manager_dims,
                'parameters': parameter_dims}

    def _merge_decision_output(self):
        """Merge output of a decisions-only ensemble into one dataset."""
        new_coords = self._generate_coords()['decisions']
        # Run names look like '++opt1++opt2++'; strip the sentinels.
        decision_tuples = [tuple(n.split('++')[1:-1])
                           for n in self.configuration.keys()]
        decision_names = ['++'.join(n) for n in decision_tuples]
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=list(new_coords.keys()))
        out_file_paths = [s._get_output()
                          for s in self.simulations.values()]
        out_file_paths = [fi for sublist in out_file_paths
                          for fi in sublist]
        full = xr.open_mfdataset(out_file_paths, concat_dim='run_number')
        merged = full.assign_coords(run_number=decision_names)
        merged['run_number'] = new_idx
        merged = merged.unstack('run_number')
        return merged

    def _merge_parameter_output(self):
        """Merge output of a parameters-only ensemble into one dataset."""
        new_coords = self._generate_coords()['parameters']
        # Run names look like '++name=value++name=value++'.
        decision_tuples = [tuple(n.split('++')[1:-1])
                           for n in self.configuration.keys()]
        for i, t in enumerate(decision_tuples):
            decision_tuples[i] = tuple(
                float(l.split('=')[-1]) for l in t)
        decision_names = ['++'.join(tuple(n.split('++')[1:-1]))
                          for n in self.configuration.keys()]
        for i, t in enumerate(decision_names):
            # BUGFIX: the original iterated the *characters* of the
            # joined string; split on '++' to recover name=value pieces.
            decision_names[i] = '++'.join(
                l.split('=')[0] for l in t.split('++'))
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=list(new_coords.keys()))
        out_file_paths = [s._get_output()
                          for s in self.simulations.values()]
        out_file_paths = [fi for sublist in out_file_paths
                          for fi in sublist]
        full = xr.open_mfdataset(out_file_paths, concat_dim='run_number')
        merged = full.assign_coords(run_number=decision_names)
        merged['run_number'] = new_idx
        merged = merged.unstack('run_number')
        return merged

    def merge_output(self):
        """
        Merge the output of the simulation objects into a single
        xarray dataset
        """
        nc = self._generate_coords()
        new_coords = (list(nc.get('decisions', {}))
                      + list(nc.get('parameters', {})))
        decision_tuples = [tuple(n.split('++')[1:-1])
                           for n in self.configuration.keys()]
        for i, t in enumerate(decision_tuples):
            # Parameter pieces carry 'name=value'; decisions are plain.
            decision_tuples[i] = tuple(
                float(l.split('=')[-1]) if '=' in l else l for l in t)
        decision_names = ['++'.join(tuple(n.split('++')[1:-1]))
                          for n in self.configuration.keys()]
        for i, t in enumerate(decision_names):
            # BUGFIX: split on '++' rather than iterating characters.
            decision_names[i] = '++'.join(
                l.split('=')[0] for l in t.split('++'))
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=new_coords)
        out_file_paths = [s._get_output()
                          for s in self.simulations.values()]
        out_file_paths = [fi for sublist in out_file_paths
                          for fi in sublist]
        full = xr.open_mfdataset(out_file_paths, concat_dim='run_number')
        merged = full.assign_coords(run_number=decision_names)
        merged['run_number'] = new_idx
        merged = merged.unstack('run_number')
        return merged

    def start(self, run_option: str, arg_list: list = None,
              monitor: bool = False):
        """
        Start running the ensemble. By default does not halt execution
        of further python commands, and simply launches the processes
        in the background. Progress can be halted by setting the
        ``monitor`` keyword.

        Parameters
        ----------
        run_option (string):
            The method to use for running SUMMA.
            Can be either ``local`` or ``docker``.
        arg_list (List[string]):
            A list of commands to be run before the
            simulations are started.
        monitor (bool):
            Whether to halt execution until completion of all
            of the simulations. Defaults to ``False``
        """
        # BUGFIX: mutable default argument replaced with None sentinel.
        if arg_list is None:
            arg_list = []
        for n, s in self.simulations.items():
            # Sleep calls are to ensure writeout happens
            time.sleep(1.5)
            self.submissions.append(self._client.submit(
                _submit, s, n, run_option, arg_list))
        time.sleep(1.5)
        if monitor:
            return self.monitor()

    def monitor(self):
        """
        Watch a running ensemble until it is done and collect
        the run information in the list of simulation objects.
        This will halt execution of a started simulation through
        the ``Ensemble.start()`` method if ``monitor`` was not
        set there.
        """
        simulations = self._client.gather(self.submissions)
        for s in simulations:
            self.simulations[s.run_suffix] = s
def _submit(s: Simulation, name: str, run_option: str, arg_list):
    """Run ``s`` to completion under suffix ``name`` and return it.

    Module-level helper so that dask can pickle it when the Ensemble
    submits simulations to the client.
    """
    s.execute(
        run_option,
        run_suffix=name,
        monitor=True,
        preprocess_cmds=arg_list,
    )
    return s
def decision_product(list_config):
    """
    Create an Ensemble compatible configuration dictionary which
    contains the Cartesian product of the options given. For example::

        decisions_options = {
            'veg_traits': ['Raupach_BLM1994', 'CM_QJRMS1988'],
            'cIntercept': ['sparseCanopy', 'storageFunc'],
        }
        ps.ensemble.decision_product(decisions_options)

    will produce 4 runs in a configuration that looks like::

        {'++Raupach_BLM1994++sparseCanopy++':
            {'decisions': {'veg_traits': 'Raupach_BLM1994',
                           'cIntercept': 'sparseCanopy'}},
         '++Raupach_BLM1994++storageFunc++':
            {'decisions': {'veg_traits': 'Raupach_BLM1994',
                           'cIntercept': 'storageFunc'}},
         '++CM_QJRMS1988++sparseCanopy++':
            {'decisions': {'veg_traits': 'CM_QJRMS1988',
                           'cIntercept': 'sparseCanopy'}},
         '++CM_QJRMS1988++storageFunc++':
            {'decisions': {'veg_traits': 'CM_QJRMS1988',
                           'cIntercept': 'storageFunc'}}}
    """
    configuration = {}
    for combo in product_dict(**list_config):
        # Run names are the chosen option values joined by '++' markers.
        name = '++' + '++'.join(combo.values()) + '++'
        configuration[name] = {'decisions': combo}
    return configuration
def parameter_product(list_config):
    """
    Create an Ensemble compatible configuration dictionary which
    contains the Cartesian product of the options given. For example::

        param_options = {
            'leafExchangeCoeff': np.arange(0.025, 0.0354, 0.01),
            'albedoMax': np.arange(0.9, 0.96, 0.05)
        }
        ps.ensemble.parameter_product(param_options)

    will produce 4 runs in a configuration that looks like::

        {'++leafExchangeCoeff=0.025++albedoMax=0.9++':
            {'parameters': {'leafExchangeCoeff': 0.025,
                            'albedoMax': 0.9}},
         '++leafExchangeCoeff=0.025++albedoMax=0.95++':
            {'parameters': {'leafExchangeCoeff': 0.025,
                            'albedoMax': 0.95}},
         '++leafExchangeCoeff=0.035++albedoMax=0.9++':
            {'parameters': {'leafExchangeCoeff': 0.035,
                            'albedoMax': 0.9}},
         '++leafExchangeCoeff=0.035++albedoMax=0.95++':
            {'parameters': {'leafExchangeCoeff': 0.035,
                            'albedoMax': 0.95}}}
    """
    configuration = {}
    for combo in product_dict(**list_config):
        # Run names encode each choice as 'name=value', '++'-separated.
        pieces = ['{}={}'.format(k, v) for k, v in combo.items()]
        name = '++' + '++'.join(pieces) + '++'
        configuration[name] = {'parameters': combo}
    return configuration
def total_product(dec_conf=None, param_conf=None):
    """
    Create an Ensemble compatible configuration dictionary which
    contains the Cartesian product of the options given. For example::

        param_options = {
            'leafExchangeCoeff': np.arange(0.025, 0.0354, 0.01),
        }
        decisions_options = {
            'veg_traits': ['Raupach_BLM1994', 'CM_QJRMS1988'],
        }
        ps.ensemble.total_product(decisions_options, param_options)

    will produce 4 runs in a configuration that looks like::

        {'++Raupach_BLM1994++leafExchangeCoeff=0.025++':
            {'decisions': {'veg_traits': 'Raupach_BLM1994'},
             'parameters': {'leafExchangeCoeff': 0.025}},
         '++Raupach_BLM1994++leafExchangeCoeff=0.035++':
            {'decisions': {'veg_traits': 'Raupach_BLM1994'},
             'parameters': {'leafExchangeCoeff': 0.035}},
         '++CM_QJRMS1988++leafExchangeCoeff=0.025++':
            {'decisions': {'veg_traits': 'CM_QJRMS1988'},
             'parameters': {'leafExchangeCoeff': 0.025}},
         '++CM_QJRMS1988++leafExchangeCoeff=0.035++':
            {'decisions': {'veg_traits': 'CM_QJRMS1988'},
             'parameters': {'leafExchangeCoeff': 0.035}}}
    """
    if not dec_conf:
        dec_conf = {}
    if not param_conf:
        param_conf = {}
    # Take the product over decisions and parameters together.
    combined = deepcopy(dec_conf)
    combined.update(param_conf)
    config = {}
    for combo in product_dict(**combined):
        # Parameters render as 'name=value'; decisions as the bare value.
        pieces = ['{}={}'.format(k, v) if k in param_conf else v
                  for k, v in combo.items()]
        name = '++' + '++'.join(pieces) + '++'
        entry = {'decisions': {}, 'parameters': {}}
        for k, v in combo.items():
            if k in dec_conf:
                entry['decisions'][k] = v
            elif k in param_conf:
                entry['parameters'][k] = v
        config[name] = entry
    return config