DZ
мне нужно реализовать следующий флоу:
1) получить сообщения из Kafka топика `raw
2) распарсить их в case class
3) отправить ошибки парсинга в Kafka топик
error
4) Сделать reduceByKey для всего что спарсилось
5) Отправить результаты reduce в кафку в токик
data
6) commit offset for Kafka topic raw
GOTO (1)
Модель данных (Scala):
sealed trait Message
case class ParsedMessage(...) extends Message
case class ParseError(raw: String, exception: Throwable) extends Message
Я попытался решить проблему двумя способами:
1)Через Spark RDD - не работает потому что после reduceByKey невозможно получить Kafka offset, rd
d.asInstanceOf[HasOffsetRanges].offsetRanges п
адает, потому что после первого же RDD.map{...} и
з типа RDD т
еряется связь с кафкой2)Я попытался прикрутить Structured streaming, в доках обещают что офсеты будут магически комитится самостоятельно, код на Scala:
.f
latMap {Но он падает или на компиляции или на запске (зависит от того указывать тип переменной или нет)
case raw: RawMessage =>
implicit val mapperInfo: ProviderKey = ProviderKey(RawMessageParser.name, RawMessageParser.version, 0)
Try(parse[List[ParsedMessage]](Left(raw.message))) match {
case Success(msg) =>
msg
case Failure(ex) =>
Seq(ParseError(raw.nxMessage, ex))
}
}
foun
d : org.apache.spark.sql.Dataset[Product with Serializable with Message]ли вопрос не сюда, то подскажите чат где могут теоретически помочь с данным вопросом
required: org.apache.spark.sql.Dataset[Message]
Note: Product with Serializable with Message <: Message, but class Dataset is invariant in type T.
Ес