Size: a a a

2020 December 28

IK

Ivan Kizimenko in Airflow
Как лучше оформить решение такой задачи:
1. Один даг по сенсору ждет инфу, если она пришла обрабатывает. Работает только по будням. Инфу заливает в БД

Второй даг/либо сабдаг или вообще впилить в первый:
По будням должен запускаться сразу после выполнения первого, а по выходным запускаться сам по себе не используя данные первого дага, используя грбо говоря дефолтные значения.
источник

IK

Ivan Kizimenko in Airflow
Из за разного расписания хочется сделать 2 отдельных дага. Но  как тогда сделать чтоб он стартовал от команды первого дага
источник

GB

Georgy Borodin in Airflow
Ivan Kizimenko
Из за разного расписания хочется сделать 2 отдельных дага. Но  как тогда сделать чтоб он стартовал от команды первого дага
TriggerDagOperator в первом даге
источник

IK

Ivan Kizimenko in Airflow
а во втором ?
источник

GB

Georgy Borodin in Airflow
Во втором ничего не нужно, следует только расписание разрулить
источник

IK

Ivan Kizimenko in Airflow
а кстати есть может что-то для разделения дней на Выходные/празники и рабочие?
источник

z

zxyzxy in Airflow
https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/sensors/weekday_sensor/index.html ?
Ну или сделать дополнительный шаг с ShortCircuitOperator и там проверять какой сейчас день и если что дропать или продолжать :)
источник

С

Сюткин in Airflow
Ivan Kizimenko
а кстати есть может что-то для разделения дней на Выходные/празники и рабочие?
Производственный календарь называется
источник

IK

Ivan Kizimenko in Airflow
zxyzxy
https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/sensors/weekday_sensor/index.html ?
Ну или сделать дополнительный шаг с ShortCircuitOperator и там проверять какой сейчас день и если что дропать или продолжать :)
тоже пока только такая мысль и пришла
источник

I

Igor in Airflow
Ivan Kizimenko
а кстати есть может что-то для разделения дней на Выходные/празники и рабочие?
источник

IK

Ivan Kizimenko in Airflow
Спасибо
источник

ФЧ

Филипп Чистяков... in Airflow
Добрый день!

У меня есть DAG который обращается к api и делает выгрузку за день. api  ограничивает:
1) по кол-ву обращений в день
2) по кол-ву выгружаемых строк за запрос

Мне нужно выгрузить исторические данные, для этого я создал таски за нужный период и передаю нужную дату через execution_date

Проблема:
Если я упираюсь в лимит запросов, я получаю ошибку на сервере и как следствие dag становиться failed, и при этом он продолжает стартовать таски за следующие дни, которые соответсвенно тоже фейляться.

Как я думаю это можно решить:
1) Добавить проверку что задача за предыдущий день в этом даге завершена
2) Поставить ограничение на только одну попытку запуска задачи в сутки
3) Каким-то образом при новом дне запускать failed таски заново

Вопрос:

Если с первыми двумя пунктами я понимаю, как мне найти это в мануале, то с третьим я честно теряюсь. Может быть кто-то может посоветовать,
источник

GB

Georgy Borodin in Airflow
Филипп Чистяков
Добрый день!

У меня есть DAG который обращается к api и делает выгрузку за день. api  ограничивает:
1) по кол-ву обращений в день
2) по кол-ву выгружаемых строк за запрос

Мне нужно выгрузить исторические данные, для этого я создал таски за нужный период и передаю нужную дату через execution_date

Проблема:
Если я упираюсь в лимит запросов, я получаю ошибку на сервере и как следствие dag становиться failed, и при этом он продолжает стартовать таски за следующие дни, которые соответсвенно тоже фейляться.

Как я думаю это можно решить:
1) Добавить проверку что задача за предыдущий день в этом даге завершена
2) Поставить ограничение на только одну попытку запуска задачи в сутки
3) Каким-то образом при новом дне запускать failed таски заново

Вопрос:

Если с первыми двумя пунктами я понимаю, как мне найти это в мануале, то с третьим я честно теряюсь. Может быть кто-то может посоветовать,
Оффтоп: не AppsFlyer часом?)
источник

ФЧ

Филипп Чистяков... in Airflow
Georgy Borodin
Оффтоп: не AppsFlyer часом?)
Он самый)
источник

GB

Georgy Borodin in Airflow
Я делал, не совсем ортодоксальная задача для Airflow, но подробности разглашать не могу(

Если что у нас отличный сервис, чтобы всё с дэшиками ещё было и по максимуму (в пределах ограничений токенов) выгружалось, но чтобы не банили, сообщать название не стану не в лс)
источник

SG

Sergey Gavrilov in Airflow
Филипп Чистяков
Добрый день!

У меня есть DAG который обращается к api и делает выгрузку за день. api  ограничивает:
1) по кол-ву обращений в день
2) по кол-ву выгружаемых строк за запрос

Мне нужно выгрузить исторические данные, для этого я создал таски за нужный период и передаю нужную дату через execution_date

Проблема:
Если я упираюсь в лимит запросов, я получаю ошибку на сервере и как следствие dag становиться failed, и при этом он продолжает стартовать таски за следующие дни, которые соответсвенно тоже фейляться.

Как я думаю это можно решить:
1) Добавить проверку что задача за предыдущий день в этом даге завершена
2) Поставить ограничение на только одну попытку запуска задачи в сутки
3) Каким-то образом при новом дне запускать failed таски заново

Вопрос:

Если с первыми двумя пунктами я понимаю, как мне найти это в мануале, то с третьим я честно теряюсь. Может быть кто-то может посоветовать,
Я частично покрыл проблему тем, что а) добавил репортинг об ошибках в даг, который грузит данные; и б) написал Даг, который стартует зафейленные даги в случае, если соблюдается ряд условий, как-то: время ежедневной выгрузки не подошло, ничего не загружается прямо сейчас, число ошибок не превышает какое-то количество
источник

SG

Sergey Gavrilov in Airflow
Проблема в том, что такое решение надо долго и муторно подпиливать. И чем ближе вы к лимитам относительно ежедневной выгрузки, тем более тяжкая задача.
источник

ФЧ

Филипп Чистяков... in Airflow
Sergey Gavrilov
Я частично покрыл проблему тем, что а) добавил репортинг об ошибках в даг, который грузит данные; и б) написал Даг, который стартует зафейленные даги в случае, если соблюдается ряд условий, как-то: время ежедневной выгрузки не подошло, ничего не загружается прямо сейчас, число ошибок не превышает какое-то количество
Да это наверное частично решит проблему. А как ты делал проверку что dag зафейлен? Парсил логи, читал из бд или еще как-то?
источник

SG

Sergey Gavrilov in Airflow
Думаю, код будет достаточно ясным:
       for dag_run in DagRun.find(dag_id=self.analyticsDagId, state=State.FAILED):  # type: DagRun
           for ti in dag_run.get_task_instances(state=State.FAILED):  # type: TaskInstance
               clear_task_instances([ti], Session, activate_dag_runs=True)
               return
источник

ФЧ

Филипп Чистяков... in Airflow
Я в целом ощущаю, что делаю костыль, но пока к сожалению нет бюджета на их data lake решения)

За код спасибо большое! Он мне поможет
источник