from copy import deepcopy
from distributed import Client
import pandas as pd
import time
import xarray as xr
from .simulation import Simulation
from .utils import ChainDict, product_dict
class Ensemble(object):
    '''
    Ensembles represent multiple SUMMA configurations based
    on changing the decisions file or parameter values.
    At its core, the Ensemble class is a thin wrapper around
    multiple Simulation objects, whose execution is managed by
    a dask ``Client``.

    A standard workflow for running an ensemble is::

        import pysumma as ps
        param_options = {
            'fixedThermalCond_snow': np.arange(0.35, 0.85, 0.05)
        }
        config = ps.ensemble.parameter_product(param_options)

        # initialize and start runs
        e = ps.Ensemble(summa_exe, file_manager,
                        config, num_workers=len(config))
        e.start('local', arg_list=['export OMP_NUM_THREADS=1',
                                   'export LD_LIBRARY_PATH=/opt/local/lib'])
        e.monitor()

        # check status
        for i, (n, s) in enumerate(e.simulations.items()):
            assert s._status == 'Success'

        # open output
        ds = e.merge_output()

    Parameters
    ----------
    executable (string):
        Path to locally compiled SUMMA executable
        or the name of the docker image to run
    filemanager (string):
        Path to the file manager for the desired
        simulation setup. Can be specified as a
        relative path
    '''

    def __init__(self, executable: str, filemanager: str,
                 configuration: dict, num_workers: int = 1):
        """
        Create the Simulation objects and the dask client used to run them.

        Parameters
        ----------
        executable:
            Path to a SUMMA executable or the name of a docker image
        filemanager:
            Path to the file manager for the base setup
        configuration:
            Mapping of run name to per-run overrides, as produced by
            ``decision_product``, ``parameter_product`` or ``total_product``
        num_workers:
            Number of dask workers used to run the simulations
        """
        self._status = 'Initialized'
        self.executable = executable
        self.filemanager = filemanager
        self.configuration = configuration
        self.num_workers = num_workers
        # Bug fix: ``simulations`` and ``submissions`` used to be mutable
        # *class* attributes, so every Ensemble instance shared (and
        # polluted) the same dict/list. They are per-instance now.
        self.simulations: dict = {}
        self.submissions: list = []
        self._client = Client(n_workers=num_workers + 1, threads_per_worker=1)
        self._generate_simulation_objects()

    def _generate_simulation_objects(self):
        """Method for internally creating all of the Simulation objects"""
        for name, config in self.configuration.items():
            s = Simulation(self.executable, self.filemanager)
            for k, v in config.get('decisions', {}).items():
                s.decisions.set_option(k, v)
            for k, v in config.get('file_manager', {}).items():
                s.manager.set_option(k, v)
            # NOTE(review): 'force_files' options are applied to the file
            # manager here, mirroring the original behavior — confirm this
            # is intended rather than a dedicated force-file list.
            for k, v in config.get('force_files', {}).items():
                s.manager.set_option(k, v)
            for k, v in config.get('parameters', {}).items():
                s.local_param_info.set_option(k, v)
            self.simulations[name] = s

    def _generate_coords(self):
        """
        Method for internally creating the coordinates
        for merging the output of all of the simulations
        """
        decision_dims = ChainDict()
        manager_dims = ChainDict()
        parameter_dims = ChainDict()
        for name, conf in self.configuration.items():
            for k, v in conf.get('decisions', {}).items():
                decision_dims[k] = v
            for k, v in conf.get('file_manager', {}).items():
                manager_dims[k] = v
            for k, v in conf.get('parameters', {}).items():
                parameter_dims[k] = v
        return {'decisions': decision_dims,
                'managers': manager_dims,
                'parameters': parameter_dims}

    def _open_merged_output(self, run_names, new_idx):
        """
        Open every simulation's output, concatenate along a ``run_number``
        dimension, then unstack that dimension into the coordinates
        described by the MultiIndex ``new_idx``. Shared tail of all of
        the merge methods below.
        """
        out_file_paths = [s._get_output() for s in self.simulations.values()]
        out_file_paths = [fi for sublist in out_file_paths for fi in sublist]
        full = xr.open_mfdataset(out_file_paths, concat_dim='run_number')
        merged = full.assign_coords(run_number=run_names)
        merged['run_number'] = new_idx
        return merged.unstack('run_number')

    def _merge_decision_output(self):
        """Merge output for ensembles that vary only decisions."""
        new_coords = self._generate_coords()['decisions']
        # run names look like '++opt1++opt2++'; strip the empty edges
        decision_tuples = [tuple(n.split('++')[1:-1])
                           for n in self.configuration.keys()]
        decision_names = ['++'.join(n) for n in decision_tuples]
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=list(new_coords.keys()))
        return self._open_merged_output(decision_names, new_idx)

    def _merge_parameter_output(self):
        """Merge output for ensembles that vary only parameters."""
        new_coords = self._generate_coords()['parameters']
        # run names look like '++name=value++...'; keep the numeric values
        decision_tuples = [tuple(n.split('++')[1:-1])
                           for n in self.configuration.keys()]
        for i, t in enumerate(decision_tuples):
            decision_tuples[i] = tuple(float(l.split('=')[-1]) for l in t)
        decision_names = ['++'.join(tuple(n.split('++')[1:-1]))
                          for n in self.configuration.keys()]
        for i, t in enumerate(decision_names):
            # Bug fix: split the joined run name back into its
            # 'key=value' pieces; the previous code iterated over the
            # individual *characters* of the string ``t``.
            decision_names[i] = '++'.join(
                l.split('=')[0] for l in t.split('++'))
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=list(new_coords.keys()))
        return self._open_merged_output(decision_names, new_idx)

    def merge_output(self):
        """
        Merge the output of the simulation objects
        into a single xarray dataset
        """
        nc = self._generate_coords()
        new_coords = (list(nc.get('decisions', {}))
                      + list(nc.get('parameters', {})))
        decision_tuples = [tuple(n.split('++')[1:-1])
                           for n in self.configuration.keys()]
        for i, t in enumerate(decision_tuples):
            # parameter pieces look like 'name=value'; decisions are bare
            decision_tuples[i] = tuple(float(l.split('=')[-1])
                                       if '=' in l else l for l in t)
        decision_names = ['++'.join(tuple(n.split('++')[1:-1]))
                          for n in self.configuration.keys()]
        for i, t in enumerate(decision_names):
            # Bug fix: split the joined run name back into its pieces;
            # the previous code iterated over individual characters of ``t``.
            decision_names[i] = '++'.join(
                l.split('=')[0] for l in t.split('++'))
        new_idx = pd.MultiIndex.from_tuples(
            decision_tuples, names=new_coords)
        return self._open_merged_output(decision_names, new_idx)

    def start(self, run_option: str, arg_list: list = None,
              monitor: bool = False):
        """
        Start running the ensemble. By default does not halt execution
        of further python commands, and simply launches the processes
        in the background. Progress can be halted by setting the
        ``monitor`` keyword.

        Parameters
        ----------
        run_option (string):
            The method to use for running SUMMA. Can be either
            ``local`` or ``docker``.
        arg_list (List[string]):
            A list of commands to be run before the simulations are
            started. Defaults to no commands.
        monitor (bool):
            Whether to halt execution until completion of all of
            the simulations. Defaults to ``False``
        """
        # Bug fix: the mutable default ``arg_list=[]`` was shared across
        # calls; use a None sentinel instead.
        if arg_list is None:
            arg_list = []
        for n, s in self.simulations.items():
            # Sleep calls are to ensure writeout happens
            time.sleep(1.5)
            self.submissions.append(self._client.submit(
                _submit, s, n, run_option, arg_list))
        time.sleep(1.5)
        if monitor:
            return self.monitor()

    def monitor(self):
        """
        Watch a running ensemble until it is done and
        collect the run information in the list of
        simulation objects. This will halt execution
        of a started simulation through the
        ``Ensemble.start()`` method if ``monitor`` was
        not set there.
        """
        simulations = self._client.gather(self.submissions)
        for s in simulations:
            self.simulations[s.run_suffix] = s
