# Modified Wisckey - Version Table
:::warning
* Originally planned as an adjacency-list implementation.

Planned change: switch to a hashmap.

:::
## Basic architecture: LSM tree / Version Table / Value Log

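The LSM tree no longer stores any value-log information itself; it reaches the Value Log through the Version Table. Both old and new values stay in the value log, which makes it easy to delete every value that belongs to a key being deleted and so meet the secure-deletion goal.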
```cpp=
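// One version of a value; the versions of a key form a singly linked list.
struct Version_Table_node
{
    uint64_t version;
    string value_address;             // value-log location of this version
    size_t size;
    Version_Table_node *next = NULL;  // next (newer) version
};
struct Version_Table_key
{
    string key;                       // owned copy; a Slice would dangle once the caller's buffer is gone
    string hotness;
    Version_Table_node *info = NULL;  // head of the version list
};
// Wisckey/include/leveldb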
```
## Data write path [flow updated]
**wisckey_set -> leveldb_set -> DBImpl::Put -> DB::Put -> DBImpl::Write**
### wisckey_set

### leveldb_set
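A stringstream records the offset and size, joined with the `&&` delimiter so they can be split apart later; this information is then passed to the leveldb_set function.
The string `s`, which holds the value-log location (offset, size, and so on), is passed to leveldb_set in place of the value itself.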
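
As a minimal sketch of the encoding described above, the helper below joins an offset and a size with the `&&` delimiter. The name `encode_vlog_location` and the integer types are assumptions for illustration, not taken from the project.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical helper (not from the original code base): builds the
// "offset&&size" string that stands in for the value inside LevelDB.
std::string encode_vlog_location(long offset, std::size_t size) {
    std::stringstream ss;
    ss << offset << "&&" << size;  // "&&" is the delimiter used when splitting later
    return ss.str();
}
```

Calling `encode_vlog_location(128, 42)` yields the string `128&&42`.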
```cpp=
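// Assumes these globals exist: vector<Version_Table_key> Version_Table,
// mutex VTable_mutex, size_t table_size, and the hotness filter MBF.
static void leveldb_vset(DB *db, string key, string value)
{
    VTable_mutex.lock();
    // Record the new value's vlog location in a fresh node.
    Version_Table_node *new_info = new Version_Table_node;
    new_info->version = 1; // not so sure
    new_info->value_address = value;
    size_t idx = 0;
    bool key_exist_in_table = false;
    for (idx = 0; idx < Version_Table.size(); idx++)
    {
        if (Version_Table[idx].key == key) // duplicated key
        {
            key_exist_in_table = true;
            break;
        }
    }
    // idx is the index of the existing key, or Version_Table.size() if not found
    if (!key_exist_in_table) // here comes a new key
    {
        Version_Table_key new_entry;
        new_entry.key = key;
        new_entry.hotness = (MBF.checkHot(key)) ? '1' : '0';
        new_entry.info = new_info;
        Version_Table.push_back(new_entry); // extend version table; idx now indexes the new entry
        table_size = Version_Table.size();
    }
    else // key already exists in Version Table
    {
        // Traverse
        Version_Table_node *tmp = Version_Table[idx].info;
        while (tmp->next != NULL) tmp = tmp->next; // move to the node with the newest value info
        tmp->next = new_info;
    }
    // Pointer to the table entry. Note: a later push_back may reallocate the
    // vector and invalidate this pointer.
    Version_Table_key *table_address = &Version_Table[idx];
    VTable_mutex.unlock();
    Status s;
    WriteOptions wopt;
    // Put() takes Slices, so the pointer is passed as its raw bytes (an assumption
    // about the intended encoding; it is only meaningful inside this process).
    s = db->Put(wopt, key, Slice(reinterpret_cast<const char *>(&table_address), sizeof(table_address)));
    assert(s.ok());
}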
```
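The Version Table is a vector of key structs.
Each key carries a linked list that stores its value information (`Version_Table_node`).
The new value's information is first recorded in a new node, then the table is traversed to check whether the key already exists:
* If the key is new, the global table size is incremented and the new key is pushed onto the key vector.
* Otherwise `idx` is the key's position, and the new node is appended to the end of that key's linked list in the table.

The `db->Put` call receives (key, pointer to the table entry).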
### DBImpl::Put
```cpp=
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
```
### DB::Put
```cpp=
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  /* In LevelDB the get function is disabled,
     so this function is used to update head and tail.
  */
  if (key.data() == headkey) {
    head = std::atoi(value.data());
  } else if (key.data() == tailkey) {
    tail = std::atoi(value.data());
  }
  batch.Put(key, value);
  return Write(opt, &batch);
}
```
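The key/value pair being written is first wrapped in a WriteBatch, which records the following in a fixed format: sequence, count, operation type (Put or Delete), the key/value lengths, and the key/value bytes themselves.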
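
To make that layout concrete, here is a minimal standalone sketch that serializes a single Put record in the order documented in LevelDB's `db/write_batch.cc`: an 8-byte little-endian sequence, a 4-byte little-endian count, then one record made of a type byte followed by length-prefixed key and value. The helper names (`put_fixed`, `put_varint32`, `encode_single_put`) are hypothetical; this is not the library's own code.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Append the low `bytes` bytes of v in little-endian order.
static void put_fixed(std::string& dst, uint64_t v, int bytes) {
    for (int i = 0; i < bytes; i++)
        dst.push_back(static_cast<char>((v >> (8 * i)) & 0xff));
}

// Append a varint32: 7 payload bits per byte, high bit set when more bytes follow.
static void put_varint32(std::string& dst, uint32_t v) {
    while (v >= 0x80) {
        dst.push_back(static_cast<char>((v & 0x7f) | 0x80));
        v >>= 7;
    }
    dst.push_back(static_cast<char>(v));
}

// Append a string preceded by its varint32 length.
static void put_length_prefixed(std::string& dst, const std::string& s) {
    put_varint32(dst, static_cast<uint32_t>(s.size()));
    dst.append(s);
}

// Hypothetical helper: serialize a batch that holds exactly one Put.
std::string encode_single_put(uint64_t sequence, const std::string& key,
                              const std::string& value) {
    std::string rep;
    put_fixed(rep, sequence, 8);      // sequence number
    put_fixed(rep, 1, 4);             // count: one record in this batch
    rep.push_back(0x01);              // operation type: 0x01 = Put, 0x00 = Delete
    put_length_prefixed(rep, key);    // key length + key bytes
    put_length_prefixed(rep, value);  // value length + value bytes
    return rep;
}
```

In this design the value slot would carry the Version Table reference (or the `offset&&size` string) rather than the user's data.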
```cpp=
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;

  explicit Writer(port::Mutex* mu) : cv(mu) { }
};
```
`DBImpl::Writer` bundles the synchronization state for one write: the mutex-bound condition variable plus the status and done flags.
### DBImpl::Write
```cpp=
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
```
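The writer is appended to `writers_` and then waits until either another thread has already completed Writer `w`'s write on its behalf, or `w` holds the mutex and sits at the front of `writers_`.
`status = WriteBatchInternal::InsertInto(updates, mem_)` then writes the batch into the memtable.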
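
The queueing behaviour of `DBImpl::Write` can be illustrated with a much smaller model. The sketch below is an assumption-laden simplification using the standard library instead of LevelDB's `port::Mutex`/`port::CondVar`: `GroupCommitLog`, `entries`, and `write_concurrently` are hypothetical names, the "log" is an in-memory vector, and the leader simply takes every writer queued at that moment as its group.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct Writer {
    std::string batch;
    bool done = false;
    std::condition_variable cv;
};

class GroupCommitLog {
public:
    void Write(const std::string& batch) {
        Writer w;
        w.batch = batch;
        std::unique_lock<std::mutex> lock(mu_);
        writers_.push_back(&w);
        // Sleep until a leader has written our batch, or we are the queue head.
        w.cv.wait(lock, [&] { return w.done || writers_.front() == &w; });
        if (w.done) return;

        // We are the leader: snapshot everything queued so far as one group.
        std::vector<Writer*> group(writers_.begin(), writers_.end());
        lock.unlock();
        // Only the current leader appends, so log_ needs no lock during this phase.
        for (Writer* member : group) log_.push_back(member->batch);
        lock.lock();

        // Pop the group (it is still the front of the queue) and wake its followers.
        for (Writer* member : group) {
            writers_.pop_front();
            if (member != &w) {
                member->done = true;
                member->cv.notify_one();
            }
        }
        // Hand leadership to the new head of the queue, if any.
        if (!writers_.empty()) writers_.front()->cv.notify_one();
    }

    // Copy of the log; call only after all writer threads have finished.
    std::vector<std::string> entries() const { return log_; }

private:
    std::mutex mu_;
    std::deque<Writer*> writers_;
    std::vector<std::string> log_;
};

// Runs n concurrent writers and returns the sorted log contents.
std::vector<std::string> write_concurrently(int n) {
    GroupCommitLog log;
    std::vector<std::thread> threads;
    for (int i = 0; i < n; i++)
        threads.emplace_back([&log, i] { log.Write("batch" + std::to_string(i)); });
    for (std::thread& t : threads) t.join();
    std::vector<std::string> out = log.entries();
    std::sort(out.begin(), out.end());
    return out;
}
```

Every batch ends up in the log exactly once, whether its thread acted as leader or had its write completed by another thread. The real function additionally reserves room in the memtable, assigns sequence numbers, and syncs the log file.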
## HashMap version
### `Map<key, vector<value_info>>`

```cpp=
// Location and metadata of one version of a value.
struct info
{
    string hotness;
    unsigned long version;
    string value_address;
    size_t size;
};
// key -> all versions of its value, oldest first
static map<string, vector<info>> Version_Table;
```
#### Data write
`leveldb_vset` in `lab2_common.h`:
```cpp=
static void leveldb_vset(DB *db, string &key, string &value)
{
    VTable_mutex.lock();
    info new_info;
    new_info.hotness = (MBF.checkHot(key)) ? '1' : '0';
    new_info.version = 1;
    new_info.value_address = value; // vlog addr
    size_t idx = 0; // idx records the position inside the key's version vector
    vector<info> tmp;
    if (Version_Table.find(key) == Version_Table.end())
    {
        // new key
        tmp.push_back(new_info);
    }
    else
    {
        // key already exists in Version Table
        tmp = Version_Table[key];
        new_info.version = tmp.size() + 1;
        idx = tmp.size(); // vector index
        tmp.push_back(new_info);
    }
    Version_Table[key] = tmp;
    // Pointer to the newest entry; taken while the table is still locked
    info *table_address = &Version_Table[key][idx];
    VTable_mutex.unlock();
    Status s;
    WriteOptions wopt;
    // Put() takes Slices, so the pointer is passed as its raw bytes (an assumption
    // about the intended encoding; it is only meaningful inside this process).
    s = db->Put(wopt, key, Slice(reinterpret_cast<const char *>(&table_address), sizeof(table_address)));
    assert(s.ok());
}
```
#### Data read
```cpp=
static bool wisckey_vget(WK *wk, string &key, string &value, string &accvalue)
{
    std::string tmpkey = key;
    // compaction
    do_logfileGC(wk, accvalue, key, value);
    key = tmpkey;
    cout << "\n\t\tGet Function\n\n";
    cout << "Key Received: " << key << endl;
    //string offsetinfo;
    //bool found = leveldb_get(wk->leveldb, key, offsetinfo);
    info *tableaddr_info = NULL; // table address
    bool found = leveldb_vget(wk->leveldb, key, tableaddr_info);
    if (found) {
        // value_address holds "offset&&size"; read through the pointer (call by address)
        cout << "Offset and Length: " << tableaddr_info->value_address << endl;
    }
    else {
        cout << "Record: Not Found" << endl;
        return false;
    }
    // split the offset info
    std::string value_offset;
    std::string value_length;
    //std::string s = offsetinfo;
    std::string s = tableaddr_info->value_address;
    std::string delimiter = "&&";
    size_t pos = 0;
    std::string token;
    while ((pos = s.find(delimiter)) != std::string::npos) {
        token = s.substr(0, pos);
        value_offset = token;
        s.erase(0, pos + delimiter.length());
    }
    value_length = s;
```
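
The delimiter-splitting step at the end of the read path can also be isolated into a small helper that returns numbers instead of strings. This is only a sketch: `VlogLocation` and `decode_vlog_location` are hypothetical names, and it assumes the stored string has exactly the `offset&&length` form.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <string>

// Parsed value-log location (hypothetical type, not from the original code).
struct VlogLocation {
    long offset;
    std::size_t length;
};

// Parses "offset&&length"; returns false when the delimiter is missing,
// either side is empty, or a side does not start with a number.
bool decode_vlog_location(const std::string& s, VlogLocation* out) {
    const std::string delimiter = "&&";
    std::size_t pos = s.find(delimiter);
    if (pos == std::string::npos || pos == 0 ||
        pos + delimiter.size() >= s.size()) {
        return false;
    }
    try {
        out->offset = std::stol(s.substr(0, pos));
        out->length = std::stoul(s.substr(pos + delimiter.size()));
    } catch (const std::exception&) {
        return false;  // std::stol / std::stoul throw on non-numeric input
    }
    return true;
}
```

Returning a status instead of printing keeps the parsing separate from the `cout` diagnostics used in `wisckey_vget`.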