Size: a a a

2020 October 30

JF

Jane Frankenstein in Moscow Spark
я убирада ООМ когда изменяла сеттинги фракшина и фракшин мемори
источник

ИГ

Игорь Гомановский... in Moscow Spark
На количество Stage в Job
источник

JF

Jane Frankenstein in Moscow Spark
у меня не было ООМ, о которой пишу
источник

JF

Jane Frankenstein in Moscow Spark
у каждой джобы по 2 стейджа
источник

JF

Jane Frankenstein in Moscow Spark
union есть
источник

AS

Andrey Smirnov in Moscow Spark
Jane Frankenstein
просто не хочу выкладывать в общий доступ
так замажьте пути к файлам
источник

JF

Jane Frankenstein in Moscow Spark
источник

JF

Jane Frankenstein in Moscow Spark
вот такой даг
источник

GT

Gennady Timofeev in Moscow Spark
Jane Frankenstein
Привет всем, пришла за советом/помощью: дано Hadoop Yarn кластер с 50 нодами r5.16xlarge (64 Gb vCPU, ESB storage 500 GB), настройки для экзекьютеров memory 55Gb, по дефолту executor.memort.overhead 10% то есть 5.5 и того 60,5 GB. Получается по 8 экзекьюетеров на одну ноду с памятью 484GB.

Падаем с OutOfMemory Java Heap  на операции шафлинга в строяке val totalSize = df.rdd.countApprox(timeout = 200L, confidence = 0.70).getFinalValue().mean, которая нужна чтобы посчитать количество партиций, на которые нужно разбить датафрейм на определенное количество рядков. Операция получается быстрее по сравнению с count. Но к нам может прийти огромные датафреймы, тогда происходит падение и потеря экзекьютеров, а значит дополнительное время к работе джобы.
Когда читаем датафрейм, то персистем его: unified.persist(StorageLevels.MEMORY_AND_DISK). Замена на DISK_ONLY и MEMORY_AND_DISK_SER приводит к замедлению джобы. Мы можем себе позволить, чтобы джоба ехала только 1 час, с такими параметрами она едет около 3 часов и убивается по таймауту. Использую: "spark.io.compression.codec", "zstd", также пробовала изменить дефолтовые на "spark.memory.fraction", "0.4" и "spark.memory.storageFraction", "0.4", чтобы уменьшить cache в памяти и спилить все на диск - на тесте все проходит, ООМ нет, на проде получилось, что отвалилась нода, потому что я забила диск на дозволенный предел + долго едет джоба - что является недопустимым в процессе. На кластере одновременно может ранится таких 4 джобы. Они равномерно +/- разделяют ресурсы кластера. Сейчас все это бежит с ошибками около +/- час.  Пока всех устраивает, но я хочу пофиксить с надеждой, что такая джоба будет ранится вместо часа хотя бы 40-45 минут.

Вопросы: 1) стоит ли максимально задать "spark.executor.memory": "58G", тогда executor.memory.overhead  будет 5,8 GB и вместе грубо будет 64 GB?
2) взять мощнее машины с большей памятью например r5.24xlarge?
3) val totalSize = df.rdd.countApprox(timeout = 200L, confidence = 0.70).getFinalValue().mean - как-то можно заменить/оптимизировать?
4) в такой ситуации что лучше использовать StorageLevels.MEMORY_AND_DISK или StorageLevels.DISK_ONLY?
5) что еще можно предпринять/попробовать в изменении настроек спарка, чтобы убрать ООМ и при этом не замедлять джобу/ы?

Спасибо заранее за помощь и предложения.
А чего вы добиваетесь с вычислением количества партиций? Для чего разбиваете именно на это количество
источник

JF

Jane Frankenstein in Moscow Spark
Gennady Timofeev
А чего вы добиваетесь с вычислением количества партиций? Для чего разбиваете именно на это количество
чтобы писать csv-и по 10000
источник

JF

Jane Frankenstein in Moscow Spark
некоторые клиенты так просят
источник

JF

Jane Frankenstein in Moscow Spark
я хотела убрать эту операцию, но увы нет
источник

JF

Jane Frankenstein in Moscow Spark
+ равномерно делим
источник

А

Алексей in Moscow Spark
можно поставить настройку maxRecordsPerFile, чтобы писать нужное число строк в файл
источник

GT

Gennady Timofeev in Moscow Spark
Jane Frankenstein
чтобы писать csv-и по 10000
Может пронумеровать зипвивиндекс (допустим, поле index) и разделить на партиции по полю part = (floor (index/10000))🤔
источник

JF

Jane Frankenstein in Moscow Spark
неуверенна, что это будет быстрее
источник

GT

Gennady Timofeev in Moscow Spark
Быстрее, может быть
источник

GT

Gennady Timofeev in Moscow Spark
Jane Frankenstein
неуверенна, что это будет быстрее
Я думаю, избавиться от одного действия и возращения данных на драйвер может помочь, но да, как обычно, зависит от местных реалий данных
источник

JF

Jane Frankenstein in Moscow Spark
спасибо всем :) буду пробовать
источник

AS

Andrey Smirnov in Moscow Spark
тут посередине вызов кеширования?
источник