Spark nondeterminism and recomputation safety


Problem Description


Some claim that, because of recomputation and fault tolerance, a Spark RDD must be a deterministic function of its inputs, yet there are also sanctioned nondeterministic RDDs, for example in SparkSQL or SparkML. Is there any formal guidance on how to use nondeterminism safely?

Consider this Spark job with a diamond DAG:

val bar = (rdd map f) zip (rdd map g)
bar.saveAsTextFile("outfile")

If rdd is nondeterministic (e.g., random or timestamped), will outfile contain consistent data? Is it possible for one component of the zip to be recomputed while the other is not? If we checkpoint or persist rdd, is safety guaranteed? Is localCheckpoint sufficient?


Reference Solutions

Solution 1:

General

Here are some of my takes and experience at a practical level:

  • If you read from tables / files in Hive, then Spark will make a list of all files used and which node processed which part of that list, so a re-computation will be consistent if it goes all the way back to the start, i.e. reads from HDFS / Hive for that subset of data.

  • If you use random functions, I use .cache or .persist to avoid re-computation taking a different path through the DAG. Of course, combined with the aforementioned point, you would get different results if the random function runs again after a fresh read from the source. See the sketch after this list, and the sections below.

  • Reading from a JDBC source, there is no guarantee of a consistent / deterministic result if updates to that JDBC source are allowed at the same time as processing, and the DAG recomputes from it.
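A minimal sketch of the random-function point above, assuming a local SparkSession (randomRdd and the sums are illustrative names, not from the answer):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("nondeterminism-sketch").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// A nondeterministic RDD: the lambda draws fresh random numbers on every evaluation.
val randomRdd = sc.parallelize(1 to 1000).map(_ => scala.util.Random.nextDouble())

// Without persisting, a second action (or a recomputed partition) re-runs the
// lambda and produces different values. Persisting pins the first materialization.
// DISK_ONLY is the stronger option discussed in Solution 2 below.
randomRdd.persist(StorageLevel.DISK_ONLY)

val sum1 = randomRdd.sum()  // materializes and stores the values
val sum2 = randomRdd.sum()  // re-reads the stored blocks, so sum1 == sum2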

Effect of checkpointing

In case of failure for whatever reason, recomputation all the way back to the source along the DAG is expensive. A checkpoint taken at a given Stage stores the data to disk (local or HDFS), and if there is a subsequent failure, re-computation starts from this point onwards, thus saving time. The DAG lineage is broken.
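A short sketch of the mechanics, reusing sc from the sketch above (the paths are illustrative):

// Checkpointing needs a directory; on a cluster this should be a reliable store like HDFS.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

val upstream = sc.textFile("hdfs:///data/input.txt").map(_.length)  // long lineage so far
upstream.checkpoint()   // marks the RDD; the data is written on the next action
upstream.count()        // materializes the RDD and writes the checkpoint files

// The lineage is now truncated at the checkpoint:
println(upstream.toDebugString)  // shows a ReliableCheckpointRDD rather than the full DAG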

Final notes

What if the re-computation starts from a JDBC source, or uses random functions that, when processed in a Stage, could affect partitions that were already processed? I cannot prove it easily, but my take is that results which do not fit the "current node" re-processing are discarded; it would not be practical otherwise.

Relating to the author's own answer, What is the difference between spark checkpoint and persist to a disk, the following should be noted: "... There are few important differences but the fundamental one is what happens with lineage. Persist / cache keeps lineage intact while checkpoint breaks lineage. ...". The statement in the other answer is not correct.

Solution 2:

Regarding the use of cache, persist and checkpoint on an rdd: according to this post, persist(StorageLevel.DISK_ONLY) will effectively break the lineage within the current job, while checkpoint breaks the lineage across jobs (but does not clean up the files). I tentatively conclude that task retries after the persist or checkpoint will not break data consistency. The cache operation alone does not guarantee consistency.
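A sketch of that distinction, under the same assumptions as the earlier sketches (nd is an illustrative name):

import org.apache.spark.storage.StorageLevel

// Within the current job: a DISK_ONLY persist pins the materialized blocks, so a
// task retry re-reads them instead of re-running the nondeterministic lambda.
val nd = sc.parallelize(1 to 100).map(_ => scala.util.Random.nextLong())
nd.persist(StorageLevel.DISK_ONLY)

// For re-use across jobs (or if the executor holding the blocks is lost), only a
// reliable checkpoint survives, since its files live in the checkpoint directory.
nd.checkpoint()
nd.count()  // one action materializes both the persisted blocks and the checkpoint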

Can there be problems before the persist? If rdd partitions contain independent random data, there is no problem with task retries on separate partitions. If rdd contains timestamps, then rdd should consist of a single partition.

I tentatively conclude that a safe strategy for computing with a nondeterministic RDD is to build it from "independent" partitions, which are safe to recompute separately, and to immediately persist to disk or checkpoint the RDD. Checkpoint is required if the RDD is re‑used across jobs.
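Applying that strategy to the diamond DAG from the question, a sketch (f and g here are stand-ins for the question's functions, and the per-partition construction is one way to make partitions "independent"):

import org.apache.spark.storage.StorageLevel
import scala.util.Random

// Build the nondeterministic RDD from independent partitions: each partition's
// random values depend on nothing outside that partition, so recomputing one
// partition in isolation cannot corrupt the others.
val rdd = sc.parallelize(1 to 1000, numSlices = 8).mapPartitions { it =>
  val rng = new Random()
  it.map(x => (x, rng.nextDouble()))
}

// Immediately pin the values before fanning out into the diamond.
rdd.persist(StorageLevel.DISK_ONLY)

val f = (p: (Int, Double)) => p._2 * 2
val g = (p: (Int, Double)) => p._2 + 1
val bar = (rdd map f) zip (rdd map g)  // both branches read the same stored blocks
bar.saveAsTextFile("outfile")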

Some transformations introduce nondeterminism in the ordering of RDDs. If you need order consistency between re‑used copies of an RDD (e.g., due to zipWithIndex), the lineage, back to the most recent persist or checkpoint, should not contain any order‑modifying transformations.
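For example (a sketch; repartition is one common order-modifying transformation):

import org.apache.spark.storage.StorageLevel

// repartition shuffles, and a shuffle does not promise a stable output order, so
// two independently recomputed copies could see different orderings. Persisting
// right after the order-sensitive step fixes one concrete order for all re-use.
val shuffled = sc.parallelize(1 to 100).repartition(4)
shuffled.persist(StorageLevel.DISK_ONLY)

val a = shuffled.zipWithIndex().collect()
val b = shuffled.zipWithIndex().collect()
// With the persist in place, a and b assign identical indices.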

(by Steve Mitchell, thebluephantom)

References

  1. Spark nondeterminism and recomputation safety (CC BY‑SA 2.5/3.0/4.0)

#apache-spark






Related Questions

Why does my println in rdd prints the string of elements?

How to sort by value efficiently in PySpark?

Apache Spark cause Tomcat to graceful shutdown

view machine learning model derived in Spark

Execute query on Spark vs Redshift

Apache Spark: Reference pointer to the parent RDD

Prevent progress output from Structured Streaming in Spark Shell

Spark nondeterminism and recomputation safety

set HBase properties for Spark Job using spark-submit

ST_WITHIN using Spark / Java

Jdbc update statement in spark

Conditional aggregation using when()







Comments