深入理解 RocksDB 过期文件清理


  1. 一、什么是过期文件
  2. 二、过期文件清理的核心机制
    1. 2.1 核心数据结构
      1. JobContext
      2. DBImpl
    2. 2.2 核心函数
      1. 2.2.1 SST 文件生命周期
      2. 2.2.2 识别过期文件:FindObsoleteFiles
        1. 2.2.2.1 常规清理 vs. 全量扫描 (Full Scan)
        2. 2.2.2.2 过期清理触发事件
        3. 2.2.2.3 “周期性”全量清理
        4. 2.2.2.4 文件回收机制
      3. 2.2.3 从 VersionSet 获取过期文件:GetObsoleteFiles
        1. 2.2.3.1 pending_outputs_ 机制
      4. 2.2.4 删除过期文件:PurgeObsoleteFiles
        1. 2.2.4.1 异步删除机制
  3. 三、不同类型文件的清理策略
  4. 四、统计指标
  5. 五、总结

RocksDB 作为一个高性能的 KV 存储引擎,会产生多种类型的文件:SST 数据文件、WAL 日志文件、MANIFEST 元数据文件、LOG 运行日志等。随着数据库运行,这些文件会不断生成、更新和过期。尤其是一些极端情况下,会导致磁盘空间耗尽,数据库无法继续写入数据,引发服务中断。

不妨带着以下问题,来详细深入了解下具体的实现细节

  1. compaction 结束之后旧的 sst 文件、WAL 文件、LOG 文件是立刻删除,还是定时删除?
  2. compaction 进行到一半,进程因为发布、崩溃重启,此时 compaction 生成的还为 install 的 sst 文件会成为过期文件么?如果是,什么时间会清理?
  3. 什么时候会强制触发全量清理?
  4. 如果没有任何写入落盘,是否也会定时触发清理?

一、什么是过期文件

在 RocksDB 中,过期文件(Obsolete Files)指的是那些逻辑上已不再需要但物理上仍存在于磁盘上的文件。文件主要包括:

  1. SST 文件 (kTableFile, kBlobFile): 当 Compaction 操作将多个 SST 文件合并生成新的 SST 文件后,原来的输入 SST 文件就可能变为过期。当 Flush、Compaction 和 Ingestion 出现异常时,创建新的 SST 文件不会被正式添加到版本控制中,也会被视作过期。
  2. WAL 文件 (kWalFile): 当 WAL 文件中的所有数据变更都已成功刷入 MemTable 并最终持久化到 SST 文件后,该 WAL 文件就可能变为过期。
  3. Manifest 文件 (kDescriptorFile): 当数据库元信息更新,生成新的 Manifest 文件后,旧的 Manifest 文件就变为过期。
  4. Info LOG 文件 (kInfoLogFile): RocksDB 会保留一定数量的 Info LOG 文件,旧的日志文件会根据配置被删除。
  5. Options 文件 (kOptionsFile): 记录数据库配置的文件,RocksDB 通常会保留最新的几个版本。
  6. 临时文件 (kTempFile): 在 Flush、Compaction 或 Manifest 写入过程中产生的临时文件,操作完成后应被删除。

二、过期文件清理的核心机制

2.1 核心数据结构

JobContext

JobContext 是清理过期文件的关键结构体,存储待清理文件的信息:

struct JobContext {
  // 检查是否有过时文件需要删除,通过检查各种过时文件列表是否为空来确定。
  inline bool HaveSomethingToDelete() const {
    return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
             blob_delete_files.empty() && log_delete_files.empty() &&
             manifest_delete_files.empty());
  }

  // 检查是否有任何资源需要清理,包括过时文件、内存表、日志写入器和快照等。
  inline bool HaveSomethingToClean() const {
    bool sv_have_sth = false;
    for (const auto& sv_ctx : superversion_contexts) {
      if (sv_ctx.HaveSomethingToDelete()) {
        sv_have_sth = true;
        break;
      }
    }
    return memtables_to_free.size() > 0 || logs_to_free.size() > 0 ||
           job_snapshot != nullptr || sv_have_sth;
  }

  // 存储全扫描过程中识别出的所有潜在可删除文件的信息
  // 当执行全目录扫描时,会将数据库目录中所有文件加入此列表,随后筛选哪些是过时的
  // **包含信息**:文件名和完整路径
  std::vector<CandidateFileInfo> full_scan_candidate_files;

  // 专门存储已确定为过时的 SST 文件信息
  // 压缩或版本控制过程中识别的不再需要的 SST 文件
  // **包含信息**:文件编号、文件大小、文件路径等元数据
  std::vector<ObsoleteFileInfo> sst_delete_files;

  // 存储已确定为过时的 Blob 文件信息
  // 当 Blob 文件中的数据被压缩或覆盖后,标记为过时
  // **包含信息**:Blob 文件编号、文件路径和其他相关元数据
  std::vector<ObsoleteBlobFileInfo> blob_delete_files;

  // 存储需要删除的预写式日志(WAL)文件编号
  // 当日志文件中的所有记录都已持久化到 SST 文件后,这些日志文件变为过时
  std::vector<uint64_t> log_delete_files;

  // 存储在清理过程中需要保留的日志文件编号
  // 这些文件虽然逻辑上已过时,但计划被重用,避免频繁创建新文件
  std::vector<uint64_t> log_recycle_files;

  // 存储需要删除的过时清单文件的路径
  // 当生成新的清单文件后,旧的清单文件成为过时文件
  std::vector<std::string> manifest_delete_files;

  // 存储不再需要的内存表指针,等待释放
  // 当内存表被刷新到磁盘后,需要释放其占用的内存
  autovector<MemTable*> memtables_to_free;

  // 存储不再需要的日志写入器指针,等待释放
  // 当对应的日志文件被关闭或不再使用时
  autovector<log::Writer*> logs_to_free;

  // 执行实际的资源清理工作,释放不再需要的内存和文件句柄,但不执行实际的文件删除操作(文件删除通常由专门的线程处理)。
  void Clean() {
    // free superversions
    for (auto& sv_context : superversion_contexts) {
      sv_context.Clean();
    }
    // free pending memtables
    for (auto m : memtables_to_free) {
      delete m;
    }
    for (auto l : logs_to_free) {
      delete l;
    }

    memtables_to_free.clear();
    logs_to_free.clear();
    job_snapshot.reset();
  }

  // 省略其他成员...
};

