Source code for ewoksmx.tests.tasks.test_autoproc_pipeline

from ewoksmx.tasks.AutoPROC_pipeline import AutoPROC_pipeline


[docs] def test_autoproc_prepare(tmp_path): metadata = { "beamline": "id30a2", "proposal": "mx2112", "MX_dataCollectionId": 123456, "reprocess_path": str(tmp_path), "no_pipelines": 1, } pipeline = AutoPROC_pipeline(inputs={"metadata": metadata, "autoPROC": True}) pipeline.execute() assert pipeline.input_file_path.exists() with open(pipeline.input_file_path) as f: input_xml = f.read() print(input_xml) assert pipeline.script_file_path.exists() expected = { "slurm_params": { "script_file_path": str( tmp_path / "autoPROC" / "nobackup" / "edna_script.py" ), "queue": "mx", "mem": 16000, "nodes": 1, "core": 20, "time": "2:00:00", "icat_dir": str(tmp_path / "autoPROC"), "icat_callback_url": None, "no_pipelines": 1, "pipeline_name": "autoPROC", "error_message": None, }, "pipeline_name": "autoPROC", } assert pipeline.get_output_values() == expected