The Airbyte 0.50 release has brought some exciting changes to the platform. One of the new features - column selection - had been really sought-after by the community: if you go to the issues in Airbyte's repository and sort them by the thumbs-up emoji, "Allow a user to filter fields/columns from a table" lands at the very top.
This article describes two small tests of the new feature, which we performed to answer the following questions:
- Does the new feature still produce duplicated records when CDC updates touch only the deselected columns?
- Does Airbyte still pull all of the columns from the source database, dropping the deselected ones only later?
Let's quickly go through the possible reasons for using column filtering in your data ingestion processes: you may simply not need all the columns downstream, you may want to keep the destination (especially the raw tables) smaller, or you may want to reduce the amount of data transferred over the network.
Wasn't column filtering possible before version 0.50? Well, it was. Kind of.
There were some workarounds that might have worked for you. One of them was using database views: you could create a view exposing only the columns you needed for synchronization and point Airbyte at the view instead (see the sketch below). However, views do not always solve the problem: for instance, CDC based on PostgreSQL logical replication reads changes from base tables (you cannot add a view to a publication), and you may not even have the permissions to create views in the source database.
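A minimal sketch of the view workaround (the table and column names are just illustrative):

create view source.table_a_filtered as
select id, column_1, column_2   -- expose only the needed columns, skip column_3
from source.table_a;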
The other way you could approach the problem was to hide the redundant columns during the normalization process (removing them from the select statements within a custom DBT project). While this solved the problem for normalized tables, Airbyte raw tables (the ones with the _airbyte_raw_ prefix) would still contain all the fields from the source. Moreover, raw tables store source data as a JSONB string, which is usually much bigger than the source tables (it is difficult to apply efficient compression when keeping whole records as strings).
As a last resort, if you’re knowledgeable enough, you could try implementing your own custom connector with column selection.
We’ll use Docker & docker-compose for spinning up a container with the PostgreSQL 15 image. To keep things simple, the source and destination will be the same Postgres database (but the data will be moved between different schemas). If you’re not familiar with the concept of incremental ingestion with CDC, we encourage you to check Airbyte’s documentation for this topic.
Enabling CDC requires a few extra steps. We'll take care of them inside the docker-compose file and the DB init SQL script, so everything will be ready upon database startup.
Here's the docker-compose file:
version: '3.7'
services:
  db:
    container_name: airbyte-column-selection-db
    image: postgres:15
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - "5555:5432"
    volumes:
      - postgres_airbyte_volume:/var/lib/postgresql/data
      - ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
    command: ["postgres", "-c", "wal_level=logical"]
volumes:
  postgres_airbyte_volume:
And here's the init_db.sql:
-- create schemas
CREATE SCHEMA source;
CREATE SCHEMA destination;

-- create source tables
CREATE TABLE source.table_a (
    id serial primary key,
    column_1 varchar,
    column_2 varchar,
    column_3 varchar
);

-- create replication slot & publication
SELECT pg_create_logical_replication_slot('airbyte_slot', 'pgoutput');
CREATE PUBLICATION airbyte_publication FOR TABLE source.table_a;

-- add initial records
INSERT INTO source.table_a (column_1, column_2, column_3)
VALUES
    ('foo1', 'foo2', 'foo3'),
    ('bar1', 'bar2', 'bar3'),
    ('baz1', 'baz2', 'baz3');
You can also find these files in the GitHub repository created for the purpose of this article.
The environment can be started with the following command:
docker-compose up -d
Let’s run some queries to confirm everything was created as expected.
docker exec -ti airbyte-column-selection-db psql -U postgres -d postgres
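A few sanity checks along these lines (the exact queries are up to you) verify the schemas, the replication slot, the publication and the initial data:

-- list the schemas created by the init script
\dn
-- confirm the replication slot and the publication exist
select slot_name, plugin from pg_replication_slots;
select pubname from pg_publication;
-- check the initial records
select * from source.table_a;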
Everything seems correct! It’s time to perform the first test.
Before column selection was introduced, I tried to overcome the problem of synchronizing only a subset of columns by adding a custom DBT transformation that selected only the columns I wanted to sync (bearing in mind that such an approach only removes columns from the normalized tables). There's an article in the official documentation that walks through the steps of generating Airbyte's normalization DBT project (so you can quickly get the boilerplate code and modify it according to your needs).
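A minimal sketch of such a trimmed-down normalization select (hypothetical names, not the generated boilerplate; _airbyte_data is the JSONB payload column of a raw table):

select
    (_airbyte_data ->> 'id')::int as id,
    _airbyte_data ->> 'col_1'     as col_1,
    _airbyte_data ->> 'col_2'     as col_2   -- col_3 is simply left out
from destination._airbyte_raw_some_table;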
The problem with such an approach is that, when using CDC, the raw tables will contain changes for all columns (including the ones that we'd like to exclude from the synchronization). If you simply remove the unwanted columns from the select statements in the normalization SQL, the normalized table will contain duplicated records. Consider the following situation: we have a table with columns (id, col_1, col_2, col_3) and we'd like to sync only the first three; col_3 will be removed in the normalization SQL.
The first synchronization pulls a full snapshot of the data. The second one captures an update to the value of col_2, so a new version of the record is appended to the target table. During the third sync, an update to col_3 is pulled and, again, another version of the record is appended to the destination table. However, since we've removed the updated column from the select, we'll see a duplicate of the record with id=1, col_1=val 1, col_2=val 2_.
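In other words, after the three syncs the destination table contains something like this (CDC metadata columns omitted):

id | col_1 | col_2
---+-------+--------
 1 | val 1 | val 2    -- sync 1: initial snapshot
 1 | val 1 | val 2_   -- sync 2: update to col_2
 1 | val 1 | val 2_   -- sync 3: update to the removed col_3, a duplicate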
Keep in mind that Airbyte adds some extra metadata columns related to the CDC process (_ab_cdc_lsn - the log sequence number, _ab_cdc_updated_at - the timestamp of the update event, _ab_cdc_deleted_at - the timestamp of record deletion), and their values will differ between those rows. Yet from the source data point of view, we're getting duplicated data.
When we stumbled upon this issue, we added some extra code to the normalization DBT project to deduplicate the normalized tables. We were curious whether the new column selection feature would still produce the duplicated data or not. Let's find out!
After adding a source and a destination pointing to the appropriate schemas, we created a connection between them. In the field selection pane, we deselected column_3. The sync mode was set to "Incremental | Append" and the default normalization was used.
Let’s run the first sync and copy all of the data to the destination.
Here are the results from the target table:
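Based on the initial records, the synced rows (CDC metadata columns omitted) look like this:

id | column_1 | column_2
---+----------+---------
 1 | foo1     | foo2
 2 | bar1     | bar2
 3 | baz1     | baz2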
The column selection feature works as expected - we don't see column_3 in the table. Let's also ensure this column is not loaded into the raw table. Here's the formatted JSON payload of the first record in the _airbyte_raw_table_a table:
{
  "id": 1,
  "column_1": "foo1",
  "column_2": "foo2",
  "_ab_cdc_lsn": 23242280,
  "_ab_cdc_deleted_at": null,
  "_ab_cdc_updated_at": "2023-06-18T21:07:07.873Z"
}
The raw table does not contain deselected fields. Cool!
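A query along these lines will pretty-print such a payload (assuming the default _airbyte_data payload column and the raw table living in the destination schema):

select jsonb_pretty(_airbyte_data)
from destination._airbyte_raw_table_a
limit 1;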
Now, let's run two updates on the source database: one for column_1 and the other for column_3 (which is not selected in our synchronization):
update source.table_a
set column_1 = column_1 || ' updated'
where id = 1;
update source.table_a
set column_3 = column_3 || ' updated'
where id = 1;
Then run the synchronization.
As you can see from the logs, two records were emitted. Filtering the destination table by id=1 shows that there are three versions of the record, and the last two have the same values in the id, column_1 and column_2 columns.
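A query like this one (assuming the normalized table is destination.table_a) makes the duplicates visible:

select id, column_1, column_2, _ab_cdc_lsn
from destination.table_a
where id = 1
order by _ab_cdc_lsn;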
The conclusion of this test is that Airbyte does not deduplicate the extra rows generated by updates to columns excluded from a connection (when the source is CDC-enabled). Handling such duplicates is your responsibility.
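If all you need downstream is the latest state of each record, one possible fix is a window function over the normalized table (a minimal sketch, not our production DBT code; it uses the CDC log sequence number to pick the most recent version):

select id, column_1, column_2
from (
    select *,
           row_number() over (partition by id order by _ab_cdc_lsn desc) as rn
    from destination.table_a
) versions
where rn = 1;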
When reading the article about the column selection implementation details, it caught our eye that the feature was implemented at the worker level rather than at the source connector level. Our understanding of this was that, despite deselecting columns from the sync, Airbyte would still pull all of the data from the source (basically running select * from source_table) and the deselected columns would be dropped later by the worker.
While we totally get the justification for such an implementation (removing fields in the worker does not require any changes to the source connectors), we hoped that the column selection feature would also help reduce the number of bytes transferred over the network (which obviously impacts the execution time of a synchronization).
We decided to perform a small test and see what query Airbyte actually executes against the source database. Again, we tested this with a PostgreSQL 15 -> PostgreSQL 15 connection. The source table (named foobar) contained 11 fields (id, col_1, col_2, … up to col_10), and fields col_6 to col_10 were deselected in the connection settings. We populated this table with a few more records than in the previous test, so the sync took more time and we were able to catch Airbyte's query in the pg_stat_activity view (which lists currently running queries).
We used the psql interactive terminal along with the \watch command to refresh the results of the select statement every second:
select query from pg_stat_activity where query ilike '%foobar%' \watch 1
And the result was a surprise - in a good way :-):
select query from pg_stat_activity where query ilike '%foobar%';
SELECT "id","col_1","col_2","col_3","col_4","col_5" FROM "public"."foobar"
As you can see, the select statement did not use an asterisk and enumerated only the columns selected in the connection settings. This was cool!
Summing up the tests:
- The deselected columns end up neither in the raw nor in the normalized tables, but Airbyte does not deduplicate the extra rows generated by CDC updates to the excluded columns - handling them is up to you.
- Column selection is applied in the query sent to the source database, so the deselected columns are not transferred over the network at all.