
2020 October 02

PK

Pavel Klemenkov in Moscow Spark
Denis Gabaydulin
I asked a question yesterday. Maybe someone can answer it? I have a feeling I missed something important. Can anyone explain the context?
In practice we have seen that Spark tends to be CPU/compute bound instead of I/O bound most of the time. When I run TPC-H, TPC-DS or other similar complex queries on the CPU, they are compute bound the vast majority of the time on a setup that does not use spinning disks. They become I/O bound only on a few very large shuffles. If that weren't the case we wouldn't see any real speed improvements by moving computation to the GPU. This fits with what we saw at my previous employer, Yahoo!, where we typically ran spinning disks, but we also tuned the page cache not to write back to disk unless memory was exhausted, which helped mitigate the load shuffle puts on disks. Of course this is totally dependent on the query you are running and the setup of your cluster (bandwidth + IOPS vs compute).

Examples:
If I try to read the lineitem table from TPC-H (scale factor 100) in the original CSV format, after I have flushed the page cache I see a single Spark thread reading the data at 20 MB/sec (local mode).
import org.apache.spark.sql.types._

// Schema for the TPC-H lineitem table (the dbgen output has no header row, so it must be supplied).
val lineitemSchema = StructType(Array(
  StructField("l_orderkey", LongType),
  StructField("l_partkey", LongType),
  StructField("l_suppkey", LongType),
  StructField("l_linenumber", LongType),
  StructField("l_quantity", DoubleType),
  StructField("l_extendedprice", DoubleType),
  StructField("l_discount", DoubleType),
  StructField("l_tax", DoubleType),
  StructField("l_returnflag", StringType),
  StructField("l_linestatus", StringType),
  StructField("l_shipdate", DateType),
  StructField("l_commitdate", DateType),
  StructField("l_receiptdate", DateType),
  StructField("l_shipinstruct", StringType),
  StructField("l_shipmode", StringType),
  StructField("l_comment", StringType)
))

// path_to_data points to the pipe-delimited lineitem CSV files.
val df = spark.read.option("delimiter", "|").schema(lineitemSchema).csv(path_to_data)
df.orderBy("l_orderkey").show()
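A rough way to reproduce that per-thread rate (not part of the original message, just a sketch) is to time a full pass over the DataFrame and divide the on-disk size by the elapsed seconds; it assumes path_to_data is a local directory of CSV files and that the shell was started with --master local[1] so only one thread does the reading:

// Hypothetical throughput check; flush the page cache again beforehand for cold-read numbers.
val dataBytes = new java.io.File(path_to_data).listFiles().map(_.length).sum
val t0 = System.nanoTime()
df.foreach(_ => ())                                  // force every row to be read and parsed
val secs = (System.nanoTime() - t0) / 1e9
println(f"effective read rate: ${dataBytes / 1e6 / secs}%.1f MB/sec")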
This means that a 500 MB/sec SSD could keep up with 25 threads reading the data. If your page cache is not completely worthless you can probably do even better. Also if you compress the CSV you now have effectively traded more compute for additional disk bandwidth + storage.
Reading parquet data is typically much faster, as it is designed for speed, but it is also compressed: if I read the same data stored as parquet+snappy I see it reading from disk at 60 MB/sec. That would mean one SSD should be able to keep up with about 8 threads before it starts to saturate.
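The thread-count estimates above are just the assumed SSD bandwidth divided by the observed per-thread read rate; a quick sanity check with the numbers from the message (the 500 MB/sec SSD figure is itself an assumption):

val ssdMBps = 500.0          // assumed sequential SSD bandwidth
val csvMBps = 20.0           // observed per-thread CSV read rate
val parquetMBps = 60.0       // observed per-thread parquet+snappy read rate
println(s"CSV readers one SSD can feed:     ${(ssdMBps / csvMBps).toInt}")      // 25
println(s"Parquet readers one SSD can feed: ${(ssdMBps / parquetMBps).toInt}")  // 8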
This is just reading. It does not include much, if any, useful computation. The more projects, filters, joins, sorts, aggregates, etc. you add, the more compute you need relative to the I/O being performed. The one case where this does not hold is when you have to spill because there is not enough memory to hold the working data set.
Writing parquet also adds to the compute ratio. I can generate 200,000,000 rows in about 10 seconds.
spark.time(spark.range(1, 200000000).selectExpr("id", "rand() as a", "rand() as b").orderBy("a").show())
...
Time taken: 9851 ms
But when I write the same data out to parquet, it takes about 45 seconds:
spark.time(spark.range(1, 200000000).selectExpr("id", "rand() as a", "rand() as b").write.mode("overwrite").parquet("./target/tmp/"))
Time taken: 45016 ms

ПФ

Паша Финкельштейн... in Moscow Spark
Pavel Klemenkov
In practice we have seen that Spark tends to be CPU/compute bound instead of I/O bound most of the time. [...]
And here it's CSV :) CSV is painful to read )

DG

Denis Gabaydulin in Moscow Spark
Thanks, Pasha. So I did get the logic right after all:
If that weren't the case we wouldn't see any real speed improvements by moving computation to the GPU

АЖ

Андрей Жуков... in Moscow Spark
The arguments about Native American stories are great too!

E

Eugene in Moscow Spark
Андрей Жуков
The arguments about Native American stories are great too!
This is the way!)

АЖ

Андрей Жуков... in Moscow Spark
Eugene
This is the way!)

DG

Denis Gabaydulin in Moscow Spark
By the way, if you replace parquet+snappy with parquet+gzip it gets even "worse" )
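The Parquet compression codec is just a write option in Spark, so a snappy-vs-gzip comparison like this is easy to reproduce; a minimal sketch, where df is any DataFrame and the output paths are placeholders:

// Write the same data with both codecs and compare write time (and the resulting size on disk).
spark.time(df.write.mode("overwrite").option("compression", "snappy").parquet("/tmp/lineitem_snappy"))
spark.time(df.write.mode("overwrite").option("compression", "gzip").parquet("/tmp/lineitem_gzip"))

gzip usually yields noticeably smaller files at the cost of extra CPU on both write and read, which is the trade-off discussed in the replies.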

АЖ

Андрей Жуков... in Moscow Spark
Denis Gabaydulin
By the way, if you replace parquet+snappy with parquet+gzip it gets even "worse" )
But it compresses well

DG

Denis Gabaydulin in Moscow Spark
Now you can skip storing a lot of it and just recompute it every time )

АЖ

Андрей Жуков... in Moscow Spark
Denis Gabaydulin
Now you can skip storing a lot of it and just recompute it every time )
well, then it'll be csv + gz, which is also paaainful

DG

Denis Gabaydulin in Moscow Spark
Андрей Жуков
well, then it'll be csv + gz, which is also paaainful
csv isn't needed at all )

АЖ

Андрей Жуков... in Moscow Spark
Denis Gabaydulin
csv isn't needed at all )
Well, in my case they don't give me anything else over FTP :)

ПФ

Паша Финкельштейн... in Moscow Spark
Why not XML? And you can also store it in the network!

Sa

Salam andra in Moscow Spark
Паша Финкельштейн
Why not XML? And you can also store it in the network!
XML is genius, I hadn't thought of that) And storing it in the network, how does that work?

AS

Alexander Shorin in Moscow Spark
Паша Финкельштейн
Why not XML? And you can also store it in the network!
pingfs? yes, that's a beautiful idea.

ПФ

Паша Финкельштейн... in Moscow Spark
Alexander Shorin
pingfs? yes, that's a beautiful idea.
well yes, and any other similar storage backends

АЖ

Андрей Жуков... in Moscow Spark
Well, whatever format it arrived in is what you store in the lake. Raw data, all that stuff

E

Eugene in Moscow Spark
Андрей Жуков
Well, whatever format it arrived in is what you store in the lake. Raw data, all that stuff
In xls.

АЖ

Андрей Жуков... in Moscow Spark
Eugene
In xls.
Well, that happens too :(