# NTHU DB Assignment 4 - **Deadline**: 2021/05/05 (Wed.) 23:59:59 - **Gitlab**: [https://shwu10.cs.nthu.edu.tw/106062137/db21-assignment-4](https://shwu10.cs.nthu.edu.tw/106062137/db21-assignment-4) ## 已刪除的synchronized - Page - void read(BlockId blk) - void write(BlockId blk) - BlockId append(String fileName) - FileMgr - void delete(String fileName) - BufferMgr - Buffer pin(BlockId blk) - Buffer pinNew(String fileName, PageFormatter fmtr) - void flushAll() - void flushAllMyBuffers() - int available() - void repin() - void unpinAll(Transaction tx) - bufferPool.notifyAll(); - unpin(Buffer buff) - bufferPool.notifyAll(); - BufferPoolMgr - void unpin(Buffer... buffs) - int available() - void flushAll() ``` synchronized Buffer pin(BlockId blk) { lock_blockMap.lock(); //System.out.printf("lock "); Buffer buff = findExistingBuffer(blk); //System.out.println("asking "+blk); //assert blk.number() != 0 : "ERROR: blk == 0"; if (buff == null) { buff = chooseUnpinnedBuffer(); if (buff == null) { //System.out.printf("unlock "); lock_blockMap.unlock(); return null; } buff.pin(); BlockId oldBlk = buff.block(); if (oldBlk != null) blockMap.remove(oldBlk); blockMap.put(blk, buff); //System.out.printf("unlock "); lock_blockMap.unlock(); buff.assignToBlock(blk); numAvailable.decrementAndGet(); } else { if (!buff.isPinned()) { numAvailable.decrementAndGet(); } buff.pin(); //System.out.printf("unlock "); lock_blockMap.unlock(); } return buff; } ``` ## RecordFile Source Code ```{java} /******************************************************************************* * Copyright 2016, 2018 vanilladb.org contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package org.vanilladb.core.storage.record;

import org.vanilladb.core.server.VanillaDb;
import org.vanilladb.core.sql.Constant;
import org.vanilladb.core.sql.Record;
import org.vanilladb.core.sql.SchemaIncompatibleException;
import org.vanilladb.core.sql.Type;
import org.vanilladb.core.storage.buffer.Buffer;
import org.vanilladb.core.storage.file.BlockId;
import org.vanilladb.core.storage.file.Page;
import org.vanilladb.core.storage.metadata.TableInfo;
import org.vanilladb.core.storage.tx.Transaction;

/**
 * Manages a file of records. There are methods for iterating through the
 * records and accessing their contents. Note that the first block (block 0) of
 * a record file is reserved for file header, and the actual data block is start
 * from block 1.
 *
 * <p>
 * The {@link #insert()} method must be called before setters.
 * </p>
 *
 * <p>
 * The {@link #beforeFirst()} method must be called before {@link #next()}.
 * </p>
 */
public class RecordFile implements Record {
	// Block 0 of the file, which holds the file header
	private BlockId headerBlk;
	private TableInfo ti;
	private Transaction tx;
	private String fileName;
	// The record page of the current data block; null until the first move
	private RecordPage rp;
	// The header page; non-null only while a modification has it open
	private FileHeaderPage fhp;
	// Number of the current data block; 0 means "before the first data block"
	private long currentBlkNum;
	private boolean doLog;
	// Set by beforeFirst(); next() refuses to run until it is true
	private boolean isBeforeFirsted;

	/**
	 * Constructs an object to manage a file of records. If the file does not
	 * exist, it is created. This method should be called by {@link TableInfo}
	 * only. To obtain an instance of this class, call
	 * {@link TableInfo#open(Transaction, boolean)} instead.
	 *
	 * @param ti
	 *            the table metadata
	 * @param tx
	 *            the transaction
	 * @param doLog
	 *            true if the underlying record modification should perform
	 *            logging
	 */
	public RecordFile(TableInfo ti, Transaction tx, boolean doLog) {
		this.ti = ti;
		this.tx = tx;
		this.doLog = doLog;
		fileName = ti.fileName();
		headerBlk = new BlockId(fileName, 0);
	}

	/**
	 * Format the header of specified file. The header block is only created
	 * when the file currently has no blocks.
	 *
	 * @param fileName
	 *            the file name
	 * @param tx
	 *            the transaction
	 */
	public static void formatFileHeader(String fileName, Transaction tx) {
		tx.concurrencyMgr().modifyFile(fileName);
		// header should be the first block of the given file
		if (VanillaDb.fileMgr().size(fileName) == 0) {
			FileHeaderFormatter fhf = new FileHeaderFormatter();
			Buffer buff = tx.bufferMgr().pinNew(fileName, fhf);
			tx.bufferMgr().unpin(buff);
		}
	}

	/**
	 * Closes the record file: closes the current record page, if any, and
	 * closes the header if it is still open.
	 */
	public void close() {
		if (rp != null)
			rp.close();
		if (fhp != null)
			closeHeader();
	}

