Source code for ewoksmx.tasks.mx_pipelines.slurm_submit
import os
import shlex
from typing import Optional

from ewoksmx.shell_utils.execute_slurm import execute_bash_commands
from ewoksmx.tasks.base_tasks.icat_task import IcatCallbackTask
[docs]
class SlurmSubmit(
    IcatCallbackTask,
    input_names=["pipeline_name", "slurm_params"],
    output_names=["slurm_params"],
):
    """Submit job to Slurm.

    Inputs:
        pipeline_name: Name given to the submitted job and reported to ICAT.
        slurm_params: Mapping describing the job. Keys read by this task:
            ``script_file_path`` and ``icat_callback_url`` (both required
            when a job is submitted), and the optional ``error_message``,
            ``environ``, ``queue``, ``mem``, ``nodes``, ``core`` and
            ``time``.

    Outputs:
        slurm_params: Copy of the input mapping. When a job was submitted it
            additionally holds ``script_path``, ``working_directory``,
            ``slurm_id`` and ``pipeline_name``.
    """

    def run(self):
        """Submit the job unless the incoming parameters carry an error.

        When ``slurm_params`` has a non-``None`` ``error_message``, nothing
        is submitted and a shallow copy of the input is forwarded as-is.
        """
        error_message = self.inputs.slurm_params.get("error_message", None)
        if error_message is None:
            self.outputs.slurm_params = self._submit()
        else:
            self.outputs.slurm_params = dict(self.inputs.slurm_params)

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Return the ``icat_callback_url`` entry of ``slurm_params``.

        The key is looked up by subscript, so a missing entry raises instead
        of yielding ``None``.
        """
        return self.inputs.slurm_params["icat_callback_url"]

    def _submit(self) -> dict:
        """Build the command list, submit it and report the submission.

        Returns:
            A copy of ``slurm_params`` extended with ``script_path``,
            ``working_directory``, ``slurm_id`` and ``pipeline_name``.

        Raises:
            KeyError: If ``slurm_params`` has no ``script_file_path`` entry.
        """
        pipeline_name = self.inputs.pipeline_name
        # Work on a copy so the task input is never mutated.
        parameters = dict(self.inputs.slurm_params)
        script_file_path = parameters["script_file_path"]
        # The job runs in the directory that contains the script.
        working_directory = os.path.dirname(script_file_path)

        list_command_line = []
        environ = parameters.get("environ", None)
        if environ is not None:
            # Quote every value so that whitespace and shell metacharacters
            # are exported literally instead of being split or interpreted
            # by the shell. Values made only of safe characters are emitted
            # unchanged by shlex.quote.
            list_command_line.extend(
                f"export {key}={shlex.quote(str(value))}"
                for key, value in environ.items()
            )
        # The script itself is the last command, after all exports.
        list_command_line.append(script_file_path)

        # Submit job to SLURM
        bash_result = execute_bash_commands(
            shell_commands=list_command_line,
            working_directory=working_directory,
            name=pipeline_name,
            # Keys absent from slurm_params are forwarded as None.
            parameters={
                "partition": parameters.get("queue"),
                "mem": parameters.get("mem"),
                "ntasks": parameters.get("nodes"),
                "cpus-per-task": parameters.get("core"),
                "time": parameters.get("time"),
            },
        )

        # Notification is sent only after the submission call has returned.
        self.notify_icat(
            {
                "step": {"name": pipeline_name, "status": "SUBMITTED"},
                "logs": {"name": pipeline_name, "working_directory": working_directory},
            }
        )

        parameters["script_path"] = str(script_file_path)
        parameters["working_directory"] = str(working_directory)
        parameters["slurm_id"] = bash_result.slurm_id
        parameters["pipeline_name"] = pipeline_name
        return parameters