Привет!
Если кому интересно и вдруг кто не знает, наткнулся на такую особенность airflow (про которую ничего не нашел в документации):
параметр
priority_weight
по умолчанию работает не совсем так, как я ожидал. В очереди учитывается параметр
priority_weight_total
, который по умолчанию считается как сумма приоритетов, тасков, зависящих от текущего, и приоритета текущего.
return self.priority_weight + sum(
map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
self.get_flat_relative_ids(upstream=upstream))
)
То есть, если есть цепочка тасков 1->2->3->4 и у всех
priority_weight
10, то у первого будет
priority_weight_total
40, у второго 30, у третьего 20 и только у последнего будет 10. И когда есть несколько дагов с разными приоритетами и одним пулом на всех, то возникают проблемы.
В параметрах дага или таска можно отключить это, если задать
weight_rule='absolute'
. Тогда
priority_weight_total
будет равен
priority_weight
и все будет работать так, как вы ожидаете)
if self.weight_rule == WeightRule.ABSOLUTE:
return self.priority_weight
Если хотите посмотреть подробнее, то это в модуле
models
, в классе
BaseOperator
в функции
priority_weight_total
.