What is Delta Lake?
Delta Lake is a project originated by the creators of Apache Spark and designed to provide a thin layer on top of existing data lakes; the data itself is stored as snappy-compressed Parquet files. The main goal of Delta is to bring ACID guarantees to data lakes, combining the horizontal scalability of OLAP analytical workloads with the transactional reliability of OLTP ones, and ultimately to unify streaming and batch data processing. This is achieved mainly by adding a transaction log that records every change made to the data. The log allows rolling back to previous versions (so-called time traveling), provides an audit trail of data modifications, and enables support for delete, update and merge operations.
The protocol has the following properties [1]:
- serializable writes - multiple concurrent writes to a Delta table are serializable;
- snapshot isolation on reads - readers see a consistent table snapshot even in the presence of multiple concurrent writers;
- scalability to billions of partitions and files;
- self-describing - metadata is stored alongside the data, eliminating the need for a separate metastore, simplifying data management and preserving backward compatibility with existing infrastructure;
- incremental processing - tailing the transaction log is enough to determine the data added in a given time period, so a table can be efficiently converted into a stream.
Transaction log
When writing a Spark DataFrame to Delta (e.g., df.write.format("delta").save(path)) or when creating a Delta-formatted Hive table, a _delta_log folder is created alongside the data files.
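As a minimal sketch (assuming the delta-spark package is installed and configured; the path /tmp/delta/events is just an example), writing a small DataFrame in Delta format produces the Parquet data files together with the _delta_log folder:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("delta-demo")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

path = "/tmp/delta/events"
df = spark.range(0, 5).withColumnRenamed("id", "event_id")
df.write.format("delta").mode("overwrite").save(path)
# the folder now contains snappy-compressed Parquet files plus _delta_log/ with the
# first commit (000000.json in the text below, zero-padded on disk)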
The transaction log indicates which files belong to the currently active version of the table. Without it, when multiple processes write to the same folder and one of them fails, there is no mechanism to clean up the files it has already written; a reader accessing the data files directly would simply retrieve all of them, including the partial results.
For each transaction, a CRC file is written along with the JSON commit file. The CRC file contains statistics such as the number of files and the total table size, which help Spark optimize its queries.
Upon modifications (insert, delete, update or merge) the transaction log is extended through atomic operations called commits. Each commit is written as a JSON file, starting with 000000.json; subsequent commits generate additional JSON files with monotonically increasing IDs. When read into a DataFrame, the log exposes an add.path column listing the files added by each commit and a remove.path column listing the files removed.
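Since each commit is plain JSON, the log can be inspected directly with Spark (a sketch, reusing the example path from above; the remove.path column only shows up once some files have actually been removed or overwritten):

log = spark.read.json("/tmp/delta/events/_delta_log/*.json")
log.printSchema()   # add, remove, commitInfo, metaData, protocol structs
log.select("add.path", "commitInfo.operation").where("add IS NOT NULL OR commitInfo IS NOT NULL").show(truncate=False)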
An OPTIMIZE operation is available to compact the files of the current state. This adds a new commit whose data resides in a smaller number of larger files. The effect of OPTIMIZE is also visible in the operation metrics numFilesAdded and numFilesRemoved.
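For instance (assuming a Delta Lake version recent enough to expose the OPTIMIZE API), on the example table:

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/tmp/delta/events")
metrics = dt.optimize().executeCompaction()   # returns a DataFrame whose metrics include numFilesAdded / numFilesRemoved
metrics.show(truncate=False)
# equivalent SQL: OPTIMIZE delta.`/tmp/delta/events`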
Upon data removal, the operation is recorded in the transaction log as well: a new commit marks the files containing the deleted records as removed and adds rewritten files holding the remaining records. Although the deleted data is no longer visible in the latest version of the table, the underlying files are retained, so it is possible to roll back to a previous state. Since those files are not removed from disk automatically, a VACUUM operation exists to clean them up.
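A sketch of a delete on the example table, showing that older versions remain readable until a VACUUM is run:

from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/tmp/delta/events")
dt.delete("event_id > 2")   # new commit: removes the affected files, adds rewritten ones

# the deleted rows are gone from the latest version ...
spark.read.format("delta").load("/tmp/delta/events").show()
# ... but still reachable through time travel
spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/events").show()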
Since a streaming producer writes many transactions, relying solely on commit files would quickly lead to the small-file problem. The solution is periodic checkpointing, which saves the entire table snapshot after every n-th commit. This also spares readers from having to replay the transaction log (i.e., many tiny, inefficient JSON files) to reconstruct the table state. In practice this is done incrementally: Spark applies each commit to the cached snapshot of the table, so that it can be reused directly by dependent Spark operations, and upon reaching the 10th commit a checkpoint file is produced and saved in Parquet format.
When looking at the Parquet checkpoint file, you will notice that it contains the actions of all previous commits collapsed into the current table state, along with a stats_parsed column holding information similar to that of the individual CRC files.
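The checkpoint can be read like any other Parquet file (a sketch; the file name assumes that at least ten commits have been made on the example table):

chk = spark.read.parquet("/tmp/delta/events/_delta_log/00000000000000000010.checkpoint.parquet")
chk.printSchema()   # add / remove / metaData / protocol structs, with add.stats_parsed when statistics are written as a struct
chk.select("add.path").where("add IS NOT NULL").show(truncate=False)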
Optimistic concurrency control
Transactions produced by concurrent writers are guaranteed to complete without conflict. This is trivially the case when they touch different parts of the table (different partitions); the optimistic protocol handles the case in which writers modify the same parts of the table simultaneously. Serializability is achieved by implementing a policy of mutual exclusion:
if there is a conflict (another writer has committed in the meantime), check whether the data that was read has changed; if it has, read the latest version and attempt the commit again with a newer ID, otherwise simply commit the version and carry on.
For instance, suppose:
- user 1 has read the table as of 000000.json, while user 2 has also read the more recent 000001.json
- user 1 attempts to commit 000001.json and user 2 attempts to commit 000002.json at the same time
- only one commit can be named 000001.json, and it already exists
- user 1 therefore sees there is a newer commit than 000000.json, named 000001.json, silently reads it, checks that it does not conflict with its own changes and retries with the next available ID, while user 2 already has the newest version and can commit 000002.json
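The mutual exclusion itself boils down to "whoever manages to create the next numbered commit file first wins". Below is a purely conceptual sketch of the retry loop, not Delta's actual implementation; write_if_absent stands in for an atomic put-if-absent primitive of the underlying storage, and check_conflicts for the logic that compares the competing commit against what was read:

import json, os

def write_if_absent(path, content):
    # hypothetical helper: create the file only if it does not exist yet
    with open(path, "x") as f:      # mode "x" raises FileExistsError if the file is already there
        f.write(content)

def commit(log_dir, read_version, actions, check_conflicts):
    version = read_version + 1
    while True:
        try:
            write_if_absent(os.path.join(log_dir, f"{version:06d}.json"),
                            "\n".join(json.dumps(a) for a in actions))
            return version                      # we won the race for this version
        except FileExistsError:
            # another writer got there first: inspect its commit, make sure it does not
            # invalidate what we read, then retry with the next ID
            check_conflicts(os.path.join(log_dir, f"{version:06d}.json"))
            version += 1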
Table Utils
Table History
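The history of a table can be queried either from SQL or through the DeltaTable API, e.g.:
- DESCRIBE HISTORY delta.`<path-to-delta-table>`
- DeltaTable.forPath(spark, "<path-to-delta-table>").history().select("version", "timestamp", "operation").show()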
Vacuum
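VACUUM physically deletes the data files that are no longer referenced by any table version within the retention period (7 days by default), e.g.:
- VACUUM delta.`<path-to-delta-table>` [RETAIN <num> HOURS]
- DeltaTable.forPath(spark, "<path-to-delta-table>").vacuum()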
Miscellaneous
Convert Parquet table to Delta table
- CONVERT TO DELTA parquet.`<path-to-table>`
- DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
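If the Parquet table is partitioned, the partition schema has to be supplied as well (sketch, with hypothetical partition columns year and month):
- CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (year INT, month INT)
- DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "year INT, month INT")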
Integration with non-Spark systems
- GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
- DeltaTable.forPath("path-to-delta-table").generate("symlink_format_manifest").
Selecting specific table version
- DESCRIBE HISTORY <table-name>
- SELECT * FROM <table> VERSION AS OF <version>
- SELECT * FROM <table> TIMESTAMP AS OF <timestamp>
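The same versions can be selected through the DataFrame reader options, e.g.:
- spark.read.format("delta").option("versionAsOf", <version>).load("<path-to-delta-table>")
- spark.read.format("delta").option("timestampAsOf", "<timestamp>").load("<path-to-delta-table>")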
Delta as a Stream
- spark.readStream.format("delta").load("delta-table-path")
- spark.readStream.format("delta").table("delta-table-name")
- stream.writeStream.format("delta").outputMode("append").option("checkpointLocation", <path>).start("delta-table-path")
- stream.writeStream.format("delta").outputMode("append").option("checkpointLocation", <path>).table("delta-table-name")
Bibliography
- [1] D. Lee, T. Das, V. Jaiswal. Delta Lake: The Definitive Guide: Modern Data Lakehouse Architectures with Delta Lake. O'Reilly, 2022.
- [2] J. Dittrich. Multi-Version Concurrency Control (MVCC). YouTube.