From 9ee8378d393778ac67314be7ea8d5bcbaeee9ee0 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Sun, 7 Dec 2014 10:08:07 +0100 Subject: try out unqlite --- common/unqlite/lhash_kv.c | 3082 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 3082 insertions(+) create mode 100644 common/unqlite/lhash_kv.c (limited to 'common/unqlite/lhash_kv.c') diff --git a/common/unqlite/lhash_kv.c b/common/unqlite/lhash_kv.c new file mode 100644 index 0000000..4af5b3e --- /dev/null +++ b/common/unqlite/lhash_kv.c @@ -0,0 +1,3082 @@ +/* + * Symisc unQLite: An Embeddable NoSQL (Post Modern) Database Engine. + * Copyright (C) 2012-2013, Symisc Systems http://unqlite.org/ + * Copyright (C) 2014, Yuras Shumovich + * Version 1.1.6 + * For information on licensing, redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES + * please contact Symisc Systems via: + * legal@symisc.net + * licensing@symisc.net + * contact@symisc.net + * or visit: + * http://unqlite.org/licensing.html + */ + /* $SymiscID: lhash_kv.c v1.7 Solaris 2013-01-14 12:56 stable $ */ +#ifndef UNQLITE_AMALGAMATION +#include "unqliteInt.h" +#endif +/* + * This file implements disk based hashtable using the linear hashing algorithm. + * This implementation is the one decribed in the paper: + * LINEAR HASHING : A NEW TOOL FOR FILE AND TABLE ADDRESSING. Witold Litwin. I. N. Ft. I. A.. 78 150 Le Chesnay, France. + * Plus a smart extension called Virtual Bucket Table. (contact devel@symisc.net for additional information). + */ +/* Magic number identifying a valid storage image */ +#define L_HASH_MAGIC 0xFA782DCB +/* + * Magic word to hash to identify a valid hash function. + */ +#define L_HASH_WORD "chm@symisc" +/* + * Cell size on disk. + */ +#define L_HASH_CELL_SZ (4/*Hash*/+4/*Key*/+8/*Data*/+2/* Offset of the next cell */+8/*Overflow*/) +/* + * Primary page (not overflow pages) header size on disk. + */ +#define L_HASH_PAGE_HDR_SZ (2/* Cell offset*/+2/* Free block offset*/+8/*Slave page number*/) +/* + * The maximum amount of payload (in bytes) that can be stored locally for + * a database entry. If the entry contains more data than this, the + * extra goes onto overflow pages. +*/ +#define L_HASH_MX_PAYLOAD(PageSize) (PageSize-(L_HASH_PAGE_HDR_SZ+L_HASH_CELL_SZ)) +/* + * Maxium free space on a single page. + */ +#define L_HASH_MX_FREE_SPACE(PageSize) (PageSize - (L_HASH_PAGE_HDR_SZ)) +/* +** The maximum number of bytes of payload allowed on a single overflow page. +*/ +#define L_HASH_OVERFLOW_SIZE(PageSize) (PageSize-8) +/* Forward declaration */ +typedef struct lhash_kv_engine lhash_kv_engine; +typedef struct lhpage lhpage; +/* + * Each record in the database is identified either in-memory or in + * disk by an instance of the following structure. + */ +typedef struct lhcell lhcell; +struct lhcell +{ + /* Disk-data (Big-Endian) */ + sxu32 nHash; /* Hash of the key: 4 bytes */ + sxu32 nKey; /* Key length: 4 bytes */ + sxu64 nData; /* Data length: 8 bytes */ + sxu16 iNext; /* Offset of the next cell: 2 bytes */ + pgno iOvfl; /* Overflow page number if any: 8 bytes */ + /* In-memory data only */ + lhpage *pPage; /* Page this cell belongs */ + sxu16 iStart; /* Offset of this cell */ + pgno iDataPage; /* Data page number when overflow */ + sxu16 iDataOfft; /* Offset of the data in iDataPage */ + SyBlob sKey; /* Record key for fast lookup (Kept in-memory if < 256KB ) */ + lhcell *pNext,*pPrev; /* Linked list of the loaded memory cells */ + lhcell *pNextCol,*pPrevCol; /* Collison chain */ +}; +/* +** Each database page has a header that is an instance of this +** structure. +*/ +typedef struct lhphdr lhphdr; +struct lhphdr +{ + sxu16 iOfft; /* Offset of the first cell */ + sxu16 iFree; /* Offset of the first free block*/ + pgno iSlave; /* Slave page number */ +}; +/* + * Each loaded primary disk page is represented in-memory using + * an instance of the following structure. + */ +struct lhpage +{ + lhash_kv_engine *pHash; /* KV Storage engine that own this page */ + unqlite_page *pRaw; /* Raw page contents */ + lhphdr sHdr; /* Processed page header */ + lhcell **apCell; /* Cell buckets */ + lhcell *pList,*pFirst; /* Linked list of cells */ + sxu32 nCell; /* Total number of cells */ + sxu32 nCellSize; /* apCell[] size */ + lhpage *pMaster; /* Master page in case we are dealing with a slave page */ + lhpage *pSlave; /* List of slave pages */ + lhpage *pNextSlave; /* Next slave page on the list */ + sxi32 iSlave; /* Total number of slave pages */ + sxu16 nFree; /* Amount of free space available in the page */ +}; +/* + * A Bucket map record which is used to map logical bucket number to real + * bucket number is represented by an instance of the following structure. + */ +typedef struct lhash_bmap_rec lhash_bmap_rec; +struct lhash_bmap_rec +{ + pgno iLogic; /* Logical bucket number */ + pgno iReal; /* Real bucket number */ + lhash_bmap_rec *pNext,*pPrev; /* Link to other bucket map */ + lhash_bmap_rec *pNextCol,*pPrevCol; /* Collision links */ +}; +typedef struct lhash_bmap_page lhash_bmap_page; +struct lhash_bmap_page +{ + pgno iNum; /* Page number where this entry is stored */ + sxu16 iPtr; /* Offset to start reading/writing from */ + sxu32 nRec; /* Total number of records in this page */ + pgno iNext; /* Next map page */ +}; +/* + * An in memory linear hash implemenation is represented by in an isntance + * of the following structure. + */ +struct lhash_kv_engine +{ + const unqlite_kv_io *pIo; /* IO methods: Must be first */ + /* Private fields */ + SyMemBackend sAllocator; /* Private memory backend */ + ProcHash xHash; /* Default hash function */ + ProcCmp xCmp; /* Default comparison function */ + unqlite_page *pHeader; /* Page one to identify a valid implementation */ + lhash_bmap_rec **apMap; /* Buckets map records */ + sxu32 nBuckRec; /* Total number of bucket map records */ + sxu32 nBuckSize; /* apMap[] size */ + lhash_bmap_rec *pList; /* List of bucket map records */ + lhash_bmap_rec *pFirst; /* First record*/ + lhash_bmap_page sPageMap; /* Primary bucket map */ + int iPageSize; /* Page size */ + pgno nFreeList; /* List of free pages */ + pgno split_bucket; /* Current split bucket: MUST BE A POWER OF TWO */ + pgno max_split_bucket; /* Maximum split bucket: MUST BE A POWER OF TWO */ + pgno nmax_split_nucket; /* Next maximum split bucket (1 << nMsb): In-memory only */ + sxu32 nMagic; /* Magic number to identify a valid linear hash disk database */ +}; +/* + * Given a logical bucket number, return the record associated with it. + */ +static lhash_bmap_rec * lhMapFindBucket(lhash_kv_engine *pEngine,pgno iLogic) +{ + lhash_bmap_rec *pRec; + if( pEngine->nBuckRec < 1 ){ + /* Don't bother */ + return 0; + } + pRec = pEngine->apMap[iLogic & (pEngine->nBuckSize - 1)]; + for(;;){ + if( pRec == 0 ){ + break; + } + if( pRec->iLogic == iLogic ){ + return pRec; + } + /* Point to the next entry */ + pRec = pRec->pNextCol; + } + /* No such record */ + return 0; +} +/* + * Install a new bucket map record. + */ +static int lhMapInstallBucket(lhash_kv_engine *pEngine,pgno iLogic,pgno iReal) +{ + lhash_bmap_rec *pRec; + sxu32 iBucket; + /* Allocate a new instance */ + pRec = (lhash_bmap_rec *)SyMemBackendPoolAlloc(&pEngine->sAllocator,sizeof(lhash_bmap_rec)); + if( pRec == 0 ){ + return UNQLITE_NOMEM; + } + /* Zero the structure */ + SyZero(pRec,sizeof(lhash_bmap_rec)); + /* Fill in the structure */ + pRec->iLogic = iLogic; + pRec->iReal = iReal; + iBucket = iLogic & (pEngine->nBuckSize - 1); + pRec->pNextCol = pEngine->apMap[iBucket]; + if( pEngine->apMap[iBucket] ){ + pEngine->apMap[iBucket]->pPrevCol = pRec; + } + pEngine->apMap[iBucket] = pRec; + /* Link */ + if( pEngine->pFirst == 0 ){ + pEngine->pFirst = pEngine->pList = pRec; + }else{ + MACRO_LD_PUSH(pEngine->pList,pRec); + } + pEngine->nBuckRec++; + if( (pEngine->nBuckRec >= pEngine->nBuckSize * 3) && pEngine->nBuckRec < 100000 ){ + /* Allocate a new larger table */ + sxu32 nNewSize = pEngine->nBuckSize << 1; + lhash_bmap_rec *pEntry; + lhash_bmap_rec **apNew; + sxu32 n; + + apNew = (lhash_bmap_rec **)SyMemBackendAlloc(&pEngine->sAllocator, nNewSize * sizeof(lhash_bmap_rec *)); + if( apNew ){ + /* Zero the new table */ + SyZero((void *)apNew, nNewSize * sizeof(lhash_bmap_rec *)); + /* Rehash all entries */ + n = 0; + pEntry = pEngine->pList; + for(;;){ + /* Loop one */ + if( n >= pEngine->nBuckRec ){ + break; + } + pEntry->pNextCol = pEntry->pPrevCol = 0; + /* Install in the new bucket */ + iBucket = pEntry->iLogic & (nNewSize - 1); + pEntry->pNextCol = apNew[iBucket]; + if( apNew[iBucket] ){ + apNew[iBucket]->pPrevCol = pEntry; + } + apNew[iBucket] = pEntry; + /* Point to the next entry */ + pEntry = pEntry->pNext; + n++; + } + /* Release the old table and reflect the change */ + SyMemBackendFree(&pEngine->sAllocator,(void *)pEngine->apMap); + pEngine->apMap = apNew; + pEngine->nBuckSize = nNewSize; + } + } + return UNQLITE_OK; +} +/* + * Process a raw bucket map record. + */ +static int lhMapLoadPage(lhash_kv_engine *pEngine,lhash_bmap_page *pMap,const unsigned char *zRaw) +{ + const unsigned char *zEnd = &zRaw[pEngine->iPageSize]; + const unsigned char *zPtr = zRaw; + pgno iLogic,iReal; + sxu32 n; + int rc; + if( pMap->iPtr == 0 ){ + /* Read the map header */ + SyBigEndianUnpack64(zRaw,&pMap->iNext); + zRaw += 8; + SyBigEndianUnpack32(zRaw,&pMap->nRec); + zRaw += 4; + }else{ + /* Mostly page one of the database */ + zRaw += pMap->iPtr; + } + /* Start processing */ + for( n = 0; n < pMap->nRec ; ++n ){ + if( zRaw >= zEnd ){ + break; + } + /* Extract the logical and real bucket number */ + SyBigEndianUnpack64(zRaw,&iLogic); + zRaw += 8; + SyBigEndianUnpack64(zRaw,&iReal); + zRaw += 8; + /* Install the record in the map */ + rc = lhMapInstallBucket(pEngine,iLogic,iReal); + if( rc != UNQLITE_OK ){ + return rc; + } + } + pMap->iPtr = (sxu16)(zRaw-zPtr); + /* All done */ + return UNQLITE_OK; +} +/* + * Allocate a new cell instance. + */ +static lhcell * lhNewCell(lhash_kv_engine *pEngine,lhpage *pPage) +{ + lhcell *pCell; + pCell = (lhcell *)SyMemBackendPoolAlloc(&pEngine->sAllocator,sizeof(lhcell)); + if( pCell == 0 ){ + return 0; + } + /* Zero the structure */ + SyZero(pCell,sizeof(lhcell)); + /* Fill in the structure */ + SyBlobInit(&pCell->sKey,&pEngine->sAllocator); + pCell->pPage = pPage; + return pCell; +} +/* + * Discard a cell from the page table. + */ +static void lhCellDiscard(lhcell *pCell) +{ + lhpage *pPage = pCell->pPage->pMaster; + + if( pCell->pPrevCol ){ + pCell->pPrevCol->pNextCol = pCell->pNextCol; + }else{ + pPage->apCell[pCell->nHash & (pPage->nCellSize - 1)] = pCell->pNextCol; + } + if( pCell->pNextCol ){ + pCell->pNextCol->pPrevCol = pCell->pPrevCol; + } + MACRO_LD_REMOVE(pPage->pList,pCell); + if( pCell == pPage->pFirst ){ + pPage->pFirst = pCell->pPrev; + } + pPage->nCell--; + /* Release the cell */ + SyBlobRelease(&pCell->sKey); + SyMemBackendPoolFree(&pPage->pHash->sAllocator,pCell); +} +/* + * Install a cell in the page table. + */ +static int lhInstallCell(lhcell *pCell) +{ + lhpage *pPage = pCell->pPage->pMaster; + sxu32 iBucket; + if( pPage->nCell < 1 ){ + sxu32 nTableSize = 32; /* Must be a power of two */ + lhcell **apTable; + /* Allocate a new cell table */ + apTable = (lhcell **)SyMemBackendAlloc(&pPage->pHash->sAllocator, nTableSize * sizeof(lhcell *)); + if( apTable == 0 ){ + return UNQLITE_NOMEM; + } + /* Zero the new table */ + SyZero((void *)apTable, nTableSize * sizeof(lhcell *)); + /* Install it */ + pPage->apCell = apTable; + pPage->nCellSize = nTableSize; + } + iBucket = pCell->nHash & (pPage->nCellSize - 1); + pCell->pNextCol = pPage->apCell[iBucket]; + if( pPage->apCell[iBucket] ){ + pPage->apCell[iBucket]->pPrevCol = pCell; + } + pPage->apCell[iBucket] = pCell; + if( pPage->pFirst == 0 ){ + pPage->pFirst = pPage->pList = pCell; + }else{ + MACRO_LD_PUSH(pPage->pList,pCell); + } + pPage->nCell++; + if( (pPage->nCell >= pPage->nCellSize * 3) && pPage->nCell < 100000 ){ + /* Allocate a new larger table */ + sxu32 nNewSize = pPage->nCellSize << 1; + lhcell *pEntry; + lhcell **apNew; + sxu32 n; + + apNew = (lhcell **)SyMemBackendAlloc(&pPage->pHash->sAllocator, nNewSize * sizeof(lhcell *)); + if( apNew ){ + /* Zero the new table */ + SyZero((void *)apNew, nNewSize * sizeof(lhcell *)); + /* Rehash all entries */ + n = 0; + pEntry = pPage->pList; + for(;;){ + /* Loop one */ + if( n >= pPage->nCell ){ + break; + } + pEntry->pNextCol = pEntry->pPrevCol = 0; + /* Install in the new bucket */ + iBucket = pEntry->nHash & (nNewSize - 1); + pEntry->pNextCol = apNew[iBucket]; + if( apNew[iBucket] ){ + apNew[iBucket]->pPrevCol = pEntry; + } + apNew[iBucket] = pEntry; + /* Point to the next entry */ + pEntry = pEntry->pNext; + n++; + } + /* Release the old table and reflect the change */ + SyMemBackendFree(&pPage->pHash->sAllocator,(void *)pPage->apCell); + pPage->apCell = apNew; + pPage->nCellSize = nNewSize; + } + } + return UNQLITE_OK; +} +/* + * Private data of lhKeyCmp(). + */ +struct lhash_key_cmp +{ + const char *zIn; /* Start of the stream */ + const char *zEnd; /* End of the stream */ + ProcCmp xCmp; /* Comparison function */ +}; +/* + * Comparsion callback for large key > 256 KB + */ +static int lhKeyCmp(const void *pData,sxu32 nLen,void *pUserData) +{ + struct lhash_key_cmp *pCmp = (struct lhash_key_cmp *)pUserData; + int rc; + if( pCmp->zIn >= pCmp->zEnd ){ + if( nLen > 0 ){ + return UNQLITE_ABORT; + } + return UNQLITE_OK; + } + /* Perform the comparison */ + rc = pCmp->xCmp((const void *)pCmp->zIn,pData,nLen); + if( rc != 0 ){ + /* Abort comparison */ + return UNQLITE_ABORT; + } + /* Advance the cursor */ + pCmp->zIn += nLen; + return UNQLITE_OK; +} +/* Forward declaration */ +static int lhConsumeCellkey(lhcell *pCell,int (*xConsumer)(const void *,unsigned int,void *),void *pUserData,int offt_only); +/* + * given a key, return the cell associated with it on success. NULL otherwise. + */ +static lhcell * lhFindCell( + lhpage *pPage, /* Target page */ + const void *pKey, /* Lookup key */ + sxu32 nByte, /* Key length */ + sxu32 nHash /* Hash of the key */ + ) +{ + lhcell *pEntry; + if( pPage->nCell < 1 ){ + /* Don't bother hashing */ + return 0; + } + /* Point to the corresponding bucket */ + pEntry = pPage->apCell[nHash & (pPage->nCellSize - 1)]; + for(;;){ + if( pEntry == 0 ){ + break; + } + if( pEntry->nHash == nHash && pEntry->nKey == nByte ){ + if( SyBlobLength(&pEntry->sKey) < 1 ){ + /* Large key (> 256 KB) are not kept in-memory */ + struct lhash_key_cmp sCmp; + int rc; + /* Fill-in the structure */ + sCmp.zIn = (const char *)pKey; + sCmp.zEnd = &sCmp.zIn[nByte]; + sCmp.xCmp = pPage->pHash->xCmp; + /* Fetch the key from disk and perform the comparison */ + rc = lhConsumeCellkey(pEntry,lhKeyCmp,&sCmp,0); + if( rc == UNQLITE_OK ){ + /* Cell found */ + return pEntry; + } + }else if ( pPage->pHash->xCmp(pKey,SyBlobData(&pEntry->sKey),nByte) == 0 ){ + /* Cell found */ + return pEntry; + } + } + /* Point to the next entry */ + pEntry = pEntry->pNextCol; + } + /* No such entry */ + return 0; +} +/* + * Parse a raw cell fetched from disk. + */ +static int lhParseOneCell(lhpage *pPage,const unsigned char *zRaw,const unsigned char *zEnd,lhcell **ppOut) +{ + sxu16 iNext,iOfft; + sxu32 iHash,nKey; + lhcell *pCell; + sxu64 nData; + int rc; + /* Offset this cell is stored */ + iOfft = (sxu16)(zRaw - (const unsigned char *)pPage->pRaw->zData); + /* 4 byte hash number */ + SyBigEndianUnpack32(zRaw,&iHash); + zRaw += 4; + /* 4 byte key length */ + SyBigEndianUnpack32(zRaw,&nKey); + zRaw += 4; + /* 8 byte data length */ + SyBigEndianUnpack64(zRaw,&nData); + zRaw += 8; + /* 2 byte offset of the next cell */ + SyBigEndianUnpack16(zRaw,&iNext); + /* Perform a sanity check */ + if( iNext > 0 && &pPage->pRaw->zData[iNext] >= zEnd ){ + return UNQLITE_CORRUPT; + } + zRaw += 2; + pCell = lhNewCell(pPage->pHash,pPage); + if( pCell == 0 ){ + return UNQLITE_NOMEM; + } + /* Fill in the structure */ + pCell->iNext = iNext; + pCell->nKey = nKey; + pCell->nData = nData; + pCell->nHash = iHash; + /* Overflow page if any */ + SyBigEndianUnpack64(zRaw,&pCell->iOvfl); + zRaw += 8; + /* Cell offset */ + pCell->iStart = iOfft; + /* Consume the key */ + rc = lhConsumeCellkey(pCell,unqliteDataConsumer,&pCell->sKey,pCell->nKey > 262144 /* 256 KB */? 1 : 0); + if( rc != UNQLITE_OK ){ + /* TICKET: 14-32-chm@symisc.net: Key too large for memory */ + SyBlobRelease(&pCell->sKey); + } + /* Finally install the cell */ + rc = lhInstallCell(pCell); + if( rc != UNQLITE_OK ){ + return rc; + } + if( ppOut ){ + *ppOut = pCell; + } + return UNQLITE_OK; +} +/* + * Compute the total number of free space on a given page. + */ +static int lhPageFreeSpace(lhpage *pPage) +{ + const unsigned char *zEnd,*zRaw = pPage->pRaw->zData; + lhphdr *pHdr = &pPage->sHdr; + sxu16 iNext,iAmount; + sxu16 nFree = 0; + if( pHdr->iFree < 1 ){ + /* Don't bother processing, the page is full */ + pPage->nFree = 0; + return UNQLITE_OK; + } + /* Point to first free block */ + zEnd = &zRaw[pPage->pHash->iPageSize]; + zRaw += pHdr->iFree; + for(;;){ + /* Offset of the next free block */ + SyBigEndianUnpack16(zRaw,&iNext); + zRaw += 2; + /* Available space on this block */ + SyBigEndianUnpack16(zRaw,&iAmount); + nFree += iAmount; + if( iNext < 1 ){ + /* No more free blocks */ + break; + } + /* Point to the next free block*/ + zRaw = &pPage->pRaw->zData[iNext]; + if( zRaw >= zEnd ){ + /* Corrupt page */ + return UNQLITE_CORRUPT; + } + } + /* Save the amount of free space */ + pPage->nFree = nFree; + return UNQLITE_OK; +} +/* + * Given a primary page, load all its cell. + */ +static int lhLoadCells(lhpage *pPage) +{ + const unsigned char *zEnd,*zRaw = pPage->pRaw->zData; + lhphdr *pHdr = &pPage->sHdr; + lhcell *pCell = 0; /* cc warning */ + int rc; + /* Calculate the amount of free space available first */ + rc = lhPageFreeSpace(pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pHdr->iOfft < 1 ){ + /* Don't bother processing, the page is empty */ + return UNQLITE_OK; + } + /* Point to first cell */ + zRaw += pHdr->iOfft; + zEnd = &zRaw[pPage->pHash->iPageSize]; + for(;;){ + /* Parse a single cell */ + rc = lhParseOneCell(pPage,zRaw,zEnd,&pCell); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pCell->iNext < 1 ){ + /* No more cells */ + break; + } + /* Point to the next cell */ + zRaw = &pPage->pRaw->zData[pCell->iNext]; + if( zRaw >= zEnd ){ + /* Corrupt page */ + return UNQLITE_CORRUPT; + } + } + /* All done */ + return UNQLITE_OK; +} +/* + * Given a page, parse its raw headers. + */ +static int lhParsePageHeader(lhpage *pPage) +{ + const unsigned char *zRaw = pPage->pRaw->zData; + lhphdr *pHdr = &pPage->sHdr; + /* Offset of the first cell */ + SyBigEndianUnpack16(zRaw,&pHdr->iOfft); + zRaw += 2; + /* Offset of the first free block */ + SyBigEndianUnpack16(zRaw,&pHdr->iFree); + zRaw += 2; + /* Slave page number */ + SyBigEndianUnpack64(zRaw,&pHdr->iSlave); + /* All done */ + return UNQLITE_OK; +} +/* + * Allocate a new page instance. + */ +static lhpage * lhNewPage( + lhash_kv_engine *pEngine, /* KV store which own this instance */ + unqlite_page *pRaw, /* Raw page contents */ + lhpage *pMaster /* Master page in case we are dealing with a slave page */ + ) +{ + lhpage *pPage; + /* Allocate a new instance */ + pPage = (lhpage *)SyMemBackendPoolAlloc(&pEngine->sAllocator,sizeof(lhpage)); + if( pPage == 0 ){ + return 0; + } + /* Zero the structure */ + SyZero(pPage,sizeof(lhpage)); + /* Fill-in the structure */ + pPage->pHash = pEngine; + pPage->pRaw = pRaw; + pPage->pMaster = pMaster ? pMaster /* Slave page */ : pPage /* Master page */ ; + if( pPage->pMaster != pPage ){ + /* Slave page, attach it to its master */ + pPage->pNextSlave = pMaster->pSlave; + pMaster->pSlave = pPage; + pMaster->iSlave++; + } + /* Save this instance for future fast lookup */ + pRaw->pUserData = pPage; + /* All done */ + return pPage; +} +/* + * Load a primary and its associated slave pages from disk. + */ +static int lhLoadPage(lhash_kv_engine *pEngine,pgno pnum,lhpage *pMaster,lhpage **ppOut,int iNest) +{ + unqlite_page *pRaw; + lhpage *pPage = 0; /* cc warning */ + int rc; + /* Aquire the page from the pager first */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pnum,&pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pRaw->pUserData ){ + /* The page is already parsed and loaded in memory. Point to it */ + pPage = (lhpage *)pRaw->pUserData; + }else{ + /* Allocate a new page */ + pPage = lhNewPage(pEngine,pRaw,pMaster); + if( pPage == 0 ){ + return UNQLITE_NOMEM; + } + /* Process the page */ + rc = lhParsePageHeader(pPage); + if( rc == UNQLITE_OK ){ + /* Load cells */ + rc = lhLoadCells(pPage); + } + if( rc != UNQLITE_OK ){ + pEngine->pIo->xPageUnref(pPage->pRaw); /* pPage will be released inside this call */ + return rc; + } + if( pPage->sHdr.iSlave > 0 && iNest < 128 ){ + if( pMaster == 0 ){ + pMaster = pPage; + } + /* Slave page. Not a fatal error if something goes wrong here */ + lhLoadPage(pEngine,pPage->sHdr.iSlave,pMaster,0,iNest++); + } + } + if( ppOut ){ + *ppOut = pPage; + } + return UNQLITE_OK; +} +/* + * Given a cell, Consume its key by invoking the given callback for each extracted chunk. + */ +static int lhConsumeCellkey( + lhcell *pCell, /* Target cell */ + int (*xConsumer)(const void *,unsigned int,void *), /* Consumer callback */ + void *pUserData, /* Last argument to xConsumer() */ + int offt_only + ) +{ + lhpage *pPage = pCell->pPage; + const unsigned char *zRaw = pPage->pRaw->zData; + const unsigned char *zPayload; + int rc; + /* Point to the payload area */ + zPayload = &zRaw[pCell->iStart]; + if( pCell->iOvfl == 0 ){ + /* Best scenario, consume the key directly without any overflow page */ + zPayload += L_HASH_CELL_SZ; + rc = xConsumer((const void *)zPayload,pCell->nKey,pUserData); + if( rc != UNQLITE_OK ){ + rc = UNQLITE_ABORT; + } + }else{ + lhash_kv_engine *pEngine = pPage->pHash; + sxu32 nByte,nData = pCell->nKey; + unqlite_page *pOvfl; + int data_offset = 0; + pgno iOvfl; + /* Overflow page */ + iOvfl = pCell->iOvfl; + /* Total usable bytes in an overflow page */ + nByte = L_HASH_OVERFLOW_SIZE(pEngine->iPageSize); + for(;;){ + if( iOvfl == 0 || nData < 1 ){ + /* no more overflow page */ + break; + } + /* Point to the overflow page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + zPayload = &pOvfl->zData[8]; + /* Point to the raw content */ + if( !data_offset ){ + /* Get the data page and offset */ + SyBigEndianUnpack64(zPayload,&pCell->iDataPage); + zPayload += 8; + SyBigEndianUnpack16(zPayload,&pCell->iDataOfft); + zPayload += 2; + if( offt_only ){ + /* Key too large, grab the data offset and return */ + pEngine->pIo->xPageUnref(pOvfl); + return UNQLITE_OK; + } + data_offset = 1; + } + /* Consume the key */ + if( nData <= nByte ){ + rc = xConsumer((const void *)zPayload,nData,pUserData); + if( rc != UNQLITE_OK ){ + pEngine->pIo->xPageUnref(pOvfl); + return UNQLITE_ABORT; + } + nData = 0; + }else{ + rc = xConsumer((const void *)zPayload,nByte,pUserData); + if( rc != UNQLITE_OK ){ + pEngine->pIo->xPageUnref(pOvfl); + return UNQLITE_ABORT; + } + nData -= nByte; + } + /* Next overflow page in the chain */ + SyBigEndianUnpack64(pOvfl->zData,&iOvfl); + /* Unref the page */ + pEngine->pIo->xPageUnref(pOvfl); + } + rc = UNQLITE_OK; + } + return rc; +} +/* + * Given a cell, Consume its data by invoking the given callback for each extracted chunk. + */ +static int lhConsumeCellData( + lhcell *pCell, /* Target cell */ + int (*xConsumer)(const void *,unsigned int,void *), /* Data consumer callback */ + void *pUserData /* Last argument to xConsumer() */ + ) +{ + lhpage *pPage = pCell->pPage; + const unsigned char *zRaw = pPage->pRaw->zData; + const unsigned char *zPayload; + int rc; + /* Point to the payload area */ + zPayload = &zRaw[pCell->iStart]; + if( pCell->iOvfl == 0 ){ + /* Best scenario, consume the data directly without any overflow page */ + zPayload += L_HASH_CELL_SZ + pCell->nKey; + rc = xConsumer((const void *)zPayload,(sxu32)pCell->nData,pUserData); + if( rc != UNQLITE_OK ){ + rc = UNQLITE_ABORT; + } + }else{ + lhash_kv_engine *pEngine = pPage->pHash; + sxu64 nData = pCell->nData; + unqlite_page *pOvfl; + int fix_offset = 0; + sxu32 nByte; + pgno iOvfl; + /* Overflow page where data is stored */ + iOvfl = pCell->iDataPage; + for(;;){ + if( iOvfl == 0 || nData < 1 ){ + /* no more overflow page */ + break; + } + /* Point to the overflow page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Point to the raw content */ + zPayload = pOvfl->zData; + if( !fix_offset ){ + /* Point to the data */ + zPayload += pCell->iDataOfft; + nByte = pEngine->iPageSize - pCell->iDataOfft; + fix_offset = 1; + }else{ + zPayload += 8; + /* Total usable bytes in an overflow page */ + nByte = L_HASH_OVERFLOW_SIZE(pEngine->iPageSize); + } + /* Consume the data */ + if( nData <= (sxu64)nByte ){ + rc = xConsumer((const void *)zPayload,(unsigned int)nData,pUserData); + if( rc != UNQLITE_OK ){ + pEngine->pIo->xPageUnref(pOvfl); + return UNQLITE_ABORT; + } + nData = 0; + }else{ + if( nByte > 0 ){ + rc = xConsumer((const void *)zPayload,nByte,pUserData); + if( rc != UNQLITE_OK ){ + pEngine->pIo->xPageUnref(pOvfl); + return UNQLITE_ABORT; + } + nData -= nByte; + } + } + /* Next overflow page in the chain */ + SyBigEndianUnpack64(pOvfl->zData,&iOvfl); + /* Unref the page */ + pEngine->pIo->xPageUnref(pOvfl); + } + rc = UNQLITE_OK; + } + return rc; +} +/* + * Read the linear hash header (Page one of the database). + */ +static int lhash_read_header(lhash_kv_engine *pEngine,unqlite_page *pHeader) +{ + const unsigned char *zRaw = pHeader->zData; + lhash_bmap_page *pMap; + sxu32 nHash; + int rc; + pEngine->pHeader = pHeader; + /* 4 byte magic number */ + SyBigEndianUnpack32(zRaw,&pEngine->nMagic); + zRaw += 4; + if( pEngine->nMagic != L_HASH_MAGIC ){ + /* Corrupt implementation */ + return UNQLITE_CORRUPT; + } + /* 4 byte hash value to identify a valid hash function */ + SyBigEndianUnpack32(zRaw,&nHash); + zRaw += 4; + /* Sanity check */ + if( pEngine->xHash(L_HASH_WORD,sizeof(L_HASH_WORD)-1) != nHash ){ + /* Different hash function */ + pEngine->pIo->xErr(pEngine->pIo->pHandle,"Invalid hash function"); + return UNQLITE_INVALID; + } + /* List of free pages */ + SyBigEndianUnpack64(zRaw,&pEngine->nFreeList); + zRaw += 8; + /* Current split bucket */ + SyBigEndianUnpack64(zRaw,&pEngine->split_bucket); + zRaw += 8; + /* Maximum split bucket */ + SyBigEndianUnpack64(zRaw,&pEngine->max_split_bucket); + zRaw += 8; + /* Next generation */ + pEngine->nmax_split_nucket = pEngine->max_split_bucket << 1; + /* Initialiaze the bucket map */ + pMap = &pEngine->sPageMap; + /* Fill in the structure */ + pMap->iNum = pHeader->pgno; + /* Next page in the bucket map */ + SyBigEndianUnpack64(zRaw,&pMap->iNext); + zRaw += 8; + /* Total number of records in the bucket map (This page only) */ + SyBigEndianUnpack32(zRaw,&pMap->nRec); + zRaw += 4; + pMap->iPtr = (sxu16)(zRaw - pHeader->zData); + /* Load the map in memory */ + rc = lhMapLoadPage(pEngine,pMap,pHeader->zData); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Load the bucket map chain if any */ + for(;;){ + pgno iNext = pMap->iNext; + unqlite_page *pPage; + if( iNext == 0 ){ + /* No more map pages */ + break; + } + /* Point to the target page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iNext,&pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Fill in the structure */ + pMap->iNum = iNext; + pMap->iPtr = 0; + /* Load the map in memory */ + rc = lhMapLoadPage(pEngine,pMap,pPage->zData); + if( rc != UNQLITE_OK ){ + return rc; + } + } + /* All done */ + return UNQLITE_OK; +} +/* + * Perform a record lookup. + */ +static int lhRecordLookup( + lhash_kv_engine *pEngine, /* KV storage engine */ + const void *pKey, /* Lookup key */ + sxu32 nByte, /* Key length */ + lhcell **ppCell /* OUT: Target cell on success */ + ) +{ + lhash_bmap_rec *pRec; + lhpage *pPage; + lhcell *pCell; + pgno iBucket; + sxu32 nHash; + int rc; + /* Acquire the first page (hash Header) so that everything gets loaded autmatically */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Compute the hash of the key first */ + nHash = pEngine->xHash(pKey,nByte); + /* Extract the logical (i.e. not real) page number */ + iBucket = nHash & (pEngine->nmax_split_nucket - 1); + if( iBucket >= (pEngine->split_bucket + pEngine->max_split_bucket) ){ + /* Low mask */ + iBucket = nHash & (pEngine->max_split_bucket - 1); + } + /* Map the logical bucket number to real page number */ + pRec = lhMapFindBucket(pEngine,iBucket); + if( pRec == 0 ){ + /* No such entry */ + return UNQLITE_NOTFOUND; + } + /* Load the master page and it's slave page in-memory */ + rc = lhLoadPage(pEngine,pRec->iReal,0,&pPage,0); + if( rc != UNQLITE_OK ){ + /* IO error, unlikely scenario */ + return rc; + } + /* Lookup for the cell */ + pCell = lhFindCell(pPage,pKey,nByte,nHash); + if( pCell == 0 ){ + /* No such entry */ + return UNQLITE_NOTFOUND; + } + if( ppCell ){ + *ppCell = pCell; + } + return UNQLITE_OK; +} +/* + * Acquire a new page either from the free list or ask the pager + * for a new one. + */ +static int lhAcquirePage(lhash_kv_engine *pEngine,unqlite_page **ppOut) +{ + unqlite_page *pPage; + int rc; + if( pEngine->nFreeList != 0 ){ + /* Acquire one from the free list */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pEngine->nFreeList,&pPage); + if( rc == UNQLITE_OK ){ + /* Point to the next free page */ + SyBigEndianUnpack64(pPage->zData,&pEngine->nFreeList); + /* Update the database header */ + rc = pEngine->pIo->xWrite(pEngine->pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/],pEngine->nFreeList); + /* Tell the pager do not journal this page */ + pEngine->pIo->xDontJournal(pPage); + /* Return to the caller */ + *ppOut = pPage; + /* All done */ + return UNQLITE_OK; + } + } + /* Acquire a new page */ + rc = pEngine->pIo->xNew(pEngine->pIo->pHandle,&pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Point to the target page */ + *ppOut = pPage; + return UNQLITE_OK; +} +/* + * Write a bucket map record to disk. + */ +static int lhMapWriteRecord(lhash_kv_engine *pEngine,pgno iLogic,pgno iReal) +{ + lhash_bmap_page *pMap = &pEngine->sPageMap; + unqlite_page *pPage = 0; + int rc; + if( pMap->iPtr > (pEngine->iPageSize - 16) /* 8 byte logical bucket number + 8 byte real bucket number */ ){ + unqlite_page *pOld; + /* Point to the old page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pMap->iNum,&pOld); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Acquire a new page */ + rc = lhAcquirePage(pEngine,&pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Reflect the change */ + pMap->iNext = 0; + pMap->iNum = pPage->pgno; + pMap->nRec = 0; + pMap->iPtr = 8/* Next page number */+4/* Total records in the map*/; + /* Link this page */ + rc = pEngine->pIo->xWrite(pOld); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pOld->pgno == pEngine->pHeader->pgno ){ + /* First page (Hash header) */ + SyBigEndianPack64(&pOld->zData[4/*magic*/+4/*hash*/+8/* Free page */+8/*current split bucket*/+8/*Maximum split bucket*/],pPage->pgno); + }else{ + /* Link the new page */ + SyBigEndianPack64(pOld->zData,pPage->pgno); + /* Unref */ + pEngine->pIo->xPageUnref(pOld); + } + /* Assume the last bucket map page */ + rc = pEngine->pIo->xWrite(pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + SyBigEndianPack64(pPage->zData,0); /* Next bucket map page on the list */ + } + if( pPage == 0){ + /* Point to the current map page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pMap->iNum,&pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + } + /* Make page writable */ + rc = pEngine->pIo->xWrite(pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Write the data */ + SyBigEndianPack64(&pPage->zData[pMap->iPtr],iLogic); + pMap->iPtr += 8; + SyBigEndianPack64(&pPage->zData[pMap->iPtr],iReal); + pMap->iPtr += 8; + /* Install the bucket map */ + rc = lhMapInstallBucket(pEngine,iLogic,iReal); + if( rc == UNQLITE_OK ){ + /* Total number of records */ + pMap->nRec++; + if( pPage->pgno == pEngine->pHeader->pgno ){ + /* Page one: Always writable */ + SyBigEndianPack32( + &pPage->zData[4/*magic*/+4/*hash*/+8/* Free page */+8/*current split bucket*/+8/*Maximum split bucket*/+8/*Next map page*/], + pMap->nRec); + }else{ + /* Make page writable */ + rc = pEngine->pIo->xWrite(pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + SyBigEndianPack32(&pPage->zData[8],pMap->nRec); + } + } + return rc; +} +/* + * Defragment a page. + */ +static int lhPageDefragment(lhpage *pPage) +{ + lhash_kv_engine *pEngine = pPage->pHash; + unsigned char *zTmp,*zPtr,*zEnd,*zPayload; + lhcell *pCell; + /* Get a temporary page from the pager. This opertaion never fail */ + zTmp = pEngine->pIo->xTmpPage(pEngine->pIo->pHandle); + /* Move the target cells to the begining */ + pCell = pPage->pList; + /* Write the slave page number */ + SyBigEndianPack64(&zTmp[2/*Offset of the first cell */+2/*Offset of the first free block */],pPage->sHdr.iSlave); + zPtr = &zTmp[L_HASH_PAGE_HDR_SZ]; /* Offset to start writing from */ + zEnd = &zTmp[pEngine->iPageSize]; + pPage->sHdr.iOfft = 0; /* Offset of the first cell */ + for(;;){ + if( pCell == 0 ){ + /* No more cells */ + break; + } + if( pCell->pPage->pRaw->pgno == pPage->pRaw->pgno ){ + /* Cell payload if locally stored */ + zPayload = 0; + if( pCell->iOvfl == 0 ){ + zPayload = &pCell->pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ]; + } + /* Move the cell */ + pCell->iNext = pPage->sHdr.iOfft; + pCell->iStart = (sxu16)(zPtr - zTmp); /* Offset where this cell start */ + pPage->sHdr.iOfft = pCell->iStart; + /* Write the cell header */ + /* 4 byte hash number */ + SyBigEndianPack32(zPtr,pCell->nHash); + zPtr += 4; + /* 4 byte ley length */ + SyBigEndianPack32(zPtr,pCell->nKey); + zPtr += 4; + /* 8 byte data length */ + SyBigEndianPack64(zPtr,pCell->nData); + zPtr += 8; + /* 2 byte offset of the next cell */ + SyBigEndianPack16(zPtr,pCell->iNext); + zPtr += 2; + /* 8 byte overflow page number */ + SyBigEndianPack64(zPtr,pCell->iOvfl); + zPtr += 8; + if( zPayload ){ + /* Local payload */ + SyMemcpy((const void *)zPayload,zPtr,(sxu32)(pCell->nKey + pCell->nData)); + zPtr += pCell->nKey + pCell->nData; + } + if( zPtr >= zEnd ){ + /* Can't happen */ + break; + } + } + /* Point to the next page */ + pCell = pCell->pNext; + } + /* Mark the free block */ + pPage->nFree = (sxu16)(zEnd - zPtr); /* Block length */ + if( pPage->nFree > 3 ){ + pPage->sHdr.iFree = (sxu16)(zPtr - zTmp); /* Offset of the free block */ + /* Mark the block */ + SyBigEndianPack16(zPtr,0); /* Offset of the next free block */ + SyBigEndianPack16(&zPtr[2],pPage->nFree); /* Block length */ + }else{ + /* Block of length less than 4 bytes are simply discarded */ + pPage->nFree = 0; + pPage->sHdr.iFree = 0; + } + /* Reflect the change */ + SyBigEndianPack16(zTmp,pPage->sHdr.iOfft); /* Offset of the first cell */ + SyBigEndianPack16(&zTmp[2],pPage->sHdr.iFree); /* Offset of the first free block */ + SyMemcpy((const void *)zTmp,pPage->pRaw->zData,pEngine->iPageSize); + /* All done */ + return UNQLITE_OK; +} +/* +** Allocate nByte bytes of space on a page. +** +** Return the index into pPage->pRaw->zData[] of the first byte of +** the new allocation. Or return 0 if there is not enough free +** space on the page to satisfy the allocation request. +** +** If the page contains nBytes of free space but does not contain +** nBytes of contiguous free space, then this routine automatically +** calls defragementPage() to consolidate all free space before +** allocating the new chunk. +*/ +static int lhAllocateSpace(lhpage *pPage,sxu64 nAmount,sxu16 *pOfft) +{ + const unsigned char *zEnd,*zPtr; + sxu16 iNext,iBlksz,nByte; + unsigned char *zPrev; + int rc; + if( (sxu64)pPage->nFree < nAmount ){ + /* Don't bother looking for a free chunk */ + return UNQLITE_FULL; + } + if( pPage->nCell < 10 && ((int)nAmount >= (pPage->pHash->iPageSize / 2)) ){ + /* Big chunk need an overflow page for its data */ + return UNQLITE_FULL; + } + zPtr = &pPage->pRaw->zData[pPage->sHdr.iFree]; + zEnd = &pPage->pRaw->zData[pPage->pHash->iPageSize]; + nByte = (sxu16)nAmount; + zPrev = 0; + iBlksz = 0; /* cc warning */ + /* Perform the lookup */ + for(;;){ + if( zPtr >= zEnd ){ + return UNQLITE_FULL; + } + /* Offset of the next free block */ + SyBigEndianUnpack16(zPtr,&iNext); + /* Block size */ + SyBigEndianUnpack16(&zPtr[2],&iBlksz); + if( iBlksz >= nByte ){ + /* Got one */ + break; + } + zPrev = (unsigned char *)zPtr; + if( iNext == 0 ){ + /* No more free blocks, defragment the page */ + rc = lhPageDefragment(pPage); + if( rc == UNQLITE_OK && pPage->nFree >= nByte) { + /* Free blocks are merged together */ + iNext = 0; + zPtr = &pPage->pRaw->zData[pPage->sHdr.iFree]; + iBlksz = pPage->nFree; + zPrev = 0; + break; + }else{ + return UNQLITE_FULL; + } + } + /* Point to the next free block */ + zPtr = &pPage->pRaw->zData[iNext]; + } + /* Acquire writer lock on this page */ + rc = pPage->pHash->pIo->xWrite(pPage->pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Save block offset */ + *pOfft = (sxu16)(zPtr - pPage->pRaw->zData); + /* Fix pointers */ + if( iBlksz >= nByte && (iBlksz - nByte) > 3 ){ + unsigned char *zBlock = &pPage->pRaw->zData[(*pOfft) + nByte]; + /* Create a new block */ + zPtr = zBlock; + SyBigEndianPack16(zBlock,iNext); /* Offset of the next block */ + SyBigEndianPack16(&zBlock[2],iBlksz-nByte); /* Block size*/ + /* Offset of the new block */ + iNext = (sxu16)(zPtr - pPage->pRaw->zData); + iBlksz = nByte; + } + /* Fix offsets */ + if( zPrev ){ + SyBigEndianPack16(zPrev,iNext); + }else{ + /* First block */ + pPage->sHdr.iFree = iNext; + /* Reflect on the page header */ + SyBigEndianPack16(&pPage->pRaw->zData[2/* Offset of the first cell1*/],iNext); + } + /* All done */ + pPage->nFree -= iBlksz; + return UNQLITE_OK; +} +/* + * Write the cell header into the corresponding offset. + */ +static int lhCellWriteHeader(lhcell *pCell) +{ + lhpage *pPage = pCell->pPage; + unsigned char *zRaw = pPage->pRaw->zData; + /* Seek to the desired location */ + zRaw += pCell->iStart; + /* 4 byte hash number */ + SyBigEndianPack32(zRaw,pCell->nHash); + zRaw += 4; + /* 4 byte key length */ + SyBigEndianPack32(zRaw,pCell->nKey); + zRaw += 4; + /* 8 byte data length */ + SyBigEndianPack64(zRaw,pCell->nData); + zRaw += 8; + /* 2 byte offset of the next cell */ + pCell->iNext = pPage->sHdr.iOfft; + SyBigEndianPack16(zRaw,pCell->iNext); + zRaw += 2; + /* 8 byte overflow page number */ + SyBigEndianPack64(zRaw,pCell->iOvfl); + /* Update the page header */ + pPage->sHdr.iOfft = pCell->iStart; + /* pEngine->pIo->xWrite() has been successfully called on this page */ + SyBigEndianPack16(pPage->pRaw->zData,pCell->iStart); + /* All done */ + return UNQLITE_OK; +} +/* + * Write local payload. + */ +static int lhCellWriteLocalPayload(lhcell *pCell, + const void *pKey,sxu32 nKeylen, + const void *pData,unqlite_int64 nDatalen + ) +{ + /* A writer lock have been acquired on this page */ + lhpage *pPage = pCell->pPage; + unsigned char *zRaw = pPage->pRaw->zData; + /* Seek to the desired location */ + zRaw += pCell->iStart + L_HASH_CELL_SZ; + /* Write the key */ + SyMemcpy(pKey,(void *)zRaw,nKeylen); + zRaw += nKeylen; + if( nDatalen > 0 ){ + /* Write the Data */ + SyMemcpy(pData,(void *)zRaw,(sxu32)nDatalen); + } + return UNQLITE_OK; +} +/* + * Allocate as much overflow page we need to store the cell payload. + */ +static int lhCellWriteOvflPayload(lhcell *pCell,const void *pKey,sxu32 nKeylen,...) +{ + lhpage *pPage = pCell->pPage; + lhash_kv_engine *pEngine = pPage->pHash; + unqlite_page *pOvfl,*pFirst,*pNew; + const unsigned char *zPtr,*zEnd; + unsigned char *zRaw,*zRawEnd; + sxu32 nAvail; + va_list ap; + int rc; + /* Acquire a new overflow page */ + rc = lhAcquirePage(pEngine,&pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Acquire a writer lock */ + rc = pEngine->pIo->xWrite(pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + pFirst = pOvfl; + /* Link */ + pCell->iOvfl = pOvfl->pgno; + /* Update the cell header */ + SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4/*Hash*/ + 4/*Key*/ + 8/*Data*/ + 2 /*Next cell*/],pCell->iOvfl); + /* Start the write process */ + zPtr = (const unsigned char *)pKey; + zEnd = &zPtr[nKeylen]; + SyBigEndianPack64(pOvfl->zData,0); /* Next overflow page on the chain */ + zRaw = &pOvfl->zData[8/* Next ovfl page*/ + 8 /* Data page */ + 2 /* Data offset*/]; + zRawEnd = &pOvfl->zData[pEngine->iPageSize]; + pNew = pOvfl; + /* Write the key */ + for(;;){ + if( zPtr >= zEnd ){ + break; + } + if( zRaw >= zRawEnd ){ + /* Acquire a new page */ + rc = lhAcquirePage(pEngine,&pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + rc = pEngine->pIo->xWrite(pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Link */ + SyBigEndianPack64(pOvfl->zData,pNew->pgno); + pEngine->pIo->xPageUnref(pOvfl); + SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */ + pOvfl = pNew; + zRaw = &pNew->zData[8]; + zRawEnd = &pNew->zData[pEngine->iPageSize]; + } + nAvail = (sxu32)(zRawEnd-zRaw); + nKeylen = (sxu32)(zEnd-zPtr); + if( nKeylen > nAvail ){ + nKeylen = nAvail; + } + SyMemcpy((const void *)zPtr,(void *)zRaw,nKeylen); + /* Synchronize pointers */ + zPtr += nKeylen; + zRaw += nKeylen; + } + rc = UNQLITE_OK; + va_start(ap,nKeylen); + pCell->iDataPage = pNew->pgno; + pCell->iDataOfft = (sxu16)(zRaw-pNew->zData); + /* Write the data page and its offset */ + SyBigEndianPack64(&pFirst->zData[8/*Next ovfl*/],pCell->iDataPage); + SyBigEndianPack16(&pFirst->zData[8/*Next ovfl*/+8/*Data page*/],pCell->iDataOfft); + /* Write data */ + for(;;){ + const void *pData; + sxu32 nDatalen; + sxu64 nData; + pData = va_arg(ap,const void *); + nData = va_arg(ap,sxu64); + if( pData == 0 ){ + /* No more chunks */ + break; + } + /* Write this chunk */ + zPtr = (const unsigned char *)pData; + zEnd = &zPtr[nData]; + for(;;){ + if( zPtr >= zEnd ){ + break; + } + if( zRaw >= zRawEnd ){ + /* Acquire a new page */ + rc = lhAcquirePage(pEngine,&pNew); + if( rc != UNQLITE_OK ){ + va_end(ap); + return rc; + } + rc = pEngine->pIo->xWrite(pNew); + if( rc != UNQLITE_OK ){ + va_end(ap); + return rc; + } + /* Link */ + SyBigEndianPack64(pOvfl->zData,pNew->pgno); + pEngine->pIo->xPageUnref(pOvfl); + SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */ + pOvfl = pNew; + zRaw = &pNew->zData[8]; + zRawEnd = &pNew->zData[pEngine->iPageSize]; + } + nAvail = (sxu32)(zRawEnd-zRaw); + nDatalen = (sxu32)(zEnd-zPtr); + if( nDatalen > nAvail ){ + nDatalen = nAvail; + } + SyMemcpy((const void *)zPtr,(void *)zRaw,nDatalen); + /* Synchronize pointers */ + zPtr += nDatalen; + zRaw += nDatalen; + } + } + /* Unref the overflow page */ + pEngine->pIo->xPageUnref(pOvfl); + va_end(ap); + return UNQLITE_OK; +} +/* + * Restore a page to the free list. + */ +static int lhRestorePage(lhash_kv_engine *pEngine,unqlite_page *pPage) +{ + int rc; + rc = pEngine->pIo->xWrite(pEngine->pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + rc = pEngine->pIo->xWrite(pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Link to the list of free page */ + SyBigEndianPack64(pPage->zData,pEngine->nFreeList); + pEngine->nFreeList = pPage->pgno; + SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/],pEngine->nFreeList); + /* All done */ + return UNQLITE_OK; +} +/* + * Restore cell space and mark it as a free block. + */ +static int lhRestoreSpace(lhpage *pPage,sxu16 iOfft,sxu16 nByte) +{ + unsigned char *zRaw; + if( nByte < 4 ){ + /* At least 4 bytes of freespace must form a valid block */ + return UNQLITE_OK; + } + /* pEngine->pIo->xWrite() has been successfully called on this page */ + zRaw = &pPage->pRaw->zData[iOfft]; + /* Mark as a free block */ + SyBigEndianPack16(zRaw,pPage->sHdr.iFree); /* Offset of the next free block */ + zRaw += 2; + SyBigEndianPack16(zRaw,nByte); + /* Link */ + SyBigEndianPack16(&pPage->pRaw->zData[2/* offset of the first cell */],iOfft); + pPage->sHdr.iFree = iOfft; + pPage->nFree += nByte; + return UNQLITE_OK; +} +/* Forward declaration */ +static lhcell * lhFindSibeling(lhcell *pCell); +/* + * Unlink a cell. + */ +static int lhUnlinkCell(lhcell *pCell) +{ + lhash_kv_engine *pEngine = pCell->pPage->pHash; + lhpage *pPage = pCell->pPage; + sxu16 nByte = L_HASH_CELL_SZ; + lhcell *pPrev; + int rc; + rc = pEngine->pIo->xWrite(pPage->pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Bring the link */ + pPrev = lhFindSibeling(pCell); + if( pPrev ){ + pPrev->iNext = pCell->iNext; + /* Fix offsets in the page header */ + SyBigEndianPack16(&pPage->pRaw->zData[pPrev->iStart + 4/*Hash*/+4/*Key*/+8/*Data*/],pCell->iNext); + }else{ + /* First entry on this page (either master or slave) */ + pPage->sHdr.iOfft = pCell->iNext; + /* Update the page header */ + SyBigEndianPack16(pPage->pRaw->zData,pCell->iNext); + } + /* Restore cell space */ + if( pCell->iOvfl == 0 ){ + nByte += (sxu16)(pCell->nData + pCell->nKey); + } + lhRestoreSpace(pPage,pCell->iStart,nByte); + /* Discard the cell from the in-memory hashtable */ + lhCellDiscard(pCell); + return UNQLITE_OK; +} +/* + * Remove a cell and its paylod (key + data). + */ +static int lhRecordRemove(lhcell *pCell) +{ + lhash_kv_engine *pEngine = pCell->pPage->pHash; + int rc; + if( pCell->iOvfl > 0){ + /* Discard overflow pages */ + unqlite_page *pOvfl; + pgno iNext = pCell->iOvfl; + for(;;){ + /* Point to the overflow page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iNext,&pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Next page on the chain */ + SyBigEndianUnpack64(pOvfl->zData,&iNext); + /* Restore the page to the free list */ + rc = lhRestorePage(pEngine,pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Unref */ + pEngine->pIo->xPageUnref(pOvfl); + if( iNext == 0 ){ + break; + } + } + } + /* Unlink the cell */ + rc = lhUnlinkCell(pCell); + return rc; +} +/* + * Find cell sibeling. + */ +static lhcell * lhFindSibeling(lhcell *pCell) +{ + lhpage *pPage = pCell->pPage->pMaster; + lhcell *pEntry; + pEntry = pPage->pFirst; + while( pEntry ){ + if( pEntry->pPage == pCell->pPage && pEntry->iNext == pCell->iStart ){ + /* Sibeling found */ + return pEntry; + } + /* Point to the previous entry */ + pEntry = pEntry->pPrev; + } + /* Last inserted cell */ + return 0; +} +/* + * Move a cell to a new location with its new data. + */ +static int lhMoveLocalCell( + lhcell *pCell, + sxu16 iOfft, + const void *pData, + unqlite_int64 nData + ) +{ + sxu16 iKeyOfft = pCell->iStart + L_HASH_CELL_SZ; + lhpage *pPage = pCell->pPage; + lhcell *pSibeling; + pSibeling = lhFindSibeling(pCell); + if( pSibeling ){ + /* Fix link */ + SyBigEndianPack16(&pPage->pRaw->zData[pSibeling->iStart + 4/*Hash*/+4/*Key*/+8/*Data*/],pCell->iNext); + pSibeling->iNext = pCell->iNext; + }else{ + /* First cell, update page header only */ + SyBigEndianPack16(pPage->pRaw->zData,pCell->iNext); + pPage->sHdr.iOfft = pCell->iNext; + } + /* Set the new offset */ + pCell->iStart = iOfft; + pCell->nData = (sxu64)nData; + /* Write the cell payload */ + lhCellWriteLocalPayload(pCell,(const void *)&pPage->pRaw->zData[iKeyOfft],pCell->nKey,pData,nData); + /* Finally write the cell header */ + lhCellWriteHeader(pCell); + /* All done */ + return UNQLITE_OK; +} +/* + * Overwrite an existing record. + */ +static int lhRecordOverwrite( + lhcell *pCell, + const void *pData,unqlite_int64 nByte + ) +{ + lhash_kv_engine *pEngine = pCell->pPage->pHash; + unsigned char *zRaw,*zRawEnd,*zPayload; + const unsigned char *zPtr,*zEnd; + unqlite_page *pOvfl,*pOld,*pNew; + lhpage *pPage = pCell->pPage; + sxu32 nAvail; + pgno iOvfl; + int rc; + /* Acquire a writer lock on this page */ + rc = pEngine->pIo->xWrite(pPage->pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pCell->iOvfl == 0 ){ + /* Local payload, try to deal with the free space issues */ + zPayload = &pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ + pCell->nKey]; + if( pCell->nData == (sxu64)nByte ){ + /* Best scenario, simply a memcpy operation */ + SyMemcpy(pData,(void *)zPayload,(sxu32)nByte); + }else if( (sxu64)nByte < pCell->nData ){ + /* Shorter data, not so ugly */ + SyMemcpy(pData,(void *)zPayload,(sxu32)nByte); + /* Update the cell header */ + SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],nByte); + /* Restore freespace */ + lhRestoreSpace(pPage,(sxu16)(pCell->iStart + L_HASH_CELL_SZ + pCell->nKey + nByte),(sxu16)(pCell->nData - nByte)); + /* New data size */ + pCell->nData = (sxu64)nByte; + }else{ + sxu16 iOfft = 0; /* cc warning */ + /* Check if another chunk is available for this cell */ + rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ + pCell->nKey + nByte,&iOfft); + if( rc != UNQLITE_OK ){ + /* Transfer the payload to an overflow page */ + rc = lhCellWriteOvflPayload(pCell,&pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ],pCell->nKey,pData,nByte,(const void *)0); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Update the cell header */ + SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],(sxu64)nByte); + /* Restore freespace */ + lhRestoreSpace(pPage,(sxu16)(pCell->iStart + L_HASH_CELL_SZ),(sxu16)(pCell->nKey + pCell->nData)); + /* New data size */ + pCell->nData = (sxu64)nByte; + }else{ + sxu16 iOldOfft = pCell->iStart; + sxu32 iOld = (sxu32)pCell->nData; + /* Space is available, transfer the cell */ + lhMoveLocalCell(pCell,iOfft,pData,nByte); + /* Restore cell space */ + lhRestoreSpace(pPage,iOldOfft,(sxu16)(L_HASH_CELL_SZ + pCell->nKey + iOld)); + } + } + return UNQLITE_OK; + } + /* Point to the overflow page */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pCell->iDataPage,&pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Relase all old overflow pages first */ + SyBigEndianUnpack64(pOvfl->zData,&iOvfl); + pOld = pOvfl; + for(;;){ + if( iOvfl == 0 ){ + /* No more overflow pages on the chain */ + break; + } + /* Point to the target page */ + if( UNQLITE_OK != pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pOld) ){ + /* Not so fatal if something goes wrong here */ + break; + } + /* Next overflow page to be released */ + SyBigEndianUnpack64(pOld->zData,&iOvfl); + if( pOld != pOvfl ){ /* xx: chm is maniac */ + /* Restore the page to the free list */ + lhRestorePage(pEngine,pOld); + /* Unref */ + pEngine->pIo->xPageUnref(pOld); + } + } + /* Point to the data offset */ + zRaw = &pOvfl->zData[pCell->iDataOfft]; + zRawEnd = &pOvfl->zData[pEngine->iPageSize]; + /* The data to be stored */ + zPtr = (const unsigned char *)pData; + zEnd = &zPtr[nByte]; + /* Start the overwrite process */ + /* Acquire a writer lock */ + rc = pEngine->pIo->xWrite(pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + SyBigEndianPack64(pOvfl->zData,0); + for(;;){ + sxu32 nLen; + if( zPtr >= zEnd ){ + break; + } + if( zRaw >= zRawEnd ){ + /* Acquire a new page */ + rc = lhAcquirePage(pEngine,&pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + rc = pEngine->pIo->xWrite(pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Link */ + SyBigEndianPack64(pOvfl->zData,pNew->pgno); + pEngine->pIo->xPageUnref(pOvfl); + SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */ + pOvfl = pNew; + zRaw = &pNew->zData[8]; + zRawEnd = &pNew->zData[pEngine->iPageSize]; + } + nAvail = (sxu32)(zRawEnd-zRaw); + nLen = (sxu32)(zEnd-zPtr); + if( nLen > nAvail ){ + nLen = nAvail; + } + SyMemcpy((const void *)zPtr,(void *)zRaw,nLen); + /* Synchronize pointers */ + zPtr += nLen; + zRaw += nLen; + } + /* Unref the last overflow page */ + pEngine->pIo->xPageUnref(pOvfl); + /* Finally, update the cell header */ + pCell->nData = (sxu64)nByte; + SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],pCell->nData); + /* All done */ + return UNQLITE_OK; +} +/* + * Append data to an existing record. + */ +static int lhRecordAppend( + lhcell *pCell, + const void *pData,unqlite_int64 nByte + ) +{ + lhash_kv_engine *pEngine = pCell->pPage->pHash; + const unsigned char *zPtr,*zEnd; + lhpage *pPage = pCell->pPage; + unsigned char *zRaw,*zRawEnd; + unqlite_page *pOvfl,*pNew; + sxu64 nDatalen; + sxu32 nAvail; + pgno iOvfl; + int rc; + if( pCell->nData + nByte < pCell->nData ){ + /* Overflow */ + pEngine->pIo->xErr(pEngine->pIo->pHandle,"Append operation will cause data overflow"); + return UNQLITE_LIMIT; + } + /* Acquire a writer lock on this page */ + rc = pEngine->pIo->xWrite(pPage->pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pCell->iOvfl == 0 ){ + sxu16 iOfft = 0; /* cc warning */ + /* Local payload, check for a bigger place */ + rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ + pCell->nKey + pCell->nData + nByte,&iOfft); + if( rc != UNQLITE_OK ){ + /* Transfer the payload to an overflow page */ + rc = lhCellWriteOvflPayload(pCell, + &pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ],pCell->nKey, + (const void *)&pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ + pCell->nKey],pCell->nData, + pData,nByte, + (const void *)0); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Update the cell header */ + SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],pCell->nData + nByte); + /* Restore freespace */ + lhRestoreSpace(pPage,(sxu16)(pCell->iStart + L_HASH_CELL_SZ),(sxu16)(pCell->nKey + pCell->nData)); + /* New data size */ + pCell->nData += nByte; + }else{ + sxu16 iOldOfft = pCell->iStart; + sxu32 iOld = (sxu32)pCell->nData; + SyBlob sWorker; + SyBlobInit(&sWorker,&pEngine->sAllocator); + /* Copy the old data */ + rc = SyBlobAppend(&sWorker,(const void *)&pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ + pCell->nKey],(sxu32)pCell->nData); + if( rc == SXRET_OK ){ + /* Append the new data */ + rc = SyBlobAppend(&sWorker,pData,(sxu32)nByte); + } + if( rc != UNQLITE_OK ){ + SyBlobRelease(&sWorker); + return rc; + } + /* Space is available, transfer the cell */ + lhMoveLocalCell(pCell,iOfft,SyBlobData(&sWorker),(unqlite_int64)SyBlobLength(&sWorker)); + /* Restore cell space */ + lhRestoreSpace(pPage,iOldOfft,(sxu16)(L_HASH_CELL_SZ + pCell->nKey + iOld)); + /* All done */ + SyBlobRelease(&sWorker); + } + return UNQLITE_OK; + } + /* Point to the overflow page which hold the data */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pCell->iDataPage,&pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Next overflow page in the chain */ + SyBigEndianUnpack64(pOvfl->zData,&iOvfl); + /* Point to the end of the chunk */ + zRaw = &pOvfl->zData[pCell->iDataOfft]; + zRawEnd = &pOvfl->zData[pEngine->iPageSize]; + nDatalen = pCell->nData; + nAvail = (sxu32)(zRawEnd - zRaw); + for(;;){ + if( zRaw >= zRawEnd ){ + if( iOvfl == 0 ){ + /* Cant happen */ + pEngine->pIo->xErr(pEngine->pIo->pHandle,"Corrupt overflow page"); + return UNQLITE_CORRUPT; + } + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Next overflow page on the chain */ + SyBigEndianUnpack64(pNew->zData,&iOvfl); + /* Unref the previous overflow page */ + pEngine->pIo->xPageUnref(pOvfl); + /* Point to the new chunk */ + zRaw = &pNew->zData[8]; + zRawEnd = &pNew->zData[pCell->pPage->pHash->iPageSize]; + nAvail = L_HASH_OVERFLOW_SIZE(pCell->pPage->pHash->iPageSize); + pOvfl = pNew; + } + if( (sxu64)nAvail > nDatalen ){ + zRaw += nDatalen; + break; + }else{ + nDatalen -= nAvail; + } + zRaw += nAvail; + } + /* Start the append process */ + zPtr = (const unsigned char *)pData; + zEnd = &zPtr[nByte]; + /* Acquire a writer lock */ + rc = pEngine->pIo->xWrite(pOvfl); + if( rc != UNQLITE_OK ){ + return rc; + } + for(;;){ + sxu32 nLen; + if( zPtr >= zEnd ){ + break; + } + if( zRaw >= zRawEnd ){ + /* Acquire a new page */ + rc = lhAcquirePage(pEngine,&pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + rc = pEngine->pIo->xWrite(pNew); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Link */ + SyBigEndianPack64(pOvfl->zData,pNew->pgno); + pEngine->pIo->xPageUnref(pOvfl); + SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */ + pOvfl = pNew; + zRaw = &pNew->zData[8]; + zRawEnd = &pNew->zData[pEngine->iPageSize]; + } + nAvail = (sxu32)(zRawEnd-zRaw); + nLen = (sxu32)(zEnd-zPtr); + if( nLen > nAvail ){ + nLen = nAvail; + } + SyMemcpy((const void *)zPtr,(void *)zRaw,nLen); + /* Synchronize pointers */ + zPtr += nLen; + zRaw += nLen; + } + /* Unref the last overflow page */ + pEngine->pIo->xPageUnref(pOvfl); + /* Finally, update the cell header */ + pCell->nData += nByte; + SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],pCell->nData); + /* All done */ + return UNQLITE_OK; +} +/* + * A write privilege have been acquired on this page. + * Mark it as an empty page (No cells). + */ +static int lhSetEmptyPage(lhpage *pPage) +{ + unsigned char *zRaw = pPage->pRaw->zData; + lhphdr *pHeader = &pPage->sHdr; + sxu16 nByte; + int rc; + /* Acquire a writer lock */ + rc = pPage->pHash->pIo->xWrite(pPage->pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Offset of the first cell */ + SyBigEndianPack16(zRaw,0); + zRaw += 2; + /* Offset of the first free block */ + pHeader->iFree = L_HASH_PAGE_HDR_SZ; + SyBigEndianPack16(zRaw,L_HASH_PAGE_HDR_SZ); + zRaw += 2; + /* Slave page number */ + SyBigEndianPack64(zRaw,0); + zRaw += 8; + /* Fill the free block */ + SyBigEndianPack16(zRaw,0); /* Offset of the next free block */ + zRaw += 2; + nByte = (sxu16)L_HASH_MX_FREE_SPACE(pPage->pHash->iPageSize); + SyBigEndianPack16(zRaw,nByte); + pPage->nFree = nByte; + /* Do not add this page to the hot dirty list */ + pPage->pHash->pIo->xDontMkHot(pPage->pRaw); + return UNQLITE_OK; +} +/* Forward declaration */ +static int lhSlaveStore( + lhpage *pPage, + const void *pKey,sxu32 nKeyLen, + const void *pData,unqlite_int64 nDataLen, + sxu32 nHash + ); +/* + * Store a cell and its payload in a given page. + */ +static int lhStoreCell( + lhpage *pPage, /* Target page */ + const void *pKey,sxu32 nKeyLen, /* Payload: Key */ + const void *pData,unqlite_int64 nDataLen, /* Payload: Data */ + sxu32 nHash, /* Hash of the key */ + int auto_append /* Auto append a slave page if full */ + ) +{ + lhash_kv_engine *pEngine = pPage->pHash; + int iNeedOvfl = 0; /* Need overflow page for this cell and its payload*/ + lhcell *pCell; + sxu16 nOfft; + int rc; + /* Acquire a writer lock on this page first */ + rc = pEngine->pIo->xWrite(pPage->pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Check for a free block */ + rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ+nKeyLen+nDataLen,&nOfft); + if( rc != UNQLITE_OK ){ + /* Check for a free block to hold a single cell only (without payload) */ + rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ,&nOfft); + if( rc != UNQLITE_OK ){ + if( !auto_append ){ + /* A split must be done */ + return UNQLITE_FULL; + }else{ + /* Store this record in a slave page */ + rc = lhSlaveStore(pPage,pKey,nKeyLen,pData,nDataLen,nHash); + return rc; + } + } + iNeedOvfl = 1; + } + /* Allocate a new cell instance */ + pCell = lhNewCell(pEngine,pPage); + if( pCell == 0 ){ + pEngine->pIo->xErr(pEngine->pIo->pHandle,"KV store is running out of memory"); + return UNQLITE_NOMEM; + } + /* Fill-in the structure */ + pCell->iStart = nOfft; + pCell->nKey = nKeyLen; + pCell->nData = (sxu64)nDataLen; + pCell->nHash = nHash; + if( nKeyLen < 262144 /* 256 KB */ ){ + /* Keep the key in-memory for fast lookup */ + SyBlobAppend(&pCell->sKey,pKey,nKeyLen); + } + /* Link the cell */ + rc = lhInstallCell(pCell); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Write the payload */ + if( iNeedOvfl ){ + rc = lhCellWriteOvflPayload(pCell,pKey,nKeyLen,pData,nDataLen,(const void *)0); + if( rc != UNQLITE_OK ){ + lhCellDiscard(pCell); + return rc; + } + }else{ + lhCellWriteLocalPayload(pCell,pKey,nKeyLen,pData,nDataLen); + } + /* Finally, Write the cell header */ + lhCellWriteHeader(pCell); + /* All done */ + return UNQLITE_OK; +} +/* + * Find a slave page capable of hosting the given amount. + */ +static int lhFindSlavePage(lhpage *pPage,sxu64 nAmount,sxu16 *pOfft,lhpage **ppSlave) +{ + lhash_kv_engine *pEngine = pPage->pHash; + lhpage *pMaster = pPage->pMaster; + lhpage *pSlave = pMaster->pSlave; + unqlite_page *pRaw; + lhpage *pNew; + sxu16 iOfft; + sxi32 i; + int rc; + /* Look for an already attached slave page */ + for( i = 0 ; i < pMaster->iSlave ; ++i ){ + /* Find a free chunk big enough */ + sxu16 size = L_HASH_CELL_SZ + nAmount; + rc = lhAllocateSpace(pSlave,size,&iOfft); + if( rc != UNQLITE_OK ){ + /* A space for cell header only */ + size = L_HASH_CELL_SZ; + rc = lhAllocateSpace(pSlave,size,&iOfft); + } + if( rc == UNQLITE_OK ){ + /* All done */ + if( pOfft ){ + *pOfft = iOfft; + }else{ + rc = lhRestoreSpace(pSlave, iOfft, size); + } + *ppSlave = pSlave; + return rc; + } + /* Point to the next slave page */ + pSlave = pSlave->pNextSlave; + } + /* Acquire a new slave page */ + rc = lhAcquirePage(pEngine,&pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Last slave page */ + pSlave = pMaster->pSlave; + if( pSlave == 0 ){ + /* First slave page */ + pSlave = pMaster; + } + /* Initialize the page */ + pNew = lhNewPage(pEngine,pRaw,pMaster); + if( pNew == 0 ){ + return UNQLITE_NOMEM; + } + /* Mark as an empty page */ + rc = lhSetEmptyPage(pNew); + if( rc != UNQLITE_OK ){ + goto fail; + } + if( pOfft ){ + /* Look for a free block */ + if( UNQLITE_OK != lhAllocateSpace(pNew,L_HASH_CELL_SZ+nAmount,&iOfft) ){ + /* Cell header only */ + lhAllocateSpace(pNew,L_HASH_CELL_SZ,&iOfft); /* Never fail */ + } + *pOfft = iOfft; + } + /* Link this page to the previous slave page */ + rc = pEngine->pIo->xWrite(pSlave->pRaw); + if( rc != UNQLITE_OK ){ + goto fail; + } + /* Reflect in the page header */ + SyBigEndianPack64(&pSlave->pRaw->zData[2/*Cell offset*/+2/*Free block offset*/],pRaw->pgno); + pSlave->sHdr.iSlave = pRaw->pgno; + /* All done */ + *ppSlave = pNew; + return UNQLITE_OK; +fail: + pEngine->pIo->xPageUnref(pNew->pRaw); /* pNew will be released in this call */ + return rc; + +} +/* + * Perform a store operation in a slave page. + */ +static int lhSlaveStore( + lhpage *pPage, /* Master page */ + const void *pKey,sxu32 nKeyLen, /* Payload: key */ + const void *pData,unqlite_int64 nDataLen, /* Payload: data */ + sxu32 nHash /* Hash of the key */ + ) +{ + lhpage *pSlave; + int rc; + /* Find a slave page */ + rc = lhFindSlavePage(pPage,nKeyLen + nDataLen,0,&pSlave); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Perform the insertion in the slave page */ + rc = lhStoreCell(pSlave,pKey,nKeyLen,pData,nDataLen,nHash,1); + return rc; +} +/* + * Transfer a cell to a new page (either a master or slave). + */ +static int lhTransferCell(lhcell *pTarget,lhpage *pPage) +{ + lhcell *pCell; + sxu16 nOfft; + int rc; + /* Check for a free block to hold a single cell only */ + rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ,&nOfft); + if( rc != UNQLITE_OK ){ + /* Store in a slave page */ + rc = lhFindSlavePage(pPage,L_HASH_CELL_SZ,&nOfft,&pPage); + if( rc != UNQLITE_OK ){ + return rc; + } + } + /* Allocate a new cell instance */ + pCell = lhNewCell(pPage->pHash,pPage); + if( pCell == 0 ){ + return UNQLITE_NOMEM; + } + /* Fill-in the structure */ + pCell->iStart = nOfft; + pCell->nData = pTarget->nData; + pCell->nKey = pTarget->nKey; + pCell->iOvfl = pTarget->iOvfl; + pCell->iDataOfft = pTarget->iDataOfft; + pCell->iDataPage = pTarget->iDataPage; + pCell->nHash = pTarget->nHash; + SyBlobDup(&pTarget->sKey,&pCell->sKey); + /* Link the cell */ + rc = lhInstallCell(pCell); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Finally, Write the cell header */ + lhCellWriteHeader(pCell); + /* All done */ + return UNQLITE_OK; +} +/* + * Perform a page split. + */ +static int lhPageSplit( + lhpage *pOld, /* Page to be split */ + lhpage *pNew, /* New page */ + pgno split_bucket, /* Current split bucket */ + pgno high_mask /* High mask (Max split bucket - 1) */ + ) +{ + lhcell *pCell,*pNext; + SyBlob sWorker; + pgno iBucket; + int rc; + SyBlobInit(&sWorker,&pOld->pHash->sAllocator); + /* Perform the split */ + pCell = pOld->pList; + for( ;; ){ + if( pCell == 0 ){ + /* No more cells */ + break; + } + /* Obtain the new logical bucket */ + iBucket = pCell->nHash & high_mask; + pNext = pCell->pNext; + if( iBucket != split_bucket){ + rc = UNQLITE_OK; + if( pCell->iOvfl ){ + /* Transfer the cell only */ + rc = lhTransferCell(pCell,pNew); + }else{ + /* Transfer the cell and its payload */ + SyBlobReset(&sWorker); + if( SyBlobLength(&pCell->sKey) < 1 ){ + /* Consume the key */ + rc = lhConsumeCellkey(pCell,unqliteDataConsumer,&pCell->sKey,0); + if( rc != UNQLITE_OK ){ + goto fail; + } + } + /* Consume the data (Very small data < 65k) */ + rc = lhConsumeCellData(pCell,unqliteDataConsumer,&sWorker); + if( rc != UNQLITE_OK ){ + goto fail; + } + /* Perform the transfer */ + rc = lhStoreCell( + pNew, + SyBlobData(&pCell->sKey),(int)SyBlobLength(&pCell->sKey), + SyBlobData(&sWorker),SyBlobLength(&sWorker), + pCell->nHash, + 1 + ); + } + if( rc != UNQLITE_OK ){ + goto fail; + } + /* Discard the cell from the old page */ + lhUnlinkCell(pCell); + } + /* Point to the next cell */ + pCell = pNext; + } + /* All done */ + rc = UNQLITE_OK; +fail: + SyBlobRelease(&sWorker); + return rc; +} +/* + * Perform the infamous linear hash split operation. + */ +static int lhSplit(lhpage *pTarget,int *pRetry) +{ + lhash_kv_engine *pEngine = pTarget->pHash; + lhash_bmap_rec *pRec; + lhpage *pOld,*pNew; + unqlite_page *pRaw; + int rc; + /* Get the real page number of the bucket to split */ + pRec = lhMapFindBucket(pEngine,pEngine->split_bucket); + if( pRec == 0 ){ + /* Can't happen */ + return UNQLITE_CORRUPT; + } + /* Load the page to be split */ + rc = lhLoadPage(pEngine,pRec->iReal,0,&pOld,0); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Request a new page */ + rc = lhAcquirePage(pEngine,&pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Initialize the page */ + pNew = lhNewPage(pEngine,pRaw,0); + if( pNew == 0 ){ + return UNQLITE_NOMEM; + } + /* Mark as an empty page */ + rc = lhSetEmptyPage(pNew); + if( rc != UNQLITE_OK ){ + goto fail; + } + /* Install and write the logical map record */ + rc = lhMapWriteRecord(pEngine, + pEngine->split_bucket + pEngine->max_split_bucket, + pRaw->pgno + ); + if( rc != UNQLITE_OK ){ + goto fail; + } + if( pTarget->pRaw->pgno == pOld->pRaw->pgno ){ + *pRetry = 1; + } + /* Perform the split */ + rc = lhPageSplit(pOld,pNew,pEngine->split_bucket,pEngine->nmax_split_nucket - 1); + if( rc != UNQLITE_OK ){ + goto fail; + } + /* Update the database header */ + pEngine->split_bucket++; + /* Acquire a writer lock on the first page */ + rc = pEngine->pIo->xWrite(pEngine->pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pEngine->split_bucket >= pEngine->max_split_bucket ){ + /* Increment the generation number */ + pEngine->split_bucket = 0; + pEngine->max_split_bucket = pEngine->nmax_split_nucket; + pEngine->nmax_split_nucket <<= 1; + if( !pEngine->nmax_split_nucket ){ + /* If this happen to your installation, please tell us */ + pEngine->pIo->xErr(pEngine->pIo->pHandle,"Database page (64-bit integer) limit reached"); + return UNQLITE_LIMIT; + } + /* Reflect in the page header */ + SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/+8/*Free list*/],pEngine->split_bucket); + SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/+8/*Free list*/+8/*Split bucket*/],pEngine->max_split_bucket); + }else{ + /* Modify only the split bucket */ + SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/+8/*Free list*/],pEngine->split_bucket); + } + /* All done */ + return UNQLITE_OK; +fail: + pEngine->pIo->xPageUnref(pNew->pRaw); + return rc; +} +/* + * Store a record in the target page. + */ +static int lhRecordInstall( + lhpage *pPage, /* Target page */ + sxu32 nHash, /* Hash of the key */ + const void *pKey,sxu32 nKeyLen, /* Payload: Key */ + const void *pData,unqlite_int64 nDataLen /* Payload: Data */ + ) +{ + int rc; + rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,0); + if( rc == UNQLITE_FULL ){ + int do_retry = 0; + /* Split */ + rc = lhSplit(pPage,&do_retry); + if( rc == UNQLITE_OK ){ + if( do_retry ){ + /* Re-calculate logical bucket number */ + return SXERR_RETRY; + } + /* Perform the store */ + rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,1); + } + } + return rc; +} +/* + * Insert a record (Either overwrite or append operation) in our database. + */ +static int lh_record_insert( + unqlite_kv_engine *pKv, /* KV store */ + const void *pKey,sxu32 nKeyLen, /* Payload: Key */ + const void *pData,unqlite_int64 nDataLen, /* Payload: data */ + int is_append /* True for an append operation */ + ) +{ + lhash_kv_engine *pEngine = (lhash_kv_engine *)pKv; + lhash_bmap_rec *pRec; + unqlite_page *pRaw; + lhpage *pPage; + lhcell *pCell; + pgno iBucket; + sxu32 nHash; + int iCnt; + int rc; + + /* Acquire the first page (DB hash Header) so that everything gets loaded autmatically */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0); + if( rc != UNQLITE_OK ){ + return rc; + } + iCnt = 0; + /* Compute the hash of the key first */ + nHash = pEngine->xHash(pKey,(sxu32)nKeyLen); +retry: + /* Extract the logical bucket number */ + iBucket = nHash & (pEngine->nmax_split_nucket - 1); + if( iBucket >= pEngine->split_bucket + pEngine->max_split_bucket ){ + /* Low mask */ + iBucket = nHash & (pEngine->max_split_bucket - 1); + } + /* Map the logical bucket number to real page number */ + pRec = lhMapFindBucket(pEngine,iBucket); + if( pRec == 0 ){ + /* Request a new page */ + rc = lhAcquirePage(pEngine,&pRaw); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Initialize the page */ + pPage = lhNewPage(pEngine,pRaw,0); + if( pPage == 0 ){ + return UNQLITE_NOMEM; + } + /* Mark as an empty page */ + rc = lhSetEmptyPage(pPage); + if( rc != UNQLITE_OK ){ + pEngine->pIo->xPageUnref(pRaw); /* pPage will be released during this call */ + return rc; + } + /* Store the cell */ + rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,1); + if( rc == UNQLITE_OK ){ + /* Install and write the logical map record */ + rc = lhMapWriteRecord(pEngine,iBucket,pRaw->pgno); + } + pEngine->pIo->xPageUnref(pRaw); + return rc; + }else{ + /* Load the page */ + rc = lhLoadPage(pEngine,pRec->iReal,0,&pPage,0); + if( rc != UNQLITE_OK ){ + /* IO error, unlikely scenario */ + return rc; + } + /* Do not add this page to the hot dirty list */ + pEngine->pIo->xDontMkHot(pPage->pRaw); + /* Lookup for the cell */ + pCell = lhFindCell(pPage,pKey,(sxu32)nKeyLen,nHash); + if( pCell == 0 ){ + /* Create the record */ + rc = lhRecordInstall(pPage,nHash,pKey,nKeyLen,pData,nDataLen); + if( rc == SXERR_RETRY && iCnt++ < 2 ){ + rc = UNQLITE_OK; + goto retry; + } + }else{ + if( is_append ){ + /* Append operation */ + rc = lhRecordAppend(pCell,pData,nDataLen); + }else{ + /* Overwrite old value */ + rc = lhRecordOverwrite(pCell,pData,nDataLen); + } + } + pEngine->pIo->xPageUnref(pPage->pRaw); + } + return rc; +} +/* + * Replace method. + */ +static int lhash_kv_replace( + unqlite_kv_engine *pKv, + const void *pKey,int nKeyLen, + const void *pData,unqlite_int64 nDataLen + ) +{ + int rc; + rc = lh_record_insert(pKv,pKey,(sxu32)nKeyLen,pData,nDataLen,0); + return rc; +} +/* + * Append method. + */ +static int lhash_kv_append( + unqlite_kv_engine *pKv, + const void *pKey,int nKeyLen, + const void *pData,unqlite_int64 nDataLen + ) +{ + int rc; + rc = lh_record_insert(pKv,pKey,(sxu32)nKeyLen,pData,nDataLen,1); + return rc; +} +/* + * Write the hash header (Page one). + */ +static int lhash_write_header(lhash_kv_engine *pEngine,unqlite_page *pHeader) +{ + unsigned char *zRaw = pHeader->zData; + lhash_bmap_page *pMap; + + pEngine->pHeader = pHeader; + /* 4 byte magic number */ + SyBigEndianPack32(zRaw,pEngine->nMagic); + zRaw += 4; + /* 4 byte hash value to identify a valid hash function */ + SyBigEndianPack32(zRaw,pEngine->xHash(L_HASH_WORD,sizeof(L_HASH_WORD)-1)); + zRaw += 4; + /* List of free pages: Empty */ + SyBigEndianPack64(zRaw,0); + zRaw += 8; + /* Current split bucket */ + SyBigEndianPack64(zRaw,pEngine->split_bucket); + zRaw += 8; + /* Maximum split bucket */ + SyBigEndianPack64(zRaw,pEngine->max_split_bucket); + zRaw += 8; + /* Initialiaze the bucket map */ + pMap = &pEngine->sPageMap; + /* Fill in the structure */ + pMap->iNum = pHeader->pgno; + /* Next page in the bucket map */ + SyBigEndianPack64(zRaw,0); + zRaw += 8; + /* Total number of records in the bucket map */ + SyBigEndianPack32(zRaw,0); + zRaw += 4; + pMap->iPtr = (sxu16)(zRaw - pHeader->zData); + /* All done */ + return UNQLITE_OK; + } +/* + * Exported: xOpen() method. + */ +static int lhash_kv_open(unqlite_kv_engine *pEngine,pgno dbSize) +{ + lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine; + unqlite_page *pHeader; + int rc; + if( dbSize < 1 ){ + /* A new database, create the header */ + rc = pEngine->pIo->xNew(pEngine->pIo->pHandle,&pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Acquire a writer lock */ + rc = pEngine->pIo->xWrite(pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Write the hash header */ + rc = lhash_write_header(pHash,pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + }else{ + /* Acquire the page one of the database */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,&pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + /* Read the database header */ + rc = lhash_read_header(pHash,pHeader); + if( rc != UNQLITE_OK ){ + return rc; + } + } + return UNQLITE_OK; +} +/* + * Release a master or slave page. (xUnpin callback). + */ +static void lhash_page_release(void *pUserData) +{ + lhpage *pPage = (lhpage *)pUserData; + lhash_kv_engine *pEngine = pPage->pHash; + lhcell *pNext,*pCell = pPage->pList; + unqlite_page *pRaw = pPage->pRaw; + sxu32 n; + /* Drop in-memory cells */ + for( n = 0 ; n < pPage->nCell ; ++n ){ + pNext = pCell->pNext; + SyBlobRelease(&pCell->sKey); + /* Release the cell instance */ + SyMemBackendPoolFree(&pEngine->sAllocator,(void *)pCell); + /* Point to the next entry */ + pCell = pNext; + } + if( pPage->apCell ){ + /* Release the cell table */ + SyMemBackendFree(&pEngine->sAllocator,(void *)pPage->apCell); + } + /* Finally, release the whole page */ + SyMemBackendPoolFree(&pEngine->sAllocator,pPage); + pRaw->pUserData = 0; +} +/* + * Default hash function (DJB). + */ +static sxu32 lhash_bin_hash(const void *pSrc,sxu32 nLen) +{ + register unsigned char *zIn = (unsigned char *)pSrc; + unsigned char *zEnd; + sxu32 nH = 5381; + if( nLen > 2048 /* 2K */ ){ + nLen = 2048; + } + zEnd = &zIn[nLen]; + for(;;){ + if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++; + if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++; + if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++; + if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++; + } + return nH; +} +/* + * Exported: xInit() method. + * Initialize the Key value storage engine. + */ +static int lhash_kv_init(unqlite_kv_engine *pEngine,int iPageSize) +{ + lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine; + int rc; + + /* This structure is always zeroed, go to the initialization directly */ + SyMemBackendInitFromParent(&pHash->sAllocator,unqliteExportMemBackend()); +#if defined(UNQLITE_ENABLE_THREADS) + /* Already protected by the upper layers */ + SyMemBackendDisbaleMutexing(&pHash->sAllocator); +#endif + pHash->iPageSize = iPageSize; + /* Default hash function */ + pHash->xHash = lhash_bin_hash; + /* Default comparison function */ + pHash->xCmp = SyMemcmp; + /* Allocate a new record map */ + pHash->nBuckSize = 32; + pHash->apMap = (lhash_bmap_rec **)SyMemBackendAlloc(&pHash->sAllocator,pHash->nBuckSize *sizeof(lhash_bmap_rec *)); + if( pHash->apMap == 0 ){ + rc = UNQLITE_NOMEM; + goto err; + } + /* Zero the table */ + SyZero(pHash->apMap,pHash->nBuckSize * sizeof(lhash_bmap_rec *)); + /* Linear hashing components */ + pHash->split_bucket = 0; /* Logical not real bucket number */ + pHash->max_split_bucket = 1; + pHash->nmax_split_nucket = 2; + pHash->nMagic = L_HASH_MAGIC; + /* Install the cache unpin and reload callbacks */ + pHash->pIo->xSetUnpin(pHash->pIo->pHandle,lhash_page_release); + pHash->pIo->xSetReload(pHash->pIo->pHandle,lhash_page_release); + return UNQLITE_OK; +err: + SyMemBackendRelease(&pHash->sAllocator); + return rc; +} +/* + * Exported: xRelease() method. + * Release the Key value storage engine. + */ +static void lhash_kv_release(unqlite_kv_engine *pEngine) +{ + lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine; + /* Release the private memory backend */ + SyMemBackendRelease(&pHash->sAllocator); +} +/* + * Exported: xConfig() method. + * Configure the linear hash KV store. + */ +static int lhash_kv_config(unqlite_kv_engine *pEngine,int op,va_list ap) +{ + lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine; + int rc = UNQLITE_OK; + switch(op){ + case UNQLITE_KV_CONFIG_HASH_FUNC: { + /* Default hash function */ + if( pHash->nBuckRec > 0 ){ + /* Locked operation */ + rc = UNQLITE_LOCKED; + }else{ + ProcHash xHash = va_arg(ap,ProcHash); + if( xHash ){ + pHash->xHash = xHash; + } + } + break; + } + case UNQLITE_KV_CONFIG_CMP_FUNC: { + /* Default comparison function */ + ProcCmp xCmp = va_arg(ap,ProcCmp); + if( xCmp ){ + pHash->xCmp = xCmp; + } + break; + } + default: + /* Unknown OP */ + rc = UNQLITE_UNKNOWN; + break; + } + return rc; +} +/* + * Each public cursor is identified by an instance of this structure. + */ +typedef struct lhash_kv_cursor lhash_kv_cursor; +struct lhash_kv_cursor +{ + unqlite_kv_engine *pStore; /* Must be first */ + /* Private fields */ + int iState; /* Current state of the cursor */ + int is_first; /* True to read the database header */ + lhcell *pCell; /* Current cell we are processing */ + unqlite_page *pRaw; /* Raw disk page */ + lhash_bmap_rec *pRec; /* Logical to real bucket map */ +}; +/* + * Possible state of the cursor + */ +#define L_HASH_CURSOR_STATE_NEXT_PAGE 1 /* Next page in the list */ +#define L_HASH_CURSOR_STATE_CELL 2 /* Processing Cell */ +#define L_HASH_CURSOR_STATE_DONE 3 /* Cursor does not point to anything */ +/* + * Initialize the cursor. + */ +static void lhInitCursor(unqlite_kv_cursor *pPtr) +{ + lhash_kv_engine *pEngine = (lhash_kv_engine *)pPtr->pStore; + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr; + /* Init */ + pCur->iState = L_HASH_CURSOR_STATE_NEXT_PAGE; + pCur->pCell = 0; + pCur->pRec = pEngine->pFirst; + pCur->pRaw = 0; + pCur->is_first = 1; +} +/* + * Point to the next page on the database. + */ +static int lhCursorNextPage(lhash_kv_cursor *pPtr) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr; + lhash_bmap_rec *pRec; + lhpage *pPage; + int rc; + for(;;){ + pRec = pCur->pRec; + if( pRec == 0 ){ + pCur->iState = L_HASH_CURSOR_STATE_DONE; + return UNQLITE_DONE; + } + if( pPtr->iState == L_HASH_CURSOR_STATE_CELL && pPtr->pRaw ){ + /* Unref this page */ + pCur->pStore->pIo->xPageUnref(pPtr->pRaw); + pPtr->pRaw = 0; + } + /* Advance the map cursor */ + pCur->pRec = pRec->pPrev; /* Not a bug, reverse link */ + /* Load the next page on the list */ + rc = lhLoadPage((lhash_kv_engine *)pCur->pStore,pRec->iReal,0,&pPage,0); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pPage->pList ){ + /* Reflect the change */ + pCur->pCell = pPage->pList; + pCur->iState = L_HASH_CURSOR_STATE_CELL; + pCur->pRaw = pPage->pRaw; + break; + } + /* Empty page, discard this page and continue */ + pPage->pHash->pIo->xPageUnref(pPage->pRaw); + } + return UNQLITE_OK; +} +/* + * Point to the previous page on the database. + */ +static int lhCursorPrevPage(lhash_kv_cursor *pPtr) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr; + lhash_bmap_rec *pRec; + lhpage *pPage; + int rc; + for(;;){ + pRec = pCur->pRec; + if( pRec == 0 ){ + pCur->iState = L_HASH_CURSOR_STATE_DONE; + return UNQLITE_DONE; + } + if( pPtr->iState == L_HASH_CURSOR_STATE_CELL && pPtr->pRaw ){ + /* Unref this page */ + pCur->pStore->pIo->xPageUnref(pPtr->pRaw); + pPtr->pRaw = 0; + } + /* Advance the map cursor */ + pCur->pRec = pRec->pNext; /* Not a bug, reverse link */ + /* Load the previous page on the list */ + rc = lhLoadPage((lhash_kv_engine *)pCur->pStore,pRec->iReal,0,&pPage,0); + if( rc != UNQLITE_OK ){ + return rc; + } + if( pPage->pFirst ){ + /* Reflect the change */ + pCur->pCell = pPage->pFirst; + pCur->iState = L_HASH_CURSOR_STATE_CELL; + pCur->pRaw = pPage->pRaw; + break; + } + /* Discard this page and continue */ + pPage->pHash->pIo->xPageUnref(pPage->pRaw); + } + return UNQLITE_OK; +} +/* + * Is a valid cursor. + */ +static int lhCursorValid(unqlite_kv_cursor *pPtr) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr; + return (pCur->iState == L_HASH_CURSOR_STATE_CELL) && pCur->pCell; +} +/* + * Point to the first record. + */ +static int lhCursorFirst(unqlite_kv_cursor *pCursor) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhash_kv_engine *pEngine = (lhash_kv_engine *)pCursor->pStore; + int rc; + if( pCur->is_first ){ + /* Read the database header first */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0); + if( rc != UNQLITE_OK ){ + return rc; + } + pCur->is_first = 0; + } + /* Point to the first map record */ + pCur->pRec = pEngine->pFirst; + /* Load the cells */ + rc = lhCursorNextPage(pCur); + return rc; +} +/* + * Point to the last record. + */ +static int lhCursorLast(unqlite_kv_cursor *pCursor) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhash_kv_engine *pEngine = (lhash_kv_engine *)pCursor->pStore; + int rc; + if( pCur->is_first ){ + /* Read the database header first */ + rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0); + if( rc != UNQLITE_OK ){ + return rc; + } + pCur->is_first = 0; + } + /* Point to the last map record */ + pCur->pRec = pEngine->pList; + /* Load the cells */ + rc = lhCursorPrevPage(pCur); + return rc; +} +/* + * Reset the cursor. + */ +static void lhCursorReset(unqlite_kv_cursor *pCursor) +{ + lhCursorFirst(pCursor); +} +/* + * Point to the next record. + */ +static int lhCursorNext(unqlite_kv_cursor *pCursor) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + int rc; + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Load the cells of the next page */ + rc = lhCursorNextPage(pCur); + return rc; + } + pCell = pCur->pCell; + pCur->pCell = pCell->pNext; + if( pCur->pCell == 0 ){ + /* Load the cells of the next page */ + rc = lhCursorNextPage(pCur); + return rc; + } + return UNQLITE_OK; +} +/* + * Point to the previous record. + */ +static int lhCursorPrev(unqlite_kv_cursor *pCursor) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + int rc; + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Load the cells of the previous page */ + rc = lhCursorPrevPage(pCur); + return rc; + } + pCell = pCur->pCell; + pCur->pCell = pCell->pPrev; + if( pCur->pCell == 0 ){ + /* Load the cells of the previous page */ + rc = lhCursorPrevPage(pCur); + return rc; + } + return UNQLITE_OK; +} +/* + * Return key length. + */ +static int lhCursorKeyLength(unqlite_kv_cursor *pCursor,int *pLen) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Invalid state */ + return UNQLITE_INVALID; + } + /* Point to the target cell */ + pCell = pCur->pCell; + /* Return key length */ + *pLen = (int)pCell->nKey; + return UNQLITE_OK; +} +/* + * Return data length. + */ +static int lhCursorDataLength(unqlite_kv_cursor *pCursor,unqlite_int64 *pLen) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Invalid state */ + return UNQLITE_INVALID; + } + /* Point to the target cell */ + pCell = pCur->pCell; + /* Return data length */ + *pLen = (unqlite_int64)pCell->nData; + return UNQLITE_OK; +} +/* + * Consume the key. + */ +static int lhCursorKey(unqlite_kv_cursor *pCursor,int (*xConsumer)(const void *,unsigned int,void *),void *pUserData) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + int rc; + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Invalid state */ + return UNQLITE_INVALID; + } + /* Point to the target cell */ + pCell = pCur->pCell; + if( SyBlobLength(&pCell->sKey) > 0 ){ + /* Consume the key directly */ + rc = xConsumer(SyBlobData(&pCell->sKey),SyBlobLength(&pCell->sKey),pUserData); + }else{ + /* Very large key */ + rc = lhConsumeCellkey(pCell,xConsumer,pUserData,0); + } + return rc; +} +/* + * Consume the data. + */ +static int lhCursorData(unqlite_kv_cursor *pCursor,int (*xConsumer)(const void *,unsigned int,void *),void *pUserData) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + int rc; + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Invalid state */ + return UNQLITE_INVALID; + } + /* Point to the target cell */ + pCell = pCur->pCell; + /* Consume the data */ + rc = lhConsumeCellData(pCell,xConsumer,pUserData); + return rc; +} +/* + * Find a partiuclar record. + */ +static int lhCursorSeek(unqlite_kv_cursor *pCursor,const void *pKey,int nByte,int iPos) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + int rc; + /* Perform a lookup */ + rc = lhRecordLookup((lhash_kv_engine *)pCur->pStore,pKey,nByte,&pCur->pCell); + if( rc != UNQLITE_OK ){ + SXUNUSED(iPos); + pCur->pCell = 0; + pCur->iState = L_HASH_CURSOR_STATE_DONE; + return rc; + } + pCur->iState = L_HASH_CURSOR_STATE_CELL; + return UNQLITE_OK; +} +/* + * Remove a particular record. + */ +static int lhCursorDelete(unqlite_kv_cursor *pCursor) +{ + lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor; + lhcell *pCell; + int rc; + if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){ + /* Invalid state */ + return UNQLITE_INVALID; + } + /* Point to the target cell */ + pCell = pCur->pCell; + /* Point to the next entry */ + pCur->pCell = pCell->pNext; + /* Perform the deletion */ + rc = lhRecordRemove(pCell); + return rc; +} +/* + * Export the linear-hash storage engine. + */ +UNQLITE_PRIVATE const unqlite_kv_methods * unqliteExportDiskKvStorage(void) +{ + static const unqlite_kv_methods sDiskStore = { + "hash", /* zName */ + sizeof(lhash_kv_engine), /* szKv */ + sizeof(lhash_kv_cursor), /* szCursor */ + 1, /* iVersion */ + lhash_kv_init, /* xInit */ + lhash_kv_release, /* xRelease */ + lhash_kv_config, /* xConfig */ + lhash_kv_open, /* xOpen */ + lhash_kv_replace, /* xReplace */ + lhash_kv_append, /* xAppend */ + lhInitCursor, /* xCursorInit */ + lhCursorSeek, /* xSeek */ + lhCursorFirst, /* xFirst */ + lhCursorLast, /* xLast */ + lhCursorValid, /* xValid */ + lhCursorNext, /* xNext */ + lhCursorPrev, /* xPrev */ + lhCursorDelete, /* xDelete */ + lhCursorKeyLength, /* xKeyLength */ + lhCursorKey, /* xKey */ + lhCursorDataLength, /* xDataLength */ + lhCursorData, /* xData */ + lhCursorReset, /* xReset */ + 0 /* xRelease */ + }; + return &sDiskStore; +} -- cgit v1.2.3