One of our internal initiatives is GetInData Labs, a series of projects where we discover and work with different data tools. In the DataOps Labs, we’ve recently been working on data ingestion and one of its popular implementations, Airbyte.
Data ingestion 101
In simple terms, data ingestion is the process of integrating a dataset from an external source into our data system. Unfortunately, this is only a simplified definition, because in the real world the process is much more complex.
The complexity comes from the following points:
- Ingestion type. It can be online or offline. The former leverages real-time processing to extract the data continuously. The latter works in batch and refreshes the dataset less regularly but with bigger data volume.
- Ingestion modes. The process can load the data fully or incrementally. Put another way, it’s a trade-off between simplicity and cost: loading the whole dataset at once is easy and always possible, but it’s expensive because each load processes more data and needs more resources. The incremental load is its opposite but might not always be possible. For example, the solution won’t work very well if the dataset has in-place updates or if it deletes the data instead of using the tombstone approach.
- Conflict resolution strategies. The ingestion process can resolve conflicts by overwriting or merging the dataset. The former strategy removes the existing data completely, while the latter inserts or updates the entries. Implementing the merge is more challenging since it requires defining a unique identification key for the rows (both the ingestion modes and the merge strategy are sketched right after this list).
- Technical solution. You can create a custom solution based on a data processing framework, such as Apache Spark, Apache Flink, or Kafka Connect. If you don’t have the coding skills or manpower, you can also rely on the existing data ingestion solutions, such as Airbyte, Fivetran, StreamSets, or Meltano.
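To make the ingestion modes and conflict resolution strategies a bit more tangible, here is a minimal, hypothetical sketch in plain Python (not Airbyte code) of a full load, an incremental load driven by a cursor column, and a merge based on a unique key:

def full_load(source_rows):
    # Full load: take every row and overwrite the destination.
    return list(source_rows)

def incremental_load(source_rows, cursor_column, last_cursor):
    # Incremental load: keep only the rows whose cursor value is newer
    # than the one saved after the previous run.
    return [row for row in source_rows if row[cursor_column] > last_cursor]

def merge(destination_rows, new_rows, key):
    # Merge strategy: insert new keys, update the existing ones.
    merged = {row[key]: row for row in destination_rows}
    for row in new_rows:
        merged[row[key]] = row
    return list(merged.values())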
In our analysis, we’ve decided to focus on offline data ingestion with Airbyte. Why this one in particular? Mostly for its business model. Airbyte is an Open Source project backed by a company. It has a lot of available connectors, supports different data ingestion modes, and integrates pretty well with Apache Airflow and dbt, which are two other tools present in the DataOps Labs project.
Airbyte - architecture
Airbyte shares more than just a prefix with Airflow. Like the Open Source data orchestrator, Airbyte uses a multi-tier architecture composed of:
- Data exposition layer. It’s mainly the React.js UI and a Node.js web server. They’re the entry points for manipulating the ingestion pipelines (yes, Airbyte is a low-code tool).
- Configuration API. This component interacts with the data exposition layer and stores data sources and data sinks configurations.
- Scheduler. It communicates with the Configuration API to get the ingestion jobs to execute.
- Workers. The layer is based on Temporal and is responsible for running the ingestion jobs.
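To give you an idea of how these layers cooperate, the snippet below triggers a manual synchronization of an existing Connection through the Configuration API and polls the resulting job. The host, port and connection id are assumptions you’ll need to adapt to your deployment:

import requests

AIRBYTE_API = "http://localhost:8001/api/v1"  # assumed local deployment
CONNECTION_ID = "replace-with-your-connection-uuid"

# Ask the Configuration API to schedule a manual sync of the Connection.
job = requests.post(f"{AIRBYTE_API}/connections/sync",
                    json={"connectionId": CONNECTION_ID}).json()["job"]

# The scheduler and the workers take it from there; we only poll the job status.
status = requests.post(f"{AIRBYTE_API}/jobs/get",
                       json={"id": job["id"]}).json()["job"]["status"]
print(f"Job {job['id']} is {status}")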
As a data engineer, you’ll certainly be most interested in the execution environment. Under the hood, Airbyte uses the Temporal platform, which is relatively new in the data landscape but has already convinced big tech companies like Datadog or Netflix. Initially, Airbyte used an in-house scheduling framework that worked well in the early phase of the project. However, each new connection and team member made the maintenance of that solution more complicated. Finally, the development team decided to migrate to a more reliable alternative, which happened to be Temporal.
More specifically, Airbyte executes Temporal workflows for various stages of the synchronization process, such as connection verification (CheckConnectionWorkflowImpl), synchronization (SyncWorkflowImpl), or scheduling (ConnectionManagerWorkflow). If you want to code your own workflows outside Airbyte, Temporal’s How-to guides cover the topic pretty well (with a Java example).
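To give a flavour of the programming model, here is a minimal hello-world workflow written with Temporal’s Python SDK (temporalio). It’s a toy sketch assuming a locally running Temporal server, and it has nothing to do with Airbyte’s own workflow implementations:

import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def say_hello(name: str) -> str:
    # Activities are the retryable units of work orchestrated by a workflow.
    return f"Hello, {name}!"

@workflow.defn
class HelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello, name, start_to_close_timeout=timedelta(seconds=10)
        )

async def main():
    # Assumes a Temporal server listening on the default local port.
    client = await Client.connect("localhost:7233")
    async with Worker(client, task_queue="demo", workflows=[HelloWorkflow],
                      activities=[say_hello]):
        print(await client.execute_workflow(
            HelloWorkflow.run, "Airbyte", id="hello-workflow", task_queue="demo"
        ))

if __name__ == "__main__":
    asyncio.run(main())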
Ingestion
Airbyte represents the manipulated data with three different abstractions:
- Source. It’s the origin of the data to copy. Airbyte supports various sources, including the streaming brokers such as Apache Kafka or Apache Pulsar.
- Destination. This is the place where Airbyte will copy the ingested data.
- Connection. It’s the bridge between the Source and the Destination. Put another way, the Connection defines the ingestion process. There you’ll configure the frequency of the job and its sync mode (incremental, full), and select the items to copy (e.g. tables from a PostgreSQL schema).
What happens when the scheduler discovers a new data ingestion task to execute? To understand this better, let’s analyze the logs for the PostgreSQL to JSON synchronization defined in the screenshot above:
i.a.w.DefaultReplicationWorker(run):103 - start sync worker. job id: 8 attempt id: 0
i.a.w.DefaultReplicationWorker(run):115 - configured sync modes: {airbyte_db_schema.users=incremental - append}
These are the first important synchronization logs, emitted after some preliminary health checks. Airbyte starts the sync worker and retrieves the connection configuration from the metadata store. In Airbyte’s nomenclature, these parameters are part of the Airbyte Catalog structure.
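For reference, a configured catalog is a JSON document. The fragment below, written as a Python dictionary for readability, shows roughly what the configuration of the users stream above could look like; the field names follow the Airbyte protocol, but treat it as an illustration rather than an exact dump:

configured_catalog = {
    "streams": [
        {
            "stream": {
                "name": "users",
                "namespace": "airbyte_db_schema",
                "json_schema": {"type": "object"},  # trimmed for brevity
                "supported_sync_modes": ["full_refresh", "incremental"],
            },
            # The sync modes matching the "incremental - append" log entry above.
            "sync_mode": "incremental",
            "destination_sync_mode": "append",
            "cursor_field": ["register_time"],
        }
    ]
}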
In the next step, Airbyte starts the source and destination Docker containers inside the sync process:
i.a.w.p.a.DefaultAirbyteDestination(start):69 - Running destination...
i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/destination-local-json:0.2.9 exists...
i.a.c.i.LineGobbler(voidCall):82 - airbyte/destination-local-json:0.2.9 was found locally.
i.a.w.p.DockerProcessFactory(create):157 - Preparing command: docker run --rm --init -i -w /data/10/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/destination-local-json:0.2.9 write --config destination_config.json --catalog destination_catalog.json
i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/source-postgres:0.4.3 exists...
i.a.c.i.LineGobbler(voidCall):82 - airbyte/source-postgres:0.4.3 was found locally.
i.a.w.p.DockerProcessFactory(create):157 - Preparing command: docker run --rm --init -i -w /data/10/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/source-postgres:0.4.3 read --config source_config.json --catalog source_catalog.json --state input_state.json
i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):307 - Destination output thread started.
i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):268 - Replication thread started.
i.a.w.DefaultReplicationWorker(run):147 - Waiting for source and destination threads to complete.
Now you should get a better idea of how the data gets exchanged. There is no network exchange or message queue buffering involved; the communication passes through OS pipes. The worker reads the records emitted by the source container on its STDOUT and writes them to the STDIN of the destination container. Additionally, it listens to the destination’s STDOUT, for example to capture the state messages you’ll see later in the logs.
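A stripped-down illustration of that mechanism, written with nothing but Python’s subprocess module (so a sketch of the idea, not Airbyte’s implementation), could look like this:

import subprocess

# The "source" process writes records to its STDOUT...
source = subprocess.Popen(
    ["python", "-c", "print('record 1'); print('record 2')"],
    stdout=subprocess.PIPE,
)
# ...and its STDOUT is wired directly to the STDIN of the "destination" process.
destination = subprocess.Popen(
    ["python", "-c", "import sys\nfor line in sys.stdin: print('stored', line.strip())"],
    stdin=source.stdout,
)
source.stdout.close()  # let the destination see EOF once the source exits
destination.wait()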
But let’s move on to the next interesting part, the cursor. To handle incremental loads, you need to define a column that Airbyte will use to discover the data that appeared since the last synchronization. The logs below show an incremental, append-only synchronization using the register_time column.
source > i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):87 - Attempting to get metadata from the database to see if we can connect.
source > i.a.i.s.r.CdcStateManager(<init>):26 - Initialized CDC state with: null
source > i.a.i.s.r.StateManager(createCursorInfoForStream):108 - Found matching cursor in state. Stream: AirbyteStreamNameNamespacePair{name='users', namespace='airbyte_db_schema'}. Cursor Field: register_time Value: 2021-04-30T19:10:25Z
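Conceptually, the cursor turns the extraction into a simple filtered query. A hypothetical, hand-written version of what the Postgres source connector generates could look like this (the connection parameters are made up):

import psycopg2

last_cursor = "2021-04-30T19:10:25Z"  # value restored from the Airbyte state

with psycopg2.connect("dbname=airbyte_db user=airbyte host=localhost") as conn:
    with conn.cursor() as cur:
        # Fetch only the rows that appeared after the previous synchronization.
        cur.execute(
            """
            SELECT login, city, register_time
            FROM airbyte_db_schema.users
            WHERE register_time > %s
            ORDER BY register_time
            """,
            (last_cursor,),
        )
        rows = cur.fetchall()
        # The highest register_time becomes the new cursor saved in the state.
        new_cursor = max((row[2] for row in rows), default=last_cursor)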
After discovering the data, Airbyte starts the synchronization. This step of the operation extracts the new rows for the append mode, or all rows for the full mode, and writes them to the destination. Additionally, it updates the state information in the metadata storage:
source > i.a.i.s.j.AbstractJdbcSource(queryTableIncremental):255 - Queueing query for table: users
source > i.a.i.s.j.AbstractJdbcSource(lambda$queryTableIncremental$16):260 - Preparing query for table: users
source > i.a.i.s.j.AbstractJdbcSource(lambda$queryTableIncremental$16):269 - Executing query for table: users
source > i.a.i.s.r.StateDecoratingIterator(computeNext):60 - State Report: stream name: AirbyteStreamNameNamespacePair{name='users', namespace='airbyte_db_schema'}, original cursor field: register_time, original cursor 2021-04-30T19:10:25Z, cursor field: register_time, new cursor: 2022-04-30T19:10:25Z
source > i.a.i.s.p.PostgresSource(main):358 - completed source: class io.airbyte.integrations.source.postgres.PostgresSource
i.a.w.DefaultReplicationWorker(run):152 - One of source or destination thread complete. Waiting on the other.
i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):312 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@4ec7b834[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@40005f9c[data={"cdc":false,"streams":[{"stream_name":"users","stream_namespace":"airbyte_db_schema","cursor_field":["register_time"],"cursor":"2022-04-30T19:10:25Z"}]},additionalProperties={}],additionalProperties={}]
destination > i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):60 - Airbyte message consumer: succeeded.
destination > i.a.i.d.l.LocalJsonDestination$JsonConsumer(close):174 - finalizing consumer.
destination > i.a.i.d.l.LocalJsonDestination$JsonConsumer(close):190 - File output: /local/json_data/_airbyte_raw_users.jsonl
destination > i.a.i.b.IntegrationRunner(run):133 - Completed integration: io.airbyte.integrations.destination.local_json.LocalJsonDestination
i.a.w.DefaultReplicationWorker(run):154 - Source and destination threads complete.
i.a.w.DefaultReplicationWorker(run):217 - sync summary: io.airbyte.config.ReplicationAttemptSummary@2205c605[status=completed,recordsSynced=2,bytesSynced=150,startTime=1651827559146,endTime=1651827562241,totalStats=io.airbyte.config.SyncStats@7a037704[recordsEmitted=2,bytesEmitted=150,stateMessagesEmitted=1,recordsCommitted=2],streamStats=[io.airbyte.config.StreamSyncStats@6561d7c7[streamName=users,stats=io.airbyte.config.SyncStats@612d4506[recordsEmitted=2,bytesEmitted=150,stateMessagesEmitted=<null>,recordsCommitted=2]]]]
i.a.w.DefaultReplicationWorker(run):237 - Source output at least one state message
i.a.w.DefaultReplicationWorker(run):243 - State capture: Updated state to: Optional[io.airbyte.config.State@6d1492c8[state={"cdc":false,"streams":[{"stream_name":"users","stream_namespace":"airbyte_db_schema","cursor_field":["register_time"],"cursor":"2022-04-30T19:10:25Z"}]}]]
The synchronization ends here. The output will be slightly different if you define a dbt transformation for the loaded data. But no worries, you won’t get lost!
One point before finishing, though. The output generated by Airbyte won’t always reflect the input. Look at the files generated for the CSV destination:
"_airbyte_ab_id",_airbyte_emitted_at,_airbyte_data
21407e41-b9e4-4a87-b039-c2eedbe9c6c7,1651237763008,"{""login"":""user1"",""city"":""Warsaw"",""register_time"":""2020-03-02T19:10:25Z""}"
eb850b6b-573f-442e-87fc-a2a92a482233,1651237763008,"{""login"":""user2"",""city"":""Poznan"",""register_time"":""2020-03-30T19:10:25Z""}"
b2d5fbc6-0375-44f3-8a86-7cf385088928,1651237763008,"{""login"":""user3"",""city"":""Paris"",""register_time"":""2020-04-12T19:10:25Z""}"
ea7c3be8-b304-41be-a598-a035c7a9a509,1651237763008,"{""login"":""user4"",""city"":""Marseille"",""register_time"":""2020-05-21T19:10:25Z""}"
As you can see, the data is stored as stringified JSON and, alongside the copied records, you will find the metadata generated by Airbyte, such as the record UUID assigned by the tool and the extraction time. For certain data stores, you can set up Basic Normalization, which will expand the nested JSON payload into regular columns.
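If your destination doesn’t offer Basic Normalization, or if you simply want to inspect the raw export, flattening the payload yourself is straightforward. The sketch below assumes the CSV file shown above was saved as _airbyte_raw_users.csv and expands the _airbyte_data column into regular columns with pandas:

import json
import pandas as pd

# Load the raw Airbyte export and turn the stringified JSON payload
# into regular columns, next to the Airbyte metadata columns.
raw = pd.read_csv("_airbyte_raw_users.csv")
payload = raw["_airbyte_data"].apply(json.loads).apply(pd.Series)
users = pd.concat([raw[["_airbyte_ab_id", "_airbyte_emitted_at"]], payload], axis=1)
print(users.head())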
Airbyte - Data stack integration
Airbyte has its own built-in scheduler for tasks. However, this doesn’t mean that you have to use it, especially if you already have all your jobs orchestrated by a data orchestrator like Apache Airflow. The good news is that you can still rely on Airbyte for the data ingestion but control its execution from the orchestration layer. This not only helps to better integrate the data ingestion tasks with the existing pipelines, but also extends the limited triggering capabilities of Airbyte. As of today, the tool supports only static triggering schedules, going from manual to every 24 hours. Apache Airflow offers much more flexibility in that area and, fortunately, you can trigger a synchronization with an AirbyteTriggerSyncOperator and listen for its outcome with an AirbyteJobSensor, as sketched below.
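Here is what such a DAG could look like; the Airflow connection id and the Airbyte Connection id are placeholders you’ll need to adapt:

from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

CONNECTION_ID = "replace-with-your-connection-uuid"  # the Airbyte Connection to run

with DAG("airbyte_ingestion", start_date=datetime(2022, 5, 1),
         schedule_interval="@daily", catchup=False) as dag:

    trigger_sync = AirbyteTriggerSyncOperator(
        task_id="trigger_airbyte_sync",
        airbyte_conn_id="airbyte_api",  # Airflow connection pointing at your Airbyte server
        connection_id=CONNECTION_ID,
        asynchronous=True,  # return immediately and let the sensor do the waiting
    )

    wait_for_sync = AirbyteJobSensor(
        task_id="wait_for_airbyte_sync",
        airbyte_conn_id="airbyte_api",
        airbyte_job_id=trigger_sync.output,
    )

    trigger_sync >> wait_for_sync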
Additionally, Airbyte also integrates with other popular data tools, namely dbt. If necessary, you can add a post-ingestion step and transform the native Extract Load job into an Extract Load Transform one. How? By adding a transformation step to the connection that invokes dbt commands, such as seed or run, on top of the ingested data.
Would you like to know more about dbt? Check out our blog post Up & Running: data pipeline with BigQuery and dbt.
Gotchas
Although Airbyte solves many data ingestion problems, it has several pitfalls. Firstly, the deployment. Defining the pipelines visually is nice and probably more pleasant than editing them in an IDE. However, the low-code character of this solution poses the question of how to deploy the ingestion jobs and integrate them into a classic CI/CD process. A potential answer could be the Octavia CLI, which transforms a static Airbyte configuration into templates that can be deployed to different environments.
The second question we had after the initial discovery was Big Data. As you saw in the previous section, the ingestion process works on a single node and relies on CPU-based parallelisation. There seems to be no way to parallelize a single ingestion job instance across a cluster. Hence, synchronizing big datasets may be slower with Airbyte than with an Apache Spark or Apache Flink job, which also provides native connectivity to many data sources.
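For comparison, distributing the extraction of the same users table over a cluster boils down to a few partitioning options in a Spark job; the JDBC parameters below are of course assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("users-ingestion").getOrCreate()

# The partitioning options split the read into parallel queries
# executed by the executors across the cluster.
users = (spark.read.format("jdbc")
         .option("url", "jdbc:postgresql://db-host:5432/airbyte_db")
         .option("dbtable", "airbyte_db_schema.users")
         .option("user", "airbyte")
         .option("password", "***")
         .option("partitionColumn", "register_time")
         .option("lowerBound", "2020-01-01 00:00:00")
         .option("upperBound", "2022-12-31 23:59:59")
         .option("numPartitions", 8)
         .load())

users.write.mode("overwrite").json("/data/users")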
Finally, we’ve also raised a question about daily maintenance. To understand the problem, let’s recall one of the basics of functional data engineering by Maxime Beauchemin: reproducibility and replicability. Simply speaking, a job execution should always use the same logic, even if it gets backfilled. How does it relate to Airbyte, if we consider it as a batch job of the passthrough type? Unfortunately, Airbyte doesn’t respect that logic. Even if you define your ingestion as an incremental process, you don’t have the possibility of replaying a specific execution instance. Instead, you have to relaunch the whole synchronization from scratch. It might not look like a big deal, but if your destination has only lost the data from the 3 most recent runs, having to reingest everything won’t be optimal.
As a workaround to the previous problem, you could consider manually changing the stored cursor value in the metadata table. But considering that tools like Apache Airflow provide that replay feature out-of-the-box, there is possibly some room for improvement in the Airbyte process.
You’re now aware of the gotchas and hopefully, you’ll be able to make better data ingestion decisions. But if you need some help, we’re waiting for your message!