SS
Size: a a a
SS
M
А
D
VK
M
N
M
a
PK
GP
a
// For every RDD produced by the stream, read the offset ranges it carries and
// pass them back to the stream through commitAsync.
stream.foreachRDD { batch =>
  // The cast is needed because offsetRanges is exposed via HasOffsetRanges.
  val ranges = batch.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  val committer = stream.asInstanceOf[CanCommitOffsets]
  committer.commitAsync(ranges)
}
a
PK
GP
// Per-batch offset handling: capture the batch's offset ranges first, then
// commit them on the stream via commitAsync.
stream.foreachRDD { batchRdd =>
  // offsetRanges is only reachable through the HasOffsetRanges view of the RDD.
  val batchOffsets = batchRdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  val offsetCommitter = stream.asInstanceOf[CanCommitOffsets]
  offsetCommitter.commitAsync(batchOffsets)
}
GP
PF
GP
Н
# Chat remark that was fused onto the next line (translated from Russian):
# "lag doesn't seem like the most complex operation".

# Window over rows sharing the same "domain", ordered by the "to" column.
window_spec = Window.partitionBy("domain").orderBy("to")
# Rename "@timestamp" to "to" so the window ordering column exists on df.
df = df.withColumnRenamed("@timestamp", "to")
# "from" holds the "to" value of the preceding row within the same window
# (lag with offset 1).
df = df.withColumn("from", F.lag(df["to"], 1).over(window_spec))
# Gap between consecutive rows, divided by 60.
# NOTE(review): assumes "to" is a timestamp column so that cast("long") yields
# seconds and the result is minutes — confirm against the input schema.
df = df.withColumn("minutesdiff", (df["to"].cast("long") - df["from"].cast("long")) / 60)
PK