(use ugarit-backend)
(use sql-de-lite)
(use srfi-69)
(use matchable)
(use regex)
(use srfi-4)
(use srfi-13)
(use posix)

(define (backend-fs base)
   (define (make-name key extension)
      ; Break the key into levels to reduce the strain of too many files in one directory
      (cond
         ((< (string-length key) 4)
            (string-append base "/" key extension))
         ((< (string-length key) 7)
            (string-append base "/" (string-take key 3)
                           "/" (string-drop key 3) extension))
         ((< (string-length key) 10)
            (string-append base "/" (string-take key 3)
                           "/" (string-take (string-drop key 3) 3)
                           "/" (string-drop key 6) extension))
         (else
            (string-append base "/" (string-take key 3)
                           "/" (string-take (string-drop key 3) 3)
                           "/" (string-take (string-drop key 6) 3)
                           "/" (string-drop key 9) extension))))

   (define (ensure-directory! key)
      (let ((ed (lambda (path)
                   (if (not (directory? path))
                      (create-directory path)))))
         (if (>= (string-length key) 4)
            (ed (string-append base "/" (string-take key 3))))
         (if (>= (string-length key) 7)
            (ed (string-append base "/" (string-take key 3)
                               "/" (string-take (string-drop key 3) 3))))
         (if (>= (string-length key) 10)
            (ed (string-append base "/" (string-take key 3)
                               "/" (string-take (string-drop key 3) 3)
                               "/" (string-take (string-drop key 6) 3))))
         (void)))

   (define (delete-dir-if-empty! key)
      (let ((dd (lambda (path)
                   (if (and (directory? path) (null? (directory path)))
                      (delete-directory path)))))
         (if (>= (string-length key) 10)
            (dd (string-append base "/" (string-take key 3)
                               "/" (string-take (string-drop key 3) 3)
                               "/" (string-take (string-drop key 6) 3))))
         (if (>= (string-length key) 7)
            (dd (string-append base "/" (string-take key 3)
                               "/" (string-take (string-drop key 3) 3))))
         (if (>= (string-length key) 4)
            (dd (string-append base "/" (string-take key 3))))
         (void)))

   (define (make-tag-name tag)
      (string-append base "/" tag ".tag"))

   (if (not (directory? base))
      (signal (make-property-condition 'exn
                 'message "The archive does not exist"
                 'arguments base)))

   (make-storage
      (* 1024 1024) ; 1MiB blocks since local disk is fast and cheap
      #t ; We are writable
      #t ; We support unlink!

      (lambda (key data type) ; put!
         (if (file-read-access? (make-name key ".type"))
            (signal (make-property-condition 'exn
                       'message "Duplicate block: put! should not be called on an existing hash"
                       'arguments (list key type)))
            (begin
               (ensure-directory! key)
               ; Note: we save to ...~ files and then mv them into place, so as to avoid
               ; ending up with a partial block in the archive if we die mid-write. We move
               ; the .type file in last, since the existence of that is what makes the block
               ; "official". The only thing we need to worry about is a race between two
               ; snapshots writing the same block at once; however, since we can't easily
               ; provide atomicity on link!, we just say "don't do that" for now.
               (with-output-to-file (make-name key ".data~")
                  (lambda () (write-u8vector data)))
               (with-output-to-file (make-name key ".type~")
                  (lambda () (write type)))
               (with-output-to-file (make-name key ".refcount~")
                  (lambda () (write 1)))
               (rename-file (make-name key ".data~") (make-name key ".data"))
               (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
               (rename-file (make-name key ".type~") (make-name key ".type"))
               (void))))

      (lambda (key) ; exists?
         (if (file-read-access? (make-name key ".data"))
            (with-input-from-file (make-name key ".type")
               (lambda () (read)))
            #f))

      (lambda (key) ; get
         (if (file-read-access? (make-name key ".data"))
            (with-input-from-file (make-name key ".data")
               (lambda () (read-u8vector)))
            #f))

      (lambda (key) ; link!
         (if (file-read-access? (make-name key ".data"))
            (let ((current-refcount
                     (with-input-from-file (make-name key ".refcount")
                        (lambda () (read)))))
               (with-output-to-file (make-name key ".refcount~")
                  (lambda () (write (+ current-refcount 1))))
               (rename-file (make-name key ".refcount~") (make-name key ".refcount")))))

      (lambda (key) ; unlink!
         (and-let* (((file-read-access? (make-name key ".data")))
                    (current-refcount (with-input-from-file (make-name key ".refcount")
                                         (lambda () (read))))
                    (new-refcount (- current-refcount 1)))
            (if (zero? new-refcount)
               (let ((data (with-input-from-file (make-name key ".data")
                              (lambda () (read-u8vector)))))
                  (delete-file (make-name key ".data"))
                  (delete-file (make-name key ".type"))
                  (delete-file (make-name key ".refcount"))
                  (delete-dir-if-empty! key)
                  data) ; returned in case of deletion
               (begin
                  (with-output-to-file (make-name key ".refcount~")
                     (lambda () (write new-refcount)))
                  (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
                  #f))))

      (lambda (tag key) ; set-tag!
         (with-output-to-file (make-tag-name tag)
            (lambda () (write key))))

      (lambda (tag) ; tag
         (if (file-read-access? (make-tag-name tag))
            (with-input-from-file (make-tag-name tag)
               (lambda () (read)))
            #f))

      (lambda () ; all-tags
         (let ((tag-path-regexp (regexp (make-tag-name "(.*)"))))
            (map
               (lambda (path) (cadr (string-match tag-path-regexp path)))
               (glob (make-tag-name "*")))))

      (lambda (tag) ; remove-tag!
         (if (file-write-access? (make-tag-name tag))
            (delete-file (make-tag-name tag))
            #f))

      (lambda (tag) ; lock-tag!
         ; (printf "FIXME: Implement lock-tag! in backend-fs.scm\n")
         #f)

      (lambda (tag) ; tag-locked?
         ; (printf "FIXME: Implement tag-locked? in backend-fs.scm\n")
         #f)

      (lambda (tag) ; unlock-tag!
         ; (printf "FIXME: Implement unlock-tag! in backend-fs.scm\n")
         #f)

      (lambda () ; close!
         (void))))
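;; Illustrative sketch (the key below is made up, not part of the original
;; source): make-name shards a block key into three-character directory
;; levels, so for a long key the block's files land at paths like
;;
;;   (make-name "0123456789abcdef" ".data")
;;     => "<base>/012/345/678/9abcdef.data"
;;
;; alongside the matching .type and .refcount files, which keeps any single
;; directory from accumulating an unbounded number of entries.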
(define splitlog-sql-schema
   (list
      "CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT);"
      "INSERT INTO metadata VALUES ('version','1');"
      "CREATE TABLE blocks (key TEXT PRIMARY KEY, type TEXT, fileno INTEGER, position INTEGER, length INTEGER);"
      "CREATE TABLE tags (tag TEXT PRIMARY KEY, key TEXT, locked INTEGER DEFAULT 0);"))
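;; Illustrative sketch (the numbers are hypothetical, not part of the original
;; source): each row of the blocks table records where a block's bytes live in
;; the log files, e.g. a row
;;
;;   key = "ab12...", type = "<type>", fileno = 3, position = 524288, length = 65536
;;
;; means "this block is the 65536 bytes starting at offset 524288 of
;; <logdir>/log3", with the block type stored as the printed form of its symbol.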
(define (backend-splitlog logdir metapath max-logpart-size)
   (let* ((*db* (let ((db (open-database metapath)))
                   (change-file-mode metapath (bitwise-ior perm/irusr perm/iwusr))
                   ; Don't think we can do anything about the journal files, though.
                   (when (null? (schema db))
                      (for-each
                         (lambda (statement) (exec (sql db statement)))
                         splitlog-sql-schema))
                   (exec (sql db "BEGIN;"))
                   db))

          ; Prepared statements
          (get-metadata-query (sql *db* "SELECT value FROM metadata WHERE key = ?"))
          (set-metadata-query (sql *db* "INSERT OR REPLACE INTO metadata (key,value) VALUES (?,?)"))
          (get-block-data-query (sql *db* "SELECT type, fileno, position, length FROM blocks WHERE key = ?"))
          (set-block-data-query (sql *db* "INSERT INTO blocks (key,type,fileno,position,length) VALUES (?,?,?,?,?)"))
          (get-tag-query (sql *db* "SELECT key FROM tags WHERE tag = ?"))
          (set-tag-query (sql *db* "INSERT OR REPLACE INTO tags (tag,key) VALUES (?,?)"))
          (remove-tag-query (sql *db* "DELETE FROM tags WHERE tag = ?"))
          (set-tag-lock-query (sql *db* "UPDATE tags SET locked = ? WHERE tag = ?"))
          (get-tag-lock-query (sql *db* "SELECT locked FROM tags WHERE tag = ?"))
          (get-tags-query (sql *db* "SELECT tag FROM tags"))

          ; Database access functions
          (get-metadata (lambda (key default)
                           (let ((result (query fetch get-metadata-query key)))
                              (if (null? result)
                                 (begin
                                    (exec set-metadata-query key default)
                                    default)
                                 (car result)))))
          (set-metadata (lambda (key value)
                           (exec set-metadata-query key value)))

          ; Log file management
          (*logcount* (string->number (get-metadata "current-logfile" "0")))
          (set-logcount! (lambda (newcount) (set! *logcount* newcount)))
          (*log* (file-open (string-append logdir "/log" (number->string *logcount*))
                            (bitwise-ior open/creat open/rdwr open/append)
                            (bitwise-ior perm/irusr perm/iwusr)))
          (*logfiles* (make-hash-table)) ; hash of file number to FD
          (get-log (lambda (index)
                      (if (hash-table-exists? *logfiles* index)
                         (hash-table-ref *logfiles* index)
                         (let ((fd (file-open (string-append logdir "/log" (number->string index))
                                              open/rdonly perm/irwxu)))
                            (hash-table-set! *logfiles* index fd)
                            fd))))

          ; Periodic commit management
          (commit-interval (string->number (get-metadata "commit-interval" "1000")))
          (*updates-since-last-commit* 0)
          (flush! (lambda ()
                     (set-metadata "current-logfile" (number->string *logcount*))
                     (exec (sql *db* "COMMIT;"))
                     (exec (sql *db* "BEGIN;"))
                     (set! *updates-since-last-commit* 0)))
          (maybe-flush! (lambda ()
                           (set! *updates-since-last-commit* (+ *updates-since-last-commit* 1))
                           (when (> *updates-since-last-commit* commit-interval)
                              (flush!))))

          ; Higher-level database utilities
          (get-block-data (lambda (key) ; Returns #f for nonexistent blocks
                             (let ((bd (query fetch get-block-data-query key)))
                                (if (pair? bd)
                                   (let ((type (string->symbol (first bd)))
                                         (fileno (second bd))
                                         (position (third bd))
                                         (length (fourth bd)))
                                      (list type fileno position length))
                                   #f))))
          (set-block-data! (lambda (key type fileno position length)
                              (exec set-block-data-query key (symbol->string type) fileno position length)
                              (maybe-flush!)))
          (set-tag! (lambda (tag key)
                       (exec set-tag-query tag key)
                       (flush!)))
          (remove-tag! (lambda (tag)
                          (exec remove-tag-query tag)
                          (flush!)))
          (get-tag (lambda (tag)
                      (let ((td (query fetch get-tag-query tag)))
                         (if (pair? td) (car td) #f))))
          (set-tag-lock! (lambda (tag lock)
                            (exec set-tag-lock-query lock tag)
                            (flush!)))
          (get-tag-lock (lambda (tag) ; takes only the tag; the lock value comes from the database
                           (let ((td (query fetch get-tag-lock-query tag)))
                              (if (pair? td) (car td) #f))))
          (get-tags (lambda ()
                       (map car (query fetch-all get-tags-query)))))

      (make-storage
         (* 1024 1024) ; 1MiB blocks since local disk is fast and cheap, right?
         #t ; We are writable
         #f ; We DO NOT support unlink!

         (lambda (key data type) ; put!
            (when (pair? (get-block-data key))
               (signal (make-property-condition 'exn
                          'message "Duplicate block: put! should not be called on an existing hash"
                          'arguments (list key type))))
            (set-file-position! *log* 0 seek/end)
            (let ((header (sprintf "(block ~S ~S ~S)" key type (u8vector-length data)))
                  (posn (file-position *log*)))
               (if (> posn max-logpart-size)
                  (begin ; Current log part is full; start a new one
                     (file-close *log*)
                     (set! posn 0)
                     (set-logcount! (+ *logcount* 1))
                     (set! *log* (file-open (string-append logdir "/log" (number->string *logcount*))
                                            (bitwise-ior open/creat open/rdwr open/append)
                                            (bitwise-ior perm/irusr perm/iwusr)))))
               (file-write *log* header)
               (file-write *log* (u8vector->blob/shared data))
               (set-block-data! key type *logcount* (+ (string-length header) posn) (u8vector-length data))
               (void)))

         (lambda (key) ; exists?
            (let ((bd (get-block-data key)))
               (if (pair? bd) (car bd) #f)))

         (lambda (key) ; get
            (let ((entry (get-block-data key)))
               (if (pair? entry)
                  (let* ((type (first entry))
                         (index (second entry))
                         (position (third entry))
                         (length (fourth entry))
                         (buffer (make-blob length))
                         (logpart (get-log index)))
                     (set-file-position! logpart position seek/set)
                     (file-read logpart length buffer)
                     (blob->u8vector/shared buffer))
                  #f)))

         (lambda (key) ; link!
            (void))

         (lambda (key) ; unlink!
            (signal (make-property-condition 'exn
                       'message "Log archives do not support deletion")))

         (lambda (tag key) ; set-tag!
            (file-write *log* (sprintf "(tag ~S ~S)" tag key))
            (set-tag! tag key)
            (void))

         (lambda (tag) ; tag
            (get-tag tag))

         (lambda () ; all-tags
            (get-tags))

         (lambda (tag) ; remove-tag!
            (remove-tag! tag)
            (void))

         (lambda (tag) ; lock-tag!
            (set-tag-lock! tag 1)
            (void))

         (lambda (tag) ; tag-locked?
            (let ((lock (get-tag-lock tag)))
               ; An unknown tag is reported as unlocked rather than raising an error
               (and lock (not (zero? lock)))))

         (lambda (tag) ; unlock-tag!
            (set-tag-lock! tag 0))

         (lambda () ; close!
            (flush!)
            (exec (sql *db* "COMMIT;"))
            (close-database *db*)
            (file-close *log*)
            (hash-table-for-each *logfiles*
               (lambda (key value) (file-close value)))))))
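;; Illustrative sketch (hypothetical key and length, not part of the original
;; source): each block appended to a log part is a readable header followed
;; immediately by the raw block bytes, e.g.
;;
;;   (block "ab12..." <type> 65536)<65536 bytes of block data>
;;
;; The blocks table stores the offset just past the header, so get only needs
;; a set-file-position! and a file-read to recover the bytes; the headers also
;; make each log part largely self-describing, independent of the SQLite metadata.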
(define backend
   (match (command-line-arguments)
      (("fs" base)
         (backend-fs base))
      (("splitlog" logdir metadir max-logpart-size)
         (backend-splitlog logdir metadir (string->number max-logpart-size)))
      (else
         (printf "USAGE:\nbackend-fs fs <base-directory>\nbackend-fs splitlog <log-directory> <metadata-file> <max-logpart-size>\n")
         #f)))

(if backend
   (export-storage! backend))
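;; Usage sketch (the paths below are hypothetical, not part of the original
;; source): the process exposes its storage via export-storage!, which Ugarit
;; talks to over the backend's stdin/stdout, so it is typically launched by
;; Ugarit itself from a storage command line such as
;;
;;   backend-fs fs /srv/ugarit/archive
;;   backend-fs splitlog /srv/ugarit/logs /srv/ugarit/logs/metadata.sqlite 104857600
;;
;; where the final splitlog argument is the size threshold in bytes at which a
;; new log file is started.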