Size: a a a

2020 August 03

N

Nikolay in Moscow Spark
Andrey Smirnov
на этом экзекетуре и пересчитываем нужную часть
Можно наверное и так. Но вот мне кажется, что это будет все ломать концепции. Смотрим на таску, а она не читает шафл данные , а их пересчитывает, если данные недоступны. И другие таски из stage1 после этого должны будут идти в совершенно другие места для чтения.
источник

AS

Andrey Smirnov in Moscow Spark
это же ситуация аналогичная что в момент работы job, отвалился какой-то экзекутор, это же не приводит к полному перезапуску
источник

N

Nikolay in Moscow Spark
Andrey Smirnov
это же ситуация аналогичная что в момент работы job, отвалился какой-то экзекутор, это же не приводит к полному перезапуску
Но сама нода цела. Данные не теряются. Они лежат в файликах. Их можно читать
источник

DM

Dmitry Mittov in Moscow Spark
Зафейлились часть тасок - нужно пересчитать только часть партиций и пересчитываться будет не полный stage, а только его часть (если граф вычислений такой хороший, у меня был такой видимо, потому что такой эффект наблюдался). Проверить можно на практике. Запускаем длительный процесс, потом вручную через Stop Requests в EC2 киляем одну машину.
источник

AS

Andrey Smirnov in Moscow Spark
да спарк переживает и выход ноды во время работы
источник

R

Renarde in Moscow Spark
вкратце это работает следующим образом.

допустим у нас запущен кластер на 4 воркера. Предположим что в джобе имеется только narrow transformation (.withColumn или .cast), и данные читаются а затем пишутся в S3 через DBFS обертку.
если умрет один из воркеров по любой из причин, мы просто запрашиваем новую виртуалку у AWS и перевыполняем на нем нехватающие партиции. Промежуточные данные на диске данного воркера нам не нужны, поскольку их можно перечитать из S3 по плану запроса на драйвере.  

теперь другой кейс. допустим у нас идет цепочка read-groupBy-join-groupBy-write. В таком случае у нас появляются wide transformation (те самые шаффлы). Если один из воркеров (или экзекьюторов) отвалился, мы запрашиваем новую виртуалку у AWS и перевычисляем с того места, где есть последние шаффл данные (допустим на соседнем экзекьюторе, если нет - то пересчитываем от источника).

Это функциональность самого спарка, по сути аналогичная resource preemption в YARN, но с учетом реалий AWS.
источник

DM

Dmitry Mittov in Moscow Spark
Все так, добавлю, что по факту не обязательно ждать новой виртуалки - таски будут распределены на имеющиеся, а новая как поднимется - может что-то и ей перепадет, если останется
источник

R

Renarde in Moscow Spark
Dmitry Mittov
Все так, добавлю, что по факту не обязательно ждать новой виртуалки - таски будут распределены на имеющиеся, а новая как поднимется - может что-то и ей перепадет, если останется
да, тут еще можно поиграться с ультра-дешевыми спотами (типа 5% от on-demand) и spark.speculation, но это для особых эстетов 🙂
источник

DM

Dmitry Mittov in Moscow Spark
Andrey Smirnov
да спарк переживает и выход ноды во время работы
вопрос в том как он этот выход переживает, если драйвер остался на рабочей машине, то относительно “дешево”, если умер драйвер, то потери времени больше
источник

R

Renarde in Moscow Spark
Dmitry Mittov
вопрос в том как он этот выход переживает, если драйвер остался на рабочей машине, то относительно “дешево”, если умер драйвер, то потери времени больше
поэтому мы рекомендуем делать driver on_demand, благо обычно это не очень большая машина.
источник

R

Renarde in Moscow Spark
В практике мы делали в Zalando так - когда наступала Black Friday мы передеплоили джобы на 100% on-demand, потому что спотов не сыскать в этот период времени, потом обратно откатывались на SPOT_WITH_FALLBACK, получалось довольно стабильно.
источник

DM

Dmitry Mittov in Moscow Spark
Автоматически? Чей-то тул или самописный мониторинг + boto3?
источник

R

Renarde in Moscow Spark
Dmitry Mittov
Автоматически? Чей-то тул или самописный мониторинг + boto3?
Databricks API + заскедуленный деплой.
источник

AS

Andrey Smirnov in Moscow Spark
Dmitry Mittov
вопрос в том как он этот выход переживает, если драйвер остался на рабочей машине, то относительно “дешево”, если умер драйвер, то потери времени больше
Мы сейчас про воркеров говорим, драйвер отдельной машиной при заказе emr кластера, и насколько я помню можно и без тегов это делать.
источник

DM

Dmitry Mittov in Moscow Spark
Andrey Smirnov
Мы сейчас про воркеров говорим, драйвер отдельной машиной при заказе emr кластера, и насколько я помню можно и без тегов это делать.
теги внутри используются, по ним после spark-submit драйвер обязательно попадет на CORE машину, а не на обычный Worker. Но можно и таски вешать на CORE машины: spark.yarn.executor.nodeLabelExpression=CORE
Для streaming процесса полезно использовать
источник

N

Nikolay in Moscow Spark
Renarde
вкратце это работает следующим образом.

допустим у нас запущен кластер на 4 воркера. Предположим что в джобе имеется только narrow transformation (.withColumn или .cast), и данные читаются а затем пишутся в S3 через DBFS обертку.
если умрет один из воркеров по любой из причин, мы просто запрашиваем новую виртуалку у AWS и перевыполняем на нем нехватающие партиции. Промежуточные данные на диске данного воркера нам не нужны, поскольку их можно перечитать из S3 по плану запроса на драйвере.  

теперь другой кейс. допустим у нас идет цепочка read-groupBy-join-groupBy-write. В таком случае у нас появляются wide transformation (те самые шаффлы). Если один из воркеров (или экзекьюторов) отвалился, мы запрашиваем новую виртуалку у AWS и перевычисляем с того места, где есть последние шаффл данные (допустим на соседнем экзекьюторе, если нет - то пересчитываем от источника).

Это функциональность самого спарка, по сути аналогичная resource preemption в YARN, но с учетом реалий AWS.
Если у нас 4 воркера и 4 инстанса aws и скажем было 5 стрейджев,а потом один инстанс упал, то в общем случае нам придется пересчитать 1/4 всех тасков для всех стэйджев, которые уже завершились т.к на инстансе, который пропал были файлы. Код, который это делает есть в 3й версии спарка на гитхабе?
источник

М

Мохаммад Реза... in Moscow Spark
Переслано от Мохаммад Реза...
How can I Unwrap the Key of this Dataset to 3 strings along other columns, So I can cast it to Dataset[MySchema] ?
I groupedByKey these three columns along with a agg() and schema lost during these operation.
источник

SM

Sergey M in Moscow Spark
Hello, everybody
Не подскажете что надо импортировать чтоб из scala-spark подсоединится к hbase
И как из scala-spark создать таблицу в hbase?
То что я пробовал заканчивается либо - нет такого в maven, либо проблемы с jackson
scala 2.11.12
spark 2.4.4
hbase 1.4.8
источник

GT

Gennady Timofeev in Moscow Spark
Java api есть у хбейса, можете его попробовать
источник

SM

Sergey M in Moscow Spark
Gennady Timofeev
Java api есть у хбейса, можете его попробовать
Интересный ответ
Ладно теперь у меня другая проблема
теперь выдаёт
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/SparkContext


idea ничего не показывает
compile отлично отрабатывает
источник