def _submit(s: Simulation, name: str, run_option: str, arg_list):
    """
    Run a single simulation to completion and hand the object back.

    This is the unit of work shipped to each dask worker by
    ``Ensemble.start``; ``monitor=True`` blocks until SUMMA exits.
    """
    s.execute(run_option,
              run_suffix=name,
              monitor=True,
              preprocess_cmds=arg_list)
    return s
def decision_product(list_config):
    """
    Create an Ensemble compatible configuration
    dictionary which contains the Cartesian product
    of the options given. For example::

        decisions_options = {
            'veg_traits': ['Raupach_BLM1994', 'CM_QJRMS1988'],
            'cIntercept': ['sparseCanopy', 'storageFunc'],
        }
        ps.ensemble.decision_product(decisions_options)

    will produce 4 runs in a configuration that looks like::

        {'++Raupach_BLM1994++sparseCanopy++':
            {'decisions':
                {'veg_traits': 'Raupach_BLM1994',
                 'cIntercept': 'sparseCanopy'}},
         '++Raupach_BLM1994++storageFunc++':
            {'decisions':
                {'veg_traits': 'Raupach_BLM1994',
                 'cIntercept': 'storageFunc'}},
         '++CM_QJRMS1988++sparseCanopy++':
            {'decisions':
                {'veg_traits': 'CM_QJRMS1988',
                 'cIntercept': 'sparseCanopy'}},
         '++CM_QJRMS1988++storageFunc++':
            {'decisions':
                {'veg_traits': 'CM_QJRMS1988',
                 'cIntercept': 'storageFunc'}}}
    """
    config = {}
    for combo in product_dict(**list_config):
        # each run name is the decision values fenced with '++'
        run_name = '++' + '++'.join(combo.values()) + '++'
        config[run_name] = {'decisions': combo}
    return config
