Size: a a a

2020 September 01

L

Lexis in Airflow
Edya
у кого-нибудь есть рабочий пример logging_config_class ?
Я так юзаю
LOGGING_CONFIG = {
   'version': 1,
   'disable_existing_loggers': False,
   'formatters': {
       'airflow': {
           'format': LOG_FORMAT
       },
       'airflow_coloured': {
           'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
           FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
       },
   },
   'handlers': {
       'console': {
           'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
           'formatter': 'airflow_coloured',
           'stream': 'sys.stdout'
       },
       'console2': {
           'class': 'logging.StreamHandler',
           'formatter': 'airflow_coloured'
       },
       'task': {
           'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
           'formatter': 'airflow',
           'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
           'filename_template': FILENAME_TEMPLATE,
       },
       'processor': {
           'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
           'formatter': 'airflow',
           'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
           'filename_template': PROCESSOR_FILENAME_TEMPLATE,
       }
   },
   'loggers': {
       'airflow.processor': {
           'handlers': ['processor'],
           'level': LOG_LEVEL,
           'propagate': False,
       },
       'airflow.task': {
           'handlers': ['task', 'console2'],
           'level': LOG_LEVEL,
           'propagate': False,
       },
       'flask_appbuilder': {
           'handler': ['console'],
           'level': FAB_LOG_LEVEL,
           'propagate': True,
       }
   },
   'root': {
       'handlers': ['console'],
       'level': LOG_LEVEL,
   }
}  # type: Dict[str, Any]
источник

E

Edya in Airflow
Lexis
Я так юзаю
LOGGING_CONFIG = {
   'version': 1,
   'disable_existing_loggers': False,
   'formatters': {
       'airflow': {
           'format': LOG_FORMAT
       },
       'airflow_coloured': {
           'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
           FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
       },
   },
   'handlers': {
       'console': {
           'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
           'formatter': 'airflow_coloured',
           'stream': 'sys.stdout'
       },
       'console2': {
           'class': 'logging.StreamHandler',
           'formatter': 'airflow_coloured'
       },
       'task': {
           'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
           'formatter': 'airflow',
           'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
           'filename_template': FILENAME_TEMPLATE,
       },
       'processor': {
           'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
           'formatter': 'airflow',
           'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
           'filename_template': PROCESSOR_FILENAME_TEMPLATE,
       }
   },
   'loggers': {
       'airflow.processor': {
           'handlers': ['processor'],
           'level': LOG_LEVEL,
           'propagate': False,
       },
       'airflow.task': {
           'handlers': ['task', 'console2'],
           'level': LOG_LEVEL,
           'propagate': False,
       },
       'flask_appbuilder': {
           'handler': ['console'],
           'level': FAB_LOG_LEVEL,
           'propagate': True,
       }
   },
   'root': {
       'handlers': ['console'],
       'level': LOG_LEVEL,
   }
}  # type: Dict[str, Any]
спасибо
источник

ME

Max Efremov in Airflow
Подскажите, создаю такой даг:

def foo():
   two = DummyOperator(task_id="two")
   three = DummyOperator(task_id="three")
   return two >> three

one = DummyOperator(task_id="one")
four = DummyOperator(task_id="four")
one >> foo() >> four

В результате получаю такое:
источник

ME

Max Efremov in Airflow
источник

ME

Max Efremov in Airflow
Как сделать, чтобы они были по порядку?
источник

OI

Oleg Ilinsky in Airflow
Max Efremov
Подскажите, создаю такой даг:

def foo():
   two = DummyOperator(task_id="two")
   three = DummyOperator(task_id="three")
   return two >> three

one = DummyOperator(task_id="one")
four = DummyOperator(task_id="four")
one >> foo() >> four

В результате получаю такое:
я думаю, лучше как-то так:
def foo():
   two = DummyOperator(task_id="two")
   three = DummyOperator(task_id="three")
   two >> three
   return [two, three]

one = DummyOperator(task_id="one")
four = DummyOperator(task_id="four")
one >> foo() >> four
источник

ME

Max Efremov in Airflow
источник

ME

Max Efremov in Airflow
получилось ещё забавнее)
источник

ME

Max Efremov in Airflow
оно и как массив отработало и связало внутри
источник

OI

Oleg Ilinsky in Airflow
Max Efremov
оно и как массив отработало и связало внутри
а, точно, всё правильно
там оба элемента массива будут зависеть от one
источник

OI

Oleg Ilinsky in Airflow
фактический порядок будет как вы хотите, но лишняя зависимость, тут конечно ни к чему
источник

OI

Oleg Ilinsky in Airflow
источник

ME

Max Efremov in Airflow
да, такое только усложнит итоговый даг
источник

ME

Max Efremov in Airflow
наверное можно сабдагом, но это выглядит как усложнение уже)
источник

ME

Max Efremov in Airflow
я просто хочу часть операторов вынести в функцию и всё, для чистоты кода
источник

OI

Oleg Ilinsky in Airflow
def foo(previous, follows):
   two = DummyOperator(task_id="two")
   three = DummyOperator(task_id="three")
   previous >> two >> three >> follows
   return [two, three]
   

one = DummyOperator(task_id="one")
four = DummyOperator(task_id="four")
foo(one, four)


а если так?
источник

ИГ

Игорь Гомановский... in Airflow
Последнюю строку заменить на
foo = foo()
one >> foo[0]
foo[-1] >> four
источник

ИГ

Игорь Гомановский... in Airflow
Но выглядит оч странно
источник

ME

Max Efremov in Airflow
Oleg Ilinsky
def foo(previous, follows):
   two = DummyOperator(task_id="two")
   three = DummyOperator(task_id="three")
   previous >> two >> three >> follows
   return [two, three]
   

one = DummyOperator(task_id="one")
four = DummyOperator(task_id="four")
foo(one, four)


а если так?
да, это работает, но ясности коду не приносит)
источник

ME

Max Efremov in Airflow
Игорь Гомановский
Последнюю строку заменить на
foo = foo()
one >> foo[0]
foo[-1] >> four
это вроде лучше выглядит, хотя да, тоже не совсем понятно)
источник