from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.jdbc_operator import JdbcOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.jdbc_hook import JdbcHook
from teradataml.dataframe import fastload
from urllib.parse import urlparse
import pandas
import teradataml as tdml
import time
import re
import logging
from airflow.utils.trigger_rule import TriggerRule
# Load the rows of dev_abc_adm.CM_ORACLE_RC with Active_Flg = 'A' from the
# 'Tera_light' JDBC connection into the module-level DataFrame `allDf`.
# NOTE(review): this runs a database query every time the scheduler parses this
# file, and `allDf` appears unused in this file; consider moving it into a task
# or removing it. `connection` is also never closed here.
JdbcConn = JdbcHook(jdbc_conn_id='Tera_light')
# get_conn() resolves the 'Tera_light' connection itself, so no separate
# get_connection() lookup is needed.
connection = JdbcConn.get_conn()
allQuery = (
    "SELECT Id, SAP_CODE_DC, schema_id, host, user_id, password_id "
    "FROM dev_abc_adm.CM_ORACLE_RC WHERE Active_Flg = 'A'"
)
allCursor = connection.cursor()
try:
    allCursor.execute(allQuery)
    allDf = pandas.DataFrame(
        allCursor.fetchall(),
        columns=["Id", "SAP_CODE_DC", "schema_id", "host", "user_id", "password_id"],
    )
finally:
    # Release the cursor even when the query or DataFrame construction fails.
    allCursor.close()
# Default arguments applied to every task of the DAG defined in this file.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2021, 5, 10),
)
# Source query executed against the 'Oracle_jdbc' connection by
# _data_from_postgres; its result is fastloaded into Teradata.
# NOTE(review): SHIFT_DATE is compared with the string '20.05.2021'. If
# SHIFT_DATE is a DATE column, the implicit conversion depends on the session's
# NLS_DATE_FORMAT; confirm, or use TO_DATE('20.05.2021', 'DD.MM.YYYY').
# Both dates here are hard-coded.
receipts_sql = (
    "\n"
    "SELECT SHIFT_DATE, SAP_CODE_DC, SAP_CODE, WMS_CODE, DATE '2021-05-01'\n"
    "FROM wms.x5_REP_BP_INFO\n"
    "WHERE SHIFT_DATE = '20.05.2021'\n"
)
def _get_teradata_conn():
    """Look up and return the Airflow connection registered as ``Tera_light``."""
    return BaseHook.get_connection("Tera_light")
def _get_conn_host(conn):
jdbc_str =
conn.host jdbc_pattern = 'jdbc:teradata://(.*?)/'
return re.compile(jdbc_pattern).findall(jdbc_str)[0]
def _data_from_postgres():
    """Copy the rows selected by ``receipts_sql`` into Teradata.

    Despite the function name, the source is the JDBC connection
    ``Oracle_jdbc``. The result set is read into a pandas DataFrame and
    bulk-loaded with teradataml ``fastload`` into
    ``DEV_ABC_STG.ORACLE_REP_BP_INFO1`` using the ``Tera_light`` credentials.

    Raises:
        Any exception from the source query, from creating the teradataml
        context, or from ``fastload`` propagates unchanged, so the Airflow
        task fails with the real cause.
    """
    oracle_hook = JdbcHook(jdbc_conn_id='Oracle_jdbc')
    df: pandas.DataFrame = oracle_hook.get_pandas_df(sql=receipts_sql)
    logging.info("Fetched %d rows from Oracle_jdbc", len(df))

    teradata_conn = _get_teradata_conn()
    tdml.create_context(
        username=teradata_conn.login,
        password=teradata_conn.password,
        host=_get_conn_host(teradata_conn),
    )
    try:
        # NOTE(review): depending on the teradataml version, fastload may report
        # row-level errors in its return value instead of raising; confirm.
        fastload.fastload(
            df=df,
            table_name='ORACLE_REP_BP_INFO1',
            schema_name='DEV_ABC_STG',
        )
    finally:
        # Always close the teradataml session, including when the load fails.
        tdml.remove_context()
# Manually/externally triggered DAG (no schedule) with at most one active run:
# start_task -> get_data_from_tera -> finish_task.
with DAG(
    dag_id='wf_ora_tera_test',
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
) as dag:
    # Operators created inside the `with DAG(...)` block attach to `dag`.
    startDummyTask = DummyOperator(task_id='start_task', retries=3)
    finishDummyTask = DummyOperator(task_id='finish_task', retries=3)
    # Variable name kept for compatibility; the task itself loads into Teradata.
    get_data_from_clickhouse = PythonOperator(
        task_id='get_data_from_tera',
        python_callable=_data_from_postgres,
    )

    startDummyTask.set_downstream(get_data_from_clickhouse)
    get_data_from_clickhouse.set_downstream(finishDummyTask)