Source code for runner.relay

"""
Relay to form complex workflows
"""

import json
import time
from copy import deepcopy

import numpy as np
from ase import Atoms, db

from runner.runners import runner_types
from runner.utils import cancel, get_graphical_status, submit
from runner.utils.runnerdata import RunnerData


class Relay:
    """
    Relay :class:`~ase.db` rows into a workflow

    Args:
        label (str): unique label of relay, for easy access in graph
        parents (list): list of parents as input to the relay. Parents can be
            :class:`~ase.Atoms` object, id in the database, or
            :class:`~Relay`.
        runnerdata (:class:`~runner.RunnerData`): the runner data to be
            attached to the row.
        runnername (str): name of the runner that handles the row run.

    .. note::
        In the case of :class:`~ase.Atoms` object as parent, the object can
        be the only one in the list.
    """

    def __init__(self, label, parents=None, runnerdata=None, runnername=None):
        # setting up defaults
        if parents is None:
            parents = []
        if runnerdata is None:
            runnerdata = RunnerData()

        # database the relay is committed to; None implies not committed
        self._database = None
        # False means the database row is stale and a commit is needed
        self._updated = False

        # the property setters validate their input and reset ``_updated``
        self.parents = parents
        self.runnerdata = runnerdata
        self.runnername = runnername
        self.label = label

        # assigning arbitrary str id to reference before committing
        letters = np.array(list("abcdefghijklmnopqrstuvwxyz"))
        used_ids = set()
        used_labels = set()
        for parent in self.parents:
            if not isinstance(parent, Relay):
                continue
            ancestors = parent._spider()
            # _spider() does not contain the relay it is called on, so the
            # direct parent's id has to be reserved explicitly.
            used_ids.add(parent.id_)
            used_ids.update(ancestors)
            # NOTE(review): labels are only checked against the ancestors of
            # the parents, not the parents themselves; kept as is so that
            # previously accepted graphs still load.
            used_labels.update(value.label for value in ancestors.values())
        used_labels.discard("")
        if label in used_labels:
            raise RuntimeError(f"{label} already taken")

        while True:
            self.id_ = "".join(np.random.choice(letters, 5))
            if self.id_ not in used_ids:
                break

    def __repr__(self):
        label_str = f", label='{self.label}'" if self.label != "" else ""
        database_str = (
            f", database='{self._database}'" if self._database is not None else ""
        )
        repr_str = (
            f"{self.__class__.__name__}(id={self.id_}"
            f", parents=[{', '.join([str(x) for x in self.parents])}]"
            f"{database_str}"
            f", runnerdata='{self._runnerdata.name}'"
            f", runnername='{self._runnername}'"
            f"{label_str})"
        )
        return repr_str

    def __str__(self):
        label_str = f", label='{self.label}'" if self.label != "" else ""
        return f"{self.__class__.__name__}(id={self.id_}{label_str})"

    def get_parent_relay(self, item):
        """
        Returns a Relay instance of one of the parents associated with item,
        in the parent relay

        Args:
            item (str or int): id or label associated with parent relay

        Returns:
            Relay or None: the matching parent relay, None if nothing matches
        """
        spider = self._spider()
        # ids take precedence over labels
        ret_item = spider.get(item, None)
        if ret_item is None:
            by_label = {
                (value.label if value.label != "" else value.id_): value
                for value in spider.values()
            }
            ret_item = by_label.get(item, None)
        return ret_item

    def list_parent_relay(self):
        """
        Returns a list of relay instances present in the relay.

        Returns:
            list: ids of all parent relays, followed by their non-empty labels
        """
        spider = self._spider()
        list_relay = list(spider)
        list_relay.extend(
            value.label for value in spider.values() if value.label != ""
        )
        return list_relay

    def commit(self, database=None):
        """
        Commits the relay to the database, i.e. adds data to the database.
        Parent relays are committed first.

        Args:
            database (str): ASE database to commit to. If None (default) then
                stored database from previous commit is used.

        Returns:
            int: the id of the row commited to in the database.

        Raises:
            RuntimeError: if no database is known, if the relay was committed
                to a different database, or if the row status is 'submit',
                'running' or 'cancel'.
        """
        if database is None:
            if self._database is None:
                raise RuntimeError(
                    f"{self.__str__()} needs a database for commit. "
                    f"No previous database commits."
                )
            database = self._database
        elif self._database is not None:
            if self._database != database:
                raise RuntimeError(
                    f"{self.__str__()} already commited with {self._database}, "
                    f"cannot commit with {database}"
                )

        # recursively commit parents, so that their ids are database ids
        for parent in self.parents:
            if isinstance(parent, Relay):
                parent.commit(database=database)

        if self._updated:
            return self.id_

        if self._database is not None:
            # if commited, then check status
            with db.connect(database) as fdb:
                status = fdb.get(self.id_).get("status", "")
            if status in ["submit", "running", "cancel"]:
                raise RuntimeError(
                    f"cannot commit {self.__str__()}. It is either submitted, "
                    f"running, or being cancelled."
                )

        with db.connect(database) as fdb:
            parents = self.parents
            parent_id = [
                parent.id_ if isinstance(parent, Relay) else parent
                for parent in parents
            ]
            has_atoms = len(parents) != 0 and isinstance(parents[0], Atoms)
            if self._database is None:
                if has_atoms:
                    # the Atoms object becomes the row itself and is dropped
                    # from the parent list
                    self.id_ = fdb.write(parents[0], label=self.label)
                    self.runnerdata.parents = parent_id[1:]
                    self._parents.pop(0)
                else:
                    self.id_ = fdb.write(Atoms(), label=self.label)
                    self.runnerdata.parents = parent_id
                # now add to self
                self._database = database
            elif has_atoms:
                fdb.update(self.id_, parents[0], label=self.label)
                self.runnerdata.parents = parent_id[1:]
                self._parents.pop(0)
            else:
                # update label
                fdb.update(self.id_, label=self.label)
                self.runnerdata.parents = parent_id

        self.runnerdata.to_db(self.database, self.id_)
        self._updated = True
        return self.id_

    def needs_commit(self):
        """
        Checks all parents to confirm if relay needs a commit

        Returns:
            bool: if relay needs a commit

        .. note::
            set_property methods should be used to update the relay. This
            marks the relay needs commit. If the set_property methods aren't
            used, then relay will not commit the changes.
        """
        ancestors = self._spider()
        # builtin any() so that a plain bool is returned, as documented
        return (not self._updated) or any(
            not parent._updated for parent in ancestors.values()
        )

    def start(self, force=False, force_all=False):
        """
        Submits the relay for run. Rows with submitted parents are also
        submitted.

        NB: rows with status 'submit' or 'running' are not resubmitted.

        Args:
            force (bool): resubmit row with 'done' status too.
            force_all (bool): resubmit all parent rows with 'done' status too.

        Returns:
            bool: if a row is submitted

        Raises:
            RuntimeError: if the relay, or one of its parents, needs a commit.
        """
        if self.needs_commit():
            raise RuntimeError("Commit the relay.")

        # recursively submitting parents
        parent_submitted = False
        for parent in self.parents:
            if isinstance(parent, Relay):
                status = parent.start(force=force_all, force_all=force_all)
                parent_submitted = parent_submitted or status

        with db.connect(self.database) as fdb:
            status = fdb.get(self.id_).get("status", "")

        if status in ["submit", "running"]:
            return True
        if status == "cancel":
            return False
        # a finished row is only rerun on request or when a parent is rerun
        rerun_done = status == "done" and (force or parent_submitted)
        if rerun_done or status in ["", "failed"]:
            submit(self.id_, self.database, self.runnername)
            return True
        return False

    def cancel(self, cancel_all=False):
        """
        Cancel row job

        Args:
            cancel_all (bool): Cancel all runs in the parent relay too.

        Raises:
            RuntimeError: if the relay is not committed.
        """
        if self._database is None:
            raise RuntimeError("Relay not commited")
        with db.connect(self._database) as fdb:
            status = fdb.get(self.id_).get("status", "")
            if status in ["submit", "running"]:
                cancel(self.id_, self._database)
            if cancel_all:
                for value in self._spider().values():
                    status = fdb.get(value.id_).get("status", "")
                    if status in ["submit", "running"]:
                        value.cancel(cancel_all=cancel_all)

    def get_status(self):
        """
        Returns status of the present row

        Returns:
            str: status of present row
        """
        if self._database is None:
            return "Relay not commited"
        with db.connect(self._database) as fdb:
            status = fdb.get(self.id_).get("status", "No status")
        return status

    status = property(get_status, doc=("Returns status of present relay row"))

    @property
    def parents(self):
        """
        parents associated with the relay
        """
        return self._parents

    @parents.setter
    def parents(self, parents):
        if not isinstance(parents, list):
            parents = [parents]
        # a leading Atoms object switches the type check off for the list
        if len(parents) != 0 and not isinstance(parents[0], Atoms):
            bool_ = [isinstance(parent, (int, Relay)) for parent in parents]
            assert all(bool_), "parent should be a relay or an int index"
        self._parents = parents
        self._updated = False

    @property
    def runnerdata(self):
        """
        runnerdata associated with the relay
        """
        return self._runnerdata

    @runnerdata.setter
    def runnerdata(self, runnerdata):
        assert isinstance(runnerdata, RunnerData)
        # stored as a copy, later edits of the argument do not leak in
        self._runnerdata = deepcopy(runnerdata)
        self._updated = False

    @property
    def runnername(self):
        """
        runnername associated with the relay
        """
        return self._runnername

    @runnername.setter
    def runnername(self, runnername):
        if runnername is not None:
            assert isinstance(runnername, str)
            # the part before the first ':' has to be a known runner type
            is_defined = runnername.split(":")[0] in runner_types
            assert is_defined, f"{runnername} type not in {runner_types}"
        self._runnername = deepcopy(runnername)
        self._updated = False

    @property
    def database(self):
        """
        database associated with the relay

        Raises:
            RuntimeError: if the relay is not committed.
        """
        if self._database is None:
            raise RuntimeError("Rely not commited, no database exists.")
        return self._database

    @database.setter
    def database(self, database):
        raise RuntimeError("database should not be changed.")

    @property
    def label(self):
        """
        label associated with the relay
        """
        return self._label

    @label.setter
    def label(self, label):
        assert isinstance(label, str)
        # labels that parse as a number are rejected
        for i in [int, float]:
            try:
                _ = i(label)
            except ValueError:
                continue
            raise RuntimeError(
                f"Label {label} is put in as string"
                f" but can be interpreted as {i.__name__}!"
                f" Not accepted by ASE database"
            )
        self._label = label
        self._updated = False

    def get_row(self, cycle_time=10, wait=True):
        """
        Get atoms row attached with the relay. Waits for the run if not
        finished.

        Args:
            cycle_time (float): time (s) to cycle the query.
            wait (bool): wait for run to finish

        Returns:
            :class:`~ase.db.row`: atoms row of attached with the relay

        Raises:
            RuntimeError: if the relay is not committed, or if the run has
                status 'failed' while waiting.
        """
        if self._database is None:
            raise RuntimeError("Relay not commited")
        fdb = db.connect(self.database)
        while True:
            row = fdb.get(self.id_)
            # a row without a status is treated as finished
            status = row.get("status", "done")
            if status == "done" or not wait:
                break
            if status == "failed":
                raise RuntimeError(f"Run {self.id_} failed")
            time.sleep(cycle_time)
        return row

    row = property(
        get_row,
        doc=("Atoms row attached with the relay, awaits for the run to finish"),
    )

    def _to_dict(self):
        """
        Converts relay to dict. relay parents are saved as id.

        Atoms parents are left out of the dictionary. Reads the ``database``
        property, so it raises RuntimeError for a relay that is not committed.
        """
        dict_ = {}
        dict_["id"] = self.id_
        dict_["updated"] = self._updated
        dict_["database"] = self.database
        dict_["runnerdata"] = self.runnerdata.data
        dict_["runnername"] = self.runnername
        dict_["label"] = self.label
        dict_["parents"] = []
        for parent in self.parents:
            if isinstance(parent, Atoms):
                # Atoms parents are not stored
                continue
            if isinstance(parent, int):
                dict_["parents"].append(parent)
            elif isinstance(parent, Relay):
                dict_["parents"].append(parent.id_)
        return dict_

    @classmethod
    def _from_dict(cls, dict_, parent_dict=None):
        """
        construct relay from relay dictionary

        Args:
            dict_ (dict): dictionary of relay, as output from Relay._to_dict()
            parent_dict (dict): a dictionary of ids as keys, relay instance as
                values. Used to construct parent graph of the relay.

        Raises:
            RuntimeError: if a parent entry is not an int, str or dict.
        """
        # constructing parent
        if parent_dict is None:
            parent_dict = {}
        parents = []
        for parent in dict_["parents"]:
            if isinstance(parent, (int, str)):
                # ids without a known relay are kept as plain ids
                parents.append(parent_dict.get(parent, parent))
            elif isinstance(parent, dict):
                parents.append(Atoms.fromdict(parent))
            else:
                raise RuntimeError(f"Unidentified parent: {parent}")

        relay = cls(
            dict_["label"],
            parents,
            RunnerData.from_data_dict(dict_["runnerdata"]),
            dict_["runnername"],
        )
        # the constructor resets these, restore the stored state
        relay.id_ = dict_["id"]
        relay._updated = dict_["updated"]
        relay._database = dict_["database"]
        return relay

    def to_json(self, filename):
        """
        Save the relay as a json file

        Args:
            filename (str): json filename
        """
        dict_ = self._to_dict()
        dict_["parent_dict"] = {
            key: value._to_dict() for key, value in self._spider().items()
        }
        with open(filename, "w") as fio:
            json.dump(dict_, fio)

    @classmethod
    def from_json(cls, filename):
        """
        Get relay from json file.

        Args:
            filename (str): json filename
        """
        with open(filename) as fio:
            data = json.load(fio)
        return cls._parse_dict(data)

    @classmethod
    def _parse_dict(cls, data):
        """
        Parses raw dict data saved in json or database

        The "parent_dict" entry is removed from ``data``.
        """
        # preparing parent_dict
        raw_parents = data.pop("parent_dict", {})

        # make relay objects
        parent_dict = {}
        for key, value in raw_parents.items():
            # json stores int as str
            try:
                key = int(key)
            except ValueError:
                pass
            parent_dict[key] = cls._from_dict(value)

        # make the parents in relay object as relay; ``_parents`` is edited
        # directly so that the ``_updated`` flag is left untouched
        for value in parent_dict.values():
            for i, parent in enumerate(value._parents):
                if parent in parent_dict:
                    value._parents[i] = parent_dict[parent]

        return cls._from_dict(data, parent_dict)

    @classmethod
    def _from_database(cls, index, fdb, parent_dict=None):
        """
        Helper function for from_database. Returns data per row

        Args:
            index (int): id of row in database
            fdb: connected ASE database
            parent_dict (dict): row data of parents collected so far

        Returns:
            tuple: (data of the row, parent_dict including all parents of
            the row)
        """
        if parent_dict is None:
            parent_dict = {}
        row = fdb.get(index)
        data = {}
        data["id"] = index
        data["updated"] = True
        data["runnerdata"] = row.data.get("runner", {})
        data["runnername"] = row.get("runner", None)
        data["label"] = row.get("label", "")
        data["parents"] = row.data.get("runner", {}).get("parents", [])
        for parent in data["parents"]:
            if parent in parent_dict:
                # already read through another path of the graph
                continue
            parent_data, parent_dict = cls._from_database(parent, fdb, parent_dict)
            parent_dict[parent] = parent_data
        return data, parent_dict

    @classmethod
    def from_database(cls, index, database):
        """
        Get relay from database

        Args:
            index (int): id of row in database
            database (str): ASE database

        Returns:
            Relay object: relay associated with id
        """
        with db.connect(database) as fdb:
            data, parent_dict = cls._from_database(index, fdb)
        data["database"] = database
        for value in parent_dict.values():
            value["database"] = database
        data["parent_dict"] = parent_dict
        return cls._parse_dict(data)

    def _spider(self, dict_=None, parent_call=None):
        """
        crawls the relay graph to collate the parent data, for easy access of
        parent relay instances

        Args:
            dict_ (dict): present dictionary of added relays
            parent_call (set): ids of relays in the present parent crawl, used
                to detect cyclic graph calls

        Returns:
            dict: a dictionary of ids as keys and relay instance as value.
            The relay the crawl starts from is not included.

        Raises:
            RuntimeError: if the graph contains a cycle.
        """
        if dict_ is None:
            dict_ = {}
        if parent_call is None:
            parent_call = set()

        parent_call.add(self.id_)
        for parent in self._parents:
            if not isinstance(parent, Relay):
                continue
            # Checked before the "already collected" test: a relay that is
            # still on the current crawl path is a cycle even when it is in
            # dict_ already.
            if parent.id_ in parent_call:
                raise RuntimeError("Cyclic connection detected. Abandon ship!")
            if parent.id_ not in dict_:
                dict_[parent.id_] = parent
                dict_ = parent._spider(dict_, parent_call)
        parent_call.discard(self.id_)
        return dict_

    def get_relay_graph(self, filename, add_tasks=False):
        """
        Save the relay as a directional acyclic graph, with status

        Args:
            filename (str): png filename to save the graph
            add_tasks (bool): adds task information to the graph
        """
        get_graphical_status(
            filename, [self.id_], self, add_tasks=add_tasks, _get_info=_get_info
        )

    def replace_runnerdata(self, runnerdata):
        """
        Replace `RunnerData` for the relay and all relays with same runner
        name as the given `RunnerData`

        Args:
            runnerdata (:class:`~runner.RunnerData`): RunnerData to supplant.
        """
        runnername = runnerdata.name
        # the relay itself first, then all of its parent relays
        for relay in [self, *self._spider().values()]:
            if relay.runnerdata.name == runnername:
                relay.runnerdata = runnerdata
