Size: a a a

2021 March 16

DP

Dmitriy Pyrin in Airflow
Nikolai
ну даже так, можно репиты там настроить, или я что то не так понимаю?
Эм, а как?
источник

N

Nikolai in Airflow
retries (int) – the number of retries that should be performed before failing the task
retry_delay (datetime.timedelta) – delay between retries
источник

N

Nikolai in Airflow
можно настроить для каждой таски в даге или в default_args засунуть
источник

DP

Dmitriy Pyrin in Airflow
похоже это и есть вполне крутое решение, спасибо)
источник

ЕК

Евгений Кузнецов... in Airflow
Всем привет! подскажите пожалуйста еть ли в сети какие-либо толковые мануалы по работе с airflow?  Коротко о моей ситуации: долгое время пользовался самописными скриптами и через дженкинс их запускал - сейчас когда кол-во источников переваливает за 10, хочется какое то коробочное решение и вот решили попробовать airflow, сразу столкнулся с рядом проблем и побольшей части это детские вопросы
источник

ЕК

Евгений Кузнецов... in Airflow
по типу: настроить трансфер данных из одной бд в другую (pg -> clickhouse), как настроить переменные чтобы оно хватало последнюю дату в бд хранилища и сувало в бд источник
источник

ЕК

Евгений Кузнецов... in Airflow
или подскажите что значит эта ошибка:
Broken DAG: [/opt/airflow/dags/test_migration_pg_clickhouse.py] Traceback (most recent call last):
 File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1241, in set_upstream
   self._set_relatives(task_or_task_list, upstream=True)
 File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
   task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
источник

GB

Georgy Borodin in Airflow
Евгений Кузнецов
или подскажите что значит эта ошибка:
Broken DAG: [/opt/airflow/dags/test_migration_pg_clickhouse.py] Traceback (most recent call last):
 File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1241, in set_upstream
   self._set_relatives(task_or_task_list, upstream=True)
 File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
   task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
Покажи код дага, хотя бы куски, где ты создаёшь DAG-объект и добавляешь в него операторы
источник

ЕК

Евгений Кузнецов... in Airflow
Georgy Borodin
Покажи код дага, хотя бы куски, где ты создаёшь DAG-объект и добавляешь в него операторы
from airflow import DAG
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta

def postgres_to_clickhouse():
   postgres_hook = PostgresHook(postgres_conn_id='la_postgres_slave')
   ch_hook = ClickHouseHook(clickhouse_conn_id='dwh')
   records = postgres_hook.get_records('''
   SELECT
       c.id category_id,
       c.name_ru category,
       c.group_id,
       x.name_ru as "group",
       case when c.created_at > x.created_at then c.created_at else x.created_at end as created_at,
       case when c.updated_at > x.updated_at then c.updated_at else x.updated_at end as updated_at
   FROM targetings.categories c
   inner join targetings.category_groups x
       on c.group_id = x.id
   ''')
   ch_hook.run('INSERT INTO ads.categories VALUES', records)

with DAG(
       dag_id='postgres_to_clickhouse',
       start_date=datetime(2020, 4, 20),
       schedule_interval=timedelta(minutes=5),
       default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
       tags=['zalupa'],
) as dag:
   dag >> PythonOperator(
       task_id='postgres_to_clickhouse',
       python_callable=postgres_to_clickhouse,
   )
источник

GB

Georgy Borodin in Airflow
Евгений Кузнецов
from airflow import DAG
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta

def postgres_to_clickhouse():
   postgres_hook = PostgresHook(postgres_conn_id='la_postgres_slave')
   ch_hook = ClickHouseHook(clickhouse_conn_id='dwh')
   records = postgres_hook.get_records('''
   SELECT
       c.id category_id,
       c.name_ru category,
       c.group_id,
       x.name_ru as "group",
       case when c.created_at > x.created_at then c.created_at else x.created_at end as created_at,
       case when c.updated_at > x.updated_at then c.updated_at else x.updated_at end as updated_at
   FROM targetings.categories c
   inner join targetings.category_groups x
       on c.group_id = x.id
   ''')
   ch_hook.run('INSERT INTO ads.categories VALUES', records)

with DAG(
       dag_id='postgres_to_clickhouse',
       start_date=datetime(2020, 4, 20),
       schedule_interval=timedelta(minutes=5),
       default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
       tags=['zalupa'],
) as dag:
   dag >> PythonOperator(
       task_id='postgres_to_clickhouse',
       python_callable=postgres_to_clickhouse,
   )
dag >> PythonOperator(
       task_id='postgres_to_clickhouse',
       python_callable=postgres_to_clickhouse,
   )

Так низя
источник

GB

Georgy Borodin in Airflow
Евгений Кузнецов
from airflow import DAG
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta

