(use ugarit-backend)
(use sql-de-lite)
(use srfi-69)
(use matchable)
(use regex)
(use miscmacros)

;;; backend-fs: an archive stored as plain files under the directory BASE.
;;;
;;; Each block KEY is kept as three files: KEY.data (the raw bytes), KEY.type
;;; (the block type, written with `write`) and KEY.refcount (an integer), in a
;;; directory tree built from leading 3-character chunks of the key.  Each tag
;;; is the file BASE/TAG.tag holding the tagged key; BASE/TAG.tag-lock exists
;;; while the tag is locked.
(define (backend-fs base)

  ;; Split KEY into (values PREFIXES LEAF): up to three leading 3-character
  ;; chunks, each of which becomes a directory level, always leaving at least
  ;; one character of the key for the file name itself.
  ;; Break into levels to reduce files-in-one-dir strain.
  (define (split-key key)
    (let loop ((rest key) (prefixes '()))
      (if (and (< (length prefixes) 3) (> (string-length rest) 3))
          (loop (string-drop rest 3) (cons (string-take rest 3) prefixes))
          (values (reverse prefixes) rest))))

  ;; The directories that hold KEY's files, shallowest first.
  (define (key-directories key)
    (receive (prefixes leaf) (split-key key)
      (let loop ((prefixes prefixes) (dir base) (dirs '()))
        (if (null? prefixes)
            (reverse dirs)
            (let ((next (string-append dir "/" (car prefixes))))
              (loop (cdr prefixes) next (cons next dirs)))))))

  ;; Full path of KEY's file with the given EXTENSION (e.g. ".data").
  (define (make-name key extension)
    (receive (prefixes leaf) (split-key key)
      (apply string-append
             base
             (append (map (lambda (prefix) (string-append "/" prefix)) prefixes)
                     (list "/" leaf extension)))))

  ;; Create any missing directories needed to hold KEY's files.
  (define (ensure-directory! key)
    (for-each (lambda (path)
                (unless (directory? path) (create-directory path)))
              (key-directories key))
    (void))

  ;; Remove each of KEY's directories that is empty, deepest first, so that
  ;; a parent emptied by removing its child is removed too.
  (define (delete-dir-if-empty! key)
    (for-each (lambda (path)
                (when (and (directory? path) (null? (directory path)))
                  (delete-directory path)))
              (reverse (key-directories key)))
    (void))

  (define (make-tag-name tag) (string-append base "/" tag ".tag"))
  (define (make-tag-lock-name tag) (string-append base "/" tag ".tag-lock"))

  ;; A block only officially exists once its .type file is in place: put!
  ;; renames that file into position last.
  (define (block-exists? key)
    (file-read-access? (make-name key ".type")))

  ;; Read a single datum from the file at PATH.
  (define (read-file path)
    (with-input-from-file path (lambda () (read))))

  ;; Write DATUM to PATH via a temporary PATH~ file and a rename, so that a
  ;; crash mid-write never leaves a truncated file at PATH.
  (define (write-datum-atomically! path datum)
    (let ((temp (string-append path "~")))
      (with-output-to-file temp (lambda () (write datum)))
      (rename-file temp path)))

  (define block-size (* 1024 1024))

  (unless (directory? base)
    (error "The archive directory does not exist" base))

  (make-storage
   block-size
   #t ; We are writable
   #t ; We support unlink!

   (lambda (key data type) ; put!
     (if (block-exists? key)
         (signal (make-property-condition
                  'exn
                  'message "Duplicate block: put! should not be called on an existing hash"
                  'arguments (list key type)))
         (begin
           (ensure-directory! key)
           ;; Note: We save to ...~ files then mv them into place, so as to avoid ending
           ;; up with a partial block in the archive if it dies in mid-write. We move the
           ;; .type file in last, since the existence of that is what makes the block
           ;; "official".
           ;; The only thing we need worry about is a race between two snapshots writing
           ;; the same block at once... However, since we can't easily provide atomicity
           ;; on link!, we just say "don't do that" for now.
           (with-output-to-file (make-name key ".data~") (lambda () (write-u8vector data)))
           (with-output-to-file (make-name key ".type~") (lambda () (write type)))
           (with-output-to-file (make-name key ".refcount~") (lambda () (write 1)))
           (rename-file (make-name key ".data~") (make-name key ".data"))
           (rename-file (make-name key ".refcount~") (make-name key ".refcount"))
           (rename-file (make-name key ".type~") (make-name key ".type"))
           (void))))

   (lambda () (void)) ; flush! - a no-op for us

   (lambda (key) ; exists? - returns the block type, or #f
     (and (block-exists? key)
          (read-file (make-name key ".type"))))

   (lambda (key) ; get - returns the block's bytes, or #f
     (and (block-exists? key)
          (with-input-from-file (make-name key ".data") (lambda () (read-u8vector)))))

   (lambda (key) ; link! - bump the reference count of an existing block
     (when (block-exists? key)
       (let ((refcount-file (make-name key ".refcount")))
         (write-datum-atomically! refcount-file (+ (read-file refcount-file) 1))))
     (void))

   (lambda (key) ; unlink!
     ;; Decrement the refcount; when it reaches zero, delete the block and
     ;; return its data.  Otherwise (or if the block is missing) return #f.
     (and-let* (((block-exists? key))
                (refcount-file (make-name key ".refcount"))
                (new-refcount (- (read-file refcount-file) 1)))
       (if (zero? new-refcount)
           (let ((data (with-input-from-file (make-name key ".data")
                         (lambda () (read-u8vector)))))
             ;; Remove .type first so the block stops being "official" before
             ;; its other files disappear.
             (delete-file (make-name key ".type"))
             (delete-file (make-name key ".data"))
             (delete-file (make-name key ".refcount"))
             (delete-dir-if-empty! key)
             data) ; returned in case of deletion
           (begin
             (write-datum-atomically! refcount-file new-refcount)
             #f))))

   (lambda (tag key) ; set-tag!
     (write-datum-atomically! (make-tag-name tag) key)
     (void))

   (lambda (tag) ; tag - the tagged key, or #f
     (and (file-read-access? (make-tag-name tag))
          (let ((key (read-file (make-tag-name tag))))
            (if (eof-object? key)
                #f ; Treat empty file as no tag
                key))))

   (lambda () ; all-tags
     ;; Tag names are the .tag file names with the extension removed.  Working
     ;; on the file name avoids treating characters of BASE as a regex.
     (map (lambda (path)
            (let ((file (pathname-strip-directory path)))
              (substring file 0 (- (string-length file) (string-length ".tag")))))
          (glob (make-tag-name "*"))))

   (lambda (tag) ; remove-tag!
     (if (file-write-access? (make-tag-name tag))
         (begin
           (delete-file (make-tag-name tag))
           (when (file-exists? (make-tag-lock-name tag))
             (delete-file (make-tag-lock-name tag))))
         #f))

   (lambda (tag) ; lock-tag!
     ;; Ensure tag file exists first, as an empty file if necessary
     (file-close (file-open (make-tag-name tag) (+ open/wronly open/append open/creat)))
     ;; Creating the hard link fails if the lock file already exists.
     (condition-case
         (begin
           (file-link (make-tag-name tag) (make-tag-lock-name tag))
           #t)
       ;; If we can't create it for any reason, we haven't got the lock; it'd be
       ;; nicer to check errno = EEXIST, though, and raise an exception for other
       ;; errors.
       ((exn i/o file) #f)))

   (lambda (tag) ; tag-locked?
     (and (file-exists? (make-tag-lock-name tag)) #t))

   (lambda (tag) ; unlock-tag!
     (delete-file (make-tag-lock-name tag))
     (void))

   (lambda (command) ; admin!
     (match command
       (('info)
        (list (cons 'backend "fs")
              (cons 'path base)
              (cons 'block-size block-size)
              (cons 'writable? #t)
              (cons 'unlinkable? #t)))
       (('help)
        (list (cons 'info "Return information about the archive")
              (cons 'help "List available admin commands")))
       (else (error "Unknown admin command"))))

   (lambda () ; close!
     (void))))

;;; Schema for a fresh splitlog metadata database.
(define splitlog-sql-schema
  (list "CREATE TABLE metadata (key TEXT PRIMARY KEY, value TEXT);"
        "INSERT INTO metadata VALUES ('version','1');"
        "CREATE TABLE blocks (key TEXT PRIMARY KEY, type TEXT, fileno INTEGER, position INTEGER, length INTEGER);"
        "CREATE TABLE tags (tag TEXT PRIMARY KEY, key TEXT, locked INTEGER DEFAULT 0);"))

;;; fsync(2) on a raw file descriptor, used so log data is on disk before the
;;; index entries pointing at it are committed.
(define file-sync (foreign-lambda int "fsync" int))

;;; backend-splitlog: an append-only archive.
;;;
;;; Data lives in log files LOGDIR/log0, LOGDIR/log1, ...  Each block is
;;; appended as a header "(block KEY TYPE LENGTH)" followed by LENGTH raw
;;; bytes; each tag change is appended as "(tag TAG KEY)".  A SQLite database
;;; at METAPATH indexes where every block lives, plus tags and settings.  A
;;; transaction is always open on it; flush! fsyncs the current log and then
;;; commits.
(define (backend-splitlog logdir metapath)
  (let* ((*db*
          (let ((db (open-database metapath)))
            (change-file-mode metapath (bitwise-ior perm/irusr perm/iwusr))
            ;; Don't think we can do anything about the journal files, though.
            (set-busy-handler! db (busy-timeout 100000))
            (when (null? (schema db))
              (for-each (lambda (statement) (exec (sql db statement)))
                        splitlog-sql-schema))
            (exec (sql db "BEGIN;"))
            db))

         ;; Prepared statements
         (get-metadata-query (sql *db* "SELECT value FROM metadata WHERE key = ?"))
         (set-metadata-query (sql *db* "INSERT OR REPLACE INTO metadata (key,value) VALUES (?,?)"))
         (get-block-data-query (sql *db* "SELECT type, fileno, position, length FROM blocks WHERE key = ?"))
         (set-block-data-query (sql *db* "INSERT INTO blocks (key,type,fileno,position,length) VALUES (?,?,?,?,?)"))
         (get-tag-query (sql *db* "SELECT key FROM tags WHERE tag = ?"))
         ;; Tags are written with INSERT OR IGNORE + UPDATE rather than
         ;; INSERT OR REPLACE: REPLACE deletes the old row, which would silently
         ;; reset its "locked" column back to the default of 0.
         (insert-tag-query (sql *db* "INSERT OR IGNORE INTO tags (tag,key) VALUES (?,?)"))
         (update-tag-query (sql *db* "UPDATE tags SET key = ? WHERE tag = ?"))
         (remove-tag-query (sql *db* "DELETE FROM tags WHERE tag = ?"))
         (set-tag-lock-query (sql *db* "UPDATE tags SET locked = ? WHERE tag = ?"))
         (get-tag-lock-query (sql *db* "SELECT locked FROM tags WHERE tag = ?"))
         (get-tags-query (sql *db* "SELECT tag FROM tags"))

         ;; Database access functions

         ;; Read a metadata value, storing DEFAULT first if the key is absent.
         (get-metadata
          (lambda (key default)
            (let ((result (query fetch get-metadata-query key)))
              (if (null? result)
                  (begin
                    (exec set-metadata-query key default)
                    default)
                  (car result)))))
         (set-metadata!
          (lambda (key value)
            (exec set-metadata-query key value)))
         (max-logpart-size (string->number (get-metadata "max-logpart-size" "600000000")))

         ;; Log file management
         (log-file-name
          (lambda (index)
            (string-append logdir "/log" (number->string index))))
         (open-log-for-append
          (lambda (index)
            (file-open (log-file-name index)
                       (bitwise-ior open/creat open/rdwr open/append)
                       (bitwise-ior perm/irusr perm/iwusr))))
         (*logcount* (string->number (get-metadata "current-logfile" "0")))
         (set-logcount! (lambda (newcount) (set! *logcount* newcount)))
         (*log* (open-log-for-append *logcount*))
         (*logfiles* (make-hash-table)) ; hash of file number to read-only FD
         (get-log
          (lambda (index)
            (or (hash-table-ref/default *logfiles* index #f)
                (let ((fd (file-open (log-file-name index) open/rdonly perm/irwxu)))
                  (hash-table-set! *logfiles* index fd)
                  fd))))

         ;; Basic configurables
         (block-size (string->number (get-metadata "block-size" "1048576")))
         (writable? (not (string=? "0" (get-metadata "writable" "1"))))
         (check-writable
          (lambda ()
            (unless writable?
              (error "This archive is write protected"))))

         ;; Periodic commit management
         (commit-interval (string->number (get-metadata "commit-interval" "1000")))
         (*updates-since-last-commit* 0)
         ;; Commit if anything has been counted since the last commit.  The log
         ;; is fsynced first so the index never points at data not yet on disk.
         (flush!
          (lambda ()
            (when (> *updates-since-last-commit* 0)
              (file-sync *log*)
              (set-metadata! "current-logfile" (number->string *logcount*))
              (exec (sql *db* "COMMIT;"))
              (exec (sql *db* "BEGIN;"))
              (set! *updates-since-last-commit* 0))))
         ;; Count one update; commit once more than COMMIT-INTERVAL have built up.
         (maybe-flush!
          (lambda ()
            (inc! *updates-since-last-commit*)
            (when (> *updates-since-last-commit* commit-interval)
              (flush!))))
         ;; Count one update and commit immediately.  Without the increment,
         ;; flush! would skip the commit when no block writes were pending.
         (commit-now!
          (lambda ()
            (inc! *updates-since-last-commit*)
            (flush!)))

         ;; Higher-level database utilities

         ;; Returns (TYPE FILENO POSITION LENGTH), or #f for nonexistent blocks.
         (get-block-data
          (lambda (key)
            (let ((bd (query fetch get-block-data-query key)))
              (and (pair? bd)
                   (list (string->symbol (first bd))
                         (second bd)
                         (third bd)
                         (fourth bd))))))
         (set-block-data!
          (lambda (key type fileno position length)
            (exec set-block-data-query key (symbol->string type) fileno position length)
            (maybe-flush!)))
         ;; Point TAG at KEY without committing, preserving any lock flag.
         (store-tag!
          (lambda (tag key)
            (exec insert-tag-query tag key)
            (exec update-tag-query key tag)))
         (set-tag!
          (lambda (tag key)
            (store-tag! tag key)
            (commit-now!)))
         (remove-tag!
          (lambda (tag)
            (exec remove-tag-query tag)
            (commit-now!)))
         (get-tag
          (lambda (tag)
            (let ((td (query fetch get-tag-query tag)))
              (and (pair? td)
                   (not (null? (car td))) ; treat NULL as no tag
                   (car td)))))
         (set-tag-lock!
          (lambda (tag lock)
            (exec set-tag-lock-query lock tag)
            (commit-now!)))
         ;; The tag's "locked" column value.
         (get-tag-lock
          (lambda (tag)
            (let ((td (query fetch get-tag-lock-query tag)))
              (if (pair? td)
                  (car td)
                  (begin
                    ;; Tag does not exist: create a NULL-keyed record on demand.
                    ;; This is deliberately not committed here, so it cannot end
                    ;; a caller's (exclusive) transaction.
                    (exec insert-tag-query tag '())
                    0)))))
         (get-tags
          (lambda ()
            (map car (query fetch-all get-tags-query))))

         ;; Re-index every entry of the log file PATH (log number LOG-NUMBER).
         (replay-log!
          (lambda (log-number path)
            ((backend-log!) 'info (sprintf "Reading ~a" path))
            (with-input-from-file path
              (lambda ()
                (let loop-over-entries ()
                  (let* ((entry (read))
                         (posn (file-position (current-input-port))))
                    (unless (eof-object? entry)
                      (match entry
                        (('block key type len)
                         (set-block-data! key type log-number posn len)
                         ;; Skip over the raw block bytes.
                         (set-file-position! (current-input-port) len seek/cur))
                        (('tag tag key)
                         (store-tag! tag key))
                        (else
                         ((backend-log!) 'error (sprintf "Unknown log entry ~S" entry))))
                      (loop-over-entries))))))))
         ;; Rebuild the blocks and tags tables by scanning log0, log1, ...
         ;; until the first missing log file.
         ;; NOTE(review): remove-tag! writes nothing to the log, so tags removed
         ;; since they were set come back after a reindex.
         (reindex!
          (lambda ()
            (flush!)
            (exec (sql *db* "DELETE FROM tags;"))
            (exec (sql *db* "DELETE FROM blocks;"))
            (let loop-over-logs ((log-number 0))
              (let ((path (log-file-name log-number)))
                (when (file-exists? path)
                  (replay-log! log-number path)
                  (loop-over-logs (+ log-number 1)))))
            (commit-now!)
            (void))))

    (make-storage
     block-size
     writable?
     #f ; We DO NOT support unlink!

     (lambda (key data type) ; put!
       (check-writable)
       (when (get-block-data key)
         (error "Duplicate block" key type))
       (set-file-position! *log* 0 seek/end)
       (let ((header (sprintf "(block ~S ~S ~S)" key type (u8vector-length data)))
             (posn (file-position *log*)))
         ;; Start a new log part if this entry would overflow a non-empty one.
         (when (and (not (zero? posn))
                    (> (+ (u8vector-length data) (string-length header) posn)
                       max-logpart-size))
           ;; Blocks already indexed in the old part must reach the disk.
           (file-sync *log*)
           (file-close *log*)
           (set-logcount! (+ *logcount* 1))
           (set! *log* (open-log-for-append *logcount*))
           ;; The new part may already exist (e.g. written before a crash, but
           ;; never committed); appends land at its end, so record that offset.
           (set-file-position! *log* 0 seek/end)
           (set! posn (file-position *log*)))
         (file-write *log* header)
         (file-write *log* (u8vector->blob/shared data))
         (set-block-data! key type *logcount* (+ (string-length header) posn) (u8vector-length data))
         (void)))

     (lambda () ; flush!
       (flush!)
       (void))

     (lambda (key) ; exists? - returns the block type, or #f
       (let ((bd (get-block-data key)))
         (and bd (first bd))))

     (lambda (key) ; get - returns the block's bytes, or #f
       (let ((entry (get-block-data key)))
         (and entry
              (let* ((index (second entry))
                     (position (third entry))
                     (size (fourth entry))
                     (buffer (make-blob size))
                     (logpart (get-log index)))
                (set-file-position! logpart position seek/set)
                (file-read logpart size buffer)
                (blob->u8vector/shared buffer)))))

     (lambda (key) ; link!
       (check-writable)
       (void))

     (lambda (key) ; unlink!
       (check-writable)
       (error "splitlog archives do not support unlinking"))

     (lambda (tag key) ; set-tag!
       (check-writable)
       (file-write *log* (sprintf "(tag ~S ~S)" tag key))
       (set-tag! tag key)
       (void))

     (lambda (tag) ; tag
       (get-tag tag))

     (lambda () ; all-tags
       (get-tags))

     (lambda (tag) ; remove-tag!
       (check-writable)
       (remove-tag! tag)
       (void))

     (lambda (tag) ; lock-tag!
       (check-writable)
       ;; Commit pending block records properly (log fsync first) before we
       ;; end the current transaction ourselves.
       (flush!)
       (exec (sql *db* "COMMIT;"))
       (exec (sql *db* "BEGIN EXCLUSIVE;")) ; There can be ONLY ONE!
       (if (zero? (get-tag-lock tag))
           (begin
             ;; set-tag-lock! commits, which also ends the exclusive transaction.
             (set-tag-lock! tag 1)
             #t)
           (begin
             ;; Already locked: release the exclusive transaction unchanged.
             (exec (sql *db* "COMMIT;"))
             (exec (sql *db* "BEGIN;"))
             #f)))

     (lambda (tag) ; tag-locked?
       (not (zero? (get-tag-lock tag))))

     (lambda (tag) ; unlock-tag!
       (check-writable)
       (set-tag-lock! tag 0)
       (void))

     (lambda (command) ; admin!
       (match command
         (('info)
          (list (cons 'backend "splitlog")
                (cons 'block-size block-size)
                (cons 'writable? writable?)
                (cons 'unlinkable? #f)
                (cons 'path logdir)
                (cons 'metadata-file metapath)
                (cons 'max-logfile-size max-logpart-size)
                (cons 'currently-writing-to *logcount*)
                (cons 'commit-interval commit-interval)))
         (('help)
          (list (cons 'info "Return information about the archive")
                (cons 'help "List available admin commands")
                (cons 'stats "Examine the metadata and report back statistics")
                (cons 'set-block-size! (sprintf " Sets a new maximum block size (current: ~a)" block-size))
                (cons 'set-max-logfile-size! (sprintf " Sets a new maximum logfile size (current: ~a)" max-logpart-size))
                (cons 'set-commit-interval! (sprintf " Sets a new commit interval (current: ~a)" commit-interval))
                (cons 'write-protect! (sprintf "Disable writing to the archive (currently ~a)" (if writable? "enabled" "disabled")))
                (cons 'write-unprotect! (sprintf "Enable writing to the archive (currently ~a)" (if writable? "enabled" "disabled")))
                (cons 'reindex! "Rebuild the index in the metadata from scratch by scanning the log (takes a while)")))
         (('stats)
          ;; COALESCE so an empty archive reports 0 bytes rather than NULL.
          (let ((stats (query fetch (sql *db* "SELECT COUNT(*), COALESCE(SUM(length),0) FROM blocks"))))
            (list (cons 'blocks (first stats))
                  (cons 'bytes (second stats)))))
         (('set-block-size! size)
          (assert (integer? size))
          (set! block-size size)
          (set-metadata! "block-size" (number->string size))
          (list (cons 'result "Done")))
         (('set-max-logfile-size! size)
          (assert (integer? size))
          (set! max-logpart-size size)
          (set-metadata! "max-logpart-size" (number->string size))
          (list (cons 'result "Done")))
         (('set-commit-interval! cis)
          (assert (integer? cis))
          (set! commit-interval cis)
          (set-metadata! "commit-interval" (number->string cis))
          (list (cons 'result "Done")))
         (('write-protect!)
          (set! writable? #f)
          (set-metadata! "writable" "0")
          (list (cons 'result "Done")))
         (('write-unprotect!)
          (set! writable? #t)
          (set-metadata! "writable" "1")
          (list (cons 'result "Done")))
         (('reindex!)
          (reindex!)
          (list (cons 'result "Done")))
         (else (error "Unknown admin command"))))

     (lambda () ; close!
       (flush!)
       (exec (sql *db* "COMMIT;"))
       (close-database *db*)
       (file-close *log*)
       (hash-table-for-each *logfiles*
                            (lambda (key value) (file-close value)))))))

;;; Select the backend from the command line; #f if the arguments are invalid.
(define backend
  (match (command-line-arguments)
    (("fs" base) (lambda () (backend-fs base)))
    (("splitlog" logdir metadir) (lambda () (backend-splitlog logdir metadir)))
    (else
     (export-storage-error! "Invalid arguments to backend-fs")
     (printf "USAGE:\nbackend-fs fs <basedir-path>\nbackend-fs splitlog <logdir-path> <metadata-file-path>\n")
     #f)))

(if backend
    (export-storage! backend))