	/**
	 * Remove the record file.
	 * TODO: handle the concurrency issues that might happen
	 */
	public void remove() {
		close();
		VanillaDb.fileMgr().delete(fileName);
	}

	/**
	 * Positions the current record so that a call to method next will wind up
	 * at the first record.
	 */
	public void beforeFirst() {
		close();
		currentBlkNum = 0; // first data block is block 1
		isBeforeFirsted = true;
	}

	/**
	 * Moves to the next record. Returns false if there is no next record.
	 *
	 * @return false if there is no next record.
	 * @throws IllegalStateException
	 *             if {@link #beforeFirst()} has not been called
	 */
	public boolean next() {
		if (!isBeforeFirsted)
			throw new IllegalStateException("You must call beforeFirst() before iterating table '" + ti.tableName() + "'");
		// currentBlkNum == 0 means no data block has been opened yet, so
		// open block 1; the file has no data block if that fails.
		if (currentBlkNum == 0 && !moveTo(1))
			return false;
		// Scan the current page, then advance block by block until a record
		// is found or the end of the file is reached.
		while (true) {
			if (rp.next())
				return true;
			if (!moveTo(currentBlkNum + 1))
				return false;
		}
	}

	/**
	 * Returns the value of the specified field in the current record. Getter
	 * should be called after {@link #next()} or {@link #moveToRecordId(RecordId)}.
	 *
	 * @param fldName
	 *            the name of the field
	 *
	 * @return the value at that field
	 */
	public Constant getVal(String fldName) {
		return rp.getVal(fldName);
	}

	/**
	 * Sets a value of the specified field in the current record. The type of
	 * the value must be equal to that of the specified field.
	 *
	 * @param fldName
	 *            the name of the field
	 * @param val
	 *            the new value for the field
	 * @throws UnsupportedOperationException
	 *             if the transaction is read-only and this is not a temp table
	 * @throws SchemaIncompatibleException
	 *             if the casted value is larger than the field's maximum size
	 */
	public void setVal(String fldName, Constant val) {
		if (tx.isReadOnly() && !isTempTable())
			throw new UnsupportedOperationException();
		Type fldType = ti.schema().type(fldName);
		// Cast to the field's declared type before checking the size
		Constant v = val.castTo(fldType);
		if (Page.size(v) > Page.maxSize(fldType))
			throw new SchemaIncompatibleException();
		rp.setVal(fldName, v);
	}

	/**
	 * Deletes the current record. The client must call next() to move to the
	 * next record. Calls to methods on a deleted record have unspecified
	 * behavior.
	 *
	 * @throws UnsupportedOperationException
	 *             if the transaction is read-only and this is not a temp table
	 */
	public void delete() {
		if (tx.isReadOnly() && !isTempTable())
			throw new UnsupportedOperationException();
		if (fhp == null)
			fhp = openHeaderForModification();
		// Log that this logical operation starts
		RecordId deletedRid = currentRecordId();
		tx.recoveryMgr().logLogicalStart();
		// Delete the current record: the slot stores the previous head of
		// the deleted-slot chain, then becomes the new head in the header.
		rp.delete(fhp.getLastDeletedSlot());
		fhp.setLastDeletedSlot(currentRecordId());
		// Log that this logical operation ends
		tx.recoveryMgr().logRecordFileDeletionEnd(ti.tableName(), deletedRid.block().number(), deletedRid.id());
		// Close the header (release the header lock)
		closeHeader();
	}

	/**
	 * Deletes the specified record.
	 *
	 * @param rid
	 *            the record to be deleted
	 */
	public void delete(RecordId rid) {
		// Note that the delete() method will
		// take care the concurrency and recovery problems
		moveToRecordId(rid);
		delete();
	}

	/**
	 * Inserts a new, blank record somewhere in the file beginning at the
	 * current record. If the new record does not fit into an existing block,
	 * then a new block is appended to the file.
	 *
	 * @throws UnsupportedOperationException
	 *             if the transaction is read-only and this is not a temp table
	 */
	public void insert() {
		// Block read-only transaction
		if (tx.isReadOnly() && !isTempTable())
			throw new UnsupportedOperationException();
		// Insertion may change the properties of this file,
		// so that we need to lock the file.
		if (!isTempTable())
			tx.concurrencyMgr().modifyFile(fileName);
		// Modify the free chain which is start from a pointer in
		// the header of the file.
		if (fhp == null)
			fhp = openHeaderForModification();
		// Log that this logical operation starts
		tx.recoveryMgr().logLogicalStart();
		if (fhp.hasDeletedSlots()) {
			// Insert into a deleted slot: reuse the head of the chain and
			// make its successor the new head.
			moveToRecordId(fhp.getLastDeletedSlot());
			RecordId lds = rp.insertIntoDeletedSlot();
			fhp.setLastDeletedSlot(lds);
		} else {
			// Insert into a empty slot
			if (!fhp.hasDataRecords()) { // no record inserted before
				// Create the first data block
				appendBlock();
				moveTo(1);
				rp.insertIntoNextEmptySlot();
			} else {
				// Find the tail page
				RecordId tailSlot = fhp.getTailSolt();
				moveToRecordId(tailSlot);
				// Search forward from the tail slot, appending a block
				// when the last block has no empty slot left.
				while (!rp.insertIntoNextEmptySlot()) {
					if (atLastBlock())
						appendBlock();
					moveTo(currentBlkNum + 1);
				}
			}
			fhp.setTailSolt(currentRecordId());
		}
		// Log that this logical operation ends
		RecordId insertedRid = currentRecordId();
		tx.recoveryMgr().logRecordFileInsertionEnd(ti.tableName(), insertedRid.block().number(), insertedRid.id());
		// Close the header (release the header lock)
		closeHeader();
	}

