Operators

Reusable operators to build Airflow DAGs.

CreateExternalTableArrow

CreateExternalTableArrow(table_name: str, sql_conn_id: str, database: str, schema_name: str, s3_location: str, type_map: dict[str, str], partitioned: bool = False, force_drop: bool = False, external_schema_name: str = 'src', **kwargs: Any)

Bases: BaseOperator

Generate SQL DDL to create an external table in the target database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table_name | str | The name of the table to create. | required |
| sql_conn_id | str | The source connection id. | required |
| s3_location | str | The S3 location of the data. | required |
| force_drop | bool | If True, drop the table first. | False |
| external_schema_name | str | The schema name. | 'src' |
| partitioned | bool | Partition by run_date. | False |

Raises:

| Type | Description |
| --- | --- |
| AirflowException | If the table does not exist in the source database. |

Returns:

| Type | Description |
| --- | --- |
| None | None |

Example

create_external_table = CreateExternalTableArrow(
    task_id="create_external_table",
    table_name="my_table",
    sql_conn_id="my_conn",
    database="my_database",  # placeholder value
    schema_name="my_schema",  # placeholder value
    type_map={"id": "BIGINT"},  # placeholder column-to-type map
    s3_location="s3://my-bucket/my-folder/",
    force_drop=True,
)
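To make the role of type_map and external_schema_name concrete, here is a minimal sketch, not the operator's actual implementation, of how a column type map could be rendered into external-table DDL for the s3_location above (the STORED AS PARQUET clause and all names are assumptions):

type_map = {"id": "BIGINT", "name": "VARCHAR(40)"}  # hypothetical columns
columns = ",\n    ".join(f"{col} {sql_type}" for col, sql_type in type_map.items())
ddl = (
    "CREATE EXTERNAL TABLE src.my_table (\n"  # external_schema_name = "src"
    f"    {columns}\n"
    ")\n"
    "STORED AS PARQUET\n"  # assumed file format
    "LOCATION 's3://my-bucket/my-folder/';"
)
print(ddl)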

SQLReflectOperator

SQLReflectOperator(*, table_name: str, database: str | None = None, schema: str | None = None, **kwargs: Any)

Bases: SQLExecuteQueryOperator

Operator to perform SQLAlchemy-like database reflection.

The reflected table is returned as a SELECT statement covering all of its columns.

Example

The example below shows a PostgreSQL table and the SELECT query returned for it.

Given this source table:

CREATE TABLE IF NOT EXISTS ats
(
    departure_id varchar(40) COLLATE pg_catalog."default" NOT NULL,
    route_leg_code varchar(40) COLLATE pg_catalog."default" NOT NULL,
    planned_departure_date_time timestamp without time zone NOT NULL,
    ferry_name varchar(40) COLLATE pg_catalog."default" NOT NULL,
    cnv_outlet varchar(40) COLLATE pg_catalog."default" NOT NULL,
    store_name varchar(40) COLLATE pg_catalog."default" NOT NULL,
    store_item varchar(200) COLLATE pg_catalog."default" NOT NULL,
    predicted_sales double precision NOT NULL,
    good boolean DEFAULT false,
    CONSTRAINT ats_pkey PRIMARY KEY (departure_id, route_leg_code, ferry_name, cnv_outlet, store_name, store_item)
);

reflecting it with

reflect_table = SQLReflectOperator(
    table_name="ats",
    task_id="reflect_database",
    conn_id=CONN_ID,
)

returns the query

SELECT
    ats.departure_id,
    ats.route_leg_code,
    ats.planned_departure_date_time,
    ats.ferry_name,
    ats.cnv_outlet,
    ats.store_name,
    ats.store_item,
    ats.predicted_sales,
    ats.good
FROM ats
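For intuition, the reflection step can be sketched directly with SQLAlchemy; this is illustrative only (the connection string is a placeholder) and not necessarily how the operator is implemented:

import sqlalchemy as sa

engine = sa.create_engine("postgresql://user:pass@host/db")  # placeholder DSN
metadata = sa.MetaData()
ats = sa.Table("ats", metadata, autoload_with=engine)  # reflect columns from the live database
print(sa.select(ats).compile(engine))  # renders the SELECT shown above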

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table_name | str | Target table name. | required |
| kwargs | Any | Additional arguments to pass to SQLExecuteQueryOperator. | {} |

SqlToS3Operator

SqlToS3Operator(chunksize: int = 10 ** 6, fix_dtypes: bool = True, where_clause: str | None = None, join_clause: str | None = None, type_mapping: PyarrowMapping = Pyarrow2redshift, database: str | None = None, **kwargs: Any)

Bases: SqlToS3Operator

Move data from a SQL source to S3.

Data is written in batches, and empty files are never written; this prevents breaking external tables in Redshift.

Partitioning: if the data is partitioned, run_date= partitions are used.

Until we have observability, we will not optimize or move away from the built-in provider (and pandas).

Example

Output query:

SELECT [EngineSourceKey]
      ,[EngineSourceCode]
      ,[EngineSourceName]
FROM [dbo].[t_D_EngineSource];
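The batching and empty-file behaviour described above can be sketched as follows; this is illustrative only (awswrangler, the connection string, and the S3 path are assumptions, not the operator's internals):

import awswrangler as wr
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("mssql+pyodbc://...")  # placeholder connection
chunks = pd.read_sql(
    "SELECT EngineSourceKey, EngineSourceCode, EngineSourceName FROM dbo.t_D_EngineSource",
    engine,
    chunksize=10**6,  # mirrors the chunksize default above
)
for i, chunk in enumerate(chunks):
    if chunk.empty:
        continue  # skip zero-row files; they would break Redshift external tables
    wr.s3.to_parquet(chunk, path=f"s3://my-bucket/engine_source/part-{i}.parquet")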

file_options property

file_options: FileOptions

Get the file options for the file format.

partitioned property

partitioned: bool

Check if the file is partitioned.

Returns:

| Type | Description |
| --- | --- |
| bool | True if the file is partitioned. |
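As an illustration of the run_date= layout that partitioned output uses (bucket, path, and data are hypothetical, written here with awswrangler rather than the operator itself):

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"value": [1, 2], "run_date": ["2024-01-01", "2024-01-02"]})
wr.s3.to_parquet(
    df,
    path="s3://my-bucket/my_table/",
    dataset=True,                 # enables hive-style partition folders
    partition_cols=["run_date"],  # yields .../run_date=2024-01-01/ prefixes
)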

get_random_string staticmethod

get_random_string(char_count: int = 10) -> str

Return a random string of the given length.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| char_count | int | Number of characters in the returned string. | 10 |
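A minimal sketch of such a helper, assuming a lowercase ASCII alphabet (the real implementation may choose characters differently):

import random
import string

def get_random_string(char_count: int = 10) -> str:
    # Assumption: draw from lowercase ASCII letters.
    return "".join(random.choices(string.ascii_lowercase, k=char_count))

print(get_random_string())  # e.g. "qwzkaplrto"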