Size: a a a

2020 October 15

GB

Georgy Borodin in Airflow
Vladislav 👻 Shishkov
В первую очередь надо разобраться со слотами, т.к. даг один, то сколько воркеров не поднимай, а по дефолту стартанет только 16 тасок вроде
Там две очереди, в одной 20 воркеров с четырьмя слотами и 20 с одним в другой, для дага concurrency выставлен 100, таски ещё и пулами для двойной голландской защиты ограничены
источник

VS

Vladislav 👻 Shishkov... in Airflow
Какой-то ахтунг
источник

GB

Georgy Borodin in Airflow
Есть очень жирные таски, под них очередь с воркерами покруче, и очень лёгкие, после которых могут тяжёлые и скипаться
источник

GB

Georgy Borodin in Airflow
Дефолтные воркеры просто вылетят по памяти в трубу, вот это и разруливаю очередями
источник
2020 October 16

AK

Alena Korogodova in Airflow
Nikolay
> тем, что в Аерфлоу есть куча коннекторов и интеграций

и это, скорее, в минус айрфлоу. Слишком перегружен всякой ерундой.

> а насчёт Дженкинса я вот не знаю

на дженкинсе плагинов и коннекторов на порядки больше, чем у айрфлоу, но он остается простой запускалкой, хоть и довольно гибкой. Плюс часто он уже есть в инфраструктуре и люди умеют с ним работать
Старт трэда про дженкинс джобы
источник

С

Сюткин in Airflow
Старт 25 мая ,так он в итоге закончился?
источник

AK

Alena Korogodova in Airflow
Конечно нет.
источник
2020 October 17

k

kSandr in Airflow
Кто-нить устанавливал airflow-exporter в gcloud gke composer ?
источник

SK

Sergej Khakhulin in Airflow
Georgy Borodin
Подскажите, как можно исправить следующую ситауцию?
CeleryExecutor, DAG из 4+ тысяч тасков, разделённых на две разные очереди, с разными воркерами.
Таски долго висят в статусе None, в очереди появляются с очень большим лагом. Вижу, что выполнились задачи, но прежде чем новые попадут хотя бы в scheduled, проходит очень много времени (>5 минут)
Таймаут на загрузку тасок шедулером увеличивали?
источник
2020 October 19

AS

Askar Shabykov in Airflow
Подскажите, пожалуйста, что лучше использовать в качестве промежуточного хранилища, где необходимо объеденить данные из двух источников (hive, postgresql) по определённому полю? Размер данных около 1000000 строк.  Pandas не подходит.
источник

A

Aleksandr Kirilenko in Airflow
Askar Shabykov
Подскажите, пожалуйста, что лучше использовать в качестве промежуточного хранилища, где необходимо объеденить данные из двух источников (hive, postgresql) по определённому полю? Размер данных около 1000000 строк.  Pandas не подходит.
А результат куда приедет в итоге? В Postgres или Hive?
источник

AS

Askar Shabykov in Airflow
Aleksandr Kirilenko
А результат куда приедет в итоге? В Postgres или Hive?
Результат загрузится  Postgresql. В общем, есть 2 источника
hive_source с полями name, sap и pg_source с полями sap, id, нужно объеденить по полю sap, должно работать примерно так
insert into pg_target (name, sap, fk_id) values (select hive_source.name, hive_source.sap, pg_source.id from hive_source inner join pg_source on (hive_source.sap = pg_source.sap))
источник

A

Aleksandr Kirilenko in Airflow
Askar Shabykov
Результат загрузится  Postgresql. В общем, есть 2 источника
hive_source с полями name, sap и pg_source с полями sap, id, нужно объеденить по полю sap, должно работать примерно так
insert into pg_target (name, sap, fk_id) values (select hive_source.name, hive_source.sap, pg_source.id from hive_source inner join pg_source on (hive_source.sap = pg_source.sap))
Для выбора где будут производиться вычисления, желательно учитывать следующее:
- Разовая ли это задача или регулярная с указанным объемом данных?
- В какой промежуток времени? В бизнес часы или нет
- Какой из источников более нагружен и в какой промежуток времени и сколько ресурсов есть у обоих источников?
источник

A

