Tutorial
8 min read

Apache Spark with Apache Iceberg - a way to boost your data pipeline performance and safety

SQL language was invented in 1970 and has powered databases for decades. It allows you not only to query the data, but also to modify it easily on the row level. Big data evolution in 2006 changed this perspective by promoting immutability as a cure for the responsiveness of analytical queries. While immutable data makes sense in many cases, there is still a need to have scalable datasets with the ability to modify the rows and run the transactions. But can we do this in a world dominated by Hadoop-based execution engines? Well, meet Apache Iceberg.

Apache Iceberg is an open table format for huge analytics datasets. Apache Iceberg can be used with commonly used big data processing engines such as Apache Spark, Trino, PrestoDB, Flink and Hive. 

Apache Iceberg is open source and its full specification is available to everyone, no surprises. 

Introduction

What can you get using Apache Iceberg and how can you benefit from this technology? Imagine a situation where the producer is in the process of saving the data and the consumer reads the data in the middle of that process. Apache Iceberg gives you serializable isolation (Atomicity), changes to the table are atomic and what's more consumers cannot see partial or uncommitted results. During the save process, data is never locked, consumers can reliably read the data without holding the lock. 

Whenever your data seems to be corrupted or missing for a new version, it can simply be rolled back to the previous version using version history. 

Apache Iceberg provides you with the possibility to write concurrently to a specific table, assuming an optimistic concurrency mechanism, which means that any writer performing a write operation assumes that there is no other writer at that moment. After the process is finished, it tries to swap the metadata files. If the swap process fails because the other writer has already saved the result, the process is then retried based on the new current table state.

Partitioning helps to reduce the amount of data loaded into memory, as opposed to the whole location of Apache Spark using a partition pruning mechanism which can only load selected partitions. Apache Iceberg compliments this behaviour by adding hidden partitioning. Iceberg then takes the column value and may optionally transform it, but still keeps track of the relationship. For example, assume that we have a table with timestamp values like presented below. 

introduction-spark-iceberg-getindata

We can create tables partitioned by date and still keep track of the relationship and run fast queries with the partition pruning mechanism.

spark.table(“table”).where(“time = ‘2018-07-18’”)

To define hidden partition write simply

CREATE TABLE table (

    id bigint,

    time timestamp

)

USING iceberg

PARTITIONED BY (days(time))

At the moment you can use functions such as:

  • year
  • month
  • day
  • hour

Apache Iceberg keeps track of partitions using metadata files, based on that partitioning can evolve during the table existance. 

Clients no longer have to be worried about schema evolution, Apache Iceberg handles that also, by adding schema evolution functionalities:

  • Add - new column to the table
  • Rename - column name can be changed during the table lifetime
  • Drop - remove existing column
  • Reorder - change position of any column
  • Update - Widen the type of the column, or complex types such as struct field, map key, map value, or list element.

Different Table specifications

At the moment Apache Iceberg supports two versions of table specification.

Version 1 of the Iceberg spec defines how to manage huge size tables with immutable formats of data like, parquet, avro or ORC.

Version 2 adds row level updates and deletes for version 1, the main difference between versions is that version 2 adds delete files to encode rows that are deleted in existing data files to reduce the amount of rewritten data.

How Apache Iceberg manage the data (Table v1)

Table is divided into two locations:

  • data
  • metadata

data:

  • 00001-99-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet
  • 00001-98-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet
  • 00001-97-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet
  • …..
  • 00001-01-d6620612-66f1-430c-aeee-2af099f7908c-00001.parquet

metadata

  • snap-836290095057887676-1-8347b010-09ee-47e3-b867-83f4d90669e2.avro
  • ….
  • snap-8851525293705610611-1-f0664dba-0c01-4f6c-8060-bb0473d66cfa.avro
  • 18c57490-40c5-4b80-a3ec-3118f1a41a6e-m0.avro
  • f0664dba-0c01-4f6c-8060-bb0473d66cfa-m0.avro
  • ​​v1.metadata.json
  • v4.metadata.json
  • version-hint.text

Data consists of files with actual data, even prior snapshots. Metadata keeps the information within snapshots and files which are related to specific snapshots (avro files).

Snap files keep information  on avro files where specific parquet files can be found. Avro files which start from some uuid hold reference to specific data files for a given snapshot. 

vnr.metadata.json files keeps information about schema, the last update time (append, overwrite), snapshot version, partitioning and a few simple statistics. Version hint is like a tip in git, it refers to the actual version.

How to use Apache Iceberg with Spark

Apache Iceberg provides two methods for spark users to interact with ACID tables, via dataframes or using an sql syntax. All iceberg capabilities are available for spark version 3.x, for version 2.4 there is only support for DataFrame overwrite and append. 

Apache Iceberg provides an easy way to extend spark with table specification, adding a jar to a spark session, adding extensions by using appropriate sql extensions and specifying the catalog place. . 

Interacting with tables is also simplified, to create a partitioned table based on the dataframe write simple SQL

CREATE TABLE prod.db.table

USING iceberg