	/**
	 * Inserts a record to a specified physical address.
	 *
	 * @param rid
	 *            the address a record will be inserted
	 * @throws UnsupportedOperationException
	 *             if the transaction is read-only and this is not a temp table
	 * @throws RuntimeException
	 *             if the specified slot is already in use
	 */
	public void insert(RecordId rid) {
		// Block read-only transaction
		if (tx.isReadOnly() && !isTempTable())
			throw new UnsupportedOperationException();
		// Insertion may change the properties of this file,
		// so that we need to lock the file.
		if (!isTempTable())
			tx.concurrencyMgr().modifyFile(fileName);
		// Open the header
		if (fhp == null)
			fhp = openHeaderForModification();
		// Log that this logical operation starts
		tx.recoveryMgr().logLogicalStart();
		// Mark the specified slot as in used
		moveToRecordId(rid);
		if (!rp.insertIntoTheCurrentSlot())
			throw new RuntimeException("the specified slot: " + rid + " is in used");
		// Traverse the free chain to find the specified slot
		// (lastSlot trails one step behind currentSlot)
		RecordId lastSlot = null;
		RecordId currentSlot = fhp.getLastDeletedSlot();
		while (!currentSlot.equals(rid) && currentSlot.block().number() != FileHeaderPage.NO_SLOT_BLOCKID) {
			moveToRecordId(currentSlot);
			lastSlot = currentSlot;
			currentSlot = rp.getNextDeletedSlotId();
		}
		// Remove the specified slot from the chain
		// If it is the first slot
		if (lastSlot == null) {
			moveToRecordId(currentSlot);
			fhp.setLastDeletedSlot(rp.getNextDeletedSlotId());
		// If it is in the middle
		} else if (currentSlot.block().number() != FileHeaderPage.NO_SLOT_BLOCKID) {
			// Unlink: make the predecessor point to the successor
			moveToRecordId(currentSlot);
			RecordId nextSlot = rp.getNextDeletedSlotId();
			moveToRecordId(lastSlot);
			rp.setNextDeletedSlotId(nextSlot);
		}
		// Log that this logical operation ends
		tx.recoveryMgr().logRecordFileInsertionEnd(ti.tableName(), rid.block().number(), rid.id());
		// Close the header (release the header lock)
		closeHeader();
	}

	/**
	 * Positions the current record as indicated by the specified record ID .
	 *
	 * @param rid
	 *            a record ID
	 */
	public void moveToRecordId(RecordId rid) {
		moveTo(rid.block().number());
		rp.moveToId(rid.id());
	}

	/**
	 * Returns the record ID of the current record.
	 *
	 * @return a record ID
	 */
	public RecordId currentRecordId() {
		int id = rp.currentId();
		return new RecordId(new BlockId(fileName, currentBlkNum), id);
	}

	/**
	 * Returns the number of blocks in the specified file. This method first
	 * calls corresponding concurrency manager to guarantee the isolation
	 * property, before asking the file manager to return the file size.
	 *
	 * @return the number of blocks in the file
	 */
	public long fileSize() {
		if (!isTempTable())
			tx.concurrencyMgr().readFile(fileName);
		return VanillaDb.fileMgr().size(fileName);
	}

	/**
	 * Moves to the specified block by opening a new record page on it.
	 * Note that the previous record page is closed even when the move fails.
	 *
	 * @param b
	 *            the number of the target block
	 * @return false if block b is not allocated yet
	 */
	private boolean moveTo(long b) {
		if (rp != null)
			rp.close();
		if (b >= fileSize()) // block b not allocated yet
			return false;
		currentBlkNum = b;
		BlockId blk = new BlockId(fileName, currentBlkNum);
		rp = new RecordPage(blk, ti, tx, doLog);
		return true;
	}

	/**
	 * Appends a new, formatted data block to the file. The block is pinned
	 * only long enough to be created and is unpinned immediately.
	 */
	private void appendBlock() {
		if (!isTempTable())
			tx.concurrencyMgr().modifyFile(fileName);
		RecordFormatter fmtr = new RecordFormatter(ti);
		Buffer buff = tx.bufferMgr().pinNew(fileName, fmtr);
		tx.bufferMgr().unpin(buff);
		if (!isTempTable())
			tx.concurrencyMgr().insertBlock(buff.block());
	}

