One of our internal initiatives is GetInData Labs, a series of projects where we discover and work with different data tools. In the DataOps Labs, we've recently been working on data ingestion and one of its popular implementations, Airbyte.
In simple terms, data ingestion is the process of integrating a dataset from an external place into our data system. Unfortunately, this is only a simplified definition, because in the real world the process is far more complex.
The complexity comes from the following points:
In our analysis, we decided to focus on offline data ingestion with Airbyte. Why this one in particular? Mostly for its business model: Airbyte is an Open Source project backed by a company. It has a lot of available connectors, supports different data ingestion modes and integrates pretty well with Apache Airflow and dbt, two other tools present in the DataOps Labs project.
Airbyte shares more than a prefix with Airflow. Like the Open Source data orchestrator, Airbyte uses a multi-tier architecture composed of:
As a data engineer, you'll certainly be most concerned with the execution environment. Under the hood, Airbyte uses the Temporal platform, which is relatively new in the data landscape but has already won over big tech companies like Datadog or Netflix. Initially, Airbyte used an in-house scheduling framework that worked well in the early phase of the project. However, each new connection and team member made that solution harder to maintain. Eventually, the development team decided to migrate to a more reliable alternative, which happened to be Temporal.
More specifically, Airbyte executes Temporal workflows for various stages of the synchronization process, such as connection verification (CheckConnectionWorkflowImpl), synchronization (SyncWorkflowImpl), or scheduling (ConnectionManagerWorkflow). If you want to code your own workflows outside Airbyte, Temporal covers it pretty well in its How-to guide (Java example).
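To give a feel for Temporal's programming model, here is a minimal sketch of a connection-check workflow using the Temporal Python SDK. Airbyte's own workflows are written in Java; the class name, task queue and server address below are purely illustrative.

from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def check_connection(connection_id: str) -> str:
    # In a real workflow this would try to open a connection to the source
    # and report SUCCEEDED or FAILED back to the caller.
    return f"connection {connection_id}: SUCCEEDED"


@workflow.defn
class CheckConnectionWorkflow:
    @workflow.run
    async def run(self, connection_id: str) -> str:
        # Workflows only orchestrate; the actual I/O happens in activities.
        return await workflow.execute_activity(
            check_connection,
            connection_id,
            start_to_close_timeout=timedelta(minutes=1),
        )


async def main() -> None:
    # Register the workflow and activity on a task queue and start polling.
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="check-connection-queue",
        workflows=[CheckConnectionWorkflow],
        activities=[check_connection],
    )
    await worker.run()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())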
Airbyte represents the manipulated data with three different abstractions:
What happens when the scheduler discovers a new data ingestion task to execute? To understand this better, let's analyze the logs of the PostgreSQL to JSON synchronization defined in the Airbyte UI:
i.a.w.DefaultReplicationWorker(run):103 - start sync worker. job id: 8 attempt id: 0
i.a.w.DefaultReplicationWorker(run):115 - configured sync modes: {airbyte_db_schema.users=incremental - append}
After some preliminary health checks, this is the first important synchronization log. Airbyte starts the sync worker and retrieves the connection configuration from the metadata store. In Airbyte's nomenclature, these parameters are part of the Airbyte Catalog structure.
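To make that more concrete, here is a trimmed, illustrative configured catalog for the users stream. The field names follow the Airbyte protocol, but the schema and values are simplified examples rather than the exact output of this job:

# Simplified, illustrative configured catalog for the `users` stream.
configured_catalog = {
    "streams": [
        {
            "stream": {
                "name": "users",
                "namespace": "airbyte_db_schema",
                "supported_sync_modes": ["full_refresh", "incremental"],
                "json_schema": {
                    "type": "object",
                    "properties": {
                        "login": {"type": "string"},
                        "city": {"type": "string"},
                        "register_time": {"type": "string"},
                    },
                },
            },
            "sync_mode": "incremental",         # read only rows newer than the cursor
            "cursor_field": ["register_time"],  # column used to track the progress
            "destination_sync_mode": "append",  # matches "incremental - append" in the log
        }
    ]
}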
In the next step, Airbyte starts the source and destination Docker containers inside the sync process:
i.a.w.p.a.DefaultAirbyteDestination(start):69 - Running destination...
i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/destination-local-json:0.2.9 exists...
i.a.c.i.LineGobbler(voidCall):82 - airbyte/destination-local-json:0.2.9 was found locally.
i.a.w.p.DockerProcessFactory(create):157 - Preparing command: docker run --rm --init -i -w /data/10/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/destination-local-json:0.2.9 write --config destination_config.json --catalog destination_catalog.json
i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/source-postgres:0.4.3 exists...
i.a.c.i.LineGobbler(voidCall):82 - airbyte/source-postgres:0.4.3 was found locally.
i.a.w.p.DockerProcessFactory(create):157 - Preparing command: docker run --rm --init -i -w /data/10/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/source-postgres:0.4.3 read --config source_config.json --catalog source_catalog.json --state input_state.json
i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):307 - Destination output thread started.
i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):268 - Replication thread started.
i.a.w.DefaultReplicationWorker(run):147 - Waiting for source and destination threads to complete.
Now you should have a better idea of how the data gets exchanged. No, there is no network exchange or message queue buffering. The communication passes through OS pipes: the worker attaches a reader to the STDOUT of the started source container and a writer to the STDIN of the destination container. Additionally, it also instantiates a reader listening to the destination's STDOUT to collect its output once the data is synchronized.
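A minimal sketch of that pattern could look like the snippet below. It is a conceptual illustration, not Airbyte's actual worker code, and the volume mounts and config files from the real docker run commands above are omitted for brevity:

import json
import subprocess

# Start the source and destination containers, similarly to the `docker run`
# commands shown in the logs above (config/catalog volume mounts omitted here).
source = subprocess.Popen(
    ["docker", "run", "--rm", "-i", "airbyte/source-postgres:0.4.3",
     "read", "--config", "source_config.json", "--catalog", "source_catalog.json"],
    stdout=subprocess.PIPE,
)
destination = subprocess.Popen(
    ["docker", "run", "--rm", "-i", "airbyte/destination-local-json:0.2.9",
     "write", "--config", "destination_config.json", "--catalog", "destination_catalog.json"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)

# Forward Airbyte protocol messages, one JSON document per line,
# from the source's STDOUT to the destination's STDIN.
for line in source.stdout:
    message = json.loads(line)
    if message.get("type") in ("RECORD", "STATE"):
        destination.stdin.write(line)
destination.stdin.close()

# The destination answers on its own STDOUT, e.g. with the final STATE message.
for line in destination.stdout:
    print("destination:", line.decode().rstrip())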
But let's move on to the next interesting part, the cursor. To handle incremental loads, you need to define a column that Airbyte will use to discover the data that has appeared since the last synchronization. The logs below show an incremental and append-only synchronization using the register_time column.
source > i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):87 - Attempting to get metadata from the database to see if we can connect.
source > i.a.i.s.r.CdcStateManager(<init>):26 - Initialized CDC state with: null
source > i.a.i.s.r.StateManager(createCursorInfoForStream):108 - Found matching cursor in state. Stream: AirbyteStreamNameNamespacePair{name='users', namespace='airbyte_db_schema'}. Cursor Field: register_time Value: 2021-04-30T19:10:25Z
After discovering the data, Airbyte starts the synchronization. This step extracts the new rows in incremental mode, or all rows in full refresh mode, and writes them to the destination. Additionally, it updates the state information in the metadata storage:
source > i.a.i.s.j.AbstractJdbcSource(queryTableIncremental):255 - Queueing query for table: users
source > i.a.i.s.j.AbstractJdbcSource(lambda$queryTableIncremental$16):260 - Preparing query for table: users
source > i.a.i.s.j.AbstractJdbcSource(lambda$queryTableIncremental$16):269 - Executing query for table: users
source > i.a.i.s.r.StateDecoratingIterator(computeNext):60 - State Report: stream name: AirbyteStreamNameNamespacePair{name='users', namespace='airbyte_db_schema'}, original cursor field: register_time, original cursor 2021-04-30T19:10:25Z, cursor field: register_time, new cursor: 2022-04-30T19:10:25Z
source > i.a.i.s.p.PostgresSource(main):358 - completed source: class io.airbyte.integrations.source.postgres.PostgresSource
i.a.w.DefaultReplicationWorker(run):152 - One of source or destination thread complete. Waiting on the other.
i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):312 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@4ec7b834[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@40005f9c[data={"cdc":false,"streams":[{"stream_name":"users","stream_namespace":"airbyte_db_schema","cursor_field":["register_time"],"cursor":"2022-04-30T19:10:25Z"}]},additionalProperties={}],additionalProperties={}]
destination > i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):60 - Airbyte message consumer: succeeded.
destination > i.a.i.d.l.LocalJsonDestination$JsonConsumer(close):174 - finalizing consumer.
destination > i.a.i.d.l.LocalJsonDestination$JsonConsumer(close):190 - File output: /local/json_data/_airbyte_raw_users.jsonl
destination > i.a.i.b.IntegrationRunner(run):133 - Completed integration: io.airbyte.integrations.destination.local_json.LocalJsonDestination
i.a.w.DefaultReplicationWorker(run):154 - Source and destination threads complete.
i.a.w.DefaultReplicationWorker(run):217 - sync summary: io.airbyte.config.ReplicationAttemptSummary@2205c605[status=completed,recordsSynced=2,bytesSynced=150,startTime=1651827559146,endTime=1651827562241,totalStats=io.airbyte.config.SyncStats@7a037704[recordsEmitted=2,bytesEmitted=150,stateMessagesEmitted=1,recordsCommitted=2],streamStats=[io.airbyte.config.StreamSyncStats@6561d7c7[streamName=users,stats=io.airbyte.config.SyncStats@612d4506[recordsEmitted=2,bytesEmitted=150,stateMessagesEmitted=<null>,recordsCommitted=2]]]]
i.a.w.DefaultReplicationWorker(run):237 - Source output at least one state message
i.a.w.DefaultReplicationWorker(run):243 - State capture: Updated state to: Optional[io.airbyte.config.State@6d1492c8[state={"cdc":false,"streams":[{"stream_name":"users","stream_namespace":"airbyte_db_schema","cursor_field":["register_time"],"cursor":"2022-04-30T19:10:25Z"}]}]]
The synchronization ends here. The output will be slightly different if you define a dbt transformation for the loaded data, but no worries, you won't get lost!
One point before finishing, though. The output generated by Airbyte won't always mirror the input. Look at the file generated for the CSV destination:
"_airbyte_ab_id",_airbyte_emitted_at,_airbyte_data
21407e41-b9e4-4a87-b039-c2eedbe9c6c7,1651237763008,"{""login"":""user1"",""city"":""Warsaw"",""register_time"":""2020-03-02T19:10:25Z""}"
eb850b6b-573f-442e-87fc-a2a92a482233,1651237763008,"{""login"":""user2"",""city"":""Poznan"",""register_time"":""2020-03-30T19:10:25Z""}"
b2d5fbc6-0375-44f3-8a86-7cf385088928,1651237763008,"{""login"":""user3"",""city"":""Paris"",""register_time"":""2020-04-12T19:10:25Z""}"
ea7c3be8-b304-41be-a598-a035c7a9a509,1651237763008,"{""login"":""user4"",""city"":""Marseille"",""register_time"":""2020-05-21T19:10:25Z""}"
As you can see, the data is stringified JSON, and alongside the copied records you will find the metadata generated by Airbyte, such as the record UUID assigned by the tool and the extraction time. For certain data stores, you can set up Basic Normalization, which will expand these raw JSON results into regular columns.
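If your destination doesn't support it, a small post-processing step can achieve a similar effect. The sketch below flattens the stringified _airbyte_data column of such a CSV file into regular columns; the input and output file names are illustrative:

import csv
import json

# Expand the stringified `_airbyte_data` column into regular columns,
# a do-it-yourself stand-in for Basic Normalization.
with open("_airbyte_raw_users.csv", newline="") as raw_file, \
        open("users_flat.csv", "w", newline="") as flat_file:
    reader = csv.DictReader(raw_file)
    writer = None
    for row in reader:
        record = json.loads(row["_airbyte_data"])
        record["_airbyte_emitted_at"] = row["_airbyte_emitted_at"]
        if writer is None:
            writer = csv.DictWriter(flat_file, fieldnames=list(record.keys()))
            writer.writeheader()
        writer.writerow(record)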
Airbyte has its own built-in scheduler for tasks. However, this doesn't mean that you have to use it, especially if all your jobs are orchestrated by a data orchestrator like Apache Airflow. The good news is that you can still rely on Airbyte for the data ingestion but control its execution from the orchestration layer. This not only helps integrate the data ingestion tasks with the existing pipelines, but also extends the limited triggering capabilities of Airbyte. As of today, the tool supports only static triggering schedules, ranging from manual to every 24 hours. Apache Airflow offers much more flexibility here, and fortunately you can trigger the sync with an AirbyteTriggerSyncOperator and listen for its outcome with an AirbyteJobSensor.
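A minimal sketch of such a DAG is shown below; the Airflow connection id, the Airbyte connection id and the schedule are placeholders to adapt to your setup:

from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Both ids below are placeholders.
AIRBYTE_CONN_ID = "airbyte_default"
POSTGRES_TO_JSON_CONNECTION_ID = "00000000-0000-0000-0000-000000000000"

with DAG(
    dag_id="airbyte_postgres_to_json",
    start_date=datetime(2022, 5, 1),
    schedule_interval="@hourly",  # more flexible than Airbyte's built-in schedules
    catchup=False,
) as dag:
    trigger_sync = AirbyteTriggerSyncOperator(
        task_id="trigger_airbyte_sync",
        airbyte_conn_id=AIRBYTE_CONN_ID,
        connection_id=POSTGRES_TO_JSON_CONNECTION_ID,
        asynchronous=True,  # return immediately and let the sensor do the waiting
    )

    wait_for_sync = AirbyteJobSensor(
        task_id="wait_for_airbyte_sync",
        airbyte_conn_id=AIRBYTE_CONN_ID,
        airbyte_job_id=trigger_sync.output,  # job id pushed to XCom by the operator
    )

    trigger_sync >> wait_for_sync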
Additionally, Airbyte also integrates with other popular data tools, namely dbt. If necessary, you can add a post-ingestion step and turn the native Extract-Load job into an Extract-Load-Transform one. How? By adding a transformation step to the connection that invokes dbt commands, such as seed or run, on top of the ingested data.
Would you like to know more about dbt? Check out our blog post Up & Running: data pipeline with BigQuery and dbt.
Although Airbyte solves many data ingestion problems, it has several pitfalls. Firstly, the deployment. Defining the pipelines visually is nice and probably more pleasant than editing them in an IDE. However, the low-code character of this solution raises the question of how to deploy the ingestion jobs and integrate them into a classic CI/CD process. A potential answer could be the Octavia CLI, which transforms a static Airbyte configuration into templates that can be deployed to different environments.
The second question we had after the initial discovery was about Big Data. As you saw in the previous section, the ingestion process runs on a single node and relies on CPU-level parallelisation. There seems to be no way to parallelize a single ingestion job instance across a cluster. Hence, synchronizing big datasets may be slower with Airbyte than with an Apache Spark or Apache Flink job, which also provide native connectivity to many data sources.
Finally, we also raised a question about day-to-day maintenance. To understand the problem, let's recall one of the basics of functional data engineering described by Maxime Beauchemin: reproducibility and replicability. Simply speaking, a job execution should always use the same logic, even when it gets backfilled. How does this relate to Airbyte, if we consider it a passthrough batch job? Unfortunately, Airbyte doesn't respect that principle. Even if you define your ingestion as an incremental process, you have no way of replaying a specific execution instance; instead, you have to relaunch the whole synchronization from scratch. It might not look like a big deal, but if your destination has only lost the data from the 3 most recent jobs, having to reingest everything won't be optimal.
As a workaround for this problem, you could consider changing the cursor field in the metadata table. But considering that tools like Apache Airflow provide that feature out-of-the-box, there is probably room for improvement in the Airbyte process.
You’re now aware of the gotchas and hopefully, you’ll be able to make better data ingestion decisions. But if you need some help, we’re waiting for your message!