# Modified Wisckey - Version Table [BACK TO HOME](https://hackmd.io/JvhNsCKaT3uIUe0e74urrw?both) :::warning * 原定以adjacency list 實作 ![](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcRvWNIR5QwKWLj8tLjQANT5HADWulzdYj3vLA&usqp=CAU) 預定更改為hashmap: ![](https://upload.cc/i1/2022/09/16/DEoCWj.png) ::: ## 基礎架構:LSM tree/Version Table/Value Log ![](https://upload.cc/i1/2022/09/16/w4V1qF.png) LSM不需要再另外儲存有關value log的資訊,而是透過Version Table與Value Log溝通;新舊value都會保留在value log裡,方便在刪除key時一併刪除該key對應到的所有value,以達到Secure Deletion的目標 ```cpp= struct Version_Table_node { uint64_t version; string value_address; size_t size; Version_Table_node = NULL; }; struct Version_Table_key { Slice key; string hotness; Version_Table_node info = NULL; } //Wisckey/include/leveldb ``` ## 資料寫入[流程已更新] **wisckey_set -> leveldb_set -> DBImpl::Put -> DB::Put -> DBImpl::Write** ### wisckey_set ![](https://upload.cc/i1/2022/09/16/8flgPt.png) ### leveldb_set Stringstream記錄offset和size,並加上&&符號方便切割,這些資訊會傳給leveldb_set function。string s 包含了offset和size等Value Log位置資訊,代替value傳到leveldb_set function ```cpp= leveldb_vset(DB *db, string key, string value) { size_t idx = 0; VTable_mutex.lock(); Version_Table_node new_info; new_info.version = 1; //not so sure new_info.value_address = value; bool key_exist_in_table = false; for(idx =0;idx<Version_Table.size();idx++) { if(Version_Table[idx].key == key) //duplicated key { key_exist_in_table = true; break; } } //idx is the index of existing key or the last index(not found) if(!key_exist_in_table) //here comes new key { table_size ++; idx = table_size; //extend version table Version_Table_key new_entry; new_entry.key = key; new_entry.hotness = (MBF.checkHot(key))?'1':'0'; new_entry.info = new_info; Version_Table.pushback(new_entry); } else //key already exist in Version Table { //Traverse Version_Table_node tmp = Version_Table[idx].info; while(tmp->next != NULL) tmp = tmp->next; //move to the node with newest value info tmp->next = new_info; } VTable_mutex.unlock(); Writebatch wb; Status s; 
WriteOptions wopt; size_t *table_address = Version_Table[idx]; //pointer to array s = db->Put(wopt, key, table_address); assert(s.ok()); } ``` 以key struct 組成的vector作為Version Table,每個key帶有linked list 結構來存放value資訊(Version_Table_node)。先把value資訊記錄到一個新的node裡,接著用traverse檢查key是否存在: * 如果是新key,將global variable的 table size+1,然後在key vector push_back新的key * 否則idx會是key所在位置,將新的資訊串到此key在table上對應的linked-list最後面 db put function傳入的是(key, pointer to table address) ### DBImpl::Put ```cpp= Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } ``` ### DB::Put ```cpp= Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; /*In levelDB, getfunction is disable. So, I use this function to update head and tail. */ if(key.data()==headkey){ head = std::atoi(value.data()); }else if(key.data()==tailkey){ tail = std::atoi(value.data()); } batch.Put(key, value); return Write(opt, &batch); } ``` 寫入的key value首先被封裝到WriteBatch,WriteBatch按照一定格式記錄了:sequence、count、操作類型(Put or Delete)、key/value的長度及key/value本身 ```cpp= struct DBImpl::Writer { Status status; WriteBatch* batch; bool sync; bool done; port::CondVar cv; explicit Writer(port::Mutex* mu) : cv(mu) { } }; ``` DBImpl::Writer封裝了 mutex、cond 等同步相關的資訊 ### DBImpl::Write ```cpp= Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { Writer w(&mutex_); w.batch = my_batch; w.sync = options.sync; w.done = false; MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; } // May temporarily unlock and wait. 
Status status = MakeRoomForWrite(my_batch == NULL); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != NULL) { // NULL batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we // just added may or may not show up when the DB is re-opened. // So we force the DB into a mode where all future writes fail. 
RecordBackgroundError(status); } } if (updates == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); } while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } // Notify new head of write queue if (!writers_.empty()) { writers_.front()->cv.Signal(); } return status; } ``` Writer w 先被放入writers_並等待,直到其他thread已經幫忙完成了Writer w的寫入,或是自己搶到mutex lock並且位於writers_最前端為止。之後透過 status = WriteBatchInternal::InsertInto(updates, mem_) 寫入memtable ## hashMap版本 ### Map<key, vector<value_info>> ![](https://upload.cc/i1/2022/10/21/uHINUa.png) ```cpp= typedef struct info { string hotness; unsigned long version; string value_address; size_t size; }; static map<string, vector<info>> Version_Table; ``` #### 資料寫入 lab2_common.h leveldb_vset ```cpp= static void leveldb_vset (DB db, string &key, string &value) { VTable_mutex. lock(); info new_info; new_info.hotness (MBF.checkHot(key))?'1':'0'; new_info.version = 1; new_info.value_address = value; //vlog addr size_t idx=0; //Idx is to record version table address vector<info> tmp; if(Version Table.find(key)== Version Table.end()) { // new key tmp.push_back(new_info); } else { //key already exist in Version Table tmp = Version Table [key]; new_info.version = tmp.size()+1; idx = tmp.size(); //vector index tmp.push_back(new_info); } Version Table [key] = tmp; VTable_mutex.unlock(); WriteBatch wb; Status s; WriteOptions wopt; //Pointer to Array info *table_address = &Version_Table[key][idx]; s = db->Put(wopt,key,table_address); assert(s.ok()); } ``` #### 資料讀取 ```cpp= static bool wisckey_vget (WK * wk, string &key, string &value, string &accvalue) { std::string tmpkey = key; //compaction do_logfileGC (wk, accvalue, key, value); key = tmpkey; cout << "\n\t\tGet Function\n\n"; cout << "Key Received: " << key << endl; //string offsetinfo; //bool found = leveldb_get(wk->leveldb, key, offsetinfo); info *tableaddr_info; 
//table address bool found leveldb_vget (wk->leveldb, key, tableaddr_info); if (found) { cout << "offset and Length: " <<*(tableaddr_info).value_address <<offset value size<<endl; //call by address } else { cout << "Record: Not Found" << endl; return false; } //拆解offset資訊 std::string value_offset; std::string value_length; //std::string s= offsetinfo; std::strings = *(tableaddr_info).value_address; std::string delimiter = "&&"; size_t pos = 0; std::string token; while ((pos= s.find(delimiter)) != std::string::npos) { token=s.substr(0, pos); value_offset= token; s.erase(0, pos + delimiter.length()); } value_length = s; ```