Size: a a a

2020 October 20

G

Gev in Moscow Spark
Хорошо. так а какое постоянное решение если нужна сквазная нумерация ID
источник

AS

Andrey Smirnov in Moscow Spark
Gev
Хорошо. так а какое постоянное решение если нужна сквазная нумерация ID
zipwithindex как уже подсказали, будет шафл, но всяко лучше чем window
источник

G

Gev in Moscow Spark
row_number плохо - потому что медленно и шафл. monotonically_incrace_id - вообще не подходит так как генериться будут ID ghb rf;ljv pfgecrt
источник

G

Gev in Moscow Spark
Andrey Smirnov
zipwithindex как уже подсказали, будет шафл, но всяко лучше чем window
Понял
источник

t

tenKe in Moscow Spark
Gev
Хорошо. так а какое постоянное решение если нужна сквазная нумерация ID
никак
источник

t

tenKe in Moscow Spark
не делать такого в спарке :)
источник

AS

Andrey Smirnov in Moscow Spark
tenKe
не делать такого в спарке :)
лучше быть богатым и здоровым
источник

G

Gev in Moscow Spark
Ну лучше ничего не делать. Так и не ошибиться.
источник

R

Rodion in Moscow Spark
можно попробовать хэш натравить на уникальный набор полей
источник

NG

Nikita Gunbin in Moscow Spark
Коллеги, привет!
Хотел посоветоваться на счет graceful shutdown. Есть стрим kafka->hdfs с несложной логикой внутри.
Если сделать yarn -kill, есть высокий шанс получить дубли в приемнике при рестарте. Нагуглил путь с подкладыванием файлика и остановкой стрима при его удалении. Какие есть еще варианты? Смотрел в сторону spark.streaming.stopGracefullyOnShutdown, но так понял, это не очень живая вещь. В общем, в поисках best practice =)
Spark 2.4.5
источник

t

tenKe in Moscow Spark
ты пишешь writeStream в хдфс или foreachBatch и внутри пишешь в хдфс?
источник

NG

Nikita Gunbin in Moscow Spark
tenKe
ты пишешь writeStream в хдфс или foreachBatch и внутри пишешь в хдфс?
forEachBatch
источник

t

tenKe in Moscow Spark
тогда в начале функции кидай исключение когда тебе надо остановить твой джоб и отавливай его в sq.awaitTermination
источник

NG

Nikita Gunbin in Moscow Spark
tenKe
тогда в начале функции кидай исключение когда тебе надо остановить твой джоб и отавливай его в sq.awaitTermination
Спасибо! буду пробовать
источник

AS

Andrey Smirnov in Moscow Spark
источник

AS

Andrey Smirnov in Moscow Spark
Переслано от Denis Tsvetkov
что-нибудь вроде
import sun.misc.{Signal, SignalHandler}

var isTimeToStop = false
// в самом начале регистрируем обработчик
registerSignalHandlers()

def registerSignalHandlers() = {
   val signalHandler = new SignalHandler {
     override def handle(signal: Signal): Unit = {
       log.info(s"Signal ${signal} received.")
       isTimeToStop = true
     }
   Signal.handle(new Signal("TERM"), signalHandler)
   }
ну и перед обработкой каждого из батчей проверять, не пришло ли время isTimeToStop == true ?
источник
2020 October 21

K

KrivdaTheTriewe in Moscow Spark
Andrey Smirnov
Переслано от Denis Tsvetkov
что-нибудь вроде
import sun.misc.{Signal, SignalHandler}

var isTimeToStop = false
// в самом начале регистрируем обработчик
registerSignalHandlers()

def registerSignalHandlers() = {
   val signalHandler = new SignalHandler {
     override def handle(signal: Signal): Unit = {
       log.info(s"Signal ${signal} received.")
       isTimeToStop = true
     }
   Signal.handle(new Signal("TERM"), signalHandler)
   }
ну и перед обработкой каждого из батчей проверять, не пришло ли время isTimeToStop == true ?
в ярне работать не будет
источник

K

KrivdaTheTriewe in Moscow Spark
источник

VM

Vladimir Morozov in Moscow Spark
Коллеги, а как в structured стриминге метрики принято собирать?
источник

K

KrivdaTheTriewe in Moscow Spark
статсд
источник