spark 手动设置偏移量,如果数据处理完后,offset 提交失败,造成重复计算怎么办?

https://zhuanlan.zhihu.com/p/256327560一、如何保证Spark Streaming第一次启动不丢数据?kafka的参数auto.offset.reset设定为earlist,保证Spark Streaming第一次启动从kafka最早偏移量开始拉取数据。二、Spark Streaming如何保证数据“恰好一次”消费?在Spark Streaming下有三种消费模式的定义 最多一次、至少一次、恰好一次,要实现恰好一次偏移量必须手动维护。1、手动维护偏移量:需设置kafka参数enable.auto.commit改为false2、处理完业务数据后再提交offset处理完业务数据后手动提交到Kafka:官网地址
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}stream.foreachRdd后根据每个rdd先转换成HasOffsetRanges对象通过.offsetRanges方法获取到偏移量对象,再通过commitAsync方法将偏移量提交。处理完业务数据后手动提交到本地库 如MySql、HBase
   //1、查询mysql中是否有偏移量
val sqlProxy = new SqlProxy()
//存放偏移量数据
val offsetMap = new mutable.HashMap[TopicPartition, Long]()
val client = DataSourceUtil.getConnection
try {
sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4)
offsetMap.put(model, offset)
}
rs.close() //关闭游标
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
//2、设置kafka消费数据的参数
判断本地是否有偏移量
有则根据偏移量继续消费 无则重新消费
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
} else {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
}
//3、处理完业务逻辑后 将offset最新值保存到mysql
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
//根据每个rdd先转换成HasOffsetRanges对象通过.offsetRanges方法获取到偏移量对象
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into offset_manager (groupid,topic,partition,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})注意:处理业务数据和提交offset并非同一事物,在极端情况下如提交offset时断网断电还是会导致offset没有提交并且业务数据已处理完的情况。那么保证事物就需要将并行度调成1或者将数据collect到driver端,再进行数据业务处理和提交offset,但这样还会导致并行度变成1很可能导致处理速度跟不上,所以大数据情况下一般不考虑事物。三、updateStateByBykey算子是返回一个新的“状态”的DStream的算子,其通过历史状态值和当前批次的数据状态值的累加操作得出一个最新的结果
使用updateStateByBykey算子,必须使用Spark Streaming的checkpoint来维护历史状态数据
存在小文件且小文件个数不可控,所以在真实企业生产环境上并不会使用checkpoint操作,也不会使用基于checkpoint的算子如updateStateBykey算子解决:在进行相应操作时,可以去库中查询出历史数据,再与当前数据进行操作得出最新结果集,将结果集再刷新到本地库中。https://zhuanlan.zhihu.com/p/55093372四、Spark Streaming从kafka分区每秒拉取多少条数据?通过spark.streaming.kafka.maxRatePerPartition参数来设置Spark Streaming从kafka分区每秒拉取的条数。五、Spark Streaming背压机制能够根据当前的批处理调度延迟和处理时间来动态控制接收速率spark.streaming.backpressure.enable=truespark.streaming.kafka.maxRatePerPartition 控制背压机制的上限速率。https://www.iteblog.com/archives/2323.html六、数据倾斜问题数据倾斜为在shuffle过程中,必须将各个节点上相同的key的数据拉取到某节点的一个task来进行,此时如果某个key对应的数据量特别大的话,就会发生数据倾,某个task耗时非常大,一个stage的耗时由最慢的task决定,从而导致整个Spark Streaming任务运行非常缓慢。解决方案:两阶段聚合,先打散key聚合一次,再还原key聚合一次。对DStream 进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果
再次进行map操作根据分隔符,去掉随机数保留原有key,map后再进行reducebykey,保证相同key的数据准确累加
   //1、对原始key前加上随机值
val dsStream = stream.filter(item => item.value().split("\t").length == 3)
.mapPartitions(partitions =>
partitions.map(item => {
val rand = new Random()
val line = item.value()
val arr = line.split("\t")
val app_id = arr(1)
(rand.nextInt(3) + "_" + app_id, 1)
}))
dsStream.print()
//2、打散key后的reducebykey结果
val randAppId = dsStream.reduceByKey(_ + _)
randAppId.print()
//3、根据分隔符,去掉随机数保留原有key
val resultStream = randAppId.map(item => {
val appid = item._1.split("_")(1)
(appid, item._2)
}).reduceByKey(_ + _)
resultStream.print()七、Spark Streaming优雅关闭提交Spark Streaming任务到yarn后,当需要停止程序时使用 yarn application -kill application_id 命令来关闭Spark Streaming ,那么操作此命令可以保证数据不丢失spark.streaming.stopGracefullOnShutdown=truespark Streaming程序在接收到kill命令时,不会立马结束程序,Spark会在JVM关闭时正常关闭Spark Streaming,而不是是立马关闭,即保证当前数据处理完后再关闭。八、Spark Streaming默认分区数Spark Streaming默认分区数与所对应kafka topic创建时的分区数一致,且在真实开发环境中Spark Streaming一般不会去使用repartition增大分区操作,因为会进行shuffle耗时。九、Spark Streaming正确使用数据库连接循环粒度 foreachRdd => foreachPartition => foreach循环粒度是分区,在每个分区下创建一个数据库连接
循环分区下的数据每条数据使用当前分区下的数据库连接
当使用完毕后归还到连接池中
resultDStream.foreachRDD(rdd => {
//1、循环粒度是分区
rdd.foreachPartition(partition => {
//2、在分区下获取jdbc连接
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partition.foreach(item => {
//3、业务处理
使用当前connection
calcPageJumpCount(sqlProxy, item, client) //计算页面跳转个数
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
//4、归还连接
sqlProxy.shutdown(client)
}
})
})十、Spark Streaming操作数据库时线程安全问题在Spark Streaming中,采用查询本地库的历史数据和当前批次数据的计算来代替需要基于hdfs的算子updatestatebykey,在查询本地库时需要进行一次预聚合操作,将相同key的数据落到一个分区,保证同一个key的数据指挥操作数据库一次,预聚合操作有reducebykey、groupbykey、groupby等算子。总结:spark streaming消费上限条数:spark.streaming.kafka.maxRatePerPartition=100
背压,动态消费数据:spark.streaming.backpressure.enabled=true
优雅关闭:spark.streaming.stopGracefullyOnShutdown=true

我要回帖

更多关于 kafka修改偏移量offset 的文章