Size: a a a

2020 October 05

ИХ

Ильяс Хакиев... in Airflow
Ilya Lozhkin
типа такого, только не для airflow
а вы сами использовали? есть какой-нить мануал, инструкции, примеры?
источник

IL

Ilya Lozhkin in Airflow
Ильяс Хакиев
а вы сами использовали? есть какой-нить мануал, инструкции, примеры?
Вот ищу же только
источник

ИХ

Ильяс Хакиев... in Airflow
а самим этим плагином? )) дизайн понравился, инфы как обычно не много ((
источник

А

Анастасия in Airflow
Я еще не очень шарю в питоне, а есть возможность на одну строку вызывать два разных исключения в зависимости от ситуации?

Например, у меня есть вот такое:

hook.run_cli(insert)

Иногда бывают случаи, что HiveCliHook говорит, что по каким-то причинам нет соединения - одно исключение
Иногда бывают случаи, что оно пытается выполнить незаконный insert, так как пришли поломанные файлы json - другое исключение

Тогда в первом случае нужно просто повторить попытку, а во втором - скипнуть файл
источник

SG

Sergey Gavrilov in Airflow
Анастасия
Я еще не очень шарю в питоне, а есть возможность на одну строку вызывать два разных исключения в зависимости от ситуации?

Например, у меня есть вот такое:

hook.run_cli(insert)

Иногда бывают случаи, что HiveCliHook говорит, что по каким-то причинам нет соединения - одно исключение
Иногда бывают случаи, что оно пытается выполнить незаконный insert, так как пришли поломанные файлы json - другое исключение

Тогда в первом случае нужно просто повторить попытку, а во втором - скипнуть файл
try:
 hook.run_cli(insert)
except ПервоеИсключение as err:
 do_smt()
 raise err
except ВтороеИсключение as err:
 do smth_else()
источник

А

Анастасия in Airflow
Sergey Gavrilov
try:
 hook.run_cli(insert)
except ПервоеИсключение as err:
 do_smt()
 raise err
except ВтороеИсключение as err:
 do smth_else()
Спасибо!

А как указать, что это именно java.lang.RuntimeException при попытке парсинга json силами hive и какое вообще исключение может быть при таких логах, если везде просто INFO?

INFO - [2020-10-01 13:08:50,532] {{hive_hooks.py:265}} INFO - Connecting to jdbc: <здесь jdbc>
[2020-10-01 13:10:14,039] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,039] {{hive_hooks.py:265}} INFO - Unknown HS2 problem when communicating with Thrift server.
[2020-10-01 13:10:14,039] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,039] {{hive_hooks.py:265}} INFO - Error: Could not open client transport with JDBC Uri: <Здесь jdbc>: Peer indicated failure: Error validating the login (state=08S01,code=0)
[2020-10-01 13:10:14,103] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,103] {{hive_hooks.py:265}} INFO - beeline> USE default;
[2020-10-01 13:10:14,104] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,103] {{hive_hooks.py:265}} INFO - No current connection
[2020-10-01 13:10:14,104] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,104] {{hive_hooks.py:265}} INFO -
[2020-10-01 13:10:14,232] {{logging_mixin.py:112}} WARNING
источник

SG

Sergey Gavrilov in Airflow
Ну мм это не является экцепшоном. Нужно смотреть в коде, что возвращает эта функция. Потому что объяснять, как перехватывать исходящий поток в logging мне бы не хотелось
источник

А

Анастасия in Airflow
Sergey Gavrilov
Ну мм это не является экцепшоном. Нужно смотреть в коде, что возвращает эта функция. Потому что объяснять, как перехватывать исходящий поток в logging мне бы не хотелось
Функция ничего не возвращает. Она просто... есть и что-то делает

А именно:
1. копирует файл с помощью sshhook
2. делает insert с помощью hiveclihook
3. удаляет обратно через sshhook

Между первым и вторым шагом есть еще проверка с помощью webhdfshook на то, что файл скопировался
источник

YL

Yevhenii Lukashov in Airflow
Кто то в курсе библиотека airflow.providers что б напрямую запулить бекап mysql в s3 работает с версией 1.10.9?
источник

SG

Sergey Gavrilov in Airflow
Анастасия
Функция ничего не возвращает. Она просто... есть и что-то делает

