Size: a a a

2018 May 25

I

Igor in Airflow
TriggerDagRunOperator всё равно использует utcnow когда создаёт DagRunOrder, так что там даг будет запущен не в то же время, если даг, который его тригернул не в utc
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py#L61

А если буквально заменить timezone.utcnow() в этой строчке на datetime.datetime.utcnow() то всё будет предсказуемо
источник
2018 May 28

NB

Nick Bilozerov in Airflow
Igor
У меня была похожая проблема, из-за странного времени запуска, получалось упорно что даг который я тригерю запускался не сразу а как-то там в другое время. Решилось написанием своего класса на базе библиотечного
так а в чем конкретно была проблема, у меня как раз таки аирфлов не в utc, там где то проверка стоит когда время запуска дага наступает? я пробовал запускать TriggerDagRunOperator с текущим временем системы - результат такой же - не работает. свой оператор был написан точно также через crate_dagrun и добавления в сессию? у меня есть рабочий вариант но немного хардкорный с запуском дага через BashOperator
источник

S

Sergii in Airflow
Добрый день, коллеги!
Хотел бы спросить на сколько стабилен у вас планировщик? У нас scheduler интегрирован в systemd и раз в неделю или около того упадет. Загрузка относительно небольшая, 50 дагов, около 5000 тасков за день.
airflow 1.9, python 3
Спасибо :)
источник

V

Vlad in Airflow
Всем привет, хотел спросить как можно использовать  pythonvirtualenvoperator если машинка находится за прокси. при попытке поставить пакеты не хочет пробиться в инет как не пробовали. Если поделитесь вашим примером был бы признателен.
источник

SS

Sergey Sheremeta in Airflow
Sergii
Добрый день, коллеги!
Хотел бы спросить на сколько стабилен у вас планировщик? У нас scheduler интегрирован в systemd и раз в неделю или около того упадет. Загрузка относительно небольшая, 50 дагов, около 5000 тасков за день.
airflow 1.9, python 3
Спасибо :)
а как именно вы его запускаете?
источник

SS

Sergey Sheremeta in Airflow
попробуйте опции -r / -n
источник

S

Sergii in Airflow
Sergey Sheremeta
а как именно вы его запускаете?
-r 14400 по 4 часам бегает
каждый раз упадет на чем-то новом, в последнее время падает на ошибках с коммуникацией с MySQL. Подозреваю что это "нормально", где-то какой-то лаг чуть дольше чем надо и прощай, а может и нет
источник

S

Sergii in Airflow
Есть баг посмешнее
"Executor reports task instance %s finished (%s) although the task says its %s. Was the task killed externally?"
знакомо ли вам это сообщение об ошибке?

Иногда из airflow приходят оповещения с этим сообщением что таск упал.
Но когда смотрю в логи таска вижу счастливое сообщение "Command exited with return code 0". Как мне кажется, случается это только в тех случаях, когда первая попытка была неуспешна. Таск репортит ненулевой exit-code и уходит в очередь на retry.
Потом происходит какая-то магия и вторая попытка отрабатывает нормально, но что-то ломается в обработке статуса или бог еще знает чего.

Признаюсь, что так глубоко в изучении исходников airflow я не добрался, так что могу только спекулировать на тему того где появляется ошибка.
источник
2018 May 29

OI

Oleg Ilinsky in Airflow
Привет!
Если кому интересно и вдруг кто не знает, наткнулся на такую особенность airflow (про которую ничего не нашел в документации):
параметр priority_weight по умолчанию работает не совсем так, как я ожидал. В очереди учитывается параметр priority_weight_total, который по умолчанию считается как сумма приоритетов, тасков, зависящих от текущего, и приоритета текущего.
return self.priority_weight + sum(
map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
self.get_flat_relative_ids(upstream=upstream))
)

То есть, если есть цепочка тасков 1->2->3->4 и у всех  priority_weight 10, то у первого будет priority_weight_total 40, у второго 30, у третьего 20 и только у последнего будет 10. И когда есть несколько дагов с разными приоритетами и одним пулом на всех, то возникают проблемы.

В параметрах дага или таска можно отключить это, если задать weight_rule='absolute'. Тогда priority_weight_total будет равен priority_weight и все будет работать так, как вы ожидаете)
if self.weight_rule == WeightRule.ABSOLUTE:
   return self.priority_weight


Если хотите посмотреть подробнее, то это в модуле models, в классе BaseOperator в функции priority_weight_total.
источник

SS

