Source code for ewoksmx.tasks.mx_pipelines.slurm_submit

import os
from typing import Optional

from ewoksmx.shell_utils.execute_slurm import execute_bash_commands
from ewoksmx.tasks.base_tasks.icat_task import IcatCallbackTask


class SlurmSubmit(
    IcatCallbackTask,
    input_names=["pipeline_name", "slurm_params"],
    output_names=["slurm_params"],
):
    """Submit job to Slurm.

    Inputs:
        pipeline_name: name given to the submitted job and reported in the
            ICAT notification.
        slurm_params: mapping describing the job. Keys read by this task:
            ``error_message`` (optional), ``icat_callback_url`` (optional),
            ``script_file_path`` (required when submitting), ``environ``
            (optional mapping of variables to export), and the optional
            resource settings ``queue``, ``mem``, ``nodes``, ``core`` and
            ``time``.

    Outputs:
        slurm_params: copy of the input mapping. When a job was submitted it
            additionally holds ``script_path``, ``working_directory``,
            ``slurm_id`` and ``pipeline_name``.
    """

    def run(self):
        """Submit the job unless ``slurm_params`` already carries an error.

        If ``slurm_params["error_message"]`` is present and not ``None``,
        nothing is submitted and a plain copy of the input mapping is passed
        on as output.
        """
        error_message = self.inputs.slurm_params.get("error_message")
        if error_message is None:
            self.outputs.slurm_params = self._submit()
        else:
            self.outputs.slurm_params = dict(self.inputs.slurm_params)

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Return the ICAT callback URL from ``slurm_params``, if any.

        Returns ``None`` when ``slurm_params`` has no ``icat_callback_url``
        entry, as the ``Optional[str]`` annotation allows.
        """
        # A plain ``[...]`` lookup raised KeyError for a missing key; that
        # error surfaced from ``notify_icat`` *after* the job had already been
        # submitted, so the slurm_id never reached the task output.
        # NOTE(review): assumes the base-class ``notify_icat`` tolerates a
        # None URL -- confirm against IcatCallbackTask.
        return self.inputs.slurm_params.get("icat_callback_url")

    def _submit(self) -> dict:
        """Submit the script, notify ICAT and return the updated parameters.

        Returns:
            A copy of ``slurm_params`` extended with ``script_path``,
            ``working_directory``, ``slurm_id`` and ``pipeline_name``.

        Raises:
            KeyError: if ``slurm_params`` has no ``script_file_path`` entry.
        """
        pipeline_name = self.inputs.pipeline_name
        # Work on a copy so the task input is not mutated.
        parameters = dict(self.inputs.slurm_params)
        script_file_path = parameters["script_file_path"]
        # The commands are executed from the directory holding the script.
        working_directory = os.path.dirname(script_file_path)

        # Export the requested environment first, then run the script itself.
        # NOTE(review): values are interpolated unquoted, so a value containing
        # whitespace or shell metacharacters is interpreted by the shell.
        # Quoting is not applied here because callers may rely on expansion
        # (e.g. ``$PATH``) -- confirm before changing.
        list_command_line = []
        environ = parameters.get("environ")
        if environ is not None:
            list_command_line.extend(
                f"export {key}={value}" for key, value in environ.items()
            )
        list_command_line.append(script_file_path)

        # Submit job to SLURM
        # Resource settings missing from ``slurm_params`` are forwarded as None.
        bash_result = execute_bash_commands(
            shell_commands=list_command_line,
            working_directory=working_directory,
            name=pipeline_name,
            parameters={
                "partition": parameters.get("queue"),
                "mem": parameters.get("mem"),
                "ntasks": parameters.get("nodes"),
                "cpus-per-task": parameters.get("core"),
                "time": parameters.get("time"),
            },
        )

        self.notify_icat(
            {
                "step": {"name": pipeline_name, "status": "SUBMITTED"},
                "logs": {
                    "name": pipeline_name,
                    "working_directory": working_directory,
                },
            }
        )

        parameters["script_path"] = str(script_file_path)
        parameters["working_directory"] = str(working_directory)
        parameters["slurm_id"] = bash_result.slurm_id
        parameters["pipeline_name"] = pipeline_name
        return parameters