Source code for ewoksmx.tasks.WaitForFinished
import os
import pathlib
import time
from typing import Optional
from .base_tasks.icat_task import IcatCallbackTask
[docs]
class WaitForFinished(
IcatCallbackTask,
input_names=["slurm_params"],
output_names=[
"slurm_params",
],
):
[docs]
def run(self):
finished = False
slurm_params = self.inputs.slurm_params
pipeline_name = slurm_params["pipeline_name"]
error_message = slurm_params.get("error_message", None)
if error_message is None:
working_directory = slurm_params["working_directory"]
slurm_id = slurm_params["slurm_id"]
if (
working_directory is not None
and slurm_id is not None
and os.path.exists(working_directory)
):
startedFile = os.path.join(working_directory, "FINISHED")
while not os.path.exists(startedFile):
time.sleep(1)
fd = os.open(working_directory, os.O_DIRECTORY)
_ = os.fstat(fd)
os.close(fd)
finished = True
# Check icat_dir for results...
icat_dir = pathlib.Path(slurm_params["icat_dir"])
list_files = list(icat_dir.iterdir())
if len(list_files) > 1:
self.notify_icat(
{
"step": {
"name": slurm_params["pipeline_name"],
"status": "FINISHED",
}
},
)
else:
slurm_params["error_message"] = (
f"{pipeline_name} processing failed."
)
self.notify_icat(
{
"step": {
"name": slurm_params["pipeline_name"],
"status": "ERROR",
}
},
)
else:
slurm_params["error_message"] = f"{pipeline_name} processing failed."
self.notify_icat(
json={
"step": {
"name": slurm_params["pipeline_name"],
"status": "ERROR",
}
},
)
slurm_params["finished"] = finished
self.outputs.slurm_params = slurm_params
@property
def _icat_callback_url(self) -> Optional[str]:
return self.inputs.slurm_params["icat_callback_url"]