	/**
	 * Opens the file header for modification. For non-temp tables the header
	 * block is locked first.
	 *
	 * @return the header page of this file
	 */
	private FileHeaderPage openHeaderForModification() {
		// acquires exclusive access to the header
		if (!isTempTable())
			tx.concurrencyMgr().lockRecordFileHeader(headerBlk);
		return new FileHeaderPage(fileName, tx);
	}

	/**
	 * Closes the header, if it is open, and releases its lock.
	 */
	private void closeHeader() {
		// Release the lock of the header
		// NOTE(review): the release is not guarded by !isTempTable(),
		// unlike the acquisition in openHeaderForModification() - confirm
		// that releasing a header that was never locked is harmless.
		if (fhp != null) {
			tx.concurrencyMgr().releaseRecordFileHeader(headerBlk);
			fhp = null;
		}
	}

	/**
	 * Returns whether this file belongs to a temporary table, i.e. whether
	 * its file name starts with "_temp".
	 */
	private boolean isTempTable() {
		return fileName.startsWith("_temp");
	}

	/**
	 * Returns whether the current block is the last block of the file.
	 */
	private boolean atLastBlock() {
		return currentBlkNum == fileSize() - 1;
	}
}
```

## RecordPage Source Code

```{java}
/*******************************************************************************
 * Copyright 2016, 2018 vanilladb.org contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package org.vanilladb.core.storage.record;

import static org.vanilladb.core.sql.Type.BIGINT;
import static org.vanilladb.core.sql.Type.INTEGER;

import java.util.HashMap;
import java.util.Map;

import org.vanilladb.core.sql.BigIntConstant;
import org.vanilladb.core.sql.Constant;
import org.vanilladb.core.sql.IntegerConstant;
import org.vanilladb.core.sql.Record;
import org.vanilladb.core.sql.Schema;
import org.vanilladb.core.sql.Type;
import org.vanilladb.core.storage.buffer.Buffer;
import org.vanilladb.core.storage.file.BlockId;
import org.vanilladb.core.storage.file.Page;
import org.vanilladb.core.storage.log.LogSeqNum;
import org.vanilladb.core.storage.metadata.TableInfo;
import org.vanilladb.core.storage.tx.Transaction;

/**
 * Manages the placement and access of records in a block.
 *
 * <p>
 * A block is treated as an array of fixed-size slots. Each slot starts with
 * an INTEGER flag ({@link #EMPTY} or {@link #INUSE}) followed by the record
 * area. For a deleted slot, the beginning of the record area stores the ID
 * of the next deleted slot (see {@link #setNextDeletedSlotId(RecordId)}).
 * </p>
 */
public class RecordPage implements Record {
	// Values of the flag stored at the beginning of each slot
	public static final int EMPTY = 0, INUSE = 1;
	// A record area is never smaller than a BIGINT plus an INTEGER, which is
	// what setNextDeletedSlotId() writes into it.
	public static final int MIN_REC_SIZE = Page.maxSize(INTEGER) + Page.maxSize(BIGINT);
	public static final int FLAG_SIZE = Page.maxSize(INTEGER);
	public static final int MIN_SLOT_SIZE = FLAG_SIZE + MIN_REC_SIZE;

	// Optimization: Materialize the constant value of flag
	private static final IntegerConstant INUSE_CONST = new IntegerConstant(
			INUSE), EMPTY_CONST = new IntegerConstant(EMPTY);

	private Transaction tx;
	// The block managed by this page; set to null by close()
	private BlockId blk;
	private TableInfo ti;
	private boolean doLog;
	// The buffer pinned to blk; set to null by close()
	private Buffer currentBuff;
	// Size of one slot in bytes: flag + record area
	private int slotSize;
	// Index of the current slot; -1 means "before the first slot"
	private int currentSlot = -1;
	// Field name -> offset of the field within the record area
	private Map<String, Integer> myOffsetMap;

	// Optimization: Materialize the offset map.
	// /**
	// * Returns the offset of a specified field within a record.
	// *
	// * @param sch
	// * the table's schema
	// * @param fldName
	// * the name of the field
	// *
	// * @return the offset of that field within a record
	// */
	// public static int offset(Schema sch, String fldName) {
	// int pos = 0;
	// for (String fldname : sch.fields()) {
	// if (fldName.equals(fldname))
	// break;
	// pos += Page.maxSize(sch.type(fldname));
	// }
	// return pos;
	// }

	/**
	 * Returns the map of field name to offset of a specified schema.
	 *
	 * @param sch
	 *            the table's schema
	 *
	 * @return the offset map
	 */
	public static Map<String, Integer> offsetMap(Schema sch) {
		int pos = 0;
		Map<String, Integer> offsetMap = new HashMap<String, Integer>();
		for (String fldname : sch.fields()) {
			offsetMap.put(fldname, pos);
			pos += Page.maxSize(sch.type(fldname));
		}
		return offsetMap;
	}

