Size: a a a

2021 February 15

NN

No Name in Moscow Spark
tenKe
> И, следовательно, шаффл со спиллом и без отличается только тем, что первый
шафл со спиллом это когда в процессе подготвки файла  шафла все настолько большое, что не влезает в память воркера и он спиллит данные на диск. В итоге да, оверхед на I/O в диск получается многократный
Не, вроде же он и не должен целиком влезть, там некий шаффл буфер, который что-то типа 0.2 по-умолчанию от spark.shuffle.memoryFraction, если я ничего не путаю
источник

NN

No Name in Moscow Spark
tenKe
> больше грузит сеть, так?
а вот сеть по идее тут не аффектит
А, ну, если spill writer и spill reader поднимаются на тех же самых нодах (что более, чем логично), то, да, сетке пофиг должно быть.
источник

N

Nikolay in Moscow Spark
No Name
Народ, а кто в кишки лазил, вот помогите понять - происходит вот у нас шаффл. По идее, есть у нас буфер в execution memory, там происходит некий сортинг входной партиции. Далее, по достижении определенного лимита, данные спилятся на диск spill writer-ом в отдельный файл. Допустим, это произошло ещё несколько раз, после чего все прокрутилось, и файлы опять поднимаются в буфер, окончательно сортируются и мерджатся в один файл. Дальше эта шаффлд партиция отправляется на читку spill reader-у, который опять тащит блоки из нее в буфер и проводит похожую с spill writer-ом манипуляцию. Эта каша у меня в голове образовалась после прочтения нескольких разных источников и попытки немного полазить по сырцам. Помогите, пожалуйста, устранить кашу в голове:
1. Если говорить просто про шаффл - он вообще в состоянии произойти только в оперативной памяти, даже если данные входящей партиции помещаются в буфер, или же в любом случае будет промежуточные результаты  скидывать на диск?
2.  Если будет скидывать на диск все равно - то в чем вообще отличие шаффла со спиллом и без него?
3. Зачем spill reader ещё раз колбасит данные после spill wtiter-а?
4. Куда вообще сохраняет свои промежуточные результаты spill writer и spill reader (и shuffle writer ещё)? На датаноду локально, или в распределенное хранилище?
Там разные есть врайтеры. они по разному работают .  https://github.com/nkudinov/apache_spark_pptx/blob/master/Spark_sort2.pptx и https://github.com/nkudinov/apache_spark_pptx/blob/master/Spark_sort5.pptx
источник

M

Mi in Moscow Spark
No Name
А, ну, если spill writer и spill reader поднимаются на тех же самых нодах (что более, чем логично), то, да, сетке пофиг должно быть.
С 1.6 уже нет
источник

M

Mi in Moscow Spark
Там общая память на все и автоматически решает где нужно больше где меньше
источник

NN

No Name in Moscow Spark
Mi
Там общая память на все и автоматически решает где нужно больше где меньше
Да, действительно
источник

NN

No Name in Moscow Spark
Mi
Почему-то
В смысле, странно, что, имея такой механизм, как спилл, все равно возможен оом?
источник

t

tenKe in Moscow Spark
Есть видео доклада этой презы?
источник

А

Алексей in Moscow Spark
интересные презентации, поболее бы текста к ним для не совсем понимающих..
источник

А

Алексей in Moscow Spark
No Name
В смысле, странно, что, имея такой механизм, как спилл, все равно возможен оом?
реально, кто может рассказать, почему может быть OOM, если есть spill?
источник

NN

No Name in Moscow Spark
Алексей
интересные презентации, поболее бы текста к ним для не совсем понимающих..
+
источник

M

Mi in Moscow Spark
No Name
В смысле, странно, что, имея такой механизм, как спилл, все равно возможен оом?
Я ж сюда со своей проблемой писал на прошлой неделе, окно/джоин по большой грязной патриции -> репартишен -> начинает спилить на диск -> через какое-то время Ярн убивает контейнер по памяти.
--
Вполне возможно что это может быть проблема вообще ярна или какой-то не совсем идеальной конфигурации экщекуторов когда вот чуть чуть не хватает, но тем не менее
источник

NN

No Name in Moscow Spark
Mi
Я ж сюда со своей проблемой писал на прошлой неделе, окно/джоин по большой грязной патриции -> репартишен -> начинает спилить на диск -> через какое-то время Ярн убивает контейнер по памяти.
--
Вполне возможно что это может быть проблема вообще ярна или какой-то не совсем идеальной конфигурации экщекуторов когда вот чуть чуть не хватает, но тем не менее
Ну, прикол в том, что есть некий набор достаточно стандартных действий, которые обычно помогают решить такого рода проблему, как ООМ. Поэтому на глубинном уровне, мне кажется, многие не заморачиваться по этому поводу. Я вот периодически по тому или иному поводу начинаю это делать, и, спасибо @tenKe , в этот раз ещё часть алгоритмов работы спарка в голове уложились во что-то более-менее стройное. Но при этом я тоже перестал понимать, в какой же все таки момент спилл не спасает от ООМ. Гипотетически, наверное, у нас есть объем доступной памяти для коров, и если объема памяти на одну корову недостаточно для того, чтобы обработать хотя бы 1 партицию, он не может в такой ситуации спиллить и просит умирает, типа меньше минимальной единицы параллелизма низя. Но гипотеза выглядит говном, т.к. тогда вообще не оч понятно, в каком случае тогда спилл появляется, да и с чего бы ему вдруг не уметь обрабатывать партицию в несколько этапов?
источник

