Size: a a a

2021 May 19

RF

Ruslan Fialkovsky in Airflow
или вместо sql_alchemy_pool использовать pgbouncer?
источник

VS

Vladislav 👻 Shishkov... in Airflow
Это две разные сущности
источник

VS

Vladislav 👻 Shishkov... in Airflow
Мне лень с телефона объяснять, поясню проще, вам надо сделать относительно не тасок, а всех воркеров с тредами + вебсерверу надо + шедулеру
источник

VS

Vladislav 👻 Shishkov... in Airflow
Копайте в эту сторону
источник

AC

Anton Chabanets in Airflow
Подскажите, а как конкретно AF проверяет heartbeat? (есть глупая мысль что по логу из дага)
источник

VS

Vladislav 👻 Shishkov... in Airflow
Мысль точно глупая
источник

VS

Vladislav 👻 Shishkov... in Airflow
Вроде даже в документации есть инфа, посмотрите точнее
источник

НС

Надежда Стицюк... in Airflow
Всем привет) подскажите пожалуйста как лучше сделать, у меня есть таска которая получает массив данных, дальше надо запустить по 1 таске на каждый элемент массива, как это лучше сделать?
источник
2021 May 20

P

Pavel in Airflow
В цикле делаете новую таску
источник

НС

Надежда Стицюк... in Airflow
В переменную цикла можно передать данные результат 1 таски?
источник

DP

Dmitriy Pyrin in Airflow
если используете subdug, то можно в динамике через xcom
источник

P

Pavel in Airflow
Да
источник

НС

Надежда Стицюк... in Airflow
Спасибо! Попробую)
источник

AM

Andrey M in Airflow
Добрый день, скажите, в ExternalTaskSensor timedelta может быть отрицательная?
источник

RF

Ruslan Fialkovsky in Airflow
а как - "вам надо сделать относительно не тасок, а всех воркеров с тредами" сделать?
источник

RF

Ruslan Fialkovsky in Airflow
у меня упорно висит 370 idle сессий
источник

RM

Ramil Miftyaev in Airflow
всем привет, может кто подскажет. задача дага собрать данные с нескольких серверов оракла и залить в терадату
первым шагом реализовал забор данных с одного сервера и заливку в терадату, отработало корректно. но как только добавляю блок забора мета данных с терадаты , выдает ошибку Class oracle.jdbc.OracleDriver not found
источник

RM

Ramil Miftyaev in Airflow
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.jdbc_operator import JdbcOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.jdbc_hook import JdbcHook
from teradataml.dataframe import fastload
from urllib.parse import urlparse
import pandas
import teradataml as tdml
import time
import re
import logging
from airflow.utils.trigger_rule import TriggerRule
JdbcConn = JdbcHook(jdbc_conn_id='Tera_light')
JdbcConn.get_connection('Tera_light')
connection = JdbcConn.get_conn()
allQuery = f"SELECT Id, SAP_CODE_DC, schema_id, host, user_id, password_id FROM dev_abc_adm.CM_ORACLE_RC WHERE Active_Flg = 'A'"
allCursor = connection.cursor()
allCursor.execute(allQuery)
allDf = pandas.DataFrame(allCursor.fetchall(),
                        columns=["Id", "SAP_CODE_DC", "schema_id", "host", "user_id", "password_id"])
allCursor.close()
default_args = {
   'owner': 'airflow',
   'depends_on_past': False,
   'start_date': datetime(2021, 5, 10)
}
receipts_sql = """
SELECT SHIFT_DATE, SAP_CODE_DC, SAP_CODE, WMS_CODE, DATE '2021-05-01'
FROM wms.x5_REP_BP_INFO
WHERE SHIFT_DATE = '20.05.2021'
"""
def _get_teradata_conn():
   conn =  BaseHook.get_connection("Tera_light")
   return conn
def _get_conn_host(conn):
   jdbc_str = conn.host
   jdbc_pattern = 'jdbc:teradata://(.*?)/'
   return re.compile(jdbc_pattern).findall(jdbc_str)[0]
def _data_from_postgres():
   #pass
   pg_hook = JdbcHook(jdbc_conn_id='Oracle_jdbc')
   df: DataFrame = pg_hook.get_pandas_df(sql = receipts_sql)
   teradata_conn = _get_teradata_conn()
   tdml.create_context(username = teradata_conn.login, password=teradata_conn.password, host=_get_conn_host(teradata_conn))
   try:
       fastload.fastload(df = df, table_name = 'ORACLE_REP_BP_INFO1',schema_name = 'DEV_ABC_STG')
   except Exception as error:
       raise Exception
with DAG(dag_id='wf_ora_tera_test', schedule_interval=None, default_args=default_args, max_active_runs=1 ) as dag:
   startDummyTask = DummyOperator(task_id='start_task', retries=3, dag=dag)
   finishDummyTask = DummyOperator(task_id = 'finish_task', retries = 3, dag = dag)
   get_data_from_clickhouse = PythonOperator(
       task_id='get_data_from_tera',
       python_callable=_data_from_postgres,
       dag=dag)
   startDummyTask >> get_data_from_clickhouse >> finishDummyTask
источник

VS

Vladislav 👻 Shishkov... in Airflow
Посчитать
источник

RF

Ruslan Fialkovsky in Airflow
я не очень понимаю как это поможет уменьшить пул соединений? можете пожалуйста подробнее объяснить?
источник