	/**
	 * Returns the number of bytes required to store a record with the specified
	 * schema in disk. The result is never smaller than {@link #MIN_REC_SIZE}.
	 *
	 * @param sch
	 *            the table's schema
	 * @return the size of a record, in bytes
	 */
	public static int recordSize(Schema sch) {
		int pos = 0;
		for (String fldname : sch.fields())
			pos += Page.maxSize(sch.type(fldname));
		return pos < MIN_REC_SIZE ? MIN_REC_SIZE : pos;
	}

	/**
	 * Returns the number of bytes required to store a record slot with the
	 * specified schema in disk.
	 *
	 * @param sch
	 *            the table's schema
	 * @return the size of a record slot, in bytes
	 */
	public static int slotSize(Schema sch) {
		return recordSize(sch) + Page.maxSize(INTEGER);
	}

	/**
	 * Creates the record manager for the specified block. The current record is
	 * set to be prior to the first one. The block is pinned until
	 * {@link #close()} is called.
	 *
	 * @param blk
	 *            a block ID
	 * @param ti
	 *            the table's metadata
	 * @param tx
	 *            the transaction
	 * @param doLog
	 *            will it log the modification
	 */
	public RecordPage(BlockId blk, TableInfo ti, Transaction tx, boolean doLog) {
		this.blk = blk;
		this.tx = tx;
		this.ti = ti;
		this.doLog = doLog;
		currentBuff = tx.bufferMgr().pin(blk);

		// Optimization: Reduce the cost of prepare the schema information
		// (one pass builds the offset map and the slot size, instead of
		// calling offsetMap() and slotSize() separately)
		Schema sch = ti.schema();
		int pos = 0;
		myOffsetMap = new HashMap<String, Integer>();
		for (String fldname : sch.fields()) {
			myOffsetMap.put(fldname, pos);
			pos += Page.maxSize(sch.type(fldname));
		}
		pos = pos < MIN_REC_SIZE ? MIN_REC_SIZE : pos;
		slotSize = pos + FLAG_SIZE;
	}

	/**
	 * Closes the manager, by unpinning the block. Calling it again has no
	 * effect. After closing, blk and currentBuff are null, so the page must
	 * not be used for reading or writing any more.
	 */
	public void close() {
		if (blk != null) {
			tx.bufferMgr().unpin(currentBuff);
			blk = null;
			currentBuff = null;
		}
	}

	/**
	 * Moves to the next record in the block.
	 *
	 * @return false if there is no next record.
	 */
	public boolean next() {
		return searchFor(INUSE);
	}

	/**
	 * Returns the value stored in the specified field of this record.
	 *
	 * @param fldName
	 *            the name of the field.
	 *
	 * @return the constant stored in that field
	 */
	public Constant getVal(String fldName) {
		int position = fieldPos(fldName);
		return getVal(position, ti.schema().type(fldName));
	}

	/**
	 * Stores a value at the specified field of this record.
	 *
	 * @param fldName
	 *            the name of the field
	 * @param val
	 *            the constant value stored in that field
	 */
	public void setVal(String fldName, Constant val) {
		int position = fieldPos(fldName);
		setVal(position, val);
	}

	/**
	 * Deletes the current record. Deletion is performed by marking the record
	 * as "deleted" and setting the content as a pointer points to next deleted
	 * slot.
	 *
	 * @param nextDeletedSlot
	 *            the record is of next deleted slot
	 *
	 */
	public void delete(RecordId nextDeletedSlot) {
		Constant flag = EMPTY_CONST;
		setVal(currentPos(), flag);
		setNextDeletedSlotId(nextDeletedSlot);
	}

	/**
	 * Marks the current slot as in-used.
	 *
	 * @return true, if it succeed. If the slot has been occupied, return false.
	 */
	public boolean insertIntoTheCurrentSlot() {
		if (!getVal(currentPos(), INTEGER).equals(EMPTY_CONST))
			return false;
		setVal(currentPos(), INUSE_CONST);
		return true;
	}

	/**
	 * Inserts a new, blank record somewhere in the page. Return false if there
	 * were no available slots. The search starts at the slot after the
	 * current one.
	 *
	 * @return false if the insertion was not possible
	 */
	public boolean insertIntoNextEmptySlot() {
		boolean found = searchFor(EMPTY);
		if (found) {
			Constant flag = INUSE_CONST;
			setVal(currentPos(), flag);
		}
		return found;
	}

	/**
	 * Inserts a new, blank record into this deleted slot and return the record
	 * id of the next one.
	 *
	 * @return the record id of the next deleted slot
	 */
	public RecordId insertIntoDeletedSlot() {
		RecordId nds = getNextDeletedSlotId();
		// Important: Erase the free chain information.
		// If we didn't do this, it would crash when
		// a tx try to set a VARCHAR at this position
		// since the getVal would get negative size.
		setNextDeletedSlotId(new RecordId(new BlockId("", 0), 0));
		Constant flag = INUSE_CONST;
		setVal(currentPos(), flag);
		return nds;
	}

	/**
	 * Sets the current record to be the record having the specified ID.
	 *
	 * @param id
	 *            the ID of the record within the page.
	 */
	public void moveToId(int id) {
		currentSlot = id;
	}

