Size: a a a

2020 December 16

GB

Georgy Borodin in Airflow
Sergey Zhuravlev
Всем привет. Подскажите ответы на три вопроса, хотя бы ссылками на доки или Ютуб:
1. Можно ли как то интерактивно взаимодействовать с запущенным вручную дагом кроме как переменными из админки? К примеру указать имя БД и таблицы для текущего выполнения?
2. Я где то читал, что есть готовый оператор или хук для запуска задачи в Talent ETL из airflow, но теперь не могу найти, есть ли тут кто то, кто в курсе?
3. Возможно вопрос не сюда, но каким образом лучше переливать данные из mongo в реляционную БД, при условии, что один файл в mongo около 10 Гб может быть, а перелить надо несколько таких файлов? Придется где то приземлять около airflow в промежуточную базу? На практике пробовал через pandas, но на 10-м файле может упасть так как очистка памяти в питоне как то не полностью все вычищает после закрытия файла и уничтожения датафрейма. gc.collect тоже не помогает.

Спасибо
1. Что ты подразумеваешь под «интерактивно»? То, что ты описал, ты можешь передавать в конфигурации DAG Run-а.
https://www.astronomer.io/guides/templating  – приблизительно то, ещё можно выставить оператору provide_context=True, и в питонячьем коде доставать параметры из dag_run.conf
источник

SZ

Sergey Zhuravlev in Airflow
Имел ввиду чтобы юзер не знающий питон, мог дернуть его руками выбрав в раскрывающимся списке какой то параметр перед запуском.
источник

GB

Georgy Borodin in Airflow
Sergey Zhuravlev
Имел ввиду чтобы юзер не знающий питон, мог дернуть его руками выбрав в раскрывающимся списке какой то параметр перед запуском.
Ну раскрывающийся список для такого юзера придётся писать самому. Смотри на API
источник

SZ

Sergey Zhuravlev in Airflow
Так и думал)
источник

IK

Ivan Kizimenko in Airflow
Sergey Zhuravlev
Всем привет. Подскажите ответы на три вопроса, хотя бы ссылками на доки или Ютуб:
1. Можно ли как то интерактивно взаимодействовать с запущенным вручную дагом кроме как переменными из админки? К примеру указать имя БД и таблицы для текущего выполнения?
2. Я где то читал, что есть готовый оператор или хук для запуска задачи в Talent ETL из airflow, но теперь не могу найти, есть ли тут кто то, кто в курсе?
3. Возможно вопрос не сюда, но каким образом лучше переливать данные из mongo в реляционную БД, при условии, что один файл в mongo около 10 Гб может быть, а перелить надо несколько таких файлов? Придется где то приземлять около airflow в промежуточную базу? На практике пробовал через pandas, но на 10-м файле может упасть так как очистка памяти в питоне как то не полностью все вычищает после закрытия файла и уничтожения датафрейма. gc.collect тоже не помогает.

Спасибо
источник

AL

Anton Losev in Airflow
Добрый вечер всем!
Генерируем динамический dag. Сам .py файл с dag'ом лезет в базу за параметрами тасков. Как ограничить частоту с которой Airflow 1.10.10 запускает (перечитывает) этот .py файл? Получается, что он обращается к базе несколько раз в секунду.

Выставил
# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 30

# after how much time (seconds) a new DAGs should be picked up from the filesystem
min_file_process_interval = 30

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30
Не помогает. Airflow рестартовал, естественно. Так и спамит базу запросами.
источник

SZ

Sergey Zhuravlev in Airflow
Да, спасибо
источник

ME

Max Efremov in Airflow
Anton Losev
Добрый вечер всем!
Генерируем динамический dag. Сам .py файл с dag'ом лезет в базу за параметрами тасков. Как ограничить частоту с которой Airflow 1.10.10 запускает (перечитывает) этот .py файл? Получается, что он обращается к базе несколько раз в секунду.

Выставил
# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 30

# after how much time (seconds) a new DAGs should be picked up from the filesystem
min_file_process_interval = 30

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30
Не помогает. Airflow рестартовал, естественно. Так и спамит базу запросами.
А он на воркерах не создаёт ещё раз даг при работе?
источник

SG

Sergey Gavrilov in Airflow
Sergey Zhuravlev
Всем привет. Подскажите ответы на три вопроса, хотя бы ссылками на доки или Ютуб:
1. Можно ли как то интерактивно взаимодействовать с запущенным вручную дагом кроме как переменными из админки? К примеру указать имя БД и таблицы для текущего выполнения?
2. Я где то читал, что есть готовый оператор или хук для запуска задачи в Talent ETL из airflow, но теперь не могу найти, есть ли тут кто то, кто в курсе?
3. Возможно вопрос не сюда, но каким образом лучше переливать данные из mongo в реляционную БД, при условии, что один файл в mongo около 10 Гб может быть, а перелить надо несколько таких файлов? Придется где то приземлять около airflow в промежуточную базу? На практике пробовал через pandas, но на 10-м файле может упасть так как очистка памяти в питоне как то не полностью все вычищает после закрытия файла и уничтожения датафрейма. gc.collect тоже не помогает.

Спасибо
3. Используйте средство для закачивания кучи данных файлом именно этой базы. У каждой есть такой. Например, для постгри - copy,   для гринплама gpfdist, ну и в таком роде
источник

