Wednesday, October 6, 2021

TLDR intro to Delta Lake

What is Delta Lake?

Delta Lake is a project originated by the creators of Apache Spark, designed to provide a thin layer on top of existing data lakes. The data itself is stored as snappy-compressed Parquet files. The main goal of Delta is to bring ACID guarantees to data lakes, combining the horizontal scalability of OLAP analytical workloads with the transactional reliability of OLTP ones, and ultimately to unify streaming and batch data processing. This is achieved mainly by adding a transaction log that records every change made to the data. The log allows rolling back to previous versions (so-called time travel), provides an audit trail of data modifications, and enables support for delete, update and merge operations.
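To follow along with the snippets in the rest of this post, it is enough to have a SparkSession with the Delta extensions enabled. A minimal sketch, assuming the delta-spark pip package is installed (all table paths used later are illustrative):

  import pyspark
  from delta import configure_spark_with_delta_pip

  # Enable Delta's SQL extensions and catalog on a plain Spark session.
  builder = (
      pyspark.sql.SparkSession.builder.appName("delta-intro")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  )
  # configure_spark_with_delta_pip adds the Delta jars shipped with the pip package.
  spark = configure_spark_with_delta_pip(builder).getOrCreate()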

The protocol has the following properties [1]:

  • serializable writes - multiple concurrent writes to a Delta table are serializable;
  • snapshot isolation on reads - readers can read a consistent table snapshot even in the presence of multiple writers;
  • scalability to billions of partitions and files;
  • self-describing - metadata is stored alongside the data, eliminating the need for a separate metastore, simplifying data management and keeping the format compatible with existing infrastructure;
  • incremental processing - tailing the transaction log is enough to determine the data added in a given time period, so a table can be efficiently consumed as a stream.

Transaction log

When writing a Spark DataFrame to Delta (e.g., df.write.format("delta").save(path)) or when creating a Delta-formatted Hive table, a _delta_log folder is created alongside the data files.
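For example, a minimal sketch (the /tmp/delta/events path and the dummy DataFrame are illustrative, and spark is the session configured above):

  import os

  path = "/tmp/delta/events"   # illustrative local path
  df = spark.range(0, 5)       # a tiny DataFrame with a single "id" column

  df.write.format("delta").save(path)                  # first commit
  df.write.format("delta").mode("append").save(path)   # second commit

  print(os.listdir(path))                  # Parquet data files + the _delta_log folder
  print(os.listdir(f"{path}/_delta_log"))  # one JSON commit file per transaction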

The transaction log indicates which files belong to the currently active version of the table. Without it, when multiple processes write to the same folder and one of them fails, there is no mechanism to clean up the files it had already written: a reader accessing the data files directly would simply retrieve all of them, including the partial results.

Along with the JSON file of each transaction there exists a CRC file. The CRC file contains information such as the number of files and the total size, which helps Spark optimize its queries.

Upon modifications (insert, delete, update or merge), the transaction log is updated through atomic operations called commits. Each commit is written as a JSON file, starting with 000000.json; subsequent commits generate additional JSON files with ascending IDs. Each commit records the files added by the operation (add.path) and the files removed by it (remove.path).
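The commit files are plain JSON and can be inspected directly; a short sketch, using the illustrative path from above:

  # Each JSON record in the log carries at most one action:
  # commitInfo, metaData, protocol, add or remove.
  log = spark.read.json("/tmp/delta/events/_delta_log/*.json")
  log.select("add.path", "remove.path", "commitInfo.operation").show(truncate=False)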

An OPTIMIZE operation is available to compact the files of the current state. This has the effect of adding a new commit whose data lives in a smaller number of larger files. The effect of OPTIMIZE is also visible in the numFilesAdded and numFilesRemoved metrics.
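A sketch of running it via SQL (note that OPTIMIZE is a Databricks feature that only later landed in open-source Delta, so availability depends on the version in use; the path is illustrative):

  # Compact the table's small files; the resulting commit exposes
  # numFilesAdded and numFilesRemoved in its operation metrics.
  spark.sql("OPTIMIZE delta.`/tmp/delta/events`")
  (spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/events`")
      .select("version", "operation", "operationMetrics")
      .show(truncate=False))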

Data removal is also recorded in the transaction log: a new commit is added which removes the affected files and adds rewritten files containing all entries but those deleted. Although the deleted data is no longer visible in the latest version of the table, the underlying files are retained, so it is still possible to roll back to a previous state. Since those files are not automatically removed from disk, a VACUUM operation exists for this purpose.
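A short sketch of this behaviour (the predicate and path are illustrative):

  from delta.tables import DeltaTable

  path = "/tmp/delta/events"
  dt = DeltaTable.forPath(spark, path)
  dt.delete("id % 2 == 0")   # new commit: old files removed, rewritten files added

  # The deleted rows are gone from the latest version...
  spark.read.format("delta").load(path).show()
  # ...but still readable from an older one, until VACUUM removes the files.
  spark.read.format("delta").option("versionAsOf", 0).load(path).show()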

Since a streaming producer writes many transactions, relying solely on commit files would quickly lead to the small-file problem. The solution is periodic checkpointing, which saves the entire table snapshot after every nth commit. This also spares readers from having to replay the transaction log (i.e., many tiny, inefficient JSON files) to reproduce the table state. In practice this is done incrementally: Spark applies each new commit to the cached table state, so it can be used directly by dependent Spark operations, and after every 10th commit (the default interval) a checkpoint file is produced and saved in Parquet format.

Looking at the Parquet checkpoint file, you will notice that it contains the state resulting from all previous commits, along with a stats_parsed column holding information similar to that of the individual CRC files.
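A sketch of inspecting it (assuming at least 10 commits have already been written to the illustrative path above, so that a checkpoint exists):

  import json

  path = "/tmp/delta/events"

  # _last_checkpoint points at the most recent checkpoint version.
  with open(f"{path}/_delta_log/_last_checkpoint") as f:
      print(json.load(f))    # e.g. {"version": 10, "size": ...}

  # The checkpoint itself is Parquet: one row per action (add, remove, metaData, ...).
  checkpoint = spark.read.parquet(f"{path}/_delta_log/*.checkpoint.parquet")
  checkpoint.printSchema()
  checkpoint.select("add.path").where("add IS NOT NULL").show(truncate=False)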

Optimistic concurrency control

Transactions issued by concurrent writers are guaranteed to complete without conflict. This happens implicitly when they work on different parts of the table (different partitions). The optimistic protocol, however, comes into play when writers touch the same parts of the table simultaneously. Serializability is achieved by implementing a policy of mutual exclusion:

if there is a conflict (another writer committed first), check whether the data that was read has changed; if so, read the latest version and attempt the commit again with a newer ID; otherwise, simply commit the version and move on.

For instance, suppose:

  • user 1 and user 2 both read 000000.json
  • both attempt to commit 000001.json at the same time
  • only one of the two commits can become 000001.json
  • say user 1 wins: user 2 sees there is now a newer commit than 000000.json, reads 000001.json, verifies it does not conflict with its own changes, and commits them as 000002.json
Therefore, the protocol lets one of the users succeed and the other one silently retry with another commit.
When both users attempt to delete the same data, the only option is instead to let one succeed and the other fail with an error: delete is not idempotent, and once the losing writer reads the newer version, the data it meant to delete is already gone, so the operation can no longer be applied.

This mechanism is implemented using multiversion concurrency control (MVCC), which provides transactional guarantees (i.e. serializability and snapshot isolation) without needing to physically lock the resource, consequently allowing for higher performance.
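The retry policy described above can be sketched as pseudocode. This is only an illustration of the protocol, not Delta's actual implementation; read_snapshot, attempt_commit and conflicts_with are hypothetical helpers:

  def optimistic_commit(table, changes, max_retries=10):
      # Read the snapshot our changes are based on.
      snapshot = table.read_snapshot()
      for _ in range(max_retries):
          version = snapshot.version + 1
          # attempt_commit atomically creates <version>.json and fails if it
          # already exists (the mutual-exclusion step).
          if table.attempt_commit(version, changes):
              return version
          # Someone else won the race: re-read the latest snapshot and check
          # whether what we read has changed in a conflicting way.
          snapshot = table.read_snapshot()
          if snapshot.conflicts_with(changes):
              raise Exception("concurrent modification: cannot apply changes")
      raise Exception("too many commit retries")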

Table Utils

Table History

Table history, i.e., the list of operations along with user, timestamp and other metadata, can be retrieved with DESCRIBE HISTORY <table> or with DeltaTable.forPath(spark, "mypath").history(). Table history is retained by default for 30 days and can be configured using spark.databricks.delta.logRetentionDuration. Upon new commits, transaction log entries older than the set retention period are cleaned up.
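For example, in Python (the path is illustrative):

  from delta.tables import DeltaTable

  history = DeltaTable.forPath(spark, "/tmp/delta/events").history()
  history.select("version", "timestamp", "operation", "operationParameters").show(truncate=False)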

Vacuum

While the transaction log is automatically cleaned up at every new commit (for entries older than the set retention), data files must be deleted explicitly. Dangling files, i.e. files no longer referenced because they were overwritten or deleted, can be removed by running the VACUUM operation. VACUUM is never called automatically; when used, the default retention threshold is 7 days, i.e. only files older than 7 days will be removed. Generally, this interval should be longer than the longest-running transaction, or than the longest period any input source can lag behind the most recent update to the table. The config spark.databricks.delta.deletedFileRetentionDuration controls the threshold between the time files are marked for deletion and the moment VACUUM can actually delete them.
The config spark.databricks.delta.vacuum.parallelDelete.enabled can be set to true to delete the files in parallel (based on the number of shuffle partitions).
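A sketch of both forms, retaining one week of history (the path is illustrative):

  from delta.tables import DeltaTable

  # Delete files in parallel, based on the number of shuffle partitions.
  spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")

  # SQL form: remove unreferenced files older than 168 hours (7 days).
  spark.sql("VACUUM delta.`/tmp/delta/events` RETAIN 168 HOURS")

  # Equivalent Python API: retention expressed in hours.
  DeltaTable.forPath(spark, "/tmp/delta/events").vacuum(168)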

Miscellaneous

Convert Parquet table to Delta table

Converting Parquet tables to Delta can be easily done with:
  • CONVERT TO DELTA parquet.`<path-to-table>`
  • DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
Conversion back to Parquet is easily done by running VACUUM with a retention of 0 hours to delete all dangling data files and then deleting the _delta_log directory.
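A sketch of the back-conversion on a local table (paths illustrative; note that VACUUM refuses retention periods shorter than the configured threshold unless the safety check is relaxed, hence the retentionDurationCheck config below):

  import shutil
  from delta.tables import DeltaTable

  path = "/tmp/delta/events"

  # Allow a retention period below the default 7 days (dangerous on a live table).
  spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
  DeltaTable.forPath(spark, path).vacuum(0)   # drop every unreferenced data file

  # Removing the transaction log turns the folder back into a plain Parquet table.
  shutil.rmtree(f"{path}/_delta_log")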

Integration with non-Spark systems

To allow non-Spark systems, such as Presto, to integrate with Delta lake without accessing the transaction log, it is possible to generate a manifest file with:
  • GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>` 
  • DeltaTable.forPath(spark, "<path-to-delta-table>").generate("symlink_format_manifest")
A directory named _symlink_format_manifest, containing the manifest files, is created at the Delta table path.
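The manifest can also be kept up to date automatically by setting a table property, so that every write regenerates it; a sketch (the property name should be checked against the documentation of the Delta version in use):

  spark.sql("""
    ALTER TABLE delta.`/tmp/delta/events`
    SET TBLPROPERTIES (delta.compatibility.symlinkFormatManifest.enabled = true)
  """)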

Selecting specific table version

Versions of Delta tables can be accessed by timestamp or a version number. These can be listed with:
  • DESCRIBE HISTORY <table-name>

A specific version can be queried, such as:
  • SELECT * FROM <table> VERSION AS OF <version>
  • SELECT * FROM <table> TIMESTAMP AS OF <datetime>
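The same can be done from the DataFrame reader; a sketch with illustrative values:

  # By version number...
  v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/events")

  # ...or by timestamp (any timestamp string Spark can parse).
  old = (spark.read.format("delta")
         .option("timestampAsOf", "2021-10-06 00:00:00")
         .load("/tmp/delta/events"))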

Delta as a Stream

A Delta table can be used as a source in a Spark Structured Streaming pipeline. For instance:
  • spark.readStream.format("delta").load("delta-table-path")
  • spark.readStream.format("delta").table("delta-table-name")
Similarly, Delta can be used as a sink:
  • stream.writeStream.format("delta").outputMode("append").option("checkpointLocation", <path>).start("delta-table-path")
  • stream.writeStream.format("delta").outputMode("append").option("checkpointLocation", <path>).toTable("delta-table-name")
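Putting the two together, a minimal Delta-to-Delta pipeline might look as follows (paths and checkpoint location are illustrative; trigger(once=True) simply processes the data available at start-up and then stops):

  from pyspark.sql import functions as F

  source = spark.readStream.format("delta").load("/tmp/delta/events")

  query = (source
      .withColumn("ingested_at", F.current_timestamp())
      .writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/delta/_checkpoints/events_copy")
      .trigger(once=True)
      .start("/tmp/delta/events_copy"))

  query.awaitTermination()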
