Size: a a a

2020 February 14

SS

Semyon Sinchenko in Moscow Spark
Mikhail
Всем привет! Возник вопрос по оптимизации. Как можно повлиять на план запроса, который строит Spark, а именно указать правильный порядок джойна? Знаю, что есть хинты для broadcast join, но у меня ситуация другая. Джойнятся две большие таблицы факта (условно F1 и F2) + каждая из них джойнится на календарь для фильтрации (D). Spark строит план ((F1 broadcast join D) sortmerge join F2) broadcast join D). Вместо того, что вторую таблицу фактов так же предварительно фильтрануть по календарю. Сталкивался кто с таким?
Вроде можно руками все разнести на разные джойны в нужном порядке и чекпойнтов понаставить.
источник

M

Mikhail in Moscow Spark
Теоретически да, но тут запрос пушится в виде реального sql текста и что-то с ним сделать нельзя. Я думал в сторону каких-нибудь магических настроек оптимизатора или сбора статистики, но пока ничего не помогло.
источник

А

Алексей in Moscow Spark
разбить на 2 запроса
источник

D

Dima in Moscow Spark
Select * from
 (select * from F1 join D on ...)v1
   Join
 (select * from F2 join D on ...)v2
   On v1... = v2...
Попробуй так, глянь план
источник
2020 February 15

VK

Vasily Kolpakov in Moscow Spark
Mikhail
Всем привет! Возник вопрос по оптимизации. Как можно повлиять на план запроса, который строит Spark, а именно указать правильный порядок джойна? Знаю, что есть хинты для broadcast join, но у меня ситуация другая. Джойнятся две большие таблицы факта (условно F1 и F2) + каждая из них джойнится на календарь для фильтрации (D). Spark строит план ((F1 broadcast join D) sortmerge join F2) broadcast join D). Вместо того, что вторую таблицу фактов так же предварительно фильтрануть по календарю. Сталкивался кто с таким?
Можно попробовать посмотреть на кастомные оптимизации https://www.waitingforcode.com/apache-spark-sql/introduction-custom-optimization-apache-spark-sql/read
источник

M

Mikhail in Moscow Spark
Спасибо, посмотрю! В целом если вручную нужным образом переписать запрос или написать последовательность соединений через API, выключив опцию joinReorder, то план строится в лоб и этим можно управлять. Но у меня потребность была в автоматической оптимизации SQL-запроса со стороны Spark на основе собранной статистики (вкл. joinReorder). И этого добиться пока не получается, хотя фича такая есть - spark.sql.cbo.enabled.
источник

N

Nikolay in Moscow Spark
Cbo не для всех случаев работает.
источник
2020 February 17

M

Mikhail in Moscow Spark
Nikolay
Cbo не для всех случаев работает.
Why not?
источник

a

agathis in Moscow Spark
Господа, всем привет. Мне тут достался легаси spark-streaming 1.6 джоб, в нем несколько оригинальных решений.
Оригинальное решение 1: джоб процессит довольно длинный список топиков кафки, на каждый топик эксплицитно создается  new Thread на драйвере (в который передается ssc).
Оригинальное решение номер 2: там написана хитрая обвязка, которая хранит оффсеты в Hbase! Целевая кафка была 0.9
В таком вообще был какой-то смысл даже во времена 1.6?
источник

PK

Pavel Klemenkov in Moscow Spark
agathis
Господа, всем привет. Мне тут достался легаси spark-streaming 1.6 джоб, в нем несколько оригинальных решений.
Оригинальное решение 1: джоб процессит довольно длинный список топиков кафки, на каждый топик эксплицитно создается  new Thread на драйвере (в который передается ssc).
Оригинальное решение номер 2: там написана хитрая обвязка, которая хранит оффсеты в Hbase! Целевая кафка была 0.9
В таком вообще был какой-то смысл даже во времена 1.6?
А джоба в режиме fair scheduler запускается? Иначе от тред пула, как бэ нет толку.
источник

GP

Grigory Pomadchin in Moscow Spark
agathis
Господа, всем привет. Мне тут достался легаси spark-streaming 1.6 джоб, в нем несколько оригинальных решений.
Оригинальное решение 1: джоб процессит довольно длинный список топиков кафки, на каждый топик эксплицитно создается  new Thread на драйвере (в который передается ssc).
Оригинальное решение номер 2: там написана хитрая обвязка, которая хранит оффсеты в Hbase! Целевая кафка была 0.9
В таком вообще был какой-то смысл даже во времена 1.6?
хранить оффсеты имело смысл да и имеет; когда руками коммит хочется сделать и рекавери сделать с руками записаных оффсетов
источник

a

agathis in Moscow Spark
Grigory Pomadchin
хранить оффсеты имело смысл да и имеет; когда руками коммит хочется сделать и рекавери сделать с руками записаных оффсетов
а вот это разве не коммит "руками"?
(пример из спарковой доки)
stream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 // some time later, after outputs have completed
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

в чем вообще смысл хранить оффсет "снаружи" (при том, что хранится не история, а один последний оффсет)
источник

a

agathis in Moscow Spark
Pavel Klemenkov
А джоба в режиме fair scheduler запускается? Иначе от тред пула, как бэ нет толку.
угу, в fair
источник

PK

Pavel Klemenkov in Moscow Spark
agathis
угу, в fair
Тогда все ровно
источник

GP

Grigory Pomadchin in Moscow Spark
agathis
а вот это разве не коммит "руками"?
(пример из спарковой доки)
stream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 // some time later, after outputs have completed
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

в чем вообще смысл хранить оффсет "снаружи" (при том, что хранится не история, а один последний оффсет)
Рекавери делать с него; обычно хранят офсет последний или в зк отдельно или в сторонней базе
источник

GP

Grigory Pomadchin in Moscow Spark
Чекпойнты никто не делает для стримов
источник

PF

Peter Fedosov in Moscow Spark
Чекпоинты по умолчанию нужны для stateful streaming, просто проблемка там в том, что при изменении кодовой базы они невалидны становятся
источник

GP

Grigory Pomadchin in Moscow Spark
Peter Fedosov
Чекпоинты по умолчанию нужны для stateful streaming, просто проблемка там в том, что при изменении кодовой базы они невалидны становятся
именно
источник
2020 February 18

Н

Никита in Moscow Spark
Никто не знает почему pyspark --master local так медленно работает на df.limit(100).
window_spec = Window.partitionBy("domain").orderBy("to")
df = df.withColumnRenamed("@timestamp", "to")
df = df.withColumn("from", F.lag(df["to"], 1).over(window_spec))
df = df.withColumn("minutesdiff", (df["to"].cast("long") - df["from"].cast("long")) / 60)
Вроде lag не самая сложная операция

Попробовал, ничего не изменилось
spark.conf.set("spark.sql.shuffle.partitions", 10)
источник

PK

Pavel Klemenkov in Moscow Spark
Надо в ui смотреть, так непонятно
источник