Size: a a a

2021 February 09

AL

Artem Leonov in Airflow
Коллеги, привет. Есть кто может поделиться опытом использования airflow в сервинге ml моделей?
источник
2021 February 10

T

Tishka17 in Airflow
Хай. Я понимаю, что вопрос тупой и я вроде его задавал. Но я решил вернуться к Airflow и не пойму как мне отслеживать запуски дагов.

Вот у меня в системе появляются объекты. В день хз сколько штук в неопределенное вермя. Мне было бы удобно при их создании через api триггерить запуск дагов для обработки этих объектов. Но как я понимаю, единственный параметр рана - дата.

Как правильно организовать обработку объектов?
источник

DK

Damir Kuskildin in Airflow
Доброго дня всем, вопрос по настройке отправки уведомлений из Airflow при превышение времени  работы daga.

Какие стандартные механизмы можно применить, и как настроить отправку сообщения ? ( отдельным таском)
источник

GB

Georgy Borodin in Airflow
Tishka17
Хай. Я понимаю, что вопрос тупой и я вроде его задавал. Но я решил вернуться к Airflow и не пойму как мне отслеживать запуски дагов.

Вот у меня в системе появляются объекты. В день хз сколько штук в неопределенное вермя. Мне было бы удобно при их создании через api триггерить запуск дагов для обработки этих объектов. Но как я понимаю, единственный параметр рана - дата.

Как правильно организовать обработку объектов?
Триггерить по апи можно, передавая в конфиге даграна информацию об объектах. http://airflow.apache.org/docs/apache-airflow/1.10.2/api.html

Но это плохая практика: если будет спайк в нагрузке, то даг раны просто встанут (испытано на собственной шкуре)
источник

GB

Georgy Borodin in Airflow
Damir Kuskildin
Доброго дня всем, вопрос по настройке отправки уведомлений из Airflow при превышение времени  работы daga.

Какие стандартные механизмы можно применить, и как настроить отправку сообщения ? ( отдельным таском)
источник

T

Tishka17 in Airflow
Georgy Borodin
Триггерить по апи можно, передавая в конфиге даграна информацию об объектах. http://airflow.apache.org/docs/apache-airflow/1.10.2/api.html

Но это плохая практика: если будет спайк в нагрузке, то даг раны просто встанут (испытано на собственной шкуре)
А что делать? Брать не эйрфлоу? Я хочу сразу при появлении объектов начинать их обработку не дожидаясь завершения обработки прошлого. При это параллелизовать что можно
источник

DK

Damir Kuskildin in Airflow
Спасибо
источник

マサ

マキシム サモイロフ... in Airflow
Tishka17
Хай. Я понимаю, что вопрос тупой и я вроде его задавал. Но я решил вернуться к Airflow и не пойму как мне отслеживать запуски дагов.

Вот у меня в системе появляются объекты. В день хз сколько штук в неопределенное вермя. Мне было бы удобно при их создании через api триггерить запуск дагов для обработки этих объектов. Но как я понимаю, единственный параметр рана - дата.

Как правильно организовать обработку объектов?
я бы сказал, что путь эйрфлоу — сделать сенсоры на эти объекты (например, которые будут дергать внешний api и проходить/валиться по статус-коду ответа, мануалы по написанию сенсоров есть в оф.доке) и поставить их перед обработкой, соответственно скипая всю ветку если объекта нет. Поставить по расписанию на комфортный для вас интервал (зависит от того как часто появляются объекты и какую задержку вы готовы переждать). отслеживать запуски дагов не нужно, если вы готовы к асинхронной обработке.
Если ваше основное и необходимое требование — триггерить обработку сразу при получении объекта, то эйрфлоу вам тут не поможет, так как он не рассчитан на подобное, для шедулера допустимы паузы и сложно настроить хороший порядок исполнения разных дагов, так что проще будет сразу взять не эйрфлоу.
источник

GB

Georgy Borodin in Airflow
Tishka17
А что делать? Брать не эйрфлоу? Я хочу сразу при появлении объектов начинать их обработку не дожидаясь завершения обработки прошлого. При это параллелизовать что можно
Ну это не совсем задача для Airflow, да. Там всё-таки расписание всему голова.
Для себя мы решили, что нам хватит и запускать раз в полчаса даг, в котором обнаруживать новые объекты и обрабатывать их.
источник

GB

Georgy Borodin in Airflow
Tishka17
А что делать? Брать не эйрфлоу? Я хочу сразу при появлении объектов начинать их обработку не дожидаясь завершения обработки прошлого. При это параллелизовать что можно
А если говорить о параллелизации такого подхода – первый оператор – ShortCircuitOperator (ничего не нашли – не триггерим обработку), в нём раскладываем объекты в Variables вида <dag_name>_<i> (держим где-то 20 тасков, обрабатывающих всё это дело), это позволяет не иметь проблем со структурой дага (эирфлоу не любит, когда динамически меняют даг)
источник

T

Tishka17 in Airflow
Georgy Borodin
А если говорить о параллелизации такого подхода – первый оператор – ShortCircuitOperator (ничего не нашли – не триггерим обработку), в нём раскладываем объекты в Variables вида <dag_name>_<i> (держим где-то 20 тасков, обрабатывающих всё это дело), это позволяет не иметь проблем со структурой дага (эирфлоу не любит, когда динамически меняют даг)
в смысле, заранее создать 20 копий тасков?
источник

PA

Panchenko Andrey in Airflow
Tishka17
в смысле, заранее создать 20 копий тасков?
Можно создать циклом под объекты
источник

GB

Georgy Borodin in Airflow
Tishka17
в смысле, заранее создать 20 копий тасков?
Ага, там однотипные действия по обработке новых объектов, сами задачки тяжёлые, один воркер будет либо долго кряхтеть, либо вылетит по памяти
источник

T

Tishka17 in Airflow
Panchenko Andrey
Можно создать циклом под объекты
Ну циклом или не циклом - не важно. Вопрос про граф скорее
источник

PA

Panchenko Andrey in Airflow
Georgy Borodin
Ага, там однотипные действия по обработке новых объектов, сами задачки тяжёлые, один воркер будет либо долго кряхтеть, либо вылетит по памяти
можно сразу вопрос задать пользуясь случаем?
источник

T

Tishka17 in Airflow
То есть аэйрфлоу из коробки ничегь не предлагает для параллелизации?
источник

GB

Georgy Borodin in Airflow
Panchenko Andrey
можно сразу вопрос задать пользуясь случаем?
Можно, на будущее лучше сразу сам вопрос задавать, а не метавопрос)
источник

GB

Georgy Borodin in Airflow
Tishka17
То есть аэйрфлоу из коробки ничегь не предлагает для параллелизации?
Ну почему не предлагает? Celery раскидывает задачки по воркерам для выполнения – вот она и параллелизация)
источник

PA

Panchenko Andrey in Airflow
у меня есть неопеделенное количество объектов например. я написал пайтон функцию которая клиентом ходит на бигквери и ранает там джобы и я вот так паралелюсь.
   transfer_tasks = []
   for table in tables_list:
       transfer_tasks.append(PythonOperator(
           task_id='transfer_tasks_' + str(table),
           python_callable=data_transfer,
           op_kwargs={'table': table},
           dag=dag,
       )
       )
это норм подход?
источник

PA

Panchenko Andrey in Airflow
или можно как то изящнее решить?
источник