LevelDB原理解析:数据的读写与合并是怎样发生的?
一、LevelDB总体架构
Memtable:Memtable可读可写,内部由SkipList实现,用于在内存中缓存写操作。
Immutable Memtable:内部同样由SkipList实现,但是只读,当Memtable的大小达到设定的阈值时,会变成 Immutable Memtable,后续由后台线程通过compaction操作将数据顺序落盘,变成sstable文件。
sstable:sstable是磁盘上的存储文件,它将key有序存放,level0层的sstable由内存中的Immutable Memtable直接持久化生成,因为没有和当前层的其他文件合并过,因此level0层的sstable里的key会发生重叠,其余层的sstable文件均由当前层和前一层的sstable文件归并而来。
Manifest:Manifest文件是sstable的索引信息,用来记录每个sstable对应的key range、文件size等信息。
Log:Log文件主要是用于机器重启而不丢失数据,当向LevelDB写入一条数据时,它首先会向Log文件顺序写入一条操作日志,然后再向内存Memtable写入数据,这样即便机器掉电,也不会出现数据丢失的情况。
Current文件:当机器重启时,LevelDB会重新生成新的Manifest文件,所以Manifest文件可能存在多个,这里会使用Current文件记录当前使用的Manifest文件。
二、写入流程
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; batch.Put(key, value); //调用DBImpl::Write方法 return Write(opt, &batch);}struct DBImpl::Writer { Status status; WriteBatch* batch; bool sync; bool done; port::CondVar cv; explicit Writer(port::Mutex* mu) : cv(mu) { }};
其他线程已经帮忙把Writer写入;
抢到锁并且是写队列的首节点。
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false; MutexLock l(&mutex_); writers_.push_back(&w); //任务放到queue中,如果当前不是queue的头部则等待 //当某个线程将queue中自己对应的Writer写入磁盘时,可能也会将其他线程对应的Writer写入磁盘 while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; }
Status DBImpl::MakeRoomForWrite(bool force) { //通过改变指针指向,将Memtable转换成Immutable imm_ = mem_; has_imm_.store(true, std::memory_order_release); //生成新的Memtable mem_ = new MemTable(internal_comparator_); mem_->Ref(); //触发compaction MaybeScheduleCompaction();}
//从队列中批量取任务 WriteBatch* write_batch = BuildBatchGroup(&last_writer);//将任务写入Log文件status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));//将任务写入Memtablestatus = WriteBatchInternal::InsertInto(write_batch, mem_);
//last_writer指向writers_里合并的后一个Writer //逐个遍历弹出writers_里的元素,并唤醒对应的正在等待的写线程,直到遇到last_writer while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } // 唤醒队列未写入的个Writer if (!writers_.empty()) { writers_.front()->cv.Signal(); }三、读取流程
{ mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); //先查找memtable if (mem->Get(lkey, value, &s)) { //再查找immutable memtable } else if (imm != nullptr && imm->Get(lkey, value, &s)) { } else { //查找sstable s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); }
四、Compaction流程
将内存中的数据持久化到磁盘;
清理冗余数据,因为LevelDB的更新和删除操作具有延后性,两种操作实际上都会向LevelDB写入一条新记录,所以通过重新compaction整理数据,可以清理冗余数据,节省磁盘空间;
通过compaction使level 0以下的文件层中的数据保持有序,这样便可以通过二分进行数据查找,同时也可以减少待查找的文件数量,提升读效率。
1. minor compaction
将Immutable memtable落盘成SSTable文件
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) { //生成sstable编号,用于构建文件名 FileMetaData meta; meta.number = versions_->NewFileNumber(); Status s; { mutex_.Unlock(); //更新memtable中的全部数据到xxx.ldb文件 //meta记录key range, file_size等sst信息 s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); mutex_.Lock(); } int level = ; if (s.ok() && meta.file_size > ) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); if (base != nullptr) { //为新生成的sstable选择合适的level(不一定总是0) level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } //level及file meta记录到edit edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); }}
//记录edit信息versions_->LogAndApply(&edit, &mutex_);//释放imm_空间imm_->Unref();imm_ = nullptr;has_imm_.Release_Store(nullptr);//清理无用文件DeleteObsoleteFiles();2. major compation
level 0层:sstable文件个数超过指定个数。因为level0是从Immutable直接转储而来,所以用个数限制而不是文件大小。
level i层:第i层的sstable size总大小超过(10^i) MB。level越大,说明数据越冷,读取的几率越小,因此对于level更大的层,给定的size阈值更大,从而减少comaction次数。
对于sstable文件还有seek次数限制,如果文件多次seek但是一直没有查找到数据,那么就应该被合并了,否则会浪费更多的seek。
选择合适的level及sstable文件用于合并
level 0层的score计算规则为:文件数 / 4;
level i层的计算规则为:整个level所有的file size总和/(10^i)。
void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; double best_score = -1 for (int level = ; level < config::kNumLevels - 1; level++) { double score; //对于level 0使用文件数/4计算score if (level == ) { score = v->files_[level].size() / static_cast<double>(config::kL0_CompactionTrigger); } else { //对于非0层,使用该层文件的总大小 // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); score = static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level); } if (score > best_score) { best_level = level; best_score = score; } } //使用compaction_level_记录需要合并的层,使用compaction_score_记录合并分数 v->compaction_level_ = best_level; v->compaction_score_ = best_score;}
bool Version::UpdateStats(const GetStats& stats) { FileMetaData* f = stats.seek_file; if (f != nullptr) { f->allowed_seeks--; if (f->allowed_seeks <= && file_to_compact_ == nullptr) { file_to_compact_ = f; file_to_compact_level_ = stats.seek_file_level; return true; } } return false;}
根据key重叠情况扩大输入文件集合
多路合并
Iterator* VersionSet::MakeInputIterator(Compaction* c) { const int space = (c->level() == ? c->inputs_[].size() + 1 : 2); // list存储所有Iterator Iterator** list = new Iterator*[space]; int num = ; for (int which = ; which < 2; which++) { if (!c->inputs_[which].empty()) { //第0层 if (c->level() + which == ) { const std::vector<FileMetaData*>& files = c->inputs_[which]; // Iterator* Table::NewIterator for (size_t i = ; i < files.size(); i++) { list[num++] = table_cache_->NewIterator( options, files[i]->number, files[i]->file_size); } } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( // 遍历文件列表的iterator new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), &GetFileIterator, table_cache_, options); } } }参考链接:
相关文章