import json from airflow.utils.dates import days_ago from airflow import DAG from airflow.providers.http.operators.http import SimpleHttpOperator # Sensors from airflow.providers.http.sensors.http import HttpSensor from airflow.models.param import Param default_dag_args = { 'start_date': days_ago(2) } with DAG( dag_id='mse_cmd_over_http', default_args=default_dag_args, schedule=None, params={ "cmdName": Param("", type="string"), "format": Param("", type="string"), "env": Param("prod", type="string"), } ) as dag: is_api_available = HttpSensor( task_id='is_api_available', http_conn_id='mse_api', endpoint='/api/v1/cmds' ) task = SimpleHttpOperator( task_id='mse_cmd', method="GET", http_conn_id='mse_api', endpoint='/api/v1/cmds', data={ "cmdName": "{{ dag_run.conf.get('cmdName') }}", "format": "{{ dag_run.conf.get('format') }}", "env": "{{ dag_run.conf.get('env') }}" }, headers={"Content-Type": "application/json"}, dag=dag ) is_api_available >> task