Size: a a a

2021 February 12

ПП

Павел Приказчиков... in Airflow
всем привет! не получается импортировать модули в даг айрфлоу - пишет Broken DAG: [/usr/local/airflow/dags/test.py] No module named 'pyunpack'. Модуль установлен в /usr/local/airflow/.local/lib/python3.7/site-packages/pyunpack. на всякий случай установил используя pip, pip3, pip3.7. Подскажите куда копать?
источник

SG

Sergey Gavrilov in Airflow
Павел Приказчиков
всем привет! не получается импортировать модули в даг айрфлоу - пишет Broken DAG: [/usr/local/airflow/dags/test.py] No module named 'pyunpack'. Модуль установлен в /usr/local/airflow/.local/lib/python3.7/site-packages/pyunpack. на всякий случай установил используя pip, pip3, pip3.7. Подскажите куда копать?
А если просто через консоль импортировать, без аерфлоу?
источник

ПП

Павел Приказчиков... in Airflow
Sergey Gavrilov
А если просто через консоль импортировать, без аерфлоу?
через консоль импорт проходит и py файл запускается без ошибок
источник

ПП

Павел Приказчиков... in Airflow
Павел Приказчиков
через консоль импорт проходит и py файл запускается без ошибок
неочевидно, но помог перезапуск шедулера
источник

GB

Georgy Borodin in Airflow
Павел Приказчиков
неочевидно, но помог перезапуск шедулера
А как развёрнуто? Если в докере, то часто установку зависимостей выносят в entrypoint-скрипт, шедулер как раз читает даги, у него и мог быть не установлен на тот момент пакет
источник

ПП

Павел Приказчиков... in Airflow
Georgy Borodin
А как развёрнуто? Если в докере, то часто установку зависимостей выносят в entrypoint-скрипт, шедулер как раз читает даги, у него и мог быть не установлен на тот момент пакет
да, эйрфлоу в докере, посмотрю про entrypoint-скрипт, спасибо
источник

JZ

Julia Zhosan in Airflow
всем привет. возник следующий вопрос. у меня есть Branch оператор. возможно ли по одной из его веток отправить сам же  branch оператор в состояние up_to_retry и назначить ему время, через которое попробовать выполниться еще раз?
для чего это нужно. есть несколько таблиц источников и не совсем ясно , в какой момент они будут одновременно обновлены. поэтому в бранч операторе идет проверка на то, что данные обновлены. А если нет, то надо подождать пару часов и попробовать заново. если данные обновлены, то пускается дальше другой большой скрипт
источник

GB

Georgy Borodin in Airflow
Julia Zhosan
всем привет. возник следующий вопрос. у меня есть Branch оператор. возможно ли по одной из его веток отправить сам же  branch оператор в состояние up_to_retry и назначить ему время, через которое попробовать выполниться еще раз?
для чего это нужно. есть несколько таблиц источников и не совсем ясно , в какой момент они будут одновременно обновлены. поэтому в бранч операторе идет проверка на то, что данные обновлены. А если нет, то надо подождать пару часов и попробовать заново. если данные обновлены, то пускается дальше другой большой скрипт
raise Exception('Time for retry') в нужный момент
источник

GB

Georgy Borodin in Airflow
Julia Zhosan
всем привет. возник следующий вопрос. у меня есть Branch оператор. возможно ли по одной из его веток отправить сам же  branch оператор в состояние up_to_retry и назначить ему время, через которое попробовать выполниться еще раз?
для чего это нужно. есть несколько таблиц источников и не совсем ясно , в какой момент они будут одновременно обновлены. поэтому в бранч операторе идет проверка на то, что данные обновлены. А если нет, то надо подождать пару часов и попробовать заново. если данные обновлены, то пускается дальше другой большой скрипт
Ну а политику ретрая задать параметрами оператора. BranchPythonOperator такой же питонячий оператор, как и другие)
источник

JZ

Julia Zhosan in Airflow
Georgy Borodin
raise Exception('Time for retry') в нужный момент
ссори за глупый вопрос, но как правильно прописать этот экспешион?
def branch(**kwargs):
   if smth==smth_else:
        return 'bash_task'
   else:
       raise Exception('Time for retry')

