Source code for pyrate.loader

"""This module provides the Loader class, which loads a pyrate session from a
configuration file. The session can then run tasks on data repositories and
algorithms."""

import logging
import imp
import pkgutil
import inspect
import contextlib
from configparser import ConfigParser
from pyrate import get_resource_filename

def load_module(name, paths):
    """Load module ``name`` using the given search paths.

    Args:
        name: top-level module name to look up.
        paths: list of directories searched by ``imp.find_module``.

    Returns:
        The loaded module, or ``None`` when ``imp.find_module`` yields no file
        handle for ``name`` (e.g. when ``name`` is a package directory).

    Raises:
        ImportError: if ``name`` cannot be found on ``paths``.
    """
    handle, pathname, description = imp.find_module(name, paths)
    if handle is None:
        return None
    # imp leaves closing the returned file to the caller, even when loading
    # raises; the context manager guarantees it is closed either way.
    with handle:
        return imp.load_module(name, handle, pathname, description)
def load_all_modules(paths):
    """Load all modules found on the given paths.

    Args:
        paths: list of directories scanned with ``pkgutil.walk_packages``.

    Returns:
        dict mapping each discovered module name to the result of
        ``load_module`` for it (which may be ``None``). Modules whose loading
        raises ``ImportError`` are logged as warnings and left out.
    """
    modules = {}
    for _, name, _ in pkgutil.walk_packages(paths):
        try:
            modules[name] = load_module(name, paths)
        except ImportError as error:
            # logging.warn is a deprecated alias; use warning() with lazy args.
            logging.warning("Error importing module %s: %s", name, error)
    return modules
