Size: a a a

2021 February 26

А

Андрей in Airflow
Georgy Borodin
Сделай свой LatestOnlyOperator без скипания даунстрима 🤷🏻‍♂️
так мне часть тасков надо скипать. часть нет.
источник

GB

Georgy Borodin in Airflow
Андрей
так мне часть тасков надо скипать. часть нет.
Ну сделай свой LatestOnlyOperator со скипом выбранных)
источник

GB

Georgy Borodin in Airflow
LatestOnlyOperator проверяет, последний ли это даг ран и без вариантов скипает последующие таски
источник

А

Андрей in Airflow
мне надо чтобы load_to_stage_done отрабатывал всегда
источник

А

Андрей in Airflow
добавил в него trigger_rule='none_failed', чет не помогает
источник

GB

Georgy Borodin in Airflow
Андрей
мне надо чтобы load_to_stage_done отрабатывал всегда
Посмотри код LatestOnlyOperator-а и поймёшь, о чём я
источник

А

Андрей in Airflow
Georgy Borodin
Посмотри код LatestOnlyOperator-а и поймёшь, о чём я
я понял. он же весь downstream скипает
источник

GB

Georgy Borodin in Airflow
Андрей
я понял. он же весь downstream скипает
Ага. Можно спокойно наследовать LatestOnlyOperator, а в нём уже поменять логику, вопрос копипаста и двух строчек кода)
источник

А

Андрей in Airflow
Georgy Borodin
Ага. Можно спокойно наследовать LatestOnlyOperator, а в нём уже поменять логику, вопрос копипаста и двух строчек кода)
Похоже получилось. Спасибо!
источник

А

Андрей in Airflow
# downstream_tasks = context['task'].get_flat_relatives(upstream=False)
           downstream_tasks = context['task'].get_direct_relatives(upstream=False)
источник

А

Андрей in Airflow
одну строчку поменять
источник

ME

Max Efremov in Airflow
А насколько болезненно перейти на airflow 2?
источник

GB

Georgy Borodin in Airflow
Max Efremov
А насколько болезненно перейти на airflow 2?
https://airflow.apache.org/docs/apache-airflow/stable/upgrade-check.html должен подсказать, что придётся изменить.
А так мы пока только готовимся, в проде не перешли
источник

ME

Max Efremov in Airflow
Я больше про субъективные впечатления)
источник

ME

Max Efremov in Airflow
Но тула интересная, спасибо)
источник
2021 February 27

ЮФ

Юрий Федин in Airflow
Всем привет. А в DockerOperator нет возможности порты маппить?
источник

C

Combot in Airflow
Добро пожаловать в самое дружелюбное комьюнити.
источник

C

Combot in Airflow
Добро пожаловать в самое дружелюбное комьюнити.
источник

PA

Panchenko Andrey in Airflow
Ребята нужна помощь.
Дело вот в чем я написал функцию для отправки месседжей в гугл чат по хуку, все работает.
но почему то тасчка сначала отправляет сообщение а потом падает хотя все по факту произошло уже.
def message_on_success_dag(**context):
   """Hangouts Chat incoming webhook quickstart."""
   dag_id = context.get('task_instance').dag_id
   end_date = datetime.utcnow().strftime("%Y-%m-%d %H:%m")

   bot_message = {
       'text': "DAG *{dag}*, marking as Success, "
               "scheduled at {date_time}".format(dag=dag_id, date_time=end_date)
   }

   message_headers = {'Content-Type': 'application/json; charset=UTF-8'}
   http_obj = Http()

   return http_obj.request(
       uri=webhook_url,
       method='POST',
       headers=message_headers,
       body=dumps(bot_message)
   )

[2021-02-27 09:51:46,405] {python_operator.py:113} INFO - Done.
INFO - Job 200377: Subtask DAG_update_successful [2021-02-27 09:51:46,407] {__init__.py:1631} ERROR - Object of type 'bytes' is not JSON serializable
что оно не может сериализовать?
источник

PA

Panchenko Andrey in Airflow
Traceback (most recent call last)
 File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1495, in _run_raw_tas
   self.xcom_push(key=XCOM_RETURN_KEY, value=result
 File "/usr/local/lib/airflow/airflow/models/__init__.py", line 1919, in xcom_pus
   execution_date=execution_date or self.execution_date
 File "/usr/local/lib/airflow/airflow/utils/db.py", line 73, in wrappe
   return func(*args, **kwargs
 File "/usr/local/lib/airflow/airflow/models/__init__.py", line 4595, in se
   value = json.dumps(value).encode('UTF-8'
 File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dump
   return _default_encoder.encode(obj
 File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encod
   chunks = self.iterencode(o, _one_shot=True
 File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencod
   return _iterencode(o, 0
 File "/opt/python3.6/lib/python3.6/json/encoder.py", line 180, in defaul
   o.__class__.__name__
источник