Sergey Sheremeta in Airflow
Oleg Ilinsky
Привет!
Если кому интересно и вдруг кто не знает, наткнулся на такую особенность airflow (про которую ничего не нашел в документации):
параметр priority_weight по умолчанию работает не совсем так, как я ожидал. В очереди учитывается параметр priority_weight_total, который по умолчанию считается как сумма приоритетов, тасков, зависящих от текущего, и приоритета текущего.
return self.priority_weight + sum(
map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
self.get_flat_relative_ids(upstream=upstream))
)

То есть, если есть цепочка тасков 1->2->3->4 и у всех  priority_weight 10, то у первого будет priority_weight_total 40, у второго 30, у третьего 20 и только у последнего будет 10. И когда есть несколько дагов с разными приоритетами и одним пулом на всех, то возникают проблемы.

В параметрах дага или таска можно отключить это, если задать weight_rule='absolute'. Тогда priority_weight_total будет равен priority_weight и все будет работать так, как вы ожидаете)
if self.weight_rule == WeightRule.ABSOLUTE:
   return self.priority_weight


Если хотите посмотреть подробнее, то это в модуле models, в классе BaseOperator в функции priority_weight_total.
спасибо!
источник

YK

Yaroslav Kuchmiy in Airflow
Oleg Ilinsky
Привет!
Если кому интересно и вдруг кто не знает, наткнулся на такую особенность airflow (про которую ничего не нашел в документации):
параметр priority_weight по умолчанию работает не совсем так, как я ожидал. В очереди учитывается параметр priority_weight_total, который по умолчанию считается как сумма приоритетов, тасков, зависящих от текущего, и приоритета текущего.
return self.priority_weight + sum(
map(lambda task_id: self._dag.task_dict[task_id].priority_weight,
self.get_flat_relative_ids(upstream=upstream))
)

То есть, если есть цепочка тасков 1->2->3->4 и у всех  priority_weight 10, то у первого будет priority_weight_total 40, у второго 30, у третьего 20 и только у последнего будет 10. И когда есть несколько дагов с разными приоритетами и одним пулом на всех, то возникают проблемы.

В параметрах дага или таска можно отключить это, если задать weight_rule='absolute'. Тогда priority_weight_total будет равен priority_weight и все будет работать так, как вы ожидаете)
if self.weight_rule == WeightRule.ABSOLUTE:
   return self.priority_weight


Если хотите посмотреть подробнее, то это в модуле models, в классе BaseOperator в функции priority_weight_total.
Спасибо!
источник

OI

Oleg Ilinsky in Airflow
А кто-нибудь пробовал изменить weight_rule?
Я указываю его как 'absolute' и для тасков и для дагов, но priority_weight_total все равно считается как для downstream.
источник

OI

Oleg Ilinsky in Airflow
в исходниках этот момент менять не хотелось бы))
источник

OI

Oleg Ilinsky in Airflow
В момент создания дага priority_weight_total корректный...
источник
2018 May 31

NB

Nick Bilozerov in Airflow
Nick Bilozerov
Пример который я использую: https://codebunk.com/b/700221710/ и проблема в том что dependent dag который я запускаю с TriggerDagRunOperator находится всегда в состоянии running и не запускает свои таски
для тех у кого возможно есть/была/будет подобная проблема. вся суть проблемы в этой строчке - https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L4380 , когда создается dag если у вас время не utc, то будет создаваться даг с текущим временем системы что по сравнению utc будет в будущем. Фикситься путем копипасты оригинального класса и добавления execution_date параметра с utc.now. в мастере аирфлов эта проблема уже исправлена
источник

I

Igor in Airflow
Nick Bilozerov
для тех у кого возможно есть/была/будет подобная проблема. вся суть проблемы в этой строчке - https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L4380 , когда создается dag если у вас время не utc, то будет создаваться даг с текущим временем системы что по сравнению utc будет в будущем. Фикситься путем копипасты оригинального класса и добавления execution_date параметра с utc.now. в мастере аирфлов эта проблема уже исправлена
told ya
источник

I

Igor in Airflow
Кстати, я сегодня наткнулся ещё на один ээмм "баг"? С datetime в airflow, https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L833
Вот тут только что созданный даг с schedule @once не запускается потому что сравнивается next_start может быть None и его нельзя сравнить с now
источник

I

Igor in Airflow
Кто-то вообще имел успешный опыт с таким расписанием?
источник
2018 June 01

KP

Konstantin Palyanichka in Airflow
Привет. Подскажите: могу ли я использовать jinja template в SqlOperator ?
источник

OI

Oleg Ilinsky in Airflow
Привет!
В своей версии такой оператор не вижу. Но вообще, скорее всего, в параметре sql можно. В коде оператора перед init все параметры, которые темплейтятся должны быть в templated_fields.
источник