branch = BranchPythonOperator(
   task_id='branch',
   python_callable=branch,
   trigger_rule="all_done",
   provide_context=True,
  schedule_interval=‘@daily
  'retries': 5,
  'retry_delay': timedelta(hours=2),
   dag=dag)

Это похоже на правду? Я просто не понимаю, что ставить во второй return в операторе и нужно ли там что-то ставить
источник

GB

Georgy Borodin in Airflow
Julia Zhosan
ссори за глупый вопрос, но как правильно прописать этот экспешион?
def branch(**kwargs):
   if smth==smth_else:
        return 'bash_task'
   else:
       raise Exception('Time for retry')

branch = BranchPythonOperator(
   task_id='branch',
   python_callable=branch,
   trigger_rule="all_done",
   provide_context=True,
  schedule_interval=‘@daily
  'retries': 5,
  'retry_delay': timedelta(hours=2),
   dag=dag)

Это похоже на правду? Я просто не понимаю, что ставить во второй return в операторе и нужно ли там что-то ставить
Чтобы таск ушёл на ретрай, ему нужно провалиться. Достаточно вызвать любое исключение (кроме AirflowFailException и AirflowSkipException, и может ещё каких-то их исключений, всех не знаю).

Return-а в таком случае просто не нужно, код «упадёт» из-за того, что это необработанное исключение, а Airflow поставит его в статус up_for_retry.

Так что да, похоже на правду)
источник

JZ

Julia Zhosan in Airflow
Georgy Borodin
Чтобы таск ушёл на ретрай, ему нужно провалиться. Достаточно вызвать любое исключение (кроме AirflowFailException и AirflowSkipException, и может ещё каких-то их исключений, всех не знаю).

Return-а в таком случае просто не нужно, код «упадёт» из-за того, что это необработанное исключение, а Airflow поставит его в статус up_for_retry.

Так что да, похоже на правду)
спасибо большое! пойду пробовать)
источник

JZ

Julia Zhosan in Airflow
Georgy Borodin
Чтобы таск ушёл на ретрай, ему нужно провалиться. Достаточно вызвать любое исключение (кроме AirflowFailException и AirflowSkipException, и может ещё каких-то их исключений, всех не знаю).

Return-а в таком случае просто не нужно, код «упадёт» из-за того, что это необработанное исключение, а Airflow поставит его в статус up_for_retry.

Так что да, похоже на правду)
еще одно уточнение. если я прописываю бранч оператор, как выше, то как прописывать последовательности задач в конце дага?
branch » [bash_task, stop] (stop - dummy оператор)
или branch » bash_task и все?
источник

ЮЛ

Юрий Лифанов... in Airflow
Люди, сталкивался кто с такой проблемой, что при запуске Dag run первый таск ставится в очередь... и все, там и стоит без исполнения. Происходит на тестовом сервере, есть ещё один, с 99% настройками такой-же, на нем все работает, airflow.cfg идентичный на обоих стендах
источник

GB

Georgy Borodin in Airflow
Юрий Лифанов
Люди, сталкивался кто с такой проблемой, что при запуске Dag run первый таск ставится в очередь... и все, там и стоит без исполнения. Происходит на тестовом сервере, есть ещё один, с 99% настройками такой-же, на нем все работает, airflow.cfg идентичный на обоих стендах
Надо проверить, есть ли пулы для задач, есть ли воркеры для очередей и посмотреть логи шедулера, конечно
источник

ЮЛ

Юрий Лифанов... in Airflow
пулы есть
источник

ЮЛ

Юрий Лифанов... in Airflow
первое и третье не в курсе
источник

ЮЛ

Юрий Лифанов... in Airflow
логи гляну, а где рабочих глянуть?
источник

ЮЛ

Юрий Лифанов... in Airflow
Event cli worker в логах - это же вызов работника?
источник

GB

Georgy Borodin in Airflow
Юрий Лифанов
первое и третье не в курсе
Если где-то в тасках указывается queue="chto-to-tam", то селери воркеры должны запускаться с указанием очереди. Это делается либо через конфиг (default_queue вроде), либо через команду запуска, в ней должно быть написано --queues chto-to-tam
источник