DBImpl

DBImpl 类中包含多个与文件清理相关的成员:

class DBImpl : public DB {
  // 省略其他成员...

  // `pending_outputs_` 实现一个安全机制,确保后台任务(如压缩、刷盘)创建的文件在任务完成前不会被误删除。它解决了多个并发后台操作之间的文件安全问题
  // 当后台作业开始时,会捕获当前的文件号并添加到 `pending_outputs_` 中
  // 由于 RocksDB 的文件号是单调递增的,这意味着 `pending_outputs_` 中的任何文件号都表示"保护线" - 任何编号大于等于这个值的文件都不应被删除
  // `FindObsoleteFiles()`/`PurgeObsoleteFiles()` 在识别可删除文件时,会参考 `pending_outputs_`,确保不会删除编号大于列表中任何值的文件
  // 后台任务完成后,从 `pending_outputs_` 中删除对应的文件号,允许这些文件在不再需要时被清理
  std::list<uint64_t> pending_outputs_;

  // 踪已找到需要删除但尚未完成删除的文件批次数量
  // 当 `FindObsoleteFiles` 识别出过时文件后,在 `PurgeObsoleteFiles` 真正删除前增加此计数
  int pending_purge_obsolete_files_;

  // 存储待清理文件的详细信息(文件号、文件名、路径、类型等)
  // `FindObsoleteFiles`收集要删除的文件,`PurgeObsoleteFiles`从此映射中读取并执行删除
  std::unordered_map<uint64_t, PurgeFileInfo> purge_files_;

  // 记录已分配给特定任务上下文的文件号
  // 确保正在进行删除操作的文件不会被其他任务处理
  std::unordered_set<uint64_t> files_grabbed_for_purge_;

  // 维护当前活跃(未过时)的WAL日志文件列表
  // 跟踪哪些日志文件仍在使用,防止被错误删除
  std::deque<LogFileNumberSize> alive_log_files_;

  // 存储尚未完全同步和当前正在写入的日志文件
  // 管理活跃日志的生命周期,日志同步后可能成为过时候选
  std::deque<LogWriterNumber> logs_;

  // 存储可重用的日志文件号
  // 避免频繁创建新文件,优先重用已有文件提高效率
  std::deque<uint64_t> log_recycle_files_;

  // 存储需要在后台线程中删除的日志写入器
  // 异步清理不再需要的日志文件,减少主线程阻塞
  autovector<log::Writer*> logs_to_free_;

  // 存储等待关闭的日志写入器队列
  // 推迟日志文件的关闭操作,以优化I/O操作
  std::deque<log::Writer*> logs_to_free_queue_;

  // 记录后台任务正在使用的文件号,防止清理过程删除这些文件
  // 保护正在创建或处理中的文件不被过早删除
  std::list<uint64_t> pending_outputs_;

  // 控制是否允许删除过时文件的开关
  // 在特定操作(如备份、快照)期间临时禁用文件删除
  int disable_delete_obsolete_files_;

  // 记录上次执行完整扫描删除操作的时间戳
  // 控制删除操作的频率,避免过于频繁的磁盘扫描
  uint64_t delete_obsolete_files_last_run_;

  // 省略其他成员...
}

2.2 核心函数

2.2.1 SST 文件生命周期

RocksDB 使用引用计数机制管理 SST 文件生命周期。每个文件有一个引用计数器,当引用计数变为 0 时,文件被标记为可删除。

关键数据结构:

class Version {
  // 引用计数
  int refs_;
  
  // 层级化存储的文件
  std::vector<FileMetaData*> files_[num_levels_];
};

struct FileMetaData {
  // 文件的引用计数
  int refs;
  
  // 文件描述符
  FileDescriptor fd;
};

RocksDB 使用 VersionSet 来管理数据库在不同时间点的状态快照,每个快照称为一个 Version。每个 Version 包含一组在该时间点“存活”的 SST 文件列表。SST 文件通过引用计数(FileMetaData::refs)来跟踪其被多少个 Version 引用。

当一个 Version 不再被任何快照、迭代器或其他内部结构引用时,它的析构函数 Version::~Version 会被调用。 Version 析构函数会减少所有引用文件的计数。

当文件的引用计数降为 0,说明没有任何版本在使用该文件,此时它会被添加到 obsolete_files_ 列表中,等待后续的物理删除。

Version::~Version() {
  // 确保引用计数为0,即没有任何地方引用此版本
  assert(refs_ == 0);

  // 从版本双向链表中移除自身
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // 遍历每个层级的所有文件,减少它们的引用计数
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;

      // 如果引用计数降为0,表示没有任何版本在使用此文件
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());

        // 将文件添加到版本集的过期文件列表中(obsolete_files_)
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path,
                           cfd_->GetFileMetadataCacheReservationManager()));
      }
    }
  }
}

2.2.2 识别过期文件:FindObsoleteFiles

FindObsoleteFiles 负责识别哪些文件已经过期,需要在持有数据库互斥锁的情况下调用:

// 该方法用于寻找过时文件,将它们添加到 job_context 中以便后续删除
// * 将活跃的 SST 文件列表存储在 'sst_live' 和活跃的 blob 文件列表存储在 'blob_live'
// 如果执行全量扫描:
// * 将文件系统中所有文件的列表存储在 'full_scan_candidate_files'
// 否则,从 VersionSet 获取过时文件
//
// no_full_scan = true -- 第一优先级:明确禁止全量扫描
// force = true -- 第二优先级:强制全量扫描
// force = false -- 第三优先级:除非到达周期(每 mutable_db_options_.delete_obsolete_files_period_micros 一次),否则不强制全量扫描

