The SQL language was invented in the 1970s and has powered databases for decades. It allows you not only to query data, but also to modify it easily at the row level. The big data evolution in 2006 changed this perspective by promoting immutability as a cure for the responsiveness of analytical queries. While immutable data makes sense in many cases, there is still a need for scalable datasets with the ability to modify rows and run transactions. But can we do this in a world dominated by Hadoop-based execution engines? Well, meet Apache Iceberg.
Apache Iceberg is an open table format for huge analytics datasets. Apache Iceberg can be used with commonly used big data processing engines such as Apache Spark, Trino, PrestoDB, Flink and Hive.
Apache Iceberg is open source and its full specification is available to everyone, no surprises.
What can you get using Apache Iceberg and how can you benefit from this technology? Imagine a situation where a producer is in the process of saving data and a consumer reads the data in the middle of that process. Apache Iceberg gives you serializable isolation: changes to the table are atomic, and consumers never see partial or uncommitted results. Data is never locked during the save process, so consumers can reliably read it without holding a lock.
If a new version of the data turns out to be corrupted or incomplete, the table can simply be rolled back to a previous version using its version history.
Apache Iceberg lets multiple writers write concurrently to the same table using optimistic concurrency: each writer performs its write operation assuming that no other writer is active at that moment. When the write is finished, the writer tries to commit by swapping the metadata files. If the swap fails because another writer has already committed its result, the operation is retried based on the new current table state.
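The commit-and-retry loop described above can be sketched in a few lines of Python. This is only a toy model under our own assumptions, not Iceberg code: ToyTable, CommitConflict and append are hypothetical names, and the lock merely stands in for the atomic metadata swap that a real catalog provides.

```python
import threading


class CommitConflict(Exception):
    """Raised when another writer swapped the metadata pointer first."""


class ToyTable:
    """Toy stand-in for a table whose state is a single metadata pointer."""

    def __init__(self):
        self._lock = threading.Lock()   # models only the atomic pointer swap
        self.version = 0                # current metadata version
        self.files = ()                 # data files visible in that version

    def current(self):
        with self._lock:
            return self.version, self.files

    def swap(self, expected_version, new_files):
        """Replace the metadata, but only if nobody else committed meanwhile."""
        with self._lock:
            if self.version != expected_version:
                raise CommitConflict(
                    f"expected v{expected_version}, found v{self.version}")
            self.version += 1
            self.files = new_files


def append(table, new_file, max_retries=5):
    """Append one file optimistically; on conflict, retry on the new state."""
    for attempt in range(1, max_retries + 1):
        base_version, base_files = table.current()
        candidate = base_files + (new_file,)    # prepared without any lock
        try:
            table.swap(base_version, candidate)
            return attempt                      # number of attempts needed
        except CommitConflict:
            continue                            # re-read the state and retry
    raise RuntimeError("gave up after too many conflicting commits")


table = ToyTable()
writers = [
    threading.Thread(target=append, args=(table, f"file-{i}.parquet"))
    for i in range(4)
]
for writer in writers:
    writer.start()
for writer in writers:
    writer.join()

print(table.version, sorted(table.files))
```

Whatever the interleaving of the four writers, every append ends up in the table exactly once, because a failed swap never changes the table and the retry always starts from the latest committed state.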
Partitioning helps to reduce the amount of data loaded into memory: instead of reading the whole table location, Apache Spark can use the partition pruning mechanism to load only the selected partitions. Apache Iceberg complements this behaviour by adding hidden partitioning. Iceberg takes a column value, optionally transforms it, and keeps track of the relationship between the two. For example, assume that we have a table with timestamp values.
We can create a table partitioned by date, still keep track of the relationship, and run fast queries that benefit from the partition pruning mechanism:
spark.table("table").where("time = '2018-07-18'")
To define a hidden partition, simply write:
CREATE TABLE table (
id bigint,
time timestamp
)
USING iceberg
PARTITIONED BY (days(time))
At the moment you can use transform functions such as years, months, days, hours, bucket and truncate.
Apache Iceberg keeps track of partitions using metadata files. Because of that, partitioning can evolve during the table's existence.
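To show why hidden partitioning still allows pruning when the query filters on the raw timestamp, here is a small Python sketch. It is a simplified model under our own assumptions (the write and scan helpers and the sample rows are invented for illustration), not how Iceberg is implemented internally.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def days(ts):
    """Toy version of the days(time) transform: days since 1970-01-01."""
    return (ts.date() - EPOCH).days


def write(rows):
    """Group rows into partitions keyed by the transformed column value."""
    partitions = {}
    for row in rows:
        partitions.setdefault(days(row["time"]), []).append(row)
    return partitions


def scan(partitions, start, end):
    """Return rows with start <= time <= end, reading only matching partitions."""
    wanted = range(days(start), days(end) + 1)   # apply the transform to the filter
    touched = [key for key in partitions if key in wanted]
    rows = [
        row
        for key in touched
        for row in partitions[key]
        if start <= row["time"] <= end
    ]
    return rows, touched


rows = [
    {"id": 1, "time": datetime(2018, 7, 17, 23, 59)},
    {"id": 2, "time": datetime(2018, 7, 18, 8, 0)},
    {"id": 3, "time": datetime(2018, 7, 18, 21, 30)},
    {"id": 4, "time": datetime(2018, 7, 19, 0, 1)},
]
partitions = write(rows)
result, touched = scan(
    partitions, datetime(2018, 7, 18), datetime(2018, 7, 18, 23, 59, 59))
print([row["id"] for row in result], len(touched), "of", len(partitions))
```

The reader only ever filters on the time column. Because the table knows that the partition value is derived from time, the same transform can be applied to the filter bounds, and partitions outside the range are never opened.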
Clients no longer have to worry about schema evolution either. Apache Iceberg handles it by supporting schema changes such as adding, dropping, renaming, reordering and updating (widening the type of) columns.
At the moment Apache Iceberg supports two versions of the table specification.
Version 1 of the Iceberg spec defines how to manage very large tables using immutable file formats such as Parquet, Avro or ORC.
Version 2 adds row-level updates and deletes on top of version 1. The main difference between the versions is that version 2 adds delete files, which encode the rows deleted from existing data files and so reduce the amount of data that has to be rewritten.
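To make the role of delete files more concrete, the sketch below models a delete that records row positions instead of rewriting data files, and a read that merges both. It is a toy illustration under our own assumptions: the file names and the delete_where and read_table helpers are made up, and real delete files are of course not Python sets.

```python
# Immutable data files: path -> rows (never rewritten in this sketch).
data_files = {
    "data/00000.parquet": [{"id": 1}, {"id": 2}, {"id": 3}],
    "data/00001.parquet": [{"id": 4}, {"id": 5}],
}

# A toy "position delete file": (data file path, row position) pairs.
delete_file = set()


def delete_where(predicate):
    """Record matching rows in the delete file instead of rewriting data."""
    for path, rows in data_files.items():
        for position, row in enumerate(rows):
            if predicate(row):
                delete_file.add((path, position))


def read_table():
    """Merge on read: skip every row position listed in the delete file."""
    return [
        row
        for path, rows in data_files.items()
        for position, row in enumerate(rows)
        if (path, position) not in delete_file
    ]


delete_where(lambda row: row["id"] % 2 == 0)
print([row["id"] for row in read_table()])   # prints [1, 3, 5]
```

The data files are untouched after the delete. Only a small extra file was written, which is exactly the saving that version 2 is after.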
A table is divided into two locations:
data
metadata
The data location contains the files with the actual data, including files that belong to prior snapshots. The metadata location keeps information about snapshots and about the files related to each snapshot (Avro files).
Snap files (manifest lists) keep information on the Avro files in which specific Parquet files can be found. Avro files whose names start with a UUID (manifest files) hold references to the specific data files for a given snapshot.
The vN.metadata.json files (where N is the version number) keep information about the schema, the last update time (append, overwrite), snapshot version, partitioning and a few simple statistics. The version hint is like a tip in git: it refers to the current version.
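The chain from the version hint down to the data files can be pictured with a small in-memory model. This is only a sketch under our own assumptions: every file name and id below is invented, and the real files are JSON and Avro documents with many more fields.

```python
# Toy, in-memory picture of a metadata directory (all names are made up).
metadata_dir = {
    "version-hint.text": "2",
    "v2.metadata.json": {
        "current-snapshot-id": 20,
        # snapshot id -> snap file; simplified to a plain mapping here
        "snapshots": {10: "snap-10.avro", 20: "snap-20.avro"},
    },
    # snap files list the Avro files that make up a snapshot
    "snap-10.avro": ["aaaa-m0.avro"],
    "snap-20.avro": ["aaaa-m0.avro", "bbbb-m0.avro"],
    # those Avro files list the actual data files
    "aaaa-m0.avro": ["data/00000.parquet"],
    "bbbb-m0.avro": ["data/00001.parquet", "data/00002.parquet"],
}


def data_files(snapshot_id=None):
    """Resolve the data files of a snapshot (the current one by default)."""
    version = metadata_dir["version-hint.text"]
    table_metadata = metadata_dir[f"v{version}.metadata.json"]
    if snapshot_id is None:
        snapshot_id = table_metadata["current-snapshot-id"]
    snap_file = table_metadata["snapshots"][snapshot_id]
    return [
        path
        for avro_file in metadata_dir[snap_file]
        for path in metadata_dir[avro_file]
    ]


print(data_files())     # files of the current snapshot
print(data_files(10))   # files of an older snapshot
```

Note that the older snapshot shares aaaa-m0.avro with the current one, which is why keeping prior snapshots does not mean copying all of their data.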
Apache Iceberg provides two ways for Spark users to interact with ACID tables: via DataFrames or using SQL syntax. All Iceberg capabilities are available for Spark 3.x; for Spark 2.4 there is only support for DataFrame overwrite and append.
Extending Spark with the Iceberg table specification is easy: add the Iceberg jar to the Spark session, enable the appropriate SQL extensions and specify the catalog location.
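As a rough illustration of those three steps, the helper below assembles the options for a spark-shell call. The two Iceberg class names are the ones we know from the Iceberg Spark documentation. The catalog name local, the warehouse path and the package placeholder are assumptions for this example, so substitute the runtime package coordinates that match your Spark and Iceberg versions.

```python
def iceberg_spark_conf(catalog, warehouse):
    """Spark options that enable Iceberg for one Hadoop-type catalog."""
    return {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{catalog}.type": "hadoop",
        f"spark.sql.catalog.{catalog}.warehouse": warehouse,
    }


def spark_shell_command(package, conf):
    """Render the options as an argument list for spark-shell."""
    args = ["spark-shell", "--packages", package]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Placeholder values: the package string and the path are NOT real coordinates.
conf = iceberg_spark_conf("local", "/tmp/warehouse")
command = spark_shell_command("<iceberg-spark-runtime-package>", conf)
print(" \\\n  ".join(command))
```

With a catalog named local configured this way, tables are addressed as local.db.table, which is the naming used in the read examples later in this article.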
Interacting with tables is also simplified. To create a partitioned table based on a DataFrame, write simple SQL:
CREATE TABLE prod.db.table
USING iceberg
PARTITIONED BY (months(date))
AS SELECT * FROM temporary
ORDER BY date
The explicit ORDER BY is essential due to a Spark limitation (Spark does not allow Iceberg to request a sort before performing a write operation). The months function creates a hidden partition.
The same goal can be achieved using the v2 API:
df.sortWithinPartitions("date").writeTo("prod.db.table").create()
In addition, Apache Iceberg lets you insert data into a table (INSERT INTO), merge based on a predicate (MERGE INTO), INSERT OVERWRITE, DELETE FROM (row-level deletes require the Spark extensions), UPDATE, and append or overwrite with DataFrames.
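To manage a table, use the ALTER TABLE syntax, which lets you, for example, rename the table, set table properties, add, rename, alter or drop columns, and change the partitioning.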
Apache Iceberg gives you the flexibility to load any snapshot, or the data as of a given point in time. To check the version history, run:
spark.read.table("local.db.table.history").show(10, false)
To read a snapshot by id, run:
spark.read.option("snapshot-id", 2626515310899177788L).table("local.db.table")
Or as of a given point in time:
spark.read.option("as-of-timestamp", 1637879675001L).table("local.db.table")
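Conceptually, reading as of a timestamp means picking the last snapshot that became current at or before that moment. The sketch below shows that lookup on a hand-made history list. The snapshot ids and times are invented, and snapshot_as_of is a hypothetical helper rather than an Iceberg function.

```python
from bisect import bisect_right

# (made_current_at in epoch millis, snapshot id), oldest first; values invented
history = [
    (1637879000000, 101),
    (1637879500000, 102),
    (1637880000000, 103),
]


def snapshot_as_of(history, timestamp_ms):
    """Return the id of the snapshot that was current at timestamp_ms."""
    times = [made_current_at for made_current_at, _ in history]
    index = bisect_right(times, timestamp_ms)
    if index == 0:
        raise ValueError("the table had no snapshot at that time")
    return history[index - 1][1]


print(snapshot_as_of(history, 1637879675001))   # prints 102
```

A timestamp older than the first snapshot cannot be resolved, which the sketch reports as an error.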
Apache Iceberg writes many new files to disk. With many versions, the used disk space may increase drastically. To avoid this, specific snapshots can be marked as expired and removed from disk.
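The important detail of expiring snapshots is that a data file may only be deleted once no remaining snapshot refers to it. Below is a minimal Python sketch of that rule, assuming a simplified snapshot record with an id, a timestamp and a set of files. The function and parameter names are ours, chosen for illustration.

```python
def expire_snapshots(snapshots, older_than_ms, retain_last=1):
    """Split snapshots (oldest first) into kept ones and deletable files."""
    keep_from = max(0, len(snapshots) - retain_last)
    kept, expired = [], []
    for index, snapshot in enumerate(snapshots):
        recent = snapshot["timestamp_ms"] >= older_than_ms
        if recent or index >= keep_from:
            kept.append(snapshot)
        else:
            expired.append(snapshot)
    # A file is removable only if no kept snapshot still references it.
    still_needed = set().union(*(s["files"] for s in kept))
    removable = set().union(*(s["files"] for s in expired)) - still_needed
    return kept, sorted(removable)


snapshots = [
    {"id": 1, "timestamp_ms": 1000, "files": {"a.parquet"}},
    {"id": 2, "timestamp_ms": 2000, "files": {"a.parquet", "b.parquet"}},
    {"id": 3, "timestamp_ms": 3000, "files": {"b.parquet", "c.parquet"}},
]
kept, removable = expire_snapshots(snapshots, older_than_ms=2500)
print([s["id"] for s in kept], removable)   # prints [3] ['a.parquet']
```

Here b.parquet survives even though an expired snapshot used it, because the current snapshot still needs it.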
In some situations, producers can fail while writing data, leaving data files that are not connected to any metadata file. Such files will not cause issues while loading data, but they definitely take up disk space. To avoid this, simply remove orphan files using the Java or Scala API.
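The idea behind orphan file removal is a simple set difference: whatever exists under the table location but is not referenced by any metadata is an orphan. The sketch below adds an age threshold so that files from writes still in progress are left alone. The helper name, the paths and the timestamps are assumptions for illustration only.

```python
def find_orphan_files(files_on_disk, referenced, older_than_ms):
    """files_on_disk maps path -> last modified time in epoch millis."""
    return sorted(
        path
        for path, modified_ms in files_on_disk.items()
        if path not in referenced and modified_ms < older_than_ms
    )


files_on_disk = {
    "data/a.parquet": 1000,   # referenced by table metadata
    "data/b.parquet": 1000,   # left behind by a failed write
    "data/c.parquet": 9000,   # unreferenced, but possibly still being written
}
referenced = {"data/a.parquet"}
print(find_orphan_files(files_on_disk, referenced, older_than_ms=5000))
```

Only data/b.parquet is reported. The recent data/c.parquet is skipped, since a writer may simply not have committed it yet.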
Apache Iceberg can also compact many small files into larger ones using a data compaction mechanism, without locking the table for consumers.
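One way to think about compaction is as a packing problem: small files are grouped until a target size is reached, and each group is rewritten as one larger file. The greedy grouping below is a toy sketch under our own assumptions, not Iceberg's actual planner, and the file names and sizes (in MB) are invented.

```python
def plan_compaction(file_sizes, target_size):
    """Group small files so that each group stays within target_size."""
    groups, current, current_size = [], [], 0
    for path, size in sorted(file_sizes.items()):
        if size >= target_size:
            continue                      # already large enough, leave it alone
        if current and current_size + size > target_size:
            groups.append(current)        # close the group before it overflows
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        groups.append(current)
    # Rewriting a group that holds a single file would gain nothing.
    return [group for group in groups if len(group) > 1]


file_sizes = {"f1": 40, "f2": 50, "f3": 30, "f4": 200, "f5": 60}
print(plan_compaction(file_sizes, target_size=128))
```

For this input the plan is to merge f1, f2 and f3 into one file, while f4 and f5 stay as they are. Because the result is committed as a new snapshot, readers keep using the old files until the commit succeeds.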
If the table has a long version history, it is important to remove old metadata files, especially for streaming jobs, which may produce a lot of new metadata files.
Apache Iceberg is a promising piece of technology, developed by the open source community. It is an open table format that can be used on on-premise clusters or in the cloud. To add Iceberg functionality to Apache Spark, all you need to do is provide additional packages and specify a few Spark config options (spark.sql.extensions, spark.sql.catalog.spark_catalog etc.). Roll back data whenever necessary based on the data history and load specific versions when you need to. Restoring data after a failure no longer has to be a nightmare.
Partitioning in Apache Iceberg is dynamic: metadata files hold that information, so it can evolve during the table's lifetime, and if a new level of partitioning is needed, no worries. Partitioning is also made simple: hidden partitioning allows partitions to be pruned based on the column relation, not on strict values. Create date partitions but query by date, month or timestamp, as this is transparent to the user.