Size: a a a

2021 January 21

ПФ

Паша Финкельштейн... in Moscow Spark
Это тривиальный код на спарке абсолютно
источник

ПФ

Паша Финкельштейн... in Moscow Spark
case class Movie(movieId: Long, title: String, genres: String)

case class MovieWithGenresAndYear(movieId: Long, title: String, genres: List[String], year: Integer)
case class MovieExploded(movieId: Long, title: String, genres: List[String])

case class MovieAggregate(year: Int, count: Long)

import spark.implicits._

val df = spark
       .read
       .option("header", true)
       .option("inferSchema", true)
       .option("mode", "DROPMALFORMED")
       .csv("/home/finkel/Downloads/ml-latest/movies.csv")
       .as[Movie]
       .map(it => MovieExploded(it.movieId, it.title, it.genres.split('|').map(_.trim).toList))
       .map {
           case MovieExploded(movieId, title, genres) =>
               if (!title.matches("\"?.*\\(\\d{4}\\)\\s*\"?")) MovieWithGenresAndYear(movieId, title, genres, null)
               else {
                   val lastOpen = title.lastIndexOf('(')
                   val year = title.substring(lastOpen + 1).replace(")", "").replace("\"", "").trim.toInt
                   MovieWithGenresAndYear(movieId, title.substring(0, lastOpen), genres, year)
               }
       }
       .filter(_.year != null)
       .groupByKey(_.year)
       .mapGroups((k, v) =>
           (k, v.size)
       )
       .show(300, false)
источник

R

Rustam Aikaev in Moscow Spark
Ошибку про csv не выдает, но и не читает? Может просто ресурсов нет на джобу?
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Хммм, там кажись спарк на чём-то дедлокнулся
источник

R

Rustam Aikaev in Moscow Spark
Или сервис какой нибудь упал
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Ну это локальная фигня, у меня цеппелин со спарк интерпретером
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Ща ради интереса перезапущуцеппелин целиком
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Почему думаю что это спарк ливлокнулся:
   java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00000000ea908740> (a scala.concurrent.impl.Promise$CompletionLatch)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
       at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
       at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
       at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
       at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:243)
       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:729)
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Это вот верхушка стека одного из потоков
источник

ПФ

Паша Финкельштейн... in Moscow Spark
И даже рестарт цеппелина не помогает. Кто готов у себя запустить?
источник

DB

Dmitry Bugaychenko in Moscow Spark
Паша Финкельштейн
case class Movie(movieId: Long, title: String, genres: String)

case class MovieWithGenresAndYear(movieId: Long, title: String, genres: List[String], year: Integer)
case class MovieExploded(movieId: Long, title: String, genres: List[String])

case class MovieAggregate(year: Int, count: Long)

import spark.implicits._

val df = spark
       .read
       .option("header", true)
       .option("inferSchema", true)
       .option("mode", "DROPMALFORMED")
       .csv("/home/finkel/Downloads/ml-latest/movies.csv")
       .as[Movie]
       .map(it => MovieExploded(it.movieId, it.title, it.genres.split('|').map(_.trim).toList))
       .map {
           case MovieExploded(movieId, title, genres) =>
               if (!title.matches("\"?.*\\(\\d{4}\\)\\s*\"?")) MovieWithGenresAndYear(movieId, title, genres, null)
               else {
                   val lastOpen = title.lastIndexOf('(')
                   val year = title.substring(lastOpen + 1).replace(")", "").replace("\"", "").trim.toInt
                   MovieWithGenresAndYear(movieId, title.substring(0, lastOpen), genres, year)
               }
       }
       .filter(_.year != null)
       .groupByKey(_.year)
       .mapGroups((k, v) =>
           (k, v.size)
       )
       .show(300, false)
Может дело в том что у тебя тут регулярка на каждую строчку будет парситься а в SQL распарсится один раз и потом только применяться?
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Dmitry Bugaychenko
Может дело в том что у тебя тут регулярка на каждую строчку будет парситься а в SQL распарсится один раз и потом только применяться?
Было бы так — я бы видел что оно застревает на мапе
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Но оно тупо не читает данные (или по крайней мере не отдаёт их)
источник

DB

Dmitry Bugaychenko in Moscow Spark
Т.е. вообще не работает или просто тупит? Ну яб гипотезу все равно проверил скомпилировав Pattern заранее
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Dmitry Bugaychenko
Т.е. вообще не работает или просто тупит? Ну яб гипотезу все равно проверил скомпилировав Pattern заранее
Вообще не работает, https://t.me/moscowspark/13245
источник

ПФ

Паша Финкельштейн... in Moscow Spark
время исполнения — 9 минут, number of output rows: 0
источник

С

Сюткин in Moscow Spark
Могу глянуть через час, кинь сэмпл в личку
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Ок
источник

AS

Andrey Smirnov in Moscow Spark
выложи файлик
источник

ПФ

Паша Финкельштейн... in Moscow Spark
источник