Size: a a a

2020 September 10

AB

Anton Bukreev in Airflow
Пробую вот запустить SSHOperator
command="sh /home/spark/rmsp_csv_temp/test_spark.sh {{task_instance.xcom_pull(task_ids='get_nifi_download_message', key='message')[0][\"flowStatus\"][\"sourceDataset\"]}}"
В xcom  данные есть. Таска успешно завершается. Но до самого скрипта не доходит переменная.  В Логах вижу [2020-09-10 20:07:52,010] {ssh_operator.py:109} INFO - Running command: sh /home/spark/rmsp_csv_temp/test_spark.sh Как я понимаю, все что после пробела обрезается в команде. Как заставить его принять на вход переменную?
источник

ME

Max Efremov in Airflow
А в рендеред темплейте что?
источник

I

Igor in Airflow
Anton Bukreev
Пробую вот запустить SSHOperator
command="sh /home/spark/rmsp_csv_temp/test_spark.sh {{task_instance.xcom_pull(task_ids='get_nifi_download_message', key='message')[0][\"flowStatus\"][\"sourceDataset\"]}}"
В xcom  данные есть. Таска успешно завершается. Но до самого скрипта не доходит переменная.  В Логах вижу [2020-09-10 20:07:52,010] {ssh_operator.py:109} INFO - Running command: sh /home/spark/rmsp_csv_temp/test_spark.sh Как я понимаю, все что после пробела обрезается в команде. Как заставить его принять на вход переменную?
а попробуй через мультилайн?
command = """
sh /home/spark/cho/tam/u/tebya {{task_instance.xcom_pull(blablalba }}

""",
источник

AB

Anton Bukreev in Airflow
Сейчас
источник

AB

Anton Bukreev in Airflow
Max Efremov
А в рендеред темплейте что?
sh '/home/spark/rmsp_csv_temp/test_spark.sh ' получается вообще не отрабатывает task_instance
источник

ME

Max Efremov in Airflow
Anton Bukreev
sh '/home/spark/rmsp_csv_temp/test_spark.sh ' получается вообще не отрабатывает task_instance
Мб там пусто...
источник

AB

Anton Bukreev in Airflow
Max Efremov
Мб там пусто...
Нет. Это скрин с того же запуска дага что и рендеред темплайт
источник

ME

Max Efremov in Airflow
Хм, ключ написан с большой буквы не уверен, но вдруг регистрозависимо...
источник

AB

Anton Bukreev in Airflow
Max Efremov
Хм, ключ написан с большой буквы не уверен, но вдруг регистрозависимо...
Да это просто паста старая, я уже пофиксил
источник

AB

Anton Bukreev in Airflow
Igor
а попробуй через мультилайн?
command = """
sh /home/spark/cho/tam/u/tebya {{task_instance.xcom_pull(blablalba }}

""",
так же все
источник

ME

Max Efremov in Airflow
А если убрать имя таски, просто по ключу получить?
источник

AB

Anton Bukreev in Airflow
Max Efremov
А если убрать имя таски, просто по ключу получить?
так же пусто
источник
2020 September 11

AB

Anton Bukreev in Airflow
Anton Bukreev
Пробую вот запустить SSHOperator
command="sh /home/spark/rmsp_csv_temp/test_spark.sh {{task_instance.xcom_pull(task_ids='get_nifi_download_message', key='message')[0][\"flowStatus\"][\"sourceDataset\"]}}"
В xcom  данные есть. Таска успешно завершается. Но до самого скрипта не доходит переменная.  В Логах вижу [2020-09-10 20:07:52,010] {ssh_operator.py:109} INFO - Running command: sh /home/spark/rmsp_csv_temp/test_spark.sh Как я понимаю, все что после пробела обрезается в команде. Как заставить его принять на вход переменную?
а с переменной {{ ds }} нормально все отрабатывает
источник

AT

Al T in Airflow
Vladislav 👻 Shishkov
airflow имеет ошибки импорта за 1 минуту
Airflow import process has failed with errors 1 minute(s) ago
источник

VS

Vladislav 👻 Shishkov... in Airflow
Уже не правильно
источник

AT

Al T in Airflow
оставьте как есть тогда :) все равно никто не поймет
источник

AT

Al T in Airflow
попробую догадаться что если по русски жто означает что процесс не отвалился но за последнюю минуту кинул в логи или куда там какое-то количество ошибок, то тогда будет   Airflow import process has registered (или logged) some errors in last 1 minute(s)
источник

AB

Anton Bukreev in Airflow
Методом научного тыка было установлено что {{ task_instance.xcom_pull(task_ids='get_nifi_download_message', key='message')[0]['flowStatus'] }} вот так работает, а вот так {{ task_instance.xcom_pull(task_ids='get_nifi_download_message', key='message')[0]['flowStatus']['sourceDataset'] }} (с вложенным словарем) уже нет. Никто не сталкивался с таким? Как в Jinja можно достать по ключу из словаря значение?
источник

AB

Anton Bukreev in Airflow
Не охота Python-оператор ставить дополнительный
источник

SG

Sergey Gavrilov in Airflow
Как атрибут, насколько я помню
источник