一、引言
在 RocksDB 的核心机制中,Flush 操作扮演着至关重要的角色,它是连接内存数据结构 (Memtable) 和持久化存储 (SST 文件) 的桥梁。该机制不仅影响写入性能和内存使用效率,更直接关系到数据安全性和系统恢复速度。
本文将基于 RocksDB v8.8.1 详细介绍在未启用 atomic_flush
的情况下,深入解析 RocksDB 的 Flush 机制,包括触发条件、执行策略、相关配置,以及与之密切相关的 WAL(预写式日志)管理和恢复机制。不妨带着以下问题,来详细深入了解下具体的实现细节:
- 为什么集群滚动升级会 Flush 生成很多 SST 文件,进而触发 compaction?
- 哪些情况会触发 Memtable Flush?
- 当 Memtable Flush 时,会选中哪些 Memtable?
- 多个列族 (Column Family) 会 Flush 到同一个 SST 文件么?
- 不活跃的列族会自动 Flush 么?会有什么影响?
- Flush 完毕之后,WAL 是怎么处理的?WAL 什么时候会归档?什么时候会删除?
- RocksDB 重启的时候,怎么确定从哪个 WAL 文件的哪个位置开始读取数据,恢复 Memtable?
- 如何加速恢复,降低恢复所需要的时长?几种恢复模式在数据丢失量和恢复速度上有何异同?
- 如果因数据同步需要调大 WAL 的保留时间,会增加异常重启恢复时间么?
二、基础概念
2.1 Memtable 的生命周期
Memtable 是 RocksDB 的内存数据结构,用于存储最近写入的数据。它具有以下特点:
写入流程:当用户写入数据时,数据首先被写入预写式日志 (WAL) 用于崩溃恢复,然后被插入当前活跃的 Memtable。
状态转换:Memtable 有三种状态:
- **活跃 (Active)**:接收新的写入请求
- **不可变 (Immutable)**:不再接收新写入,等待刷新到存储
- **已刷新 (Flushed)**:数据已持久化到 SST 文件,Memtable 可以被销毁
切换机制:当活跃 Memtable 达到一定大小 (
write_buffer_size
) 后,会被标记为不可变,并创建新的 Memtable 接收后续写入。
Memtable 的实现通常基于跳表 (SkipList) 数据结构,保证了高效的随机写入和有序遍历能力。
2.2 写入缓冲区机制
RocksDB 的写入缓冲区实现了高效的内存管理策略:
- 单 CF 写入缓冲区:每个列族 (Column Family) 配置有自己的
write_buffer_size
,控制单个 Memtable 的大小。 - 全局写入缓冲区:通过
db_write_buffer_size
参数限制所有列族的 Memtable 总内存占用。 - Memtable 数量控制:
max_write_buffer_number
:每个 CF 允许的最大 Memtable 数量min_write_buffer_number_to_merge
:刷新前合并的最小 Memtable 数量
当一个 Memtable 被标记为不可变后,RocksDB 会调度后台线程执行 Flush 操作,将其数据持久化到 SST 文件中。
2.3 Flush 与 WAL 的关系
Flush 操作与 WAL(Write-Ahead Log) 密切相关:
- 数据安全保障:WAL 记录所有写操作,确保即使在内存数据 (Memtable) 丢失的情况下也能恢复数据。
- 日志回收机制:只有当 WAL 中的所有数据都已通过 Flush 持久化到 SST 文件后,该 WAL 文件才可以被归档或删除。
- WAL 文件限制:
max_total_wal_size
参数控制 WAL 文件的总大小,超过限制会触发 Flush 以减小 WAL 占用。
2.4 相关配置
RocksDB 提供了多种参数用于配置 Memtable 和 Flush 行为:
// DBOptions(数据库级别选项)
struct DBOptions {
// ... 其他选项 ...
// 总写入缓存大小。所有列族共享的写缓冲区总大小 (字节)
// 所有列族共享的写入缓存(MemTable)的总大小。当所有 MemTable 的总大小超过这个值时,RocksDB 会触发一个列族的刷新操作,通常是最大的 MemTable 所在的列族
// 控制 RocksDB 实例的整体内存使用量。更大的值可以提高写入吞吐量,但会增加内存占用
size_t db_write_buffer_size = 0;
// 最大后台刷新线程数。用于执行刷新操作的后台线程的最大数量
// 控制刷新操作的并发度。增加此值可以提高刷新吞吐量,尤其是在有多个列族的情况下,但也可能增加资源竞争
int max_background_flushes = 1;
// 是否避免不必要的阻塞 I/O。如果设置为 true,则工作线程可能会避免执行不必要的、长时间的 I/O 操作(例如直接删除过时的文件或删除 MemTable),而是安排一个后台任务来执行
// 提高延迟敏感型应用的性能,将潜在的阻塞操作卸载到后台线程
bool avoid_unnecessary_blocking_io = true;
// 是否原子刷新,如果设置为 true,RocksDB 支持原子地刷新多个列族,并将它们的结果原子地提交到 MANIFEST 文件
// 确保跨多个列族的数据一致性。如果某些列族的数据写入没有受到 WAL 保护,这个选项就很有用
bool atomic_flush = false;
// 是否手动刷新 WAL。如果设置为 true,则在每次写入后不会自动刷新 WAL(Write-Ahead Log)
// 禁用自动 WAL 刷新,需要手动调用 `SyncWAL()` 来刷新 WAL。这可以提高写入性能,但会增加数据丢失的风险
bool manual_wal_flush = false;
// 活跃的 WAL 文件总大小的最大值 (字节)。当总大小超过此值时,RocksDB 将开始刷新列族以减小活跃的 WAL 大小
// 实时控制活跃 WAL 文件的总大小,超过限制时强制刷新 Memtable 以减少 WAL 依赖
uint64_t max_total_wal_size = 0;
// 不活跃的 WAL 文件总大小。以下两个字段影响归档 WAL 删除方式,防止历史 WAL 文件占用过多磁盘空间
// 如果均为 0,则 WAL 立刻删除不会归档
// 如果 WAL_ttl_seconds 为 0,且 WAL_size_limit_MB 不为 0,则每十分钟检查一次,删除超过大小限制的 WAL,从最旧的 WAL 开始
// 如果 WAL_ttl_seconds 不为 0,且 WAL_size_limit_MB 为 0,则每 WAL_ttl_seconds / 2 检查一次,删除超过时间限制的 WAL
// 如果两者均不为 0,则每十分钟检查一次,先检查时间限制,再检查大小限制
uint64_t WAL_ttl_seconds = 0;
uint64_t WAL_size_limit_MB = 0;
// 用户定义的事件监听器列表
// 监听器可以接收刷新开始和刷新完成事件的通知,允许用户监控和响应刷新活动
std::vector<std::shared_ptr<EventListener>> listeners;
// ... 其他选项 ...
};
// ColumnFamilyOptions(列族级别选项)
struct ColumnFamilyOptions {
// ... 其他选项 ...
// 每个 MemTable 的大小 (字节)。一旦 MemTable 达到此大小,它将被标记为不可变,并触发刷新
// 控制每个列族的内存使用量和刷新频率。更大的值会降低刷新频率,但会增加内存使用量
size_t write_buffer_size = 64 * 1024 * 1024;
// 内存中 MemTable 的最大数量。在阻止写入之前,内存中要保留的最大 MemTable 数量
// 限制未刷新的 MemTable 的数量。达到此限制时,写入将被暂停,直到刷新完成
int max_write_buffer_number = 2;
// 刷新前要合并的最小 MemTable 数量。在刷新到存储之前要合并的最小 MemTable 数量
// 控制刷新期间合并到单个 SST 文件中的 MemTable 数量。更大的值可以减少 SST 文件的数量,但可能会增加刷新延迟
int min_write_buffer_number_to_merge = 1;
// 刷新时是否验证 MemTable 计数。验证 MemTable 中的条目数是否与刷新期间读取的条目数匹配
// 启用刷新期间 MemTable 计数的验证
bool flush_verify_memtable_count = false;
// 实验性 MemPurge 阈值。触发 MemPurge 的阈值
// 如果设置为 >0.0,则所有自动刷新操作将首先通过 MemPurge 过程
double experimental_mempurge_threshold = 0.0;
// ... 其他选项 ...
};
// FlushOptions(传递给Flush API调用的选项)
struct FlushOptions {
// 是否等待刷新完成。如果为 true,则刷新操作将阻塞,直到完成。如果为 false,则刷新是异步的
// 确定 `Flush()` 调用是同步还是异步
bool wait = true;
// 是否允许刷新导致写入暂停。如果为 true,即使这意味着写入将在刷新期间暂停,刷新操作也会立即进行
// 允许刷新继续进行,即使它会导致写入暂停
bool allow_write_stall = false;
};
三、Flush 触发机制
RocksDB 中的 Flush 操作由多种条件触发,可分为自动触发、手动触发和系统状态变更触发三类。
3.1 自动触发条件
3.1.1 单个 Memtable 大小达到阈值
当单个 Memtable 的大小达到 write_buffer_size
配置值时,会触发 Flush:
// memtable.cc
bool MemTable::ShouldFlushNow() {
// 省略代码...
// if user keeps adding entries that exceeds write_buffer_size, we need to
// flush earlier even though we still have much available memory left.
if (allocated_memory >
write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
return true;
}
// 省略代码...
}
当 Memtable 达到阈值后,系统会将其标记为不可变,并创建新的 Memtable 接收后续写入,同时安排后台任务执行实际的 Flush 操作。
3.1.2 总写入缓冲区大小超限
当所有 Memtable 的总大小超过 db_write_buffer_size
时,会触发 Flush 操作:
// db_impl_write.cc
Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
// 省略代码...
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) {
// We only consider flush on CFs with bytes in the mutable memtable,
// and no immutable memtables for which flush has yet to finish. If
// we triggered flush on CFs already trying to flush, we would risk
// creating too many immutable memtables leading to write stalls.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
}
// 省略代码...
}
RocksDB 会 Flush 序号最小的 Memtable。
3.1.3 WAL 文件大小超过限制
当 WAL 文件的总大小超过 max_total_wal_size
时,RocksDB 会触发 Flush 以减小 WAL 体积:
// db_impl_write.cc
Status DBImpl::SwitchWAL(WriteContext* write_context) {
// 省略代码...
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
cfds.push_back(cfd);
}
}
MaybeFlushStatsCF(&cfds);
// 省略代码...
}
RocksDB 会 Flush 与最旧 WAL 文件关联的 Memtable 以释放 WAL 空间。
3.2 手动触发情况
3.2.1 用户显式调用 Flush API
用户可以通过调用 DB::Flush()
方法手动触发 Flush 操作:
// 手动触发Flush的示例
FlushOptions flush_options;
flush_options.wait = true; // 等待Flush完成
db->Flush(flush_options); // 触发所有列族的Flush
// 或者
db->Flush(flush_options, handles[1]); // 只Flush特定列族
手动 Flush 在需要确保数据持久化或准备备份时非常有用。
3.2.2 外部文件导入前的 Flush
当使用 IngestExternalFile()
导入外部 SST 文件时,RocksDB 需要确保 MemTable 和摄取的外部文件之间没有重叠的键范围。 刷新 MemTable 会创建一个新的 SST 文件,然后可以将其与外部文件一起原子地添加到数据库中:
// db_impl.cc
Status DBImpl::IngestExternalFile(
ColumnFamilyHandle* column_family,
const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) {
// 省略代码...
if (status.ok() && at_least_one_cf_need_flush) {
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
if (immutable_db_options_.atomic_flush) {
mutex_.Unlock();
status = AtomicFlushMemTables(
flush_opts, FlushReason::kExternalFileIngestion,
{} /* provided_candidate_cfds */, true /* entered_write_thread */);
mutex_.Lock();
} else {
for (size_t i = 0; i != num_cfs; ++i) {
if (need_flush[i]) {
mutex_.Unlock();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
mutex_.Lock();
if (!status.ok()) {
break;
}
}
}
}
}
// 省略代码...
}
3.2.3 手动压缩前的 Flush
确保要压缩的数据都持久化到了 SST 文件:
// db_impl_compaction_flush.cc
Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin_without_ts,
const Slice* end_without_ts) {
// 省略代码...
bool flush_needed = true;
// ...
if (s.ok() && flush_needed) {
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
} else {
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction);
}
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
return s;
}
}
// 省略代码...
}
3.2.4 修剪键空间前的 Flush
当用户调用 DB::ClipColumnFamily
API ,主动触发对指定 Column Family 的数据裁剪操作。操作会将 Column Family 中指定 Key 范围之外的数据物理删除。在删除文件之前,务必确保这些文件可能引用的任何数据都已安全地持久保存在其他位置。 刷新 MemTable 可确保将任何最近的写入都写入新的 SST 文件,因此可以安全地删除旧文件而不会丢失数据。
// db_impl.cc
Status DBImpl::ClipColumnFamily(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
// 省略代码...
// Flush memtable
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
if (immutable_db_options_.atomic_flush) {
status = AtomicFlushMemTables(flush_opts, FlushReason::kDeleteFiles,
{} /* provided_candidate_cfds */,
false /* entered_write_thread */);
} else {
status = FlushMemTable(cfd, flush_opts, FlushReason::kDeleteFiles,
false /* entered_write_thread */);
}
// 省略代码...
}
3.3 系统状态变更触发
3.3.1 数据库打开时
当 avoid_flush_during_recovery
设置为 false
时,虽然 RocksDB 不执行传统的 memtable flush 操作,仍然会将 WAL 中的数据即时刷新到 SST 文件。确保了即使在大量 WAL 数据情况下,恢复过程也能保持可控的内存使用。
// db_impl_open.cc
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_wal_found,
RecoveryContext* recovery_ctx) {
// 省略代码...
// flush the final memtable (if non-empty)
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
// If flush happened in the middle of recovery (e.g. due to memtable
// being full), we flush at the end. Otherwise we'll need to record
// where we were on last flush, which make the logic complicated.
if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
if (!status.ok()) {
// Recovery failed
break;
}
flushed = true;
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
versions_->LastSequence());
}
data_seen = true;
}
// 省略代码...
}
3.3.2 数据库关闭时
当数据库正常关闭时,会执行 Flush 以确保所有内存数据持久化(除非设置了 avoid_flush_during_shutdown = true
):
// db_impl.cc
Status DBImpl::Close() {
// 省略代码...
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
s = DBImpl::FlushAllColumnFamilies(FlushOptions(), FlushReason::kShutDown);
s.PermitUncheckedError();
}
// 省略代码...
}
3.3.3 错误恢复过程中
在错误恢复过程中,可能需要 Flush 以确保数据一致性:
// error_handler.cc
Status ErrorHandler::RecoverFromBGError(bool is_manual) {
// 省略代码...
if (context.flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
s = RetryFlushesForErrorRecovery(FlushReason::kErrorRecoveryRetryFlush,
true /* wait */);
} else {
// We cannot guarantee consistency of the WAL. So force flush Memtables of
// all the column families
FlushOptions flush_opts;
// We allow flush to stall write since we are trying to resume from error.
flush_opts.allow_write_stall = true;
s = FlushAllColumnFamilies(flush_opts, context.flush_reason);
}
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Flush failure [%s]",
s.ToString().c_str());
}
// 省略代码...
}
3.3.4 创建备份/快照时
当调用 GetLiveFiles()
并指定 flush_memtable=true
时,会触发 Flush 以确保返回完整的文件列表,常用于创建备份或快照:
// db_filesnapshot.cc
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
// 省略代码...
if (flush_memtable) {
Status status = FlushForGetLiveFiles();
if (!status.ok()) {
mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
status.ToString().c_str());
return status;
}
}
// 省略代码...
}
四、Flush 执行策略
除了主动刷新时选择特定的列族,以及特定列族的 Immutable Memtable 总数达到 min_write_buffer_number_to_merge
触发被动 Flush,在 Non-Atomic Flush 模式下 RocksDB 需要决定哪些 Memtable 应该被 Flush。选择策略会根据触发 Flush 的原因不同而变化。
4.1 基于 Memtable 时间的选择策略
当总 Memtable 内存占用过高时,选择策略倾向于选择创建序列号最小(即最老)的,有数据且没有正在刷盘的 Memtable。:
// db_impl_write.cc
Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
// 省略代码...
autovector<ColumnFamilyData*> cfds;
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) {
// We only consider flush on CFs with bytes in the mutable memtable,
// and no immutable memtables for which flush has yet to finish. If
// we triggered flush on CFs already trying to flush, we would risk
// creating too many immutable memtables leading to write stalls.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
if (cfd_picked != nullptr) {
cfds.push_back(cfd_picked);
}
MaybeFlushStatsCF(&cfds);
// 省略代码...
}
4.2 基于 WAL 时间的选择策略
当 WAL 文件大小超过限制时,选择与最旧 WAL 关联的 CF 的所有 Memtable 进行 Flush:
// db_impl_write.cc
Status DBImpl::SwitchWAL(WriteContext* write_context) {
// 省略代码...
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
cfds.push_back(cfd);
}
}
MaybeFlushStatsCF(&cfds);
// 省略代码...
}
4.3 总结
因为同一 CF 内的较新 Memtable 也会被连带 Flush,两者刷新的 Memtable 的类型几乎一样,被刷新的 Memtable 一定包含当前最旧未刷新的 Memtable,但会包含较新的 Memtable。
五、WAL 恢复机制
从上面的触发可知,RocksDB 不会专门针对不活跃的列族进行自动 Flush。除了额外的内存占用(不活跃列族的数据会在 Memtable 中保留,直到触发 Flush)之外,还会导致 WAL 文件内的数据累积,影响恢复时读取的数据量和时长
5.1 WAL 恢复过程
5.1.1 恢复原理概述
RocksDB 的崩溃恢复流程:
- 读取 MANIFEST:确定数据库状态、SST 文件列表和列族信息。
- 确定恢复点:确定需要回放的 WAL 文件及起始点。
- 回放 WAL:重新执行 WAL 中记录的写操作,重建内存状态。
- 执行恢复后 Flush:可选地执行 Flush 以持久化恢复的数据。
5.1.2 确定需要读取的 WAL 起始文件
在 RecoverLogFiles 中,起始 WAL 文件的确定流程如下:
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_wal_found,
RecoveryContext* recovery_ctx) {
// 省略代码...
// 从 VersionSet 中获取需要保留的最小 WAL 编号
uint64_t min_wal_number = MinLogNumberToKeep();
if (!allow_2pc()) {
// 计算包含未刷盘数据的最小 WAL 编号
min_wal_number = std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
// 遍历所有WAL文件,跳过比最小保留编号还小的WAL文件
for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is older than min log to keep #%" PRIu64,
wal_number, min_wal_number);
continue;
}
// 处理 WAL 文件...
}
// 省略代码...
}
// Returns the minimum log number which still has data not flushed to any SST
// file, except data from `cfd_to_skip`.
uint64_t PreComputeMinLogNumberWithUnflushedData(
const ColumnFamilyData* cfd_to_skip) const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) {
if (cfd == cfd_to_skip) {
continue;
}
// It's safe to ignore dropped column families here:
// cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber();
}
}
return min_log_num;
}
5.1.3 确定起始的 Record
对于每个需要处理的 WAL 文件,从文件头开始顺序读取所有 Record:
// 创建日志读取器,从文件开头开始读取
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, wal_number);
// 从头开始读取所有记录
std::string scratch;
Slice record;
while (reader.ReadRecord(&record, &scratch,
immutable_db_options_.wal_recovery_mode,
&record_checksum) && status.ok()) {
// 处理每条记录...
}
不是从某个特定位置开始,而是完整读取整个 WAL 文件的所有记录。
5.1.4 确定写入 CF Memtable 的 Record
该过程通过 WriteBatchInternal::InsertInto
和 MemTableInserter
类来完成:
5.1.4.1 解析 WriteBatch
// 将WAL记录解析为WriteBatch
WriteBatch batch;
status = WriteBatchInternal::SetContents(&batch, record);
// 应用批处理到memtable
status = WriteBatchInternal::InsertInto(
batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
&trim_history_scheduler_, true, wal_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
5.1.4.2 按列族过滤和应用
在 MemTableInserter::SeekToColumnFamily
中进行过滤:
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// 查找对应的列族
bool found = cf_mems_->Seek(column_family_id);
if (!found) {
if (ignore_missing_column_families_) {
*s = Status::OK();
} else {
*s = Status::InvalidArgument("Invalid column family specified in write batch");
}
return false;
}
// 检查是否需要跳过此记录(恢复模式下的关键逻辑)
if (recovering_log_number_ != 0 &&
recovering_log_number_ < cf_mems_->GetLogNumber()) {
// 如果恢复的日志编号小于列族的当前日志编号,
// 说明列族已经包含了来自此日志的更新,跳过以避免重复应用
*s = Status::OK();
return false;
}
return true;
}
5.1.4.3 写入 Memtable
通过 MemTableInserter::PutCF
、DeleteCF
等方法将数据写入对应列族的 memtable:
Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) {
// 检查列族是否存在和有效
if (!SeekToColumnFamily(column_family_id, &ret_status)) {
return ret_status;
}
// 获取目标memtable
MemTable* mem = cf_mems_->GetMemTable();
// 将数据添加到memtable
ret_status = mem->Add(sequence_, value_type, key, value, kv_prot_info,
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
return ret_status;
}
5.1.5 总结
- WAL 起始文件:基于各列族的
log_number_
和系统的min_log_number_to_keep_
确定 - 起始 Record:每个 WAL 文件都从头开始完整读取
- Record 过滤:
- 根据
WriteBatch
中的column_family_id
找到对应列族 - 检查列族的
log_number_
避免重复应用已处理的数据 - 只有通过过滤的 Record 才会被应用到对应列族的 memtable
- 根据
如果因数据同步需要调大 WAL 的保留时间,可以通过调大 WAL_ttl_seconds
或者 WAL_size_limit_MB
,并且保持 max_total_wal_size
不变实现,此时并不会影响恢复速度
5.2 不同恢复模式比较
RocksDB 提供了四种 WAL 恢复模式,在数据丢失量和恢复速度之间做出不同的权衡:
5.2.1 kTolerateCorruptedTailRecords
// 原始的LevelDB恢复模式
// 我们容忍WAL文件末尾的损坏记录
// 能够恢复大部分仍然可读的数据
WALRecoveryMode::kTolerateCorruptedTailRecords
特点:
- 数据丢失量:文件尾部损坏的记录会丢失
- 恢复速度:中等
- 适用场景:对部分数据丢失可接受,但要尽量恢复的场景
5.2.2 kAbsoluteConsistency
// 如果发现任何损坏记录,恢复会失败
// 确保数据的绝对一致性
WALRecoveryMode::kAbsoluteConsistency
特点:
- 数据丢失量:零容忍,有任何损坏就会恢复失败
- 恢复速度:较慢,需要验证所有记录
- 适用场景:金融等要求数据完全准确的场景
5.2.3 kPointInTimeRecovery
// 恢复到损坏记录之前的最后一个完整记录
// 确保数据一致性但可能丢失最近的写入
WALRecoveryMode::kPointInTimeRecovery
特点:
- 数据丢失量:损坏点之后的所有数据
- 恢复速度:较快,发现损坏立即停止
- 适用场景:需要一致性视图且接受部分数据丢失的场景
5.2.4 kSkipAnyCorruptedRecords
// 跳过所有损坏的记录但继续处理
// 可能导致数据不一致但恢复速度最快
WALRecoveryMode::kSkipAnyCorruptedRecords
特点:
- 数据丢失量:仅损坏的记录
- 恢复速度:最快,不会因损坏而停止
- 适用场景:恢复速度优先,可容忍潜在的不一致性
5.3 恢复速度优化
恢复速度与 Flush 策略紧密相关,优化方法包括:
**设置
avoid_flush_during_recovery
**:Options options; options.avoid_flush_during_recovery = true;
此选项避免在恢复期间进行额外的 Flush,减少 I/O 开销。
设置并行 WAL 恢复:
Options options; options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; options.max_background_jobs = 8; // 增加并行恢复线程
优化 WAL 文件数量:
合理设置max_total_wal_size
并经常触发 Flush,减少崩溃时需要回放的 WAL 数量。统计更新优化
跳过 DB 打开时的统计信息更新可加快启动速度:
Options options;
// 不更新用于优化压缩决策的统计信息
options.skip_stats_update_on_db_open = true;
- 文件检查优化
跳过检查 SST 文件大小可加快数据库打开:
Options options;
// 跳过在DB打开时获取和检查所有SST文件的大小
options.skip_checking_sst_file_sizes_on_db_open = true;
当使用非默认 Env 且获取文件大小开销较大时,这一优化尤为有效。
实现不活跃 CF 的定期 Flush 机制:
// 实现定时任务,定期执行Flush void PeriodicFlushTask() { FlushOptions fopts; for (auto cf_handle : inactive_cf_handles) { if (TimeExceeds(last_flush_time[cf_handle], max_idle_time)) { db->Flush(fopts, cf_handle); } } }
六、Flush 与 SST 文件
6.1 列族与 SST 文件的关系
多 CF 是否会 flush 到同一个 SST ?从代码实现上看,FlushJob 总是针对单个 CF 创建并运行的:
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
FlushReason flush_reason, LogBuffer* log_buffer,
FSDirectory* db_directory, FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const SeqnoToTimeMapping& seq_time_mapping,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "",
BlobFileCompletionCallback* blob_callback = nullptr);
多个 CF 不会 Flush 到同一个 SST 文件的原因包括:
- 数据隔离:每个 CF 可能有不同的压缩选项、比较器等,需要独立存储。
- 独立生命周期:每个 CF 可以独立删除或修改,分开存储便于管理。
- 性能考虑:分开存储可以并行处理不同 CF 的数据访问和压缩。
6.2 SST 文件的组织与管理
SST 文件通常以数字作为文件名,表示文件编号:
// 生成 SST 文件名的函数
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
file_name = TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
std::string MakeTableFileName(const std::string& path, uint64_t number) {
// static const std::string kRocksDbTFileExt = "sst";
return MakeFileName(path, number, kRocksDbTFileExt.c_str());
}
static std::string MakeFileName(uint64_t number, const char* suffix) {
char buf[100];
snprintf(buf, sizeof(buf), "%06llu.%s",
static_cast<unsigned long long>(number), suffix);
return buf;
}
文件编号通过 VersionSet
类中的原子计数器 next_file_number_
生成的全局递增序列,确保了所有文件的唯一性标识,并通过 MANIFEST 文件持久化,在数据库重启时能够正确恢复。文件编号在整个数据库实例中全局唯一,不同的列族共享同一个计数器
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
七、总结
RocksDB 的 Flush 机制直接影响写入性能、内存占用和重启恢复速度。合理配置参数、关注不活跃列族和活跃的 WAL 管理,能有效提升系统整体表现和可用性。
本文作者 : cyningsun
本文地址 : https://www.cyningsun.com/05-30-2025/rocksdb-memtable-flush.html
版权声明 :本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!