🧪 Schema and data contracts
dlt will evolve the schema at the destination by following the structure and data types of the extracted data. There are several modes that you can use to control this automatic schema evolution, from the default mode, where all changes to the schema are accepted, to a frozen schema that does not change at all.
Consider this example:
@dlt.resource(schema_contract={"tables": "evolve", "columns": "freeze"})
def items():
    ...
This resource will allow new tables (both nested tables and tables with dynamic names) to be created, but will throw an exception if data is extracted for an existing table which contains a new column.
Setting up the contract
You can control the following schema entities:
- tables - the contract is applied when a new table is created
- columns - the contract is applied when a new column is created on an existing table
- data_type - the contract is applied when data cannot be coerced into the data type associated with an existing column.
You can use contract modes to tell dlt how to apply the contract for a particular entity:
- evolve: No constraints on schema changes.
- freeze: This will raise an exception if data is encountered that does not fit the existing schema, so no data will be loaded to the destination.
- discard_row: This will discard any extracted row if it does not adhere to the existing schema, and this row will not be loaded to the destination.
- discard_value: This will discard data in an extracted row that does not adhere to the existing schema, and the row will be loaded without this data.
The default mode (evolve) works as follows:
- New tables may always be created.
- New columns may always be appended to an existing table.
- Data that cannot be coerced to the existing data type of a particular column will be sent to a variant column created for this particular type.
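To make the four modes concrete, here is a minimal plain-Python sketch of how each mode would treat a row that carries a column missing from the existing table. It is a conceptual model only, not dlt's implementation: the function apply_columns_mode, the ValueError it raises, and the sample data are all hypothetical.

```python
from typing import Any, Dict, Optional, Set


def apply_columns_mode(
    row: Dict[str, Any], known: Set[str], mode: str
) -> Optional[Dict[str, Any]]:
    """Return the row that would be loaded, or None if the row is dropped.

    Hypothetical helper: models the "columns" entity only.
    """
    extra = set(row) - known
    if not extra or mode == "evolve":
        # nothing new, or new columns are accepted as-is
        return dict(row)
    if mode == "freeze":
        # the real library raises its own exception type; ValueError is a stand-in
        raise ValueError(f"new columns not allowed: {sorted(extra)}")
    if mode == "discard_row":
        return None
    if mode == "discard_value":
        # keep the row, drop only the values that do not fit
        return {key: value for key, value in row.items() if key in known}
    raise ValueError(f"unknown contract mode: {mode}")


known_columns = {"id", "name"}
row = {"id": 1, "name": "a", "email": "a@example.com"}

evolved = apply_columns_mode(row, known_columns, "evolve")
trimmed = apply_columns_mode(row, known_columns, "discard_value")
dropped = apply_columns_mode(row, known_columns, "discard_row")
```

Here evolved keeps the email value, trimmed loses it, and dropped is None because the whole row is rejected.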
Passing schema_contract argument
The schema_contract argument exists on the dlt.source decorator as a default for all resources in that source, and on the dlt.resource decorator as a directive for the individual resource and, as a consequence, for all tables created by this resource. Additionally, it exists on the pipeline.run() method, which will override all existing settings.
The schema_contract argument accepts two forms:
- full: a mapping of schema entities to contract modes
- shorthand: a contract mode (string) that will be applied to all schema entities.
For example, setting schema_contract to freeze will expand to the full form:
{"tables": "freeze", "columns": "freeze", "data_type": "freeze"}
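The expansion from shorthand to full form can be sketched in a few lines. This is an illustration rather than dlt's own code: expand_schema_contract is a hypothetical helper, and filling omitted entities with evolve is an assumption based on evolve being the default mode (it ignores any inheritance from a source).

```python
from typing import Dict, Union

SCHEMA_ENTITIES = ("tables", "columns", "data_type")
CONTRACT_MODES = ("evolve", "freeze", "discard_row", "discard_value")


def expand_schema_contract(contract: Union[str, Dict[str, str]]) -> Dict[str, str]:
    """Expand a shorthand or partial contract into the full three-key form."""
    if isinstance(contract, str):
        # shorthand: one mode applied to every schema entity
        contract = {entity: contract for entity in SCHEMA_ENTITIES}
    # assumption: entities that are not mentioned fall back to "evolve"
    expanded = {entity: "evolve" for entity in SCHEMA_ENTITIES}
    for entity, mode in contract.items():
        if entity not in SCHEMA_ENTITIES:
            raise ValueError(f"unknown schema entity: {entity}")
        if mode not in CONTRACT_MODES:
            raise ValueError(f"unknown contract mode: {mode}")
        expanded[entity] = mode
    return expanded


full_freeze = expand_schema_contract("freeze")
partial = expand_schema_contract({"tables": "evolve", "columns": "freeze"})
```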
You can change the contract on a source instance via its schema_contract property. For a resource, you can use apply_hints.
Nuances of contract modes
- Contracts are applied after the names of tables and columns are normalized.
- A contract defined on a resource is applied to all root tables and nested tables created by that resource.
- discard_row works on the table level. For example, if you have two tables in a nested relationship, i.e. users and users__addresses, and the contract is violated in the users__addresses table, the row of that table is discarded while the parent row in the users table will be loaded.
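The table-level behaviour of discard_row can be pictured with a small plain-Python model. Everything here is hypothetical (the helper discard_rows_per_table, the user_id link column, the sample rows); it only illustrates that each table is filtered on its own, so a rejected child row does not take its parent with it.

```python
from typing import Any, Dict, List, Set

Row = Dict[str, Any]


def discard_rows_per_table(
    tables: Dict[str, List[Row]], known_columns: Dict[str, Set[str]]
) -> Dict[str, List[Row]]:
    """Drop rows with unknown columns, evaluating every table independently."""
    kept: Dict[str, List[Row]] = {}
    for table_name, rows in tables.items():
        allowed = known_columns[table_name]
        kept[table_name] = [row for row in rows if set(row) <= allowed]
    return kept


# rows as they might look after nested data was split into two tables
normalized = {
    "users": [{"id": 1, "name": "alice"}],
    "users__addresses": [
        {"user_id": 1, "city": "Berlin"},
        {"user_id": 1, "city": "Paris", "zip_code": "75001"},  # new column
    ],
}
known = {
    "users": {"id", "name"},
    "users__addresses": {"user_id", "city"},
}

loaded = discard_rows_per_table(normalized, known)
```

The Paris address is discarded because of the unknown zip_code column, while the parent users row and the Berlin address are still loaded.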
Use Pydantic models for data validation
Pydantic models can be used to define table schemas and validate incoming data. You can use any model you already have. dlt will internally synthesize (if necessary) new models that conform with the schema contract on the resource.
Just passing a model in the columns argument of dlt.resource sets a schema contract that conforms to the default Pydantic behavior:
{
    "tables": "evolve",
    "columns": "discard_value",
    "data_type": "freeze"
}
New tables are allowed, extra fields are ignored and invalid data raises an exception.
If you pass a schema contract explicitly, the following happens to schema entities:
- tables do not impact the Pydantic models.
- columns modes are mapped into the extra modes of Pydantic (see below). dlt will apply this setting recursively if models contain other models.
- data_type supports the following modes for Pydantic: evolve will synthesize a lenient model that allows for any data type. This may result in variant columns upstream. freeze will re-raise ValidationException. discard_row will remove the non-validating data items. discard_value is not currently supported. We may eventually do that on Pydantic v2.
dlt maps column contract modes into the extra fields settings as follows.
Note that this works in two directions. If you use a model with such a setting explicitly configured, dlt sets the column contract mode accordingly. This also avoids synthesizing modified models.
column mode | pydantic extra |
---|---|
evolve | allow |
freeze | forbid |
discard_value | ignore |
discard_row | forbid |
discard_row requires additional handling when ValidationError is raised.
Model validation is added as a transform step to the resource. This step will convert the incoming data items into instances of validating models. You could easily convert them back to dictionaries by using add_map(lambda item: item.dict()) on a resource.
Pydantic models work on the extracted data before names are normalized or nested tables are created. Make sure to name model fields as in your input data and handle nested data with nested models.
As a consequence, discard_row will drop the whole data item, even if only a nested model was affected.
Set contracts on Arrow Tables and Pandas
All contract settings apply to Arrow tables and pandas frames as well.
- tables: the mode works the same, no matter what the data item type is.
- columns: will allow new columns, raise an exception, or modify tables/frames while still in the extract step to avoid re-writing parquet files.
- data_type: changes to data types in tables/frames are not allowed and will result in a data type schema clash. We could allow for more modes (evolving data types in Arrow tables sounds weird but ping us on Slack if you need it.)
Here's how dlt deals with column modes:
- evolve: new columns are allowed (the table may be reordered to put them at the end)
- discard_value: the column will be deleted
- discard_row: rows with the column present will be deleted and then the column will be deleted
- freeze: exception on a new column
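The column modes above can be sketched on a column-oriented table. The sketch uses a plain dict of lists as a stand-in for an Arrow table or a pandas frame, so none of it is dlt or pyarrow code. The helper apply_mode_to_new_column is hypothetical, and reading "rows with the column present" as "rows where the new column is not null" is an assumption.

```python
from typing import Any, Dict, List

Table = Dict[str, List[Any]]  # column name -> column values


def apply_mode_to_new_column(table: Table, new_column: str, mode: str) -> Table:
    """Apply a columns contract mode to one column the schema does not know."""
    if mode == "freeze":
        raise ValueError(f"new column not allowed: {new_column}")
    rest = {name: values for name, values in table.items() if name != new_column}
    if mode == "evolve":
        # keep the data, but move the new column to the end
        rest[new_column] = table[new_column]
        return rest
    if mode == "discard_value":
        return rest
    if mode == "discard_row":
        # assumption: a row "has" the column when its value is not null
        keep = [value is None for value in table[new_column]]
        return {
            name: [value for value, kept in zip(values, keep) if kept]
            for name, values in rest.items()
        }
    raise ValueError(f"unknown contract mode: {mode}")


frame = {"id": [1, 2, 3], "note": [None, "x", None], "name": ["a", "b", "c"]}

evolved = apply_mode_to_new_column(frame, "note", "evolve")
without_column = apply_mode_to_new_column(frame, "note", "discard_value")
without_rows = apply_mode_to_new_column(frame, "note", "discard_row")
```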
Get context from DataValidationError in freeze mode
When a contract is violated in freeze mode, dlt raises a DataValidationError exception. This exception gives access to the full context and passes the evidence to the caller.
As with any other exception coming from a pipeline run, it will be re-raised via a PipelineStepFailed exception, which you should catch in an except block:
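from dlt.common.schema.exceptions import DataValidationError
from dlt.pipeline.exceptions import PipelineStepFailed

try:
    pipeline.run()
except PipelineStepFailed as pip_ex:
    # in the normalize step the error is nested one level deeper
    if pip_ex.step == "normalize":
        if isinstance(pip_ex.__context__.__context__, DataValidationError):
            ...
    if pip_ex.step == "extract":
        if isinstance(pip_ex.__context__, DataValidationError):
            ...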
DataValidationError provides the following context:
- schema_name, table_name and column_name provide the logical "location" at which the contract was violated.
- schema_entity and contract_mode tell which contract was violated.
- table_schema contains the schema against which the contract was validated. It may be a Pydantic model or a dlt TTableSchema instance.
- schema_contract is the full, expanded schema contract.
- data_item is the causing data item (Python dict, arrow table, Pydantic model, or a list thereof).
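Because the nesting depth of the original error differs between steps, it can be convenient to walk the exception chain instead of hard-coding the number of __context__ hops. The sketch below does that with standard Python only. The helper find_in_chain and the classes StepFailed and ContractViolation are hypothetical stand-ins used to keep the example self-contained; with dlt you would pass the caught PipelineStepFailed and look for DataValidationError.

```python
from typing import Optional, Type, TypeVar

E = TypeVar("E", bound=BaseException)


def find_in_chain(exc: BaseException, exc_type: Type[E]) -> Optional[E]:
    """Return the first exception of exc_type found in the cause/context chain."""
    seen = set()
    current: Optional[BaseException] = exc
    while current is not None and id(current) not in seen:
        if isinstance(current, exc_type):
            return current
        seen.add(id(current))  # guard against cyclic chains
        if current.__cause__ is not None:
            current = current.__cause__
        else:
            current = current.__context__
    return None


class StepFailed(Exception):
    """Stand-in for the outer exception raised by a pipeline step."""


class ContractViolation(Exception):
    """Stand-in for the validation error carrying the context."""

    def __init__(self, table_name: str, column_name: str) -> None:
        super().__init__(f"{table_name}.{column_name}")
        self.table_name = table_name
        self.column_name = column_name


def failing_step() -> None:
    try:
        try:
            raise ContractViolation("users", "email")
        except ContractViolation as inner:
            raise RuntimeError("worker failed") from inner
    except RuntimeError as middle:
        raise StepFailed("normalize") from middle


try:
    failing_step()
except StepFailed as step_ex:
    violation = find_in_chain(step_ex, ContractViolation)
    missing = find_in_chain(step_ex, KeyError)
```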
Contracts on new tables
If a table is a new table that has not been created on the destination yet, dlt will allow the creation of new columns. For a single pipeline run, the column mode is changed (internally) to evolve and then reverted back to the original mode. This allows initial schema inference to happen, and then on subsequent runs the inferred contract will be applied to new data.
The following tables are considered new:
- Child tables inferred from the nested data
- Dynamic tables created from the data during extraction
- Tables containing incomplete columns, i.e. columns without a data type bound to them.
For example, the following table is considered new because the column number is incomplete (it defines a primary key and NOT NULL, but no data type):
blocks:
  description: Ethereum blocks
  write_disposition: append
  columns:
    number:
      nullable: false
      primary_key: true
      name: number
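The "incomplete column" rule can be checked mechanically. The sketch below mirrors the blocks table above as a Python dict and lists the columns that have no data type. The helper incomplete_columns is hypothetical, and the key name data_type and the value bigint are assumptions about how a complete column would be declared.

```python
from typing import Any, Dict, List


def incomplete_columns(table: Dict[str, Any]) -> List[str]:
    """Names of columns that carry hints but no data type."""
    columns = table.get("columns", {})
    return [name for name, column in columns.items() if "data_type" not in column]


blocks = {
    "description": "Ethereum blocks",
    "write_disposition": "append",
    "columns": {
        "number": {"nullable": False, "primary_key": True, "name": "number"},
    },
}

# the same table once a data type is bound to the column
typed_blocks = {
    **blocks,
    "columns": {"number": {**blocks["columns"]["number"], "data_type": "bigint"}},
}

missing = incomplete_columns(blocks)
complete = incomplete_columns(typed_blocks)
```

With these inputs, missing lists the number column and complete is empty, so only the first variant would be treated as a new table under the rule described above.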
Tables that are not considered new:
- Those with columns defined by Pydantic models
Working with datasets that have manually added tables and columns on the first load
In some cases you might be working with datasets that have tables or columns created outside of dlt. If you are loading to a table not created by dlt for the first time, dlt will not know about this table while enforcing schema contracts. This means that if you do a load where tables is set to evolve, all will work as planned. If you have tables set to freeze, dlt will raise an exception because it thinks you are creating a new table (which you are, from dlt's perspective). You can allow evolve for one load and then switch back to freeze.
The same thing will happen if dlt knows your table, but you have manually added a column to your destination and you have columns set to freeze.
Code Examples
The below code will silently ignore new subtables, allow new columns to be added to existing tables and raise an error if a variant of a column is discovered.
@dlt.resource(schema_contract={"tables": "discard_row", "columns": "evolve", "data_type": "freeze"})
def items():
    ...
The code below will raise on any encountered schema change. Note: you can always set a string, which will be interpreted as though all keys are set to that value.
pipeline.run(my_source(), schema_contract="freeze")
The code below defines some settings on the source, which can be overwritten on the resource, which in turn can be overwritten by the global override on the run method.
Here, variant columns are frozen for all resources and raise an error if encountered. On items, new columns are allowed, but other_items inherits the freeze setting from the source, so new columns are frozen there. New tables are allowed.
@dlt.resource(schema_contract={"columns": "evolve"})
def items():
    ...

@dlt.resource()
def other_items():
    ...

@dlt.source(schema_contract={"columns": "freeze", "data_type": "freeze"})
def source():
    return [items(), other_items()]

# this will use the settings defined by the decorators
pipeline.run(source())

# this will freeze the whole schema, regardless of the decorator settings
pipeline.run(source(), schema_contract="freeze")
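The precedence in the last example can be traced with a small plain-Python model that layers the three levels on top of the evolve default. It is a sketch under assumptions, not dlt's resolution logic: effective_contract and as_mapping are hypothetical helpers, and merging entity by entity is inferred from the description above (items keeps the frozen data_type from the source while overriding columns).

```python
from typing import Dict, Optional, Union

Contract = Union[str, Dict[str, str]]
ENTITIES = ("tables", "columns", "data_type")


def as_mapping(contract: Optional[Contract]) -> Dict[str, str]:
    """Turn None, a shorthand string, or a partial mapping into a mapping."""
    if contract is None:
        return {}
    if isinstance(contract, str):
        return {entity: contract for entity in ENTITIES}
    return dict(contract)


def effective_contract(
    source: Optional[Contract] = None,
    resource: Optional[Contract] = None,
    run: Optional[Contract] = None,
) -> Dict[str, str]:
    """Layer the settings: default < source < resource < run override."""
    result = {entity: "evolve" for entity in ENTITIES}
    for level in (source, resource, run):
        result.update(as_mapping(level))
    return result


source_contract = {"columns": "freeze", "data_type": "freeze"}

items_contract = effective_contract(source_contract, {"columns": "evolve"})
other_items_contract = effective_contract(source_contract)
run_override = effective_contract(source_contract, {"columns": "evolve"}, run="freeze")
```

Under this model, items ends up with evolving columns and a frozen data_type, other_items freezes both, and the run-level shorthand freezes every entity.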