// 函数声明:void FindObsoleteFiles(JobContext* job_context, bool force, bool no_full_scan = false);
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
                               bool no_full_scan) {
  mutex_.AssertHeld();  // 确认互斥锁已被持有

  // 如果禁用了文件删除功能,则不执行任何操作
  if (disable_delete_obsolete_files_ > 0) {
    return;
  }

  bool doing_the_full_scan = false;  // 是否执行全量扫描的标志

  // 判断是否执行全量扫描的逻辑
  if (no_full_scan) {
    doing_the_full_scan = false;  // 如果明确指定不进行全量扫描,则不执行
  } else if (force ||
             mutable_db_options_.delete_obsolete_files_period_micros == 0) {
    doing_the_full_scan = true;  // 如果强制执行或删除周期设置为0,则执行全量扫描
  } else {
    const uint64_t now_micros = immutable_db_options_.clock->NowMicros();
    // 根据上次扫描时间和配置的周期决定是否执行全量扫描
    if ((delete_obsolete_files_last_run_ +
         mutable_db_options_.delete_obsolete_files_period_micros) <
        now_micros) {
      doing_the_full_scan = true;
      delete_obsolete_files_last_run_ = now_micros;  // 更新上次扫描时间
    }
  }

  /****** SST/Blob 文件处理部分 ******/
  // 设置最小的 pending output 文件号,防止删除正在被 compaction 线程写入的文件
  // 注意:扫描期间不能释放 mutex_,否则可能出现竞态
  job_context->min_pending_output = MinObsoleteSstNumberToKeep();

  // 获取过时文件。此函数还将更新 VersionSet 的 pending 文件列表
  versions_->GetObsoleteFiles(
      &job_context->sst_delete_files, &job_context->blob_delete_files,
      &job_context->manifest_delete_files, job_context->min_pending_output);

  // 将 job_context->sst_delete_files 和 job_context->blob_delete_files 中的元素
  // 标记为"已获取用于清理",其他线程调用 FindObsoleteFiles 时将不会将这些文件
  // 添加到清理候选列表中,避免多线程重复处理
  for (const auto& sst_to_del : job_context->sst_delete_files) {
    MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
  }

  for (const auto& blob_file : job_context->blob_delete_files) {
    MarkAsGrabbedForPurge(blob_file.GetBlobFileNumber());
  }

  // 存储当前的文件编号、日志编号等信息到 job_context
  job_context->manifest_file_number = versions_->manifest_file_number();
  job_context->pending_manifest_file_number =
      versions_->pending_manifest_file_number();
  job_context->log_number = MinLogNumberToKeep();  // 获取需要保留的最小日志编号
  job_context->prev_log_number = versions_->prev_log_number();

  if (doing_the_full_scan) {
    // 如果执行全量扫描,收集所有活跃的文件
    versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live);
    InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
                                  dbname_);
    // 收集所有数据库路径
    // 多路径数据库路径支持 “热冷数据分层存储”&“当单个存储设备容量不足时,可以将数据分散到多个设备”
    //
    // 举例:
    // L0-L1层(热数据) → 快速SSD
    // L2-L3层(温数据) → 普通SSD
    // L4-L6层(冷数据) → 大容量HDD
    std::set<std::string> paths;
    for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
         path_id++) {
      paths.insert(immutable_db_options_.db_paths[path_id].path);
    }

    // 注意:如果列族选项中没有指定 cf_paths,使用 db_paths 作为 cf_paths 设置。
    // 因此,在下面的代码中可能会有多个重复的 db_paths 文件。重复项在 PurgeObsoleteFiles 中标识唯一文件时会被删除。
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
           path_id++) {
        auto& path = cfd->ioptions()->cf_paths[path_id].path;

        if (paths.find(path) == paths.end()) {
          paths.insert(path);
        }
      }
    }

    IOOptions io_opts;
    io_opts.do_not_recurse = true;  // 不进行递归查询
    // 遍历所有路径,查找潜在的过时文件
    for (auto& path : paths) {
      // 获取目录中的所有文件列表
      // 后续处理将排除仍然活跃的文件
      std::vector<std::string> files;
      Status s = immutable_db_options_.fs->GetChildren(
          path, io_opts, &files, /*IODebugContext*=*/nullptr);
      s.PermitUncheckedError();  // TODO: 错误处理需要改进
      for (const std::string& file : files) {
        uint64_t number;
        FileType type;
        // 如果无法解析文件名,跳过
        // 如果文件已被其他压缩任务获取用于清除,或已安排清除,也跳过
        // 避免在竞态条件下重复删除相同的文件
        if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
            !ShouldPurge(number)) {
          continue;
        }

        // 将文件添加到候选清除文件列表
        job_context->full_scan_candidate_files.emplace_back("/" + file, path);
      }
    }

    /****** WAL 路径单独配置时,WAL 文件处理部分 ******/
    // 添加 wal_dir 中的日志文件
    if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
      std::vector<std::string> log_files;
      Status s = immutable_db_options_.fs->GetChildren(
          immutable_db_options_.wal_dir, io_opts, &log_files,
          /*IODebugContext*=*/nullptr);
      s.PermitUncheckedError();  // TODO: 错误处理需要改进
      for (const std::string& log_file : log_files) {
        job_context->full_scan_candidate_files.emplace_back(
            log_file, immutable_db_options_.wal_dir);
      }
    }

    /****** LOG 文件处理部分 ******/
    // 添加 db_log_dir 中的信息日志文件
    if (!immutable_db_options_.db_log_dir.empty() &&
        immutable_db_options_.db_log_dir != dbname_) {
      std::vector<std::string> info_log_files;
      Status s = immutable_db_options_.fs->GetChildren(
          immutable_db_options_.db_log_dir, io_opts, &info_log_files,
          /*IODebugContext*=*/nullptr);
      s.PermitUncheckedError();  // TODO: 错误处理需要改进
      for (std::string& log_file : info_log_files) {
        job_context->full_scan_candidate_files.emplace_back(
            log_file, immutable_db_options_.db_log_dir);
      }
    }
  } else {
    // 如果不执行全量扫描,直接从待删除文件列表中移除在任何版本中出现的文件
    // 因为候选文件通常只占所有文件的一小部分,所以与构建所有文件的映射相比,
    // 直接检查它们是否在任何版本中出现更高效
    versions_->RemoveLiveFiles(job_context->sst_delete_files,
                               job_context->blob_delete_files);
  }

  // 在可能释放互斥锁和等待条件变量之前,增加 pending_purge_obsolete_files_
  // 这样另一个执行 `GetSortedWals` 的线程将等待直到这个线程完成执行
  // 因为另一个线程将等待 `pending_purge_obsolete_files_`
  // 如果没有需要删除的内容,必须递减 pending_purge_obsolete_files_
  ++pending_purge_obsolete_files_;

  // 设置一个延迟执行的清理操作,确保在没有需要删除的内容时减少 pending_purge_obsolete_files_
  Defer cleanup([job_context, this]() {
    assert(job_context != nullptr);
    if (!job_context->HaveSomethingToDelete()) {
      mutex_.AssertHeld();
      --pending_purge_obsolete_files_;
    }
  });

  // 当在恢复期间调用时,logs_ 为空,此时还没有任何需要跟踪的过时日志
  log_write_mutex_.Lock();

  if (alive_log_files_.empty() || logs_.empty()) {
    mutex_.AssertHeld();
    // 如果数据库是 DBImplSecondary,可能会到达这里
    log_write_mutex_.Unlock();
    return;
  }

  /****** 物理日志文件处理部分 ******/
  bool mutex_unlocked = false;
  if (!alive_log_files_.empty() && !logs_.empty()) {
    uint64_t min_log_number = job_context->log_number;
    size_t num_alive_log_files = alive_log_files_.size();
    // 查找新的过时日志文件
    while (alive_log_files_.begin()->number < min_log_number) {
      auto& earliest = *alive_log_files_.begin();
      // 如果配置了日志文件回收,且回收列表未满,则添加到回收列表
      if (immutable_db_options_.recycle_log_file_num >
          log_recycle_files_.size()) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "adding log %" PRIu64 " to recycle list\n",
                       earliest.number);
        // 放入回收列表以便复用
        log_recycle_files_.push_back(earliest.number);
      } else {
        // 否则添加到待删除列表
        job_context->log_delete_files.push_back(earliest.number);
      }
      if (job_context->size_log_to_delete == 0) {
        job_context->prev_total_log_size = total_log_size_;
        job_context->num_alive_log_files = num_alive_log_files;
      }
      // 更新统计信息
      job_context->size_log_to_delete += earliest.size;
      total_log_size_ -= earliest.size;
      // 从活跃列表中移除
      alive_log_files_.pop_front();

      // 当前日志应该始终保持活跃状态,因为它不可能有 number < MinLogNumber()
      assert(alive_log_files_.size());
    }
    log_write_mutex_.Unlock();
    mutex_.Unlock();
    mutex_unlocked = true;
    TEST_SYNC_POINT_CALLBACK("FindObsoleteFiles::PostMutexUnlock", nullptr);
    log_write_mutex_.Lock();
    /****** 日志 Writer 对象处理部分 ******/
    while (!logs_.empty() && logs_.front().number < min_log_number) {
      auto& log = logs_.front();
      if (log.IsSyncing()) {
        // 如果日志正在同步,等待同步完成
        log_sync_cv_.Wait();
        // 等待期间 logs_ 可能已更改,继续下一轮循环
        continue;
      }
      logs_to_free_.push_back(log.ReleaseWriter());
      logs_.pop_front();
    }
    // 当前日志不可能过时
    assert(!logs_.empty());
  }

  // 清理 DB::Write() 操作
  assert(job_context->logs_to_free.empty());
  job_context->logs_to_free = logs_to_free_;

  logs_to_free_.clear();
  log_write_mutex_.Unlock();
  if (mutex_unlocked) {
    mutex_.Lock();
  }
  job_context->log_recycle_files.assign(log_recycle_files_.begin(),
                                        log_recycle_files_.end());
}
2.2.2.1 常规清理 vs. 全量扫描 (Full Scan)