А именно:
1. копирует файл с помощью sshhook
2. делает insert с помощью hiveclihook
3. удаляет обратно через sshhook

Между первым и вторым шагом есть еще проверка с помощью webhdfshook на то, что файл скопировался
Прекрасная боль. Очень похоже на написание тестов, только без возможности сделать мок. Я бы предложил залезть внутрь кода, который отправляет в hdfs эту команду и посмотреть, есть ли там хоть какая-то возможность вызывать Exception или во3вращать хоть что-нибудь
источник

ДН

Дмитрий Негреев... in Airflow
Анастасия
Спасибо!

А как указать, что это именно java.lang.RuntimeException при попытке парсинга json силами hive и какое вообще исключение может быть при таких логах, если везде просто INFO?

INFO - [2020-10-01 13:08:50,532] {{hive_hooks.py:265}} INFO - Connecting to jdbc: <здесь jdbc>
[2020-10-01 13:10:14,039] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,039] {{hive_hooks.py:265}} INFO - Unknown HS2 problem when communicating with Thrift server.
[2020-10-01 13:10:14,039] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,039] {{hive_hooks.py:265}} INFO - Error: Could not open client transport with JDBC Uri: <Здесь jdbc>: Peer indicated failure: Error validating the login (state=08S01,code=0)
[2020-10-01 13:10:14,103] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,103] {{hive_hooks.py:265}} INFO - beeline> USE default;
[2020-10-01 13:10:14,104] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,103] {{hive_hooks.py:265}} INFO - No current connection
[2020-10-01 13:10:14,104] {{logging_mixin.py:112}} INFO - [2020-10-01 13:10:14,104] {{hive_hooks.py:265}} INFO -
[2020-10-01 13:10:14,232] {{logging_mixin.py:112}} WARNING
Судя по коду хука, он просто работает с cli клиентом хайва, поэтому питонячьи exception вам тут не помогут.
Похоже вам остается только ловить исключение сабпроцесса и парсить джавовый traceback.
источник

А

Анастасия in Airflow
Дмитрий Негреев
Судя по коду хука, он просто работает с cli клиентом хайва, поэтому питонячьи exception вам тут не помогут.
Похоже вам остается только ловить исключение сабпроцесса и парсить джавовый traceback.
Это как раз то, о чем говорил Сергей пару сообщений выше?

(набираю варианты изложения для гуглежа)
источник

ДН

Дмитрий Негреев... in Airflow
Анастасия
Это как раз то, о чем говорил Сергей пару сообщений выше?

(набираю варианты изложения для гуглежа)
Угу.
Гуглится такой клиент для хайва.
https://pypi.org/project/PyHive/#description

Не знаю насколько хорош, но вроде по pep сделанный клиент.
Там можно какой-нибудь DatabaseError ловить, который хотя бы проще разобрать можно будет.
Но придется свой хук сообразить на основе этого клиента.
Раз он по pep сделан, можно попробовать от DbApiHook отнаследоваться, там уже есть функции для создания коннекта и еще много всего.
источник

А

Анастасия in Airflow
Дмитрий Негреев
Угу.
Гуглится такой клиент для хайва.
https://pypi.org/project/PyHive/#description

Не знаю насколько хорош, но вроде по pep сделанный клиент.
Там можно какой-нибудь DatabaseError ловить, который хотя бы проще разобрать можно будет.
Но придется свой хук сообразить на основе этого клиента.
Раз он по pep сделан, можно попробовать от DbApiHook отнаследоваться, там уже есть функции для создания коннекта и еще много всего.
Спасибо большое)
источник

А

Анастасия in Airflow
Хмм... А dag run не может парсить свои же логи?)

Может, мне проще обрабатывать это в retry?
источник

V

Victor in Airflow
Всем доброго времени суток! При попытке воспроизвести пример из туториала Airflow, у меня запускается DAG Runs, но Task в состоянии Paused...
источник

V

Victor in Airflow
источник

V

Victor in Airflow
Что я сделал не так? Может кто наставить на путь истинный?
источник

AA

Anton Afonin in Airflow
Там вроде написано, что даг на паузе?)
источник

AA

Anton Afonin in Airflow
источник