def _get_info(input_id, relay):
    """
    returns parent list, used in get_graphical_status

    Args:
        input_id: id of a relay or of a database row, or an
            :class:`~ase.Atoms` object, to describe
        relay (Relay): relay whose graph is being drawn

    Returns:
        tuple: (formula, runner name, parents, status, tasks)
    """
    # ``relay.database`` raises for a relay that is not committed, so the
    # private attribute is read to be able to report "No status" instead.
    if input_id != relay.id_:
        if isinstance(input_id, Atoms):
            return (input_id.get_chemical_formula(), None, [], "No status", [])

        ancestors = relay._spider()
        if input_id in ancestors:
            relay = ancestors[input_id]
        else:
            # plain database row, not part of the relay graph
            parents = []
            tasks = []
            name = None
            database = relay._database
            if database is None:
                status = "No status"
                formula = input_id
            else:
                with db.connect(database) as fdb:
                    row = fdb.get(input_id)
                    formula = row.formula
                    status = row.get("status", "No status")
                    if "runner" in row.data:
                        # no parents returned, since out of relay
                        tasks = row.data["runner"]["tasks"]
                        name = row.data["runner"]["name"]
            return formula, name, parents, status, tasks

    parents = [
        parent.id_ if isinstance(parent, Relay) else parent
        for parent in relay.parents
    ]
    tasks = relay.runnerdata.tasks
    name = relay.runnerdata.name
    formula = relay.__str__()
    database = relay._database
    if database is None:
        status = "No status"
    else:
        with db.connect(database) as fdb:
            row = fdb.get(relay.id_)
            status = row.get("status", "No status")
            # the chemical formula is only shown for finished runs
            if status == "done":
                formula = row.formula
    return formula, name, parents, status, tasks