Source code for ewoksmx.tasks.MXPipelineSynchronize

from typing import Optional

from .base_tasks.icat_task import IcatCallbackTask


[docs] class MXPipelineSynchronize( IcatCallbackTask, input_names=["slurm_params"], optional_input_names=[], output_names=["slurm_params"], ):
[docs] def run(self): slurm_params = dict(self.inputs.slurm_params) jobs = self.get_icat_jobs() job_id = self.job_id no_finished = 0 for job in jobs: if "jobId" in job: if job["jobId"] == job_id: for dict_step in job["steps"]: if dict_step["status"] in ["FINISHED", "ERROR"]: no_finished += 1 if slurm_params["no_pipelines"] == no_finished: self.notify_icat({"status": "FINISHED"}) slurm_params["workflow_status"] = "FINISHED" self.outputs.slurm_params = slurm_params
@property def _icat_callback_url(self) -> Optional[str]: return self.inputs.slurm_params["icat_callback_url"]