ПФ
Size: a a a
ПФ
ПФ
/** One raw row of `movies.csv`; `genres` is still a single pipe-delimited string. */
case class Movie(
  movieId: Long,
  title: String,
  genres: String
)

/** A movie with its genre list and a release year split out of the title.
  * `year` is a boxed `java.lang.Integer` and therefore nullable; check for `null`
  * before unboxing it.
  */
case class MovieWithGenresAndYear(
  movieId: Long,
  title: String,
  genres: List[String],
  year: Integer
)

/** A movie whose pipe-delimited genre string has been split into a list. */
case class MovieExploded(
  movieId: Long,
  title: String,
  genres: List[String]
)

/** A (year, count) pair, e.g. the number of movies released in `year`. */
case class MovieAggregate(
  year: Int,
  count: Long
)
import spark.implicits._
// Count movies per release year, where the year is parsed from the trailing
// "(YYYY)" in each title, then print up to 300 rows ordered by year.
// `df` keeps the aggregated Dataset so it can be reused after printing.
val df = spark
  .read
  .option("header", true)
  .option("inferSchema", true)
  .option("mode", "DROPMALFORMED")
  .csv("/home/finkel/Downloads/ml-latest/movies.csv")
  .as[Movie]
  .map { it =>
    // `genres` is pipe-delimited; split(Char) splits on the literal '|', not a regex.
    // Option(...) turns a null genres cell into an empty list instead of an NPE.
    val genres = Option(it.genres).map(_.split('|').map(_.trim).toList).getOrElse(Nil)
    MovieExploded(it.movieId, it.title, genres)
  }
  .flatMap { case MovieExploded(movieId, title, genres) =>
    // Keep only titles ending in a four-digit year such as "Toy Story (1995)",
    // optionally wrapped in double quotes. Rows without one (or with a null title)
    // are dropped here instead of being emitted with a null year.
    if (title == null || !title.matches("\"?.*\\(\\d{4}\\)\\s*\"?")) Nil
    else {
      // The regex guarantees the last '(' opens the "(YYYY)" suffix, so toInt is safe.
      val lastOpen = title.lastIndexOf('(')
      val year = title.substring(lastOpen + 1).replace(")", "").replace("\"", "").trim.toInt
      // trim removes the space that separated the title from "(YYYY)".
      List(MovieWithGenresAndYear(movieId, title.substring(0, lastOpen).trim, genres, year))
    }
  }
  // year is never null after the flatMap above, so unboxing is safe.
  // count() is used instead of mapGroups(v.size): mapGroups cannot partially
  // aggregate and must shuffle every row to its group.
  .groupByKey(_.year.intValue)
  .count()
  .map { case (year, count) => MovieAggregate(year, count) }
  .orderBy("year")

df.show(300, truncate = false)
R
ПФ
R
ПФ
ПФ
ПФ
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ea908740> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:243)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:729)
ПФ
ПФ
DB
/** One raw row of `movies.csv`; `genres` is still a single pipe-delimited string. */
case class Movie(
  movieId: Long,
  title: String,
  genres: String
)

/** A movie with its genre list and a release year split out of the title.
  * `year` is a boxed `java.lang.Integer` and therefore nullable; check for `null`
  * before unboxing it.
  */
case class MovieWithGenresAndYear(
  movieId: Long,
  title: String,
  genres: List[String],
  year: Integer
)

/** A movie whose pipe-delimited genre string has been split into a list. */
case class MovieExploded(
  movieId: Long,
  title: String,
  genres: List[String]
)

/** A (year, count) pair, e.g. the number of movies released in `year`. */
case class MovieAggregate(
  year: Int,
  count: Long
)
import spark.implicits._
// Count movies per release year, where the year is parsed from the trailing
// "(YYYY)" in each title, then print up to 300 rows ordered by year.
// `df` keeps the aggregated Dataset so it can be reused after printing.
val df = spark
  .read
  .option("header", true)
  .option("inferSchema", true)
  .option("mode", "DROPMALFORMED")
  .csv("/home/finkel/Downloads/ml-latest/movies.csv")
  .as[Movie]
  .map { it =>
    // `genres` is pipe-delimited; split(Char) splits on the literal '|', not a regex.
    // Option(...) turns a null genres cell into an empty list instead of an NPE.
    val genres = Option(it.genres).map(_.split('|').map(_.trim).toList).getOrElse(Nil)
    MovieExploded(it.movieId, it.title, genres)
  }
  .flatMap { case MovieExploded(movieId, title, genres) =>
    // Keep only titles ending in a four-digit year such as "Toy Story (1995)",
    // optionally wrapped in double quotes. Rows without one (or with a null title)
    // are dropped here instead of being emitted with a null year.
    if (title == null || !title.matches("\"?.*\\(\\d{4}\\)\\s*\"?")) Nil
    else {
      // The regex guarantees the last '(' opens the "(YYYY)" suffix, so toInt is safe.
      val lastOpen = title.lastIndexOf('(')
      val year = title.substring(lastOpen + 1).replace(")", "").replace("\"", "").trim.toInt
      // trim removes the space that separated the title from "(YYYY)".
      List(MovieWithGenresAndYear(movieId, title.substring(0, lastOpen).trim, genres, year))
    }
  }
  // year is never null after the flatMap above, so unboxing is safe.
  // count() is used instead of mapGroups(v.size): mapGroups cannot partially
  // aggregate and must shuffle every row to its group.
  .groupByKey(_.year.intValue)
  .count()
  .map { case (year, count) => MovieAggregate(year, count) }
  .orderBy("year")

df.show(300, truncate = false)
ПФ
ПФ
DB
ПФ
ПФ
С
ПФ
AS