Size: a a a

2018 December 14

PK

Pavel Klemenkov in Moscow Spark
источник

t

tenKe in Moscow Spark
https://github.com/tenkeiu8/spark-streaming-jdbc < пример кастомного сурса в продолжение митапа
источник

AS

Andrey Sutugin in Moscow Spark
👍
источник

GP

Grigory Pomadchin in Moscow Spark
Делай Ридмиху и мы зафорсим
источник
2018 December 15

Ds

Dmitry says in Moscow Spark
Ребят, спасибо за митап! Все было на высоте.
источник

PK

Pavel Klemenkov in Moscow Spark
Да, всем спасибо, был приятно удивлён количеству народа перед НГ. Презентации и видео выложу в пнд-вт
источник

AD

Alex D in Moscow Spark
Спасибо, а может кто еще кинуть ссылочку на PR Structured streaming listener, который упоминался в вопросах в обсуждении данного кастомного сурса?
источник

AS

Andrey Sutugin in Moscow Spark
Alex D
Спасибо, а может кто еще кинуть ссылочку на PR Structured streaming listener, который упоминался в вопросах в обсуждении данного кастомного сурса?
pr не к листенеру, листенер обычный(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis), но если делать кастомный sink, как онисено например здесь(https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-demo-custom-sink-webui.adoc), то метрики работать не будут.
Вот пример, того что надо сделать в кстомном sink, что бы все заработало
https://github.com/sutugin/shc/blob/master/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala#L29
источник

t

tenKe in Moscow Spark
что примечательно, df.rdd также делает queryExecution.toRdd
источник

t

tenKe in Moscow Spark
lazy val rdd: RDD[T] = {
   val objectType = exprEnc.deserializer.dataType
   rddQueryExecution.toRdd.mapPartitions { rows =>
     rows.map(_.get(0, objectType).asInstanceOf[T])
   }
 }
источник

t

tenKe in Moscow Spark
и на нем  вотермарки не работают
источник

t

tenKe in Moscow Spark
а вот если в синке юзать data.queryExecution.toRdd, то все ок
источник

t

tenKe in Moscow Spark
источник

t

tenKe in Moscow Spark
а, ну походу из-за этого:
@transient private lazy val rddQueryExecution: QueryExecution = {
   val deserialized = CatalystSerde.deserialize[T](planWithBarrier)
   sparkSession.sessionState.executePlan(deserialized)
 }
источник

AD

Alex D in Moscow Spark
спасибо за подробный ответ!
источник
2018 December 16

AD

Alex D in Moscow Spark
@tenKe @sutuginandrey
В Spark 2.4.0 проблемму кастомных синков для сохранения где либо решили штатно: https://stackoverflow.com/a/51319121
источник

AS

Andrey Sutugin in Moscow Spark
не очень понял как это связано с кастомными синками... там вроде обсуждается проблема записи во множество синков...
источник

t

tenKe in Moscow Spark
тут не совсем вопрос кастмомных синков решается, а скорее как на наделать кучу выходных стримов
источник

AD

Alex D in Moscow Spark
Andrey Sutugin
не очень понял как это связано с кастомными синками... там вроде обсуждается проблема записи во множество синков...
Имел ввиду, что теперь это можно использовать вместо
addBatch https://github.com/sutugin/shc/blob/master/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala#L29
Написав:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
 batchDF.write.format("org.apache.spark.sql.execution.datasources.hbase").save(...)
}
источник

PK

Pavel Klemenkov in Moscow Spark
Alex D
Имел ввиду, что теперь это можно использовать вместо
addBatch https://github.com/sutugin/shc/blob/master/core/src/main/scala/org/apache/spark/sql/execution/streaming/HBaseStreamSinkProvider.scala#L29
Написав:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
 batchDF.write.format("org.apache.spark.sql.execution.datasources.hbase").save(...)
}
Да, я как раз этот пример приводил в хайлайтах 2.4.0
источник