Aleksandr Kirilenko in Airflow
Askar Shabykov
Результат загрузится  Postgresql. В общем, есть 2 источника
hive_source с полями name, sap и pg_source с полями sap, id, нужно объеденить по полю sap, должно работать примерно так
insert into pg_target (name, sap, fk_id) values (select hive_source.name, hive_source.sap, pg_source.id from hive_source inner join pg_source on (hive_source.sap = pg_source.sap))
Я бы рассмотрел следующие подходы:
1) Стянуть данные с Postgres в Hive (через Sqoop или JDBC Storage Handler) и потом произвести объединение данных на стороне Hive, с последующей выгрузкой обратно в Postgres. Тут будет 2-я перегрузка данных по сети.
источник

A

Aleksandr Kirilenko in Airflow
Askar Shabykov
Результат загрузится  Postgresql. В общем, есть 2 источника
hive_source с полями name, sap и pg_source с полями sap, id, нужно объеденить по полю sap, должно работать примерно так
insert into pg_target (name, sap, fk_id) values (select hive_source.name, hive_source.sap, pg_source.id from hive_source inner join pg_source on (hive_source.sap = pg_source.sap))
2) В обратном случае, стянуть данные из Hive в Postgres (через sqoop) и потом производить объединение данных. Будет однократная сетевая загрузка, все вычисления будет производить Postgres. Если база активно используется, то пользователи смогут ощутить просадку по производительности.
источник

A

Aleksandr Kirilenko in Airflow
Askar Shabykov
Результат загрузится  Postgresql. В общем, есть 2 источника
hive_source с полями name, sap и pg_source с полями sap, id, нужно объеденить по полю sap, должно работать примерно так
insert into pg_target (name, sap, fk_id) values (select hive_source.name, hive_source.sap, pg_source.id from hive_source inner join pg_source on (hive_source.sap = pg_source.sap))
В первом сценарии, постарайтесь сделать секционирование одинаковым чтобы сопоставляемые объемы данных разбились на одинаковые "секции". В Hive Partition + BucketedTables. Обратную загрузку в Postgres можно поставить во временную таблицу с UNLOGGED, и потом если позволяет секционирование сделать ATTACH PARTITION и DETACH PARTITION.
источник

A

Aleksandr Kirilenko in Airflow
Askar Shabykov
Результат загрузится  Postgresql. В общем, есть 2 источника
hive_source с полями name, sap и pg_source с полями sap, id, нужно объеденить по полю sap, должно работать примерно так
insert into pg_target (name, sap, fk_id) values (select hive_source.name, hive_source.sap, pg_source.id from hive_source inner join pg_source on (hive_source.sap = pg_source.sap))
Во втором сценарии, можно вытянуть данные во временную таблицу с UNLOGGED и секционированием как на основной таблице, и потом сделать JOIN. Секционирование сделает prunning и это ускорит обработку. В конце временную таблицу просто удалить. Вставку данных можно во временную таблицу с UNLOGGED и потом в финальную через ATTACH PARTITION и DETACH PARTITION.
источник

AS

Askar Shabykov in Airflow
Aleksandr Kirilenko
В первом сценарии, постарайтесь сделать секционирование одинаковым чтобы сопоставляемые объемы данных разбились на одинаковые "секции". В Hive Partition + BucketedTables. Обратную загрузку в Postgres можно поставить во временную таблицу с UNLOGGED, и потом если позволяет секционирование сделать ATTACH PARTITION и DETACH PARTITION.
Спасибо, подумаю над первым сценарием. БД активно используется, dag нужно запускать несколько раз в день.
источник

A

Aleksandr Kirilenko in Airflow
Если исходная таблица в Postgres помечена как logging, можно посмотреть в сторону использования wal2json и затягивать изменения в Hive. А уже потом на уровне Hive просто объединять за какой-то промежуток и выливать обратно в Postgres. Тут уже сами смотрите.
источник

AS

Askar Shabykov in Airflow
Aleksandr Kirilenko
Если исходная таблица в Postgres помечена как logging, можно посмотреть в сторону использования wal2json и затягивать изменения в Hive. А уже потом на уровне Hive просто объединять за какой-то промежуток и выливать обратно в Postgres. Тут уже сами смотрите.
спасибо)
источник