Source code for ewoksmx.tasks.WaitForStarted

import os
import time
from typing import Optional

from .base_tasks.icat_task import IcatCallbackTask


[docs] class WaitForStarted( IcatCallbackTask, input_names=["slurm_params"], output_names=[ "slurm_params", ], ):
[docs] def run(self): started = False slurm_params = self.inputs.slurm_params error_message = slurm_params.get("error_message", None) if error_message is None: working_directory = slurm_params["working_directory"] slurm_id = slurm_params["slurm_id"] if "grenades" in working_directory: started = True elif ( working_directory is not None and slurm_id is not None and os.path.exists(working_directory) ): startedFile = os.path.join(working_directory, "STARTED") while not os.path.exists(startedFile): time.sleep(1) fd = os.open(working_directory, os.O_DIRECTORY) _ = os.fstat(fd) os.close(fd) started = True self.notify_icat( {"step": {"name": slurm_params["pipeline_name"], "status": "STARTED"}}, ) slurm_params["started"] = started self.outputs.slurm_params = slurm_params
@property def _icat_callback_url(self) -> Optional[str]: return self.inputs.slurm_params["icat_callback_url"]