ЕК
Broken DAG: [/opt/airflow/dags/test_migration_pg_clickhouse.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1241, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 1178, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
ЕК
from airflow import DAG
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta


def postgres_to_clickhouse():
    postgres_hook = PostgresHook(postgres_conn_id='la_postgres_slave')
    ch_hook = ClickHouseHook(clickhouse_conn_id='dwh')
    records = postgres_hook.get_records('''
        SELECT
            c.id category_id,
            c.name_ru category,
            c.group_id,
            x.name_ru as "group",
            case when c.created_at > x.created_at then c.created_at else x.created_at end as created_at,
            case when c.updated_at > x.updated_at then c.updated_at else x.updated_at end as updated_at
        FROM targetings.categories c
        inner join targetings.category_groups x
            on c.group_id = x.id
    ''')
    ch_hook.run('INSERT INTO ads.categories VALUES', records)


with DAG(
    dag_id='postgres_to_clickhouse',
    start_date=datetime(2020, 4, 20),
    schedule_interval=timedelta(minutes=5),
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
    tags=['zalupa'],
) as dag:
    dag >> PythonOperator(
        task_id='postgres_to_clickhouse',
        python_callable=postgres_to_clickhouse,
    )
ЕК
from airflow import DAG
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='update_income_aggregate',
    start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            '''
                INSERT INTO default.date_test
                SELECT DISTINCT date FROM ads.daily_webmasters dw
                WHERE date = '{{ ds }}'
            ''', '''
                SELECT max(date) FROM date_test
                WHERE date BETWEEN
                    '{{ execution_date.start_of('month').to_date_string() }}'
                    AND '{{ execution_date.end_of('month').to_date_string() }}'
            ''',
            # result of the last query is pushed to XCom
        ),
        clickhouse_conn_id='dwh',
    ) >> PythonOperator(
        task_id='print_month_income',
        provide_context=True,
        python_callable=lambda task_instance, **_:
            # pulling XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )
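Note that in this example `>>` chains the ClickHouseOperator to the PythonOperator, i.e. operator to operator; the `dag` object itself never appears on either side of `>>`.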
GB
`dag >> operator` is the problem: `>>` declares dependencies between tasks, so a DAG object can't stand on either side of it. The operator ends up calling `set_upstream` with the DAG as if it were a task, which is exactly the `'DAG' object has no attribute 'update_relative'` in the traceback.
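For illustration, a minimal sketch (not from the thread; the dag_id, task_id, and no-op callable are placeholders) that reproduces the same traceback in the Airflow version shown above:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='rshift_repro', start_date=datetime(2020, 4, 20))
task = PythonOperator(task_id='noop', python_callable=lambda: None, dag=dag)

# DAG has no __rshift__, so Python falls back to the operator's reflected
# implementation, which runs task.set_upstream(dag). _set_relatives then
# calls update_relative on the DAG, which does not exist:
dag >> task  # AttributeError: 'DAG' object has no attribute 'update_relative'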
GB
# Either attach tasks through the DAG context manager...
with dag:
    task_1 = PythonOperator(...)

# ...or pass the DAG to the operator explicitly.
dag = DAG(...)
task_1 = PythonOperator(dag=dag, ...)
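Applied to the DAG from the start of the thread, a sketch of the fix (connection ids and query unchanged from above; the SQL is elided here): instantiating the operator inside the `with DAG(...)` block already attaches it, so the `dag >>` line simply goes away.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook


def postgres_to_clickhouse():
    postgres_hook = PostgresHook(postgres_conn_id='la_postgres_slave')
    ch_hook = ClickHouseHook(clickhouse_conn_id='dwh')
    records = postgres_hook.get_records('''...''')  # same SELECT as above
    ch_hook.run('INSERT INTO ads.categories VALUES', records)


with DAG(
    dag_id='postgres_to_clickhouse',
    start_date=datetime(2020, 4, 20),
    schedule_interval=timedelta(minutes=5),
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
) as dag:
    # No `dag >>` needed: the context manager assigns the DAG to the task.
    PythonOperator(
        task_id='postgres_to_clickhouse',
        python_callable=postgres_to_clickhouse,
    )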