# Source code for malloovia.util

"""Utility functions to save and load Malloovia problem definitions"""

from typing import (
    Mapping,
    Dict,
    Sequence,
    Tuple,
    Union,
    Any,
    List,
    Set,
    Iterable,
)
from functools import lru_cache
import os.path
import gzip
import re
import urllib.request

# To use ruamel.yaml instead of pyyaml:
from ruamel.yaml import YAML  # type: ignore

# Mimic the pyyaml API: the YAML(typ="safe") object exposes `load`, not
# `safe_load`, so alias it and let the rest of the module keep calling
# `yaml.safe_load(...)` unchanged.
yaml = YAML(typ="safe")
yaml.safe_load = yaml.load

from .model import (
    App,
    LimitingSet,
    InstanceClass,
    Workload,
    PerformanceSet,
    PerformanceValues,
    Problem,
)
from .solution_model import (
    SolutionI,
    SolutionII,
    SolvingStats,
    MallooviaStats,
    GlobalSolvingStats,
    AllocationInfo,
    ReservedAllocation,
    Status,
)

# Type alias: any Malloovia model object accepted by `_anchor_from_id`.
MallooviaObjectModel = Union[
    App,
    LimitingSet,
    InstanceClass,
    Workload,
    PerformanceSet,
    Problem,
    SolutionI,
    SolutionII,
]

def _sanitize(_id: str) -> str:
    """Sanitizes a string to use it as part of a YAML anchor.
    Every run of characters other than ASCII letters, digits and the
    underscore is replaced by a single underscore."""
    pattern = re.compile("[^0-9a-zA-Z_]+")
    return pattern.sub("_", _id)


def _anchor_from_id(obj: MallooviaObjectModel) -> str:
    """Given one Malloovia object, generate a valid YAML anchor by
    combining the internal ``obj.id``, sanitized by replacing every
    non-ascii letter or digit by ``_``, and appending an hexadecimal
    string derived from the internal Python ``id`` of the object.

    Args:
        obj: Malloovia object (NamedTuple)

    Returns:
        A string to be used as anchor
    """

    # Special handling of SolutionI and SolutionII objects
    # since these objects contain fields of type List, and
    # thus they are not hashable, as required by lru_cache
    if type(obj) in [SolutionI, SolutionII]:
        return "{}_{}".format(_sanitize(obj.id), hex(id(obj)))

    # For any other case, delegate to the cached version
    return __anchor_from_id_cached(obj)

@lru_cache(maxsize=None)
def __anchor_from_id_cached(obj: MallooviaObjectModel) -> str:
    """Cached helper for :func:`_anchor_from_id`: builds the anchor from the
    sanitized ``obj.id`` (empty string if the object has no ``id`` field)
    plus the hexadecimal Python ``id`` of the object.

    NOTE(review): the unbounded lru_cache keys on the object itself, so every
    object anchored here stays alive for the life of the process — presumably
    acceptable for one-shot save-to-YAML usage; confirm for long-running uses.
    """
    if "id" in obj._fields:
        _id = _sanitize(obj.id)     # type: ignore
    else:
        # Some Malloovia object doesn't have an id field
        # Use an empty string in this case
        _id = ""
    return "{}_{}".format(_id, hex(id(obj)))

def read_problems_from_yaml(filename: str) -> Mapping[str, Problem]:
    """Reads the problem(s) definition from a YAML file.

    Args:
        filename: name of the YAML file to read, it has to have the extension
            ``.yaml`` or ``.yaml.gz`` (which is automatically decompressed
            on read).

    Returns:
        A dictionary whose keys are problem ids, and the values are
        :class:`Problem` objects.

    Raises:
        ValueError if the file has not the expected extension.
    """
    _open = _get_open_function_from_extension(filename)
    with _open(filename, mode="rt", encoding="utf8") as stream:
        data = yaml.safe_load(stream)
    return problems_from_dict(data, filename)
def read_problems_from_github(
    dataset: str, _id: str = None, base_url: str = None
) -> Union[Problem, Mapping[str, Problem]]:
    """Reads a problem or set of problems from a GitHub repository.

    Args:
        dataset: the name of the yaml file which contains the set of problems,
            without extension.
        _id: the id of the particular problem to load, if omitted all problems
            are read and a dictionary is returned, whose keys are problem ids
            and the values are the :class:`Problem` instances.
        base_url: the url to the folder where the file is stored. If None,
            it will read from
            https://raw.githubusercontent.com/asi-uniovi/malloovia/master/tests/test_data/problems/

    Returns:
        A dictionary whose keys are problem ids, and the values are
        :class:`Problem` objects, or a single :class:`Problem` if the id
        is passed as argument.
    """
    if base_url is None:
        # Default to the "master" branch, as documented above (the previous
        # value pointed at a non-existent "units" branch).
        base_url = (
            "https://raw.githubusercontent.com/asi-uniovi/malloovia"
            "/master/tests/test_data/problems/"
        )
    url = "{}/{}.yaml".format(base_url, dataset)
    with urllib.request.urlopen(url) as stream:
        data = yaml.safe_load(stream)
    problems = problems_from_dict(data, dataset)
    if _id is None:
        return problems
    return problems[_id]
