Source code for runner.utils.runnerdata

""" Utility to handle runner data"""
import json
import os
from base64 import b64encode
from copy import copy

from runner.utils.utils import json_keys2int, get_db_connect


default_files = ["run.sh", "batch.slrm", "atoms.pkl", "run.py", "status.txt", "job.id"]


[docs] class RunnerData: """Class to handle runner data using helper function Example: >>> # typical runner data >>> data = {'scheduler_options': {'-N': 1, ... '-n': 16, ... '-t': '0:5:0:0', ... '--mem-per-cpu': 2000}, ... 'name': '<calculation name>', ... 'parents': [], ... 'tasks': [['python', '<filename>'], # simple python run ... ['python', '<filename>', <params>], ... ['python', '<filename>', <params>, '<pycommand>'], ... ['shell', '<command>']] # any shell command ... 'files': {'<filename1>': '<contents, string or bytes>', ... '<filename2>': '<contents, string or bytes>' ... } ... 'keep_run': False ... 'log': ''} >>> runnerdata = RunnerData.from_data_dict(data) where: * <params>: can be a dictionary of parameters, or an empty {} for no parameters * <pycommand>: is a string of python command, example, 'python3' or 'mpirun -n 4 python3' default 'python' * keep_run: is a bool to keep run after status done, otherwise the run folder is deleted. However, the :class:`RunnerData` can be used to generate the data stepwise, using the functions provided as:: >>> runnerdata = RunnerData('<calculation name>') >>> runnerdata.add_file('<filename>') >>> runnerdata.append_tasks('python', ... '<filename>', ... params_dict, ... '<pycommand>') >>> runnerdata.add_scheduler_options({'-N': 1, ... '-n': 16, ... '-t': '0:5:0:0', ... '--mem-per-cpu': 2000}) >>> # and so on Args: name (str): name of RunnerData Attributes: data: dictionary of the runner data """ def __init__(self, name="untitled_run"): self.data = { "scheduler_options": {}, "name": name, "tasks": [], "files": {}, "parents": [], "keep_run": False, } def __repr__(self): return repr(self.data) @property def name(self): """Name of the RunnerData""" return self.data["name"] @name.setter def name(self, name): _test_name(name) self.data["name"] = name @property def tasks(self): """tasks in RunnerData""" return self.data["tasks"] @tasks.setter def tasks(self, tasks): _test_tasks(tasks, self.files, _skip_empty_task_test=True) self.data["tasks"] = tasks
[docs] def append_tasks(self, task_type, *args): """Appends task to tasks Example: >>> rdat = runner.RunnerData() >>> # shell task_type followed by shell command >>> rdat.append_tasks('shell', 'module load anaconda3') >>> # python task_type followed by python file >>> rdat.append_tasks('python', 'get_energy.py') >>> # python task_type with parameters >>> rdat.append_tasks('python', 'get_energy.py', {'param': 0}) >>> # python task_type with python execute command >>> # NB: the 3rd argument has to be parameters, if no parameters >>> # empty dict has to be given. >>> # default: python <python file> >>> # to execute: mpirun -n 4 python3 get_energy.py >>> rdat.append_tasks('python', 'get_energy.py', {}, ... 'mpirun -n 4 python3') Args: task_type (str): task type, 'shell' or 'python' *args: args for task type, see example for shell task_type, args is shell command (str) for python task_type, args is python filename (str), parameters (dict), and python execute command (str) """ if task_type == "shell": task = ["shell", *args] elif task_type == "python": task = ["python", *args] if len(task) == 2: task.append({}) if len(task) == 3: task.append("python") else: raise RuntimeError("task type shell or python supported") _test_tasks([task], self.files) self.data["tasks"].append(task)
@property def files(self): """Files in RunnerData""" return self.data["files"] @files.setter def files(self, files): _test_files(files) self.data["files"] = files
[docs] def add_file(self, filename, add_as=None): """Add file to runner data Args: filename (str): name of the file add_as (str): name the file should be added as""" if add_as is None: add_as = filename try: with open(filename, "r") as fio: basename = os.path.basename(add_as) self.data["files"][basename] = fio.read() except UnicodeDecodeError: # file is binary with open(filename, "rb") as fio: basename = os.path.basename(add_as) self.data["files"][basename] = ( "data:application/octet-stream;base64," + b64encode(fio.read()).decode() )
[docs] def add_files(self, filenames, add_as=None): """Adds files to runner data Args: filenames (list): list of filenames to be added add_as (list, optional): list of name the file should be added as in the runner data""" if not isinstance(filenames, (tuple, list)): filenames = [filenames] if add_as is not None: if not isinstance(add_as, (tuple, list)): add_as = [add_as] if len(add_as) != len(filenames): raise RuntimeError( "Length of filenames and add_as should" " be the same" ) else: add_as = filenames for name_, filename in zip(add_as, filenames): self.add_file(filename, name_)
@property def scheduler_options(self): """Scheduler_options in RunnerData""" return self.data["scheduler_options"] @scheduler_options.setter def scheduler_options(self, scheduler_options): _test_scheduler_options(scheduler_options) self.data["scheduler_options"] = scheduler_options
[docs] def add_scheduler_options(self, scheduler_options): """Adds scheduler_options to runner data Args: scheduler_options (dict): dictionary of options""" _test_scheduler_options(scheduler_options) self.data["scheduler_options"].update(scheduler_options)
@property def parents(self): """Parent simulations of the row""" return self.data["parents"] @parents.setter def parents(self, parents): """set parents to runner data""" _test_parents(parents) self.data["parents"] = parents @property def keep_run(self): """Stores bool, indicates if the run should be saved after completing tasks .. note:: Failed run folders are not deleted regardless of keep_run value. This aids in the debugging of the run.""" return self.data["keep_run"] @keep_run.setter def keep_run(self, keep_run): _test_keep_run(keep_run) self.data["keep_run"] = keep_run
[docs] def get_runner_data(self, _skip_empty_task_test=False): """ helper function to get complete runner data Returns: dict: containing all options to run a job str: name of the calculation, for tags list: list of parents attached to the present job list: list of tasks to perform dict: dictionary of filenames as key and strings as value """ data = self.data scheduler_options = {} name = "" parents = [] tasks = [] files = {} if data is None: raise RuntimeError("No runner data") scheduler_options = data.get("scheduler_options", {}) name = str(data.get("name", "untitled_run")) parents = data.get("parents", []) files = data.get("files", {}) tasks = data.get("tasks", []) keep_run = data.get("keep_run", False) log_msg = data.get("log", "") _test_scheduler_options(scheduler_options, log_msg) _test_name(name, log_msg) _test_parents(parents, log_msg) _test_files(files, log_msg) _test_tasks(tasks, files, log_msg, _skip_empty_task_test) _test_keep_run(keep_run, log_msg) return (scheduler_options, name, parents, tasks, files)
[docs] def to_db(self, database, ids): """add run data to ids in database Args: database (str): ase database ids (int, or list): ids in the database""" if not isinstance(ids, (tuple, list)): ids = [ids] # test if data is appropriate _ = self.get_runner_data() fdb = get_db_connect(database) for id_ in ids: data = fdb.get(id_).data data["runner"] = self.data fdb.update(id_, data=data)
[docs] def to_json(self, filename): """Saves RunnerData to json Args: filename (str): name of `json` file""" with open(filename, "w") as fio: json.dump(self.data, fio)
[docs] @classmethod def from_db(cls, database, id_): """get RunnerData from database Args: databse (str): ase database id_ (int): id in the database Returns: :class:`~runner.utils.runnerdata.RunnerData`: class defining runner data """ fdb = get_db_connect(database) data = fdb.get(id_).data["runner"] data.pop("log", None) return cls.from_data_dict(data)
[docs] @classmethod def from_json(cls, filename): """get RunnerData from json Args: filename (str): name of `json` file Returns: :class:`~runner.utils.runnerdata.RunnerData`: class defining runner data """ with open(filename) as fio: data = json.load(fio, object_hook=json_keys2int) return cls.from_data_dict(data)
[docs] @classmethod def from_data_dict(cls, data): """Construct RunnerData from data dictionary Args: data (dict): runnerdata dictionary Returns: :class:`~runner.utils.runnerdata.RunnerData`: class defining runner data """ runnerdata = cls() if data: runnerdata.data.update(data) return runnerdata
def _test_name(name, log_msg=""): if not isinstance(name, str): err = log_msg + "Runner: name should be str\n" raise RuntimeError(err) def _test_keep_run(keep_run, log_msg=""): if not isinstance(keep_run, bool): err = log_msg + "Runner: keep_run should be bool\n" raise RuntimeError(err) def _test_parents(parents, log_msg=""): if not isinstance(parents, (list, tuple)): err = log_msg + "Runner: Parents should be a list of int\n" raise RuntimeError(err) for i in parents: if not isinstance(i, int): err = log_msg + "Runner: parents should be a list of" "int\n" raise RuntimeError(err) def _test_files(files, log_msg=""): if not isinstance(files, dict): err = log_msg + "Runner: files should be a dictionary\n" raise RuntimeError(err) for filename, content in files.items(): if filename in default_files: raise RuntimeError(log_msg + f"Runner: {filename=} in {default_files=}") if not isinstance(filename, str): err = log_msg + "Runner: filenames should be str\n" raise RuntimeError(err) if not isinstance(content, (str, bytes)): err = log_msg + "Runner: file contents should be str" " or bytes\n" raise RuntimeError(err) def _test_tasks(tasks, files=None, log_msg="", _skip_empty_task_test=True): if files is None: files = {} if not isinstance(tasks, (list, tuple)): err = log_msg + "Runner: tasks should be a list\n" raise RuntimeError(err) if len(tasks) == 0 and not _skip_empty_task_test: err = log_msg + "Runner: tasks empty\n" raise RuntimeError(err) for task in tasks: if not isinstance(task, (tuple, list)): err = log_msg + "Runner: each task should be a list\n" raise RuntimeError(err) if len(task) < 2: err = ( log_msg + "Runner: each task sould have a name" " and command or filename\n" ) raise RuntimeError(err) if not isinstance(task[1], str): err = ( log_msg + "Runner: shell command or python filename" " should be str\n" ) raise RuntimeError(err) if task[0] == "python": # testing filename in files filename = copy(task[1]) if not filename.endswith(".py"): filename += ".py" if filename not in files: err = ( log_msg + "Runner: python filename {} should" " be in files\n".format(filename) ) raise RuntimeError(err) if len(task) > 2: if not isinstance(task[2], dict): err = log_msg + "Runner: python parameters " "should be dict\n" raise RuntimeError(err) if len(task) > 3: if not isinstance(task[3], str): err = log_msg + "Runner: python command should" "be str\n" raise RuntimeError(err) elif task[0] != "shell": raise RuntimeError("Runner: task should either be 'shell'" " or 'python'\n") def _test_scheduler_options(scheduler_options, log_msg=""): if not isinstance(scheduler_options, dict): err = log_msg + "Runner: scheduler_options should be a dict\n" raise RuntimeError(err) def _tasks2file(tasks): """converts tasks to run_scripts and files""" pass