# Source code for runner.utils.utils

"""
Utility tools for runners
"""
import os
import ase.db as db


class Cd:
    """Context manager for changing the current working directory

    On entering the ``with`` block the process changes into ``new_path``;
    on leaving it (normally or through an exception) the directory that
    was current at entry is restored.

    Args:
        new_path (str): directory to change into, a leading ``~`` is
            expanded with :func:`os.path.expanduser`
        mkdir (bool): create the directory, including missing parents,
            if it does not exist yet

    :meta private:"""

    def __init__(self, new_path, mkdir=True):
        self.new_path = os.path.expanduser(new_path)
        self.saved_path = None

        # Test and create the *expanded* path: using the raw argument would
        # look for (and try to create) a literal "~" directory relative to
        # the current working directory instead of the user's home.
        if mkdir and not os.path.exists(self.new_path):
            # exist_ok guards against another process creating the
            # directory between the check above and this call.
            os.makedirs(self.new_path, exist_ok=True)

    def __enter__(self):
        # Remember where we came from so that __exit__ can go back.
        self.saved_path = os.getcwd()
        os.chdir(self.new_path)

    def __exit__(self, etype, value, traceback):
        # Returns None, so an exception raised in the body propagates.
        os.chdir(self.saved_path)


def json_keys2int(dict_):
    """Converts dict keys to int if all dict keys can be converted to int

    JSON only has string keys; as a compromise, a dict whose keys can all
    be converted with ``int()`` is returned with int keys.

    Args:
        dict_: object to convert; anything that is not a dict is returned
            unchanged

    Returns:
        a new dict with int keys if every key could be converted,
        otherwise the object that was passed in

    :meta private:
    """
    if isinstance(dict_, dict):
        try:
            # A single pair of braces: this must be a dict comprehension,
            # not a set literal wrapping one (dicts are unhashable).
            return {int(k): v for k, v in dict_.items()}
        except (ValueError, TypeError):
            # At least one key is not int-like (e.g. "a" or None):
            # leave the whole dict untouched.
            pass
    return dict_


def get_db_connect(database):
    """Returns a database connection for ``database``

    A string is handed to ``ase.db.connect``; any other object is
    returned as it was given.

    :meta private:
    """
    if not isinstance(database, str):
        return database
    return db.connect(database)


def get_status(input_id, database):
    """Gets status of input_id in the database

    Args:
        input_id (int): input_id in the database
        database (str): the database name

    Returns:
        str: the status of the run, ``"No status"`` when the row has none
    """
    row_id = int(input_id)
    connection = get_db_connect(database)
    return connection.get(row_id).get("status", "No status")
def submit(input_id, database, runner_name):
    """Submits input_id in the database

    The row's ``status`` is set to ``"submit"`` and its ``runner`` to
    ``runner_name``.

    Args:
        input_id (int): input_id in the database
        database (str): the database name
        runner_name (str): name of the runner
    """
    row_id = int(input_id)
    connection = get_db_connect(database)
    connection.update(row_id, status="submit", runner=runner_name)
def cancel(input_id, database):
    """Cancel run of input_id in the database

    Only a row that already has a ``status`` is updated; its status is
    set to ``"cancel"``.

    Args:
        input_id (int): input_id in the database
        database (str): the database name
    """
    row_id = int(input_id)
    connection = get_db_connect(database)
    if "status" not in connection.get(row_id):
        # nothing to cancel for a row without a status
        return
    connection.update(row_id, status="cancel")
def _get_info(input_id, database):
    """Collects the row data needed by get_graphical_status

    Returns:
        tuple: ``(formula, name, parents, status, tasks)``; ``name`` is
        ``None`` and ``parents``/``tasks`` are empty lists when the row
        has no ``"runner"`` entry in its data
    """
    row = get_db_connect(database).get(input_id)
    formula = row.formula
    status = row.get("status", "No status")

    if "runner" not in row.data:
        return formula, None, [], status, []

    runner_data = row.data["runner"]
    # "parents" is optional, "tasks" and "name" are required keys
    parents = runner_data.get("parents", [])
    tasks = runner_data["tasks"]
    name = runner_data["name"]
    return formula, name, parents, status, tasks
