Source code for ewoksmx.tests.tasks.mx_pipelines.test_autoproc_pipeline

from ewoksmx.tasks.mx_pipelines.autoproc_pipeline import AutoPROC_pipeline


def test_autoproc_prepare(tmp_path):
    """Run ``AutoPROC_pipeline`` once and verify its files and output values.

    The task is executed with a minimal metadata dictionary whose
    ``reprocess_path`` points at ``tmp_path``. Afterwards the test checks
    that the task's ``input_file_path`` and ``script_file_path`` both exist
    and that ``get_output_values()`` equals the expected dictionary
    (``slurm_params`` plus ``pipeline_name``).

    Args:
        tmp_path: Temporary directory path used as the reprocessing root
            (presumably pytest's built-in ``tmp_path`` fixture).
    """
    # Every expected path in the output lives below this directory.
    autoproc_dir = tmp_path / "autoPROC"

    task_inputs = {
        "metadata": {
            "beamline": "id30a2",
            "proposal": "mx2112",
            "MX_dataCollectionId": 123456,
            "reprocess_path": str(tmp_path),
            "no_pipelines": 1,
        },
        "autoPROC": True,
    }

    task = AutoPROC_pipeline(inputs=task_inputs)
    task.execute()

    # Both files must be present once the task has been executed.
    assert task.input_file_path.exists()
    assert task.script_file_path.exists()

    expected_slurm_params = {
        "script_file_path": str(autoproc_dir / "nobackup" / "edna_script.py"),
        "queue": "mx",
        "mem": 16000,
        "nodes": 1,
        "core": 20,
        "time": 1800,
        "icat_dir": str(autoproc_dir),
        "icat_callback_url": None,
        "no_pipelines": 1,
        "pipeline_name": "autoPROC",
        "error_message": None,
    }
    expected_outputs = {
        "slurm_params": expected_slurm_params,
        "pipeline_name": "autoPROC",
    }

    assert task.get_output_values() == expected_outputs