The last post introduced the structure of leveldb; this post describes how leveldb handles reads and writes, along with some related questions.
Reading
Reading Procedure:
- check memtable (skiplist)
- check immutable memtable
- iterate over the sorted tables in each level (0, 1, …) to find the candidate files
- use the table index ([key => offset]) to find the suitable block via binary search
- binary search in the restart array to find the last restart point with a key < target
- linear search through the entries between two restart points to check for the key
Step 1 & 2:
// db_impl.cc
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
Step 3:
// version_set.cc: Version::Get
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant.
// Get the list of files to search in this level
// in level 0, iterate all table index
if (level == 0) {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to oldest.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
//...
// in level >= 1
// Binary search to find earliest index whose largest key >= ikey.
// i.e. the first file with largest >= ikey (whether ikey >= smallest must still be checked)
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
Step 4 & 5 & 6 in brief:
// try to find table by number using the mapping of [file_number => TableAndFile]
s = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue);
// table_cache.cc
Status TableCache::Get(...) {
Status s = FindTable(file_number, file_size, &handle);
// read from the abstraction of sorted table file
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, saver);
Step 5 & 6 in detail
// block.cc
// Positions the iterator at the first entry whose key is >= target.
// Works in two phases: a binary search over the restart array to pick a
// starting restart point, followed by a linear scan from that point.
virtual void Seek(const Slice& target) {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
while (left < right) {
// Round the midpoint up so that the "left = mid" branch below always
// shrinks the [left, right] range.
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
// Decode the entry header stored at restart point "mid".
const char* key_ptr = DecodeEntry(data_ + region_offset,
data_ + restarts_,
&shared, &non_shared, &value_length);
// A failed decode, or an entry at a restart point that shares a prefix
// with a previous key (shared != 0), is reported as corruption.
if (key_ptr == NULL || (shared != 0)) {
CorruptionError();
return;
}
// With shared == 0, the non-shared bytes are the complete key.
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}
// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
// Stop when no further entry can be parsed.
// NOTE(review): presumably end of block or a parse error - confirm in ParseNextKey().
if (!ParseNextKey()) {
return;
}
// key_ now holds the current entry's key; stop at the first one >= target.
if (Compare(key_, target) >= 0) {
return;
}
}
}
Write
Write Procedure
Adding to log file & memtable (both with sequence number) --> background compaction --> sorted table
A queue is used to serialize write requests from multiple threads
// Enqueue this writer, then block on its condition variable until it is
// either marked done or has reached the front of the queue.
// NOTE(review): w.done is presumably set by another writer that included
// this request in its batch - confirm against the rest of DBImpl::Write.
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
Build a batch group before writing
WriteBatch* updates = BuildBatchGroup(&last_writer);
Persist to the log first, then add to the memtable (SkipList)
// db_impl.cc
// Append the serialized batch contents to the log before touching the memtable.
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
// When the caller requested a synchronous write, sync the log file as well
// and remember whether that sync failed.
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// Apply the batch to the memtable only if the log append (and the sync,
// when requested) succeeded.
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
Ref
Written with StackEdit.
评论
发表评论