Source code for runner.runners.terminal

import subprocess as sb
from datetime import datetime

from runner.runner import BaseRunner


class TerminalRunner(BaseRunner):
    """
    Terminal Runner

    Runs each job as a local shell script (``run.sh``) started as a
    background process; the process id is used as the job id.

    Args:
        name (str): The name of the runner. It is saved as
            `terminal:<given_name>`. Used to identify from other runners
            attached to the database.
        database (str): ASE database to connect
        interpreter (str): the interpreter for the shell
        pre_runner_data (:class:`RunnerData`): pre-run runnerdata.
            Files, tasks, and scheduler_options can be added here to be
            applied to all the runs handled by this runner.
        max_jobs (int): maximum number of jobs running at an instance
        cycle_time (int): time in seconds
        keep_run (bool): keep the folder in which the run was performed
        run_folder (str): the folder that needs to be populated
        multi_fail (int): The number of re-runs on failure
        logfile (str): The log filename for logging
    """

    def __init__(
        self,
        name,
        database="database.db",
        interpreter="#!/bin/bash",
        pre_runner_data=None,
        max_jobs=50,
        cycle_time=30,
        keep_run=False,
        run_folder="./",
        multi_fail=0,
        logfile=None,
    ):
        # Runner names are namespaced by type; add the prefix only once.
        if not name.startswith("terminal:"):
            name = "terminal:" + name

        super().__init__(
            name=name,
            database=database,
            interpreter=interpreter,
            pre_runner_data=pre_runner_data,
            max_jobs=max_jobs,
            cycle_time=cycle_time,
            keep_run=keep_run,
            run_folder=run_folder,
            multi_fail=multi_fail,
            logfile=logfile,
        )

    def _submit(self, tasks, scheduler_options):
        """
        Submit job

        Writes ``status.txt`` and ``run.sh`` into the current working
        directory, makes the script executable and starts it in the
        background with its standard output written to ``run.out``.

        Args:
            tasks (list): list of tasks to be added
            scheduler_options (dictionary): dictionary of headers to be
                added (not used by this runner)

        Returns:
            int: Job id (process id) of the successful run, None if failed
            str: log message of the run
        """
        # default values
        job_id = None
        log_msg = "{}\nSubmission using {} scheduler\n".format(
            datetime.now(), self.name
        )

        # add start status file
        with open("status.txt", "w") as f:
            f.write("start\n")

        # add interpreter
        run_script = "{}\n".format(self.interpreter)
        # make script escape if error
        run_script += "set -e\n"
        # add tasks
        run_script += "\n".join(tasks)
        # add done status file on completion
        run_script += "\necho done > status.txt\n"

        with open("run.sh", "w") as f:
            f.write(run_script)

        out = sb.run(["chmod", "+x", "run.sh"], stderr=sb.PIPE, stdout=sb.PIPE)
        if out.returncode != 0:
            # failed
            log_msg += "Submission failed: {}\n".format(
                out.stderr.decode("utf-8")
            )
            return job_id, log_msg

        try:
            # No shell is involved, so ">" in the argument list would be
            # passed to the script as a literal argument. Redirect stdout
            # through a file handle instead; the child keeps its own copy
            # of the descriptor after the parent closes the file.
            with open("run.out", "w") as run_out:
                process = sb.Popen(["./run.sh"], stdout=run_out)
        except OSError as err:
            # e.g. the interpreter named in the shebang line does not exist
            log_msg += "Submission failed: {}\n".format(err)
        else:
            # successful submission
            job_id = process.pid
            log_msg += "Submitted batch job {}\n".format(job_id)

        return job_id, log_msg

    def _cancel(self, job_id):
        """
        Cancels job_id

        Args:
            job_id (str): job id (process id) of the run; None is ignored
        """
        import psutil as ps

        if job_id is None:
            return

        try:
            ps.Process(int(job_id)).kill()
        except ps.NoSuchProcess:
            # nothing to cancel, the process is already gone
            pass

    def _status(self, job_id):
        """
        return status of job_id

        The process state decides between "running" and finished; the
        first line of ``status.txt`` then decides between "done" and
        "failed".

        Args:
            job_id (str): job id of the run

        Returns:
            str: status of the job id ("running", "done" or "failed")
            str: log message of the last change
        """
        import psutil as ps

        status = "running"
        log_msg = ""

        try:
            process = ps.Process(int(job_id))
            # if its a zombie, process is still reported as running but
            # has an empty command line
            if not process.is_running() or not process.cmdline():
                status = "done"
        except ps.NoSuchProcess:
            status = "done"

        # An empty file (e.g. caught while the script is rewriting it) or
        # a missing file simply means the "done" marker is not there.
        try:
            with open("status.txt", "r") as f:
                last_status = f.readline().strip()
        except FileNotFoundError:
            last_status = ""

        # process ended without writing the completion marker
        if status == "done" and last_status != "done":
            status = "failed"

        if status == "done":
            log_msg += "{}\n Job finished.".format(datetime.now())
        elif status == "failed":
            log_msg += "{}\n Job failed".format(datetime.now())

        return status, log_msg