Size: a a a

2020 June 22

OI

Oleg Ilinsky in Airflow
ну и с каким-нибудь селери экзекъютором
источник

E

Edya in Airflow
вариант с разделением- не подходит. Это конечно правильно, и про селери или кубер.
Но все же хочется полечить проблему именно в такой конфигурации сервера и экзекьютора
источник

E

Edya in Airflow
Она по идее должна решаться в теории
источник

VS

Vladislav 👻 Shishkov... in Airflow
а какой экзекьютор, если про брокер ответили "да"?
источник

НС

Николай Сергушенков... in Airflow
Наткнулся на неприятную фичу в айрфлоу с выполнением команды vacuum table на БД Greenplum, в ситуациях, когда айрфлоу не является владельцем таблицы.

Если вакуум выполняется внутри пайтон_оператора, через cursor.execute('vacuum table'), то якобы всё проходит ок, без каких либо сообщений об ошибке. Но это похоже проблема psycopg2, так как локально проходит так же.

Если вакуум выполняет постгрес_оператор, варнинг есть, но таск всё равно помечался как выполненный
Это из логов:
[2020-06-22 08:00:59,730] {logging_mixin.py:112} INFO - [2020-06-22 08:00:59,730] {dbapi_hook.py:174} INFO - vacuum analyze sales.rid_status_log;
[2020-06-22 08:00:59,946] {postgres_operator.py:67} INFO - WARNING:  skipping "rid_status_log" --- only table or database owner can vacuum it
[2020-06-22 08:00:59,952] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=v1.gp_LOAD_INC_pg_nats, task_id=vacuum_analyze_rid_status_log, execution_date=20200622T041530, start_date=20200622T050059, end_date=20200622T050059

Можно ли путём настройки конфига или параметров дага/таска сделать так, чтобы в подобных случаях таск получал статус failed?
источник

SG

Sergey Gavrilov in Airflow
Короче, скорее всего дело в том, что не-воркеры выдирают всё
источник

SG

Sergey Gavrilov in Airflow
Edya
Понимаю, что зависит. Возможно у кого-нибудь есть предложения и решал уже эту проблему?

Сейчас и так подкрутил все в конфиге (parallelism, threads, heartbeat ...), но это не сильно помогло. Может быть что-то конретное пропустил?
Смотрю на исходники LocalExecutor-a и там пока не могу понять причину такого поведения.
Попробуйте ограничить в ресурсах вебсервер и шедулер
источник

SG

Sergey Gavrilov in Airflow
...и перейдите на селери
источник

SG

Sergey Gavrilov in Airflow
Николай Сергушенков
Наткнулся на неприятную фичу в айрфлоу с выполнением команды vacuum table на БД Greenplum, в ситуациях, когда айрфлоу не является владельцем таблицы.

Если вакуум выполняется внутри пайтон_оператора, через cursor.execute('vacuum table'), то якобы всё проходит ок, без каких либо сообщений об ошибке. Но это похоже проблема psycopg2, так как локально проходит так же.

Если вакуум выполняет постгрес_оператор, варнинг есть, но таск всё равно помечался как выполненный
Это из логов:
[2020-06-22 08:00:59,730] {logging_mixin.py:112} INFO - [2020-06-22 08:00:59,730] {dbapi_hook.py:174} INFO - vacuum analyze sales.rid_status_log;
[2020-06-22 08:00:59,946] {postgres_operator.py:67} INFO - WARNING:  skipping "rid_status_log" --- only table or database owner can vacuum it
[2020-06-22 08:00:59,952] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=v1.gp_LOAD_INC_pg_nats, task_id=vacuum_analyze_rid_status_log, execution_date=20200622T041530, start_date=20200622T050059, end_date=20200622T050059

Можно ли путём настройки конфига или параметров дага/таска сделать так, чтобы в подобных случаях таск получал статус failed?
По-моему, эта проблема в целом касается гринплама. Можете только кодом дописать
источник

НС

Николай Сергушенков... in Airflow
Sergey Gavrilov
По-моему, эта проблема в целом касается гринплама. Можете только кодом дописать
Печалька. Спасибо - хотя я надеялся на другой ответ :)
источник

SG

Sergey Gavrilov in Airflow
Ну да, периодически проскальзывает вообще везде. Думаю, что только запрос на селект предварительный может как-то помочь
источник

ML

Mikhail Lopotkov in Airflow
Добрый день! Появилась возможность в проекте попробовать airflow
Но пока не понимаю, как подобную схему сделать на airflow. Подскажите.

1. По шедулеру запускается сканировние. Из внешней системы получаем идентификаторы объектов по которым были изменения (создались новые/обновились уже загруженные). Для каждого идентификатора создается задача в очереди

2. Воркеры получают задачи из очереди, загружают соответствующие данные из внешней системы, преобразовывают, сохраняют данные в БД.

Как в такой схеме правильно использовать start_date DAG'а (или, возможно, схема должна быть принципиально другой?)
И как распараллелить обработку изменений по нескольким воркерам?
источник

АЖ

Андрей Жуков... in Airflow
выглядит как попытка натянуть найфай на эйрфлоу
источник

M

Mikhail in Airflow
или селери
или префект
источник

ME

Max Efremov in Airflow
Стримсеты ещё)
источник

M

Mikhail in Airflow
Mikhail Lopotkov
Добрый день! Появилась возможность в проекте попробовать airflow
Но пока не понимаю, как подобную схему сделать на airflow. Подскажите.

1. По шедулеру запускается сканировние. Из внешней системы получаем идентификаторы объектов по которым были изменения (создались новые/обновились уже загруженные). Для каждого идентификатора создается задача в очереди

2. Воркеры получают задачи из очереди, загружают соответствующие данные из внешней системы, преобразовывают, сохраняют данные в БД.

Как в такой схеме правильно использовать start_date DAG'а (или, возможно, схема должна быть принципиально другой?)
И как распараллелить обработку изменений по нескольким воркерам?
ну короче да, структура дага должна быть известна до начала исполнения
один таск для каждого объекта не получится сделать
источник

SG

Sergey Gavrilov in Airflow
Mikhail Lopotkov
Добрый день! Появилась возможность в проекте попробовать airflow
Но пока не понимаю, как подобную схему сделать на airflow. Подскажите.

1. По шедулеру запускается сканировние. Из внешней системы получаем идентификаторы объектов по которым были изменения (создались новые/обновились уже загруженные). Для каждого идентификатора создается задача в очереди

2. Воркеры получают задачи из очереди, загружают соответствующие данные из внешней системы, преобразовывают, сохраняют данные в БД.

Как в такой схеме правильно использовать start_date DAG'а (или, возможно, схема должна быть принципиально другой?)
И как распараллелить обработку изменений по нескольким воркерам?
Вы слишком много думаете за систему. Вам надо знать лишь по какому условию запускать задачу и написать код, который будет разруливать что в каком случае выполнять. Если вам нужна бОльшая детализация, то да, не стоит тратить время на аерфлоу
источник

GG

George Gaál in Airflow
Sergey Gavrilov
Вы слишком много думаете за систему. Вам надо знать лишь по какому условию запускать задачу и написать код, который будет разруливать что в каком случае выполнять. Если вам нужна бОльшая детализация, то да, не стоит тратить время на аерфлоу
а что делать? писать самописные сервисы поверх стандартной очереди (redis? rabbit? kafka?)
источник

M

Mikhail in Airflow
да ну, селери справится прекрасно
источник

M

Mikhail in Airflow
там и мониторинг есть
источник