А

Алексей in Moscow Spark
Есть ещё вариант оом по высокому % gc от общего времени
источник

А

Алексей in Moscow Spark
No Name
Ну, прикол в том, что есть некий набор достаточно стандартных действий, которые обычно помогают решить такого рода проблему, как ООМ. Поэтому на глубинном уровне, мне кажется, многие не заморачиваться по этому поводу. Я вот периодически по тому или иному поводу начинаю это делать, и, спасибо @tenKe , в этот раз ещё часть алгоритмов работы спарка в голове уложились во что-то более-менее стройное. Но при этом я тоже перестал понимать, в какой же все таки момент спилл не спасает от ООМ. Гипотетически, наверное, у нас есть объем доступной памяти для коров, и если объема памяти на одну корову недостаточно для того, чтобы обработать хотя бы 1 партицию, он не может в такой ситуации спиллить и просит умирает, типа меньше минимальной единицы параллелизма низя. Но гипотеза выглядит говном, т.к. тогда вообще не оч понятно, в каком случае тогда спилл появляется, да и с чего бы ему вдруг не уметь обрабатывать партицию в несколько этапов?
нагуглилось такое, но не уверен на 100%, что все так: https://www.programmersought.com/article/1697871819/
> when calculating the specific usage of memory in the heap, considering the performance and other reasons, Spark currently uses the sampling statistics method to calculate the memory used by the MemoryConsumer, so that the actual usage of the memory in the heap is not particularly accurate. As a result, OOM may be caused by the inability to spill in time
источник

А

Алексей in Moscow Spark
Алексей
нагуглилось такое, но не уверен на 100%, что все так: https://www.programmersought.com/article/1697871819/
> when calculating the specific usage of memory in the heap, considering the performance and other reasons, Spark currently uses the sampling statistics method to calculate the memory used by the MemoryConsumer, so that the actual usage of the memory in the heap is not particularly accurate. As a result, OOM may be caused by the inability to spill in time
возможно, это действительно так, особенно если есть перекосы. Оценка размера данных сэмплами может дать погрешность
При сортировке проверяется необходимость спила: https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala#L214
Размер данных оценивается с помощью меньшего от 2 сэмпловых замеров: https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala#L77-L101
На больших массивах оценивается размер от 200 случайных элементов https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala#L263
источник
2021 February 16

NN

No Name in Moscow Spark
Алексей
возможно, это действительно так, особенно если есть перекосы. Оценка размера данных сэмплами может дать погрешность
При сортировке проверяется необходимость спила: https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala#L214
Размер данных оценивается с помощью меньшего от 2 сэмпловых замеров: https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala#L77-L101
На больших массивах оценивается размер от 200 случайных элементов https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala#L263
Воу. Вот это ты раскопал. Завтра поизучаю)
источник

M

Mi in Moscow Spark
Алексей
нагуглилось такое, но не уверен на 100%, что все так: https://www.programmersought.com/article/1697871819/
> when calculating the specific usage of memory in the heap, considering the performance and other reasons, Spark currently uses the sampling statistics method to calculate the memory used by the MemoryConsumer, so that the actual usage of the memory in the heap is not particularly accurate. As a result, OOM may be caused by the inability to spill in time
Это кстати действительно может объяснить почему контейнеры убиваются ярном, а не падают сами
источник

AS

Andrey Smirnov in Moscow Spark
Алексей
возможно, это действительно так, особенно если есть перекосы. Оценка размера данных сэмплами может дать погрешность
При сортировке проверяется необходимость спила: https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala#L214
Размер данных оценивается с помощью меньшего от 2 сэмпловых замеров: https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala#L77-L101
На больших массивах оценивается размер от 200 случайных элементов https://github.com/apache/Spark/blob/master/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala#L263
я не понял как это работает, зачем проход два раза
// To exclude the shared objects that the array elements may link, sample twice
// and use the min one to calculate array size.
val s1 = sampleArray(array, state, rand, drawn, length)
val s2 = sampleArray(array, state, rand, drawn, length)
источник

А

Алексей in Moscow Spark
Andrey Smirnov
я не понял как это работает, зачем проход два раза
// To exclude the shared objects that the array elements may link, sample twice
// and use the min one to calculate array size.
val s1 = sampleArray(array, state, rand, drawn, length)
val s2 = sampleArray(array, state, rand, drawn, length)
тоже не до конца понимаю
источник