Operators
Reusable operators to build Airflow DAGs.
CreateExternalTableArrow
CreateExternalTableArrow(table_name: str, sql_conn_id: str, database: str, schema_name: str, s3_location: str, type_map: dict[str, str], partitioned: bool = False, force_drop: bool = False, external_schema_name: str = 'src', **kwargs: Any)
Bases: BaseOperator
Generate SQL DDL to create an external table in the target database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_name` | str | The name of the table to create. | required |
`sql_conn_id` | str | The source connection id. | required |
`s3_location` | str | The S3 location of the data. | required |
`force_drop` | bool | If True, drop the table first. Defaults to False. | False |
`external_schema_name` | str | The schema name. Defaults to "src". | 'src' |
`partitioned` | bool | Whether the data is partitioned. Defaults to False. | False |
Raises:
Type | Description |
---|---|
AirflowException | If the table does not exist in the source database. |
Returns:
Type | Description |
---|---|
None | None |
Example
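A minimal usage sketch is shown below. The import path, DAG id, connection id, database, schema, bucket, and type map values are illustrative placeholders, not values taken from this project.

```python
from datetime import datetime

from airflow import DAG

# Placeholder import path; adjust to wherever CreateExternalTableArrow lives in your project.
from dags.operators import CreateExternalTableArrow

with DAG(
    dag_id="create_external_table_example",  # placeholder DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    create_external_table = CreateExternalTableArrow(
        task_id="create_external_table",
        table_name="ats",                        # name of the table to create
        sql_conn_id="postgres_source",           # source connection id (placeholder)
        database="ferries",                      # placeholder
        schema_name="public",                    # placeholder
        s3_location="s3://my-bucket/ats/",       # S3 location of the data (placeholder)
        type_map={"predicted_sales": "FLOAT8"},  # placeholder type mapping
        partitioned=False,
        force_drop=False,                        # drop the table first if True
        external_schema_name="src",
    )
```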
SQLReflectOperator
SQLReflectOperator(*, table_name: str, database: str | None = None, schema: str | None = None, **kwargs: Any)
Bases: SQLExecuteQueryOperator
Operator to perform SQLAlchemy-style database reflection.
The reflected target_table is returned as a SELECT statement.
Example
The example below illustrates a table in a PostgreSQL database and the returned SELECT query.
CREATE TABLE IF NOT EXISTS ats
(
departure_id varchar(40) COLLATE pg_catalog."default" NOT NULL,
route_leg_code varchar(40) COLLATE pg_catalog."default" NOT NULL,
planned_departure_date_time timestamp without time zone NOT NULL,
ferry_name varchar(40) COLLATE pg_catalog."default" NOT NULL,
cnv_outlet varchar(40) COLLATE pg_catalog."default" NOT NULL,
store_name varchar(40) COLLATE pg_catalog."default" NOT NULL,
store_item varchar(200) COLLATE pg_catalog."default" NOT NULL,
predicted_sales double precision NOT NULL,
good boolean DEFAULT false,
CONSTRAINT ats_pkey PRIMARY KEY (departure_id, route_leg_code, ferry_name, cnv_outlet, store_name, store_item)
);
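Below is a hedged instantiation sketch for reflecting the table above. The import path, connection id, and database/schema values are placeholders; extra keyword arguments such as conn_id are forwarded to SQLExecuteQueryOperator.

```python
# Placeholder import path; adjust to wherever SQLReflectOperator lives in your project.
from dags.operators import SQLReflectOperator

reflect_ats = SQLReflectOperator(
    task_id="reflect_ats",
    table_name="ats",           # target table to reflect
    database="ferries",         # optional (placeholder)
    schema="public",            # optional (placeholder)
    conn_id="postgres_source",  # forwarded to SQLExecuteQueryOperator (placeholder id)
)
```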
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`table_name` | str | Target table name. | required |
`**kwargs` | Any | Additional arguments to pass to SQLExecuteQueryOperator. | {} |
SqlToS3Operator
SqlToS3Operator(chunksize: int = 10 ** 6, fix_dtypes: bool = True, where_clause: str | None = None, join_clause: str | None = None, type_mapping: PyarrowMapping = Pyarrow2redshift, database: str | None = None, **kwargs: Any)
Bases: SqlToS3Operator
Move data from a SQL source to S3.
Uses batching. Empty files are not written, which prevents breaking external tables in Redshift.
Partitioning
If the data is partitioned, `run_date=` partitions will be used.
Until we have observability, we will not optimize/move away from the built-in provider (and pandas).
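For illustration, a hedged usage sketch follows. The import path, query, connection ids, bucket, and key are placeholders, and base-class arguments (query, s3_bucket, s3_key, sql_conn_id, file_format) are assumed to pass through **kwargs to the provider's SqlToS3Operator.

```python
# Placeholder import path; adjust to wherever this SqlToS3Operator subclass lives in your project.
from dags.operators import SqlToS3Operator

ats_to_s3 = SqlToS3Operator(
    task_id="ats_to_s3",
    # Base-class arguments, assumed to pass through **kwargs to the Amazon provider operator:
    query="SELECT * FROM ats",      # placeholder source query
    sql_conn_id="postgres_source",  # placeholder connection id
    s3_bucket="my-bucket",          # placeholder bucket
    s3_key="ats/ats.parquet",       # placeholder key
    file_format="parquet",
    # Arguments added by this subclass:
    chunksize=10**6,
    fix_dtypes=True,
    where_clause="run_date = '{{ ds }}'",  # illustrative filter (placeholder)
    database="ferries",                    # placeholder
)
```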
Example
Output query: