Size: a a a

2021 June 10

АЖ

Андрей Жуков... in Moscow Spark
Проще уж стримом читать тогда
источник

МК

Михаил Королев... in Moscow Spark
дико извиняюсь - сегодня не смогу поразбираться (до кластера не достучусь)... но разберусь (или как минимум  - постараюсь), ибо интересно/непонятно: идея метастора в том, чтобы от этого избавить (стандартная рекомендация "делайте через метастор"). Если он этого не делает, то... смысл его в чем?
источник

АЖ

Андрей Жуков... in Moscow Spark
Метастор у спарка не перечитывается непрерыано
источник

A

Alex in Moscow Spark
так метастор тут не поможет, так как он не предполагает что в одну таблицу и пишут и читают одновременно
источник

A

Alex in Moscow Spark
он же только трекает где что расположено
источник

ИК

Иван Калининский... in Moscow Spark
Метастор в спарке становится Catalog’ом, и в целом, также нуждается в явном запросе на обновление, он не реактивный
источник

A

Alex in Moscow Spark
насколько помню в метасторе даже список файлов не хранится
только фолдер

после того как забрали мету спарк отдельно ходит джобой и вытягивает список файлов с которым будет работать
источник

МК

Михаил Королев... in Moscow Spark
возможно, в этом и надо разобраться...
источник

ИК

Иван Калининский... in Moscow Spark
вроде в Managed c какой-то версии хранится, но я не уверен. В любом случае, для файлового стораджа будет скан hdfs, без него не добыть FileStatus’ы, а без них не прочитать файлы
источник

A

Alex in Moscow Spark
так если со спарка работают то это вроде как не совсем managed таблицы =)
почти гарантированно будет external table
источник

ИК

Иван Калининский... in Moscow Spark
У нас, например, только External позволяются по архитектуре, так что я про управляемые таблицы и не вспоминаю))
источник

A

Alex in Moscow Spark
тоже самое ….
источник

D

Dmitry in Moscow Spark
имхо 2 варианта  
1) писать/читать опенсорсный delta формат от датабрикс, он разрулит транзакции
2) писать/читать спарком через jdbc (в hive), тогда это забота хайфа будет разруливать параллельные процессы
источник

A

Alex in Moscow Spark
можно 1 расширить до
3) писать/читать в hudi/iceberg форматах, они тоже разрулят транзакции
источник

ММ

Максим Мартынов... in Moscow Spark
Добрый день. Столкнулся с такой проблемой, никак не могу придумать решение.

Spark 2.3.0, нужно прогнать pandas_udf на dataframe. В этой версии еще нет метода df.applyInPandas(func), есть только df.groupBy().apply(func), соответственно приходится делать группировку по какому-то столбцу/столбцам.

Проблема в том, что dataframe передается извне уже партиционированнный (например, df.repartition(400, 'user_id'), и нам неизвестно, по каким колонкам. А groupBy не выполняет лишнего shuffle только в том случае, если условие группировки совпадает с условием для партиционирования. В противном случае он повторно выполнит репартиционирование по новому набору колонок. Причем не на то же самое количество партиций, что и в исходном dataframe, а на то, что указано в параметрах Spark сессии (то ли spark.sql.shuffle.partitions, то ли spark.default.parallelism). И изменить это значение на какое-нибудь df.rdd.getNumPartitions() тоже не получится, т.к. groupBy не принимает на вход число партиций.

Способов вытащить из dataframe колонки, по которым были партиционированы данные, я не нашел ни в Python API для dataframe, ни даже если дергать напрямую методы в Java dataframe.
Я пробовал делать groupBy по spark_partition_id, но Spark все равно выполняет повторное репартиционирование. Причем т.к. значение столбца зависит от текущего партиционирования, с этим способом вообще выходит какая-то дичь, executor вылетает с нехваткой памяти, хотя размер партиций такой, что памяти там двукратный запас.
groupBy без параметров вообще создает SinglePartition, что совершенно не подходит для задачи.

Собственно, я в тупике. Можете что-нибудь посоветовать, или способ обречен на провал?
источник

AS

Andrey Smirnov in Moscow Spark
а зачем именно pandas_udf, может простого udf хватит?
источник

PK

Pavel Klemenkov in Moscow Spark
Код на пандасе написан небось
источник

PK

Pavel Klemenkov in Moscow Spark
У меня опять вопрос про использование этого костыля с groupby-apply, потому что в удфке хочется пандас датафрейм получить. Почему не написать scalar pandas_udf и не передавать туда все столбцы?
источник

ММ

Максим Мартынов... in Moscow Spark
Внутри udf прогоняется модель, ожидающая на вход pandas dataframe
источник

ММ

Максим Мартынов... in Moscow Spark
Тут что scalar, что grouped_map, в любом случае вызвать pandas_udf можно исключительно через groupBy
источник