# Source code for ewoksmx.tasks.WaitForFinished

import os
import pathlib
import time
from typing import Optional

from .base_tasks.icat_task import IcatCallbackTask


class WaitForFinished(
    IcatCallbackTask,
    input_names=["slurm_params"],
    output_names=[
        "slurm_params",
    ],
):
    """Wait for a ``FINISHED`` marker file and report the step status to ICAT.

    The ``slurm_params`` input is a dict. The keys read by this task are
    ``pipeline_name``, ``error_message`` (optional), ``working_directory``,
    ``slurm_id``, ``icat_dir`` and ``icat_callback_url``. The same dict object
    is updated in place (``finished`` always, ``error_message`` on failure)
    and handed on as the ``slurm_params`` output.
    """

    def run(self):
        """Poll for the marker file, notify ICAT and publish ``slurm_params``.

        ``finished`` is set to ``True`` only when the marker file was seen;
        in every other path it stays ``False``.

        Raises:
            KeyError: if a required key is missing from ``slurm_params``.
            OSError: if the working directory or ``icat_dir`` cannot be
                opened or listed.
        """
        slurm_params = self.inputs.slurm_params
        pipeline_name = slurm_params["pipeline_name"]
        failure_message = f"{pipeline_name} processing failed."
        finished = False

        # NOTE(review): the original formatting of this method was lost; the
        # final ``else`` is reconstructed as belonging to this ``if``. Confirm
        # against the upstream repository. As written, an upstream error
        # message is replaced by the generic failure message below.
        if slurm_params.get("error_message", None) is None:
            working_directory = slurm_params["working_directory"]
            slurm_id = slurm_params["slurm_id"]
            if (
                working_directory is not None
                and slurm_id is not None
                and os.path.exists(working_directory)
            ):
                self._wait_for_marker(working_directory)
                finished = True
                # Check icat_dir for results: more than one entry counts as
                # success, anything else is reported as an error.
                icat_dir = pathlib.Path(slurm_params["icat_dir"])
                if len(list(icat_dir.iterdir())) > 1:
                    self.notify_icat(
                        self._step_payload(pipeline_name, "FINISHED"),
                    )
                else:
                    slurm_params["error_message"] = failure_message
                    self.notify_icat(
                        self._step_payload(pipeline_name, "ERROR"),
                    )
            # When the working directory or job id is unusable nothing is
            # reported here; only ``finished = False`` is propagated.
        else:
            slurm_params["error_message"] = failure_message
            # Keyword form kept exactly as in the original call site.
            self.notify_icat(
                json=self._step_payload(pipeline_name, "ERROR"),
            )

        slurm_params["finished"] = finished
        self.outputs.slurm_params = slurm_params

    @staticmethod
    def _step_payload(name, status):
        """Build the ICAT notification body for step ``name`` with ``status``."""
        return {"step": {"name": name, "status": status}}

    @staticmethod
    def _wait_for_marker(working_directory):
        """Block until ``<working_directory>/FINISHED`` exists.

        Polls once per second with no timeout. Between polls the directory
        is opened and ``fstat``-ed; presumably this forces cached directory
        metadata to refresh on a network file system -- TODO confirm.
        """
        marker_path = os.path.join(working_directory, "FINISHED")
        while not os.path.exists(marker_path):
            time.sleep(1)
            fd = os.open(working_directory, os.O_DIRECTORY)
            try:
                os.fstat(fd)
            finally:
                # Always release the descriptor, even if fstat fails.
                os.close(fd)

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Return ``icat_callback_url`` from the ``slurm_params`` input."""
        return self.inputs.slurm_params["icat_callback_url"]