class Loader:
    """The Loader joins together data repositories and algorithms, and executes
    operations on them.

    Attributes:
        repo_drivers: dict mapping driver module name -> module, as returned by
            ``load_all_modules`` for the repository search paths (values may be
            ``None``).
        repo_config: dict mapping repository name -> its config section, for
            every section whose ``type`` names a loaded driver.
        algorithms: dict mapping algorithm module name -> module, as returned
            by ``load_all_modules`` for the algorithm search paths.
    """

    def __init__(self, config=None):
        """Build a loader from a configuration.

        Args:
            config: path to an INI configuration file (``str`` or path-like),
                or an already-populated ``ConfigParser``.

        Raises:
            RuntimeError: if no configuration is given, or the configuration
                file could not be read.
        """
        import os  # local import: only needed for the path-like check below

        if config is None:
            raise RuntimeError("No config file defined.")
        # load from file if a path was provided
        if isinstance(config, (str, os.PathLike)):
            loaded_conf = ConfigParser()
            # ConfigParser.read() silently skips files it cannot open; an empty
            # result means the user's config was not loaded at all.
            if not loaded_conf.read(config):
                raise RuntimeError("Unable to read config file {}.".format(config))
            config = loaded_conf

        repopaths = self._search_paths(
            config, 'repos', get_resource_filename('repositories'))
        logging.debug("Paths to repositories: {}".format(repopaths))
        # load repo drivers from repopaths
        repo_drivers = load_all_modules(repopaths)

        # Every section except 'globals' describes one repository. Iterate in
        # file order so the resulting mapping is deterministic.
        repo_conf_dict = {}
        for repo_name in config.sections():
            if repo_name == 'globals':
                continue
            conf = config[repo_name]
            # keep only repositories we have a driver for
            if 'type' not in conf:
                logging.warning("Repository "+ repo_name +" does not specify a type in the config file.")
            elif conf['type'] not in repo_drivers:
                logging.warning("Driver of type "+ conf['type'] +" for repository "+ repo_name +" not found.")
            else:
                repo_conf_dict[repo_name] = conf

        algopaths = self._search_paths(
            config, 'algos', get_resource_filename('algorithms'))
        # load algorithms from algopaths
        logging.debug("Paths to algorithms: {}".format(algopaths))
        algorithms = load_all_modules(algopaths)

        self.repo_drivers = repo_drivers
        self.repo_config = repo_conf_dict
        self.algorithms = algorithms

    @staticmethod
    def _search_paths(config, option, builtin_path):
        """Return the module search paths for ``option``.

        Reads the comma-separated list stored under ``option`` in the
        ``globals`` section (if present) and appends ``builtin_path``.
        Entries are stripped, and empty entries are dropped, because an empty
        path is treated by the import machinery as the current directory.
        A missing ``globals`` section or option yields only ``builtin_path``.
        """
        paths = []
        if 'globals' in config:
            raw = str(config.get('globals', option, fallback=''))
            paths = [path.strip() for path in raw.split(',') if path.strip()]
        paths.append(builtin_path)
        return paths

    def get_data_repositories(self):
        """Return a set-like view of the names of the available data repositories."""
        return self.repo_config.keys()

    def get_repository_commands(self, repo_name):
        """Return the commands exported by the driver of ``repo_name``.

        This is the driver's ``EXPORT_COMMANDS`` sequence; each entry's first
        element is a command name (see ``execute_repository_command``).
        Returns an empty list when the driver defines no ``EXPORT_COMMANDS``.

        Raises:
            KeyError: if ``repo_name`` is not a configured repository.
        """
        driver = self.repo_drivers[self.repo_config[repo_name]['type']]
        return getattr(driver, 'EXPORT_COMMANDS', [])

    def get_algorithm_commands(self, algname):
        """Return the ``EXPORT_COMMANDS`` of algorithm ``algname``.

        Returns an empty list when the algorithm defines no ``EXPORT_COMMANDS``.

        Raises:
            KeyError: if ``algname`` is not a loaded algorithm.
        """
        return getattr(self.algorithms[algname], 'EXPORT_COMMANDS', [])

    def get_algorithms(self):
        """Return a set-like view of the names of the available algorithms."""
        return self.algorithms.keys()

    def execute_repository_command(self, reponame, command, **args):
        """Execute ``command`` on repository ``reponame``.

        The repository is loaded and entered as a context manager for the
        duration of the call; ``args`` are forwarded as keyword arguments.

        Returns:
            Whatever the command method returns.

        Raises:
            ValueError: if ``command`` is not exported by the repository driver.
            RuntimeError: if the loaded repository does not have exactly one
                bound method named ``command``.
        """
        exported = [c[0] for c in self.get_repository_commands(reponame)]
        if command not in exported:
            raise ValueError("Invalid command {} for repository {}".format(command, reponame))
        # load repository instance
        repo = self.get_data_repository(reponame)
        fns = inspect.getmembers(repo, lambda x: inspect.ismethod(x) and x.__name__ == command)
        if len(fns) != 1:
            raise RuntimeError("Unable to find method {} in repository {}: {}".format(command, reponame, repo))
        with repo:
            # call command
            return fns[0][1](**args)

    def execute_algorithm_command(self, algname, command, **args):
        """Execute function ``command`` of algorithm ``algname``.

        Every repository named in the algorithm's ``INPUTS`` is loaded read-only,
        and every one in ``OUTPUTS`` read-write. All of them are entered as
        context managers (and exited, even on error) around the call
        ``command(inputs, outputs, **args)``, where ``inputs``/``outputs`` map
        repository name -> loaded repository.

        Returns:
            Whatever the algorithm function returns.

        Raises:
            RuntimeError: if the algorithm does not have exactly one function
                named ``command``.
        """
        alg = self.get_algorithm(algname)
        fns = inspect.getmembers(alg, lambda x: inspect.isfunction(x) and x.__name__ == command)
        if len(fns) != 1:
            raise RuntimeError("Unable to find function {} in algorithm {}: {}".format(command, algname, alg))
        # get inputs and outputs
        inputs = {inp: self.get_data_repository(inp, readonly=True) for inp in alg.INPUTS}
        outputs = {out: self.get_data_repository(out) for out in alg.OUTPUTS}
        with contextlib.ExitStack() as stack:
            # prepare repositories
            for repo in inputs.values():
                stack.enter_context(repo)
            for repo in outputs.values():
                stack.enter_context(repo)
            return fns[0][1](inputs, outputs, **args)

    def get_data_repository(self, name, readonly=False):
        """Return a freshly loaded instance of data repository ``name``.

        Calls the driver's ``load(config_section, readonly=readonly)``.

        Raises:
            KeyError: if ``name`` is not a configured repository.
        """
        conf = self.repo_config[name]
        return self.repo_drivers[conf['type']].load(conf, readonly=readonly)

    def get_algorithm(self, name):
        """Return the algorithm module registered as ``name``.

        Raises:
            KeyError: if ``name`` is not a loaded algorithm.
        """
        return self.algorithms[name]