Size: a a a

2021 February 10

GB

Georgy Borodin in Airflow
Panchenko Andrey
у меня есть неопеделенное количество объектов например. я написал пайтон функцию которая клиентом ходит на бигквери и ранает там джобы и я вот так паралелюсь.
   transfer_tasks = []
   for table in tables_list:
       transfer_tasks.append(PythonOperator(
           task_id='transfer_tasks_' + str(table),
           python_callable=data_transfer,
           op_kwargs={'table': table},
           dag=dag,
       )
       )
это норм подход?
Ну не оч, все эти танцы с Variable нужны, чтобы не выполнять кучу кода при парсинге даг-файла
источник

マサ

マキシム サモイロフ... in Airflow
Tishka17
То есть аэйрфлоу из коробки ничегь не предлагает для параллелизации?
в эйрфлоу параллелизация про другое. он может раскидывать разные таск-раны по разным воркерам. но для этого 1) нужен подходящий экзекьютор (селери, кубы, и т.д.) 2) несколько воркеров 3) нормально написанные операторы. если вся работа выполняется в одной таске дага, то эйрфло из коробки ничего распараллелить не сможет.
источник

GB

Georgy Borodin in Airflow
Panchenko Andrey
у меня есть неопеделенное количество объектов например. я написал пайтон функцию которая клиентом ходит на бигквери и ранает там джобы и я вот так паралелюсь.
   transfer_tasks = []
   for table in tables_list:
       transfer_tasks.append(PythonOperator(
           task_id='transfer_tasks_' + str(table),
           python_callable=data_transfer,
           op_kwargs={'table': table},
           dag=dag,
       )
       )
это норм подход?
В идеале в коде, который вне операторов, должны выполняться несложные операции – это раз.
Два – если меняется tables_list – меняется структура дага, можно напороться на проблемы с перезапуском (а иногда и вообще на ошибки базы)
источник

マサ

マキシム サモイロフ... in Airflow
Panchenko Andrey
или можно как то изящнее решить?
мы свой асинхронный бигквери оператор написали, который работает примерно как сенсор — сабмитит джобу и валится по reschedule exception
источник

PA

Panchenko Andrey in Airflow
там не сложная операция там просто список таблиц, а пайтон код просто организует выполнение на стороне бигквери. тип таски возьми там и положи туда.
источник

PA

Panchenko Andrey in Airflow
マキシム サモイロフ
мы свой асинхронный бигквери оператор написали, который работает примерно как сенсор — сабмитит джобу и валится по reschedule exception
круто, а можно код поглядеть или под NDA?
источник

マサ

マキシム サモイロフ... in Airflow
к сожалению нельзя)
источник

GB

Georgy Borodin in Airflow
Panchenko Andrey
у меня есть неопеделенное количество объектов например. я написал пайтон функцию которая клиентом ходит на бигквери и ранает там джобы и я вот так паралелюсь.
   transfer_tasks = []
   for table in tables_list:
       transfer_tasks.append(PythonOperator(
           task_id='transfer_tasks_' + str(table),
           python_callable=data_transfer,
           op_kwargs={'table': table},
           dag=dag,
       )
       )
это норм подход?
Я вообще для таких штук обычно делаю динамическую генерацию дагов (функция create_dag возвращает даг для, в этом случае, таблицы), потом, конечно, проблема в том, что надо отключать даги, когда убирается одна из таблиц
источник

T

Tishka17 in Airflow
マキシム サモイロフ
в эйрфлоу параллелизация про другое. он может раскидывать разные таск-раны по разным воркерам. но для этого 1) нужен подходящий экзекьютор (селери, кубы, и т.д.) 2) несколько воркеров 3) нормально написанные операторы. если вся работа выполняется в одной таске дага, то эйрфло из коробки ничего распараллелить не сможет.
У меня k8s операторы скорее всего будут. В этом случае что-то можно сделать?
источник

PA

Panchenko Andrey in Airflow
ну так у меня ведь тоже динамически выходит.
источник

PA

Panchenko Andrey in Airflow
источник

GB

Georgy Borodin in Airflow
Этот список таблиц хранится в переменной в коде или достаётся из BQ?
источник

PA

Panchenko Andrey in Airflow
создается из BQ каждый раз
источник

GB

Georgy Borodin in Airflow
Panchenko Andrey
создается из BQ каждый раз
Тогда это опасно для здоровья Airflow)
источник

PA

Panchenko Andrey in Airflow
опасно тем что например их может быть тысяча и это может повлиять на исполнение других дагов?
источник

z

zxyzxy in Airflow
Panchenko Andrey
опасно тем что например их может быть тысяча и это может повлиять на исполнение других дагов?
Нет, тем что этот код условно исполняется раз в 30 секунд (или сколько у вас стоит таймаут шедулера)
источник

PA

Panchenko Andrey in Airflow
код исполняется раз в сутки, тайм аут стоковый
источник

z

zxyzxy in Airflow
Шедулер сканирует даги постоянно, с минимальным интервалом.
источник

z

zxyzxy in Airflow
В вашем случае просто список таблиц куда-нибудь закиньте в конфиг файл, или например если нужно прям постоянно синхронизировать с таблицей из BQ, то тут предлагали такое решение - создать еще один даг, который будет забирать названия таблиц и складывать в переменную, а вы уже из этой переменной будете создавать свой даг который  складывает что-то в BQ
источник

マサ

マキシム サモイロフ... in Airflow
Tishka17
У меня k8s операторы скорее всего будут. В этом случае что-то можно сделать?
не особо понял, что значит “что-то сделать” 🙂 если нужно просто асинхронно поднимать кучу контейнеров на кубах, то да, можно такое сделать.

есть немного разные вещи в концепции эйрфлоу — k8s executor и k8s operator.
первый заключается в том что запуск каждой джобы является запуском контейнера на кубовом кластере, и по коду дагов этого может быть даже не видно (даги могут не знать, где запускаются таски, и не должны в общем-то). если есть какие-то случайного вида таски, которые нужно как-то бесконечно параллелить с помощью кубового кластера (типа как в гугле например), то это идеальный вариант.
второй — это явный запуск подов на кубах через операторы, это применимо только в случае если вам нужно разворачивать какую-то инфраструктуру, описывая эту инфраструктуру как даг. ну или у вас каждая таска уже имеет предсозданный образ, но это извращение кажется.

операторы — это “что и как сделать”, экзекьютор — это “где сделать что угодно”
источник