	/**
	 * Returns the ID of the current record.
	 *
	 * @return the ID of the current record
	 */
	public int currentId() {
		return currentSlot;
	}

	/**
	 * Returns the BlockId of the current record.
	 *
	 * @return the BlockId of the current record
	 */
	public BlockId currentBlk() {
		return blk;
	}

	/**
	 * Print all Slot IN_USE or EMPTY, for debugging. The flags are printed
	 * ten per line. Note that this moves the current slot: when it returns,
	 * the current slot is the first invalid one.
	 */
	public void runAllSlot() {
		moveToId(0);
		System.out.println("== runAllSlot start at " + currentSlot + " ==");
		while (isValidSlot()) {
			if (currentSlot % 10 == 0)
				System.out.print(currentSlot + ": ");
			int flag = (Integer) getVal(currentPos(), INTEGER).asJavaVal();
			System.out.print(flag + " ");
			if ((currentSlot + 1) % 10 == 0)
				System.out.println();
			currentSlot++;
		}
		System.out.println("== runAllSlot end at " + currentSlot + " ==");
	}

	/**
	 * Reads the ID of the next deleted slot stored in the current slot. The
	 * pointer is read right after the flag: a BIGINT block number followed by
	 * an INTEGER slot id.
	 *
	 * @return the record ID read from the current slot, in this page's file
	 */
	public RecordId getNextDeletedSlotId() {
		int position = currentPos() + FLAG_SIZE;
		long blkNum = (Long) getVal(position, BIGINT).asJavaVal();
		int id = (Integer) getVal(position + Page.maxSize(BIGINT), INTEGER)
				.asJavaVal();
		return new RecordId(new BlockId(blk.fileName(), blkNum), id);
	}

	/**
	 * Stores the ID of the next deleted slot into the current slot. Only the
	 * block number and the slot id are written, right after the flag; the
	 * file name of the record ID is not stored.
	 *
	 * @param rid
	 *            the record ID of the next deleted slot
	 */
	public void setNextDeletedSlotId(RecordId rid) {
		Constant val = new BigIntConstant(rid.block().number());
		int position = currentPos() + FLAG_SIZE;
		setVal(position, val);
		val = new IntegerConstant(rid.id());
		position += Page.maxSize(BIGINT);
		setVal(position, val);
	}

	/**
	 * Returns the byte offset of the current slot (i.e. of its flag) within
	 * the block.
	 */
	private int currentPos() {
		return currentSlot * slotSize;
	}

	/**
	 * Returns the byte offset of the specified field of the current record
	 * within the block.
	 */
	private int fieldPos(String fldName) {
		// Skip the flag, then add the field's offset within the record
		int offset = FLAG_SIZE + myOffsetMap.get(fldName);
		return currentPos() + offset;
	}

	/**
	 * Returns whether the current slot lies completely within the buffer.
	 */
	private boolean isValidSlot() {
		return currentPos() + slotSize <= Buffer.BUFFER_SIZE;
	}

	/**
	 * Moves forward, starting at the slot after the current one, to the first
	 * slot whose flag equals the specified value.
	 *
	 * @param flag
	 *            the flag value to look for
	 * @return false if no such slot exists in the rest of the block
	 */
	private boolean searchFor(int flag) {
		currentSlot++;
		while (isValidSlot()) {
			if ((Integer) getVal(currentPos(), INTEGER).asJavaVal() == flag) {
				return true;
			}
			currentSlot++;
		}
		return false;
	}

	/**
	 * Reads a value of the specified type at the specified offset of the
	 * block. For non-temp tables, the concurrency manager is asked for read
	 * access to the current record first.
	 */
	private Constant getVal(int offset, Type type) {
		if (!isTempTable())
			tx.concurrencyMgr().readRecord(new RecordId(blk, currentSlot));
		return currentBuff.getVal(offset, type);
	}

	/**
	 * Writes a value at the specified offset of the block. For non-temp
	 * tables, the concurrency manager is asked for write access to the current
	 * record first. The change is logged before it is applied when doLog is
	 * true.
	 *
	 * @throws UnsupportedOperationException
	 *             if the transaction is read-only and this is not a temp table
	 */
	private void setVal(int offset, Constant val) {
		if (tx.isReadOnly() && !isTempTable())
			throw new UnsupportedOperationException();
		if (!isTempTable())
			tx.concurrencyMgr().modifyRecord(new RecordId(blk, currentSlot));
		// lsn is null when logging is disabled
		LogSeqNum lsn = doLog ? tx.recoveryMgr().logSetVal(currentBuff, offset, val) : null;
		currentBuff.setVal(offset, val, tx.getTransactionNumber(), lsn);
	}