def problems_from_dict(
    data: Mapping[str, Any], yaml_filename: str
) -> Mapping[str, Problem]:
    """Takes data from a dictionary with a particular structure, and stores it
    in several Problem instances.

    Args:
        data: a dictionary which is the result of reading a YAML file. The
            dictionary is expected to have a particular structure. It can be
            previously validated through a YAML schema to ensure so.
        yaml_filename: name of the YAML file the data came from; used to
            resolve workload filenames relative to it.

    Returns:
        A dictionary whose keys are problem ids, and the values are
        :class:`Problem` objects.
    """
    problems, _ = _problems_and_ids_from_dict(data, yaml_filename)
    return problems


def _problems_and_ids_from_dict(
    data: Mapping[str, Any], yaml_filename: str
) -> Tuple[Mapping[str, Problem], Dict[Any, Any]]:
    """Takes data from a dictionary with a particular structure, and stores it
    in several Problem instances. It also returns another dictionary that can
    be used to translate between YAML ids and the corresponding objects.

    Args:
        data: a dictionary which is the result of reading a YAML file. The
            dictionary is expected to have a particular structure. It can be
            previously validated through a YAML schema to ensure so.
        yaml_filename: name of the YAML file the data came from; used to
            resolve workload filenames relative to it.

    Returns:
        A tuple with two values:

        - A dictionary whose keys are problem ids, and the values are
          :class:`Problem` objects.
        - A dictionary whose keys are YAML ids, and the values are the
          corresponding malloovia object.
    """
    # Mapping to remember which dictionaries were already converted to objects
    # Keys are object ids of dictionaries, values are the corresponding malloovia objects
    ids_to_objects: Dict[int, Any] = {}

    def create_if_neccesary(_class, _dict):
        """Auxiliary function to instantiate a new object from a dict only
        if the same dict was not already instantiated"""
        # If already created, return the stored object
        if id(_dict) in ids_to_objects:
            return ids_to_objects[id(_dict)]
        # If _dict is not a dict, it is an already created object, return it
        if not isinstance(_dict, dict):
            return _dict
        # Else, create the object, store it and return it
        new = _class(**_dict)
        ids_to_objects[id(_dict)] = new
        return new

    def copy_id_to_name(_dict):
        """Helper function to set the name equal to id, if missing"""
        if isinstance(_dict, dict) and "name" not in _dict:
            _dict["name"] = _dict["id"]

    def create_instance_classes(_list):
        """Helper function which creates all required Instance_classes from
        a list of InstanceClasses, and the Limiting_sets referenced from
        those Instance_classes"""
        for ic_data in _list:
            copy_id_to_name(ic_data)
            limiting_sets = []
            for lset_data in ic_data["limiting_sets"]:
                copy_id_to_name(lset_data)
                limiting_sets.append(create_if_neccesary(LimitingSet, lset_data))
            ic_data["limiting_sets"] = tuple(limiting_sets)
            create_if_neccesary(InstanceClass, ic_data)

    def create_workloads(_list):
        """Helper function which creates all required Workloads from a list
        of workloads, and the Apps referenced from those workloads"""
        for w_data in _list:
            w_data["app"] = create_if_neccesary(App, w_data["app"])
            if w_data.get("filename"):
                values = read_from_relative_csv(
                    filename=w_data["filename"], relative_to=yaml_filename
                )
            else:
                values = tuple(w_data["values"])
            w_data.update(values=values)
            create_if_neccesary(Workload, w_data)

    def create_performances(_dict):
        """Helper function which creates a Performances object from a list
        of performance dictionaries whose keys are instance_classes and
        apps"""
        # Check if this set of performances was already converted to
        # a Performances object, and reuse it
        if id(_dict) in ids_to_objects:
            return ids_to_objects[id(_dict)]
        # Else, create a dictionary suited for Performances constructor
        _list = _dict["values"]
        perf_dict = {}
        for p_data in _list:
            # Get references to instance_class and app objects. Hence all
            # required instance types and apps were already created by now,
            # their ids should be present in ids_to_objects.
            # Otherwise it would be an internal error, and an exception
            # will be raised
            ic_object = ids_to_objects[id(p_data["instance_class"])]
            app_object = ids_to_objects[id(p_data["app"])]
            value = p_data["value"]
            if ic_object not in perf_dict:
                perf_dict[ic_object] = {}
            perf_dict[ic_object][app_object] = float(value)
        perf = PerformanceSet(
            id=_dict["id"],
            values=PerformanceValues(perf_dict),
            time_unit=_dict["time_unit"],
        )
        ids_to_objects[id(_dict)] = perf
        return perf

    # The main program only instantiates problems, and the other objects
    # referenced from those problems
    problems = {}
    # First pass: traverse all problems to ensure that all ics and apps
    # referenced from the problems are converted to namedtuples
    for problem in data["Problems"]:
        create_instance_classes(problem["instance_classes"])
        create_workloads(problem["workloads"])
    # Now traverse again to create the performances and problems
    for problem in data["Problems"]:
        performances = create_performances(problem["performances"])
        problem.update(
            workloads=tuple(ids_to_objects[id(w)] for w in problem["workloads"]),
            instance_classes=tuple(
                ids_to_objects[id(i)] for i in problem["instance_classes"]
            ),
            performances=performances,
        )
        new_problem = Problem(**problem)
        problems[new_problem.id] = new_problem
        ids_to_objects[id(problem)] = new_problem
    return problems, ids_to_objects


