Size: a a a

2021 June 10

ИК

Иван Калининский... in Moscow Spark
Коллеги, помогайте! ))
Захотел сделать много мелких бродкастов, чтобы избавиться от SortMergeJoin. Ну, относительно мелких, мож мегабайт по двести, а может и по сотне килобайт. Всего таких бродкастов в запросе бывает около двух тысяч, в конкретном кейсе - пятьсот. Это изменения в данных, тут как повезёт.
Но не взлетело, зависает с постоянно повторяемой ошибкой: WARN TaskMemoryManager: Failed to allocate a page ({тут размер} bytes), try again
источник

ИК

Иван Калининский... in Moscow Spark
Экзекуторам давал по восемь ядер и по 12 Гб+12Overhead, на драйвере 16+12
источник

ИК

Иван Калининский... in Moscow Spark
Юзаю G1GC, с некоторыми настройками, могу скопипастить, если поможет
источник

ИК

Иван Калининский... in Moscow Spark
что еще? Код:

Join(
 Filter(partitionFilter, targetTable),
 ResolvedHint(
   Project(pkAttributes, Filter(diffDelFilter, sourceTable)),
   HintInfo(broadcast = true)),
 LeftAnti,
 Some(joinFilter)
)
источник

PK

Pavel Klemenkov in Moscow Spark
Это почему? Для scalar удфок не нужен никакой groupby. А датафрейм ты можешь внутри удфки из отдельных series собрать, если уж нужен датафрейм.
источник

ММ

Максим Мартынов... in Moscow Spark
В udf вызывается модель. Они бывают разных типов - бинарная классификация возвращает на 1 строку 1 float, мультиклассовая возвращает на 1 строку N float, по числу категорий. Соответственно, это как минимум несколько столбцов на каждую исходную строку.
источник

ММ

Максим Мартынов... in Moscow Spark
Плюс у нас используется обертка над моделью, которая вместо нескольких столбцов, для каждого класса, генерирует несколько строк с двумя столбцами - название класса и float. Так удобнее работать с изменяющимся числом классов, плюс их можно использовать для партиционирования
источник

ММ

Максим Мартынов... in Moscow Spark
В первом случае scalar еще как-то подходит, т.к. можно вызвать udf несколько раз, хоть это и очень неэффективно с точки зрения ресурсов, потому что будет запущена куча копий модели, на которых будут обрабатываться одни и те же данные. А второй случай ими вообще никак не решить
источник

PK

Pavel Klemenkov in Moscow Spark
Ну так ведь series может и сложный тип содержать, например list. А его уже можно распаковать спарком по индексам. Проблема может быть только в pyarrow, но листы он могет
источник

ММ

Максим Мартынов... in Moscow Spark
explode в плане производительности очень плох
источник

ММ

Максим Мартынов... in Moscow Spark
shuffle занимает гораздо меньше времени, чем эти манипуляции с перепаковкой данных
источник

PK

Pavel Klemenkov in Moscow Spark
Это понятно, я предлагал df.select(df.predictions[1], df.predictions[2]) и т.д. Но если число классов переменное, то не прокатит канеш
источник

ММ

Максим Мартынов... in Moscow Spark
да, переменное
источник

ММ

Максим Мартынов... in Moscow Spark
ну, раз такого способа нет, тогда ладно, будет двойной shuffle
источник
2021 June 11

МК

Михаил Королев... in Moscow Spark
(пардон, если занудство) возвращаясь к теме (сходил на кластер):
- первое приложение делает dfBig.write.format("orc").mode("append").save(TARGET_DIR)
- второе делает readCnt = sparkB.read.format("orc").load(TARGET_DIR).count()
- оба лупят это в бесконечном цикле
Проблем нет, оба чудесно работают и показывают ожидаемые результаты, что не так воспроизвел? (попробую то же с метастором пока кластер держу...)
источник

ПБ

Повелитель Бури... in Moscow Spark
1 процесс saveAsTable(t_ref)
2 процесс  читает fg_ref = spark.table(t_ref)
источник

МК

Михаил Королев... in Moscow Spark
это я щас проверю, но - как минимум - если напрямую в файлы, то работает
источник

ПБ

Повелитель Бури... in Moscow Spark
формат snappy parquet
источник

МК

Михаил Королев... in Moscow Spark
с метастором все так же, от формата - поверьте - не зависит (но попробую, ибо не сложно)
источник

A

Alex in Moscow Spark
у вас мод append, то есть докидывает файлы, но старые не удаляет
источник