Size: a a a

2018 June 21

V

Vladimir in Airflow
Вот я боюсь все не так просто.
Допустим я сделал Даг. Позапускал его в scheduled режиме: у меня (не важно по каким причинам) есть запуски за 2018 год:
1 января 2018
2 января 2018
...
1 июня 2018

Теперь я хочу прогнать программу на исторических данных допустим с 1990 года. То есть получить множество DAG_run'ов в который execution_date будут (1990,1,1), (1990,1,2) ...  Если я поставлю start_date = 1990 в DAG и запущу, что произойдет?
источник

V

Vladimir in Airflow
Есть ощущение, что поскольку УЖЕ БЫЛИ запуски в 2018 (допустим последний датируется 1 июня 2018), и start_date = 1990, то airflow считает, что ВЕСЬ период с 1990 до 2018 уже пробежал.
источник

I

Igor in Airflow
Нет, скедулер смотрит по записям в базе и заполняет недостающие интервалы. Если даг настроен так, что ему можно это делать
источник

V

Vladimir in Airflow
(Я понимаю, что если отчистить мета-базу airflow, поставить start_date и запустить даг, то запуски будут идти четко по scheduled interval, как вы и описали. Вопрос в том, какое поведение когда УЖЕ есть какие то запуски и они рандомно расположены по времени)
источник

V

Vladimir in Airflow
@shrimpsizemoose хотел как раз это услышать
источник

I

Igor in Airflow
А в доках это не считывается разве?
источник

V

Vladimir in Airflow
>Если даг настроен так, что ему можно это делать
Можете, пожалуйста, раскрыть этот момент
источник

V

Vladimir in Airflow
@shrimpsizemoose боюсь что считывается, но видимо мне этого показалось мало 😁
источник

OI

Oleg Ilinsky in Airflow
Мб это баг лично у нас, но я замечал, что новый start_date не подтягивается в уже созданный даг и заполненными метаданными
источник

I

Igor in Airflow
Vladimir
>Если даг настроен так, что ему можно это делать
Можете, пожалуйста, раскрыть этот момент
источник

I

Igor in Airflow
Есть ещё возможность насоздавать запусков из CLI с помощью команды backfill в заданном диапазоне дат
источник

OI

Oleg Ilinsky in Airflow
я так посчитал, получается больше 10к инстансов дагов. Сколько у вас тасков я не знаю, но что-то у меня есть сомнения насчет того, что это будет нормально работать. А если и заработает, то вряд ли сможет выполниться за адекватное время)
источник

OI

Oleg Ilinsky in Airflow
Как пример: выгрузку исторических данных из источника backfill'ом мы почти сразу перестали делать, потому что это работало неадекватно долго. Гораздо проще создать отдельный даг с другой логикой и выгрузить/обработать исторические данные пачками. На запуск тасков и дагов у airflow слишком много накладных расходов.
источник

OI

Oleg Ilinsky in Airflow
У меня вопрос насчет приоритетов) я пока не нашел решение этой проблемы. Кто-нибудь пробовал настраивать приоритеты? Или может быть знаете, куда копать?)

Где вообще хранится приоритет таска перед созданием его инстанса? Как я понимаю, этот приоритет считает во время рендеринга дага (вроде это так называется). И priority_weight_total, если добавить логирование, считается корректно. Но в веб-интерфейсе и в таблице task_instance записывается все равно неправильно
источник

OI

Oleg Ilinsky in Airflow
Привет!
Если кому интересно и вдруг кто не знает, наткнулся на такую особенность airflow (про которую ничего не нашел в документации):
параметр priority_weight по умолчанию работает не совсем так, как я ожидал. В очереди учитывается параметр priority_weight_total, который по умолчанию считается как сумма приоритетов, тасков, зависящих от текущего, и приоритета текущего.
return self.priority_weight + sum(
map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
self.get_flat_relative_ids(upstream=upstream))
)

То есть, если есть цепочка тасков 1->2->3->4 и у всех  priority_weight 10, то у первого будет priority_weight_total 40, у второго 30, у третьего 20 и только у последнего будет 10. И когда есть несколько дагов с разными приоритетами и одним пулом на всех, то возникают проблемы.

В параметрах дага или таска можно отключить это, если задать weight_rule='absolute'. Тогда priority_weight_total будет равен priority_weight и все будет работать так, как вы ожидаете)
if self.weight_rule == WeightRule.ABSOLUTE:
   return self.priority_weight


Если хотите посмотреть подробнее, то это в модуле models, в классе BaseOperator в функции priority_weight_total.
источник

OI

Oleg Ilinsky in Airflow
А кто-нибудь пробовал изменить weight_rule?
Я указываю его как 'absolute' и для тасков и для дагов, но priority_weight_total все равно считается как для downstream.
источник

OI

Oleg Ilinsky in Airflow
Ого) привет!
источник

А

Алексей in Airflow
привет
источник

ДС

Дина Сафина in Airflow
ГБЦ переходит на airflow? )
источник

МС

Максим Сёмочкин in Airflow
ГБЦ?
источник