Source code for ewoksmx.tasks.WaitForStarted
import os
import time
from typing import Optional
from .base_tasks.icat_task import IcatCallbackTask
[docs]
class WaitForStarted(
    IcatCallbackTask,
    input_names=["slurm_params"],
    output_names=[
        "slurm_params",
    ],
):
    """Wait until a job's working directory contains a ``STARTED`` file.

    Input:
        slurm_params: dict read for the keys ``error_message`` (optional),
            ``working_directory``, ``slurm_id``, ``pipeline_name`` and
            ``icat_callback_url``.

    Output:
        slurm_params: the same dict object, with the boolean key
            ``started`` added (or overwritten).
    """

    def run(self):
        """Poll for the ``STARTED`` marker file and record the outcome.

        ``started`` stays ``False`` when ``slurm_params`` carries a non-None
        ``error_message``, when the working directory or the slurm id is
        ``None``, or when the working directory does not exist.

        Raises:
            KeyError: if ``working_directory`` or ``slurm_id`` (or, once the
                marker has been seen, ``pipeline_name``) is missing from
                ``slurm_params``.
        """
        slurm_params = self.inputs.slurm_params
        started = False

        if slurm_params.get("error_message") is None:
            working_directory = slurm_params["working_directory"]
            slurm_id = slurm_params["slurm_id"]

            # Check for None *before* the substring test: evaluating
            # `"grenades" in None` raises TypeError.
            if working_directory is not None:
                if "grenades" in working_directory:
                    # NOTE(review): paths containing "grenades" are treated as
                    # already started without any polling - presumably a
                    # special/test location; confirm with the pipeline owners.
                    started = True
                elif slurm_id is not None and os.path.exists(working_directory):
                    started_file = os.path.join(working_directory, "STARTED")
                    # NOTE(review): this loop has no timeout; it only ends
                    # once the marker file exists (or an OSError propagates).
                    while not os.path.exists(started_file):
                        time.sleep(1)
                        # Open and stat the directory between polls.
                        # Presumably this forces a refresh of cached directory
                        # metadata on a network file system - TODO confirm.
                        fd = os.open(working_directory, os.O_DIRECTORY)
                        try:
                            os.fstat(fd)
                        finally:
                            # Always release the descriptor, even if fstat fails.
                            os.close(fd)
                    started = True
                    # NOTE(review): indentation was lost in the reviewed copy
                    # of this file; the notification is assumed to belong to
                    # this branch (sent only after the marker was observed).
                    self.notify_icat(
                        {
                            "step": {
                                "name": slurm_params["pipeline_name"],
                                "status": "STARTED",
                            }
                        },
                    )

        slurm_params["started"] = started
        self.outputs.slurm_params = slurm_params

    @property
    def _icat_callback_url(self) -> Optional[str]:
        """Return the ``icat_callback_url`` entry of the ``slurm_params`` input.

        Raises:
            KeyError: if ``slurm_params`` has no ``icat_callback_url`` key.
        """
        return self.inputs.slurm_params["icat_callback_url"]