summaryrefslogtreecommitdiffstats
path: root/common/unqlite/unqlite_vm.c
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-07 10:08:07 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-11 01:07:08 +0100
commit9ee8378d393778ac67314be7ea8d5bcbaeee9ee0 (patch)
treecf93471a69f9f4bbb4940de55ae134106fcd8380 /common/unqlite/unqlite_vm.c
parentee6f068dff6b15441e553ffbfb2bf8aa97b26f57 (diff)
downloadsink-9ee8378d393778ac67314be7ea8d5bcbaeee9ee0.tar.gz
sink-9ee8378d393778ac67314be7ea8d5bcbaeee9ee0.zip
try out unqlite
Diffstat (limited to 'common/unqlite/unqlite_vm.c')
-rw-r--r--common/unqlite/unqlite_vm.c894
1 files changed, 894 insertions, 0 deletions
diff --git a/common/unqlite/unqlite_vm.c b/common/unqlite/unqlite_vm.c
new file mode 100644
index 0000000..e025a1d
--- /dev/null
+++ b/common/unqlite/unqlite_vm.c
@@ -0,0 +1,894 @@
1/*
2 * Symisc unQLite: An Embeddable NoSQL (Post Modern) Database Engine.
3 * Copyright (C) 2012-2013, Symisc Systems http://unqlite.org/
4 * Version 1.1.6
5 * For information on licensing, redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES
6 * please contact Symisc Systems via:
7 * legal@symisc.net
8 * licensing@symisc.net
9 * contact@symisc.net
10 * or visit:
11 * http://unqlite.org/licensing.html
12 */
13 /* $SymiscID: unqlite_vm.c v1.0 Win7 2013-01-29 23:37 stable <chm@symisc.net> $ */
14#ifndef UNQLITE_AMALGAMATION
15#include "unqliteInt.h"
16#endif
17/* This file deals with low level stuff related to the unQLite Virtual Machine */
18
19/* Record ID as a hash value */
20#define COL_RECORD_HASH(RID) (RID)
21/*
22 * Fetch a record from a given collection.
23 */
24static unqlite_col_record * CollectionCacheFetchRecord(
25 unqlite_col *pCol, /* Target collection */
26 jx9_int64 nId /* Unique record ID */
27 )
28{
29 unqlite_col_record *pEntry;
30 if( pCol->nRec < 1 ){
31 /* Don't bother hashing */
32 return 0;
33 }
34 pEntry = pCol->apRecord[COL_RECORD_HASH(nId) & (pCol->nRecSize - 1)];
35 for(;;){
36 if( pEntry == 0 ){
37 break;
38 }
39 if( pEntry->nId == nId ){
40 /* Record found */
41 return pEntry;
42 }
43 /* Point to the next entry */
44 pEntry = pEntry->pNextCol;
45
46 }
47 /* No such record */
48 return 0;
49}
50/*
51 * Install a freshly created record in a given collection.
52 */
53static int CollectionCacheInstallRecord(
54 unqlite_col *pCol, /* Target collection */
55 jx9_int64 nId, /* Unique record ID */
56 jx9_value *pValue /* JSON value */
57 )
58{
59 unqlite_col_record *pRecord;
60 sxu32 iBucket;
61 /* Fetch the record first */
62 pRecord = CollectionCacheFetchRecord(pCol,nId);
63 if( pRecord ){
64 /* Record already installed, overwrite its old value */
65 jx9MemObjStore(pValue,&pRecord->sValue);
66 return UNQLITE_OK;
67 }
68 /* Allocate a new instance */
69 pRecord = (unqlite_col_record *)SyMemBackendPoolAlloc(&pCol->pVm->sAlloc,sizeof(unqlite_col_record));
70 if( pRecord == 0 ){
71 return UNQLITE_NOMEM;
72 }
73 /* Zero the structure */
74 SyZero(pRecord,sizeof(unqlite_col_record));
75 /* Fill in the structure */
76 jx9MemObjInit(pCol->pVm->pJx9Vm,&pRecord->sValue);
77 jx9MemObjStore(pValue,&pRecord->sValue);
78 pRecord->nId = nId;
79 pRecord->pCol = pCol;
80 /* Install in the corresponding bucket */
81 iBucket = COL_RECORD_HASH(nId) & (pCol->nRecSize - 1);
82 pRecord->pNextCol = pCol->apRecord[iBucket];
83 if( pCol->apRecord[iBucket] ){
84 pCol->apRecord[iBucket]->pPrevCol = pRecord;
85 }
86 pCol->apRecord[iBucket] = pRecord;
87 /* Link */
88 MACRO_LD_PUSH(pCol->pList,pRecord);
89 pCol->nRec++;
90 if( (pCol->nRec >= pCol->nRecSize * 3) && pCol->nRec < 100000 ){
91 /* Allocate a new larger table */
92 sxu32 nNewSize = pCol->nRecSize << 1;
93 unqlite_col_record *pEntry;
94 unqlite_col_record **apNew;
95 sxu32 n;
96
97 apNew = (unqlite_col_record **)SyMemBackendAlloc(&pCol->pVm->sAlloc, nNewSize * sizeof(unqlite_col_record *));
98 if( apNew ){
99 /* Zero the new table */
100 SyZero((void *)apNew, nNewSize * sizeof(unqlite_col_record *));
101 /* Rehash all entries */
102 n = 0;
103 pEntry = pCol->pList;
104 for(;;){
105 /* Loop one */
106 if( n >= pCol->nRec ){
107 break;
108 }
109 pEntry->pNextCol = pEntry->pPrevCol = 0;
110 /* Install in the new bucket */
111 iBucket = COL_RECORD_HASH(pEntry->nId) & (nNewSize - 1);
112 pEntry->pNextCol = apNew[iBucket];
113 if( apNew[iBucket] ){
114 apNew[iBucket]->pPrevCol = pEntry;
115 }
116 apNew[iBucket] = pEntry;
117 /* Point to the next entry */
118 pEntry = pEntry->pNext;
119 n++;
120 }
121 /* Release the old table and reflect the change */
122 SyMemBackendFree(&pCol->pVm->sAlloc,(void *)pCol->apRecord);
123 pCol->apRecord = apNew;
124 pCol->nRecSize = nNewSize;
125 }
126 }
127 /* All done */
128 return UNQLITE_OK;
129}
130/*
131 * Remove a record from the collection table.
132 */
133UNQLITE_PRIVATE int unqliteCollectionCacheRemoveRecord(
134 unqlite_col *pCol, /* Target collection */
135 jx9_int64 nId /* Unique record ID */
136 )
137{
138 unqlite_col_record *pRecord;
139 /* Fetch the record first */
140 pRecord = CollectionCacheFetchRecord(pCol,nId);
141 if( pRecord == 0 ){
142 /* No such record */
143 return UNQLITE_NOTFOUND;
144 }
145 if( pRecord->pPrevCol ){
146 pRecord->pPrevCol->pNextCol = pRecord->pNextCol;
147 }else{
148 sxu32 iBucket = COL_RECORD_HASH(nId) & (pCol->nRecSize - 1);
149 pCol->apRecord[iBucket] = pRecord->pNextCol;
150 }
151 if( pRecord->pNextCol ){
152 pRecord->pNextCol->pPrevCol = pRecord->pPrevCol;
153 }
154 /* Unlink */
155 MACRO_LD_REMOVE(pCol->pList,pRecord);
156 pCol->nRec--;
157 return UNQLITE_OK;
158}
159/*
160 * Discard a collection and its records.
161 */
162static int CollectionCacheRelease(unqlite_col *pCol)
163{
164 unqlite_col_record *pNext,*pRec = pCol->pList;
165 unqlite_vm *pVm = pCol->pVm;
166 sxu32 n;
167 /* Discard all records */
168 for( n = 0 ; n < pCol->nRec ; ++n ){
169 pNext = pRec->pNext;
170 jx9MemObjRelease(&pRec->sValue);
171 SyMemBackendPoolFree(&pVm->sAlloc,(void *)pRec);
172 /* Point to the next record */
173 pRec = pNext;
174 }
175 SyMemBackendFree(&pVm->sAlloc,(void *)pCol->apRecord);
176 pCol->nRec = pCol->nRecSize = 0;
177 pCol->pList = 0;
178 return UNQLITE_OK;
179}
180/*
181 * Install a freshly created collection in the unqlite VM.
182 */
183static int unqliteVmInstallCollection(
184 unqlite_vm *pVm, /* Target VM */
185 unqlite_col *pCol /* Collection to install */
186 )
187{
188 SyString *pName = &pCol->sName;
189 sxu32 iBucket;
190 /* Hash the collection name */
191 pCol->nHash = SyBinHash((const void *)pName->zString,pName->nByte);
192 /* Install it in the corresponding bucket */
193 iBucket = pCol->nHash & (pVm->iColSize - 1);
194 pCol->pNextCol = pVm->apCol[iBucket];
195 if( pVm->apCol[iBucket] ){
196 pVm->apCol[iBucket]->pPrevCol = pCol;
197 }
198 pVm->apCol[iBucket] = pCol;
199 /* Link to the list of active collections */
200 MACRO_LD_PUSH(pVm->pCol,pCol);
201 pVm->iCol++;
202 if( (pVm->iCol >= pVm->iColSize * 4) && pVm->iCol < 10000 ){
203 /* Grow the hashtable */
204 sxu32 nNewSize = pVm->iColSize << 1;
205 unqlite_col *pEntry;
206 unqlite_col **apNew;
207 sxu32 n;
208
209 apNew = (unqlite_col **)SyMemBackendAlloc(&pVm->sAlloc, nNewSize * sizeof(unqlite_col *));
210 if( apNew ){
211 /* Zero the new table */
212 SyZero((void *)apNew, nNewSize * sizeof(unqlite_col *));
213 /* Rehash all entries */
214 n = 0;
215 pEntry = pVm->pCol;
216 for(;;){
217 /* Loop one */
218 if( n >= pVm->iCol ){
219 break;
220 }
221 pEntry->pNextCol = pEntry->pPrevCol = 0;
222 /* Install in the new bucket */
223 iBucket = pEntry->nHash & (nNewSize - 1);
224 pEntry->pNextCol = apNew[iBucket];
225 if( apNew[iBucket] ){
226 apNew[iBucket]->pPrevCol = pEntry;
227 }
228 apNew[iBucket] = pEntry;
229 /* Point to the next entry */
230 pEntry = pEntry->pNext;
231 n++;
232 }
233 /* Release the old table and reflect the change */
234 SyMemBackendFree(&pVm->sAlloc,(void *)pVm->apCol);
235 pVm->apCol = apNew;
236 pVm->iColSize = nNewSize;
237 }
238 }
239 return UNQLITE_OK;
240}
241/*
242 * Fetch a collection from the target VM.
243 */
244static unqlite_col * unqliteVmFetchCollection(
245 unqlite_vm *pVm, /* Target VM */
246 SyString *pName /* Lookup name */
247 )
248{
249 unqlite_col *pCol;
250 sxu32 nHash;
251 if( pVm->iCol < 1 ){
252 /* Don't bother hashing */
253 return 0;
254 }
255 nHash = SyBinHash((const void *)pName->zString,pName->nByte);
256 /* Perform the lookup */
257 pCol = pVm->apCol[nHash & ( pVm->iColSize - 1)];
258 for(;;){
259 if( pCol == 0 ){
260 break;
261 }
262 if( nHash == pCol->nHash && SyStringCmp(pName,&pCol->sName,SyMemcmp) == 0 ){
263 /* Collection found */
264 return pCol;
265 }
266 /* Point to the next entry */
267 pCol = pCol->pNextCol;
268 }
269 /* No such collection */
270 return 0;
271}
272/*
273 * Write and/or alter collection binary header.
274 */
275static int CollectionSetHeader(
276 unqlite_kv_engine *pEngine, /* Underlying KV storage engine */
277 unqlite_col *pCol, /* Target collection */
278 jx9_int64 iRec, /* Last record ID */
279 jx9_int64 iTotal, /* Total number of records in this collection */
280 jx9_value *pSchema /* Collection schema */
281 )
282{
283 SyBlob *pHeader = &pCol->sHeader;
284 unqlite_kv_methods *pMethods;
285 int iWrite = 0;
286 int rc;
287 if( pEngine == 0 ){
288 /* Default storage engine */
289 pEngine = unqlitePagerGetKvEngine(pCol->pVm->pDb);
290 }
291 pMethods = pEngine->pIo->pMethods;
292 if( SyBlobLength(pHeader) < 1 ){
293 Sytm *pCreate = &pCol->sCreation; /* Creation time */
294 unqlite_vfs *pVfs;
295 sxu32 iDos;
296 /* Magic number */
297 rc = SyBlobAppendBig16(pHeader,UNQLITE_COLLECTION_MAGIC);
298 if( rc != UNQLITE_OK ){
299 return rc;
300 }
301 /* Initial record ID */
302 rc = SyBlobAppendBig64(pHeader,0);
303 if( rc != UNQLITE_OK ){
304 return rc;
305 }
306 /* Total records in the collection */
307 rc = SyBlobAppendBig64(pHeader,0);
308 if( rc != UNQLITE_OK ){
309 return rc;
310 }
311 pVfs = (unqlite_vfs *)unqliteExportBuiltinVfs();
312 /* Creation time of the collection */
313 if( pVfs->xCurrentTime ){
314 /* Get the creation time */
315 pVfs->xCurrentTime(pVfs,pCreate);
316 }else{
317 /* Zero the structure */
318 SyZero(pCreate,sizeof(Sytm));
319 }
320 /* Convert to DOS time */
321 SyTimeFormatToDos(pCreate,&iDos);
322 rc = SyBlobAppendBig32(pHeader,iDos);
323 if( rc != UNQLITE_OK ){
324 return rc;
325 }
326 /* Offset to start writing collection schema */
327 pCol->nSchemaOfft = SyBlobLength(pHeader);
328 iWrite = 1;
329 }else{
330 unsigned char *zBinary = (unsigned char *)SyBlobData(pHeader);
331 /* Header update */
332 if( iRec >= 0 ){
333 /* Update record ID */
334 SyBigEndianPack64(&zBinary[2/* Magic number*/],(sxu64)iRec);
335 iWrite = 1;
336 }
337 if( iTotal >= 0 ){
338 /* Total records */
339 SyBigEndianPack64(&zBinary[2/* Magic number*/+8/* Record ID*/],(sxu64)iTotal);
340 iWrite = 1;
341 }
342 if( pSchema ){
343 /* Collection Schema */
344 SyBlobTruncate(pHeader,pCol->nSchemaOfft);
345 /* Encode the schema to FastJson */
346 rc = FastJsonEncode(pSchema,pHeader,0);
347 if( rc != UNQLITE_OK ){
348 return rc;
349 }
350 /* Copy the collection schema */
351 jx9MemObjStore(pSchema,&pCol->sSchema);
352 iWrite = 1;
353 }
354 }
355 if( iWrite ){
356 SyString *pId = &pCol->sName;
357 /* Reflect the disk and/or in-memory image */
358 rc = pMethods->xReplace(pEngine,
359 (const void *)pId->zString,pId->nByte,
360 SyBlobData(pHeader),SyBlobLength(pHeader)
361 );
362 if( rc != UNQLITE_OK ){
363 unqliteGenErrorFormat(pCol->pVm->pDb,
364 "Cannot save collection '%z' header in the underlying storage engine",
365 pId
366 );
367 return rc;
368 }
369 }
370 return UNQLITE_OK;
371}
372/*
373 * Load a binary collection from disk.
374 */
375static int CollectionLoadHeader(unqlite_col *pCol)
376{
377 SyBlob *pHeader = &pCol->sHeader;
378 unsigned char *zRaw,*zEnd;
379 sxu16 nMagic;
380 sxu32 iDos;
381 int rc;
382 SyBlobReset(pHeader);
383 /* Read the binary header */
384 rc = unqlite_kv_cursor_data_callback(pCol->pCursor,unqliteDataConsumer,pHeader);
385 if( rc != UNQLITE_OK ){
386 return rc;
387 }
388 /* Perform a sanity check */
389 if( SyBlobLength(pHeader) < (2 /* magic */ + 8 /* record_id */ + 8 /* total_records */+ 4 /* DOS creation time*/) ){
390 return UNQLITE_CORRUPT;
391 }
392 zRaw = (unsigned char *)SyBlobData(pHeader);
393 zEnd = &zRaw[SyBlobLength(pHeader)];
394 /* Extract the magic number */
395 SyBigEndianUnpack16(zRaw,&nMagic);
396 if( nMagic != UNQLITE_COLLECTION_MAGIC ){
397 return UNQLITE_CORRUPT;
398 }
399 zRaw += 2; /* sizeof(sxu16) */
400 /* Extract the record ID */
401 SyBigEndianUnpack64(zRaw,(sxu64 *)&pCol->nLastid);
402 zRaw += 8; /* sizeof(sxu64) */
403 /* Total records in the collection */
404 SyBigEndianUnpack64(zRaw,(sxu64 *)&pCol->nTotRec);
405 /* Extract the collection creation date (DOS) */
406 zRaw += 8; /* sizeof(sxu64) */
407 SyBigEndianUnpack32(zRaw,&iDos);
408 SyDosTimeFormat(iDos,&pCol->sCreation);
409 zRaw += 4;
410 /* Check for a collection schema */
411 pCol->nSchemaOfft = (sxu32)(zRaw - (unsigned char *)SyBlobData(pHeader));
412 if( zRaw < zEnd ){
413 /* Decode the FastJson value */
414 FastJsonDecode((const void *)zRaw,(sxu32)(zEnd-zRaw),&pCol->sSchema,0,0);
415 }
416 return UNQLITE_OK;
417}
418/*
419 * Load or create a binary collection.
420 */
421static int unqliteVmLoadCollection(
422 unqlite_vm *pVm, /* Target VM */
423 const char *zName, /* Collection name */
424 sxu32 nByte, /* zName length */
425 int iFlag, /* Control flag */
426 unqlite_col **ppOut /* OUT: in-memory collection */
427 )
428{
429 unqlite_kv_methods *pMethods;
430 unqlite_kv_engine *pEngine;
431 unqlite_kv_cursor *pCursor;
432 unqlite *pDb = pVm->pDb;
433 unqlite_col *pCol = 0; /* cc warning */
434 int rc = SXERR_MEM;
435 char *zDup = 0;
436 /* Point to the underlying KV store */
437 pEngine = unqlitePagerGetKvEngine(pVm->pDb);
438 pMethods = pEngine->pIo->pMethods;
439 /* Allocate a new cursor */
440 rc = unqliteInitCursor(pDb,&pCursor);
441 if( rc != UNQLITE_OK ){
442 return rc;
443 }
444 if( (iFlag & UNQLITE_VM_COLLECTION_CREATE) == 0 ){
445 /* Seek to the desired location */
446 rc = pMethods->xSeek(pCursor,(const void *)zName,(unqlite_int64)nByte,UNQLITE_CURSOR_MATCH_EXACT);
447 if( rc != UNQLITE_OK ){
448 unqliteGenErrorFormat(pDb,"Collection '%.*s' not defined in the underlying database",nByte,zName);
449 unqliteReleaseCursor(pDb,pCursor);
450 return rc;
451 }
452 }
453 /* Allocate a new instance */
454 pCol = (unqlite_col *)SyMemBackendPoolAlloc(&pVm->sAlloc,sizeof(unqlite_col));
455 if( pCol == 0 ){
456 unqliteGenOutofMem(pDb);
457 rc = UNQLITE_NOMEM;
458 goto fail;
459 }
460 SyZero(pCol,sizeof(unqlite_col));
461 /* Fill in the structure */
462 SyBlobInit(&pCol->sWorker,&pVm->sAlloc);
463 SyBlobInit(&pCol->sHeader,&pVm->sAlloc);
464 pCol->pVm = pVm;
465 pCol->pCursor = pCursor;
466 /* Duplicate collection name */
467 zDup = SyMemBackendStrDup(&pVm->sAlloc,zName,nByte);
468 if( zDup == 0 ){
469 unqliteGenOutofMem(pDb);
470 rc = UNQLITE_NOMEM;
471 goto fail;
472 }
473 pCol->nRecSize = 64; /* Must be a power of two */
474 pCol->apRecord = (unqlite_col_record **)SyMemBackendAlloc(&pVm->sAlloc,pCol->nRecSize * sizeof(unqlite_col_record *));
475 if( pCol->apRecord == 0 ){
476 unqliteGenOutofMem(pDb);
477 rc = UNQLITE_NOMEM;
478 goto fail;
479 }
480 /* Zero the table */
481 SyZero((void *)pCol->apRecord,pCol->nRecSize * sizeof(unqlite_col_record *));
482 SyStringInitFromBuf(&pCol->sName,zDup,nByte);
483 jx9MemObjInit(pVm->pJx9Vm,&pCol->sSchema);
484 if( iFlag & UNQLITE_VM_COLLECTION_CREATE ){
485 /* Create a new collection */
486 if( pMethods->xReplace == 0 ){
487 /* Read-only KV engine: Generate an error message and return */
488 unqliteGenErrorFormat(pDb,
489 "Cannot create new collection '%z' due to a read-only Key/Value storage engine",
490 &pCol->sName
491 );
492 rc = UNQLITE_ABORT; /* Abort VM execution */
493 goto fail;
494 }
495 /* Write the collection header */
496 rc = CollectionSetHeader(pEngine,pCol,0,0,0);
497 if( rc != UNQLITE_OK ){
498 rc = UNQLITE_ABORT; /* Abort VM execution */
499 goto fail;
500 }
501 }else{
502 /* Read the collection header */
503 rc = CollectionLoadHeader(pCol);
504 if( rc != UNQLITE_OK ){
505 unqliteGenErrorFormat(pDb,"Corrupt collection '%z' header",&pCol->sName);
506 goto fail;
507 }
508 }
509 /* Finally install the collection */
510 unqliteVmInstallCollection(pVm,pCol);
511 /* All done */
512 if( ppOut ){
513 *ppOut = pCol;
514 }
515 return UNQLITE_OK;
516fail:
517 unqliteReleaseCursor(pDb,pCursor);
518 if( zDup ){
519 SyMemBackendFree(&pVm->sAlloc,zDup);
520 }
521 if( pCol ){
522 if( pCol->apRecord ){
523 SyMemBackendFree(&pVm->sAlloc,(void *)pCol->apRecord);
524 }
525 SyBlobRelease(&pCol->sHeader);
526 SyBlobRelease(&pCol->sWorker);
527 jx9MemObjRelease(&pCol->sSchema);
528 SyMemBackendPoolFree(&pVm->sAlloc,pCol);
529 }
530 return rc;
531}
532/*
533 * Fetch a collection.
534 */
535UNQLITE_PRIVATE unqlite_col * unqliteCollectionFetch(
536 unqlite_vm *pVm, /* Target VM */
537 SyString *pName, /* Lookup key */
538 int iFlag /* Control flag */
539 )
540{
541 unqlite_col *pCol = 0; /* cc warning */
542 int rc;
543 /* Check if the collection is already loaded in memory */
544 pCol = unqliteVmFetchCollection(pVm,pName);
545 if( pCol ){
546 /* Already loaded in memory*/
547 return pCol;
548 }
549 if( (iFlag & UNQLITE_VM_AUTO_LOAD) == 0 ){
550 return 0;
551 }
552 /* Ask the storage engine for the collection */
553 rc = unqliteVmLoadCollection(pVm,pName->zString,pName->nByte,0,&pCol);
554 /* Return to the caller */
555 return rc == UNQLITE_OK ? pCol : 0;
556}
557/*
558 * Return the unique ID of the last inserted record.
559 */
560UNQLITE_PRIVATE jx9_int64 unqliteCollectionLastRecordId(unqlite_col *pCol)
561{
562 return pCol->nLastid == 0 ? 0 : (pCol->nLastid - 1);
563}
564/*
565 * Return the current record ID.
566 */
567UNQLITE_PRIVATE jx9_int64 unqliteCollectionCurrentRecordId(unqlite_col *pCol)
568{
569 return pCol->nCurid;
570}
571/*
572 * Return the total number of records in a given collection.
573 */
574UNQLITE_PRIVATE jx9_int64 unqliteCollectionTotalRecords(unqlite_col *pCol)
575{
576 return pCol->nTotRec;
577}
578/*
579 * Reset the record cursor.
580 */
581UNQLITE_PRIVATE void unqliteCollectionResetRecordCursor(unqlite_col *pCol)
582{
583 pCol->nCurid = 0;
584}
585/*
586 * Fetch a record by its unique ID.
587 */
588UNQLITE_PRIVATE int unqliteCollectionFetchRecordById(
589 unqlite_col *pCol, /* Target collection */
590 jx9_int64 nId, /* Unique record ID */
591 jx9_value *pValue /* OUT: record value */
592 )
593{
594 SyBlob *pWorker = &pCol->sWorker;
595 unqlite_col_record *pRec;
596 int rc;
597 jx9_value_null(pValue);
598 /* Perform a cache lookup first */
599 pRec = CollectionCacheFetchRecord(pCol,nId);
600 if( pRec ){
601 /* Copy record value */
602 jx9MemObjStore(&pRec->sValue,pValue);
603 return UNQLITE_OK;
604 }
605 /* Reset the working buffer */
606 SyBlobReset(pWorker);
607 /* Generate the unique ID */
608 SyBlobFormat(pWorker,"%z_%qd",&pCol->sName,nId);
609 /* Reset the cursor */
610 unqlite_kv_cursor_reset(pCol->pCursor);
611 /* Seek the cursor to the desired location */
612 rc = unqlite_kv_cursor_seek(pCol->pCursor,
613 SyBlobData(pWorker),SyBlobLength(pWorker),
614 UNQLITE_CURSOR_MATCH_EXACT
615 );
616 if( rc != UNQLITE_OK ){
617 return rc;
618 }
619 /* Consume the binary JSON */
620 SyBlobReset(pWorker);
621 unqlite_kv_cursor_data_callback(pCol->pCursor,unqliteDataConsumer,pWorker);
622 if( SyBlobLength(pWorker) < 1 ){
623 unqliteGenErrorFormat(pCol->pVm->pDb,
624 "Empty record '%qd'",nId
625 );
626 jx9_value_null(pValue);
627 }else{
628 /* Decode the binary JSON */
629 rc = FastJsonDecode(SyBlobData(pWorker),SyBlobLength(pWorker),pValue,0,0);
630 if( rc == UNQLITE_OK ){
631 /* Install the record in the cache */
632 CollectionCacheInstallRecord(pCol,nId,pValue);
633 }
634 }
635 return rc;
636}
637/*
638 * Fetch the next record from a given collection.
639 */
640UNQLITE_PRIVATE int unqliteCollectionFetchNextRecord(unqlite_col *pCol,jx9_value *pValue)
641{
642 int rc;
643 for(;;){
644 if( pCol->nCurid >= pCol->nLastid ){
645 /* No more records, reset the record cursor ID */
646 pCol->nCurid = 0;
647 /* Return to the caller */
648 return SXERR_EOF;
649 }
650 rc = unqliteCollectionFetchRecordById(pCol,pCol->nCurid,pValue);
651 /* Increment the record ID */
652 pCol->nCurid++;
653 /* Lookup result */
654 if( rc == UNQLITE_OK || rc != UNQLITE_NOTFOUND ){
655 break;
656 }
657 }
658 return rc;
659}
660/*
661 * Create a new collection.
662 */
663UNQLITE_PRIVATE int unqliteCreateCollection(
664 unqlite_vm *pVm, /* Target VM */
665 SyString *pName /* Collection name */
666 )
667{
668 unqlite_col *pCol;
669 int rc;
670 /* Perform a lookup first */
671 pCol = unqliteCollectionFetch(pVm,pName,UNQLITE_VM_AUTO_LOAD);
672 if( pCol ){
673 return UNQLITE_EXISTS;
674 }
675 /* Now, safely create the collection */
676 rc = unqliteVmLoadCollection(pVm,pName->zString,pName->nByte,UNQLITE_VM_COLLECTION_CREATE,0);
677 return rc;
678}
679/*
680 * Set a schema (JSON object) for a given collection.
681 */
682UNQLITE_PRIVATE int unqliteCollectionSetSchema(unqlite_col *pCol,jx9_value *pValue)
683{
684 int rc;
685 if( !jx9_value_is_json_object(pValue) ){
686 /* Must be a JSON object */
687 return SXERR_INVALID;
688 }
689 rc = CollectionSetHeader(0,pCol,-1,-1,pValue);
690 return rc;
691}
692/*
693 * Perform a store operation on a given collection.
694 */
695static int CollectionStore(
696 unqlite_col *pCol, /* Target collection */
697 jx9_value *pValue /* JSON value to be stored */
698 )
699{
700 SyBlob *pWorker = &pCol->sWorker;
701 unqlite_kv_methods *pMethods;
702 unqlite_kv_engine *pEngine;
703 sxu32 nKeyLen;
704 int rc;
705 /* Point to the underlying KV store */
706 pEngine = unqlitePagerGetKvEngine(pCol->pVm->pDb);
707 pMethods = pEngine->pIo->pMethods;
708 if( pCol->nTotRec >= SXI64_HIGH ){
709 /* Collection limit reached. No more records */
710 unqliteGenErrorFormat(pCol->pVm->pDb,
711 "Collection '%z': Records limit reached",
712 &pCol->sName
713 );
714 return UNQLITE_LIMIT;
715 }
716 if( pMethods->xReplace == 0 ){
717 unqliteGenErrorFormat(pCol->pVm->pDb,
718 "Cannot store record into collection '%z' due to a read-only Key/Value storage engine",
719 &pCol->sName
720 );
721 return UNQLITE_READ_ONLY;
722 }
723 /* Reset the working buffer */
724 SyBlobReset(pWorker);
725 if( jx9_value_is_json_object(pValue) ){
726 jx9_value sId;
727 /* If the given type is a JSON object, then add the special __id field */
728 jx9MemObjInitFromInt(pCol->pVm->pJx9Vm,&sId,pCol->nLastid);
729 jx9_array_add_strkey_elem(pValue,"__id",&sId);
730 jx9MemObjRelease(&sId);
731 }
732 /* Prepare the unique ID for this record */
733 SyBlobFormat(pWorker,"%z_%qd",&pCol->sName,pCol->nLastid);
734 nKeyLen = SyBlobLength(pWorker);
735 if( nKeyLen < 1 ){
736 unqliteGenOutofMem(pCol->pVm->pDb);
737 return UNQLITE_NOMEM;
738 }
739 /* Turn to FastJson */
740 rc = FastJsonEncode(pValue,pWorker,0);
741 if( rc != UNQLITE_OK ){
742 return rc;
743 }
744 /* Finally perform the insertion */
745 rc = pMethods->xReplace(
746 pEngine,
747 SyBlobData(pWorker),nKeyLen,
748 SyBlobDataAt(pWorker,nKeyLen),SyBlobLength(pWorker)-nKeyLen
749 );
750 if( rc == UNQLITE_OK ){
751 /* Save the value in the cache */
752 CollectionCacheInstallRecord(pCol,pCol->nLastid,pValue);
753 /* Increment the unique __id */
754 pCol->nLastid++;
755 pCol->nTotRec++;
756 /* Reflect the change */
757 rc = CollectionSetHeader(0,pCol,pCol->nLastid,pCol->nTotRec,0);
758 }
759 if( rc != UNQLITE_OK ){
760 unqliteGenErrorFormat(pCol->pVm->pDb,
761 "IO error while storing record into collection '%z'",
762 &pCol->sName
763 );
764 return rc;
765 }
766 return UNQLITE_OK;
767}
768/*
769 * Array walker callback (Refer to jx9_array_walk()).
770 */
771static int CollectionRecordArrayWalker(jx9_value *pKey,jx9_value *pData,void *pUserData)
772{
773 unqlite_col *pCol = (unqlite_col *)pUserData;
774 int rc;
775 /* Perform the insertion */
776 rc = CollectionStore(pCol,pData);
777 if( rc != UNQLITE_OK ){
778 SXUNUSED(pKey); /* cc warning */
779 }
780 return rc;
781}
782/*
783 * Perform a store operation on a given collection.
784 */
785UNQLITE_PRIVATE int unqliteCollectionPut(unqlite_col *pCol,jx9_value *pValue,int iFlag)
786{
787 int rc;
788 if( !jx9_value_is_json_object(pValue) && jx9_value_is_json_array(pValue) ){
789 /* Iterate over the array and store its members in the collection */
790 rc = jx9_array_walk(pValue,CollectionRecordArrayWalker,pCol);
791 SXUNUSED(iFlag); /* cc warning */
792 }else{
793 rc = CollectionStore(pCol,pValue);
794 }
795 return rc;
796}
797/*
798 * Drop a record from a given collection.
799 */
800UNQLITE_PRIVATE int unqliteCollectionDropRecord(
801 unqlite_col *pCol, /* Target collection */
802 jx9_int64 nId, /* Unique ID of the record to be droped */
803 int wr_header, /* True to alter collection header */
804 int log_err /* True to log error */
805 )
806{
807 SyBlob *pWorker = &pCol->sWorker;
808 int rc;
809 /* Reset the working buffer */
810 SyBlobReset(pWorker);
811 /* Prepare the unique ID for this record */
812 SyBlobFormat(pWorker,"%z_%qd",&pCol->sName,nId);
813 /* Reset the cursor */
814 unqlite_kv_cursor_reset(pCol->pCursor);
815 /* Seek the cursor to the desired location */
816 rc = unqlite_kv_cursor_seek(pCol->pCursor,
817 SyBlobData(pWorker),SyBlobLength(pWorker),
818 UNQLITE_CURSOR_MATCH_EXACT
819 );
820 if( rc != UNQLITE_OK ){
821 return rc;
822 }
823 /* Remove the record from the storage engine */
824 rc = unqlite_kv_cursor_delete_entry(pCol->pCursor);
825 /* Finally, Remove the record from the cache */
826 unqliteCollectionCacheRemoveRecord(pCol,nId);
827 if( rc == UNQLITE_OK ){
828 pCol->nTotRec--;
829 if( wr_header ){
830 /* Relect in the collection header */
831 rc = CollectionSetHeader(0,pCol,-1,pCol->nTotRec,0);
832 }
833 }else if( rc == UNQLITE_NOTIMPLEMENTED ){
834 if( log_err ){
835 unqliteGenErrorFormat(pCol->pVm->pDb,
836 "Cannot delete record from collection '%z' due to a read-only Key/Value storage engine",
837 &pCol->sName
838 );
839 }
840 }
841 return rc;
842}
843/*
844 * Drop a collection from the KV storage engine and the underlying
845 * unqlite VM.
846 */
847UNQLITE_PRIVATE int unqliteDropCollection(unqlite_col *pCol)
848{
849 unqlite_vm *pVm = pCol->pVm;
850 jx9_int64 nId;
851 int rc;
852 /* Reset the cursor */
853 unqlite_kv_cursor_reset(pCol->pCursor);
854 /* Seek the cursor to the desired location */
855 rc = unqlite_kv_cursor_seek(pCol->pCursor,
856 SyStringData(&pCol->sName),SyStringLength(&pCol->sName),
857 UNQLITE_CURSOR_MATCH_EXACT
858 );
859 if( rc == UNQLITE_OK ){
860 /* Remove the record from the storage engine */
861 rc = unqlite_kv_cursor_delete_entry(pCol->pCursor);
862 }
863 if( rc != UNQLITE_OK ){
864 unqliteGenErrorFormat(pCol->pVm->pDb,
865 "Cannot remove collection '%z' due to a read-only Key/Value storage engine",
866 &pCol->sName
867 );
868 return rc;
869 }
870 /* Drop collection records */
871 for( nId = 0 ; nId < pCol->nLastid ; ++nId ){
872 unqliteCollectionDropRecord(pCol,nId,0,0);
873 }
874 /* Cleanup */
875 CollectionCacheRelease(pCol);
876 SyBlobRelease(&pCol->sHeader);
877 SyBlobRelease(&pCol->sWorker);
878 SyMemBackendFree(&pVm->sAlloc,(void *)SyStringData(&pCol->sName));
879 unqliteReleaseCursor(pVm->pDb,pCol->pCursor);
880 /* Unlink */
881 if( pCol->pPrevCol ){
882 pCol->pPrevCol->pNextCol = pCol->pNextCol;
883 }else{
884 sxu32 iBucket = pCol->nHash & (pVm->iColSize - 1);
885 pVm->apCol[iBucket] = pCol->pNextCol;
886 }
887 if( pCol->pNextCol ){
888 pCol->pNextCol->pPrevCol = pCol->pPrevCol;
889 }
890 MACRO_LD_REMOVE(pVm->pCol,pCol);
891 pVm->iCol--;
892 SyMemBackendPoolFree(&pVm->sAlloc,pCol);
893 return UNQLITE_OK;
894}