FindObsoleteFiles 方法区分常规清理全量扫描,在持有锁的情况下,是基于性能和可靠性之间的权衡设计。常规清理模式主要依赖 RocksDB 的内存状态来识别过时文件,该模式只检查那些已经在版本控制系统(VersionSet)中被标记为过时的文件。全量扫描模式会扫描数据库目录中的所有物理文件,需要遍历文件系统中的所有文件,这是一个 I/O 密集型操作。

在以下情况下,文件可能不会被正确标记为过时,但仍然需要清理:

  • 进程在 compaction/flush/ingestion 中途崩溃,留下未完成的临时文件
  • 软件版本升级或 bug 修复,导致旧版本产生的一些文件未被跟踪

全量扫描可以识别这些 “ 孤儿 “ 文件,防止长期的空间泄漏。在 DBImpl::Open 方法中,我们可以看到 RocksDB 会主动触发清理:

Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                    const bool seq_per_batch, const bool batch_per_txn) {
  // 省略其他代码...
  
  if (s.ok()) {
    // Persist RocksDB Options before scheduling the compaction.
    // The WriteOptionsFile() will release and lock the mutex internally.
    persist_options_status =
        impl->WriteOptionsFile(true /*db_mutex_already_held*/);
    *dbptr = impl;
    impl->opened_successfully_ = true;
    impl->DeleteObsoleteFiles();
  }
  
  // 省略其他代码...
}

确保即使是上次异常退出留下的未完成 compaction 文件也能被及时清理。

2.2.2.2 过期清理触发事件

RocksDB 在多个关键点触发过期文件清理,保证及时释放磁盘空间:

迭代器销毁时:ForwardIterator::SVCleanup

void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  // 省略其他代码...
  db->FindObsoleteFiles(&job_context, false, true);
  if (job_context.HaveSomethingToDelete()) {
    db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
  }
  // 省略其他代码...
}

当迭代器被销毁时,它可能持有对某些版本的引用。一旦释放这些引用,可能导致某些 SST 文件变为过期状态。迭代器销毁时会检查并清理这些文件。如果设置了 background_purge_on_iterator_cleanup=true,清理操作会在后台执行,避免阻塞用户线程。

范围删除后:DBImpl::DeleteFilesInRanges

Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                   const RangePtr* ranges, size_t n,
                                   bool include_end) {
  // 省略其他代码...
  FindObsoleteFiles(&job_context, false);
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  // 省略其他代码...
}

执行范围删除(DeleteFilesInRanges)后,会有大量 SST 文件变为过期,此时会触发文件清理。

启用文件删除功能时:DBImpl::EnableFileDeletions

Status DBImpl::EnableFileDeletions(bool force) {
  // 省略其他代码...
  FindObsoleteFiles(&job_context, true);
  if (saved_counter == 0) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
  }
  // 省略其他代码...
}

RocksDB 允许暂时禁用文件删除(例如在备份或创建快照时)。当文件删除功能被重新启用时,会清理之前累积的过期文件。

不同线程同时需要暂停文件删除时,每个线程都会调用 DisableFileDeletions(),导致计数器累加。RocksDB 内部不同组件可能各自调用 DisableFileDeletions(),例如:

  • 快照创建过程
  • 备份操作进行时
  • 某些迭代器依赖特定文件时

计数器设计是有意为之,确保只有当所有禁用请求都被释放后(计数器回到 0),才会重新启用文件删除。