def read_solutions_from_yaml(
    filename: str
) -> Mapping[str, Union[SolutionI, SolutionII]]:
    """Reads the solutions(s) contained in a YAML file.

    Args:
        filename: name of the YAML file to read, it has to have the extension
            ``.yaml`` or ``.yaml.gz`` (which is automatically decompressed
            on read).

    Returns:
        A dictionary whose keys are solution ids, and the values are
        :class:`Solution` objects.

    Raises:
        ValueError if the file has not the expected extension.
    """
    _open = _get_open_function_from_extension(filename)
    with _open(filename, mode="rt", encoding="utf8") as stream:
        data = yaml.safe_load(stream)
    return solutions_from_dict(data, filename)


def solutions_from_dict(
    data: Mapping[str, Any], yaml_filename: str
) -> Mapping[str, Union[SolutionI, SolutionII]]:
    """Takes data from a dictionary with a particular structure, and stores it
    in several Solution instances.

    Args:
        data: a dictionary which is the result of reading a YAML file. The
            dictionary is expected to have a particular structure. It can be
            previously validated through a YAML schema to ensure so.
        yaml_filename: name of the YAML file the data came from; forwarded to
            :func:`_problems_and_ids_from_dict`.

    Returns:
        A dictionary whose keys are solution ids, and the values are
        :class:`Solution` objects.
    """
    # Mapping to remember which dictionaries were already converted to objects
    # Keys are object ids of dictionaries, values are the corresponding malloovia objects
    ids_to_objects: Dict[int, Any] = {}

    def _is_phase_i_solution(solution_dict):
        """Receives a solution as a dict generated by yaml_load() and returns
        true if is a phase I solution and false otherwise"""
        return "previous_phase" not in solution_dict

    def _create_phase_i_solution(solution_dict):
        """Instantiates a SolutionI from an already converted dict"""
        return SolutionI(**solution_dict)

    def _create_phase_ii_solution(solution_dict):
        """Instantiates a SolutionII, resolving the reference to the
        previously created phase I solution"""
        solution_dict["previous_phase"] = ids_to_objects[
            id(solution_dict["previous_phase"])
        ]
        return SolutionII(**solution_dict)

    def _dict_list_to_id_list(dict_list):
        """Replaces each yaml dict in the list by the object created for it"""
        return [ids_to_objects[id(item)] for item in dict_list]

    def _convert_allocation(solution_dict):
        """Converts the "allocation" entry of the dict to an AllocationInfo"""
        alloc = solution_dict["allocation"]
        alloc["apps"] = tuple(_dict_list_to_id_list(alloc["apps"]))
        alloc["instance_classes"] = tuple(
            _dict_list_to_id_list(alloc["instance_classes"])
        )
        # The YAML field is called "vms_number"; AllocationInfo calls it "values"
        alloc["values"] = alloc.pop("vms_number")
        alloc["values"] = tuple(
            tuple(tuple(vms) for vms in app) for app in alloc["values"]
        )
        if "units" not in alloc:
            alloc["units"] = "vms"
        if "workload_tuples" not in alloc:
            alloc["workload_tuples"] = tuple()
        else:
            alloc["workload_tuples"] = list(
                tuple(wl) for wl in alloc["workload_tuples"]
            )
        solution_dict["allocation"] = AllocationInfo(**alloc)

    def _convert_reserved_allocation(solution_dict):
        """Converts the "reserved_allocation" entry to a ReservedAllocation"""
        alloc = solution_dict["reserved_allocation"]
        alloc["instance_classes"] = tuple(
            _dict_list_to_id_list(alloc["instance_classes"])
        )
        alloc["vms_number"] = tuple(alloc["vms_number"])
        solution_dict["reserved_allocation"] = ReservedAllocation(**alloc)

    def _status_to_enum(status: str) -> Status:
        """Translates a status name into the Status enum member"""
        status_enum = Status.__members__.get(status)
        if status_enum is None:
            raise ValueError("Invalid status '{}' in solving_stats".format(status))
        return status_enum

    def _convert_malloovia_stats(data: Dict[str, Any]) -> MallooviaStats:
        """Converts a dict of malloovia stats to a MallooviaStats object"""
        status = data["status"]
        data["status"] = _status_to_enum(status)
        return MallooviaStats(**data)

    def _convert_solving_stats(solving_stats: Dict[str, Any]) -> SolvingStats:
        """Converts a dict of solving stats to a SolvingStats object"""
        alg_stats = solving_stats.get("algorithm")
        if alg_stats and alg_stats.get("malloovia"):
            solving_stats["algorithm"] = _convert_malloovia_stats(
                alg_stats.get("malloovia")
            )
        return SolvingStats(**solving_stats)

    def _convert_solving_stats_phase_i(solution_dict):
        """Phase I solutions carry a single SolvingStats"""
        solving_stats = solution_dict.get("solving_stats")
        if solving_stats:
            solution_dict["solving_stats"] = _convert_solving_stats(solving_stats)

    def _convert_malloovia_stats_phase_ii(solution_dict):
        """Phase II solutions carry a list of SolvingStats, one per timeslot"""
        solving_stats = solution_dict.get("solving_stats")
        if solving_stats:
            result = []
            for stats in solving_stats:
                result.append(_convert_solving_stats(stats))
            solution_dict["solving_stats"] = result

    def _convert_global_solving_stats(solution_dict):
        """Converts the "global_solving_stats" entry to GlobalSolvingStats"""
        g_solving_stats = solution_dict.get("global_solving_stats")
        if g_solving_stats:
            status = g_solving_stats["status"]
            g_solving_stats["status"] = _status_to_enum(status)
            solution_dict["global_solving_stats"] = GlobalSolvingStats(
                **g_solving_stats
            )

    def _create_solution(solution_dict):
        """Converts every nested entry of the dict and builds the Solution"""
        solution_dict["problem"] = ids_to_objects[id(solution_dict["problem"])]
        if "allocation" in solution_dict:
            _convert_allocation(solution_dict)
        if _is_phase_i_solution(solution_dict):
            _convert_solving_stats_phase_i(solution_dict)
            _convert_reserved_allocation(solution_dict)
            result = _create_phase_i_solution(solution_dict)
        else:
            _convert_malloovia_stats_phase_ii(solution_dict)
            _convert_global_solving_stats(solution_dict)
            result = _create_phase_ii_solution(solution_dict)
        ids_to_objects[id(solution_dict)] = result
        return result

    # Convert the problems first, reusing the id->object map they build
    _, ids_to_objects = _problems_and_ids_from_dict(data, yaml_filename)
    solutions = {}
    # Create solutions for phase I. They have to be created before solutions
    # for phase II because the latter reference the former
    for solution_dict in data["Solutions"]:
        if _is_phase_i_solution(solution_dict):
            solution = _create_solution(solution_dict)
            solutions[solution.id] = solution
    # Create solutions for phase II
    for solution_dict in data["Solutions"]:
        if not _is_phase_i_solution(solution_dict):
            solution = _create_solution(solution_dict)
            solutions[solution.id] = solution
    return solutions
