
Apache Spark with Apache Iceberg - a way to boost your data pipeline performance and safety

SQL was invented in the early 1970s and has powered databases for decades. It allows you not only to query the data, but also to modify it easily at the row level. The rise of big data around 2006 changed this perspective by promoting immutability as a way to keep analytical queries responsive. While immutable data makes sense in many cases, there is still a need for scalable datasets that support row-level modifications and transactions. But can we do this in a world dominated by Hadoop-based execution engines? Well, meet Apache Iceberg.

Apache Iceberg is an open table format for huge analytic datasets. It can be used with popular big data processing engines such as Apache Spark, Trino, PrestoDB, Flink and Hive.

Apache Iceberg is open source and its full specification is available to everyone, so there are no surprises.

Introduction

What can you get from Apache Iceberg and how can you benefit from this technology? Imagine a situation where the producer is in the process of saving the data and the consumer reads the data in the middle of that process. Apache Iceberg gives you serializable isolation: changes to the table are atomic, and consumers never see partial or uncommitted results. Data is never locked during the save process, so consumers can reliably read it without acquiring a lock.

Whenever the data in a new version turns out to be corrupted or missing, the table can simply be rolled back to a previous version using the version history.

Apache Iceberg lets you write concurrently to the same table using an optimistic concurrency mechanism: every writer performs its write operation assuming that no other writer is active at that moment. When the write is finished, the writer tries to commit by swapping the metadata files. If the swap fails because another writer has already committed its result, the operation is retried based on the new current table state.
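The commit-and-retry loop described above can be sketched as a toy model. This is not Iceberg code: ToyCatalog, swap and append are hypothetical names, and the "metadata file" is reduced to a version number plus a tuple of file names. It only illustrates the idea that writers prepare their changes without locking the table and that only the final pointer swap is atomic.

```python
import threading


class ToyCatalog:
    """Hypothetical stand-in for the pointer to a table's current metadata."""

    def __init__(self):
        self._lock = threading.Lock()  # guards only the pointer swap itself
        self.version = 0
        self.files = ()  # data files visible in the current version

    def current(self):
        with self._lock:
            return self.version, self.files

    def swap(self, expected_version, new_files):
        """Publish new_files only if nobody committed since expected_version."""
        with self._lock:
            if self.version != expected_version:
                return False  # another writer won the race
            self.version += 1
            self.files = new_files
            return True


def append(catalog, new_file, max_retries=5):
    """Optimistic append: build on a base version, then try to swap it in."""
    for attempt in range(1, max_retries + 1):
        base_version, base_files = catalog.current()
        candidate = base_files + (new_file,)  # prepared without a table lock
        if catalog.swap(base_version, candidate):
            return attempt
        # The swap failed, so loop and rebuild on the new current table state.
    raise RuntimeError("commit failed after %d attempts" % max_retries)
```

A writer that holds a stale base version gets False from swap and has to start again from the latest state, which is the behaviour the paragraph above describes.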

Partitioning reduces the amount of data loaded into memory: instead of reading the whole table location, Apache Spark uses its partition pruning mechanism to load only the selected partitions. Apache Iceberg complements this behaviour by adding hidden partitioning. Iceberg takes a column value, optionally transforms it, and keeps track of the relationship between the two. For example, assume that we have a table with a timestamp column.


We can create tables partitioned by date and still keep track of the relationship and run fast queries with the partition pruning mechanism.

spark.table("table").where("time = '2018-07-18'")

To define a hidden partition, simply write:

CREATE TABLE table (
    id bigint,
    time timestamp
)
USING iceberg
PARTITIONED BY (days(time))

At the moment you can use functions such as:

  • year
  • month
  • day
  • hour
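To make the relationship between a timestamp and its partition value more concrete, here is a minimal sketch of the four time transforms. It assumes the definitions from the Iceberg table spec (whole years, months, days and hours counted from 1970-01-01 UTC). The prune helper is hypothetical and only illustrates how a filter on the timestamp can be mapped to partition values; it is not how Spark or Iceberg implement pruning.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def year(ts):
    """Whole years since 1970."""
    return ts.year - 1970


def month(ts):
    """Whole months since 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def day(ts):
    """Whole days since 1970-01-01."""
    return (ts - EPOCH).days


def hour(ts):
    """Whole hours since 1970-01-01 00:00."""
    return (ts - EPOCH) // timedelta(hours=1)


def prune(partition_days, start, end):
    """Day partitions that may hold rows with start <= time < end."""
    first = day(start)
    last = day(end - timedelta(microseconds=1))
    return [d for d in sorted(partition_days) if first <= d <= last]


# A query for a single day only needs the matching day partition.
july_18 = datetime(2018, 7, 18, tzinfo=timezone.utc)
selected = prune([day(july_18) - 1, day(july_18), day(july_18) + 1],
                 july_18, july_18 + timedelta(days=1))
```

The user filters on the time column only; the partition value is derived from it, which is why the partitioning stays hidden.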

Apache Iceberg keeps track of partitions in metadata files. Thanks to that, the partitioning can evolve during the table's existence.

Clients no longer have to worry about schema evolution either. Apache Iceberg handles it with the following schema evolution operations:

  • Add - new column to the table
  • Rename - column name can be changed during the table lifetime
  • Drop - remove existing column
  • Reorder - change position of any column
  • Update - widen the type of a column, struct field, map key, map value, or list element
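Why are these operations safe? The sketch below is a toy model under one stated assumption taken from the Iceberg spec: columns are tracked by a unique field ID rather than by name or position. ToySchema and its methods are hypothetical names, and the widening table lists only two promotions (int to long, float to double) for brevity.

```python
WIDENINGS = {("int", "long"), ("float", "double")}  # deliberately incomplete


class ToySchema:
    """Hypothetical schema that identifies columns by field ID."""

    def __init__(self):
        self.columns = []  # ordered list of [field_id, name, type]
        self._next_id = 1

    def _find(self, name):
        for column in self.columns:
            if column[1] == name:
                return column
        raise KeyError(name)

    def add(self, name, type_):
        self.columns.append([self._next_id, name, type_])
        self._next_id += 1  # IDs are never reused

    def rename(self, old, new):
        self._find(old)[1] = new  # the field ID stays the same

    def drop(self, name):
        self.columns.remove(self._find(name))

    def move_first(self, name):
        column = self._find(name)
        self.columns.remove(column)
        self.columns.insert(0, column)

    def widen(self, name, new_type):
        column = self._find(name)
        if (column[2], new_type) not in WIDENINGS:
            raise ValueError("cannot change %s to %s" % (column[2], new_type))
        column[2] = new_type

    def read(self, stored_row):
        """stored_row maps field ID to value, as written under any older schema."""
        return {name: stored_row.get(fid) for fid, name, _ in self.columns}
```

Because old rows are resolved by field ID, a renamed column still finds its data, and a column that is dropped and later re-added under the same name does not bring the old values back.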

Different Table specifications

At the moment Apache Iceberg supports two versions of table specification.

Version 1 of the Iceberg spec defines how to manage huge tables using immutable data file formats such as Parquet, Avro or ORC.

Version 2 adds row-level updates and deletes on top of version 1. The main difference is that version 2 introduces delete files, which encode the rows deleted from existing data files and thus reduce the amount of rewritten data.

How Apache Iceberg manages the data (Table v1)

A table is divided into two locations:
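The idea behind delete files can be illustrated with a small sketch. It models only position deletes, where a delete file records a data file path and a row position. The spec also defines equality deletes, which are not shown. The function names and the in-memory representation are assumptions made for this illustration.

```python
def delete_where(data_files, position_deletes, predicate):
    """Record deletes as (path, position) pairs; no data file is rewritten."""
    updated = set(position_deletes)
    for path, rows in data_files.items():
        for position, row in enumerate(rows):
            if predicate(row):
                updated.add((path, position))
    return updated


def read_table(data_files, position_deletes):
    """Merge on read: skip every row that a delete entry points at."""
    for path in sorted(data_files):
        for position, row in enumerate(data_files[path]):
            if (path, position) not in position_deletes:
                yield row


# Hypothetical table content: two immutable data files.
files = {
    "a.parquet": [{"id": 1}, {"id": 2}],
    "b.parquet": [{"id": 3}],
}
deletes = delete_where(files, set(), lambda row: row["id"] == 2)
remaining = list(read_table(files, deletes))
```

The data files stay untouched after the delete; readers combine them with the delete entries, which is what keeps the amount of rewritten data small.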

  • data
  • metadata

data:

  • 00001-99-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet
  • 00001-98-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet
  • 00001-97-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet
  • …..
  • 00001-01-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet

metadata:

  • snap-836290095057887676-1-8347b010-09ee-47e3-b867-83f4d90669e2.avro
  • ….
  • snap-8851525293705610611-1-f0664dba-0c01-4f6c-8060-bb0473d66cfa.avro
  • 18c57490-40c5-4b80-a3ec-3118f1a41a6e-m0.avro
  • f0664dba-0c01-4f6c-8060-bb0473d66cfa-m0.avro
  • v1.metadata.json
  • v4.metadata.json
  • version-hint.text

The data location contains the files with the actual data, including files that belong to prior snapshots. The metadata location keeps information about the snapshots and about the files that belong to each snapshot (the avro files).

Snap files (manifest lists) keep information about the avro manifest files in which specific parquet files can be found. The avro files whose names start with a UUID (manifests) hold references to specific data files for a given snapshot.

The vN.metadata.json files keep information about the schema, the time and type of the last update (append, overwrite), snapshot versions, partitioning and a few simple statistics. The version hint is like the tip of a branch in git: it refers to the current version.
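The roles of the files in the metadata directory can be summarised with a small classifier. It is only a sketch based on the file names shown in the listing above. The regular expressions are assumptions derived from those names, and current_metadata_file assumes that version-hint.text contains just the current version number.

```python
import re

# Patterns derived from the example listing; other catalogs may name files differently.
PATTERNS = [
    ("table metadata", re.compile(r"v\d+\.metadata\.json")),
    ("manifest list", re.compile(r"snap-\d+-\d+-[0-9a-f-]+\.avro")),
    ("manifest", re.compile(r"[0-9a-f-]+-m\d+\.avro")),
    ("version hint", re.compile(r"version-hint\.text")),
]


def classify(file_name):
    """Return the role of a file found in the metadata directory."""
    for kind, pattern in PATTERNS:
        if pattern.fullmatch(file_name):
            return kind
    return "unknown"


def current_metadata_file(hint_text):
    """Map the content of version-hint.text to a metadata file name."""
    return "v%d.metadata.json" % int(hint_text.strip())
```

Reading a table then follows the chain from the version hint to the table metadata, from there to the manifest list of the chosen snapshot, then to the manifests, and finally to the data files.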

How to use Apache Iceberg with Spark

Apache Iceberg provides two ways for Spark users to interact with ACID tables: via DataFrames or using SQL syntax. All Iceberg capabilities are available for Spark 3.x; for Spark 2.4 there is only support for DataFrame overwrite and append.

Apache Iceberg provides an easy way to extend Spark with the table specification: add the Iceberg jar to the Spark session, enable the Iceberg SQL extensions, and specify the catalog location.

Interacting with tables is also simplified. To create a partitioned table based on a DataFrame, write simple SQL:
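The three steps above translate into a handful of Spark settings. The helper below is a sketch that collects them in a dictionary and turns them into command-line arguments. The function names are hypothetical, the catalog name and warehouse path are placeholders, and the package coordinates are left to the caller because they depend on the Spark and Iceberg versions in use.

```python
def iceberg_spark_conf(catalog, warehouse):
    """Spark settings for the session catalog plus one path-based Iceberg catalog."""
    return {
        # Enables Iceberg SQL extensions such as row-level deletes.
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Adds Iceberg support to the built-in session catalog.
        "spark.sql.catalog.spark_catalog":
            "org.apache.iceberg.spark.SparkSessionCatalog",
        "spark.sql.catalog.spark_catalog.type": "hive",
        # A separate catalog that keeps tables under a warehouse directory.
        "spark.sql.catalog.%s" % catalog: "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.%s.type" % catalog: "hadoop",
        "spark.sql.catalog.%s.warehouse" % catalog: warehouse,
    }


def to_cli_args(conf, packages):
    """Render the settings as arguments for spark-shell or spark-submit."""
    args = ["--packages", packages]
    for key, value in conf.items():
        args += ["--conf", "%s=%s" % (key, value)]
    return args


# Placeholder values: adjust the catalog name, path and package to your setup.
conf = iceberg_spark_conf("local", "/tmp/warehouse")
args = to_cli_args(conf, "<iceberg-spark-runtime package coordinates>")
```

The same key and value pairs can also be passed one by one to the config method of the SparkSession builder when the session is created from code.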

CREATE TABLE prod.db.table
USING iceberg
PARTITIONED BY (months(date))
AS SELECT * FROM temporary
ORDER BY date

An explicit ORDER BY is essential due to a Spark limitation (Spark does not allow Iceberg to request a sort before performing a write operation). The months function creates a hidden partition.

The same goal can be achieved using the DataFrameWriterV2 API:

import org.apache.spark.sql.functions.{col, months}

df.sortWithinPartitions("date").writeTo("prod.db.table").partitionedBy(months(col("date"))).create()

In addition, Apache Iceberg lets you insert data into a table, merge based on a predicate, insert overwrite, delete from (row-level deletes require the Spark extensions), update, and append or overwrite using DataFrames.

To manage a table, use the ALTER TABLE syntax, which lets you:

  • change column type, position (ALTER COLUMN)
  • drop column (DROP COLUMN)
  • add partition field (ADD PARTITION FIELD) 
  • drop partition field (DROP PARTITION FIELD)

Apache Iceberg gives you the flexibility to load any snapshot, or the data as of a given point in time. To check the version history, run:

spark.read.table("local.db.table.history").show(10, false)


To read a snapshot by ID, run:

spark.read.option("snapshot-id", 2626515310899177788L).table("local.db.table")

Or as of a given point in time:

spark.read.option("as-of-timestamp", 1637879675001L).table("local.db.table")
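The as-of-timestamp value is expressed in milliseconds since the Unix epoch. The sketch below shows how such a value can be derived from a date and how a time-travel read conceptually picks a snapshot. The snapshot_as_of helper and the history entries are hypothetical (only the snapshot ID 2626515310899177788 comes from the example above), and the real history table contains more columns.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to milliseconds since the epoch."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


def snapshot_as_of(history, timestamp_ms):
    """Pick the snapshot that was current at timestamp_ms.

    history is a list of (made_current_at_ms, snapshot_id) in commit order.
    """
    candidates = [sid for at, sid in history if at <= timestamp_ms]
    if not candidates:
        raise ValueError("no snapshot at or before the requested time")
    return candidates[-1]


# Hypothetical history; the timestamps and the second ID are made up.
history = [
    (1637879000000, 2626515310899177788),
    (1637879900000, 1111111111111111111),
]
as_of = to_epoch_millis(datetime(2021, 11, 25, 22, 34, 35, 1000, tzinfo=timezone.utc))
chosen = snapshot_as_of(history, as_of)
```

Using integer timedelta division avoids the rounding issues that multiplying a floating-point timestamp by 1000 can introduce.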

Why is it worth using built-in maintenance tools? 

Apache Iceberg writes many new files to disk. With many versions, the used disk space may increase drastically. To avoid this, specific snapshot versions can be marked as expired and removed from disk.

In some situations, producers can fail during data writes, leaving data files that are not referenced by any metadata file. Such files will not cause issues while loading data, but they definitely take up disk space. To avoid this, simply remove orphan files using the Java or Scala API.

Without locking the table for consumers, Apache Iceberg makes it possible to compact small files into larger ones using a data compaction mechanism.

If the table has a long version history, it is important to remove old metadata files, especially for streaming jobs, which may produce a lot of new metadata files.
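The effect of snapshot expiration and orphan file removal can be illustrated with a toy model. It is not the Iceberg maintenance API: the function names, the retain_last parameter and the snapshot dictionaries are assumptions made for this sketch, and real snapshots reference their files through manifests.

```python
def expire_snapshots(snapshots, older_than_ms, retain_last=1):
    """Keep snapshots newer than the cutoff, and always the last retain_last ones.

    snapshots is ordered from oldest to newest.
    """
    keep_from = max(0, len(snapshots) - retain_last)
    return [
        snapshot
        for index, snapshot in enumerate(snapshots)
        if index >= keep_from or snapshot["timestamp_ms"] >= older_than_ms
    ]


def unreferenced_files(files_on_disk, snapshots):
    """Files that no remaining snapshot points at and that can be deleted."""
    referenced = set()
    for snapshot in snapshots:
        referenced |= snapshot["files"]
    return set(files_on_disk) - referenced


# Hypothetical table state: three snapshots and one file left by a failed write.
snapshots = [
    {"id": 1, "timestamp_ms": 100, "files": {"a", "b"}},
    {"id": 2, "timestamp_ms": 200, "files": {"b", "c"}},
    {"id": 3, "timestamp_ms": 300, "files": {"c", "d"}},
]
disk = {"a", "b", "c", "d", "orphan"}

orphans = unreferenced_files(disk, snapshots)
kept = expire_snapshots(snapshots, older_than_ms=250)
reclaimable = unreferenced_files(disk, kept)
```

Before expiration only the orphan file is unreferenced. After the two oldest snapshots expire, the files that only they used become removable as well, which is where the disk space savings come from.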

Summary 

Apache Iceberg is a promising piece of technology, developed by the open source community. It is an open table format that can be used on on-premise clusters or in the cloud. To add Iceberg functionality to Apache Spark, all you need to do is provide additional packages and specify a few Spark config options (spark.sql.extensions, spark.sql.catalog.spark_catalog etc.). You can roll back data whenever necessary based on the data history and load specific versions when you need them, so restoring data after a failure is no longer a nightmare.

Partitioning in Apache Iceberg is dynamic: metadata files hold the partitioning information, so it can evolve during the table lifetime, and adding a new level of partitioning is no problem. Partitioning is also made simple: hidden partitioning allows for the pruning of partitions based on the column relation, not on strict values. Create date partitions but query by date, month or timestamp, as this is transparent to the user.

22 December 2021