刷盘操作后:DBImpl::BackgroundCallFlush

void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
  // 省略其他代码...
  // If flush failed, we want to delete all temporary files that we might
  // have created. Thus, we force full scan in FindObsoleteFiles()
  FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
                            !s.IsColumnFamilyDropped());
  if (job_context.HaveSomethingToClean() || job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    mutex_.Unlock();
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
  }
  // 省略其他代码...
}

当内存表刷新到磁盘形成 SST 文件后,对应的 WAL 文件可能变为过期,此时会触发清理。

Compact 操作后:DBImpl::CompactFiles

Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here.  No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
}

压缩是 RocksDB 中导致文件过期的主要操作。压缩会将多个小文件合并为较少的大文件,原来的小文件变为过期文件需要清理。

主动清理:DBImpl::DeleteObsoleteFiles

void DBImpl::DeleteObsoleteFiles() {
  // 省略其他代码...
  JobContext job_context(next_job_id_.fetch_add(1));
  FindObsoleteFiles(&job_context, true);
  if (job_context.HaveSomethingToDelete()) {
    bool defer_purge = immutable_db_options_.avoid_unnecessary_blocking_io;
    PurgeObsoleteFiles(job_context, defer_purge);
  }
  // 省略其他代码...
}
2.2.2.3 “周期性”全量清理

DBImpl::FindObsoleteFiles 中,有一段“周期性”执行全量扫描的代码:

if ((delete_obsolete_files_last_run_ +
     mutable_db_options_.delete_obsolete_files_period_micros) <
    now_micros) {
  doing_the_full_scan = true;
  delete_obsolete_files_last_run_ = now_micros;
}

这段代码容易产生误解,看起来像是定时任务的实现。但实际上,该参数 delete_obsolete_files_period_micros(默认 6 小时)只是用来判断已触发的 FindObsoleteFiles 调用是否应执行全量扫描。它不会自动创建调用 FindObsoleteFiles 的计时器或后台任务。如果数据库长时间没有任何过期清理触发事件,那么即使超过 delete_obsolete_files_period_micros 设置的时间(默认 6 小时),也不会自动触发全量扫描来清理过期文件。积累的过期文件会直至下一次触发事件发生,才会实际删除。

2.2.2.4 文件回收机制

RocksDB 支持 WAL 文件回收,避免频繁创建新文件:

// 如果配置了日志文件回收,且回收列表未满
if (immutable_db_options_.recycle_log_file_num > log_recycle_files_.size()) {
  log_recycle_files_.push_back(earliest.number);
} else {
  // 否则添加到待删除列表
  job_context->log_delete_files.push_back(earliest.number);
}

2.2.3 从 VersionSet 获取过期文件:GetObsoleteFiles

VersionSet::GetObsoleteFiles 负责从 VersionSet 获取过时文件列表,并根据 min_pending_output 进行过滤:

void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
                                  std::vector<ObsoleteBlobFileInfo>* blob_files,
                                  std::vector<std::string>* manifest_filenames,
                                  uint64_t min_pending_output) {
  // 确保传入的参数指针非空
  assert(files);
  assert(blob_files);
  assert(manifest_filenames);
  // 确保传入的容器为空
  assert(files->empty());
  assert(blob_files->empty());
  assert(manifest_filenames->empty());

  // 用于临时保存不能立即删除的过时SST文件
  std::vector<ObsoleteFileInfo> pending_files;
  for (auto& f : obsolete_files_) {
    // 如果文件号小于min_pending_output,表示该文件可以安全删除
    // 因为不会有正在进行的写操作使用这个文件号
    if (f.metadata->fd.GetNumber() < min_pending_output) {
      // 添加到可删除文件列表中
      files->emplace_back(std::move(f));
    } else {
      // 文件号大于或等于min_pending_output,表示可能有待处理的操作
      // 将其保留在待处理队列中
      pending_files.emplace_back(std::move(f));
    }
  }
  // 更新obsolete_files_,只保留那些暂时不能删除的文件
  obsolete_files_.swap(pending_files);

  // 处理过时的Blob文件,逻辑与处理SST文件类似
  std::vector<ObsoleteBlobFileInfo> pending_blob_files;
  for (auto& blob_file : obsolete_blob_files_) {
    // 同样判断文件号是否小于min_pending_output
    if (blob_file.GetBlobFileNumber() < min_pending_output) {
      // 可以安全删除的Blob文件
      blob_files->emplace_back(std::move(blob_file));
    } else {
      // 暂时不能删除的Blob文件
      pending_blob_files.emplace_back(std::move(blob_file));
    }
  }
  // 更新obsolete_blob_files_,只保留那些暂时不能删除的Blob文件
  obsolete_blob_files_.swap(pending_blob_files);

  // 处理过时的MANIFEST文件
  // 所有过时的MANIFEST文件都可以直接删除
  // 将obsolete_manifests_中的内容移到manifest_filenames中,并清空obsolete_manifests_
  obsolete_manifests_.swap(*manifest_filenames);
}
2.2.3.1 pending_outputs_ 机制

compaction 在开始时,会将“下一个将要分配的 file_number” 记录到 pending_outputs_ 中。

std::list<uint64_t>::iterator
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
  // 需要记住插入的迭代器,因为在后台作业完成后,需要从 pending_output 中删除该元素。
  pending_outputs_.push_back(versions_->current_next_file_number());
  auto pending_outputs_inserted_elem = pending_outputs_.end();
  --pending_outputs_inserted_elem;
  return pending_outputs_inserted_elem;
}

机制的目的是保护所有大于等于这个 file number 的文件在该任务执行期间不会被误删。它与文件在 VersionSet 中的逻辑引用无关。即使 compaction 还在运行,并且它的 pending_outputs_ file number 的文件被物理删除,但这并不意味着 VersionSet 中的任何 Version 仍然在逻辑上引用 file number 的文件。

compaction 任务结束后会从 pending_outputs_ 释放对应的 file_number,允许 file_number 及之后的文件被清理

void DBImpl::ReleaseFileNumberFromPendingOutputs(
    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
  if (v.get() != nullptr) {
    pending_outputs_.erase(*v.get());
    v.reset();
  }
}