def problems_to_yaml(
    problems: Mapping[str, Problem]
) -> str:  # pylint: disable=too-many-locals
    """Converts problems from the classes used by malloovia to a yaml string.

    Args:
        problems: it is a dictionary whose keys are the ids of the problems,
            and the values are instances of :class:`Problem`, which indirectly
            contains the full specification of the system, apps, workloads and
            performances, through references to other classes.

    Returns:
        A string with a yaml representation of the problem and all the data
        associated with it. The YAML contains separate fields for "Apps",
        "Workloads", "Limiting_sets", "Instance_classes", "Performances" and
        "Problems", each one containing a list of apps, workloads, etc.
        respectively. These lists are dynamically built and contains the
        entities which are directly or indirectly referenced from the dict of
        problems received as input. The generated yaml contains internal
        anchors (automatically generated from the ids of the objects) and
        yaml references to those anchors, so that when the yaml is parsed
        back to python, the resulting dict contains internal references
        (instead of copies) to other dicts.
    """

    def collect_instance_classes_and_limiting_sets(
        problem
    ):  # pylint: disable=invalid-name
        """Populates and returns instance_classes and limiting_sets sets"""
        instance_classes = set()
        limiting_sets = set()
        for i_c in problem.instance_classes:
            instance_classes.add(i_c)
            limiting_sets.update(set(i_c.limiting_sets))
        return instance_classes, limiting_sets

    def collect_workloads_and_apps(problem):
        """Populates and returns workloads and apps sets"""
        workloads = set()
        apps = set()
        for wld in problem.workloads:
            workloads.add(wld)
            apps.add(wld.app)
        return workloads, apps

    def collect_performances(problem):
        """Populates and returns performances set"""
        performances = set()
        performances.add(problem.performances)
        return performances

    def lsets_to_yaml(limiting_sets):
        """Returns an array of lines to add to the yaml array, representing
        the Limiting_sets part"""
        lines = []
        lines.append("Limiting_sets:")
        for l_s in sorted(limiting_sets):
            lines.append("  - &{}".format(_anchor_from_id(l_s)))
            lines.extend(_namedtuple_to_yaml(l_s, level=2))
        lines.append("")
        return lines

    def iclasses_to_yaml(instance_classes):
        """Returns an array of lines to add to the yaml array, representing
        the Instance_classes part"""
        lines = []
        lines.append("Instance_classes:")
        for i_c in sorted(instance_classes):
            anchor = _anchor_from_id(i_c)
            # limiting_sets is emitted as an inline list of yaml references
            aux = i_c._replace(
                limiting_sets="[{}]".format(
                    ", ".join(
                        "*{}".format(_anchor_from_id(ls)) for ls in i_c.limiting_sets
                    )
                )
            )
            lines.append("  - &{}".format(anchor))
            lines.extend(_namedtuple_to_yaml(aux, level=2))
        lines.append("")
        return lines

    def apps_to_yaml(apps):
        """Returns an array of lines to add to the yaml array, representing
        the Apps part"""
        lines = []
        lines.append("Apps:")
        for app in sorted(apps):
            lines.append("  - &{}".format(_anchor_from_id(app)))
            lines.extend(_namedtuple_to_yaml(app, level=2))
        lines.append("")
        return lines

    def wloads_to_yaml(workloads):
        """Returns an array of lines to add to the yaml array, representing
        the Workloads part"""
        lines = []
        # It is necessary to remove "filename" if it is None, or "values" if not
        # But fields cannot be removed from namedtuples, so we convert it to dict
        lines.append("Workloads:")
        for w_l in sorted(workloads):
            anchor = _anchor_from_id(w_l)
            aux = w_l._asdict()
            if aux["filename"]:
                aux.pop("values")
            else:
                aux.pop("filename")
                aux.update(values=list(w_l.values))
            aux.update(app="*{}".format(_anchor_from_id(w_l.app)))
            lines.append("  - &{}".format(anchor))
            lines.extend(_dict_to_yaml(aux, level=2))
        lines.append("")
        return lines

    def probs_to_yaml(problems):
        """Returns an array of lines to add to the yaml array, representing
        the Problems part"""
        lines = []
        lines.append("Problems:")
        for prob in problems.values():
            anchor = _anchor_from_id(prob)
            aux = prob._replace(
                instance_classes="[{}]".format(
                    ", ".join(
                        "*{}".format(_anchor_from_id(ic))
                        for ic in prob.instance_classes
                    )
                ),
                workloads="[{}]".format(
                    ", ".join(
                        "*{}".format(_anchor_from_id(wl)) for wl in prob.workloads
                    )
                ),
                performances="*{}".format(_anchor_from_id(prob.performances)),
            )
            lines.append("  - &{}".format(anchor))
            lines.extend(_namedtuple_to_yaml(aux, level=2))
        lines.append("")
        return lines

    def perfs_to_yaml(performances):
        """Returns an array of lines to add to the yaml array, representing
        the Performances part"""
        lines = []
        lines.append("Performances:")
        for perfset in sorted(performances):
            lines.append("  - &{}".format(_anchor_from_id(perfset)))
            lines.append("    id: {}".format(perfset.id))
            lines.append("    time_unit: {}".format(perfset.time_unit))
            lines.append("    values:")
            for iclass, app, perf in perfset.values:
                lines.append(
                    "      - instance_class: *{}".format(_anchor_from_id(iclass))
                )
                lines.append("        app: *{}".format(_anchor_from_id(app)))
                lines.append("        value: {}".format(perf))
        return lines

    # "main" body of the function
    yam: List[str] = []  # List of lines of the resulting yaml
    # set of App objects indirectly referenced from the problems (via workloads)
    apps: Set[App] = set()
    # set of Workload objects directly referenced from the problems
    workloads: Set[Workload] = set()
    # set of Limiting_set objects indirectly referenced from the problems
    # (via instance classes)
    limiting_sets: Set[LimitingSet] = set()
    # set of Instance_class objects directly referenced from the problems
    instance_classes: Set[InstanceClass] = set()
    # set of Performance objects directly referenced from the problems
    performances: Set[PerformanceSet] = set()

    for prob in problems.values():
        _wls, _apps = collect_workloads_and_apps(prob)
        _ics, _ls = collect_instance_classes_and_limiting_sets(prob)
        apps.update(_apps)
        workloads.update(_wls)
        limiting_sets.update(_ls)
        instance_classes.update(_ics)
    for prob in problems.values():
        performances.update(collect_performances(prob))

    yam.extend(lsets_to_yaml(limiting_sets))
    yam.extend(iclasses_to_yaml(instance_classes))
    yam.extend(apps_to_yaml(apps))
    yam.extend(wloads_to_yaml(workloads))
    yam.extend(perfs_to_yaml(performances))
    yam.extend(probs_to_yaml(problems))
    return "\n".join(yam)
