вкратце это работает следующим образом.
допустим у нас запущен кластер на 4 воркера. Предположим что в джобе имеется только narrow transformation (.withColumn или .cast), и данные читаются а затем пишутся в S3 через DBFS обертку.
если умрет один из воркеров по любой из причин, мы просто запрашиваем новую виртуалку у AWS и перевыполняем на нем нехватающие партиции. Промежуточные данные на диске данного воркера нам не нужны, поскольку их можно перечитать из S3 по плану запроса на драйвере.
теперь другой кейс. допустим у нас идет цепочка read-groupBy-join-groupBy-write. В таком случае у нас появляются wide transformation (те самые шаффлы). Если один из воркеров (или экзекьюторов) отвалился, мы запрашиваем новую виртуалку у AWS и перевычисляем с того места, где есть последние шаффл данные (допустим на соседнем экзекьюторе, если нет - то пересчитываем от источника).
Это функциональность самого спарка, по сути аналогичная resource preemption в YARN, но с учетом реалий AWS.
Если у нас 4 воркера и 4 инстанса aws и скажем было 5 стрейджев,а потом один инстанс упал, то в общем случае нам придется пересчитать 1/4 всех тасков для всех стэйджев, которые уже завершились т.к на инстансе, который пропал были файлы. Код, который это делает есть в 3й версии спарка на гитхабе?