summaryrefslogtreecommitdiffstats
path: root/common/unqlite/lhash_kv.c
diff options
context:
space:
mode:
Diffstat (limited to 'common/unqlite/lhash_kv.c')
-rw-r--r--common/unqlite/lhash_kv.c3082
1 files changed, 3082 insertions, 0 deletions
diff --git a/common/unqlite/lhash_kv.c b/common/unqlite/lhash_kv.c
new file mode 100644
index 0000000..4af5b3e
--- /dev/null
+++ b/common/unqlite/lhash_kv.c
@@ -0,0 +1,3082 @@
1/*
2 * Symisc unQLite: An Embeddable NoSQL (Post Modern) Database Engine.
3 * Copyright (C) 2012-2013, Symisc Systems http://unqlite.org/
4 * Copyright (C) 2014, Yuras Shumovich <shumovichy@gmail.com>
5 * Version 1.1.6
6 * For information on licensing, redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES
7 * please contact Symisc Systems via:
8 * legal@symisc.net
9 * licensing@symisc.net
10 * contact@symisc.net
11 * or visit:
12 * http://unqlite.org/licensing.html
13 */
14 /* $SymiscID: lhash_kv.c v1.7 Solaris 2013-01-14 12:56 stable <chm@symisc.net> $ */
15#ifndef UNQLITE_AMALGAMATION
16#include "unqliteInt.h"
17#endif
18/*
19 * This file implements disk based hashtable using the linear hashing algorithm.
20 * This implementation is the one decribed in the paper:
21 * LINEAR HASHING : A NEW TOOL FOR FILE AND TABLE ADDRESSING. Witold Litwin. I. N. Ft. I. A.. 78 150 Le Chesnay, France.
22 * Plus a smart extension called Virtual Bucket Table. (contact devel@symisc.net for additional information).
23 */
24/* Magic number identifying a valid storage image */
25#define L_HASH_MAGIC 0xFA782DCB
26/*
27 * Magic word to hash to identify a valid hash function.
28 */
29#define L_HASH_WORD "chm@symisc"
30/*
31 * Cell size on disk.
32 */
33#define L_HASH_CELL_SZ (4/*Hash*/+4/*Key*/+8/*Data*/+2/* Offset of the next cell */+8/*Overflow*/)
34/*
35 * Primary page (not overflow pages) header size on disk.
36 */
37#define L_HASH_PAGE_HDR_SZ (2/* Cell offset*/+2/* Free block offset*/+8/*Slave page number*/)
38/*
39 * The maximum amount of payload (in bytes) that can be stored locally for
40 * a database entry. If the entry contains more data than this, the
41 * extra goes onto overflow pages.
42*/
43#define L_HASH_MX_PAYLOAD(PageSize) (PageSize-(L_HASH_PAGE_HDR_SZ+L_HASH_CELL_SZ))
44/*
45 * Maxium free space on a single page.
46 */
47#define L_HASH_MX_FREE_SPACE(PageSize) (PageSize - (L_HASH_PAGE_HDR_SZ))
48/*
49** The maximum number of bytes of payload allowed on a single overflow page.
50*/
51#define L_HASH_OVERFLOW_SIZE(PageSize) (PageSize-8)
52/* Forward declaration */
53typedef struct lhash_kv_engine lhash_kv_engine;
54typedef struct lhpage lhpage;
55/*
56 * Each record in the database is identified either in-memory or in
57 * disk by an instance of the following structure.
58 */
59typedef struct lhcell lhcell;
60struct lhcell
61{
62 /* Disk-data (Big-Endian) */
63 sxu32 nHash; /* Hash of the key: 4 bytes */
64 sxu32 nKey; /* Key length: 4 bytes */
65 sxu64 nData; /* Data length: 8 bytes */
66 sxu16 iNext; /* Offset of the next cell: 2 bytes */
67 pgno iOvfl; /* Overflow page number if any: 8 bytes */
68 /* In-memory data only */
69 lhpage *pPage; /* Page this cell belongs */
70 sxu16 iStart; /* Offset of this cell */
71 pgno iDataPage; /* Data page number when overflow */
72 sxu16 iDataOfft; /* Offset of the data in iDataPage */
73 SyBlob sKey; /* Record key for fast lookup (Kept in-memory if < 256KB ) */
74 lhcell *pNext,*pPrev; /* Linked list of the loaded memory cells */
75 lhcell *pNextCol,*pPrevCol; /* Collison chain */
76};
77/*
78** Each database page has a header that is an instance of this
79** structure.
80*/
81typedef struct lhphdr lhphdr;
82struct lhphdr
83{
84 sxu16 iOfft; /* Offset of the first cell */
85 sxu16 iFree; /* Offset of the first free block*/
86 pgno iSlave; /* Slave page number */
87};
88/*
89 * Each loaded primary disk page is represented in-memory using
90 * an instance of the following structure.
91 */
92struct lhpage
93{
94 lhash_kv_engine *pHash; /* KV Storage engine that own this page */
95 unqlite_page *pRaw; /* Raw page contents */
96 lhphdr sHdr; /* Processed page header */
97 lhcell **apCell; /* Cell buckets */
98 lhcell *pList,*pFirst; /* Linked list of cells */
99 sxu32 nCell; /* Total number of cells */
100 sxu32 nCellSize; /* apCell[] size */
101 lhpage *pMaster; /* Master page in case we are dealing with a slave page */
102 lhpage *pSlave; /* List of slave pages */
103 lhpage *pNextSlave; /* Next slave page on the list */
104 sxi32 iSlave; /* Total number of slave pages */
105 sxu16 nFree; /* Amount of free space available in the page */
106};
107/*
108 * A Bucket map record which is used to map logical bucket number to real
109 * bucket number is represented by an instance of the following structure.
110 */
111typedef struct lhash_bmap_rec lhash_bmap_rec;
112struct lhash_bmap_rec
113{
114 pgno iLogic; /* Logical bucket number */
115 pgno iReal; /* Real bucket number */
116 lhash_bmap_rec *pNext,*pPrev; /* Link to other bucket map */
117 lhash_bmap_rec *pNextCol,*pPrevCol; /* Collision links */
118};
119typedef struct lhash_bmap_page lhash_bmap_page;
120struct lhash_bmap_page
121{
122 pgno iNum; /* Page number where this entry is stored */
123 sxu16 iPtr; /* Offset to start reading/writing from */
124 sxu32 nRec; /* Total number of records in this page */
125 pgno iNext; /* Next map page */
126};
127/*
128 * An in memory linear hash implemenation is represented by in an isntance
129 * of the following structure.
130 */
131struct lhash_kv_engine
132{
133 const unqlite_kv_io *pIo; /* IO methods: Must be first */
134 /* Private fields */
135 SyMemBackend sAllocator; /* Private memory backend */
136 ProcHash xHash; /* Default hash function */
137 ProcCmp xCmp; /* Default comparison function */
138 unqlite_page *pHeader; /* Page one to identify a valid implementation */
139 lhash_bmap_rec **apMap; /* Buckets map records */
140 sxu32 nBuckRec; /* Total number of bucket map records */
141 sxu32 nBuckSize; /* apMap[] size */
142 lhash_bmap_rec *pList; /* List of bucket map records */
143 lhash_bmap_rec *pFirst; /* First record*/
144 lhash_bmap_page sPageMap; /* Primary bucket map */
145 int iPageSize; /* Page size */
146 pgno nFreeList; /* List of free pages */
147 pgno split_bucket; /* Current split bucket: MUST BE A POWER OF TWO */
148 pgno max_split_bucket; /* Maximum split bucket: MUST BE A POWER OF TWO */
149 pgno nmax_split_nucket; /* Next maximum split bucket (1 << nMsb): In-memory only */
150 sxu32 nMagic; /* Magic number to identify a valid linear hash disk database */
151};
152/*
153 * Given a logical bucket number, return the record associated with it.
154 */
155static lhash_bmap_rec * lhMapFindBucket(lhash_kv_engine *pEngine,pgno iLogic)
156{
157 lhash_bmap_rec *pRec;
158 if( pEngine->nBuckRec < 1 ){
159 /* Don't bother */
160 return 0;
161 }
162 pRec = pEngine->apMap[iLogic & (pEngine->nBuckSize - 1)];
163 for(;;){
164 if( pRec == 0 ){
165 break;
166 }
167 if( pRec->iLogic == iLogic ){
168 return pRec;
169 }
170 /* Point to the next entry */
171 pRec = pRec->pNextCol;
172 }
173 /* No such record */
174 return 0;
175}
176/*
177 * Install a new bucket map record.
178 */
179static int lhMapInstallBucket(lhash_kv_engine *pEngine,pgno iLogic,pgno iReal)
180{
181 lhash_bmap_rec *pRec;
182 sxu32 iBucket;
183 /* Allocate a new instance */
184 pRec = (lhash_bmap_rec *)SyMemBackendPoolAlloc(&pEngine->sAllocator,sizeof(lhash_bmap_rec));
185 if( pRec == 0 ){
186 return UNQLITE_NOMEM;
187 }
188 /* Zero the structure */
189 SyZero(pRec,sizeof(lhash_bmap_rec));
190 /* Fill in the structure */
191 pRec->iLogic = iLogic;
192 pRec->iReal = iReal;
193 iBucket = iLogic & (pEngine->nBuckSize - 1);
194 pRec->pNextCol = pEngine->apMap[iBucket];
195 if( pEngine->apMap[iBucket] ){
196 pEngine->apMap[iBucket]->pPrevCol = pRec;
197 }
198 pEngine->apMap[iBucket] = pRec;
199 /* Link */
200 if( pEngine->pFirst == 0 ){
201 pEngine->pFirst = pEngine->pList = pRec;
202 }else{
203 MACRO_LD_PUSH(pEngine->pList,pRec);
204 }
205 pEngine->nBuckRec++;
206 if( (pEngine->nBuckRec >= pEngine->nBuckSize * 3) && pEngine->nBuckRec < 100000 ){
207 /* Allocate a new larger table */
208 sxu32 nNewSize = pEngine->nBuckSize << 1;
209 lhash_bmap_rec *pEntry;
210 lhash_bmap_rec **apNew;
211 sxu32 n;
212
213 apNew = (lhash_bmap_rec **)SyMemBackendAlloc(&pEngine->sAllocator, nNewSize * sizeof(lhash_bmap_rec *));
214 if( apNew ){
215 /* Zero the new table */
216 SyZero((void *)apNew, nNewSize * sizeof(lhash_bmap_rec *));
217 /* Rehash all entries */
218 n = 0;
219 pEntry = pEngine->pList;
220 for(;;){
221 /* Loop one */
222 if( n >= pEngine->nBuckRec ){
223 break;
224 }
225 pEntry->pNextCol = pEntry->pPrevCol = 0;
226 /* Install in the new bucket */
227 iBucket = pEntry->iLogic & (nNewSize - 1);
228 pEntry->pNextCol = apNew[iBucket];
229 if( apNew[iBucket] ){
230 apNew[iBucket]->pPrevCol = pEntry;
231 }
232 apNew[iBucket] = pEntry;
233 /* Point to the next entry */
234 pEntry = pEntry->pNext;
235 n++;
236 }
237 /* Release the old table and reflect the change */
238 SyMemBackendFree(&pEngine->sAllocator,(void *)pEngine->apMap);
239 pEngine->apMap = apNew;
240 pEngine->nBuckSize = nNewSize;
241 }
242 }
243 return UNQLITE_OK;
244}
245/*
246 * Process a raw bucket map record.
247 */
248static int lhMapLoadPage(lhash_kv_engine *pEngine,lhash_bmap_page *pMap,const unsigned char *zRaw)
249{
250 const unsigned char *zEnd = &zRaw[pEngine->iPageSize];
251 const unsigned char *zPtr = zRaw;
252 pgno iLogic,iReal;
253 sxu32 n;
254 int rc;
255 if( pMap->iPtr == 0 ){
256 /* Read the map header */
257 SyBigEndianUnpack64(zRaw,&pMap->iNext);
258 zRaw += 8;
259 SyBigEndianUnpack32(zRaw,&pMap->nRec);
260 zRaw += 4;
261 }else{
262 /* Mostly page one of the database */
263 zRaw += pMap->iPtr;
264 }
265 /* Start processing */
266 for( n = 0; n < pMap->nRec ; ++n ){
267 if( zRaw >= zEnd ){
268 break;
269 }
270 /* Extract the logical and real bucket number */
271 SyBigEndianUnpack64(zRaw,&iLogic);
272 zRaw += 8;
273 SyBigEndianUnpack64(zRaw,&iReal);
274 zRaw += 8;
275 /* Install the record in the map */
276 rc = lhMapInstallBucket(pEngine,iLogic,iReal);
277 if( rc != UNQLITE_OK ){
278 return rc;
279 }
280 }
281 pMap->iPtr = (sxu16)(zRaw-zPtr);
282 /* All done */
283 return UNQLITE_OK;
284}
285/*
286 * Allocate a new cell instance.
287 */
288static lhcell * lhNewCell(lhash_kv_engine *pEngine,lhpage *pPage)
289{
290 lhcell *pCell;
291 pCell = (lhcell *)SyMemBackendPoolAlloc(&pEngine->sAllocator,sizeof(lhcell));
292 if( pCell == 0 ){
293 return 0;
294 }
295 /* Zero the structure */
296 SyZero(pCell,sizeof(lhcell));
297 /* Fill in the structure */
298 SyBlobInit(&pCell->sKey,&pEngine->sAllocator);
299 pCell->pPage = pPage;
300 return pCell;
301}
302/*
303 * Discard a cell from the page table.
304 */
305static void lhCellDiscard(lhcell *pCell)
306{
307 lhpage *pPage = pCell->pPage->pMaster;
308
309 if( pCell->pPrevCol ){
310 pCell->pPrevCol->pNextCol = pCell->pNextCol;
311 }else{
312 pPage->apCell[pCell->nHash & (pPage->nCellSize - 1)] = pCell->pNextCol;
313 }
314 if( pCell->pNextCol ){
315 pCell->pNextCol->pPrevCol = pCell->pPrevCol;
316 }
317 MACRO_LD_REMOVE(pPage->pList,pCell);
318 if( pCell == pPage->pFirst ){
319 pPage->pFirst = pCell->pPrev;
320 }
321 pPage->nCell--;
322 /* Release the cell */
323 SyBlobRelease(&pCell->sKey);
324 SyMemBackendPoolFree(&pPage->pHash->sAllocator,pCell);
325}
326/*
327 * Install a cell in the page table.
328 */
329static int lhInstallCell(lhcell *pCell)
330{
331 lhpage *pPage = pCell->pPage->pMaster;
332 sxu32 iBucket;
333 if( pPage->nCell < 1 ){
334 sxu32 nTableSize = 32; /* Must be a power of two */
335 lhcell **apTable;
336 /* Allocate a new cell table */
337 apTable = (lhcell **)SyMemBackendAlloc(&pPage->pHash->sAllocator, nTableSize * sizeof(lhcell *));
338 if( apTable == 0 ){
339 return UNQLITE_NOMEM;
340 }
341 /* Zero the new table */
342 SyZero((void *)apTable, nTableSize * sizeof(lhcell *));
343 /* Install it */
344 pPage->apCell = apTable;
345 pPage->nCellSize = nTableSize;
346 }
347 iBucket = pCell->nHash & (pPage->nCellSize - 1);
348 pCell->pNextCol = pPage->apCell[iBucket];
349 if( pPage->apCell[iBucket] ){
350 pPage->apCell[iBucket]->pPrevCol = pCell;
351 }
352 pPage->apCell[iBucket] = pCell;
353 if( pPage->pFirst == 0 ){
354 pPage->pFirst = pPage->pList = pCell;
355 }else{
356 MACRO_LD_PUSH(pPage->pList,pCell);
357 }
358 pPage->nCell++;
359 if( (pPage->nCell >= pPage->nCellSize * 3) && pPage->nCell < 100000 ){
360 /* Allocate a new larger table */
361 sxu32 nNewSize = pPage->nCellSize << 1;
362 lhcell *pEntry;
363 lhcell **apNew;
364 sxu32 n;
365
366 apNew = (lhcell **)SyMemBackendAlloc(&pPage->pHash->sAllocator, nNewSize * sizeof(lhcell *));
367 if( apNew ){
368 /* Zero the new table */
369 SyZero((void *)apNew, nNewSize * sizeof(lhcell *));
370 /* Rehash all entries */
371 n = 0;
372 pEntry = pPage->pList;
373 for(;;){
374 /* Loop one */
375 if( n >= pPage->nCell ){
376 break;
377 }
378 pEntry->pNextCol = pEntry->pPrevCol = 0;
379 /* Install in the new bucket */
380 iBucket = pEntry->nHash & (nNewSize - 1);
381 pEntry->pNextCol = apNew[iBucket];
382 if( apNew[iBucket] ){
383 apNew[iBucket]->pPrevCol = pEntry;
384 }
385 apNew[iBucket] = pEntry;
386 /* Point to the next entry */
387 pEntry = pEntry->pNext;
388 n++;
389 }
390 /* Release the old table and reflect the change */
391 SyMemBackendFree(&pPage->pHash->sAllocator,(void *)pPage->apCell);
392 pPage->apCell = apNew;
393 pPage->nCellSize = nNewSize;
394 }
395 }
396 return UNQLITE_OK;
397}
398/*
399 * Private data of lhKeyCmp().
400 */
401struct lhash_key_cmp
402{
403 const char *zIn; /* Start of the stream */
404 const char *zEnd; /* End of the stream */
405 ProcCmp xCmp; /* Comparison function */
406};
407/*
408 * Comparsion callback for large key > 256 KB
409 */
410static int lhKeyCmp(const void *pData,sxu32 nLen,void *pUserData)
411{
412 struct lhash_key_cmp *pCmp = (struct lhash_key_cmp *)pUserData;
413 int rc;
414 if( pCmp->zIn >= pCmp->zEnd ){
415 if( nLen > 0 ){
416 return UNQLITE_ABORT;
417 }
418 return UNQLITE_OK;
419 }
420 /* Perform the comparison */
421 rc = pCmp->xCmp((const void *)pCmp->zIn,pData,nLen);
422 if( rc != 0 ){
423 /* Abort comparison */
424 return UNQLITE_ABORT;
425 }
426 /* Advance the cursor */
427 pCmp->zIn += nLen;
428 return UNQLITE_OK;
429}
430/* Forward declaration */
431static int lhConsumeCellkey(lhcell *pCell,int (*xConsumer)(const void *,unsigned int,void *),void *pUserData,int offt_only);
432/*
433 * given a key, return the cell associated with it on success. NULL otherwise.
434 */
435static lhcell * lhFindCell(
436 lhpage *pPage, /* Target page */
437 const void *pKey, /* Lookup key */
438 sxu32 nByte, /* Key length */
439 sxu32 nHash /* Hash of the key */
440 )
441{
442 lhcell *pEntry;
443 if( pPage->nCell < 1 ){
444 /* Don't bother hashing */
445 return 0;
446 }
447 /* Point to the corresponding bucket */
448 pEntry = pPage->apCell[nHash & (pPage->nCellSize - 1)];
449 for(;;){
450 if( pEntry == 0 ){
451 break;
452 }
453 if( pEntry->nHash == nHash && pEntry->nKey == nByte ){
454 if( SyBlobLength(&pEntry->sKey) < 1 ){
455 /* Large key (> 256 KB) are not kept in-memory */
456 struct lhash_key_cmp sCmp;
457 int rc;
458 /* Fill-in the structure */
459 sCmp.zIn = (const char *)pKey;
460 sCmp.zEnd = &sCmp.zIn[nByte];
461 sCmp.xCmp = pPage->pHash->xCmp;
462 /* Fetch the key from disk and perform the comparison */
463 rc = lhConsumeCellkey(pEntry,lhKeyCmp,&sCmp,0);
464 if( rc == UNQLITE_OK ){
465 /* Cell found */
466 return pEntry;
467 }
468 }else if ( pPage->pHash->xCmp(pKey,SyBlobData(&pEntry->sKey),nByte) == 0 ){
469 /* Cell found */
470 return pEntry;
471 }
472 }
473 /* Point to the next entry */
474 pEntry = pEntry->pNextCol;
475 }
476 /* No such entry */
477 return 0;
478}
479/*
480 * Parse a raw cell fetched from disk.
481 */
482static int lhParseOneCell(lhpage *pPage,const unsigned char *zRaw,const unsigned char *zEnd,lhcell **ppOut)
483{
484 sxu16 iNext,iOfft;
485 sxu32 iHash,nKey;
486 lhcell *pCell;
487 sxu64 nData;
488 int rc;
489 /* Offset this cell is stored */
490 iOfft = (sxu16)(zRaw - (const unsigned char *)pPage->pRaw->zData);
491 /* 4 byte hash number */
492 SyBigEndianUnpack32(zRaw,&iHash);
493 zRaw += 4;
494 /* 4 byte key length */
495 SyBigEndianUnpack32(zRaw,&nKey);
496 zRaw += 4;
497 /* 8 byte data length */
498 SyBigEndianUnpack64(zRaw,&nData);
499 zRaw += 8;
500 /* 2 byte offset of the next cell */
501 SyBigEndianUnpack16(zRaw,&iNext);
502 /* Perform a sanity check */
503 if( iNext > 0 && &pPage->pRaw->zData[iNext] >= zEnd ){
504 return UNQLITE_CORRUPT;
505 }
506 zRaw += 2;
507 pCell = lhNewCell(pPage->pHash,pPage);
508 if( pCell == 0 ){
509 return UNQLITE_NOMEM;
510 }
511 /* Fill in the structure */
512 pCell->iNext = iNext;
513 pCell->nKey = nKey;
514 pCell->nData = nData;
515 pCell->nHash = iHash;
516 /* Overflow page if any */
517 SyBigEndianUnpack64(zRaw,&pCell->iOvfl);
518 zRaw += 8;
519 /* Cell offset */
520 pCell->iStart = iOfft;
521 /* Consume the key */
522 rc = lhConsumeCellkey(pCell,unqliteDataConsumer,&pCell->sKey,pCell->nKey > 262144 /* 256 KB */? 1 : 0);
523 if( rc != UNQLITE_OK ){
524 /* TICKET: 14-32-chm@symisc.net: Key too large for memory */
525 SyBlobRelease(&pCell->sKey);
526 }
527 /* Finally install the cell */
528 rc = lhInstallCell(pCell);
529 if( rc != UNQLITE_OK ){
530 return rc;
531 }
532 if( ppOut ){
533 *ppOut = pCell;
534 }
535 return UNQLITE_OK;
536}
537/*
538 * Compute the total number of free space on a given page.
539 */
540static int lhPageFreeSpace(lhpage *pPage)
541{
542 const unsigned char *zEnd,*zRaw = pPage->pRaw->zData;
543 lhphdr *pHdr = &pPage->sHdr;
544 sxu16 iNext,iAmount;
545 sxu16 nFree = 0;
546 if( pHdr->iFree < 1 ){
547 /* Don't bother processing, the page is full */
548 pPage->nFree = 0;
549 return UNQLITE_OK;
550 }
551 /* Point to first free block */
552 zEnd = &zRaw[pPage->pHash->iPageSize];
553 zRaw += pHdr->iFree;
554 for(;;){
555 /* Offset of the next free block */
556 SyBigEndianUnpack16(zRaw,&iNext);
557 zRaw += 2;
558 /* Available space on this block */
559 SyBigEndianUnpack16(zRaw,&iAmount);
560 nFree += iAmount;
561 if( iNext < 1 ){
562 /* No more free blocks */
563 break;
564 }
565 /* Point to the next free block*/
566 zRaw = &pPage->pRaw->zData[iNext];
567 if( zRaw >= zEnd ){
568 /* Corrupt page */
569 return UNQLITE_CORRUPT;
570 }
571 }
572 /* Save the amount of free space */
573 pPage->nFree = nFree;
574 return UNQLITE_OK;
575}
576/*
577 * Given a primary page, load all its cell.
578 */
579static int lhLoadCells(lhpage *pPage)
580{
581 const unsigned char *zEnd,*zRaw = pPage->pRaw->zData;
582 lhphdr *pHdr = &pPage->sHdr;
583 lhcell *pCell = 0; /* cc warning */
584 int rc;
585 /* Calculate the amount of free space available first */
586 rc = lhPageFreeSpace(pPage);
587 if( rc != UNQLITE_OK ){
588 return rc;
589 }
590 if( pHdr->iOfft < 1 ){
591 /* Don't bother processing, the page is empty */
592 return UNQLITE_OK;
593 }
594 /* Point to first cell */
595 zRaw += pHdr->iOfft;
596 zEnd = &zRaw[pPage->pHash->iPageSize];
597 for(;;){
598 /* Parse a single cell */
599 rc = lhParseOneCell(pPage,zRaw,zEnd,&pCell);
600 if( rc != UNQLITE_OK ){
601 return rc;
602 }
603 if( pCell->iNext < 1 ){
604 /* No more cells */
605 break;
606 }
607 /* Point to the next cell */
608 zRaw = &pPage->pRaw->zData[pCell->iNext];
609 if( zRaw >= zEnd ){
610 /* Corrupt page */
611 return UNQLITE_CORRUPT;
612 }
613 }
614 /* All done */
615 return UNQLITE_OK;
616}
617/*
618 * Given a page, parse its raw headers.
619 */
620static int lhParsePageHeader(lhpage *pPage)
621{
622 const unsigned char *zRaw = pPage->pRaw->zData;
623 lhphdr *pHdr = &pPage->sHdr;
624 /* Offset of the first cell */
625 SyBigEndianUnpack16(zRaw,&pHdr->iOfft);
626 zRaw += 2;
627 /* Offset of the first free block */
628 SyBigEndianUnpack16(zRaw,&pHdr->iFree);
629 zRaw += 2;
630 /* Slave page number */
631 SyBigEndianUnpack64(zRaw,&pHdr->iSlave);
632 /* All done */
633 return UNQLITE_OK;
634}
635/*
636 * Allocate a new page instance.
637 */
638static lhpage * lhNewPage(
639 lhash_kv_engine *pEngine, /* KV store which own this instance */
640 unqlite_page *pRaw, /* Raw page contents */
641 lhpage *pMaster /* Master page in case we are dealing with a slave page */
642 )
643{
644 lhpage *pPage;
645 /* Allocate a new instance */
646 pPage = (lhpage *)SyMemBackendPoolAlloc(&pEngine->sAllocator,sizeof(lhpage));
647 if( pPage == 0 ){
648 return 0;
649 }
650 /* Zero the structure */
651 SyZero(pPage,sizeof(lhpage));
652 /* Fill-in the structure */
653 pPage->pHash = pEngine;
654 pPage->pRaw = pRaw;
655 pPage->pMaster = pMaster ? pMaster /* Slave page */ : pPage /* Master page */ ;
656 if( pPage->pMaster != pPage ){
657 /* Slave page, attach it to its master */
658 pPage->pNextSlave = pMaster->pSlave;
659 pMaster->pSlave = pPage;
660 pMaster->iSlave++;
661 }
662 /* Save this instance for future fast lookup */
663 pRaw->pUserData = pPage;
664 /* All done */
665 return pPage;
666}
667/*
668 * Load a primary and its associated slave pages from disk.
669 */
670static int lhLoadPage(lhash_kv_engine *pEngine,pgno pnum,lhpage *pMaster,lhpage **ppOut,int iNest)
671{
672 unqlite_page *pRaw;
673 lhpage *pPage = 0; /* cc warning */
674 int rc;
675 /* Aquire the page from the pager first */
676 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pnum,&pRaw);
677 if( rc != UNQLITE_OK ){
678 return rc;
679 }
680 if( pRaw->pUserData ){
681 /* The page is already parsed and loaded in memory. Point to it */
682 pPage = (lhpage *)pRaw->pUserData;
683 }else{
684 /* Allocate a new page */
685 pPage = lhNewPage(pEngine,pRaw,pMaster);
686 if( pPage == 0 ){
687 return UNQLITE_NOMEM;
688 }
689 /* Process the page */
690 rc = lhParsePageHeader(pPage);
691 if( rc == UNQLITE_OK ){
692 /* Load cells */
693 rc = lhLoadCells(pPage);
694 }
695 if( rc != UNQLITE_OK ){
696 pEngine->pIo->xPageUnref(pPage->pRaw); /* pPage will be released inside this call */
697 return rc;
698 }
699 if( pPage->sHdr.iSlave > 0 && iNest < 128 ){
700 if( pMaster == 0 ){
701 pMaster = pPage;
702 }
703 /* Slave page. Not a fatal error if something goes wrong here */
704 lhLoadPage(pEngine,pPage->sHdr.iSlave,pMaster,0,iNest++);
705 }
706 }
707 if( ppOut ){
708 *ppOut = pPage;
709 }
710 return UNQLITE_OK;
711}
712/*
713 * Given a cell, Consume its key by invoking the given callback for each extracted chunk.
714 */
715static int lhConsumeCellkey(
716 lhcell *pCell, /* Target cell */
717 int (*xConsumer)(const void *,unsigned int,void *), /* Consumer callback */
718 void *pUserData, /* Last argument to xConsumer() */
719 int offt_only
720 )
721{
722 lhpage *pPage = pCell->pPage;
723 const unsigned char *zRaw = pPage->pRaw->zData;
724 const unsigned char *zPayload;
725 int rc;
726 /* Point to the payload area */
727 zPayload = &zRaw[pCell->iStart];
728 if( pCell->iOvfl == 0 ){
729 /* Best scenario, consume the key directly without any overflow page */
730 zPayload += L_HASH_CELL_SZ;
731 rc = xConsumer((const void *)zPayload,pCell->nKey,pUserData);
732 if( rc != UNQLITE_OK ){
733 rc = UNQLITE_ABORT;
734 }
735 }else{
736 lhash_kv_engine *pEngine = pPage->pHash;
737 sxu32 nByte,nData = pCell->nKey;
738 unqlite_page *pOvfl;
739 int data_offset = 0;
740 pgno iOvfl;
741 /* Overflow page */
742 iOvfl = pCell->iOvfl;
743 /* Total usable bytes in an overflow page */
744 nByte = L_HASH_OVERFLOW_SIZE(pEngine->iPageSize);
745 for(;;){
746 if( iOvfl == 0 || nData < 1 ){
747 /* no more overflow page */
748 break;
749 }
750 /* Point to the overflow page */
751 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pOvfl);
752 if( rc != UNQLITE_OK ){
753 return rc;
754 }
755 zPayload = &pOvfl->zData[8];
756 /* Point to the raw content */
757 if( !data_offset ){
758 /* Get the data page and offset */
759 SyBigEndianUnpack64(zPayload,&pCell->iDataPage);
760 zPayload += 8;
761 SyBigEndianUnpack16(zPayload,&pCell->iDataOfft);
762 zPayload += 2;
763 if( offt_only ){
764 /* Key too large, grab the data offset and return */
765 pEngine->pIo->xPageUnref(pOvfl);
766 return UNQLITE_OK;
767 }
768 data_offset = 1;
769 }
770 /* Consume the key */
771 if( nData <= nByte ){
772 rc = xConsumer((const void *)zPayload,nData,pUserData);
773 if( rc != UNQLITE_OK ){
774 pEngine->pIo->xPageUnref(pOvfl);
775 return UNQLITE_ABORT;
776 }
777 nData = 0;
778 }else{
779 rc = xConsumer((const void *)zPayload,nByte,pUserData);
780 if( rc != UNQLITE_OK ){
781 pEngine->pIo->xPageUnref(pOvfl);
782 return UNQLITE_ABORT;
783 }
784 nData -= nByte;
785 }
786 /* Next overflow page in the chain */
787 SyBigEndianUnpack64(pOvfl->zData,&iOvfl);
788 /* Unref the page */
789 pEngine->pIo->xPageUnref(pOvfl);
790 }
791 rc = UNQLITE_OK;
792 }
793 return rc;
794}
795/*
796 * Given a cell, Consume its data by invoking the given callback for each extracted chunk.
797 */
798static int lhConsumeCellData(
799 lhcell *pCell, /* Target cell */
800 int (*xConsumer)(const void *,unsigned int,void *), /* Data consumer callback */
801 void *pUserData /* Last argument to xConsumer() */
802 )
803{
804 lhpage *pPage = pCell->pPage;
805 const unsigned char *zRaw = pPage->pRaw->zData;
806 const unsigned char *zPayload;
807 int rc;
808 /* Point to the payload area */
809 zPayload = &zRaw[pCell->iStart];
810 if( pCell->iOvfl == 0 ){
811 /* Best scenario, consume the data directly without any overflow page */
812 zPayload += L_HASH_CELL_SZ + pCell->nKey;
813 rc = xConsumer((const void *)zPayload,(sxu32)pCell->nData,pUserData);
814 if( rc != UNQLITE_OK ){
815 rc = UNQLITE_ABORT;
816 }
817 }else{
818 lhash_kv_engine *pEngine = pPage->pHash;
819 sxu64 nData = pCell->nData;
820 unqlite_page *pOvfl;
821 int fix_offset = 0;
822 sxu32 nByte;
823 pgno iOvfl;
824 /* Overflow page where data is stored */
825 iOvfl = pCell->iDataPage;
826 for(;;){
827 if( iOvfl == 0 || nData < 1 ){
828 /* no more overflow page */
829 break;
830 }
831 /* Point to the overflow page */
832 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pOvfl);
833 if( rc != UNQLITE_OK ){
834 return rc;
835 }
836 /* Point to the raw content */
837 zPayload = pOvfl->zData;
838 if( !fix_offset ){
839 /* Point to the data */
840 zPayload += pCell->iDataOfft;
841 nByte = pEngine->iPageSize - pCell->iDataOfft;
842 fix_offset = 1;
843 }else{
844 zPayload += 8;
845 /* Total usable bytes in an overflow page */
846 nByte = L_HASH_OVERFLOW_SIZE(pEngine->iPageSize);
847 }
848 /* Consume the data */
849 if( nData <= (sxu64)nByte ){
850 rc = xConsumer((const void *)zPayload,(unsigned int)nData,pUserData);
851 if( rc != UNQLITE_OK ){
852 pEngine->pIo->xPageUnref(pOvfl);
853 return UNQLITE_ABORT;
854 }
855 nData = 0;
856 }else{
857 if( nByte > 0 ){
858 rc = xConsumer((const void *)zPayload,nByte,pUserData);
859 if( rc != UNQLITE_OK ){
860 pEngine->pIo->xPageUnref(pOvfl);
861 return UNQLITE_ABORT;
862 }
863 nData -= nByte;
864 }
865 }
866 /* Next overflow page in the chain */
867 SyBigEndianUnpack64(pOvfl->zData,&iOvfl);
868 /* Unref the page */
869 pEngine->pIo->xPageUnref(pOvfl);
870 }
871 rc = UNQLITE_OK;
872 }
873 return rc;
874}
875/*
876 * Read the linear hash header (Page one of the database).
877 */
878static int lhash_read_header(lhash_kv_engine *pEngine,unqlite_page *pHeader)
879{
880 const unsigned char *zRaw = pHeader->zData;
881 lhash_bmap_page *pMap;
882 sxu32 nHash;
883 int rc;
884 pEngine->pHeader = pHeader;
885 /* 4 byte magic number */
886 SyBigEndianUnpack32(zRaw,&pEngine->nMagic);
887 zRaw += 4;
888 if( pEngine->nMagic != L_HASH_MAGIC ){
889 /* Corrupt implementation */
890 return UNQLITE_CORRUPT;
891 }
892 /* 4 byte hash value to identify a valid hash function */
893 SyBigEndianUnpack32(zRaw,&nHash);
894 zRaw += 4;
895 /* Sanity check */
896 if( pEngine->xHash(L_HASH_WORD,sizeof(L_HASH_WORD)-1) != nHash ){
897 /* Different hash function */
898 pEngine->pIo->xErr(pEngine->pIo->pHandle,"Invalid hash function");
899 return UNQLITE_INVALID;
900 }
901 /* List of free pages */
902 SyBigEndianUnpack64(zRaw,&pEngine->nFreeList);
903 zRaw += 8;
904 /* Current split bucket */
905 SyBigEndianUnpack64(zRaw,&pEngine->split_bucket);
906 zRaw += 8;
907 /* Maximum split bucket */
908 SyBigEndianUnpack64(zRaw,&pEngine->max_split_bucket);
909 zRaw += 8;
910 /* Next generation */
911 pEngine->nmax_split_nucket = pEngine->max_split_bucket << 1;
912 /* Initialiaze the bucket map */
913 pMap = &pEngine->sPageMap;
914 /* Fill in the structure */
915 pMap->iNum = pHeader->pgno;
916 /* Next page in the bucket map */
917 SyBigEndianUnpack64(zRaw,&pMap->iNext);
918 zRaw += 8;
919 /* Total number of records in the bucket map (This page only) */
920 SyBigEndianUnpack32(zRaw,&pMap->nRec);
921 zRaw += 4;
922 pMap->iPtr = (sxu16)(zRaw - pHeader->zData);
923 /* Load the map in memory */
924 rc = lhMapLoadPage(pEngine,pMap,pHeader->zData);
925 if( rc != UNQLITE_OK ){
926 return rc;
927 }
928 /* Load the bucket map chain if any */
929 for(;;){
930 pgno iNext = pMap->iNext;
931 unqlite_page *pPage;
932 if( iNext == 0 ){
933 /* No more map pages */
934 break;
935 }
936 /* Point to the target page */
937 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iNext,&pPage);
938 if( rc != UNQLITE_OK ){
939 return rc;
940 }
941 /* Fill in the structure */
942 pMap->iNum = iNext;
943 pMap->iPtr = 0;
944 /* Load the map in memory */
945 rc = lhMapLoadPage(pEngine,pMap,pPage->zData);
946 if( rc != UNQLITE_OK ){
947 return rc;
948 }
949 }
950 /* All done */
951 return UNQLITE_OK;
952}
953/*
954 * Perform a record lookup.
955 */
956static int lhRecordLookup(
957 lhash_kv_engine *pEngine, /* KV storage engine */
958 const void *pKey, /* Lookup key */
959 sxu32 nByte, /* Key length */
960 lhcell **ppCell /* OUT: Target cell on success */
961 )
962{
963 lhash_bmap_rec *pRec;
964 lhpage *pPage;
965 lhcell *pCell;
966 pgno iBucket;
967 sxu32 nHash;
968 int rc;
969 /* Acquire the first page (hash Header) so that everything gets loaded autmatically */
970 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0);
971 if( rc != UNQLITE_OK ){
972 return rc;
973 }
974 /* Compute the hash of the key first */
975 nHash = pEngine->xHash(pKey,nByte);
976 /* Extract the logical (i.e. not real) page number */
977 iBucket = nHash & (pEngine->nmax_split_nucket - 1);
978 if( iBucket >= (pEngine->split_bucket + pEngine->max_split_bucket) ){
979 /* Low mask */
980 iBucket = nHash & (pEngine->max_split_bucket - 1);
981 }
982 /* Map the logical bucket number to real page number */
983 pRec = lhMapFindBucket(pEngine,iBucket);
984 if( pRec == 0 ){
985 /* No such entry */
986 return UNQLITE_NOTFOUND;
987 }
988 /* Load the master page and it's slave page in-memory */
989 rc = lhLoadPage(pEngine,pRec->iReal,0,&pPage,0);
990 if( rc != UNQLITE_OK ){
991 /* IO error, unlikely scenario */
992 return rc;
993 }
994 /* Lookup for the cell */
995 pCell = lhFindCell(pPage,pKey,nByte,nHash);
996 if( pCell == 0 ){
997 /* No such entry */
998 return UNQLITE_NOTFOUND;
999 }
1000 if( ppCell ){
1001 *ppCell = pCell;
1002 }
1003 return UNQLITE_OK;
1004}
1005/*
1006 * Acquire a new page either from the free list or ask the pager
1007 * for a new one.
1008 */
1009static int lhAcquirePage(lhash_kv_engine *pEngine,unqlite_page **ppOut)
1010{
1011 unqlite_page *pPage;
1012 int rc;
1013 if( pEngine->nFreeList != 0 ){
1014 /* Acquire one from the free list */
1015 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pEngine->nFreeList,&pPage);
1016 if( rc == UNQLITE_OK ){
1017 /* Point to the next free page */
1018 SyBigEndianUnpack64(pPage->zData,&pEngine->nFreeList);
1019 /* Update the database header */
1020 rc = pEngine->pIo->xWrite(pEngine->pHeader);
1021 if( rc != UNQLITE_OK ){
1022 return rc;
1023 }
1024 SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/],pEngine->nFreeList);
1025 /* Tell the pager do not journal this page */
1026 pEngine->pIo->xDontJournal(pPage);
1027 /* Return to the caller */
1028 *ppOut = pPage;
1029 /* All done */
1030 return UNQLITE_OK;
1031 }
1032 }
1033 /* Acquire a new page */
1034 rc = pEngine->pIo->xNew(pEngine->pIo->pHandle,&pPage);
1035 if( rc != UNQLITE_OK ){
1036 return rc;
1037 }
1038 /* Point to the target page */
1039 *ppOut = pPage;
1040 return UNQLITE_OK;
1041}
1042/*
1043 * Write a bucket map record to disk.
1044 */
1045static int lhMapWriteRecord(lhash_kv_engine *pEngine,pgno iLogic,pgno iReal)
1046{
1047 lhash_bmap_page *pMap = &pEngine->sPageMap;
1048 unqlite_page *pPage = 0;
1049 int rc;
1050 if( pMap->iPtr > (pEngine->iPageSize - 16) /* 8 byte logical bucket number + 8 byte real bucket number */ ){
1051 unqlite_page *pOld;
1052 /* Point to the old page */
1053 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pMap->iNum,&pOld);
1054 if( rc != UNQLITE_OK ){
1055 return rc;
1056 }
1057 /* Acquire a new page */
1058 rc = lhAcquirePage(pEngine,&pPage);
1059 if( rc != UNQLITE_OK ){
1060 return rc;
1061 }
1062 /* Reflect the change */
1063 pMap->iNext = 0;
1064 pMap->iNum = pPage->pgno;
1065 pMap->nRec = 0;
1066 pMap->iPtr = 8/* Next page number */+4/* Total records in the map*/;
1067 /* Link this page */
1068 rc = pEngine->pIo->xWrite(pOld);
1069 if( rc != UNQLITE_OK ){
1070 return rc;
1071 }
1072 if( pOld->pgno == pEngine->pHeader->pgno ){
1073 /* First page (Hash header) */
1074 SyBigEndianPack64(&pOld->zData[4/*magic*/+4/*hash*/+8/* Free page */+8/*current split bucket*/+8/*Maximum split bucket*/],pPage->pgno);
1075 }else{
1076 /* Link the new page */
1077 SyBigEndianPack64(pOld->zData,pPage->pgno);
1078 /* Unref */
1079 pEngine->pIo->xPageUnref(pOld);
1080 }
1081 /* Assume the last bucket map page */
1082 rc = pEngine->pIo->xWrite(pPage);
1083 if( rc != UNQLITE_OK ){
1084 return rc;
1085 }
1086 SyBigEndianPack64(pPage->zData,0); /* Next bucket map page on the list */
1087 }
1088 if( pPage == 0){
1089 /* Point to the current map page */
1090 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pMap->iNum,&pPage);
1091 if( rc != UNQLITE_OK ){
1092 return rc;
1093 }
1094 }
1095 /* Make page writable */
1096 rc = pEngine->pIo->xWrite(pPage);
1097 if( rc != UNQLITE_OK ){
1098 return rc;
1099 }
1100 /* Write the data */
1101 SyBigEndianPack64(&pPage->zData[pMap->iPtr],iLogic);
1102 pMap->iPtr += 8;
1103 SyBigEndianPack64(&pPage->zData[pMap->iPtr],iReal);
1104 pMap->iPtr += 8;
1105 /* Install the bucket map */
1106 rc = lhMapInstallBucket(pEngine,iLogic,iReal);
1107 if( rc == UNQLITE_OK ){
1108 /* Total number of records */
1109 pMap->nRec++;
1110 if( pPage->pgno == pEngine->pHeader->pgno ){
1111 /* Page one: Always writable */
1112 SyBigEndianPack32(
1113 &pPage->zData[4/*magic*/+4/*hash*/+8/* Free page */+8/*current split bucket*/+8/*Maximum split bucket*/+8/*Next map page*/],
1114 pMap->nRec);
1115 }else{
1116 /* Make page writable */
1117 rc = pEngine->pIo->xWrite(pPage);
1118 if( rc != UNQLITE_OK ){
1119 return rc;
1120 }
1121 SyBigEndianPack32(&pPage->zData[8],pMap->nRec);
1122 }
1123 }
1124 return rc;
1125}
1126/*
1127 * Defragment a page.
1128 */
1129static int lhPageDefragment(lhpage *pPage)
1130{
1131 lhash_kv_engine *pEngine = pPage->pHash;
1132 unsigned char *zTmp,*zPtr,*zEnd,*zPayload;
1133 lhcell *pCell;
1134 /* Get a temporary page from the pager. This opertaion never fail */
1135 zTmp = pEngine->pIo->xTmpPage(pEngine->pIo->pHandle);
1136 /* Move the target cells to the begining */
1137 pCell = pPage->pList;
1138 /* Write the slave page number */
1139 SyBigEndianPack64(&zTmp[2/*Offset of the first cell */+2/*Offset of the first free block */],pPage->sHdr.iSlave);
1140 zPtr = &zTmp[L_HASH_PAGE_HDR_SZ]; /* Offset to start writing from */
1141 zEnd = &zTmp[pEngine->iPageSize];
1142 pPage->sHdr.iOfft = 0; /* Offset of the first cell */
1143 for(;;){
1144 if( pCell == 0 ){
1145 /* No more cells */
1146 break;
1147 }
1148 if( pCell->pPage->pRaw->pgno == pPage->pRaw->pgno ){
1149 /* Cell payload if locally stored */
1150 zPayload = 0;
1151 if( pCell->iOvfl == 0 ){
1152 zPayload = &pCell->pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ];
1153 }
1154 /* Move the cell */
1155 pCell->iNext = pPage->sHdr.iOfft;
1156 pCell->iStart = (sxu16)(zPtr - zTmp); /* Offset where this cell start */
1157 pPage->sHdr.iOfft = pCell->iStart;
1158 /* Write the cell header */
1159 /* 4 byte hash number */
1160 SyBigEndianPack32(zPtr,pCell->nHash);
1161 zPtr += 4;
1162 /* 4 byte ley length */
1163 SyBigEndianPack32(zPtr,pCell->nKey);
1164 zPtr += 4;
1165 /* 8 byte data length */
1166 SyBigEndianPack64(zPtr,pCell->nData);
1167 zPtr += 8;
1168 /* 2 byte offset of the next cell */
1169 SyBigEndianPack16(zPtr,pCell->iNext);
1170 zPtr += 2;
1171 /* 8 byte overflow page number */
1172 SyBigEndianPack64(zPtr,pCell->iOvfl);
1173 zPtr += 8;
1174 if( zPayload ){
1175 /* Local payload */
1176 SyMemcpy((const void *)zPayload,zPtr,(sxu32)(pCell->nKey + pCell->nData));
1177 zPtr += pCell->nKey + pCell->nData;
1178 }
1179 if( zPtr >= zEnd ){
1180 /* Can't happen */
1181 break;
1182 }
1183 }
1184 /* Point to the next page */
1185 pCell = pCell->pNext;
1186 }
1187 /* Mark the free block */
1188 pPage->nFree = (sxu16)(zEnd - zPtr); /* Block length */
1189 if( pPage->nFree > 3 ){
1190 pPage->sHdr.iFree = (sxu16)(zPtr - zTmp); /* Offset of the free block */
1191 /* Mark the block */
1192 SyBigEndianPack16(zPtr,0); /* Offset of the next free block */
1193 SyBigEndianPack16(&zPtr[2],pPage->nFree); /* Block length */
1194 }else{
1195 /* Block of length less than 4 bytes are simply discarded */
1196 pPage->nFree = 0;
1197 pPage->sHdr.iFree = 0;
1198 }
1199 /* Reflect the change */
1200 SyBigEndianPack16(zTmp,pPage->sHdr.iOfft); /* Offset of the first cell */
1201 SyBigEndianPack16(&zTmp[2],pPage->sHdr.iFree); /* Offset of the first free block */
1202 SyMemcpy((const void *)zTmp,pPage->pRaw->zData,pEngine->iPageSize);
1203 /* All done */
1204 return UNQLITE_OK;
1205}
1206/*
1207** Allocate nByte bytes of space on a page.
1208**
1209** Return the index into pPage->pRaw->zData[] of the first byte of
1210** the new allocation. Or return 0 if there is not enough free
1211** space on the page to satisfy the allocation request.
1212**
1213** If the page contains nBytes of free space but does not contain
1214** nBytes of contiguous free space, then this routine automatically
1215** calls defragementPage() to consolidate all free space before
1216** allocating the new chunk.
1217*/
1218static int lhAllocateSpace(lhpage *pPage,sxu64 nAmount,sxu16 *pOfft)
1219{
1220 const unsigned char *zEnd,*zPtr;
1221 sxu16 iNext,iBlksz,nByte;
1222 unsigned char *zPrev;
1223 int rc;
1224 if( (sxu64)pPage->nFree < nAmount ){
1225 /* Don't bother looking for a free chunk */
1226 return UNQLITE_FULL;
1227 }
1228 if( pPage->nCell < 10 && ((int)nAmount >= (pPage->pHash->iPageSize / 2)) ){
1229 /* Big chunk need an overflow page for its data */
1230 return UNQLITE_FULL;
1231 }
1232 zPtr = &pPage->pRaw->zData[pPage->sHdr.iFree];
1233 zEnd = &pPage->pRaw->zData[pPage->pHash->iPageSize];
1234 nByte = (sxu16)nAmount;
1235 zPrev = 0;
1236 iBlksz = 0; /* cc warning */
1237 /* Perform the lookup */
1238 for(;;){
1239 if( zPtr >= zEnd ){
1240 return UNQLITE_FULL;
1241 }
1242 /* Offset of the next free block */
1243 SyBigEndianUnpack16(zPtr,&iNext);
1244 /* Block size */
1245 SyBigEndianUnpack16(&zPtr[2],&iBlksz);
1246 if( iBlksz >= nByte ){
1247 /* Got one */
1248 break;
1249 }
1250 zPrev = (unsigned char *)zPtr;
1251 if( iNext == 0 ){
1252 /* No more free blocks, defragment the page */
1253 rc = lhPageDefragment(pPage);
1254 if( rc == UNQLITE_OK && pPage->nFree >= nByte) {
1255 /* Free blocks are merged together */
1256 iNext = 0;
1257 zPtr = &pPage->pRaw->zData[pPage->sHdr.iFree];
1258 iBlksz = pPage->nFree;
1259 zPrev = 0;
1260 break;
1261 }else{
1262 return UNQLITE_FULL;
1263 }
1264 }
1265 /* Point to the next free block */
1266 zPtr = &pPage->pRaw->zData[iNext];
1267 }
1268 /* Acquire writer lock on this page */
1269 rc = pPage->pHash->pIo->xWrite(pPage->pRaw);
1270 if( rc != UNQLITE_OK ){
1271 return rc;
1272 }
1273 /* Save block offset */
1274 *pOfft = (sxu16)(zPtr - pPage->pRaw->zData);
1275 /* Fix pointers */
1276 if( iBlksz >= nByte && (iBlksz - nByte) > 3 ){
1277 unsigned char *zBlock = &pPage->pRaw->zData[(*pOfft) + nByte];
1278 /* Create a new block */
1279 zPtr = zBlock;
1280 SyBigEndianPack16(zBlock,iNext); /* Offset of the next block */
1281 SyBigEndianPack16(&zBlock[2],iBlksz-nByte); /* Block size*/
1282 /* Offset of the new block */
1283 iNext = (sxu16)(zPtr - pPage->pRaw->zData);
1284 iBlksz = nByte;
1285 }
1286 /* Fix offsets */
1287 if( zPrev ){
1288 SyBigEndianPack16(zPrev,iNext);
1289 }else{
1290 /* First block */
1291 pPage->sHdr.iFree = iNext;
1292 /* Reflect on the page header */
1293 SyBigEndianPack16(&pPage->pRaw->zData[2/* Offset of the first cell1*/],iNext);
1294 }
1295 /* All done */
1296 pPage->nFree -= iBlksz;
1297 return UNQLITE_OK;
1298}
1299/*
1300 * Write the cell header into the corresponding offset.
1301 */
1302static int lhCellWriteHeader(lhcell *pCell)
1303{
1304 lhpage *pPage = pCell->pPage;
1305 unsigned char *zRaw = pPage->pRaw->zData;
1306 /* Seek to the desired location */
1307 zRaw += pCell->iStart;
1308 /* 4 byte hash number */
1309 SyBigEndianPack32(zRaw,pCell->nHash);
1310 zRaw += 4;
1311 /* 4 byte key length */
1312 SyBigEndianPack32(zRaw,pCell->nKey);
1313 zRaw += 4;
1314 /* 8 byte data length */
1315 SyBigEndianPack64(zRaw,pCell->nData);
1316 zRaw += 8;
1317 /* 2 byte offset of the next cell */
1318 pCell->iNext = pPage->sHdr.iOfft;
1319 SyBigEndianPack16(zRaw,pCell->iNext);
1320 zRaw += 2;
1321 /* 8 byte overflow page number */
1322 SyBigEndianPack64(zRaw,pCell->iOvfl);
1323 /* Update the page header */
1324 pPage->sHdr.iOfft = pCell->iStart;
1325 /* pEngine->pIo->xWrite() has been successfully called on this page */
1326 SyBigEndianPack16(pPage->pRaw->zData,pCell->iStart);
1327 /* All done */
1328 return UNQLITE_OK;
1329}
1330/*
1331 * Write local payload.
1332 */
1333static int lhCellWriteLocalPayload(lhcell *pCell,
1334 const void *pKey,sxu32 nKeylen,
1335 const void *pData,unqlite_int64 nDatalen
1336 )
1337{
1338 /* A writer lock have been acquired on this page */
1339 lhpage *pPage = pCell->pPage;
1340 unsigned char *zRaw = pPage->pRaw->zData;
1341 /* Seek to the desired location */
1342 zRaw += pCell->iStart + L_HASH_CELL_SZ;
1343 /* Write the key */
1344 SyMemcpy(pKey,(void *)zRaw,nKeylen);
1345 zRaw += nKeylen;
1346 if( nDatalen > 0 ){
1347 /* Write the Data */
1348 SyMemcpy(pData,(void *)zRaw,(sxu32)nDatalen);
1349 }
1350 return UNQLITE_OK;
1351}
1352/*
1353 * Allocate as much overflow page we need to store the cell payload.
1354 */
1355static int lhCellWriteOvflPayload(lhcell *pCell,const void *pKey,sxu32 nKeylen,...)
1356{
1357 lhpage *pPage = pCell->pPage;
1358 lhash_kv_engine *pEngine = pPage->pHash;
1359 unqlite_page *pOvfl,*pFirst,*pNew;
1360 const unsigned char *zPtr,*zEnd;
1361 unsigned char *zRaw,*zRawEnd;
1362 sxu32 nAvail;
1363 va_list ap;
1364 int rc;
1365 /* Acquire a new overflow page */
1366 rc = lhAcquirePage(pEngine,&pOvfl);
1367 if( rc != UNQLITE_OK ){
1368 return rc;
1369 }
1370 /* Acquire a writer lock */
1371 rc = pEngine->pIo->xWrite(pOvfl);
1372 if( rc != UNQLITE_OK ){
1373 return rc;
1374 }
1375 pFirst = pOvfl;
1376 /* Link */
1377 pCell->iOvfl = pOvfl->pgno;
1378 /* Update the cell header */
1379 SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4/*Hash*/ + 4/*Key*/ + 8/*Data*/ + 2 /*Next cell*/],pCell->iOvfl);
1380 /* Start the write process */
1381 zPtr = (const unsigned char *)pKey;
1382 zEnd = &zPtr[nKeylen];
1383 SyBigEndianPack64(pOvfl->zData,0); /* Next overflow page on the chain */
1384 zRaw = &pOvfl->zData[8/* Next ovfl page*/ + 8 /* Data page */ + 2 /* Data offset*/];
1385 zRawEnd = &pOvfl->zData[pEngine->iPageSize];
1386 pNew = pOvfl;
1387 /* Write the key */
1388 for(;;){
1389 if( zPtr >= zEnd ){
1390 break;
1391 }
1392 if( zRaw >= zRawEnd ){
1393 /* Acquire a new page */
1394 rc = lhAcquirePage(pEngine,&pNew);
1395 if( rc != UNQLITE_OK ){
1396 return rc;
1397 }
1398 rc = pEngine->pIo->xWrite(pNew);
1399 if( rc != UNQLITE_OK ){
1400 return rc;
1401 }
1402 /* Link */
1403 SyBigEndianPack64(pOvfl->zData,pNew->pgno);
1404 pEngine->pIo->xPageUnref(pOvfl);
1405 SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */
1406 pOvfl = pNew;
1407 zRaw = &pNew->zData[8];
1408 zRawEnd = &pNew->zData[pEngine->iPageSize];
1409 }
1410 nAvail = (sxu32)(zRawEnd-zRaw);
1411 nKeylen = (sxu32)(zEnd-zPtr);
1412 if( nKeylen > nAvail ){
1413 nKeylen = nAvail;
1414 }
1415 SyMemcpy((const void *)zPtr,(void *)zRaw,nKeylen);
1416 /* Synchronize pointers */
1417 zPtr += nKeylen;
1418 zRaw += nKeylen;
1419 }
1420 rc = UNQLITE_OK;
1421 va_start(ap,nKeylen);
1422 pCell->iDataPage = pNew->pgno;
1423 pCell->iDataOfft = (sxu16)(zRaw-pNew->zData);
1424 /* Write the data page and its offset */
1425 SyBigEndianPack64(&pFirst->zData[8/*Next ovfl*/],pCell->iDataPage);
1426 SyBigEndianPack16(&pFirst->zData[8/*Next ovfl*/+8/*Data page*/],pCell->iDataOfft);
1427 /* Write data */
1428 for(;;){
1429 const void *pData;
1430 sxu32 nDatalen;
1431 sxu64 nData;
1432 pData = va_arg(ap,const void *);
1433 nData = va_arg(ap,sxu64);
1434 if( pData == 0 ){
1435 /* No more chunks */
1436 break;
1437 }
1438 /* Write this chunk */
1439 zPtr = (const unsigned char *)pData;
1440 zEnd = &zPtr[nData];
1441 for(;;){
1442 if( zPtr >= zEnd ){
1443 break;
1444 }
1445 if( zRaw >= zRawEnd ){
1446 /* Acquire a new page */
1447 rc = lhAcquirePage(pEngine,&pNew);
1448 if( rc != UNQLITE_OK ){
1449 va_end(ap);
1450 return rc;
1451 }
1452 rc = pEngine->pIo->xWrite(pNew);
1453 if( rc != UNQLITE_OK ){
1454 va_end(ap);
1455 return rc;
1456 }
1457 /* Link */
1458 SyBigEndianPack64(pOvfl->zData,pNew->pgno);
1459 pEngine->pIo->xPageUnref(pOvfl);
1460 SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */
1461 pOvfl = pNew;
1462 zRaw = &pNew->zData[8];
1463 zRawEnd = &pNew->zData[pEngine->iPageSize];
1464 }
1465 nAvail = (sxu32)(zRawEnd-zRaw);
1466 nDatalen = (sxu32)(zEnd-zPtr);
1467 if( nDatalen > nAvail ){
1468 nDatalen = nAvail;
1469 }
1470 SyMemcpy((const void *)zPtr,(void *)zRaw,nDatalen);
1471 /* Synchronize pointers */
1472 zPtr += nDatalen;
1473 zRaw += nDatalen;
1474 }
1475 }
1476 /* Unref the overflow page */
1477 pEngine->pIo->xPageUnref(pOvfl);
1478 va_end(ap);
1479 return UNQLITE_OK;
1480}
1481/*
1482 * Restore a page to the free list.
1483 */
1484static int lhRestorePage(lhash_kv_engine *pEngine,unqlite_page *pPage)
1485{
1486 int rc;
1487 rc = pEngine->pIo->xWrite(pEngine->pHeader);
1488 if( rc != UNQLITE_OK ){
1489 return rc;
1490 }
1491 rc = pEngine->pIo->xWrite(pPage);
1492 if( rc != UNQLITE_OK ){
1493 return rc;
1494 }
1495 /* Link to the list of free page */
1496 SyBigEndianPack64(pPage->zData,pEngine->nFreeList);
1497 pEngine->nFreeList = pPage->pgno;
1498 SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/],pEngine->nFreeList);
1499 /* All done */
1500 return UNQLITE_OK;
1501}
1502/*
1503 * Restore cell space and mark it as a free block.
1504 */
1505static int lhRestoreSpace(lhpage *pPage,sxu16 iOfft,sxu16 nByte)
1506{
1507 unsigned char *zRaw;
1508 if( nByte < 4 ){
1509 /* At least 4 bytes of freespace must form a valid block */
1510 return UNQLITE_OK;
1511 }
1512 /* pEngine->pIo->xWrite() has been successfully called on this page */
1513 zRaw = &pPage->pRaw->zData[iOfft];
1514 /* Mark as a free block */
1515 SyBigEndianPack16(zRaw,pPage->sHdr.iFree); /* Offset of the next free block */
1516 zRaw += 2;
1517 SyBigEndianPack16(zRaw,nByte);
1518 /* Link */
1519 SyBigEndianPack16(&pPage->pRaw->zData[2/* offset of the first cell */],iOfft);
1520 pPage->sHdr.iFree = iOfft;
1521 pPage->nFree += nByte;
1522 return UNQLITE_OK;
1523}
1524/* Forward declaration */
1525static lhcell * lhFindSibeling(lhcell *pCell);
1526/*
1527 * Unlink a cell.
1528 */
1529static int lhUnlinkCell(lhcell *pCell)
1530{
1531 lhash_kv_engine *pEngine = pCell->pPage->pHash;
1532 lhpage *pPage = pCell->pPage;
1533 sxu16 nByte = L_HASH_CELL_SZ;
1534 lhcell *pPrev;
1535 int rc;
1536 rc = pEngine->pIo->xWrite(pPage->pRaw);
1537 if( rc != UNQLITE_OK ){
1538 return rc;
1539 }
1540 /* Bring the link */
1541 pPrev = lhFindSibeling(pCell);
1542 if( pPrev ){
1543 pPrev->iNext = pCell->iNext;
1544 /* Fix offsets in the page header */
1545 SyBigEndianPack16(&pPage->pRaw->zData[pPrev->iStart + 4/*Hash*/+4/*Key*/+8/*Data*/],pCell->iNext);
1546 }else{
1547 /* First entry on this page (either master or slave) */
1548 pPage->sHdr.iOfft = pCell->iNext;
1549 /* Update the page header */
1550 SyBigEndianPack16(pPage->pRaw->zData,pCell->iNext);
1551 }
1552 /* Restore cell space */
1553 if( pCell->iOvfl == 0 ){
1554 nByte += (sxu16)(pCell->nData + pCell->nKey);
1555 }
1556 lhRestoreSpace(pPage,pCell->iStart,nByte);
1557 /* Discard the cell from the in-memory hashtable */
1558 lhCellDiscard(pCell);
1559 return UNQLITE_OK;
1560}
1561/*
1562 * Remove a cell and its paylod (key + data).
1563 */
1564static int lhRecordRemove(lhcell *pCell)
1565{
1566 lhash_kv_engine *pEngine = pCell->pPage->pHash;
1567 int rc;
1568 if( pCell->iOvfl > 0){
1569 /* Discard overflow pages */
1570 unqlite_page *pOvfl;
1571 pgno iNext = pCell->iOvfl;
1572 for(;;){
1573 /* Point to the overflow page */
1574 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iNext,&pOvfl);
1575 if( rc != UNQLITE_OK ){
1576 return rc;
1577 }
1578 /* Next page on the chain */
1579 SyBigEndianUnpack64(pOvfl->zData,&iNext);
1580 /* Restore the page to the free list */
1581 rc = lhRestorePage(pEngine,pOvfl);
1582 if( rc != UNQLITE_OK ){
1583 return rc;
1584 }
1585 /* Unref */
1586 pEngine->pIo->xPageUnref(pOvfl);
1587 if( iNext == 0 ){
1588 break;
1589 }
1590 }
1591 }
1592 /* Unlink the cell */
1593 rc = lhUnlinkCell(pCell);
1594 return rc;
1595}
1596/*
1597 * Find cell sibeling.
1598 */
1599static lhcell * lhFindSibeling(lhcell *pCell)
1600{
1601 lhpage *pPage = pCell->pPage->pMaster;
1602 lhcell *pEntry;
1603 pEntry = pPage->pFirst;
1604 while( pEntry ){
1605 if( pEntry->pPage == pCell->pPage && pEntry->iNext == pCell->iStart ){
1606 /* Sibeling found */
1607 return pEntry;
1608 }
1609 /* Point to the previous entry */
1610 pEntry = pEntry->pPrev;
1611 }
1612 /* Last inserted cell */
1613 return 0;
1614}
1615/*
1616 * Move a cell to a new location with its new data.
1617 */
1618static int lhMoveLocalCell(
1619 lhcell *pCell,
1620 sxu16 iOfft,
1621 const void *pData,
1622 unqlite_int64 nData
1623 )
1624{
1625 sxu16 iKeyOfft = pCell->iStart + L_HASH_CELL_SZ;
1626 lhpage *pPage = pCell->pPage;
1627 lhcell *pSibeling;
1628 pSibeling = lhFindSibeling(pCell);
1629 if( pSibeling ){
1630 /* Fix link */
1631 SyBigEndianPack16(&pPage->pRaw->zData[pSibeling->iStart + 4/*Hash*/+4/*Key*/+8/*Data*/],pCell->iNext);
1632 pSibeling->iNext = pCell->iNext;
1633 }else{
1634 /* First cell, update page header only */
1635 SyBigEndianPack16(pPage->pRaw->zData,pCell->iNext);
1636 pPage->sHdr.iOfft = pCell->iNext;
1637 }
1638 /* Set the new offset */
1639 pCell->iStart = iOfft;
1640 pCell->nData = (sxu64)nData;
1641 /* Write the cell payload */
1642 lhCellWriteLocalPayload(pCell,(const void *)&pPage->pRaw->zData[iKeyOfft],pCell->nKey,pData,nData);
1643 /* Finally write the cell header */
1644 lhCellWriteHeader(pCell);
1645 /* All done */
1646 return UNQLITE_OK;
1647}
1648/*
1649 * Overwrite an existing record.
1650 */
1651static int lhRecordOverwrite(
1652 lhcell *pCell,
1653 const void *pData,unqlite_int64 nByte
1654 )
1655{
1656 lhash_kv_engine *pEngine = pCell->pPage->pHash;
1657 unsigned char *zRaw,*zRawEnd,*zPayload;
1658 const unsigned char *zPtr,*zEnd;
1659 unqlite_page *pOvfl,*pOld,*pNew;
1660 lhpage *pPage = pCell->pPage;
1661 sxu32 nAvail;
1662 pgno iOvfl;
1663 int rc;
1664 /* Acquire a writer lock on this page */
1665 rc = pEngine->pIo->xWrite(pPage->pRaw);
1666 if( rc != UNQLITE_OK ){
1667 return rc;
1668 }
1669 if( pCell->iOvfl == 0 ){
1670 /* Local payload, try to deal with the free space issues */
1671 zPayload = &pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ + pCell->nKey];
1672 if( pCell->nData == (sxu64)nByte ){
1673 /* Best scenario, simply a memcpy operation */
1674 SyMemcpy(pData,(void *)zPayload,(sxu32)nByte);
1675 }else if( (sxu64)nByte < pCell->nData ){
1676 /* Shorter data, not so ugly */
1677 SyMemcpy(pData,(void *)zPayload,(sxu32)nByte);
1678 /* Update the cell header */
1679 SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],nByte);
1680 /* Restore freespace */
1681 lhRestoreSpace(pPage,(sxu16)(pCell->iStart + L_HASH_CELL_SZ + pCell->nKey + nByte),(sxu16)(pCell->nData - nByte));
1682 /* New data size */
1683 pCell->nData = (sxu64)nByte;
1684 }else{
1685 sxu16 iOfft = 0; /* cc warning */
1686 /* Check if another chunk is available for this cell */
1687 rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ + pCell->nKey + nByte,&iOfft);
1688 if( rc != UNQLITE_OK ){
1689 /* Transfer the payload to an overflow page */
1690 rc = lhCellWriteOvflPayload(pCell,&pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ],pCell->nKey,pData,nByte,(const void *)0);
1691 if( rc != UNQLITE_OK ){
1692 return rc;
1693 }
1694 /* Update the cell header */
1695 SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],(sxu64)nByte);
1696 /* Restore freespace */
1697 lhRestoreSpace(pPage,(sxu16)(pCell->iStart + L_HASH_CELL_SZ),(sxu16)(pCell->nKey + pCell->nData));
1698 /* New data size */
1699 pCell->nData = (sxu64)nByte;
1700 }else{
1701 sxu16 iOldOfft = pCell->iStart;
1702 sxu32 iOld = (sxu32)pCell->nData;
1703 /* Space is available, transfer the cell */
1704 lhMoveLocalCell(pCell,iOfft,pData,nByte);
1705 /* Restore cell space */
1706 lhRestoreSpace(pPage,iOldOfft,(sxu16)(L_HASH_CELL_SZ + pCell->nKey + iOld));
1707 }
1708 }
1709 return UNQLITE_OK;
1710 }
1711 /* Point to the overflow page */
1712 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pCell->iDataPage,&pOvfl);
1713 if( rc != UNQLITE_OK ){
1714 return rc;
1715 }
1716 /* Relase all old overflow pages first */
1717 SyBigEndianUnpack64(pOvfl->zData,&iOvfl);
1718 pOld = pOvfl;
1719 for(;;){
1720 if( iOvfl == 0 ){
1721 /* No more overflow pages on the chain */
1722 break;
1723 }
1724 /* Point to the target page */
1725 if( UNQLITE_OK != pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pOld) ){
1726 /* Not so fatal if something goes wrong here */
1727 break;
1728 }
1729 /* Next overflow page to be released */
1730 SyBigEndianUnpack64(pOld->zData,&iOvfl);
1731 if( pOld != pOvfl ){ /* xx: chm is maniac */
1732 /* Restore the page to the free list */
1733 lhRestorePage(pEngine,pOld);
1734 /* Unref */
1735 pEngine->pIo->xPageUnref(pOld);
1736 }
1737 }
1738 /* Point to the data offset */
1739 zRaw = &pOvfl->zData[pCell->iDataOfft];
1740 zRawEnd = &pOvfl->zData[pEngine->iPageSize];
1741 /* The data to be stored */
1742 zPtr = (const unsigned char *)pData;
1743 zEnd = &zPtr[nByte];
1744 /* Start the overwrite process */
1745 /* Acquire a writer lock */
1746 rc = pEngine->pIo->xWrite(pOvfl);
1747 if( rc != UNQLITE_OK ){
1748 return rc;
1749 }
1750 SyBigEndianPack64(pOvfl->zData,0);
1751 for(;;){
1752 sxu32 nLen;
1753 if( zPtr >= zEnd ){
1754 break;
1755 }
1756 if( zRaw >= zRawEnd ){
1757 /* Acquire a new page */
1758 rc = lhAcquirePage(pEngine,&pNew);
1759 if( rc != UNQLITE_OK ){
1760 return rc;
1761 }
1762 rc = pEngine->pIo->xWrite(pNew);
1763 if( rc != UNQLITE_OK ){
1764 return rc;
1765 }
1766 /* Link */
1767 SyBigEndianPack64(pOvfl->zData,pNew->pgno);
1768 pEngine->pIo->xPageUnref(pOvfl);
1769 SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */
1770 pOvfl = pNew;
1771 zRaw = &pNew->zData[8];
1772 zRawEnd = &pNew->zData[pEngine->iPageSize];
1773 }
1774 nAvail = (sxu32)(zRawEnd-zRaw);
1775 nLen = (sxu32)(zEnd-zPtr);
1776 if( nLen > nAvail ){
1777 nLen = nAvail;
1778 }
1779 SyMemcpy((const void *)zPtr,(void *)zRaw,nLen);
1780 /* Synchronize pointers */
1781 zPtr += nLen;
1782 zRaw += nLen;
1783 }
1784 /* Unref the last overflow page */
1785 pEngine->pIo->xPageUnref(pOvfl);
1786 /* Finally, update the cell header */
1787 pCell->nData = (sxu64)nByte;
1788 SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],pCell->nData);
1789 /* All done */
1790 return UNQLITE_OK;
1791}
1792/*
1793 * Append data to an existing record.
1794 */
1795static int lhRecordAppend(
1796 lhcell *pCell,
1797 const void *pData,unqlite_int64 nByte
1798 )
1799{
1800 lhash_kv_engine *pEngine = pCell->pPage->pHash;
1801 const unsigned char *zPtr,*zEnd;
1802 lhpage *pPage = pCell->pPage;
1803 unsigned char *zRaw,*zRawEnd;
1804 unqlite_page *pOvfl,*pNew;
1805 sxu64 nDatalen;
1806 sxu32 nAvail;
1807 pgno iOvfl;
1808 int rc;
1809 if( pCell->nData + nByte < pCell->nData ){
1810 /* Overflow */
1811 pEngine->pIo->xErr(pEngine->pIo->pHandle,"Append operation will cause data overflow");
1812 return UNQLITE_LIMIT;
1813 }
1814 /* Acquire a writer lock on this page */
1815 rc = pEngine->pIo->xWrite(pPage->pRaw);
1816 if( rc != UNQLITE_OK ){
1817 return rc;
1818 }
1819 if( pCell->iOvfl == 0 ){
1820 sxu16 iOfft = 0; /* cc warning */
1821 /* Local payload, check for a bigger place */
1822 rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ + pCell->nKey + pCell->nData + nByte,&iOfft);
1823 if( rc != UNQLITE_OK ){
1824 /* Transfer the payload to an overflow page */
1825 rc = lhCellWriteOvflPayload(pCell,
1826 &pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ],pCell->nKey,
1827 (const void *)&pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ + pCell->nKey],pCell->nData,
1828 pData,nByte,
1829 (const void *)0);
1830 if( rc != UNQLITE_OK ){
1831 return rc;
1832 }
1833 /* Update the cell header */
1834 SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],pCell->nData + nByte);
1835 /* Restore freespace */
1836 lhRestoreSpace(pPage,(sxu16)(pCell->iStart + L_HASH_CELL_SZ),(sxu16)(pCell->nKey + pCell->nData));
1837 /* New data size */
1838 pCell->nData += nByte;
1839 }else{
1840 sxu16 iOldOfft = pCell->iStart;
1841 sxu32 iOld = (sxu32)pCell->nData;
1842 SyBlob sWorker;
1843 SyBlobInit(&sWorker,&pEngine->sAllocator);
1844 /* Copy the old data */
1845 rc = SyBlobAppend(&sWorker,(const void *)&pPage->pRaw->zData[pCell->iStart + L_HASH_CELL_SZ + pCell->nKey],(sxu32)pCell->nData);
1846 if( rc == SXRET_OK ){
1847 /* Append the new data */
1848 rc = SyBlobAppend(&sWorker,pData,(sxu32)nByte);
1849 }
1850 if( rc != UNQLITE_OK ){
1851 SyBlobRelease(&sWorker);
1852 return rc;
1853 }
1854 /* Space is available, transfer the cell */
1855 lhMoveLocalCell(pCell,iOfft,SyBlobData(&sWorker),(unqlite_int64)SyBlobLength(&sWorker));
1856 /* Restore cell space */
1857 lhRestoreSpace(pPage,iOldOfft,(sxu16)(L_HASH_CELL_SZ + pCell->nKey + iOld));
1858 /* All done */
1859 SyBlobRelease(&sWorker);
1860 }
1861 return UNQLITE_OK;
1862 }
1863 /* Point to the overflow page which hold the data */
1864 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,pCell->iDataPage,&pOvfl);
1865 if( rc != UNQLITE_OK ){
1866 return rc;
1867 }
1868 /* Next overflow page in the chain */
1869 SyBigEndianUnpack64(pOvfl->zData,&iOvfl);
1870 /* Point to the end of the chunk */
1871 zRaw = &pOvfl->zData[pCell->iDataOfft];
1872 zRawEnd = &pOvfl->zData[pEngine->iPageSize];
1873 nDatalen = pCell->nData;
1874 nAvail = (sxu32)(zRawEnd - zRaw);
1875 for(;;){
1876 if( zRaw >= zRawEnd ){
1877 if( iOvfl == 0 ){
1878 /* Cant happen */
1879 pEngine->pIo->xErr(pEngine->pIo->pHandle,"Corrupt overflow page");
1880 return UNQLITE_CORRUPT;
1881 }
1882 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,iOvfl,&pNew);
1883 if( rc != UNQLITE_OK ){
1884 return rc;
1885 }
1886 /* Next overflow page on the chain */
1887 SyBigEndianUnpack64(pNew->zData,&iOvfl);
1888 /* Unref the previous overflow page */
1889 pEngine->pIo->xPageUnref(pOvfl);
1890 /* Point to the new chunk */
1891 zRaw = &pNew->zData[8];
1892 zRawEnd = &pNew->zData[pCell->pPage->pHash->iPageSize];
1893 nAvail = L_HASH_OVERFLOW_SIZE(pCell->pPage->pHash->iPageSize);
1894 pOvfl = pNew;
1895 }
1896 if( (sxu64)nAvail > nDatalen ){
1897 zRaw += nDatalen;
1898 break;
1899 }else{
1900 nDatalen -= nAvail;
1901 }
1902 zRaw += nAvail;
1903 }
1904 /* Start the append process */
1905 zPtr = (const unsigned char *)pData;
1906 zEnd = &zPtr[nByte];
1907 /* Acquire a writer lock */
1908 rc = pEngine->pIo->xWrite(pOvfl);
1909 if( rc != UNQLITE_OK ){
1910 return rc;
1911 }
1912 for(;;){
1913 sxu32 nLen;
1914 if( zPtr >= zEnd ){
1915 break;
1916 }
1917 if( zRaw >= zRawEnd ){
1918 /* Acquire a new page */
1919 rc = lhAcquirePage(pEngine,&pNew);
1920 if( rc != UNQLITE_OK ){
1921 return rc;
1922 }
1923 rc = pEngine->pIo->xWrite(pNew);
1924 if( rc != UNQLITE_OK ){
1925 return rc;
1926 }
1927 /* Link */
1928 SyBigEndianPack64(pOvfl->zData,pNew->pgno);
1929 pEngine->pIo->xPageUnref(pOvfl);
1930 SyBigEndianPack64(pNew->zData,0); /* Next overflow page on the chain */
1931 pOvfl = pNew;
1932 zRaw = &pNew->zData[8];
1933 zRawEnd = &pNew->zData[pEngine->iPageSize];
1934 }
1935 nAvail = (sxu32)(zRawEnd-zRaw);
1936 nLen = (sxu32)(zEnd-zPtr);
1937 if( nLen > nAvail ){
1938 nLen = nAvail;
1939 }
1940 SyMemcpy((const void *)zPtr,(void *)zRaw,nLen);
1941 /* Synchronize pointers */
1942 zPtr += nLen;
1943 zRaw += nLen;
1944 }
1945 /* Unref the last overflow page */
1946 pEngine->pIo->xPageUnref(pOvfl);
1947 /* Finally, update the cell header */
1948 pCell->nData += nByte;
1949 SyBigEndianPack64(&pPage->pRaw->zData[pCell->iStart + 4 /* Hash */ + 4 /* Key */],pCell->nData);
1950 /* All done */
1951 return UNQLITE_OK;
1952}
1953/*
1954 * A write privilege have been acquired on this page.
1955 * Mark it as an empty page (No cells).
1956 */
1957static int lhSetEmptyPage(lhpage *pPage)
1958{
1959 unsigned char *zRaw = pPage->pRaw->zData;
1960 lhphdr *pHeader = &pPage->sHdr;
1961 sxu16 nByte;
1962 int rc;
1963 /* Acquire a writer lock */
1964 rc = pPage->pHash->pIo->xWrite(pPage->pRaw);
1965 if( rc != UNQLITE_OK ){
1966 return rc;
1967 }
1968 /* Offset of the first cell */
1969 SyBigEndianPack16(zRaw,0);
1970 zRaw += 2;
1971 /* Offset of the first free block */
1972 pHeader->iFree = L_HASH_PAGE_HDR_SZ;
1973 SyBigEndianPack16(zRaw,L_HASH_PAGE_HDR_SZ);
1974 zRaw += 2;
1975 /* Slave page number */
1976 SyBigEndianPack64(zRaw,0);
1977 zRaw += 8;
1978 /* Fill the free block */
1979 SyBigEndianPack16(zRaw,0); /* Offset of the next free block */
1980 zRaw += 2;
1981 nByte = (sxu16)L_HASH_MX_FREE_SPACE(pPage->pHash->iPageSize);
1982 SyBigEndianPack16(zRaw,nByte);
1983 pPage->nFree = nByte;
1984 /* Do not add this page to the hot dirty list */
1985 pPage->pHash->pIo->xDontMkHot(pPage->pRaw);
1986 return UNQLITE_OK;
1987}
1988/* Forward declaration */
1989static int lhSlaveStore(
1990 lhpage *pPage,
1991 const void *pKey,sxu32 nKeyLen,
1992 const void *pData,unqlite_int64 nDataLen,
1993 sxu32 nHash
1994 );
1995/*
1996 * Store a cell and its payload in a given page.
1997 */
1998static int lhStoreCell(
1999 lhpage *pPage, /* Target page */
2000 const void *pKey,sxu32 nKeyLen, /* Payload: Key */
2001 const void *pData,unqlite_int64 nDataLen, /* Payload: Data */
2002 sxu32 nHash, /* Hash of the key */
2003 int auto_append /* Auto append a slave page if full */
2004 )
2005{
2006 lhash_kv_engine *pEngine = pPage->pHash;
2007 int iNeedOvfl = 0; /* Need overflow page for this cell and its payload*/
2008 lhcell *pCell;
2009 sxu16 nOfft;
2010 int rc;
2011 /* Acquire a writer lock on this page first */
2012 rc = pEngine->pIo->xWrite(pPage->pRaw);
2013 if( rc != UNQLITE_OK ){
2014 return rc;
2015 }
2016 /* Check for a free block */
2017 rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ+nKeyLen+nDataLen,&nOfft);
2018 if( rc != UNQLITE_OK ){
2019 /* Check for a free block to hold a single cell only (without payload) */
2020 rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ,&nOfft);
2021 if( rc != UNQLITE_OK ){
2022 if( !auto_append ){
2023 /* A split must be done */
2024 return UNQLITE_FULL;
2025 }else{
2026 /* Store this record in a slave page */
2027 rc = lhSlaveStore(pPage,pKey,nKeyLen,pData,nDataLen,nHash);
2028 return rc;
2029 }
2030 }
2031 iNeedOvfl = 1;
2032 }
2033 /* Allocate a new cell instance */
2034 pCell = lhNewCell(pEngine,pPage);
2035 if( pCell == 0 ){
2036 pEngine->pIo->xErr(pEngine->pIo->pHandle,"KV store is running out of memory");
2037 return UNQLITE_NOMEM;
2038 }
2039 /* Fill-in the structure */
2040 pCell->iStart = nOfft;
2041 pCell->nKey = nKeyLen;
2042 pCell->nData = (sxu64)nDataLen;
2043 pCell->nHash = nHash;
2044 if( nKeyLen < 262144 /* 256 KB */ ){
2045 /* Keep the key in-memory for fast lookup */
2046 SyBlobAppend(&pCell->sKey,pKey,nKeyLen);
2047 }
2048 /* Link the cell */
2049 rc = lhInstallCell(pCell);
2050 if( rc != UNQLITE_OK ){
2051 return rc;
2052 }
2053 /* Write the payload */
2054 if( iNeedOvfl ){
2055 rc = lhCellWriteOvflPayload(pCell,pKey,nKeyLen,pData,nDataLen,(const void *)0);
2056 if( rc != UNQLITE_OK ){
2057 lhCellDiscard(pCell);
2058 return rc;
2059 }
2060 }else{
2061 lhCellWriteLocalPayload(pCell,pKey,nKeyLen,pData,nDataLen);
2062 }
2063 /* Finally, Write the cell header */
2064 lhCellWriteHeader(pCell);
2065 /* All done */
2066 return UNQLITE_OK;
2067}
2068/*
2069 * Find a slave page capable of hosting the given amount.
2070 */
2071static int lhFindSlavePage(lhpage *pPage,sxu64 nAmount,sxu16 *pOfft,lhpage **ppSlave)
2072{
2073 lhash_kv_engine *pEngine = pPage->pHash;
2074 lhpage *pMaster = pPage->pMaster;
2075 lhpage *pSlave = pMaster->pSlave;
2076 unqlite_page *pRaw;
2077 lhpage *pNew;
2078 sxu16 iOfft;
2079 sxi32 i;
2080 int rc;
2081 /* Look for an already attached slave page */
2082 for( i = 0 ; i < pMaster->iSlave ; ++i ){
2083 /* Find a free chunk big enough */
2084 sxu16 size = L_HASH_CELL_SZ + nAmount;
2085 rc = lhAllocateSpace(pSlave,size,&iOfft);
2086 if( rc != UNQLITE_OK ){
2087 /* A space for cell header only */
2088 size = L_HASH_CELL_SZ;
2089 rc = lhAllocateSpace(pSlave,size,&iOfft);
2090 }
2091 if( rc == UNQLITE_OK ){
2092 /* All done */
2093 if( pOfft ){
2094 *pOfft = iOfft;
2095 }else{
2096 rc = lhRestoreSpace(pSlave, iOfft, size);
2097 }
2098 *ppSlave = pSlave;
2099 return rc;
2100 }
2101 /* Point to the next slave page */
2102 pSlave = pSlave->pNextSlave;
2103 }
2104 /* Acquire a new slave page */
2105 rc = lhAcquirePage(pEngine,&pRaw);
2106 if( rc != UNQLITE_OK ){
2107 return rc;
2108 }
2109 /* Last slave page */
2110 pSlave = pMaster->pSlave;
2111 if( pSlave == 0 ){
2112 /* First slave page */
2113 pSlave = pMaster;
2114 }
2115 /* Initialize the page */
2116 pNew = lhNewPage(pEngine,pRaw,pMaster);
2117 if( pNew == 0 ){
2118 return UNQLITE_NOMEM;
2119 }
2120 /* Mark as an empty page */
2121 rc = lhSetEmptyPage(pNew);
2122 if( rc != UNQLITE_OK ){
2123 goto fail;
2124 }
2125 if( pOfft ){
2126 /* Look for a free block */
2127 if( UNQLITE_OK != lhAllocateSpace(pNew,L_HASH_CELL_SZ+nAmount,&iOfft) ){
2128 /* Cell header only */
2129 lhAllocateSpace(pNew,L_HASH_CELL_SZ,&iOfft); /* Never fail */
2130 }
2131 *pOfft = iOfft;
2132 }
2133 /* Link this page to the previous slave page */
2134 rc = pEngine->pIo->xWrite(pSlave->pRaw);
2135 if( rc != UNQLITE_OK ){
2136 goto fail;
2137 }
2138 /* Reflect in the page header */
2139 SyBigEndianPack64(&pSlave->pRaw->zData[2/*Cell offset*/+2/*Free block offset*/],pRaw->pgno);
2140 pSlave->sHdr.iSlave = pRaw->pgno;
2141 /* All done */
2142 *ppSlave = pNew;
2143 return UNQLITE_OK;
2144fail:
2145 pEngine->pIo->xPageUnref(pNew->pRaw); /* pNew will be released in this call */
2146 return rc;
2147
2148}
2149/*
2150 * Perform a store operation in a slave page.
2151 */
2152static int lhSlaveStore(
2153 lhpage *pPage, /* Master page */
2154 const void *pKey,sxu32 nKeyLen, /* Payload: key */
2155 const void *pData,unqlite_int64 nDataLen, /* Payload: data */
2156 sxu32 nHash /* Hash of the key */
2157 )
2158{
2159 lhpage *pSlave;
2160 int rc;
2161 /* Find a slave page */
2162 rc = lhFindSlavePage(pPage,nKeyLen + nDataLen,0,&pSlave);
2163 if( rc != UNQLITE_OK ){
2164 return rc;
2165 }
2166 /* Perform the insertion in the slave page */
2167 rc = lhStoreCell(pSlave,pKey,nKeyLen,pData,nDataLen,nHash,1);
2168 return rc;
2169}
2170/*
2171 * Transfer a cell to a new page (either a master or slave).
2172 */
2173static int lhTransferCell(lhcell *pTarget,lhpage *pPage)
2174{
2175 lhcell *pCell;
2176 sxu16 nOfft;
2177 int rc;
2178 /* Check for a free block to hold a single cell only */
2179 rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ,&nOfft);
2180 if( rc != UNQLITE_OK ){
2181 /* Store in a slave page */
2182 rc = lhFindSlavePage(pPage,L_HASH_CELL_SZ,&nOfft,&pPage);
2183 if( rc != UNQLITE_OK ){
2184 return rc;
2185 }
2186 }
2187 /* Allocate a new cell instance */
2188 pCell = lhNewCell(pPage->pHash,pPage);
2189 if( pCell == 0 ){
2190 return UNQLITE_NOMEM;
2191 }
2192 /* Fill-in the structure */
2193 pCell->iStart = nOfft;
2194 pCell->nData = pTarget->nData;
2195 pCell->nKey = pTarget->nKey;
2196 pCell->iOvfl = pTarget->iOvfl;
2197 pCell->iDataOfft = pTarget->iDataOfft;
2198 pCell->iDataPage = pTarget->iDataPage;
2199 pCell->nHash = pTarget->nHash;
2200 SyBlobDup(&pTarget->sKey,&pCell->sKey);
2201 /* Link the cell */
2202 rc = lhInstallCell(pCell);
2203 if( rc != UNQLITE_OK ){
2204 return rc;
2205 }
2206 /* Finally, Write the cell header */
2207 lhCellWriteHeader(pCell);
2208 /* All done */
2209 return UNQLITE_OK;
2210}
2211/*
2212 * Perform a page split.
2213 */
2214static int lhPageSplit(
2215 lhpage *pOld, /* Page to be split */
2216 lhpage *pNew, /* New page */
2217 pgno split_bucket, /* Current split bucket */
2218 pgno high_mask /* High mask (Max split bucket - 1) */
2219 )
2220{
2221 lhcell *pCell,*pNext;
2222 SyBlob sWorker;
2223 pgno iBucket;
2224 int rc;
2225 SyBlobInit(&sWorker,&pOld->pHash->sAllocator);
2226 /* Perform the split */
2227 pCell = pOld->pList;
2228 for( ;; ){
2229 if( pCell == 0 ){
2230 /* No more cells */
2231 break;
2232 }
2233 /* Obtain the new logical bucket */
2234 iBucket = pCell->nHash & high_mask;
2235 pNext = pCell->pNext;
2236 if( iBucket != split_bucket){
2237 rc = UNQLITE_OK;
2238 if( pCell->iOvfl ){
2239 /* Transfer the cell only */
2240 rc = lhTransferCell(pCell,pNew);
2241 }else{
2242 /* Transfer the cell and its payload */
2243 SyBlobReset(&sWorker);
2244 if( SyBlobLength(&pCell->sKey) < 1 ){
2245 /* Consume the key */
2246 rc = lhConsumeCellkey(pCell,unqliteDataConsumer,&pCell->sKey,0);
2247 if( rc != UNQLITE_OK ){
2248 goto fail;
2249 }
2250 }
2251 /* Consume the data (Very small data < 65k) */
2252 rc = lhConsumeCellData(pCell,unqliteDataConsumer,&sWorker);
2253 if( rc != UNQLITE_OK ){
2254 goto fail;
2255 }
2256 /* Perform the transfer */
2257 rc = lhStoreCell(
2258 pNew,
2259 SyBlobData(&pCell->sKey),(int)SyBlobLength(&pCell->sKey),
2260 SyBlobData(&sWorker),SyBlobLength(&sWorker),
2261 pCell->nHash,
2262 1
2263 );
2264 }
2265 if( rc != UNQLITE_OK ){
2266 goto fail;
2267 }
2268 /* Discard the cell from the old page */
2269 lhUnlinkCell(pCell);
2270 }
2271 /* Point to the next cell */
2272 pCell = pNext;
2273 }
2274 /* All done */
2275 rc = UNQLITE_OK;
2276fail:
2277 SyBlobRelease(&sWorker);
2278 return rc;
2279}
2280/*
2281 * Perform the infamous linear hash split operation.
2282 */
2283static int lhSplit(lhpage *pTarget,int *pRetry)
2284{
2285 lhash_kv_engine *pEngine = pTarget->pHash;
2286 lhash_bmap_rec *pRec;
2287 lhpage *pOld,*pNew;
2288 unqlite_page *pRaw;
2289 int rc;
2290 /* Get the real page number of the bucket to split */
2291 pRec = lhMapFindBucket(pEngine,pEngine->split_bucket);
2292 if( pRec == 0 ){
2293 /* Can't happen */
2294 return UNQLITE_CORRUPT;
2295 }
2296 /* Load the page to be split */
2297 rc = lhLoadPage(pEngine,pRec->iReal,0,&pOld,0);
2298 if( rc != UNQLITE_OK ){
2299 return rc;
2300 }
2301 /* Request a new page */
2302 rc = lhAcquirePage(pEngine,&pRaw);
2303 if( rc != UNQLITE_OK ){
2304 return rc;
2305 }
2306 /* Initialize the page */
2307 pNew = lhNewPage(pEngine,pRaw,0);
2308 if( pNew == 0 ){
2309 return UNQLITE_NOMEM;
2310 }
2311 /* Mark as an empty page */
2312 rc = lhSetEmptyPage(pNew);
2313 if( rc != UNQLITE_OK ){
2314 goto fail;
2315 }
2316 /* Install and write the logical map record */
2317 rc = lhMapWriteRecord(pEngine,
2318 pEngine->split_bucket + pEngine->max_split_bucket,
2319 pRaw->pgno
2320 );
2321 if( rc != UNQLITE_OK ){
2322 goto fail;
2323 }
2324 if( pTarget->pRaw->pgno == pOld->pRaw->pgno ){
2325 *pRetry = 1;
2326 }
2327 /* Perform the split */
2328 rc = lhPageSplit(pOld,pNew,pEngine->split_bucket,pEngine->nmax_split_nucket - 1);
2329 if( rc != UNQLITE_OK ){
2330 goto fail;
2331 }
2332 /* Update the database header */
2333 pEngine->split_bucket++;
2334 /* Acquire a writer lock on the first page */
2335 rc = pEngine->pIo->xWrite(pEngine->pHeader);
2336 if( rc != UNQLITE_OK ){
2337 return rc;
2338 }
2339 if( pEngine->split_bucket >= pEngine->max_split_bucket ){
2340 /* Increment the generation number */
2341 pEngine->split_bucket = 0;
2342 pEngine->max_split_bucket = pEngine->nmax_split_nucket;
2343 pEngine->nmax_split_nucket <<= 1;
2344 if( !pEngine->nmax_split_nucket ){
2345 /* If this happen to your installation, please tell us <chm@symisc.net> */
2346 pEngine->pIo->xErr(pEngine->pIo->pHandle,"Database page (64-bit integer) limit reached");
2347 return UNQLITE_LIMIT;
2348 }
2349 /* Reflect in the page header */
2350 SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/+8/*Free list*/],pEngine->split_bucket);
2351 SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/+8/*Free list*/+8/*Split bucket*/],pEngine->max_split_bucket);
2352 }else{
2353 /* Modify only the split bucket */
2354 SyBigEndianPack64(&pEngine->pHeader->zData[4/*Magic*/+4/*Hash*/+8/*Free list*/],pEngine->split_bucket);
2355 }
2356 /* All done */
2357 return UNQLITE_OK;
2358fail:
2359 pEngine->pIo->xPageUnref(pNew->pRaw);
2360 return rc;
2361}
2362/*
2363 * Store a record in the target page.
2364 */
2365static int lhRecordInstall(
2366 lhpage *pPage, /* Target page */
2367 sxu32 nHash, /* Hash of the key */
2368 const void *pKey,sxu32 nKeyLen, /* Payload: Key */
2369 const void *pData,unqlite_int64 nDataLen /* Payload: Data */
2370 )
2371{
2372 int rc;
2373 rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,0);
2374 if( rc == UNQLITE_FULL ){
2375 int do_retry = 0;
2376 /* Split */
2377 rc = lhSplit(pPage,&do_retry);
2378 if( rc == UNQLITE_OK ){
2379 if( do_retry ){
2380 /* Re-calculate logical bucket number */
2381 return SXERR_RETRY;
2382 }
2383 /* Perform the store */
2384 rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,1);
2385 }
2386 }
2387 return rc;
2388}
2389/*
2390 * Insert a record (Either overwrite or append operation) in our database.
2391 */
2392static int lh_record_insert(
2393 unqlite_kv_engine *pKv, /* KV store */
2394 const void *pKey,sxu32 nKeyLen, /* Payload: Key */
2395 const void *pData,unqlite_int64 nDataLen, /* Payload: data */
2396 int is_append /* True for an append operation */
2397 )
2398{
2399 lhash_kv_engine *pEngine = (lhash_kv_engine *)pKv;
2400 lhash_bmap_rec *pRec;
2401 unqlite_page *pRaw;
2402 lhpage *pPage;
2403 lhcell *pCell;
2404 pgno iBucket;
2405 sxu32 nHash;
2406 int iCnt;
2407 int rc;
2408
2409 /* Acquire the first page (DB hash Header) so that everything gets loaded autmatically */
2410 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0);
2411 if( rc != UNQLITE_OK ){
2412 return rc;
2413 }
2414 iCnt = 0;
2415 /* Compute the hash of the key first */
2416 nHash = pEngine->xHash(pKey,(sxu32)nKeyLen);
2417retry:
2418 /* Extract the logical bucket number */
2419 iBucket = nHash & (pEngine->nmax_split_nucket - 1);
2420 if( iBucket >= pEngine->split_bucket + pEngine->max_split_bucket ){
2421 /* Low mask */
2422 iBucket = nHash & (pEngine->max_split_bucket - 1);
2423 }
2424 /* Map the logical bucket number to real page number */
2425 pRec = lhMapFindBucket(pEngine,iBucket);
2426 if( pRec == 0 ){
2427 /* Request a new page */
2428 rc = lhAcquirePage(pEngine,&pRaw);
2429 if( rc != UNQLITE_OK ){
2430 return rc;
2431 }
2432 /* Initialize the page */
2433 pPage = lhNewPage(pEngine,pRaw,0);
2434 if( pPage == 0 ){
2435 return UNQLITE_NOMEM;
2436 }
2437 /* Mark as an empty page */
2438 rc = lhSetEmptyPage(pPage);
2439 if( rc != UNQLITE_OK ){
2440 pEngine->pIo->xPageUnref(pRaw); /* pPage will be released during this call */
2441 return rc;
2442 }
2443 /* Store the cell */
2444 rc = lhStoreCell(pPage,pKey,nKeyLen,pData,nDataLen,nHash,1);
2445 if( rc == UNQLITE_OK ){
2446 /* Install and write the logical map record */
2447 rc = lhMapWriteRecord(pEngine,iBucket,pRaw->pgno);
2448 }
2449 pEngine->pIo->xPageUnref(pRaw);
2450 return rc;
2451 }else{
2452 /* Load the page */
2453 rc = lhLoadPage(pEngine,pRec->iReal,0,&pPage,0);
2454 if( rc != UNQLITE_OK ){
2455 /* IO error, unlikely scenario */
2456 return rc;
2457 }
2458 /* Do not add this page to the hot dirty list */
2459 pEngine->pIo->xDontMkHot(pPage->pRaw);
2460 /* Lookup for the cell */
2461 pCell = lhFindCell(pPage,pKey,(sxu32)nKeyLen,nHash);
2462 if( pCell == 0 ){
2463 /* Create the record */
2464 rc = lhRecordInstall(pPage,nHash,pKey,nKeyLen,pData,nDataLen);
2465 if( rc == SXERR_RETRY && iCnt++ < 2 ){
2466 rc = UNQLITE_OK;
2467 goto retry;
2468 }
2469 }else{
2470 if( is_append ){
2471 /* Append operation */
2472 rc = lhRecordAppend(pCell,pData,nDataLen);
2473 }else{
2474 /* Overwrite old value */
2475 rc = lhRecordOverwrite(pCell,pData,nDataLen);
2476 }
2477 }
2478 pEngine->pIo->xPageUnref(pPage->pRaw);
2479 }
2480 return rc;
2481}
2482/*
2483 * Replace method.
2484 */
2485static int lhash_kv_replace(
2486 unqlite_kv_engine *pKv,
2487 const void *pKey,int nKeyLen,
2488 const void *pData,unqlite_int64 nDataLen
2489 )
2490{
2491 int rc;
2492 rc = lh_record_insert(pKv,pKey,(sxu32)nKeyLen,pData,nDataLen,0);
2493 return rc;
2494}
2495/*
2496 * Append method.
2497 */
2498static int lhash_kv_append(
2499 unqlite_kv_engine *pKv,
2500 const void *pKey,int nKeyLen,
2501 const void *pData,unqlite_int64 nDataLen
2502 )
2503{
2504 int rc;
2505 rc = lh_record_insert(pKv,pKey,(sxu32)nKeyLen,pData,nDataLen,1);
2506 return rc;
2507}
2508/*
2509 * Write the hash header (Page one).
2510 */
2511static int lhash_write_header(lhash_kv_engine *pEngine,unqlite_page *pHeader)
2512{
2513 unsigned char *zRaw = pHeader->zData;
2514 lhash_bmap_page *pMap;
2515
2516 pEngine->pHeader = pHeader;
2517 /* 4 byte magic number */
2518 SyBigEndianPack32(zRaw,pEngine->nMagic);
2519 zRaw += 4;
2520 /* 4 byte hash value to identify a valid hash function */
2521 SyBigEndianPack32(zRaw,pEngine->xHash(L_HASH_WORD,sizeof(L_HASH_WORD)-1));
2522 zRaw += 4;
2523 /* List of free pages: Empty */
2524 SyBigEndianPack64(zRaw,0);
2525 zRaw += 8;
2526 /* Current split bucket */
2527 SyBigEndianPack64(zRaw,pEngine->split_bucket);
2528 zRaw += 8;
2529 /* Maximum split bucket */
2530 SyBigEndianPack64(zRaw,pEngine->max_split_bucket);
2531 zRaw += 8;
2532 /* Initialiaze the bucket map */
2533 pMap = &pEngine->sPageMap;
2534 /* Fill in the structure */
2535 pMap->iNum = pHeader->pgno;
2536 /* Next page in the bucket map */
2537 SyBigEndianPack64(zRaw,0);
2538 zRaw += 8;
2539 /* Total number of records in the bucket map */
2540 SyBigEndianPack32(zRaw,0);
2541 zRaw += 4;
2542 pMap->iPtr = (sxu16)(zRaw - pHeader->zData);
2543 /* All done */
2544 return UNQLITE_OK;
2545 }
2546/*
2547 * Exported: xOpen() method.
2548 */
2549static int lhash_kv_open(unqlite_kv_engine *pEngine,pgno dbSize)
2550{
2551 lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine;
2552 unqlite_page *pHeader;
2553 int rc;
2554 if( dbSize < 1 ){
2555 /* A new database, create the header */
2556 rc = pEngine->pIo->xNew(pEngine->pIo->pHandle,&pHeader);
2557 if( rc != UNQLITE_OK ){
2558 return rc;
2559 }
2560 /* Acquire a writer lock */
2561 rc = pEngine->pIo->xWrite(pHeader);
2562 if( rc != UNQLITE_OK ){
2563 return rc;
2564 }
2565 /* Write the hash header */
2566 rc = lhash_write_header(pHash,pHeader);
2567 if( rc != UNQLITE_OK ){
2568 return rc;
2569 }
2570 }else{
2571 /* Acquire the page one of the database */
2572 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,&pHeader);
2573 if( rc != UNQLITE_OK ){
2574 return rc;
2575 }
2576 /* Read the database header */
2577 rc = lhash_read_header(pHash,pHeader);
2578 if( rc != UNQLITE_OK ){
2579 return rc;
2580 }
2581 }
2582 return UNQLITE_OK;
2583}
2584/*
2585 * Release a master or slave page. (xUnpin callback).
2586 */
2587static void lhash_page_release(void *pUserData)
2588{
2589 lhpage *pPage = (lhpage *)pUserData;
2590 lhash_kv_engine *pEngine = pPage->pHash;
2591 lhcell *pNext,*pCell = pPage->pList;
2592 unqlite_page *pRaw = pPage->pRaw;
2593 sxu32 n;
2594 /* Drop in-memory cells */
2595 for( n = 0 ; n < pPage->nCell ; ++n ){
2596 pNext = pCell->pNext;
2597 SyBlobRelease(&pCell->sKey);
2598 /* Release the cell instance */
2599 SyMemBackendPoolFree(&pEngine->sAllocator,(void *)pCell);
2600 /* Point to the next entry */
2601 pCell = pNext;
2602 }
2603 if( pPage->apCell ){
2604 /* Release the cell table */
2605 SyMemBackendFree(&pEngine->sAllocator,(void *)pPage->apCell);
2606 }
2607 /* Finally, release the whole page */
2608 SyMemBackendPoolFree(&pEngine->sAllocator,pPage);
2609 pRaw->pUserData = 0;
2610}
2611/*
2612 * Default hash function (DJB).
2613 */
2614static sxu32 lhash_bin_hash(const void *pSrc,sxu32 nLen)
2615{
2616 register unsigned char *zIn = (unsigned char *)pSrc;
2617 unsigned char *zEnd;
2618 sxu32 nH = 5381;
2619 if( nLen > 2048 /* 2K */ ){
2620 nLen = 2048;
2621 }
2622 zEnd = &zIn[nLen];
2623 for(;;){
2624 if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++;
2625 if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++;
2626 if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++;
2627 if( zIn >= zEnd ){ break; } nH = nH * 33 + zIn[0] ; zIn++;
2628 }
2629 return nH;
2630}
2631/*
2632 * Exported: xInit() method.
2633 * Initialize the Key value storage engine.
2634 */
2635static int lhash_kv_init(unqlite_kv_engine *pEngine,int iPageSize)
2636{
2637 lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine;
2638 int rc;
2639
2640 /* This structure is always zeroed, go to the initialization directly */
2641 SyMemBackendInitFromParent(&pHash->sAllocator,unqliteExportMemBackend());
2642#if defined(UNQLITE_ENABLE_THREADS)
2643 /* Already protected by the upper layers */
2644 SyMemBackendDisbaleMutexing(&pHash->sAllocator);
2645#endif
2646 pHash->iPageSize = iPageSize;
2647 /* Default hash function */
2648 pHash->xHash = lhash_bin_hash;
2649 /* Default comparison function */
2650 pHash->xCmp = SyMemcmp;
2651 /* Allocate a new record map */
2652 pHash->nBuckSize = 32;
2653 pHash->apMap = (lhash_bmap_rec **)SyMemBackendAlloc(&pHash->sAllocator,pHash->nBuckSize *sizeof(lhash_bmap_rec *));
2654 if( pHash->apMap == 0 ){
2655 rc = UNQLITE_NOMEM;
2656 goto err;
2657 }
2658 /* Zero the table */
2659 SyZero(pHash->apMap,pHash->nBuckSize * sizeof(lhash_bmap_rec *));
2660 /* Linear hashing components */
2661 pHash->split_bucket = 0; /* Logical not real bucket number */
2662 pHash->max_split_bucket = 1;
2663 pHash->nmax_split_nucket = 2;
2664 pHash->nMagic = L_HASH_MAGIC;
2665 /* Install the cache unpin and reload callbacks */
2666 pHash->pIo->xSetUnpin(pHash->pIo->pHandle,lhash_page_release);
2667 pHash->pIo->xSetReload(pHash->pIo->pHandle,lhash_page_release);
2668 return UNQLITE_OK;
2669err:
2670 SyMemBackendRelease(&pHash->sAllocator);
2671 return rc;
2672}
2673/*
2674 * Exported: xRelease() method.
2675 * Release the Key value storage engine.
2676 */
2677static void lhash_kv_release(unqlite_kv_engine *pEngine)
2678{
2679 lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine;
2680 /* Release the private memory backend */
2681 SyMemBackendRelease(&pHash->sAllocator);
2682}
2683/*
2684 * Exported: xConfig() method.
2685 * Configure the linear hash KV store.
2686 */
2687static int lhash_kv_config(unqlite_kv_engine *pEngine,int op,va_list ap)
2688{
2689 lhash_kv_engine *pHash = (lhash_kv_engine *)pEngine;
2690 int rc = UNQLITE_OK;
2691 switch(op){
2692 case UNQLITE_KV_CONFIG_HASH_FUNC: {
2693 /* Default hash function */
2694 if( pHash->nBuckRec > 0 ){
2695 /* Locked operation */
2696 rc = UNQLITE_LOCKED;
2697 }else{
2698 ProcHash xHash = va_arg(ap,ProcHash);
2699 if( xHash ){
2700 pHash->xHash = xHash;
2701 }
2702 }
2703 break;
2704 }
2705 case UNQLITE_KV_CONFIG_CMP_FUNC: {
2706 /* Default comparison function */
2707 ProcCmp xCmp = va_arg(ap,ProcCmp);
2708 if( xCmp ){
2709 pHash->xCmp = xCmp;
2710 }
2711 break;
2712 }
2713 default:
2714 /* Unknown OP */
2715 rc = UNQLITE_UNKNOWN;
2716 break;
2717 }
2718 return rc;
2719}
2720/*
2721 * Each public cursor is identified by an instance of this structure.
2722 */
2723typedef struct lhash_kv_cursor lhash_kv_cursor;
2724struct lhash_kv_cursor
2725{
2726 unqlite_kv_engine *pStore; /* Must be first */
2727 /* Private fields */
2728 int iState; /* Current state of the cursor */
2729 int is_first; /* True to read the database header */
2730 lhcell *pCell; /* Current cell we are processing */
2731 unqlite_page *pRaw; /* Raw disk page */
2732 lhash_bmap_rec *pRec; /* Logical to real bucket map */
2733};
2734/*
2735 * Possible state of the cursor
2736 */
2737#define L_HASH_CURSOR_STATE_NEXT_PAGE 1 /* Next page in the list */
2738#define L_HASH_CURSOR_STATE_CELL 2 /* Processing Cell */
2739#define L_HASH_CURSOR_STATE_DONE 3 /* Cursor does not point to anything */
2740/*
2741 * Initialize the cursor.
2742 */
2743static void lhInitCursor(unqlite_kv_cursor *pPtr)
2744{
2745 lhash_kv_engine *pEngine = (lhash_kv_engine *)pPtr->pStore;
2746 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr;
2747 /* Init */
2748 pCur->iState = L_HASH_CURSOR_STATE_NEXT_PAGE;
2749 pCur->pCell = 0;
2750 pCur->pRec = pEngine->pFirst;
2751 pCur->pRaw = 0;
2752 pCur->is_first = 1;
2753}
2754/*
2755 * Point to the next page on the database.
2756 */
2757static int lhCursorNextPage(lhash_kv_cursor *pPtr)
2758{
2759 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr;
2760 lhash_bmap_rec *pRec;
2761 lhpage *pPage;
2762 int rc;
2763 for(;;){
2764 pRec = pCur->pRec;
2765 if( pRec == 0 ){
2766 pCur->iState = L_HASH_CURSOR_STATE_DONE;
2767 return UNQLITE_DONE;
2768 }
2769 if( pPtr->iState == L_HASH_CURSOR_STATE_CELL && pPtr->pRaw ){
2770 /* Unref this page */
2771 pCur->pStore->pIo->xPageUnref(pPtr->pRaw);
2772 pPtr->pRaw = 0;
2773 }
2774 /* Advance the map cursor */
2775 pCur->pRec = pRec->pPrev; /* Not a bug, reverse link */
2776 /* Load the next page on the list */
2777 rc = lhLoadPage((lhash_kv_engine *)pCur->pStore,pRec->iReal,0,&pPage,0);
2778 if( rc != UNQLITE_OK ){
2779 return rc;
2780 }
2781 if( pPage->pList ){
2782 /* Reflect the change */
2783 pCur->pCell = pPage->pList;
2784 pCur->iState = L_HASH_CURSOR_STATE_CELL;
2785 pCur->pRaw = pPage->pRaw;
2786 break;
2787 }
2788 /* Empty page, discard this page and continue */
2789 pPage->pHash->pIo->xPageUnref(pPage->pRaw);
2790 }
2791 return UNQLITE_OK;
2792}
2793/*
2794 * Point to the previous page on the database.
2795 */
2796static int lhCursorPrevPage(lhash_kv_cursor *pPtr)
2797{
2798 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr;
2799 lhash_bmap_rec *pRec;
2800 lhpage *pPage;
2801 int rc;
2802 for(;;){
2803 pRec = pCur->pRec;
2804 if( pRec == 0 ){
2805 pCur->iState = L_HASH_CURSOR_STATE_DONE;
2806 return UNQLITE_DONE;
2807 }
2808 if( pPtr->iState == L_HASH_CURSOR_STATE_CELL && pPtr->pRaw ){
2809 /* Unref this page */
2810 pCur->pStore->pIo->xPageUnref(pPtr->pRaw);
2811 pPtr->pRaw = 0;
2812 }
2813 /* Advance the map cursor */
2814 pCur->pRec = pRec->pNext; /* Not a bug, reverse link */
2815 /* Load the previous page on the list */
2816 rc = lhLoadPage((lhash_kv_engine *)pCur->pStore,pRec->iReal,0,&pPage,0);
2817 if( rc != UNQLITE_OK ){
2818 return rc;
2819 }
2820 if( pPage->pFirst ){
2821 /* Reflect the change */
2822 pCur->pCell = pPage->pFirst;
2823 pCur->iState = L_HASH_CURSOR_STATE_CELL;
2824 pCur->pRaw = pPage->pRaw;
2825 break;
2826 }
2827 /* Discard this page and continue */
2828 pPage->pHash->pIo->xPageUnref(pPage->pRaw);
2829 }
2830 return UNQLITE_OK;
2831}
2832/*
2833 * Is a valid cursor.
2834 */
2835static int lhCursorValid(unqlite_kv_cursor *pPtr)
2836{
2837 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pPtr;
2838 return (pCur->iState == L_HASH_CURSOR_STATE_CELL) && pCur->pCell;
2839}
2840/*
2841 * Point to the first record.
2842 */
2843static int lhCursorFirst(unqlite_kv_cursor *pCursor)
2844{
2845 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2846 lhash_kv_engine *pEngine = (lhash_kv_engine *)pCursor->pStore;
2847 int rc;
2848 if( pCur->is_first ){
2849 /* Read the database header first */
2850 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0);
2851 if( rc != UNQLITE_OK ){
2852 return rc;
2853 }
2854 pCur->is_first = 0;
2855 }
2856 /* Point to the first map record */
2857 pCur->pRec = pEngine->pFirst;
2858 /* Load the cells */
2859 rc = lhCursorNextPage(pCur);
2860 return rc;
2861}
2862/*
2863 * Point to the last record.
2864 */
2865static int lhCursorLast(unqlite_kv_cursor *pCursor)
2866{
2867 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2868 lhash_kv_engine *pEngine = (lhash_kv_engine *)pCursor->pStore;
2869 int rc;
2870 if( pCur->is_first ){
2871 /* Read the database header first */
2872 rc = pEngine->pIo->xGet(pEngine->pIo->pHandle,1,0);
2873 if( rc != UNQLITE_OK ){
2874 return rc;
2875 }
2876 pCur->is_first = 0;
2877 }
2878 /* Point to the last map record */
2879 pCur->pRec = pEngine->pList;
2880 /* Load the cells */
2881 rc = lhCursorPrevPage(pCur);
2882 return rc;
2883}
2884/*
2885 * Reset the cursor.
2886 */
2887static void lhCursorReset(unqlite_kv_cursor *pCursor)
2888{
2889 lhCursorFirst(pCursor);
2890}
2891/*
2892 * Point to the next record.
2893 */
2894static int lhCursorNext(unqlite_kv_cursor *pCursor)
2895{
2896 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2897 lhcell *pCell;
2898 int rc;
2899 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
2900 /* Load the cells of the next page */
2901 rc = lhCursorNextPage(pCur);
2902 return rc;
2903 }
2904 pCell = pCur->pCell;
2905 pCur->pCell = pCell->pNext;
2906 if( pCur->pCell == 0 ){
2907 /* Load the cells of the next page */
2908 rc = lhCursorNextPage(pCur);
2909 return rc;
2910 }
2911 return UNQLITE_OK;
2912}
2913/*
2914 * Point to the previous record.
2915 */
2916static int lhCursorPrev(unqlite_kv_cursor *pCursor)
2917{
2918 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2919 lhcell *pCell;
2920 int rc;
2921 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
2922 /* Load the cells of the previous page */
2923 rc = lhCursorPrevPage(pCur);
2924 return rc;
2925 }
2926 pCell = pCur->pCell;
2927 pCur->pCell = pCell->pPrev;
2928 if( pCur->pCell == 0 ){
2929 /* Load the cells of the previous page */
2930 rc = lhCursorPrevPage(pCur);
2931 return rc;
2932 }
2933 return UNQLITE_OK;
2934}
2935/*
2936 * Return key length.
2937 */
2938static int lhCursorKeyLength(unqlite_kv_cursor *pCursor,int *pLen)
2939{
2940 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2941 lhcell *pCell;
2942
2943 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
2944 /* Invalid state */
2945 return UNQLITE_INVALID;
2946 }
2947 /* Point to the target cell */
2948 pCell = pCur->pCell;
2949 /* Return key length */
2950 *pLen = (int)pCell->nKey;
2951 return UNQLITE_OK;
2952}
2953/*
2954 * Return data length.
2955 */
2956static int lhCursorDataLength(unqlite_kv_cursor *pCursor,unqlite_int64 *pLen)
2957{
2958 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2959 lhcell *pCell;
2960
2961 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
2962 /* Invalid state */
2963 return UNQLITE_INVALID;
2964 }
2965 /* Point to the target cell */
2966 pCell = pCur->pCell;
2967 /* Return data length */
2968 *pLen = (unqlite_int64)pCell->nData;
2969 return UNQLITE_OK;
2970}
2971/*
2972 * Consume the key.
2973 */
2974static int lhCursorKey(unqlite_kv_cursor *pCursor,int (*xConsumer)(const void *,unsigned int,void *),void *pUserData)
2975{
2976 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
2977 lhcell *pCell;
2978 int rc;
2979 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
2980 /* Invalid state */
2981 return UNQLITE_INVALID;
2982 }
2983 /* Point to the target cell */
2984 pCell = pCur->pCell;
2985 if( SyBlobLength(&pCell->sKey) > 0 ){
2986 /* Consume the key directly */
2987 rc = xConsumer(SyBlobData(&pCell->sKey),SyBlobLength(&pCell->sKey),pUserData);
2988 }else{
2989 /* Very large key */
2990 rc = lhConsumeCellkey(pCell,xConsumer,pUserData,0);
2991 }
2992 return rc;
2993}
2994/*
2995 * Consume the data.
2996 */
2997static int lhCursorData(unqlite_kv_cursor *pCursor,int (*xConsumer)(const void *,unsigned int,void *),void *pUserData)
2998{
2999 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
3000 lhcell *pCell;
3001 int rc;
3002 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
3003 /* Invalid state */
3004 return UNQLITE_INVALID;
3005 }
3006 /* Point to the target cell */
3007 pCell = pCur->pCell;
3008 /* Consume the data */
3009 rc = lhConsumeCellData(pCell,xConsumer,pUserData);
3010 return rc;
3011}
3012/*
3013 * Find a partiuclar record.
3014 */
3015static int lhCursorSeek(unqlite_kv_cursor *pCursor,const void *pKey,int nByte,int iPos)
3016{
3017 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
3018 int rc;
3019 /* Perform a lookup */
3020 rc = lhRecordLookup((lhash_kv_engine *)pCur->pStore,pKey,nByte,&pCur->pCell);
3021 if( rc != UNQLITE_OK ){
3022 SXUNUSED(iPos);
3023 pCur->pCell = 0;
3024 pCur->iState = L_HASH_CURSOR_STATE_DONE;
3025 return rc;
3026 }
3027 pCur->iState = L_HASH_CURSOR_STATE_CELL;
3028 return UNQLITE_OK;
3029}
3030/*
3031 * Remove a particular record.
3032 */
3033static int lhCursorDelete(unqlite_kv_cursor *pCursor)
3034{
3035 lhash_kv_cursor *pCur = (lhash_kv_cursor *)pCursor;
3036 lhcell *pCell;
3037 int rc;
3038 if( pCur->iState != L_HASH_CURSOR_STATE_CELL || pCur->pCell == 0 ){
3039 /* Invalid state */
3040 return UNQLITE_INVALID;
3041 }
3042 /* Point to the target cell */
3043 pCell = pCur->pCell;
3044 /* Point to the next entry */
3045 pCur->pCell = pCell->pNext;
3046 /* Perform the deletion */
3047 rc = lhRecordRemove(pCell);
3048 return rc;
3049}
3050/*
3051 * Export the linear-hash storage engine.
3052 */
3053UNQLITE_PRIVATE const unqlite_kv_methods * unqliteExportDiskKvStorage(void)
3054{
3055 static const unqlite_kv_methods sDiskStore = {
3056 "hash", /* zName */
3057 sizeof(lhash_kv_engine), /* szKv */
3058 sizeof(lhash_kv_cursor), /* szCursor */
3059 1, /* iVersion */
3060 lhash_kv_init, /* xInit */
3061 lhash_kv_release, /* xRelease */
3062 lhash_kv_config, /* xConfig */
3063 lhash_kv_open, /* xOpen */
3064 lhash_kv_replace, /* xReplace */
3065 lhash_kv_append, /* xAppend */
3066 lhInitCursor, /* xCursorInit */
3067 lhCursorSeek, /* xSeek */
3068 lhCursorFirst, /* xFirst */
3069 lhCursorLast, /* xLast */
3070 lhCursorValid, /* xValid */
3071 lhCursorNext, /* xNext */
3072 lhCursorPrev, /* xPrev */
3073 lhCursorDelete, /* xDelete */
3074 lhCursorKeyLength, /* xKeyLength */
3075 lhCursorKey, /* xKey */
3076 lhCursorDataLength, /* xDataLength */
3077 lhCursorData, /* xData */
3078 lhCursorReset, /* xReset */
3079 0 /* xRelease */
3080 };
3081 return &sDiskStore;
3082}