def preprocess_yaml(input_yaml_filename: str) -> str:
    """Reads a YAML file and "expands" the ``Problems_from_file`` section.

    Args:
        input_yaml_filename: name of the yaml file to read

    Returns:
        A string containing the contents read from the file, but without the
        section ``Problems_from_file`` which was replaced by the contents of
        the file referenced in that section. This name is considered relative
        to the path of the main yaml file.
    """
    _open = _get_open_function_from_extension(input_yaml_filename)
    output = []
    with _open(input_yaml_filename, mode="rt", encoding="utf8") as istream:
        for line in istream:
            if line.startswith("Problems_from_file"):
                # Replace the whole line by the contents of the referenced file
                filename = line.split(":")[1].strip()
                line = read_file_relative_to(
                    filename=filename, relative_to=input_yaml_filename
                )
            output.append(line)
    return "".join(output)


def read_file_relative_to(filename: str, relative_to: str, kind: str = "yaml") -> str:
    """Reads one file by its name, considered relative to other filename.

    Args:
        filename: the name of the file to read
        relative_to: the name of the file to which the first one is
            considered relative
        kind: expected extension of the filename

    Examples:
        * ``read_file_relative_to(filename="other.txt",
          relative_to="foo/bar/whatever.txt")`` will read the file at
          ``"foo/bar/other.txt"``
        * ``read_file_relative_to(filename="../other.txt",
          relative_to="foo/bar/whatever.txt")`` will read the file at
          ``"foo/other.txt"``

    Returns:
        The whole content of the file, as a string.

    Raises:
        FileNotFoundError: If the file is not found.
    """
    path_to_input = os.path.abspath(relative_to)
    path_to_filename = os.path.join(os.path.dirname(path_to_input), filename)
    _open = _get_open_function_from_extension(filename, kind=kind)
    # Use a context manager so the handle is closed deterministically
    # (the previous version leaked the open file).
    with _open(path_to_filename, mode="rt", encoding="utf8") as stream:
        return stream.read()