def parameter_product(list_config):
    """
    Create an Ensemble compatible configuration
    dictionary which contains the Cartesian product
    of the options given. For example::

        param_options = {
            'leafExchangeCoeff': np.arange(0.025, 0.0354, 0.01),
            'albedoMax': np.arange(0.9, 0.96, 0.05)
        }
        ps.ensemble.parameter_product(param_options)

    will produce 4 runs in a configuration that looks like::

        {'++leafExchangeCoeff=0.025++albedoMax=0.9++':
            {'parameters':
                {'leafExchangeCoeff': 0.025,
                 'albedoMax': 0.9}},
         '++leafExchangeCoeff=0.025++albedoMax=0.95++':
            {'parameters':
                {'leafExchangeCoeff': 0.025,
                 'albedoMax': 0.95}},
         '++leafExchangeCoeff=0.035++albedoMax=0.9++':
            {'parameters':
                {'leafExchangeCoeff': 0.035,
                 'albedoMax': 0.9}},
         '++leafExchangeCoeff=0.035++albedoMax=0.95++':
            {'parameters': {'leafExchangeCoeff': 0.035,
                            'albedoMax': 0.95}}}
    """
    config = {}
    for combo in product_dict(**list_config):
        # each run name is a '++'-fenced list of 'name=value' pieces
        pieces = ['{}={}'.format(k, v) for k, v in combo.items()]
        run_name = '++' + '++'.join(pieces) + '++'
        config[run_name] = {'parameters': combo}
    return config
def total_product(dec_conf=None, param_conf=None):
    """
    Create an Ensemble compatible configuration
    dictionary which contains the Cartesian product
    of the options given. For example::

        param_options = {
            'leafExchangeCoeff': np.arange(0.025, 0.0354, 0.01),
        }
        decisions_options = {
            'veg_traits': ['Raupach_BLM1994', 'CM_QJRMS1988'],
        }
        ps.ensemble.total_product(decisions_options, param_options)

    will produce 4 runs in a configuration that looks like::

        {'++Raupach_BLM1994++leafExchangeCoeff=0.025++':
            {'decisions':
                {'veg_traits': 'Raupach_BLM1994'},
             'parameters':
                {'leafExchangeCoeff': 0.025}},
         '++Raupach_BLM1994++leafExchangeCoeff=0.035++':
            {'decisions':
                {'veg_traits': 'Raupach_BLM1994'},
             'parameters':
                {'leafExchangeCoeff': 0.035}},
         '++CM_QJRMS1988++leafExchangeCoeff=0.025++':
            {'decisions':
                {'veg_traits': 'CM_QJRMS1988'},
             'parameters':
                {'leafExchangeCoeff': 0.025}},
         '++CM_QJRMS1988++leafExchangeCoeff=0.035++':
            {'decisions':
                {'veg_traits': 'CM_QJRMS1988'},
             'parameters':
                {'leafExchangeCoeff': 0.035}}}
    """
    dec_conf = dec_conf if dec_conf else {}
    param_conf = param_conf if param_conf else {}
    combined = deepcopy(dec_conf)
    combined.update(param_conf)
    config = {}
    for combo in product_dict(**combined):
        pieces = []
        entry = {'decisions': {}, 'parameters': {}}
        for k, v in combo.items():
            # parameters are rendered 'name=value'; decisions are bare
            pieces.append('{}={}'.format(k, v) if k in param_conf else v)
            if k in dec_conf:
                entry['decisions'][k] = v
            elif k in param_conf:
                entry['parameters'][k] = v
        config['++' + '++'.join(pieces) + '++'] = entry
    return config