# Source code for ewoksmx.tasks.MXPipelineSynchronize
from typing import Optional
from .base_tasks.icat_task import IcatCallbackTask
[docs]
class MXPipelineSynchronize(
    IcatCallbackTask,
    input_names=["slurm_params"],
    optional_input_names=[],
    output_names=["slurm_params"],
):
    """Flag the workflow as finished once enough job steps have ended.

    Reads the ``slurm_params`` input, counts the steps of the current job
    whose status is ``FINISHED`` or ``ERROR`` and, when that count equals
    ``slurm_params["no_pipelines"]``, notifies ICAT and records the
    ``FINISHED`` workflow status. A copy of ``slurm_params`` is always
    written to the ``slurm_params`` output.
    """

    def run(self):
        """Count ended steps of this job and update the workflow status."""
        # Shallow copy: the status key set below lands on the copy, not on
        # the input mapping.
        params = dict(self.inputs.slurm_params)
        icat_jobs = self.get_icat_jobs()
        current_job_id = self.job_id
        terminal_states = ("FINISHED", "ERROR")

        ended_steps = 0
        for job in icat_jobs:
            # Only entries carrying the id of the current job are counted.
            if "jobId" not in job or job["jobId"] != current_job_id:
                continue
            ended_steps += sum(
                1 for step in job["steps"] if step["status"] in terminal_states
            )

        # Steps in ERROR count as ended too, so a failed step does not keep
        # the workflow from being reported as finished.
        if params["no_pipelines"] == ended_steps:
            self.notify_icat({"status": "FINISHED"})
            params["workflow_status"] = "FINISHED"

        self.outputs.slurm_params = params

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Return the ``icat_callback_url`` entry of the ``slurm_params`` input."""
        slurm_params = self.inputs.slurm_params
        return slurm_params["icat_callback_url"]