Привет всем, пришла за советом/помощью: дано Hadoop Yarn кластер с 50 нодами r5.16xlarge (64 Gb vCPU, ESB storage 500 GB), настройки для экзекьютеров memory 55Gb, по дефолту executor.memort.overhead 10% то есть 5.5 и того 60,5 GB. Получается по 8 экзекьюетеров на одну ноду с памятью 484GB.
Падаем с OutOfMemory Java Heap на операции шафлинга в строяке val totalSize = df.rdd.countApprox(timeout = 200L, confidence = 0.70).getFinalValue().mean, которая нужна чтобы посчитать количество партиций, на которые нужно разбить датафрейм на определенное количество рядков. Операция получается быстрее по сравнению с count. Но к нам может прийти огромные датафреймы, тогда происходит падение и потеря экзекьютеров, а значит дополнительное время к работе джобы.
Когда читаем датафрейм, то персистем его: unified.persist(StorageLevels.MEMORY_AND_DISK). Замена на DISK_ONLY и MEMORY_AND_DISK_SER приводит к замедлению джобы. Мы можем себе позволить, чтобы джоба ехала только 1 час, с такими параметрами она едет около 3 часов и убивается по таймауту. Использую: "spark.io.compression.codec", "zstd", также пробовала изменить дефолтовые на "spark.memory.fraction", "0.4" и "spark.memory.storageFraction", "0.4", чтобы уменьшить cache в памяти и спилить все на диск - на тесте все проходит, ООМ нет, на проде получилось, что отвалилась нода, потому что я забила диск на дозволенный предел + долго едет джоба - что является недопустимым в процессе. На кластере одновременно может ранится таких 4 джобы. Они равномерно +/- разделяют ресурсы кластера. Сейчас все это бежит с ошибками около +/- час. Пока всех устраивает, но я хочу пофиксить с надеждой, что такая джоба будет ранится вместо часа хотя бы 40-45 минут.
Вопросы: 1) стоит ли максимально задать "spark.executor.memory": "58G", тогда executor.memory.overhead будет 5,8 GB и вместе грубо будет 64 GB?
2) взять мощнее машины с большей памятью например r5.24xlarge?
3) val totalSize = df.rdd.countApprox(timeout = 200L, confidence = 0.70).getFinalValue().mean - как-то можно заменить/оптимизировать?
4) в такой ситуации что лучше использовать StorageLevels.MEMORY_AND_DISK или StorageLevels.DISK_ONLY?
5) что еще можно предпринять/попробовать в изменении настроек спарка, чтобы убрать ООМ и при этом не замедлять джобу/ы?
Спасибо заранее за помощь и предложения.
а исходные данные вы как-то контролите? может их раскладывать сразу по бакетам нужным чтобы шаффлинг меньше стал