举例场景如下:

  1. 当前的 file number 是 13(versions_->current_next_file_number())。
  2. compaction (1) 启动,此时会把 13 的 file number(准确说是“下一个将要分配的 file number”,即 versions_->current_next_file_number())加入 pending_outputs_
  3. compaction (2) 创建了 file 13。
  4. compaction (3) 消耗了 file 13,并生成了 file 15。此时 file 13 已经没有引用,被加入 VersionSet::obsolete_files_,表示它可以被删除。
  5. FindObsoleteFiles() 检查到 file 13 在 obsolete_files_ 集合中,于是将其移出 obsolete_files_,准备删除。
  6. PurgeObsoleteFiles() 尝试删除 file 13,但由于 compaction (1) 还在运行,pending_outputs_ 仍然阻挡着 file 13 的删除。此时 file 13 已经不在 obsolete_files_ 集合中,但也没有被真正删除,导致它永远不会被清理。

file 13 虽然是 compaction (2) 生成的,但它的 file number 可能早于 compaction (2) 启动时记录到 pending_outputs_ 的 file number,因为 file number 的分配是全局递增的,多个 compaction/flush 任务并发时,file number 分配顺序和任务实际完成顺序可能不同。

pending_outputs_obsolete_files_ 之间存在协作关系:一旦从 obsolete_files_ 移除(FindObsoleteFiles() 逻辑)后,但文件因为 pending_outputs_ 的存在被阻挡删除(PurgeObsoleteFiles() 逻辑),如果没有额外机制跟踪,就会出现“文件既不在 obsolete_files_,也没被删除”的死角,造成空间泄漏。 Fix deleting obsolete files# 863009b

2.2.4 删除过期文件:PurgeObsoleteFiles

PurgeObsoleteFiles 执行实际的文件删除操作,不需要持有数据库互斥锁,降低了锁持有的时间:

// 删除不属于活跃文件列表的文件,同时删除在sst_delete_files和log_delete_files中标记的文件。
// 调用此方法时不需要持有互斥锁。
void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
  // 同步点,用于测试
  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");

  // 断言确保我们有东西要删除
  assert(state.HaveSomethingToDelete());

  // FindObsoleteFiles()应该已经填充了manifest_file_number,确保它不为0
  assert(state.manifest_file_number != 0);

  // 将活跃文件列表转换为无序集合,不需要持有互斥锁;set操作较慢
  // 这些集合用于快速查找文件是否是活跃的
  std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
                                          state.sst_live.end());
  std::unordered_set<uint64_t> blob_live_set(state.blob_live.begin(),
                                           state.blob_live.end());
  std::unordered_set<uint64_t> log_recycle_files_set(
      state.log_recycle_files.begin(), state.log_recycle_files.end());

  // 准备候选文件列表,这包括全扫描找到的候选文件
  auto candidate_files = state.full_scan_candidate_files;
  // 预先分配足够空间以避免频繁的内存重分配
  candidate_files.reserve(
      candidate_files.size() + state.sst_delete_files.size() +
      state.blob_delete_files.size() + state.log_delete_files.size() +
      state.manifest_delete_files.size());

  // 将要删除的SST文件添加到候选列表
  // 我们可能在生成文件名时忽略dbname
  for (auto& file : state.sst_delete_files) {
    // 如果不只是删除元数据,将文件添加到候选列表
    if (!file.only_delete_metadata) {
      candidate_files.emplace_back(
          MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
    }
    // 如果文件有table_reader_handle,释放它
    if (file.metadata->table_reader_handle) {
      table_cache_->Release(file.metadata->table_reader_handle);
    }
    // 删除文件元数据
    file.DeleteMetadata();
  }

  // 将要删除的BLOB文件添加到候选列表
  for (const auto& blob_file : state.blob_delete_files) {
    candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()),
                               blob_file.GetPath());
  }

  // 获取WAL目录
  auto wal_dir = immutable_db_options_.GetWalDir();
  // 将要删除的WAL文件添加到候选列表
  for (auto file_num : state.log_delete_files) {
    if (file_num > 0) {
      candidate_files.emplace_back(LogFileName(file_num), wal_dir);
    }
  }

  // 将要删除的manifest文件添加到候选列表
  for (const auto& filename : state.manifest_delete_files) {
    candidate_files.emplace_back(filename, dbname_);
  }

  // 对候选文件列表进行排序和去重,避免尝试删除同一个文件两次
  std::sort(candidate_files.begin(), candidate_files.end(),
            [](const JobContext::CandidateFileInfo& lhs,
               const JobContext::CandidateFileInfo& rhs) {
              // 先按文件名排序
              if (lhs.file_name < rhs.file_name) {
                return true;
              } else if (lhs.file_name > rhs.file_name) {
                return false;
              } else {
                // 如果文件名相同,按文件路径排序
                return (lhs.file_path < rhs.file_path);
              }
            });

  // 去除重复的文件条目
  candidate_files.erase(
      std::unique(candidate_files.begin(), candidate_files.end()),
      candidate_files.end());

  // 如果之前有WAL文件,记录删除日志信息
  if (state.prev_total_log_size > 0) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[JOB %d] Try to delete WAL files size %" PRIu64
                 ", prev total WAL file size %" PRIu64
                 ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
                 state.job_id, state.size_log_to_delete,
                 state.prev_total_log_size, state.num_alive_log_files);
  }

  // 用于保存旧的info log文件
  std::vector<std::string> old_info_log_files;
  // 创建信息日志前缀
  InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
                              dbname_);

  // candidate_files中最近两个OPTIONS文件的文件编号
  // 此时,candidate_files中不能有重复的文件编号
  uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
  uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();

  // 遍历候选文件找出最近的两个OPTIONS文件
  for (const auto& candidate_file : candidate_files) {
    const std::string& fname = candidate_file.file_name;
    uint64_t number;
    FileType type;
    // 解析文件名,如果不是OPTIONS文件则跳过
    if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
        type != kOptionsFile) {
      continue;
    }
    // 更新最近的两个OPTIONS文件编号
    if (number > optsfile_num1) {
      optsfile_num2 = optsfile_num1;
      optsfile_num1 = number;
    } else if (number > optsfile_num2) {
      optsfile_num2 = number;
    }
  }

  // 在尝试删除WAL文件前先关闭它们
  for (const auto w : state.logs_to_free) {
    // TODO: 可能需要检查Close()的返回值
    auto s = w->Close();
    s.PermitUncheckedError();  // 允许未检查的错误
  }

  // 检查是否拥有表和日志文件
  bool own_files = OwnTablesAndLogs();
  // 记录要删除的文件编号
  std::unordered_set<uint64_t> files_to_del;

  // 遍历所有候选文件,决定哪些需要删除
  for (const auto& candidate_file : candidate_files) {
    const std::string& to_delete = candidate_file.file_name;
    uint64_t number;
    FileType type;
    // 如果无法识别文件,则跳过
    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
      continue;
    }

    // 默认保留文件
    bool keep = true;
    // 根据文件类型决定是否要保留
    switch (type) {
      case kWalFile:  // WAL文件
        // 保留条件:文件编号>=log_number 或 文件编号==prev_log_number 或 在回收文件集中
        keep = ((number >= state.log_number) ||
                (number == state.prev_log_number) ||
                (log_recycle_files_set.find(number) !=
                 log_recycle_files_set.end()));
        break;
      case kDescriptorFile:  // 描述符文件(manifest)
        // 保留我的manifest文件和任何更新的版本(可能在manifest滚动期间发生)
        keep = (number >= state.manifest_file_number);
        break;
      case kTableFile:  // SST表文件
        // 如果第二个条件不存在,会导致DontDeletePendingOutputs失败
        // 保留条件:在活跃SST集中 或 文件编号>=min_pending_output
        keep = (sst_live_set.find(number) != sst_live_set.end()) ||
               number >= state.min_pending_output;
        if (!keep) {
          files_to_del.insert(number);  // 记录要删除的文件编号
        }
        break;
      case kBlobFile:  // Blob文件
        // 保留条件:文件编号>=min_pending_output 或 在活跃的blob文件集中
        keep = number >= state.min_pending_output ||
               (blob_live_set.find(number) != blob_live_set.end());
        if (!keep) {
          files_to_del.insert(number);  // 记录要删除的文件编号
        }
        break;
      case kTempFile:  // 临时文件
        // 当前正在写入的任何临时文件必须记录在pending_outputs_中,
        // 它被插入到"live"集合中。
        // 此外,SetCurrentFile在写出新的manifest时会创建一个临时文件,
        // 等于state.pending_manifest_file_number,我们不应该删除那个文件
        //
        // TODO(yhchiang): 仔细修改第三个条件以安全地移除临时options文件
        keep = (sst_live_set.find(number) != sst_live_set.end()) ||
               (blob_live_set.find(number) != blob_live_set.end()) ||
               (number == state.pending_manifest_file_number) ||
               (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
        break;
      case kInfoLogFile:  // 信息日志文件
        keep = true;  // 总是保留
        if (number != 0) {
          old_info_log_files.push_back(to_delete);  // 收集旧的日志文件
        }
        break;
      case kOptionsFile:  // 选项文件
        // 保留最近的两个OPTIONS文件
        keep = (number >= optsfile_num2);
        break;
      case kCurrentFile:   // CURRENT文件
      case kDBLockFile:    // 数据库锁文件
      case kIdentityFile:  // 身份文件
      case kMetaDatabase:  // 元数据库
        keep = true;  // 这些特殊文件总是保留
        break;
    }

    // 如果需要保留,跳到下一个文件
    if (keep) {
      continue;
    }

    // 确定要删除的文件名和要同步的目录
    std::string fname;
    std::string dir_to_sync;
    if (type == kTableFile) {  // SST文件
      // 从缓存中移除
      TableCache::Evict(table_cache_.get(), number);
      fname = MakeTableFileName(candidate_file.file_path, number);
      dir_to_sync = candidate_file.file_path;
    } else if (type == kBlobFile) {  // Blob文件
      fname = BlobFileName(candidate_file.file_path, number);
      dir_to_sync = candidate_file.file_path;
    } else {  // 其他类型文件
      // 确定同步目录
      dir_to_sync = (type == kWalFile) ? wal_dir : dbname_;
      // 构建完整文件路径,处理路径分隔符
      fname = dir_to_sync +
              ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
                       (!to_delete.empty() && to_delete.front() == '/')
                   ? ""  // 如果目录以/结尾或文件名以/开头,不添加额外的/
                   : "/") +  // 否则添加/
              to_delete;
    }

    // 对于WAL文件,如果配置了TTL或大小限制,尝试归档而不是删除
    if (type == kWalFile && (immutable_db_options_.WAL_ttl_seconds > 0 ||
                           immutable_db_options_.WAL_size_limit_MB > 0)) {
      wal_manager_.ArchiveWALFile(fname, number);
      continue;  // 已归档,不需要进一步处理
    }

    // 如果我不拥有这些文件,例如,secondary实例使用max_open_files = -1,
    // 则无需删除或安排删除这些文件,因为它们将由其所有者删除,例如primary实例
    if (!own_files) {
      continue;
    }

    // 根据schedule_only决定是安排删除还是立即删除
    if (schedule_only) {
      // 如果是安排删除,需要获取互斥锁
      InstrumentedMutexLock guard_lock(&mutex_);
      // 将文件安排到待删除队列
      SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
    } else {
      // 立即删除文件
      DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
    }
  }

  // 删除完过期文件后,从files_grabbed_for_purge_中移除它们
  {
    InstrumentedMutexLock guard_lock(&mutex_);
    autovector<uint64_t> to_be_removed;
    // 查找已删除文件的编号
    for (auto fn : files_grabbed_for_purge_) {
      if (files_to_del.count(fn) != 0) {
        to_be_removed.emplace_back(fn);
      }
    }
    // 从files_grabbed_for_purge_中移除已删除的文件
    for (auto fn : to_be_removed) {
      files_grabbed_for_purge_.erase(fn);
    }
  }

  // 删除旧的info log文件
  size_t old_info_log_file_count = old_info_log_files.size();
  if (old_info_log_file_count != 0 &&
      old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
    // 只保留配置的数量的日志文件,删除多余的
    std::sort(old_info_log_files.begin(), old_info_log_files.end());
    // 计算需要删除的文件数量
    size_t end =
        old_info_log_file_count - immutable_db_options_.keep_log_file_num;
    // 删除多余的日志文件
    for (unsigned int i = 0; i <= end; i++) {
      std::string& to_delete = old_info_log_files.at(i);
      std::string full_path_to_delete =
          (immutable_db_options_.db_log_dir.empty()
               ? dbname_  // 如果没有专门的日志目录,使用DB目录
               : immutable_db_options_.db_log_dir) +
          "/" + to_delete;
      // 记录删除信息
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[JOB %d] Delete info log file %s\n", state.job_id,
                   full_path_to_delete.c_str());
      // 执行删除
      Status s = env_->DeleteFile(full_path_to_delete);
      if (!s.ok()) {
        // 处理删除失败的情况
        if (env_->FileExists(full_path_to_delete).IsNotFound()) {
          // 文件不存在的情况
          ROCKS_LOG_INFO(
              immutable_db_options_.info_log,
              "[JOB %d] Tried to delete non-existing info log file %s FAILED "
              "-- %s\n",
              state.job_id, to_delete.c_str(), s.ToString().c_str());
        } else {
          // 其他删除失败情况
          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "[JOB %d] Delete info log file %s FAILED -- %s\n",
                        state.job_id, to_delete.c_str(),
                        s.ToString().c_str());
        }
      }
    }
  }

  // 让WAL管理器清理过期的WAL文件
  wal_manager_.PurgeObsoleteWALFiles();
  // 刷新日志
  LogFlush(immutable_db_options_.info_log);

  // 获取互斥锁并减少pending_purge_obsolete_files_计数
  InstrumentedMutexLock l(&mutex_);
  --pending_purge_obsolete_files_;
  assert(pending_purge_obsolete_files_ >= 0);

  // 如果只是安排删除,调用SchedulePurge()
  if (schedule_only) {
    // 必须在持有互斥锁的情况下从pending_purge_obsolete_files_变为bg_purge_scheduled_
    // (用于GetSortedWalFiles()等)
    SchedulePurge();
  }

  // 如果没有更多待清理的文件,通知所有等待的线程
  if (pending_purge_obsolete_files_ == 0) {
    bg_cv_.SignalAll();
  }

  // 同步点,用于测试
  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
}
2.2.4.1 异步删除机制

