PK,
Examples:
If I try to read the lineitem table from TPC-H (scale factor 100) in the original CSV format, after flushing the page cache I see a single Spark thread reading the data at 20 MB/sec (local mode).
import org.apache.spark.sql.types._
val lineitemSchema = StructType(Array(
StructField("l_orderkey", LongType),
StructField("l_partkey", LongType),
StructField("l_suppkey", LongType),
StructField("l_linenumber", LongType),
StructField("l_quantity", DoubleType),
StructField("l_extendedprice", DoubleType),
StructField("l_discount", DoubleType),
StructField("l_tax", DoubleType),
StructField("l_returnflag", StringType),
StructField("l_linestatus", StringType),
StructField("l_shipdate", DateType),
StructField("l_commitdate", DateType),
StructField("l_receiptdate", DateType),
StructField("l_shipinstruct", StringType),
StructField("l_shipmode", StringType),
StructField("l_comment", StringType)
))
val df = spark.read.option("delimiter", "|").schema(lineitemSchema).csv(path_to_data)
df.orderBy("l_orderkey").show()
This means that a 500 MB/sec SSD could keep up with about 25 threads (500 / 20) reading the data. If your page cache is not completely worthless, you can probably do even better. Also, if you compress the CSV, you have effectively traded more compute for additional disk bandwidth and storage.
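As a minimal sketch of that trade-off (the gzipped path below is hypothetical), Spark picks the gzip codec from the file extension and spends CPU decompressing instead of disk bandwidth:
// Hypothetical gzipped copy of the same files; Spark infers the codec from the .gz extension.
// Note that gzip files are not splittable, so each file is read by a single task.
val compressedDf = spark.read.option("delimiter", "|").schema(lineitemSchema).csv("path_to_gzipped_data")
compressedDf.orderBy("l_orderkey").show()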
Reading parquet data is typically much faster, as the format is designed for speed, but it is also compressed, and if I read the same data stored as parquet + snappy I see it reading from the disk at 60 MB/sec. That would mean one SSD should be able to keep up with about 8 threads (500 / 60) before it starts to saturate.
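For comparison, the equivalent read of a parquet + snappy copy is just the following (the path is hypothetical and assumes the CSV has already been converted):
// Hypothetical path to a parquet+snappy copy of lineitem; the schema comes from the parquet footers.
val parquetDf = spark.read.parquet("path_to_parquet_data")
parquetDf.orderBy("l_orderkey").show()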
This is just reading; it does not include much, if any, useful computation. The more projects, filters, joins, sorts, aggregates, etc. you do, the more compute is needed relative to the I/O being performed. The one case where this does not hold is when you have to spill because there is not enough memory to hold the working data set.
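As a rough sketch of what tips the balance toward compute, here is a filter plus aggregation over the same df from above, loosely shaped like TPC-H Q1 (the cutoff date is only illustrative):
import org.apache.spark.sql.functions._
// Filter, project, and aggregate on top of the same scan; the extra work is CPU, not extra disk I/O.
df.filter(col("l_shipdate") <= lit("1998-09-02"))
  .groupBy("l_returnflag", "l_linestatus")
  .agg(sum("l_quantity").as("sum_qty"), avg("l_extendedprice").as("avg_price"), count(lit(1)).as("count_order"))
  .orderBy("l_returnflag", "l_linestatus")
  .show()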
Writing parquet also adds to the compute side of the ratio. I can generate 200,000,000 rows in about 10 seconds:
spark.time(spark.range(1, 200000000).selectExpr("id", "rand() as a", "rand() as b").orderBy("a").show())
...
Time taken: 9851 ms
But when I write the data out to parquet, it now takes about 45 seconds:
spark.time(spark.range(1, 200000000).selectExpr("id", "rand() as a", "rand() as b").write.mode("overwrite").parquet("./target/tmp/"))
Time taken: 45016 ms