PARTITIONED BY (months(date))

AS SELECT * FROM temporary

ORDER BY date

Explicit Order is essential due to spark limitations (spark does not allow Iceberg to request sort before performing a write operation). Months function creates a hidden partition.

The same goal can be achieved using v2 API

df.sortWithinPartitions(“date”).writeTo(“prod.db.table”).create()

In addition, Apache Iceberg gives you the opportunity to insert data into the table, merge based on predicate, insert overwrite, delete from (row level deletes requires spark extensions), update, data frame append, overwrite. 

To manage table use ALTER TABLE syntax, that brings you possibility to:

  • change column type, position (ALTER COLUMN)
  • drop column (DROP COLUMN)
  • add partition field (ADD PARTITION FIELD) 
  • drop partition field (DROP PARTITION FIELD)

Apache iceberg gives you the flexibility to load any snapshot or data at a given point in time. To check the versions history run
spark.read.table("local.db.table.history").show(10, false)

big-data-blog-spark-iceberg

To read snapshot by id run

spark.read.option("snapshot-id", 2626515310899177788L).table("local.db.table")

Or at given any given time

spark.read.option("as-of-timestamp", 1637879675001L).table("local.db.table")

Why is it worth using built-in maintenance tools? 

Apache Iceberg writes many new files on a disc.  With many versions, used disc space may increase drastically, so to avoid this, specific versions of the snapshot can be marked as expired and removed from the disc. 

In some situations, producers can fail during the data writes and data files may not be connected to the metadata files. Such files will not cause issues while loading data, but definitely take up disc space. To avoid this, simply remove orphan files using Java or Scala API. 

Without locking the table for consumers, Apache Iceberg brings the possibility of compacting files into larger files using a data compaction mechanism. 

If the data in the table has a long version history it is important to remove old metadata files, especially for streaming jobs which may produce a lot of new metadata files. 

Summary 

Apache Iceberg is a promising piece of technology, developed by the open source community. It is open-table format, it can be used on premise clusters or in the cloud. To add iceberg functionality to Apache Spark, all you need to do is provide additional packages and specify a few spark config options (spark.sql.extensions, spark.sql.catalog.spark_catalog etc.). Rollback data whenever it is necessary based on the data history, load specific versions when you need to no longer have a nightmare when the data has to be restored after a failure. 

Partitioning in Apache Iceberg is dynamic, metadata files hold that information, it can evolve during the table lifetime and if a new level of partitioning is needed, no worries. Partitioning is made simple -  hidden partitioning allows for the pruning of partitions based on column relation, not on strict values. Create date partitions but load based on date month or timestamp as this is transparent to the user.

big data
spark
data pipelines
iceberg
22 December 2021

Want more? Check our articles

real time reporting cover getindata
Tutorial

Real-Time Customer-Facing Reporting - Why Showing Users Data Sooner Rather than Later is Better

In today's fast-paced business environment, companies are increasingly turning to real-time data to gain a competitive edge. One of the examples are…

Read more
1RiTD99ILqsAaSQqY1GaLMw
Big Data Event

Five big ideas to learn at Big Data Tech Warsaw 2020

Hello again in 2020. It’s a new year and the new, 6th edition of Big Data Tech Warsaw is coming soon! Save the date: 27th of February. We have put…

Read more
0 pjPVaAnArwat2ZH8
Big Data Event

Big Data Tech Warsaw Summit 2019 summary

It’s been already more than a month after Big Data Tech Warsaw Summit 2019, but it’s spirit is still among us — that’s why we’ve decided to prolong it…

Read more
getindator create an image illustrating the concept of data ske b0d7e21f 9c85 40d2 9a52 32caba3aece3
Tutorial

Data skew in Flink SQL

Data processing in real-time has become crucial for businesses, and Apache Flink, with its powerful stream processing capabilities, is at the…

Read more
llm data enrichment bigqueryobszar roboczy 1 4
Tutorial

How to use LLMs for data enrichment in BigQuery?

Introduction In the ever-evolving world of data analytics, businesses are continuously seeking innovative methods to unlock hidden value from their…

Read more
radiodatawilla
Radio DaTa Podcast

Data Journey with Arunabh Singh (Willa) – Building robust ML & Analytics capability very early with FinTech, skills & competencies for data scientists with ML/AI predictions for the next decades.

In this episode of the RadioData Podcast, Adama Kawa talks with Arunabh Singh about Willa use cases (​ FinTech): the most important ML models…

Read more

Contact us

Interested in our solutions?
Contact us!

Together, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.


What did you find most impressive about GetInData?

They did a very good job in finding people that fitted in Acast both technically as well as culturally.
Type the form or send a e-mail: hello@getindata.com
The administrator of your personal data is GetInData Poland Sp. z o.o. with its registered seat in Warsaw (02-508), 39/20 Pulawska St. Your data is processed for the purpose of provision of electronic services in accordance with the Terms & Conditions. For more information on personal data processing and your rights please see Privacy Policy.

By submitting this form, you agree to our Terms & Conditions and Privacy Policy