SparkStreaming "Could not read data from write ahead log record" error: analysis and fix
Published: 2019-06-12


# With WAL enabled: org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment

 

With checkpointing and the write-ahead log (WAL) enabled, SparkStreaming sometimes throws the error above. It does not bring down the whole application; only the data of the failing job is lost. The root cause is that the WAL file was deleted — by Spark Streaming's own cleanup mechanism. This usually indicates some degree of rate mismatch or backlog in the streaming job.
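For reference, the receiver-side WAL that exercises this code path is turned on with the standard Spark Streaming switch, together with a checkpoint directory; a minimal sketch of the relevant configuration:

```
# also requires a checkpoint directory, set via StreamingContext.checkpoint(...)
spark.streaming.receiver.writeAheadLog.enable=true
```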

Looking at the driver log, you will find entries like the following:

2017-03-23 13:55:00 INFO  [Logging.scala:58] Attempting to clear 0 old log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248380000:
2017-03-23 13:55:05 INFO  [Logging.scala:58] Attempting to clear 1 old log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248470000: hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata/log-1490248404471-1490248464471
2017-03-23 13:55:05 INFO  [Logging.scala:58] Cleared log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248470000
2017-03-23 13:55:05 ERROR [Logging.scala:74] Task 41 in stage 35.0 failed 4 times; aborting job
2017-03-23 13:55:05 ERROR [Logging.scala:95] Error running job streaming job 1490248470000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 41 in stage 35.0 failed 4 times, most recent failure: Lost task 41.3 in stage 35.0 (TID 4273, alps60): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedData/0/log-1490248403649-1490248463649,44333482,118014)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:143)

You can see that the WAL file covering 1490248403649 was removed by the cleaner ("Cleared log files ... older than 1490248470000"), and then the read from that WAL segment failed.
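The arithmetic behind the failure can be checked directly from the file name: a WAL file named log-&lt;startTime&gt;-&lt;endTime&gt; becomes eligible for deletion once its endTime falls below the cleaner's threshold. A minimal sketch with the values from the log above:

```java
// A WAL file name encodes the time range it covers: log-<startTime>-<endTime>.
public class WalFileCheck {
    // Parse the endTime out of a file name like "log-1490248403649-1490248463649"
    static long endTime(String fileName) {
        String[] parts = fileName.split("-");
        return Long.parseLong(parts[2]);
    }

    public static void main(String[] args) {
        String file = "log-1490248403649-1490248463649"; // file from the stack trace
        long threshTime = 1490248470000L;                 // "older than" value from the driver log
        // endTime 1490248463649 < 1490248470000, so the cleaner deleted this file
        // even though a job still needed to read from it.
        System.out.println("eligible for deletion: " + (endTime(file) < threshTime));
    }
}
```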

The official Spark documentation has no configuration for this, so I went straight to the source code. (Spark has many pitfalls like this; you have to read the source to discover the hack or the hidden configuration.)

 

1. Searching for the log message leads to the clean method in FileBasedWriteAheadLog (the class whose segments are FileBasedWriteAheadLogSegment; the rest of the method is the concrete deletion logic, which we can ignore for now). The key question is how threshTime is determined.

/**
 * Delete the log files that are older than the threshold time.
 *
 * Its important to note that the threshold time is based on the time stamps used in the log
 * files, which is usually based on the local system time. So if there is coordination necessary
 * between the node calculating the threshTime (say, driver node), and the local system time
 * (say, worker node), the caller has to take account of possible time skew.
 *
 * If waitForCompletion is set to true, this method will return only after old logs have been
 * deleted. This should be set to true only for testing. Else the files will be deleted
 * asynchronously.
 */
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
  val oldLogFiles = synchronized {
    val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    pastLogs --= expiredLogs
    expiredLogs
  }
  logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
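The filter at the heart of clean() is simple: any tracked segment whose endTime is below threshTime is dropped from the set and handed to the (asynchronous) deleter. A sketch of that predicate in Java, operating on segment end times:

```java
import java.util.Arrays;

// Mirrors the Scala expression pastLogs.filter { _.endTime < threshTime }:
// input is the endTime of each tracked WAL segment, output is the expired ones.
public class WalCleaner {
    static long[] expired(long[] segmentEndTimes, long threshTime) {
        return Arrays.stream(segmentEndTimes)
                     .filter(end -> end < threshTime)
                     .toArray();
    }

    public static void main(String[] args) {
        long[] tracked = {1490248463649L, 1490248523649L};
        long threshTime = 1490248470000L;
        // Only the first segment ends before the threshold and gets deleted.
        System.out.println(Arrays.toString(expired(tracked, threshTime)));
    }
}
```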

 

2. Tracing the call chain outward step by step: ReceivedBlockHandler -> ReceiverSupervisorImpl -> CleanUpOldBlocks. There is an RPC to the ReceiverTracker here, so searching for CleanUpOldBlocks directly gives: CleanUpOldBlocks -> ReceiverTracker -> JobGenerator.

In JobGenerator.clearCheckpointData there is this piece of logic:

/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
  ssc.graph.clearCheckpointData(time)

  // All the checkpoint information about which batches have been processed, etc have
  // been saved to checkpoints, so its safe to delete block metadata and data WAL files
  val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
  jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
  jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
  markBatchFullyProcessed(time)
}
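So the cleanup threshold passed down to the WAL cleaner is simply the batch time minus maxRememberDuration. A minimal sketch of that subtraction, using the threshold seen in the driver log and the 30s default rememberDuration (the 30s value is worked out later in this post):

```java
// Threshold computed in clearCheckpointData: everything ending before
// (batch time - maxRememberDuration) is deleted.
public class CleanupThreshold {
    static long threshold(long batchTimeMs, long maxRememberMs) {
        return batchTimeMs - maxRememberMs;
    }

    public static void main(String[] args) {
        long batchTime = 1490248500000L; // hypothetical batch being cleaned up
        long maxRemember = 30000L;       // 30s default rememberDuration
        // 1490248500000 - 30000 = 1490248470000, the "older than" value in the log
        System.out.println(threshold(batchTime, maxRemember));
    }
}
```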

So ssc.graph has a maxRememberDuration member — which means we have a chance to change it through ssc.

A quick search through the code turns up the relevant method:

jssc.remember(new Duration(2 * 3600 * 1000));

 

Reflections:

From the earlier log we saw that the default cleanup horizon is only a few tens of seconds. But the code shows this parameter can effectively be set only once (each assignment takes effect only if the current value is null, and the initial value is null). So where do those tens of seconds come from? I couldn't find it right away, so I searched the project for `remember` and found the initialization code in DStream (where slideDuration comes from InputDStream). Doing the math: our batchInterval is 15s and the other two durations are unset, so checkpointDuration comes out to 15s and rememberDuration to 30s.

override def slideDuration: Duration = {
  if (ssc == null) throw new Exception("ssc is null")
  if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
  ssc.graph.batchDuration
}

/**
 * Initialize the DStream by setting the "zero" time, based on which
 * the validity of future times is calculated. This method also recursively initializes
 * its parent DStreams.
 */
private[streaming] def initialize(time: Time) {
  if (zeroTime != null && zeroTime != time) {
    throw new SparkException("ZeroTime is already initialized to " + zeroTime
      + ", cannot initialize it again to " + time)
  }
  zeroTime = time

  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }

  // Set the minimum value of the rememberDuration if not already set
  var minRememberDuration = slideDuration
  if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
    // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
    minRememberDuration = checkpointDuration * 2
  }
  if (rememberDuration == null || rememberDuration < minRememberDuration) {
    rememberDuration = minRememberDuration
  }

  // Initialize the dependencies
  dependencies.foreach(_.initialize(zeroTime))
}
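The defaults described above can be reproduced numerically. With batchInterval = 15s and nothing else set, checkpointDuration = slideDuration * ceil(10s / slideDuration) = 15s, and rememberDuration = 2 * checkpointDuration = 30s — which is exactly the few-tens-of-seconds horizon seen in the driver log. A sketch of that calculation:

```java
// Reproduces the duration defaults computed in DStream.initialize() above.
public class DefaultDurations {
    // checkpointDuration = slideDuration * ceil(10s / slideDuration)
    static long checkpointMs(long slideMs) {
        return slideMs * (long) Math.ceil(10000.0 / slideMs);
    }

    // minRememberDuration = 2 * checkpointDuration when checkpointing is on
    static long rememberMs(long slideMs) {
        return 2 * checkpointMs(slideMs);
    }

    public static void main(String[] args) {
        long slide = 15000L;                     // our 15s batch interval
        System.out.println(checkpointMs(slide)); // 15000: ceil(10/15) = 1
        System.out.println(rememberMs(slide));   // 30000: WAL kept only ~30s by default
    }
}
```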

 

Reposted from: https://www.cnblogs.com/lhfcws/p/6605085.html