def read_from_relative_csv(filename: str, relative_to: str) -> Tuple[float, ...]:
    """Reads and parses the content of one file, given its name considered
    relative to other filename.

    The file is first read by :func:`read_file_relative_to()` and the
    contents are assumed to be a sequence of floating numbers, one per line.

    Args:
        filename: the name of the file to read
        relative_to: the name of the file to which the first one is
            considered relative

    Returns:
        The sequence of read floating numbers, as a tuple.

    Raises:
        FileNotFoundError: If the file is not found.
    """
    content = read_file_relative_to(filename, relative_to, kind="csv")
    return tuple(float(line) for line in content.split("\n") if line)
def solutions_to_yaml(solutions: Sequence[Union[SolutionI, SolutionII]]) -> str:
    """Converts a list of solutions to a YAML string.

    Args:
        solutions: list of solutions to convert, each one can be a
            :class:`SolutionI` or a :class:`SolutionII`.

    Returns:
        A string with a YAML representation of the solution and the
        associated problem. The YAML uses anchors and references to tie up
        the different parts.

    Raises:
        ValueError: if any element is neither a SolutionI nor a SolutionII.
    """

    def solution_i_to_yaml(sol: SolutionI) -> List[str]:
        """Converts a SolutionI to a yaml string"""
        lines: List[str] = []
        lines.extend(
            (
                "- &{}".format(_anchor_from_id(sol)),
                "  id: {}".format(sol.id),
                "  problem: *{}".format(_anchor_from_id(sol.problem)),
            )
        )
        lines.append("  solving_stats:")
        lines.extend(solving_stats_to_yaml(sol.solving_stats, level=2))
        lines.append("  reserved_allocation:")
        lines.extend(reserved_allocation_to_yaml(sol.reserved_allocation, level=2))
        lines.append("  allocation:")
        lines.extend(allocation_to_yaml(sol.allocation, level=2))
        return lines

    def solution_ii_to_yaml(sol: SolutionII) -> List[str]:
        """Converts a SolutionII to a yaml string"""
        lines: List[str] = []
        lines.extend(
            (
                "- &{}".format(_anchor_from_id(sol)),
                "  id: {}".format(sol.id),
                "  problem: *{}".format(_anchor_from_id(sol.problem)),
                "  previous_phase: *{}".format(_anchor_from_id(sol.previous_phase)),
            )
        )
        lines.append("  global_solving_stats:")
        lines.extend(global_solving_stats_to_yaml(sol.global_solving_stats, level=2))
        lines.append("  solving_stats:")
        for i, stats in enumerate(sol.solving_stats):
            # One SolvingStats per timeslot; annotate with its workload tuple
            lines.append(
                "    - # {} -> {}".format(i, sol.allocation.workload_tuples[i])
            )
            lines.extend(solving_stats_to_yaml(stats, level=3))
        lines.append("  allocation:")
        lines.extend(allocation_to_yaml(sol.allocation, level=2))
        return lines

    def solving_stats_to_yaml(stats: SolvingStats, level: int) -> List[str]:
        """Converts a SolvingStats to a yaml string"""
        lines: List[str] = []
        tab = "  " * level
        lines.extend(
            (
                "{}creation_time: {}".format(tab, stats.creation_time),
                "{}solving_time: {}".format(tab, stats.solving_time),
                "{}optimal_cost: {}".format(tab, _yamlize(stats.optimal_cost)),
                "{}algorithm:".format(tab),
                "  {}malloovia:".format(tab),
            )
        )
        lines.extend(_namedtuple_to_yaml(stats.algorithm, level=level + 2))
        return lines

    def global_solving_stats_to_yaml(
        stats: GlobalSolvingStats, level: int
    ) -> List[str]:
        """Converts a GlobalSolvingStats to a yaml string"""
        lines: List[str] = []
        tab = "  " * level
        lines.extend(
            (
                "{}creation_time: {}".format(tab, stats.creation_time),
                "{}solving_time: {}".format(tab, stats.solving_time),
                "{}optimal_cost: {}".format(tab, stats.optimal_cost),
                "{}status: {}".format(tab, stats.status.name),
            )
        )
        return lines

    def reserved_allocation_to_yaml(rsv: ReservedAllocation, level: int) -> List[str]:
        """Converts a ReservedAllocation to a yaml string"""
        lines: List[str] = []
        tab = "  " * level
        if rsv is None:
            # Emit empty lists when there is no reserved allocation
            instance_classes: List[InstanceClass] = []
            vms_number: List[float] = []
        else:
            instance_classes = list(rsv.instance_classes)
            vms_number = list(rsv.vms_number)
        lines.extend(
            (
                "{}instance_classes: [{}]".format(
                    tab, list_of_references_to_yaml(instance_classes)
                ),
                "{}vms_number: [{}]".format(tab, ", ".join(str(v) for v in vms_number)),
            )
        )
        return lines

    def list_of_references_to_yaml(lst: Sequence[Any]) -> str:
        """Generates a comma separated list of yaml references using the id"""
        return ", ".join("*{}".format(_anchor_from_id(element)) for element in lst)

    def list_to_yaml(lst: Iterable[Any]) -> str:
        """Generates a comma separated list of python objects"""
        return ", ".join(str(element) for element in lst)

    def allocation_to_yaml(alloc: AllocationInfo, level: int) -> List[str]:
        """Converts an AllocationInfo to a yaml string"""
        lines: List[str] = []
        tab = "  " * level
        if alloc is None:
            # Emit empty lists when there is no allocation
            instance_classes: List[InstanceClass] = []
            workload_tuples: List[Tuple[float, ...]] = []
            apps: List[App] = []
            repeats: List[int] = []
            values: Tuple[Tuple[Tuple[float, ...], ...], ...] = tuple()
        else:
            instance_classes = list(alloc.instance_classes)
            workload_tuples = list(alloc.workload_tuples)
            apps = list(alloc.apps)
            repeats = list(alloc.repeats)
            values = tuple(alloc.values)
        lines.extend(
            (
                "{}instance_classes: [{}]".format(
                    tab, list_of_references_to_yaml(instance_classes)
                ),
                "{}apps: [{}]".format(tab, list_of_references_to_yaml(apps)),
                "{}workload_tuples: [{}]".format(
                    tab, list_to_yaml(list(wl) for wl in workload_tuples)
                ),
                "{}repeats: [{}]".format(tab, list_to_yaml(repeats)),
            )
        )
        if values:
            lines.append("{}vms_number:".format(tab))
            for i, t_alloc in enumerate(values):
                lines.append("  {}- # {} -> {}".format(tab, i, workload_tuples[i]))
                for app_alloc in t_alloc:
                    lines.append("    {}- {}".format(tab, list(app_alloc)))
        else:
            lines.append("{}vms_number: []".format(tab))
        return lines

    # First collect all problems referenced in the solutions
    problems = set()
    for solution in solutions:
        problems.add(solution.problem)

    # Convert those problems to yaml
    lines = []
    lines.append(problems_to_yaml({p.id: p for p in problems}))

    # Now convert each solution
    lines.append("Solutions:")
    for solution in solutions:
        if isinstance(solution, SolutionI):
            lines.extend(solution_i_to_yaml(solution))
        elif isinstance(solution, SolutionII):
            lines.extend(solution_ii_to_yaml(solution))
        else:
            raise ValueError(
                "Solution({}) is of unknown type {}".format(solution.id, type(solution))
            )
    return "\n".join(lines)
