Problem Description
Spark nondeterminism and recomputation safety
It is claimed that, because of recomputation and fault tolerance, a Spark RDD must be a deterministic function of its inputs, yet nondeterministic RDDs are also sanctioned, e.g. in SparkSQL or SparkML. Is there any formal guidance on how to use nondeterminism safely?
Consider this Spark job with a diamond DAG:
val bar = (rdd map f) zip (rdd map g)
bar.saveAsTextFile("outfile")
If rdd is nondeterministic (e.g., random or timestamped), will outfile contain consistent data? Is it possible for one component of the zip to be recomputed but not the other? If we checkpoint or persist rdd, is safety guaranteed? Is a local checkpoint sufficient?
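To make the question concrete, here is a minimal runnable sketch of the diamond DAG above, assuming a local Spark setup; the functions f and g are placeholders for arbitrary transformations, and the persist call shows the mitigation the question asks about:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object DiamondDag {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("diamond").getOrCreate()
    val sc = spark.sparkContext

    // Nondeterministic source: every evaluation of the lineage yields new values.
    val rdd = sc.parallelize(1 to 100).map(_ => scala.util.Random.nextInt())

    // Without this persist, each branch of the diamond may recompute `rdd`
    // independently and observe different random values.
    rdd.persist(StorageLevel.DISK_ONLY)

    val f = (x: Int) => x + 1
    val g = (x: Int) => x * 2
    val bar = (rdd map f) zip (rdd map g)
    bar.saveAsTextFile("outfile")

    spark.stop()
  }
}
```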
Reference Solutions
Approach 1:
General
Here are some of my takes and experience at a practical level:
If you read from tables / files in Hive, Spark makes a list of all the files used and which node processed which part of that list, so a recomputation will be consistent if it goes all the way back to the start, i.e. reads that subset of data again from HDFS / Hive.
If you use random functions, then I .cache or .persist to avoid recomputation along a different path of logic. Of course, combined with the aforementioned point, you would get different results if a random function runs after reading and the data has to be fetched from source again. See below.
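The recomputation risk with random functions can be demonstrated directly; this is a hedged sketch assuming an existing SparkContext sc, showing why the persist must come before the first action:

```scala
import org.apache.spark.storage.StorageLevel

// Uncached: each action replays the lineage from source, so the random
// values are regenerated and the two counts can disagree.
val rnd = sc.parallelize(1 to 1000).map(_ => scala.util.Random.nextDouble())
val a = rnd.filter(_ > 0.5).count()
val b = rnd.filter(_ > 0.5).count()   // may differ from `a`

// Persisted before any action: the random values are materialized once,
// and subsequent actions all read the same stored data.
val fixed = sc.parallelize(1 to 1000).map(_ => scala.util.Random.nextDouble())
fixed.persist(StorageLevel.DISK_ONLY)
val c = fixed.filter(_ > 0.5).count()
val d = fixed.filter(_ > 0.5).count() // equals `c`, barring executor loss
```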
When reading from a JDBC source, there is no guarantee of a consistent / deterministic result if updates to that JDBC source are allowed while processing is in flight and the DAG recomputes from it.
Effect of checkpointing
If a failure occurs for whatever reason, recomputing the DAG all the way back to the source is expensive. A checkpoint taken at a given Stage stores the data to disk (local or HDFS); if there is a subsequent failure, recomputation starts from that point onwards, saving time. The DAG lineage is broken.
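The checkpointing described above looks roughly like the following sketch; the checkpoint directory path is an assumption, and expensiveStep stands in for any costly transformation:

```scala
// Assumption: a writable checkpoint directory; use an HDFS path on a cluster.
sc.setCheckpointDir("/tmp/spark-checkpoints")

val expensiveStep = (x: Int) => { Thread.sleep(1); x * x }
val stage = sc.parallelize(1 to 10).map(expensiveStep)

stage.checkpoint()  // marks the RDD; data is written on the next action
stage.count()       // triggers computation AND the checkpoint write

// From here on, lineage is truncated: a failure re-reads the checkpoint
// files instead of replaying the map from source.
```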
Final notes
What if the recomputation starts from a JDBC source, or uses random functions whose output in a Stage could affect partitions that were already processed? I cannot prove it easily, but my take is that results which do not fit the "current node" reprocessing are discarded; it would not be practical otherwise.
Regarding the author's own answer, "What is the difference between spark checkpoint and persist to a disk", the following should be noted: "... There are a few important differences but the fundamental one is what happens with lineage. Persist / cache keeps lineage intact while checkpoint breaks lineage. ...". The statement in the other answer is not correct.
Approach 2:
Regarding the use of cache, persist and checkpoint on rdd: according to this post, persist(StorageLevel.DISK_ONLY) effectively breaks the lineage within the current job, while checkpoint breaks the lineage across jobs (but does not clean up the files). I tentatively conclude that task retries after the persist or checkpoint will not break data consistency. The cache operation does not guarantee consistency.
Can there be problems before the persist? If the rdd's partitions contain independent random data, there is no problem with task retries on separate partitions. If the rdd contains timestamps, then the rdd should consist of a single partition.
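One way to make partitions safe to recompute independently, going a step further than "independent random data", is to derive each partition's randomness from its partition index, so that a retried task regenerates byte-identical data. This is a sketch of that idea, not something the original answer prescribes:

```scala
// Seed a per-partition RNG from the partition index: a retry of partition k
// reruns with the same seed and reproduces exactly the same values.
val seeded = sc.parallelize(1 to 1000, numSlices = 8)
  .mapPartitionsWithIndex { (idx, iter) =>
    val rng = new scala.util.Random(42L + idx)  // deterministic per partition
    iter.map(_ => rng.nextInt())
  }

// Now even a diamond DAG over `seeded` is consistent without persist,
// because recomputation is deterministic.
```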
I tentatively conclude that a safe strategy for computing with a nondeterministic RDD is to build it from "independent" partitions, which are safe to recompute separately, and to immediately persist to disk or checkpoint the RDD. Checkpoint is required if the RDD is re-used across jobs.
Some transformations introduce nondeterminism in the ordering of RDDs. If you need order consistency between re-used copies of an RDD (e.g., due to zipWithIndex), the lineage back to the most recent persist or checkpoint should not contain any order-modifying transformations.
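A sketch of the ordering hazard, under the assumption of an existing SparkContext sc: repartition shuffles rows and can reorder them differently on each evaluation, so persisting before zipWithIndex pins one ordering for all downstream uses.

```scala
import org.apache.spark.storage.StorageLevel

// `repartition` is an order-modifying transformation: without the persist,
// two evaluations of zipWithIndex could assign different indices to the
// same element.
val base = sc.parallelize(Seq("a", "b", "c", "d"), 4).repartition(2)
base.persist(StorageLevel.DISK_ONLY)

val indexed = base.zipWithIndex()
indexed.count()              // first use: materializes `base` via the persist
indexed.collect()            // second use: sees the same element-to-index mapping
```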
(by Steve Mitchell、thebluephantom、Steve Mitchell)