Size: a a a

2021 March 08

JF

Justice For All in Airflow
Max Efremov
т.е. можно при локал экзекьюторе задать parallelism=1 и получить сиквнтал)
Так а где это задавать? В коде дага? Или конфиге? Если допустим нет желание через переменные окружения это делать (AIRFLOW__CORE__DAG_CONCURRENCY)
источник

ME

Max Efremov in Airflow
Justice For All
Так а где это задавать? В коде дага? Или конфиге? Если допустим нет желание через переменные окружения это делать (AIRFLOW__CORE__DAG_CONCURRENCY)
есть 2 варианта: переменные среды или в конфиге
источник

ME

Max Efremov in Airflow
в конфиге в секции core
источник

ME

Max Efremov in Airflow
вот как раз тут есть это
источник

N

Nikolai in Airflow
Justice For All
Да дело в том что как раз нужно параллельно. Причем с оговорками. Например чтобы настраивалось количество допустимых параллельно запущенных тасков runme. Например не больше 5 параллельно, а всего их может быть сотня. Я прочитал про Local executor, что он умеет в параллелизм, но это для всех тасков сразу, а мне надо ограничить кол-во параллельно запущенных именно runme
как вариант, возьми селери экзекьютер, запусти отдельный воркер в выделенную очередь, поставь этому воркеру 5 тредов, отправляй параллельные таски в очередь этого воркера
источник

JF

Justice For All in Airflow
Max Efremov
в конфиге в секции core
нашел в секции core,  спасибо
источник

JF

Justice For All in Airflow
Nikolai
как вариант, возьми селери экзекьютер, запусти отдельный воркер в выделенную очередь, поставь этому воркеру 5 тредов, отправляй параллельные таски в очередь этого воркера
Это ж еще селери ставить и настраивать) Для начала хоть бы через пулы попробовать это сделать
источник

N

Nikolai in Airflow
ну если у тебя не на столько чувствительный даг, то можешь просто concurrency  передавать прям в описании
источник

N

Nikolai in Airflow
concurrency (int) – the number of task instances allowed to run concurrently
источник

JF

Justice For All in Airflow
Nikolai
ну если у тебя не на столько чувствительный даг, то можешь просто concurrency  передавать прям в описании
Дело в том, что мне надо ограничить параллельность только для определенных тасков внутри дага, а не в общем по дагу
источник

JF

Justice For All in Airflow
Так, ну вот я поднастроил пул и экзекьютор, запустил и... все мои параллельные BashOperator (runme) таски висят в коричневой (scheduled) рамочке и не выполняются. Как посмотреть что происходит?
источник

JF

Justice For All in Airflow
Как вообще понять текущий запуск дага уже закончился или до сих пор идет?
источник

JF

Justice For All in Airflow
Долго смотрел на такую картину (статус дага running), пока не попробовал вручную принудительно запустить runme_0. В итоге вебинтерфейс вежливо сообщил мне "Only works with the Celery or Kubernetes executors, sorry"
источник

JF

Justice For All in Airflow
И по какой причине оно не хочет работать на LocalExecutor'е?
источник

JF

Justice For All in Airflow
Justice For All
И по какой причине оно не хочет работать на LocalExecutor'е?
Похоже что это произошло из-за опечатки в имени пула... Т.е. если задать несуществующий пул для таска, то аирфлоу реагирует тем, что этот таск можно запускать только на селери или кубернетис
источник

AA

Anton Afonin in Airflow
Justice For All
Долго смотрел на такую картину (статус дага running), пока не попробовал вручную принудительно запустить runme_0. В итоге вебинтерфейс вежливо сообщил мне "Only works with the Celery or Kubernetes executors, sorry"
Это про ручные запуски отдельных тасков, дело не в этом)
источник

AA

Anton Afonin in Airflow
Justice For All
Похоже что это произошло из-за опечатки в имени пула... Т.е. если задать несуществующий пул для таска, то аирфлоу реагирует тем, что этот таск можно запускать только на селери или кубернетис
На всякий случай повторю - ошибка была про то, что отдельные таски можно перезапускать не на всех экзекьюторах)
По поводу опечатки в имени пула - после исправления заработало?
источник

JF

Justice For All in Airflow
Anton Afonin
На всякий случай повторю - ошибка была про то, что отдельные таски можно перезапускать не на всех экзекьюторах)
По поводу опечатки в имени пула - после исправления заработало?
Да, заработало
источник

AA

Anton Afonin in Airflow
Ну и хорошо)
источник

ME

Max Efremov in Airflow
Justice For All
Похоже что это произошло из-за опечатки в имени пула... Т.е. если задать несуществующий пул для таска, то аирфлоу реагирует тем, что этот таск можно запускать только на селери или кубернетис
Скорее он просто ждал тот пул)
источник