Size: a a a

2020 October 30

DZ

Dmitry Zuev in Moscow Spark
зачем вы его перститите?
источник

DZ

Dmitry Zuev in Moscow Spark
у вас больше одной зависимости на него?
источник

JF

Jane Frankenstein in Moscow Spark
да
источник

JF

Jane Frankenstein in Moscow Spark
нам нужно к нему разные фильтры применять
источник

JF

Jane Frankenstein in Moscow Spark
бывает по 14TB в памяти
источник

JF

Jane Frankenstein in Moscow Spark
огромные приходят, а мы из них рипортики клепаем
источник

DZ

Dmitry Zuev in Moscow Spark
initialValue вместо finalValue?
источник

JF

Jane Frankenstein in Moscow Spark
пробовала :) нашла и тоже заменила, как неблокирующую: оно всегда показывает 0
источник

JF

Jane Frankenstein in Moscow Spark
что неверно
источник

DZ

Dmitry Zuev in Moscow Spark
я б попрбовал потунить шафлинг, уменьшить память до 31гб и сделать больше экзекуторов, увеличить параллелизм
источник

JF

Jane Frankenstein in Moscow Spark
Dmitry Zuev
я б попрбовал потунить шафлинг, уменьшить память до 31гб и сделать больше экзекуторов, увеличить параллелизм
Дмитрий, спасибо
источник

JF

Jane Frankenstein in Moscow Spark
Dmitry Zuev
я б попрбовал потунить шафлинг, уменьшить память до 31гб и сделать больше экзекуторов, увеличить параллелизм
что именно подтьюнить в шафлинге? на что-то конкретное  обратить внимание?
источник

N

Nickolay in Moscow Spark
На партиционирование например
источник

N

Nickolay in Moscow Spark
Надо разобраться, что этот шафлинг вызывает. Нет ли перекоса, равномерно ли распределено. В таком ключе сначала
источник

M

Mi in Moscow Spark
Jane Frankenstein
Привет всем, пришла за советом/помощью: дано Hadoop Yarn кластер с 50 нодами r5.16xlarge (64 Gb vCPU, ESB storage 500 GB), настройки для экзекьютеров memory 55Gb, по дефолту executor.memort.overhead 10% то есть 5.5 и того 60,5 GB. Получается по 8 экзекьюетеров на одну ноду с памятью 484GB.

Падаем с OutOfMemory Java Heap  на операции шафлинга в строяке val totalSize = df.rdd.countApprox(timeout = 200L, confidence = 0.70).getFinalValue().mean, которая нужна чтобы посчитать количество партиций, на которые нужно разбить датафрейм на определенное количество рядков. Операция получается быстрее по сравнению с count. Но к нам может прийти огромные датафреймы, тогда происходит падение и потеря экзекьютеров, а значит дополнительное время к работе джобы.
Когда читаем датафрейм, то персистем его: unified.persist(StorageLevels.MEMORY_AND_DISK). Замена на DISK_ONLY и MEMORY_AND_DISK_SER приводит к замедлению джобы. Мы можем себе позволить, чтобы джоба ехала только 1 час, с такими параметрами она едет около 3 часов и убивается по таймауту. Использую: "spark.io.compression.codec", "zstd", также пробовала изменить дефолтовые на "spark.memory.fraction", "0.4" и "spark.memory.storageFraction", "0.4", чтобы уменьшить cache в памяти и спилить все на диск - на тесте все проходит, ООМ нет, на проде получилось, что отвалилась нода, потому что я забила диск на дозволенный предел + долго едет джоба - что является недопустимым в процессе. На кластере одновременно может ранится таких 4 джобы. Они равномерно +/- разделяют ресурсы кластера. Сейчас все это бежит с ошибками около +/- час.  Пока всех устраивает, но я хочу пофиксить с надеждой, что такая джоба будет ранится вместо часа хотя бы 40-45 минут.

Вопросы: 1) стоит ли максимально задать "spark.executor.memory": "58G", тогда executor.memory.overhead  будет 5,8 GB и вместе грубо будет 64 GB?
2) взять мощнее машины с большей памятью например r5.24xlarge?
3) val totalSize = df.rdd.countApprox(timeout = 200L, confidence = 0.70).getFinalValue().mean - как-то можно заменить/оптимизировать?
4) в такой ситуации что лучше использовать StorageLevels.MEMORY_AND_DISK или StorageLevels.DISK_ONLY?
5) что еще можно предпринять/попробовать в изменении настроек спарка, чтобы убрать ООМ и при этом не замедлять джобу/ы?

Спасибо заранее за помощь и предложения.
а исходные данные вы как-то контролите? может их раскладывать сразу по бакетам нужным чтобы шаффлинг меньше стал
источник

JF

Jane Frankenstein in Moscow Spark
Nickolay
Надо разобраться, что этот шафлинг вызывает. Нет ли перекоса, равномерно ли распределено. В таком ключе сначала
skew нет
источник

JF

Jane Frankenstein in Moscow Spark
Надо разобраться, что этот шафлинг вызывает. => countApprox
источник

AS

Andrey Smirnov in Moscow Spark
смотрите как у вас получается этот датафрейм, как он строится, может там что можно оптимизировать, план выполнения.
вот эти кручения ручек в параметрах надо делать только в том случае, когда вы четко понимаете зачем и как это поможет
источник

JF

Jane Frankenstein in Moscow Spark
Mi
а исходные данные вы как-то контролите? может их раскладывать сразу по бакетам нужным чтобы шаффлинг меньше стал
грузим паркет, делаем манипуляции, потом считаем кол-во rdd
источник

AS

Andrey Smirnov in Moscow Spark
вот смотрите, грузим паркет, сколько этого паркета, сколько партиций, есть ли там бакеты, можно их как-то использовать
а в делаем манипуляции вообще кладезь всего что можно сделать
источник