def _namedtuple_to_yaml(data, level=2):
    """Converts to yaml any namedtuple, via dict.

    Args:
        data: the namedtuple to convert
        level: the indentation level

    Returns:
        array of lines to add to yaml array
    """
    return _dict_to_yaml(data._asdict(), level)


def _dict_to_yaml(data, level):
    """Converts to yaml any dictionary, by iterating through its keys
    and values.

    Args:
        data: the dict to convert
        level: the indentation level (two spaces per level)

    Returns:
        array of lines to add to yaml array
    """
    lines = []
    for key, value in data.items():
        value = _yamlize(value)
        lines.append("{}{}: {}".format("  " * level, key, value))
    return lines


def _yamlize(value: Any) -> Any:
    """Converts a python value to a valid YAML representation.

    Args:
        value: the python value to convert

    Returns:
        Either a string containing ``"null"``, ``"true"`` or ``"false"``
        for the special cases ``None``, ``True`` and ``False``, resp.,
        or ``value.name`` if present (for ``Enum``\\ s), or the same value
        received as input for other cases."""
    if value is None:
        return "null"
    if value is True:
        return "true"
    if value is False:
        return "false"
    if hasattr(value, "name"):  # For Enums
        return value.name  # pylint:disable=no-member
    return value


def get_schema() -> Dict[str, Any]:
    """Returns Malloovia's json schema which can be used to validate the
    problem and solution files"""
    path_to_schema = os.path.join(os.path.dirname(__file__), "malloovia.schema.yaml")
    with open(path_to_schema) as schema_file:
        schema = yaml.safe_load(schema_file)
    return schema