	/**
	 * Returns whether the block belongs to a temporary table, i.e. whether
	 * its file name starts with "_temp".
	 */
	private boolean isTempTable() {
		return blk.fileName().startsWith("_temp");
	}
}
```

## Native API

![](https://i.imgur.com/IdT99Ey.png)

## Trace Path

- src/main/java/org/vanilladb/core/query/planner/BasicUpdatePlanner.java

## 分析

- TableScan 會 open 一個 RecordFile
- RecordFile 代表一個 實體File (Table) 的抽象 (Manages the placement and access of records in a block.)
- Block 抽象為一個 RecordPage (Creates the record manager for the specified block)
- Block 代表 "實體File" 裡面的一個區塊
- RecordPage 會在 Block 上移動
- BufferMgr 向 BufferPool 請求要 pin 某個 block 到 buffer
- Buffer 會把 pin 在他上面的 Block 從 實體File 讀進來
    - assignToBlock (Reads the contents of the specified block into the buffer's page.)
- Page (Buffer.Page): The contents of a disk block in memory.
    - IoBuffer contents(Page.contents) = IoAllocator.newIoBuffer(BLOCK_SIZE);
- FileMgr: The database system stores its data as files * within a specified directory. The file manager provides methods for reading * the contents of a file block to a Java byte buffer, writing the contents of a * byte buffer to a file block
- BLOCK_SIZE: 4K (每次 fileChanel 操作的範圍)
- Record 代表 Table 中的一個
- 每個 JDBC Connection 會 Bind 一個 Transaction (Management) (per Client Connection)
    - conn.getTransaction: transaction associated with this connection
- Transaction: Provides transaction management for clients
    - BufferMgr (per Transaction) # unpins all pages pinned by the current tx
        - Transaction Commit/Rollback: unpinAll Buffer
    - RecoverManager (guarantee: A, D)
        - onTxCommit: # flushs dirty pages and then `commit` log
            - bufferMgr.flushAll(txNum)
            - new CommitRecord.writeToLog
            - LogMgr.flush()
    - ConcurrencyManager (guarantee: C, I)
        - onTxCommit: # release all locks
            - lockTable.releaseAll(txNum, false)
- FileHeaderPage: Creates the header manager for a specified file.

## Question

```
1.
read committed vs repeatable read vs serializable ``` ```Java addLifecycleListener(txMgr); /* * A recover manager must be added before a concurrency manager. For * example, if the transaction need to roll back, it must hold all locks * until the recovery procedure complete. */ addLifecycleListener(recoveryMgr); addLifecycleListener(concurMgr); addLifecycleListener(bufferMgr); ``` # 改好的FIleMgr.java ```{java} /******************************************************************************* * Copyright 2016, 2018 vanilladb.org contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package org.vanilladb.core.storage.file; import static org.vanilladb.core.storage.file.Page.BLOCK_SIZE; import static org.vanilladb.core.storage.log.LogMgr.DEFAULT_LOG_FILE; import java.io.File; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; import org.vanilladb.core.server.VanillaDb; import org.vanilladb.core.storage.file.io.IoAllocator; import org.vanilladb.core.storage.file.io.IoBuffer; import org.vanilladb.core.storage.file.io.IoChannel; import org.vanilladb.core.util.CoreProperties; /** * The VanillaDb file manager. The database system stores its data as files * within a specified directory. 
The file manager provides methods for reading * the contents of a file block to a Java byte buffer, writing the contents of a * byte buffer to a file block, and appending the contents of a byte buffer to * the end of a file. These methods are called exclusively by the class * {@link org.vanilladb.core.storage.file.Page Page}, and are thus * package-private. The class also contains two public methods: Method * {@link #isNew() isNew} is called during system initialization by * {@link VanillaDb#init}. Method {@link #size(String) size} is called by the * log manager and transaction manager to determine the end of the file. */ public class FileMgr { private static Logger logger = Logger.getLogger(FileMgr.class.getName()); public static final String DB_FILES_DIR, LOG_FILES_DIR; public static final String TMP_FILE_NAME_PREFIX = "_temp"; private File dbDirectory, logDirectory; private boolean isNew; private Map<String, IoChannel> openFiles = new ConcurrentHashMap<String, IoChannel>(); private static Map<String, ReentrantReadWriteLock> concurrentMap = new ConcurrentHashMap<String, ReentrantReadWriteLock>(); static { String dbDir = CoreProperties.getLoader().getPropertyAsString(FileMgr.class.getName() + ".DB_FILES_DIR", System.getProperty("user.home")); String logDir = CoreProperties.getLoader().getPropertyAsString(FileMgr.class.getName() + ".LOG_FILES_DIR", dbDir); String defaultDir = System.getProperty("user.home"); // Check if these two directories exist if (!new File(dbDir).exists()) { if (logger.isLoggable(Level.WARNING)) logger.warning("the database directory '" + dbDir + "' doesn't exist, use the default directory: " + defaultDir); dbDir = defaultDir; } if (!new File(logDir).exists()) { if (logger.isLoggable(Level.WARNING)) logger.warning("the log files directory '" + logDir + "' doesn't exist, use the same directory as database files: " + dbDir); logDir = dbDir; } DB_FILES_DIR = dbDir; LOG_FILES_DIR = logDir; } /** * Creates a file manager for the specified database. 
The database will be * stored in a folder of that name in the user's home directory. If the * folder does not exist, then a folder containing an empty database is * created automatically. Files for all temporary tables (i.e. tables * beginning with "_temp") will be deleted during initializing. * * @param dbName * the name of the directory that holds the database */ public FileMgr(String dbName) { dbDirectory = new File(DB_FILES_DIR, dbName); // log files can be stored in a different directory logDirectory = new File(LOG_FILES_DIR, dbName); isNew = !dbDirectory.exists(); // check the existence of log folder if (!isNew && !logDirectory.exists()) throw new RuntimeException("log file for the existed " + dbName + " is missing"); // create the directory if the database is new if (isNew && (!dbDirectory.mkdir())) throw new RuntimeException("cannot create " + dbName); // remove any leftover temporary tables for (String filename : dbDirectory.list()) if (filename.startsWith(TMP_FILE_NAME_PREFIX)) new File(dbDirectory, filename).delete(); if (logger.isLoggable(Level.INFO)) logger.info("block size " + Page.BLOCK_SIZE); } /** * Reads the contents of a disk block into a byte buffer. * * @param blk * a block ID * @param buffer * the byte buffer */ void read(BlockId blk, IoBuffer buffer) { try { IoChannel fileChannel = getFileChannel(blk.fileName()); // clear the buffer buffer.clear(); // read a block from file concurrentMap.get(blk.fileName()).readLock().lock(); fileChannel.read(buffer, blk.number() * BLOCK_SIZE); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("cannot read block " + blk); }finally{ // if(concurrentMap.get(blk.fileName()).readLock().tryLock()){ concurrentMap.get(blk.fileName()).readLock().unlock(); // } } } /** * Writes the contents of a byte buffer into a disk block. 
* * @param blk * a block ID * @param buffer * the byte buffer */ void write(BlockId blk, IoBuffer buffer) { try { IoChannel fileChannel = getFileChannel(blk.fileName()); // rewind the buffer buffer.rewind(); // write the block to the file concurrentMap.get(blk.fileName()).writeLock().lock(); fileChannel.write(buffer, blk.number() * BLOCK_SIZE); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("cannot write block" + blk); }finally{ // if(concurrentMap.get(blk.fileName()).writeLock().tryLock()){ concurrentMap.get(blk.fileName()).writeLock().unlock(); // } } } /** * Appends the contents of a byte buffer to the end of the specified file. * * @param fileName * the name of the file * @param buffer * the byte buffer * @return a block ID refers to the newly-created block. */ BlockId append(String fileName, IoBuffer buffer) { try { IoChannel fileChannel = getFileChannel(fileName); // Rewind the buffer for writing buffer.rewind(); // Append the block to the file concurrentMap.get(fileName).writeLock().lock(); long newSize = fileChannel.append(buffer); concurrentMap.get(fileName).writeLock().unlock(); // Return the new block id return new BlockId(fileName, newSize / BLOCK_SIZE - 1); } catch (IOException e) { e.printStackTrace(); return null; } } /** * Returns the number of blocks in the specified file. * * @param fileName * the name of the file * * @return the number of blocks in the file */ public long size(String fileName) { try { IoChannel fileChannel = getFileChannel(fileName); concurrentMap.get(fileName).readLock().lock(); return fileChannel.size() / BLOCK_SIZE; } catch (IOException e) { throw new RuntimeException("cannot access " + fileName); }finally{ // if(concurrentMap.get(fileName).readLock().tryLock()){ concurrentMap.get(fileName).readLock().unlock(); // } } } /** * Returns a boolean indicating whether the file manager had to create a new * database directory. 
* * @return true if the database is new */ public boolean isNew() { return isNew; } /** * Returns the file channel for the specified filename. The file channel is * stored in a map keyed on the filename. If the file is not open, then it * is opened and the file channel is added to the map. * * @param fileName * the specified filename * * @return the file channel associated with the open file. * @throws IOException */ private IoChannel getFileChannel(String fileName) throws IOException { IoChannel fileChannel = openFiles.get(fileName); if (fileChannel == null) { concurrentMap.putIfAbsent(fileName, new ReentrantReadWriteLock()); concurrentMap.get(fileName).writeLock().lock(); File dbFile = fileName.equals(DEFAULT_LOG_FILE) ? new File(logDirectory, fileName) : new File(dbDirectory, fileName); fileChannel = IoAllocator.newIoChannel(dbFile); openFiles.put(fileName, fileChannel); concurrentMap.get(fileName).writeLock().unlock(); } return fileChannel; } /** * Delete the specified file. * * @param fileName * the name of the target file */ public synchronized void delete(String fileName) { try { // Close file, if it was opened IoChannel fileChannel = openFiles.remove(fileName); if (fileChannel != null) fileChannel.close(); // Delete the file boolean hasDeleted = new File(dbDirectory, fileName).delete(); if (!hasDeleted && logger.isLoggable(Level.WARNING)) logger.warning("cannot delete file: " + fileName); } catch (IOException e) { if (logger.isLoggable(Level.WARNING)) logger.warning("there is something wrong when deleting " + fileName); e.printStackTrace(); } } } ```