
Airbyte is in the air - data ingestion with Airbyte

One of our internal initiatives is GetInData Labs, a series of projects where we explore and work with different data tools. In the DataOps Labs, we've recently been working on data ingestion and one of its popular implementations, Airbyte.

Data ingestion 101

In simple terms, data ingestion is the process of integrating a dataset from an external source into our data system. Unfortunately, that's only a simplified definition, because in the real world the process is much more complex.

The complexity comes from the following points:

  • Ingestion type. It can be online or offline. The former leverages real-time processing to extract the data continuously. The latter works in batch and refreshes the dataset less regularly but with bigger data volume.
  • Ingestion modes. The process can load the data fully or incrementally. Put another way, it's a trade-off between simplicity and cost: loading the whole dataset at once is easy and always possible, but every run moves more data and needs more resources. The incremental load is its opposite, yet it might not always be possible. For example, it won't work very well if the dataset has in-place updates or if it deletes the data instead of using the tombstone approach (see the sketch after this list).
  • Conflict resolution strategies. The ingestion process can solve the conflicts by overwriting or merging the dataset. The former strategy removes the existing data completely, while the latter inserts or updates the entries. Implementing the merging is more challenging since it requires defining a unique identification key for the rows.
  • Technical solution. You can create a custom solution based on a data processing framework, such as Apache Spark, Apache Flink, or Kafka Connect. If you don’t have the coding skills or manpower, you can also rely on the existing data ingestion solutions, such as Airbyte, Fivetran, StreamSets, or Meltano.
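
To make the ingestion modes more tangible, here is a toy, in-memory Python sketch that contrasts a full load with an incremental, append-only one; the dataset, the register_time cursor column and the cursor handling are all made up for illustration.

# A toy, in-memory illustration of full vs. incremental (append) loads.
source_rows = [
    {"login": "user1", "register_time": "2020-03-02T19:10:25Z"},
    {"login": "user2", "register_time": "2021-06-15T08:00:00Z"},
]
destination = []

def full_load():
    # Always possible: wipe the destination and copy everything again.
    destination.clear()
    destination.extend(source_rows)

def incremental_load(last_cursor):
    # Cheaper: copy only the rows newer than the cursor, but it requires a
    # monotonically growing column and will not notice hard deletes.
    new_rows = [r for r in source_rows if r["register_time"] > last_cursor]
    destination.extend(new_rows)  # append-only conflict resolution
    return max((r["register_time"] for r in new_rows), default=last_cursor)

cursor = incremental_load("2020-12-31T00:00:00Z")  # copies only user2

A merge strategy would additionally need a unique key per row to decide whether an incoming record inserts a new entry or updates an existing one.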

In our analysis we've decided to focus on offline data ingestion with Airbyte. Why this one in particular? Mostly for its business model. Airbyte is an Open Source project backed by a company. It has a lot of available connectors, supports different data ingestion modes and integrates pretty well with Apache Airflow and dbt, which are two other tools present in the DataOps Labs project.

Airbyte - architecture

Airbyte shares more than a prefix with Airflow. Like the Open Source data orchestrator, Airbyte uses a multi-tier architecture composed of:

  • Data exposition layer. It’s mainly the React.js UI and a Node.js web server. They’re the entry points for manipulating the ingestion pipelines (yes, Airbyte is a low-code tool).
  • Configuration API. This component interacts with the data exposition layer and stores the data source and data sink configurations (see the example after this list).
  • Scheduler. It communicates with the Configuration API to get the ingestion jobs to execute.
  • Workers. The layer is based on Temporal and is responsible for running the ingestion jobs.
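
To illustrate the role of the Configuration API, the sketch below triggers a manual synchronization over plain HTTP. It assumes a local deployment exposing the API at localhost:8000 and uses a placeholder connection id; treat the exact endpoint and response shape as an approximation rather than a reference.

import requests

AIRBYTE_API = "http://localhost:8000/api/v1"  # assumed local deployment

# Trigger a manual synchronization of an existing connection (placeholder id).
response = requests.post(
    f"{AIRBYTE_API}/connections/sync",
    json={"connectionId": "00000000-0000-0000-0000-000000000000"},
)
response.raise_for_status()

# The response describes the created job; its id can be polled later.
print(response.json()["job"])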

As a data engineer, you'll certainly be most interested in the execution environment. Under the hood, Airbyte uses the Temporal platform, which is relatively new in the data landscape but has already convinced big tech companies like Datadog or Netflix. Airbyte initially used an in-house scheduling framework that worked well in the early phase of the project. However, each new connection and team member made the maintenance of that solution more complicated. Finally, the development team decided to migrate to a more reliable alternative, which happened to be Temporal.

More specifically, Airbyte executes Temporal workflows for various stages of the synchronization process, such as connection verification (CheckConnectionWorkflowImpl), synchronization (SyncWorkflowImpl), or scheduling (ConnectionManagerWorkflow). If you want to code your own workflows outside Airbyte, Temporal covers that pretty well in its How-to guide (Java example).
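
Airbyte's workflows are written in Java, but to give you a feel for the programming model, here is a minimal, hypothetical connection-check workflow written with Temporal's Python SDK (temporalio); the activity, task queue and workflow names are invented for this example.

import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def check_connection(config: str) -> bool:
    # Hypothetical activity: verify that the configured source is reachable.
    return config.startswith("postgresql://")

@workflow.defn
class CheckConnectionWorkflow:
    @workflow.run
    async def run(self, config: str) -> bool:
        # Workflows orchestrate activities; Temporal handles retries and state.
        return await workflow.execute_activity(
            check_connection, config, start_to_close_timeout=timedelta(seconds=30)
        )

async def main():
    client = await Client.connect("localhost:7233")
    async with Worker(client, task_queue="ingestion",
                      workflows=[CheckConnectionWorkflow],
                      activities=[check_connection]):
        ok = await client.execute_workflow(
            CheckConnectionWorkflow.run, "postgresql://localhost/airbyte_db",
            id="check-connection-1", task_queue="ingestion",
        )
        print("connection ok:", ok)

asyncio.run(main())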

Ingestion 

Airbyte represents the manipulated data with 3 different abstractions:

  • Source. It’s the origin of the data to copy. Airbyte supports various sources, including the streaming brokers such as Apache Kafka or Apache Pulsar. 
  • Destination. This is the place where Airbyte will copy the ingested data.  
  • Connection. It's the bridge between the Source and the Destination. Put another way, the Connection defines the ingestion process. There you'll configure the frequency of the job, its type (incremental, full), and select the items to copy (e.g. tables from a PostgreSQL schema).

[Screenshot: the Connection configuration page in Airbyte]

What happens when the scheduler discovers a new data ingestion task to execute? To understand this better, let’s analyze the logs for the PostgreSQL to JSON synchronization defined in the screenshot above:

i.a.w.DefaultReplicationWorker(run):103 - start sync worker. job id: 8 attempt id: 0
i.a.w.DefaultReplicationWorker(run):115 - configured sync modes: {airbyte_db_schema.users=incremental - append}

These are the first important synchronization logs, printed after some preliminary health checks. Airbyte starts the sync worker and retrieves the connection configuration from the metadata store. In Airbyte's nomenclature, these parameters are part of the Airbyte Catalog structure.

In the next step, Airbyte starts the source and destination Docker containers inside the sync process:

i.a.w.p.a.DefaultAirbyteDestination(start):69 - Running destination...
i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/destination-local-json:0.2.9 exists...
i.a.c.i.LineGobbler(voidCall):82 - airbyte/destination-local-json:0.2.9 was found locally.
i.a.w.p.DockerProcessFactory(create):157 - Preparing command: docker run --rm --init -i -w /data/10/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/destination-local-json:0.2.9 write --config destination_config.json --catalog destination_catalog.json
i.a.c.i.LineGobbler(voidCall):82 - Checking if airbyte/source-postgres:0.4.3 exists...
i.a.c.i.LineGobbler(voidCall):82 - airbyte/source-postgres:0.4.3 was found locally.
i.a.w.p.DockerProcessFactory(create):157 - Preparing command: docker run --rm --init -i -w /data/10/0 --log-driver none --network host -v airbyte_workspace:/data -v /tmp/airbyte_local:/local airbyte/source-postgres:0.4.3 read --config source_config.json --catalog source_catalog.json --state input_state.json
i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):307 - Destination output thread started.
i.a.w.DefaultReplicationWorker(lambda$getReplicationRunnable$5):268 - Replication thread started.
i.a.w.DefaultReplicationWorker(run):147 - Waiting for source and destination threads to complete.

Now you should have a better idea of how the data gets exchanged. No, there is no network exchange or message queue buffering. The communication passes through OS pipes: the sync worker reads the messages the source writes to its container's STDOUT and forwards them to the destination container's STDIN. Additionally, it instantiates a reader on the destination's STDOUT to collect the state messages the destination emits.
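
Conceptually, the exchange boils down to plain process plumbing, as in the simplified Python sketch below; it reuses the Docker images from the logs above but omits the volume mounts, workspace paths and state file for brevity, and it is not Airbyte's actual worker code.

import subprocess

# Start the source container and capture its STDOUT.
source = subprocess.Popen(
    ["docker", "run", "--rm", "-i", "airbyte/source-postgres:0.4.3",
     "read", "--config", "source_config.json", "--catalog", "source_catalog.json"],
    stdout=subprocess.PIPE,
)

# Start the destination container with its STDIN and STDOUT piped to the worker.
destination = subprocess.Popen(
    ["docker", "run", "--rm", "-i", "airbyte/destination-local-json:0.2.9",
     "write", "--config", "destination_config.json", "--catalog", "destination_catalog.json"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,  # state messages come back on this stream
)

# Forward every message emitted by the source to the destination.
for line in source.stdout:
    destination.stdin.write(line)

destination.stdin.close()
source.wait()
destination.wait()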

But let's move on to the next interesting part, the cursor. To handle incremental loads, you need to define a column that Airbyte will use to discover the data that appeared since the last synchronization. The logs below show an incremental and append-only synchronization using the register_time column.

source > i.a.i.s.j.AbstractJdbcSource(lambda$getCheckOperations$1):87 - Attempting to get metadata from the database to see if we can connect.
source > i.a.i.s.r.CdcStateManager(<init>):26 - Initialized CDC state with: null
source > i.a.i.s.r.StateManager(createCursorInfoForStream):108 - Found matching cursor in state. Stream: AirbyteStreamNameNamespacePair{name='users', namespace='airbyte_db_schema'}. Cursor Field: register_time Value: 2021-04-30T19:10:25Z
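
Conceptually, the incremental read is just a query filtered on the cursor column, as in this simplified sketch; it is not Airbyte's actual JDBC source code and the connection string is a placeholder.

import psycopg2

last_cursor = "2021-04-30T19:10:25Z"  # restored from the saved state

conn = psycopg2.connect("postgresql://localhost/airbyte_db")  # placeholder DSN
with conn.cursor() as cur:
    cur.execute(
        "SELECT * FROM airbyte_db_schema.users "
        "WHERE register_time > %s ORDER BY register_time",
        (last_cursor,),
    )
    for row in cur:
        pass  # each row would be emitted as an AirbyteMessage on STDOUT

# The highest register_time seen becomes the new cursor persisted for the next run.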

After discovering the data, Airbyte starts the synchronization. This step of the operation extracts the new rows for the append mode, or all rows for the full mode, and writes them to the destination. Additionally, it updates the state information in the metadata storage: 

source > i.a.i.s.j.AbstractJdbcSource(queryTableIncremental):255 - Queueing query for table: users
source > i.a.i.s.j.AbstractJdbcSource(lambda$queryTableIncremental$16):260 - Preparing query for table: users
source > i.a.i.s.j.AbstractJdbcSource(lambda$queryTableIncremental$16):269 - Executing query for table: users
source > i.a.i.s.r.StateDecoratingIterator(computeNext):60 - State Report: stream name: AirbyteStreamNameNamespacePair{name='users', namespace='airbyte_db_schema'}, original cursor field: register_time, original cursor 2021-04-30T19:10:25Z, cursor field: register_time, new cursor: 2022-04-30T19:10:25Z
 
source > i.a.i.s.p.PostgresSource(main):358 - completed source: class io.airbyte.integrations.source.postgres.PostgresSource
i.a.w.DefaultReplicationWorker(run):152 - One of source or destination thread complete. Waiting on the other.
i.a.w.DefaultReplicationWorker(lambda$getDestinationOutputRunnable$6):312 - state in DefaultReplicationWorker from Destination: io.airbyte.protocol.models.AirbyteMessage@4ec7b834[type=STATE,log=<null>,spec=<null>,connectionStatus=<null>,catalog=<null>,record=<null>,state=io.airbyte.protocol.models.AirbyteStateMessage@40005f9c[data={"cdc":false,"streams":[{"stream_name":"users","stream_namespace":"airbyte_db_schema","cursor_field":["register_time"],"cursor":"2022-04-30T19:10:25Z"}]},additionalProperties={}],additionalProperties={}]

destination > i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):60 - Airbyte message consumer: succeeded.
destination > i.a.i.d.l.LocalJsonDestination$JsonConsumer(close):174 - finalizing consumer.
destination > i.a.i.d.l.LocalJsonDestination$JsonConsumer(close):190 - File output: /local/json_data/_airbyte_raw_users.jsonl
destination > i.a.i.b.IntegrationRunner(run):133 - Completed integration: io.airbyte.integrations.destination.local_json.LocalJsonDestination
 
i.a.w.DefaultReplicationWorker(run):154 - Source and destination threads complete.
i.a.w.DefaultReplicationWorker(run):217 - sync summary: io.airbyte.config.ReplicationAttemptSummary@2205c605[status=completed,recordsSynced=2,bytesSynced=150,startTime=1651827559146,endTime=1651827562241,totalStats=io.airbyte.config.SyncStats@7a037704[recordsEmitted=2,bytesEmitted=150,stateMessagesEmitted=1,recordsCommitted=2],streamStats=[io.airbyte.config.StreamSyncStats@6561d7c7[streamName=users,stats=io.airbyte.config.SyncStats@612d4506[recordsEmitted=2,bytesEmitted=150,stateMessagesEmitted=<null>,recordsCommitted=2]]]]
i.a.w.DefaultReplicationWorker(run):237 - Source output at least one state message
i.a.w.DefaultReplicationWorker(run):243 - State capture: Updated state to: Optional[io.airbyte.config.State@6d1492c8[state={"cdc":false,"streams":[{"stream_name":"users","stream_namespace":"airbyte_db_schema","cursor_field":["register_time"],"cursor":"2022-04-30T19:10:25Z"}]}]]

The synchronization ends here. The output will be slightly different if you define a dbt transformation for the loaded data. But no worries, you won't get lost!

One point before finishing, though. The output generated by Airbyte won't always reflect the input. Look at the file generated by the CSV destination:

"_airbyte_ab_id",_airbyte_emitted_at,_airbyte_data
21407e41-b9e4-4a87-b039-c2eedbe9c6c7,1651237763008,"{""login"":""user1"",""city"":""Warsaw"",""register_time"":""2020-03-02T19:10:25Z""}"
eb850b6b-573f-442e-87fc-a2a92a482233,1651237763008,"{""login"":""user2"",""city"":""Poznan"",""register_time"":""2020-03-30T19:10:25Z""}"
b2d5fbc6-0375-44f3-8a86-7cf385088928,1651237763008,"{""login"":""user3"",""city"":""Paris"",""register_time"":""2020-04-12T19:10:25Z""}"
ea7c3be8-b304-41be-a598-a035c7a9a509,1651237763008,"{""login"":""user4"",""city"":""Marseille"",""register_time"":""2020-05-21T19:10:25Z""}"

As you can see, the data is stringified JSON and, alongside the copied records, you will find the metadata generated by Airbyte, such as the record UUID assigned by the tool and the extraction time. For certain data stores, you can set up Basic Normalization, which will expand the raw JSON into regular columns.
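
If you ever need to consume such raw files directly, unpacking the payload is straightforward, as in the short sketch below (the file path is illustrative):

import csv
import json

with open("/tmp/airbyte_local/csv_data/_airbyte_raw_users.csv", newline="") as f:
    for row in csv.DictReader(f):
        record = json.loads(row["_airbyte_data"])  # the stringified JSON payload
        print(row["_airbyte_ab_id"], record["login"], record["register_time"])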

Airbyte - Data stack integration

Airbyte has its own built-in scheduler for tasks. However, this doesn't mean that you have to use it - especially if you already have all the jobs orchestrated by a data orchestrator like Apache Airflow. The good news is that you can still rely on Airbyte for the data ingestion but control its execution from the orchestration layer. This not only helps better integrate the data ingestion tasks with the existing pipelines, but also extends the limited triggering capabilities of Airbyte. As of today, the tool supports only static triggering schedules, ranging from manual to every 24 hours. Apache Airflow offers much more flexibility for that part and, fortunately, you can trigger a sync with an AirbyteTriggerSyncOperator and wait for its outcome with an AirbyteJobSensor, as in the sketch below.
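
A minimal DAG using the Airflow Airbyte provider could look like the sketch below; the Airflow connection name and the Airbyte connection UUID are placeholders.

import pendulum
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

with DAG(
    dag_id="airbyte_postgres_to_json",
    start_date=pendulum.datetime(2022, 6, 1, tz="UTC"),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    trigger_sync = AirbyteTriggerSyncOperator(
        task_id="trigger_airbyte_sync",
        airbyte_conn_id="airbyte_default",  # Airflow connection pointing at the Airbyte API
        connection_id="00000000-0000-0000-0000-000000000000",  # Airbyte connection UUID
        asynchronous=True,  # return immediately; the sensor below does the waiting
    )

    wait_for_sync = AirbyteJobSensor(
        task_id="wait_for_airbyte_sync",
        airbyte_conn_id="airbyte_default",
        airbyte_job_id=trigger_sync.output,  # job id pushed to XCom by the operator
    )

    trigger_sync >> wait_for_sync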

Airbyte also integrates with other popular data tools, notably dbt. If necessary, you can add a post-ingestion step and turn the native Extract Load job into an Extract Load Transform one. How? By adding a transformation step to the connection that invokes dbt commands, such as seed or run, on top of the ingested data.

Would you like to know more about dbt? Check out our blog post Up & Running: data pipeline with BigQuery and dbt.

Gotchas

Although Airbyte solves many data ingestion problems, it has several pitfalls. Firstly, the deployment. Defining the pipelines visually is nice and probably more pleasant than editing them in an IDE. However, the low-code character of this solution raises the question of how to deploy the ingestion jobs and integrate them into a classic CI/CD process. A potential solution could be the Octavia CLI, which transforms a static Airbyte configuration into templates that can be deployed to different environments.

The second question we had after the initial discovery was Big Data. As you saw in the previous section, the ingestion process works on a single node and uses CPU-based parallelisation. Yet there seems to be no way to parallelize a single ingestion job instance across a cluster. Hence, synchronizing big datasets may be slower with Airbyte than with an Apache Spark or Apache Flink job, which also provide native connectivity to many data sources.

Finally, we also raised a question about daily maintenance. To understand the problem, let's recall one of the basics of functional data engineering by Maxime Beauchemin: reproducibility and replicability. Simply speaking, a job execution should always use the same logic, even if it gets backfilled. How does this relate to Airbyte, if we consider it a batch job of the passthrough type? Unfortunately, Airbyte doesn't respect that logic. Even if you define your ingestion as an incremental process, you don't have the possibility of replaying a specific execution instance. Instead, you have to relaunch the whole synchronization from scratch. It might not look like a big deal, but if your destination has lost only the data from the 3 most recent jobs, having to reingest everything isn't optimal.

[Screenshot: Airbyte's 'Reset your data' option]

As a workaround for the previous problem, you could consider manually changing the cursor value in the metadata table. But considering that tools like Apache Airflow provide that feature out-of-the-box, there are possibly some improvements to be made in the Airbyte process.

You’re now aware of the gotchas and hopefully, you’ll be able to make better data ingestion decisions. But if you need some help, we’re waiting for your message!

