Source code for ewoksmx.tasks.SlurmSubmit

import os
from typing import Optional

from edna2.utils import UtilsSlurm

from .base_tasks.icat_task import IcatCallbackTask


[docs] class SlurmSubmit( IcatCallbackTask, input_names=["pipeline_name", "slurm_params"], output_names=["slurm_params"], ): """Submit job to Slurm."""
[docs] def run(self): error_message = self.inputs.slurm_params.get("error_message", None) if error_message is None: self.outputs.slurm_params = self._submit() else: self.outputs.slurm_params = dict(self.inputs.slurm_params)
@property def _icat_callback_url(self) -> Optional[str]: return self.inputs.slurm_params["icat_callback_url"] def _submit(self) -> dict: pipeline_name = self.inputs.pipeline_name parameters = dict(self.inputs.slurm_params) script_file_path = parameters["script_file_path"] working_directory = os.path.dirname(script_file_path) command_line = "" environ = parameters.get("environ", None) if environ is not None: for key, value in environ.items(): command_line += f"export {key}={value}; " command_line += script_file_path ( slurm_script_path, slurm_id, stdout_file_path, stderr_file_path, ) = UtilsSlurm.submit_job_to_slurm( command_line=command_line, working_directory=working_directory, queue=parameters["queue"], mem=parameters["mem"], nodes=parameters["nodes"], core=parameters["core"], time=parameters["time"], name=pipeline_name, list_modules=["mxworkflows"], ) self.notify_icat( { "step": {"name": pipeline_name, "status": "SUBMITTED"}, "logs": {"name": pipeline_name, "working_directory": working_directory}, } ) parameters["script_path"] = slurm_script_path parameters["working_directory"] = working_directory parameters["slurm_id"] = slurm_id parameters["pipeline_name"] = pipeline_name return parameters