How to access the response from Airflow SimpleHttpOperator GET request

I'm learning Airflow and have a simple question. Below is my DAG, called dog_retriever:
import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': 'rachel@loftium.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}
dag = DAG...
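
A minimal sketch of the downstream half, assuming the Airflow 1.x SimpleHttpOperator was created with xcom_push=True (in those releases the response body is only returned, and therefore pushed to XCom, when that flag is set) and that a later PythonOperator pulls the body with ti.xcom_pull(task_ids=...). The task id get_dog, the helper parse_dog_response, and the JSON shape (a status and a message key) are hypothetical; the code only shows the parsing step, so it runs without Airflow installed.

```python
import json


def parse_dog_response(response_text):
    """Parse the raw GET response body after it has been pulled from XCom.

    Hypothetical helper: inside a PythonOperator callable you would first do
    something like text = ti.xcom_pull(task_ids='get_dog'), where 'get_dog'
    is an assumed task id, and then call this function on `text`.
    Assumes the body is JSON with 'status' and 'message' keys.
    """
    payload = json.loads(response_text)
    if payload.get("status") != "success":
        raise ValueError("unexpected response: %r" % (payload,))
    return payload["message"]


if __name__ == "__main__":
    # Stand-in for the string the HTTP task would have pushed to XCom.
    sample = '{"status": "success", "message": "https://example.com/dog.jpg"}'
    print(parse_dog_response(sample))
```

Keeping the parsing in a plain function like this makes it testable outside a DAG run; the XCom pull itself is the only Airflow-specific line.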

Airflow DAG dynamic structure

I was looking for a solution where I can decide the DAG structure when the DAG is triggered, because I'm not sure how many operators I'll have to run. Please refer below for the execution sequence that I'm planning to create:
         |-- Task B.1 --|                 |-- Task C.1 --|
         |-- Task B.2 --|                 |-- Task C.2 --|
Task A --|-- Task B.3 --|---> Task B ---> |-- Task C.3 --|
         |     ....     |                 |     ....     |
         |-- Task B.N --|                 |-- Task C.N --|
I'm n...
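
Airflow builds the task graph when it parses the DAG file, so in the 1.x releases discussed here the number of branches generally has to be known at parse time rather than chosen per run. The sketch below is plain Python, not Airflow code: it builds the fan-out/fan-in shape from the diagram for a given N and groups the tasks into the waves in which they could run. In a real DAG file the same loop would instead create one operator per branch and wire it up with >>. The function names build_fan_out_fan_in and execution_waves are hypothetical.

```python
def build_fan_out_fan_in(n):
    """Return {task: set(upstream tasks)} for A -> B.1..B.n -> B -> C.1..C.n.

    n is the number of parallel branches; task names mirror the diagram.
    """
    upstream = {"Task A": set(), "Task B": set()}
    for i in range(1, n + 1):
        b_i = "Task B.%d" % i
        c_i = "Task C.%d" % i
        upstream[b_i] = {"Task A"}   # fan out from Task A
        upstream["Task B"].add(b_i)  # fan back in to Task B
        upstream[c_i] = {"Task B"}   # fan out again from Task B
    return upstream


def execution_waves(upstream):
    """Group tasks into waves that could run in parallel (Kahn's algorithm)."""
    remaining = {task: set(deps) for task, deps in upstream.items()}
    waves = []
    while remaining:
        ready = sorted(t for t, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("cycle detected")
        waves.append(ready)
        for t in ready:
            del remaining[t]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


if __name__ == "__main__":
    for wave in execution_waves(build_fan_out_fan_in(3)):
        print(wave)
```

Task names are sorted as strings inside each wave, so with ten or more branches "Task B.10" is listed before "Task B.2"; this affects only the display order, not the dependencies.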

databricks - Apache Airflow Task Instance state is blank

I have the DAG config like below:
args = {
    'owner': 'XXX',
    'depends_on_past': False,
    'start_date': datetime(2018, 2, 26),
    'email': ['sample@sample.com'],
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(dag_id='Daily_Report', default_args=args, schedule_interval='0 11 * * *', dagrun_timeout=timedelta(seconds=30))
I have a BashOperator and a Databricks operator:
run_this = BashOperator(task_id='run_report', bash_command=...

How do I stop an airflow DAG?

Running Airflow 1.9.0 with Python 2.7. How do I gracefully stop a DAG? In this case, I have a DAG that's running a file upload with bad code that causes everything to take 4 times as long, and I'd really prefer not to wait a day for it to finally time out (the timeout is set to 10 hours). The DAG looks for a tar file. When it finds one, it goes through every file in the tar, looking for files to process, and processes them. I could not find any way to stop the DAG. I tried clicking on the "Running" circle in the "DAG Runs" column (the one t...

Airflow 1.9.0 is queuing but not launching tasks

Airflow is randomly not running queued tasks; some tasks don't even get a queued status. I keep seeing the following in the scheduler logs:
[2018-02-28 02:24:58,780] {jobs.py:1077} INFO - No tasks to consider for execution.
I do see tasks in the database that have either no status or a queued status, but they never get started. The Airflow setup is running https://github.com/puckel/docker-airflow on ECS with Redis. There are 4 scheduler threads and 4 Celery worker tasks. The tasks that are not running are shown in the queued state (grey icon); when hovering over th...

Airflow: Creating a DAG in Airflow via the UI

Airflow veterans, please help. I was looking for a cron replacement and came across Apache Airflow. We have a setup where multiple users should be able to create their own DAGs and schedule their jobs. Our users are a mix of people, some of whom may not know how to write a DAG Python file. Also, they may not have access to the server where Airflow is running. Is it possible to create an Airflow DAG via the UI? I could not find any reference to this. All examples talk about creating a Python file and uploading it to the $AIRFLOW_HOME/dag/ directory. Users ...

Very confused about the Airflow DAGs folder

I am very new to Airflow. I have set everything up according to the instructions on the website. However, I find it very confusing to figure out my DAG folder location. NOTE: I configured airflow.cfg to use /airflow/dags, and this folder contains two files:
/airflow/dags
---dag1.py
---dag2.py
But when I run airflow list_dags, it still shows the DAGs inside the example_dags folder at usr/local/lib/python2.7/dist_packages/airflow/example_dags. How can I see which path airflow list_dags uses, and how can I change it? Any help is appreciated.
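
Two airflow.cfg settings are involved here: dags_folder in the [core] section tells Airflow where to look for DAG files, and load_examples in the same section controls whether the bundled example DAGs are loaded as well, which would explain why the example_dags path still shows up. The file itself lives at $AIRFLOW_HOME/airflow.cfg, and a value can also be overridden by an environment variable following the AIRFLOW__{SECTION}__{KEY} convention, e.g. AIRFLOW__CORE__DAGS_FOLDER. As a minimal sketch, using Python 3's configparser rather than Airflow's own config loader (the helper effective_dags_settings and the sample values are hypothetical), this reads the two values from a config snippet:

```python
import configparser
import os

# Hypothetical excerpt of an airflow.cfg file.
SAMPLE_CFG = """
[core]
dags_folder = /airflow/dags
load_examples = False
"""


def effective_dags_settings(cfg_text, environ=os.environ):
    """Return (dags_folder, load_examples) as read from airflow.cfg text.

    Sketch only: honours the AIRFLOW__CORE__DAGS_FOLDER environment override
    but ignores everything else Airflow's configuration loader does.
    """
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    folder = environ.get("AIRFLOW__CORE__DAGS_FOLDER",
                         parser.get("core", "dags_folder"))
    # Assumed default of True when the key is absent.
    load_examples = parser.getboolean("core", "load_examples", fallback=True)
    return folder, load_examples


if __name__ == "__main__":
    print(effective_dags_settings(SAMPLE_CFG, environ={}))
```

With the sample above and an empty environment this prints ('/airflow/dags', False).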

Airflow webserver not filtering DAGs by owner

I'm attempting to filter DAGs by owner in my Airflow instance. These are the steps I'm taking:
1- Configure my airflow.cfg as follows: [image: portion of airflow.cfg config file]
2- My DAGs receive an owner through the default_args variable.
3- Have an Airflow user with the same name as the owner passed to my DAGs.
Still, when I log in with that user, I can see all DAGs. [image: steps to create the user]
Any idea why it is not filtering DAGs by owner? Thanks.

Integration of Apache Airflow with Google cloud pubsub

I am quite new to Airflow and am trying to use the integration of Apache Airflow with Google Cloud Pub/Sub, which I believe was added under the "Airflow-300" JIRA issue. Please correct me if I am reading this incorrectly. Also, can you please advise whether this has been released, or when it will be released? We are looking at adding notifications on Google Cloud Storage: upon any file event, we want to trigger a workflow in Airflow. I can't seem to find any documentation on how to use it. Any advice would be highly appreciated.

Writing to Airflow Logs

One way to write to the logs in Airflow is to return a string from a PythonOperator, like on line 44 here. Are there other ways to write to the Airflow log files? I've found that print statements are not saved to the logs.
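
Another option is Python's standard logging module: while a task runs, Airflow attaches its own task log handler, so records emitted through logging from a PythonOperator callable should end up in that task's log file (operators also expose a self.log logger in 1.9 and later). A minimal sketch, assuming a hypothetical callable my_task_callable; because no Airflow handler exists outside a task run, the hypothetical run_and_capture attaches an in-memory handler to show what would be written.

```python
import io
import logging

log = logging.getLogger(__name__)


def my_task_callable(**context):
    """Hypothetical PythonOperator callable that logs instead of printing."""
    log.info("Processing started")
    rows = 42  # placeholder for real work
    log.info("Processed %d rows", rows)
    return "done"


def run_and_capture(func):
    """Run func with a temporary in-memory handler attached; return log text.

    Stands in for the task log handler Airflow would provide during a run.
    """
    buffer = io.StringIO()
    handler = logging.StreamHandler(buffer)
    handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
    previous_level = log.level
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    try:
        func()
    finally:
        log.removeHandler(handler)
        log.setLevel(previous_level)
    return buffer.getvalue()


if __name__ == "__main__":
    print(run_and_capture(my_task_callable))
```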

Removing Airflow task logs

I'm running 5 DAGs, which have generated a total of about 6GB of log data in the base_log_folder over a one-month period. I just added a remote_base_log_folder, but it seems that this does not stop logging to the base_log_folder. Is there any way to automatically remove old log files, rotate them, or force Airflow to log only to remote storage and not to disk (base_log_folder)?
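
If nothing in the deployment already prunes base_log_folder, one option is a small cleanup job run from cron or from a maintenance DAG. A minimal sketch, assuming that deleting individual log files by modification time is acceptable; remove_old_logs and the age threshold are hypothetical choices, not Airflow settings.

```python
import os
import time

SECONDS_PER_DAY = 24 * 60 * 60


def remove_old_logs(base_log_folder, max_age_days, now=None):
    """Delete files under base_log_folder whose mtime is older than max_age_days.

    Returns the list of removed paths. Empty directories are left in place.
    Hypothetical maintenance helper; it is not part of Airflow.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * SECONDS_PER_DAY
    removed = []
    for dirpath, _dirnames, filenames in os.walk(base_log_folder):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed.append(path)
    return removed
```

A call such as remove_old_logs('/path/to/base_log_folder', 30) would keep the last 30 days; the path is a placeholder for the configured base_log_folder.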

Airflow: New DAG is not found by webserver

In Airflow, how should I handle the error "This DAG isn't available in the webserver DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database"? I've copied a new DAG to an Airflow server, and have tried:
- unpausing it and refreshing it (basic operating procedure, given in this previous answer https://stackoverflow.com/a/42291683/160406)
- restarting the webserver
- restarting the scheduler
- stopping the webserver and scheduler, resetting the database (airflow resetdb), then starting the webserver and sched...
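
One possible cause of this message is that the webserver fails to import the DAG file in its own environment (a missing package, a syntax error, a different Python version), so the file never makes it into its DagBag. A quick way to surface such errors is to import the file directly with the same Python interpreter the webserver uses. A minimal sketch with the standard importlib machinery; try_import_dag_file is hypothetical and does much less than Airflow's DagBag (no timeouts, no zip handling, no collection of DAG objects).

```python
import importlib.util
import traceback


def try_import_dag_file(path, module_name="candidate_dag"):
    """Execute a DAG file as a module and report any import-time error.

    Returns None if the file imports cleanly, otherwise the formatted
    traceback. Note that this runs the file's top-level code.
    """
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        return "cannot build an import spec for %s" % path
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except Exception:
        return traceback.format_exc()
    return None


if __name__ == "__main__":
    import sys

    # Usage: python check_dags.py /path/to/dag1.py /path/to/dag2.py
    for dag_path in sys.argv[1:]:
        error = try_import_dag_file(dag_path)
        print(dag_path, "OK" if error is None else "FAILED")
        if error:
            print(error)
```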

Airflow: Log file isn't local, Unsupported remote log location

I am not able to see the logs attached to the tasks from the Airflow UI. The log-related settings in the airflow.cfg file are:
remote_base_log_folder =
base_log_folder = /home/my_projects/ksaprice_project/airflow/logs
worker_log_server_port = 8793
child_process_log_directory = /home/my_projects/ksaprice_project/airflow/logs/scheduler
Although I am setting remote_base_log_folder, it is trying to fetch the log from http://:8793/log/tutorial/print_date/2017-08-02T00:00:00, and I don't understand this behavior. According to the settings, the workers should store the log...

Airflow DAG does not skip tasks after BranchPythonOperator or ShortCircuitOperator

I am writing a DAG with a BranchPythonOperator to check whether or not data is available for download. If the data is there, the DAG should download it and incorporate it into my PostgreSQL database. If it isn't there, all the processing tasks should be skipped and the branch should go to a DummyOperator. Unfortunately, the DAG is not skipping all the tasks. It skips up to 6 tasks, but then stops (the downstream tasks have an unknown status) and the DAG fails. I am not finding any error messages in the logs (because no tasks are failing). Airfl...