def get_graphical_status(
    filename, input_ids, database, add_tasks=False, _get_info=_get_info
):
    """Renders a dot graph of the status of all parents of input_ids

    Starting from every id in ``input_ids`` the parents are followed
    recursively; each row becomes a node, rows with runner data get an
    additional box coloured by their status. Nothing is returned, the
    graph is written to ``filename``.

    Args:
        filename (str): name of file to store the graph (pdf, png or svg);
            the text after the last ``.`` is used as the output format
        input_ids (int or list): input_id(s) in the database
        database (str): the database name
        add_tasks (bool): adds tasks to the graph
        _get_info (callable): called as ``_get_info(id_, database)`` and
            expected to return ``(formula, name, parents, status, tasks)``

    Raises:
        ModuleNotFoundError: if graphviz is not installed
    """
    try:
        from graphviz import Digraph
    except ModuleNotFoundError as err:
        # keep the original import failure attached as the cause
        raise ModuleNotFoundError(
            "get_graphical_status needs graphviz, " "run: pip install graphviz"
        ) from err

    status_colors = {
        "running": "yellow",
        "failed": "red",
        "submit": "grey",
        "cancel": "grey",
        "done": "green",
        "No status": "white",
    }

    def add_task_graph(name, tasks, dot):
        """adds task graph

        Args:
            name (str): name of runnerdata
            tasks (list): tasks list
            dot (dot): dot
        """
        param_task = []
        node_str = "Tasks:"
        for i, task in enumerate(tasks):
            # add params if python params exist
            if task[0] == "python":
                if task[2]:
                    # is not empty param
                    param_task.append(i)
            # every task gets its own port <i> in the record node
            node_str += f"|<{i}>{task[1]}"
        node_str = f"{{{node_str}}}"  # making it vertical
        dot.node(f"{name}-tasks", node_str, shape="record")
        # adding param nodes
        for i in param_task:
            node_str = [f"{{{key}|{value}}}" for key, value in tasks[i][2].items()]
            node_str = "|".join(node_str)
            node_str = f"{{{node_str}}}"  # making it vertical
            dot.node(f"{name}-tasks-{i}", node_str, shape="record")
            # attach the parameters to the port of their task
            dot.edge(f"{name}-tasks-{i}", f"{name}-tasks:{i}")
        # add to the runnerdata
        dot.edge(f"{name}-tasks", name)

    def spider(id_, seen_ids, dot):
        """adds id_ and, recursively, all of its parents to the graph"""
        if id_ in seen_ids:
            return
        seen_ids.add(id_)
        formula, name, parents, status, tasks = _get_info(id_, database)
        dot.node(
            str(id_), label=f"{id_}: {formula}", style="filled", fillcolor="lightblue"
        )
        if name:
            # add runner data graph
            # get unique node name
            node_name = f"{id_}-runnerdata"
            # a status outside the table is drawn like "No status"
            # instead of aborting the whole graph with a KeyError
            color = status_colors.get(status, status_colors["No status"])
            dot.node(
                node_name,
                label=f"{name}\nStatus: {status}",
                shape="box",
                style="filled",
                fillcolor=color,
            )
            if add_tasks:
                add_task_graph(node_name, tasks, dot)
            # now connect it to the formula node
            dot.edge(node_name, str(id_))
            # add parents; they attach to the runner node, so they are
            # only followed when that node exists
            for parent in parents:
                spider(parent, seen_ids, dot)
                # connect the parent
                dot.edge(str(parent), node_name)

    if isinstance(input_ids, int):
        input_ids = [input_ids]

    dot = Digraph(comment="The Runner workflow", strict=True)
    # only membership is needed, so a set keeps the lookups cheap
    seen_ids = set()
    for input_id in input_ids:
        spider(input_id, seen_ids, dot)

    # split on the last "." into render target and output format
    filename, _, fileformat = filename.rpartition(".")
    dot.render(filename, format=fileformat)
def get_runner_list(database):
    """Returns a dictionary of runners on database

    Args:
        database (str): ASE database of atoms

    Returns:
        dict: runner names as keys, their ``"running"`` metadata value as
        value (``False`` when a runner has no such entry)
    """
    runners_meta = get_db_connect(database).metadata.get("runners", {})
    return {
        runner_name: runner_meta.get("running", False)
        for runner_name, runner_meta in runners_meta.items()
    }
def remove_runner(runner_name, database, force=False):
    """Removes runner from database, if not running

    Args:
        runner_name (str): name of the runner
        database (str): ASE database
        force (bool): forcefully remove runner, if running

    Raises:
        RuntimeError: if the database has no runners, the runner does not
            exist, or it is running and ``force`` is not set
    """
    connection = get_db_connect(database)
    metadata = connection.metadata

    if "runners" not in metadata:
        raise RuntimeError(f"no runners in {database}")
    runners = metadata["runners"]
    if runner_name not in runners:
        raise RuntimeError(f"{runner_name} does not exist")
    if runners[runner_name].get("running", False) and not force:
        raise RuntimeError(f"{runner_name} is running")

    del runners[runner_name]
    # write the changed metadata back to the database
    connection.metadata = metadata
def stop_runner(runner_name, database):
    """Stops runner through database metadata

    Runner isn't stopped immediately, but before it enters sleep.

    Args:
        runner_name (str): name of the runner
        database (str): ASE database

    Raises:
        RuntimeError: if the database has no runners, the runner does not
            exist, or it is not running
    """
    connection = get_db_connect(database)
    metadata = connection.metadata

    if "runners" not in metadata:
        raise RuntimeError(f"no runners in {database}")
    runners = metadata["runners"]
    if runner_name not in runners:
        raise RuntimeError(f"{runner_name} does not exist")
    runner_meta = runners[runner_name]
    if not runner_meta.get("running", False):
        raise RuntimeError(f"{runner_name} is not running")

    # flag the runner and write the changed metadata back
    runner_meta["_explicit_stop"] = True
    connection.metadata = metadata