42 lines
1.1 KiB
Python
42 lines
1.1 KiB
Python
|
import json
|
||
|
from airflow.utils.dates import days_ago
|
||
|
from airflow import DAG
|
||
|
from airflow.providers.http.operators.http import SimpleHttpOperator
|
||
|
# Sensors
|
||
|
from airflow.providers.http.sensors.http import HttpSensor
|
||
|
from airflow.models.param import Param
|
||
|
|
||
|
default_dag_args = {
|
||
|
'start_date': days_ago(2)
|
||
|
}
|
||
|
|
||
|
with DAG(
|
||
|
dag_id='mse_cmd_over_http',
|
||
|
default_args=default_dag_args,
|
||
|
schedule=None,
|
||
|
params={
|
||
|
"cmdName": Param("", type="string"),
|
||
|
"format": Param("", type="string"),
|
||
|
"env": Param("prod", type="string"),
|
||
|
}
|
||
|
) as dag:
|
||
|
is_api_available = HttpSensor(
|
||
|
task_id='is_api_available',
|
||
|
http_conn_id='mse_api',
|
||
|
endpoint='/api/v1/cmds'
|
||
|
)
|
||
|
task = SimpleHttpOperator(
|
||
|
task_id='mse_cmd',
|
||
|
method="GET",
|
||
|
http_conn_id='mse_api',
|
||
|
endpoint='/api/v1/cmds',
|
||
|
data={
|
||
|
"cmdName": "{{ dag_run.conf.get('cmdName') }}",
|
||
|
"format": "{{ dag_run.conf.get('format') }}",
|
||
|
"env": "{{ dag_run.conf.get('env') }}"
|
||
|
},
|
||
|
headers={"Content-Type": "application/json"},
|
||
|
dag=dag
|
||
|
)
|
||
|
|
||
|
is_api_available >> task
|