def allocation_info_as_dicts(
    alloc: "AllocationInfo",
    use_ids=True,
    include_timeslot=True,
    include_workloads=True,
    include_repeats=True,
) -> Iterable[Mapping[Any, Any]]:
    """Converts the :class:`AllocationInfo` structure to a sequence of dicts,
    which are more convenient for analysis with pandas.

    Each element of the returned sequence is a python dictionary whose keys
    and values are:

    * "instance_class" -> either the id or the reference to an instance class
    * "app" -> either the id or the reference to an app
    * "timeslot" -> the integer which represents the timeslot for this
      particular allocation
    * "workload" -> a tuple with the workload to be fulfilled by this
      particular allocation
    * "repeats" -> the number of times this workload appears in phaseI
      (always 1 for phase II)
    * AllocationInfo.units -> value for this particular allocation. If the
      units is "vms", the value represents the number of VMs of the kind
      "instance_class" to be activated during timeslot "timeslot" (in
      phase II), or when the workload is "workload" (in phase I), for the
      application "app".

    Some of these fields are useful only for Phase I, while others are for
    Phase II. Some boolean arguments allow the selection of these specific
    fields.

    Args:
        alloc: The :class:`AllocationInfo` to convert
        use_ids: True to use the ids of instance classes and apps, instead
            of the objects which store those entities. False to use
            references to instance classes and apps instead of the ids.
            The ids version produces a more compact representation when
            used with pandas.
        include_timeslot: False if you don't want the "timeslot" field
            (it conveys no meaning for Phase I allocations)
        include_workloads: False if you don't want the "workload" field
        include_repeats: False if you don't want the "repeats" field
            (it is always 1 for Phase II allocations)

    Returns:
        A generator for sequence of dictionaries with the required fields.
        You can iterate over the generator, or pass it directly to pandas
        DataFrame constructor.

    Example:
        >>> import pandas as pd
        >>> df = (pd.DataFrame(
                allocation_info_as_dicts(
                    alloc=phase_i_solution.allocation,
                    use_ids=True,
                    include_repeats=True,
                    include_workloads=True,
                    include_timeslot=False))
                .set_index(["repeats", "workload", "app", "instance_class"])
                .unstack()
            )
        >>> df2 = (pd.DataFrame(
                allocation_info_as_dicts(
                    alloc=phase_ii_solution.allocation,
                    use_ids=True,
                    include_repeats=False,
                    include_workloads=True,
                    include_timeslot=True))
                .set_index(["timeslot", "workload", "app", "instance_class"])
                .unstack()
            )
    """

    def _repr(element):
        """Returns either the element or its id, depending on use_ids"""
        if use_ids:
            return element.id
        return element

    # alloc.values is indexed [timeslot][app][instance_class]
    for slot, t_alloc in enumerate(alloc.values):
        for app, a_alloc in enumerate(t_alloc):
            for i, ic_alloc in enumerate(a_alloc):
                result = {}
                result["instance_class"] = _repr(alloc.instance_classes[i])
                result["app"] = _repr(alloc.apps[app])
                result[alloc.units] = ic_alloc
                if include_workloads:
                    result["workload"] = alloc.workload_tuples[slot]
                if include_timeslot:
                    result["timeslot"] = slot
                if include_repeats:
                    result["repeats"] = alloc.repeats[slot]
                yield result
def _get_open_function_from_extension(filename, kind="yaml"):
    """Returns ``open`` if the filename ends in ``.kind``, or ``gzip.open``
    if it ends in ``.kind.gz``; otherwise raises ValueError.

    Args:
        filename: the name of the file to inspect
        kind: the expected extension (without the dot)

    Raises:
        ValueError: if the filename has neither expected extension.
    """
    # Check the compressed form first, since ".kind.gz" also ends in ".gz"
    if filename.endswith(".{}.gz".format(kind)):
        return gzip.open
    elif filename.endswith(".{}".format(kind)):
        return open
    else:
        raise ValueError("Invalid filename. Should be .{} or .{}.gz".format(kind, kind))


__all__ = [
    "read_problems_from_yaml",
    "read_problems_from_github",
    "problems_to_yaml",
    "solutions_to_yaml",
    "get_schema",
    "allocation_info_as_dicts",
]