# Source code for ewoksmx.tasks.SlurmSubmit
import os
from typing import Optional
from edna2.utils import UtilsSlurm
from .base_tasks.icat_task import IcatCallbackTask
[docs]
class SlurmSubmit(
    IcatCallbackTask,
    input_names=["pipeline_name", "slurm_params"],
    output_names=["slurm_params"],
):
    """Submit job to Slurm.

    Inputs:
        pipeline_name: Name used for the Slurm job and in the payload handed
            to ``notify_icat``.
        slurm_params: Mapping describing the job. Keys read by this task:
            ``script_file_path``, ``queue``, ``mem``, ``nodes``, ``core``,
            ``time``, ``icat_callback_url`` and, optionally, ``environ``
            and ``error_message``.

    Outputs:
        slurm_params: A copy of the input mapping. When a job was submitted
            it additionally holds ``script_path``, ``working_directory``,
            ``slurm_id`` and ``pipeline_name``.
    """

    def run(self):
        """Submit the job, unless the incoming parameters carry an error.

        When ``slurm_params`` holds a non-``None`` ``error_message`` nothing
        is submitted and a plain copy of the input mapping is forwarded.
        """
        incoming = self.inputs.slurm_params
        if incoming.get("error_message") is not None:
            # An error is already recorded in the parameters: pass them on.
            self.outputs.slurm_params = dict(incoming)
            return
        self.outputs.slurm_params = self._submit()

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Value stored under ``icat_callback_url`` in the input ``slurm_params``."""
        return self.inputs.slurm_params["icat_callback_url"]

    def _submit(self) -> dict:
        """Build the command line, submit it to Slurm and notify ICAT.

        Returns:
            A copy of the input ``slurm_params`` extended with the keys
            ``script_path``, ``working_directory``, ``slurm_id`` and
            ``pipeline_name``.

        Raises:
            KeyError: If a required key is missing from ``slurm_params``.
        """
        job_name = self.inputs.pipeline_name
        # Work on a copy so the task input mapping itself is never modified.
        params = dict(self.inputs.slurm_params)

        script = params["script_file_path"]
        # The job is run from the directory containing the script.
        run_dir = os.path.dirname(script)

        # Optional environment variables are emitted as shell ``export``
        # statements in front of the script path.
        # NOTE(review): values are interpolated without shell quoting --
        # confirm that callers never pass values needing it.
        env_vars = params.get("environ")
        exports = ""
        if env_vars is not None:
            exports = "".join(
                f"export {name}={val}; " for name, val in env_vars.items()
            )
        command = exports + script

        # The stdout/stderr paths returned by the submission are not used here.
        generated_script, job_id, _stdout_path, _stderr_path = (
            UtilsSlurm.submit_job_to_slurm(
                command_line=command,
                working_directory=run_dir,
                queue=params["queue"],
                mem=params["mem"],
                nodes=params["nodes"],
                core=params["core"],
                time=params["time"],
                name=job_name,
                list_modules=["mxworkflows"],
            )
        )

        step_info = {"name": job_name, "status": "SUBMITTED"}
        log_info = {"name": job_name, "working_directory": run_dir}
        self.notify_icat({"step": step_info, "logs": log_info})

        # Record the outcome of the submission alongside the original values.
        params.update(
            {
                "script_path": generated_script,
                "working_directory": run_dir,
                "slurm_id": job_id,
                "pipeline_name": job_name,
            }
        )
        return params