为避免阻塞主线程,RocksDB 支持将文件删除操作安排到后台线程:

if (schedule_only) {
  // 安排后台删除
  InstrumentedMutexLock guard_lock(&mutex_);
  SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
} else {
  // 立即删除
  DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
}

三、不同类型文件的清理策略

RocksDB 对不同文件类型有不同的删除策略。

1 WAL (Write-Ahead Log) 文件 (kWalFile)

会被删除的 WAL 文件:

  • 文件号小于当前的 log_number
  • 不是前一个日志文件(prev_log_number
  • 不在日志回收列表中(log_recycle_files_set
keep = ((number >= state.log_number) ||
        (number == state.prev_log_number) ||
        (log_recycle_files_set.find(number) != log_recycle_files_set.end()));

特殊处理:

  • 如果设置了 WAL TTL 或大小限制,过期的 WAL 文件不会被直接删除,而是被移动到归档目录:
if (type == kWalFile && (immutable_db_options_.WAL_ttl_seconds > 0 ||
                         immutable_db_options_.WAL_size_limit_MB > 0)) {
  wal_manager_.ArchiveWALFile(fname, number);
  continue;
}

2 SST (Static Sorted Table) 文件 (kTableFile)

会被删除的 SST 文件:

  • 不在活跃文件集合中(sst_live_set
  • 文件号小于最小待处理输出号(min_pending_output
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
       number >= state.min_pending_output;

删除前处理:

  • 从 TableCache 中驱逐该文件:TableCache::Evict(table_cache_.get(), number);

3 Blob 文件 (kBlobFile)

会被删除的 Blob 文件:

  • 不在活跃 Blob 文件集合中(blob_live_set
  • 文件号小于最小待处理输出号(min_pending_output
keep = number >= state.min_pending_output ||
       (blob_live_set.find(number) != blob_live_set.end());

4 清单文件 (kDescriptorFile)

会被删除的清单文件:

  • 文件号小于当前清单文件号(manifest_file_number
keep = (number >= state.manifest_file_number);

5 临时文件 (kTempFile)

会被删除的临时文件:

  • 不在活跃 SST 文件集合中
  • 不在活跃 Blob 文件集合中
  • 不是待处理的清单文件
  • 不是选项文件(不包含 kOptionsFileNamePrefix
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
       (blob_live_set.find(number) != blob_live_set.end()) ||
       (number == state.pending_manifest_file_number) ||
       (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);

6 信息日志文件 (kInfoLogFile)

处理逻辑:

  • 默认所有日志文件都标记为保留(keep = true
  • 但如果日志文件总数超过配置的保留数量(keep_log_file_num),则删除最旧的日志文件
if (old_info_log_file_count != 0 &&
    old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
  std::sort(old_info_log_files.begin(), old_info_log_files.end());
  size_t end = old_info_log_file_count - immutable_db_options_.keep_log_file_num;
  // 删除最旧的文件,直到文件数等于keep_log_file_num
}

7 选项文件 (kOptionsFile)

会被删除的选项文件:

  • 不是最新的两个选项文件(optsfile_num1optsfile_num2
keep = (number >= optsfile_num2);

8 其他常驻文件类型

永不删除的文件类型:

  • 当前文件 (kCurrentFile)
  • 数据库锁文件 (kDBLockFile)
  • 标识文件 (kIdentityFile)
  • 元数据库 (kMetaDatabase)
keep = true;

四、统计指标

rocksdb.min-obsolete-sst-number-to-keep

  • 定义位置:DB::Properties::kMinObsoleteSstNumberToKeep
  • 含义:表示数据库需要保留的最小过时 SST 文件编号。低于此编号的过时 SST 文件可以安全地删除。

rocksdb.obsolete-sst-files-size

  • 定义位置:DB::Properties::kObsoleteSstFilesSize
  • 含义:表示数据库中所有过时 SST 文件的总大小(以字节为单位)。那些逻辑上不再需要的文件,但由于后台任务仍在使用,暂时还不能物理删除的文件。如果有未成功计入版本的文件(由于崩溃、异常),它们不会计入该属性中,直到系统将它们识别为 “ 垃圾文件 “ 并添加到 obsolete_files_ 列表。

五、总结

RocksDB 的过期文件清理机制依赖于正确的状态转换,当磁盘利用率因为 compaction 增长到接近 100%,系统处于资源耗尽状态时,依赖被动触发的机制可能无法正常工作。此时就需要提供主动触发清理的调用,或者重启数据库以触发正常的清理、恢复流程。

本文作者 : cyningsun
本文地址https://www.cyningsun.com/05-05-2025/rocksdb-obsolete-files.html
版权声明 :本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!

# 数据库