AL

Anton Losev in Airflow
Max Efremov
А он на воркерах не создаёт ещё раз даг при работе?
нет, просто из базы вытаскиваются таски и создаются в цикле.
источник

SZ

Sergey Zhuravlev in Airflow
Sergey Gavrilov
3. Используйте средство для закачивания кучи данных файлом именно этой базы. У каждой есть такой. Например, для постгри - copy,   для гринплама gpfdist, ну и в таком роде
Спасибо за направление поиска. Еще вопрос тогда: а можно ли генерить задачи в даге "на лету"? Попробую пояснить свою идею: например первая задача-сенсор в даге смотрит в папку и видит там 5 новых файлов. Потом создается 5 новых последовательных задач с python env operator на каждый файл для дальнейшей их обработки. Мне кажется таким образом у нас лучше соблюдается условие идемпотентности и в случае чего легче откатиться если упадёт задача работавшаятс одним файлом. Если путаюсь в терминологии то сори, только начал изучать этот инструмент.
источник

マサ

マキシム サモイロフ... in Airflow
Sergey Zhuravlev
Спасибо за направление поиска. Еще вопрос тогда: а можно ли генерить задачи в даге "на лету"? Попробую пояснить свою идею: например первая задача-сенсор в даге смотрит в папку и видит там 5 новых файлов. Потом создается 5 новых последовательных задач с python env operator на каждый файл для дальнейшей их обработки. Мне кажется таким образом у нас лучше соблюдается условие идемпотентности и в случае чего легче откатиться если упадёт задача работавшаятс одним файлом. Если путаюсь в терминологии то сори, только начал изучать этот инструмент.
Так нельзя, но динамическая генерация есть. Она просто должна быть не на воркере, а на шедулере/вебсервере. Например если рядом с дагом лежит yaml, где есть описание задач (а сам даг только парсит этот конфиг), то изменение конфига вызовет изменение дага. Но из воркера (где тела тасок выполняются) изменить напрямую структуру дага нельзя
источник

SG

Sergey Gavrilov in Airflow
Sergey Zhuravlev
Спасибо за направление поиска. Еще вопрос тогда: а можно ли генерить задачи в даге "на лету"? Попробую пояснить свою идею: например первая задача-сенсор в даге смотрит в папку и видит там 5 новых файлов. Потом создается 5 новых последовательных задач с python env operator на каждый файл для дальнейшей их обработки. Мне кажется таким образом у нас лучше соблюдается условие идемпотентности и в случае чего легче откатиться если упадёт задача работавшаятс одним файлом. Если путаюсь в терминологии то сори, только начал изучать этот инструмент.
Да, можно, но работает нестабильно (вплоть до версии 1.10.8). Скорее советую создавать даг-файлы, это надёжнее... Если правильно выставить расписание.
источник

SZ

Sergey Zhuravlev in Airflow
Благодарю!
источник

SG

Sergey Gavrilov in Airflow
Но мне кажется, что вы не там ищете решение. При репликации данных аерфлоу нужен только для передачи файлов с помощью ssh \ sftp \ hdfs \ etc с сервера монги на целевой, а потом отправку запроса, упакованного в одну транзакцию, который вычитывает в базу все файлы
источник

SZ

Sergey Zhuravlev in Airflow
Sergey Gavrilov
Но мне кажется, что вы не там ищете решение. При репликации данных аерфлоу нужен только для передачи файлов с помощью ssh \ sftp \ hdfs \ etc с сервера монги на целевой, а потом отправку запроса, упакованного в одну транзакцию, который вычитывает в базу все файлы
Да, так тоже, но в отдельных задачах требуется предобработка данных перед реплицированием из монги в оперативную бд. Самое простое - посмотреть названия столбцов и т.п., создать временную таблицу в mssql чтобы аналитик посмотрел в BI инструменте на данные в нужных разрезах и принял решение которое уже запустит следующие даги.
источник

SG

Sergey Gavrilov in Airflow
Ну обычно для этого Спарк используют, на объемах датасетов выше 10 гигов
источник
2020 December 17

VN

Viacheslav Nefedov in Airflow
Даск ещё можно
источник

IK

Ivan Kizimenko in Airflow
Anton Losev
Добрый вечер всем!
Генерируем динамический dag. Сам .py файл с dag'ом лезет в базу за параметрами тасков. Как ограничить частоту с которой Airflow 1.10.10 запускает (перечитывает) этот .py файл? Получается, что он обращается к базе несколько раз в секунду.

Выставил
# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 30

# after how much time (seconds) a new DAGs should be picked up from the filesystem
min_file_process_interval = 30

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 30
Не помогает. Airflow рестартовал, естественно. Так и спамит базу запросами.
Встречал такую инфу:
The Airflow executor executes top level code on every heartbeat, so a small amount of top level code can cause performance issues. Try to treat the DAG file like a config file and leave all the heavy lifting for the hook and operator.
источник

IK

Ivan Kizimenko in Airflow
Ivan Kizimenko
Встречал такую инфу:
The Airflow executor executes top level code on every heartbeat, so a small amount of top level code can cause performance issues. Try to treat the DAG file like a config file and leave all the heavy lifting for the hook and operator.
И в дополнение вот конфиг для интервала

processor_poll_interval
config, which is by default 1 second.
источник