def postgres_to_clickhouse():
   postgres_hook = PostgresHook(postgres_conn_id='la_postgres_slave')
   ch_hook = ClickHouseHook(clickhouse_conn_id='dwh')
   records = postgres_hook.get_records('''
   SELECT
       c.id category_id,
       c.name_ru category,
       c.group_id,
       x.name_ru as "group",
       case when c.created_at > x.created_at then c.created_at else x.created_at end as created_at,
       case when c.updated_at > x.updated_at then c.updated_at else x.updated_at end as updated_at
   FROM targetings.categories c
   inner join targetings.category_groups x
       on c.group_id = x.id
   ''')
   ch_hook.run('INSERT INTO ads.categories VALUES', records)

with DAG(
       dag_id='postgres_to_clickhouse',
       start_date=datetime(2020, 4, 20),
       schedule_interval=timedelta(minutes=5),
       default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
       tags=['zalupa'],
) as dag:
   dag >> PythonOperator(
       task_id='postgres_to_clickhouse',
       python_callable=postgres_to_clickhouse,
   )
Не могу утверждать, как корректно писать с контекстными менеджерами (только переезжаю на 2, пока не пробовал), но таски принадлежат дагу, а не находятся с ним в цепочке
источник

ЕК

Евгений Кузнецов... in Airflow
а в соседнем таске работает)
источник

ЕК

Евгений Кузнецов... in Airflow
Евгений Кузнецов
а в соседнем таске работает)
from airflow import DAG
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
       dag_id='update_income_aggregate',
       start_date=days_ago(2),
) as dag:
   ClickHouseOperator(
       task_id='update_income_aggregate',
       database='default',
       sql=(
           '''
               INSERT INTO default.date_test
               SELECT DISTINCT date FROM ads.daily_webmasters dw
               WHERE date = '{{ ds }}'
           ''', '''
               SELECT max(date) FROM date_test
               WHERE date BETWEEN
                   '{{ execution_date.start_of('month').to_date_string() }}'
                   AND '{{ execution_date.end_of('month').to_date_string() }}'
           ''',
           # result of the last query is pushed to XCom
       ),
       clickhouse_conn_id='dwh',
   ) >> PythonOperator(
       task_id='print_month_income',
       provide_context=True,
       python_callable=lambda task_instance, **_:
           # pulling XCom value and printing it
           print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
   )
источник

GB

Georgy Borodin in Airflow
Короче, dag >> PythonOperator – читается как "сначала оператор dag, потом оператор PythonOperator". AIrflow пытается найти у объекта dag методы нужные для общения с соседями, но получает шиш
источник

GB

Georgy Borodin in Airflow
Евгений Кузнецов
from airflow import DAG
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
       dag_id='update_income_aggregate',
       start_date=days_ago(2),
) as dag:
   ClickHouseOperator(
       task_id='update_income_aggregate',
       database='default',
       sql=(
           '''
               INSERT INTO default.date_test
               SELECT DISTINCT date FROM ads.daily_webmasters dw
               WHERE date = '{{ ds }}'
           ''', '''
               SELECT max(date) FROM date_test
               WHERE date BETWEEN
                   '{{ execution_date.start_of('month').to_date_string() }}'
                   AND '{{ execution_date.end_of('month').to_date_string() }}'
           ''',
           # result of the last query is pushed to XCom
       ),
       clickhouse_conn_id='dwh',
   ) >> PythonOperator(
       task_id='print_month_income',
       provide_context=True,
       python_callable=lambda task_instance, **_:
           # pulling XCom value and printing it
           print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
   )
Ну тут всё корректно, нету строчки вида dag >> operator
источник

ЕК

Евгений Кузнецов... in Airflow
аааа
источник

ЕК

Евгений Кузнецов... in Airflow
тоесть похорошему нужно перенести PythonOperator в структуру дага
источник

GB

Georgy Borodin in Airflow
Вас только что динамически оттипизировали, впредь будьте осторожнее
источник

ЕК

Евгений Кузнецов... in Airflow
Georgy Borodin
Вас только что динамически оттипизировали, впредь будьте осторожнее
учту сей аспект
источник

GB

Georgy Borodin in Airflow
Евгений Кузнецов
тоесть похорошему нужно перенести PythonOperator в структуру дага
Нет, совсем не так.
Есть абстракция DAG – описание этапов воркфлоу, ему принадлежат какие-то таски-шаги.

Когда ты пишешь так:
with dag:
  task_1 = PythonOperator(...)

Это всё равно, что:
dag = DAG(...)
task_1 = PythonOperator(dag=dag, ...)


Таски принадлежат DAG-у, он как контейнер, и не может быть с ними в каких-либо зависимостях.
источник