(module ugarit-core (open-archive archive? archive-hash archive-global-directory-rules archive-snapshot-blocks-stored archive-snapshot-bytes-stored archive-snapshot-blocks-skipped archive-snapshot-bytes-skipped archive-file-cache-hits archive-file-cache-bytes archive-writable? archive-unlinkable? archive-log! archive-exists? archive-get archive-put! archive-flush! archive-remove-tag! archive-set-tag! archive-tag archive-all-tags archive-lock-tag! archive-unlock-tag! archive-tag-locked? archive-link! archive-unlink! archive-admin! archive-close! archive-store-block! make-key-stream-writer* key-stream-writer? key-stream-writer-write! key-stream-writer-finish! unlink-key-stream! fold-key-stream make-sexpr-stream-writer* sexpr-stream-writer? sexpr-stream-writer-write! sexpr-stream-writer-finish! unlink-sexpr-stream! fold-sexpr-stream store-sexpr! read-sexpr epochtime->string store-file! write-file-contents unlink-file! store-directory! unlink-directory! extract-directory! extract-object! ; FIXME: These two will be useful in future ;verify-directory! ;verify-object! snapshot-directory-tree! tag-snapshot! fold-history fold-archive-node) (import scheme) (import chicken) (use autoload) (define ((deny-autoload module)) (error (sprintf "Autoload does not seem to be working, so optional components from module ~s are not working" module) module)) (define-syntax no-autoload (er-macro-transformer (lambda (expr rename compare) (let ((module (cadr expr)) (procs (cddr expr)) (_begin (rename 'begin)) (_define (rename 'define)) (_deny-autoload (rename 'deny-autoload))) (cons _begin (map (lambda (x) (let ((orig-binding (if (pair? x) (car x) x)) (new-binding (if (pair? x) (cadr x) x))) `(,_define ,new-binding (,_deny-autoload ',module)))) procs)))))) (no-autoload lzma (compress lzma:compress) (decompress lzma:decompress)) (no-autoload z3 z3:encode-buffer z3:decode-buffer) (no-autoload sha2 sha256-primitive sha384-primitive sha512-primitive) (no-autoload aes make-aes128-encryptor make-aes128-decryptor make-aes192-encryptor make-aes192-decryptor make-aes256-encryptor make-aes256-decryptor) (use srfi-1) (use srfi-4) (use srfi-13) (use srfi-18) (use extras) (use ports) (use files) (use lolevel) (use data-structures) (use directory-rules) (use miscmacros) (use posix) (use posix-extras) (use crypto-tools) (use stty) (use matchable) (use regex) (use ugarit-backend) (use sql-de-lite) (use data-structures) (use tiger-hash) (use message-digest) ;; ;; LOG EVENTS ;; (define-record event type ; error/warning/note time ; timestamp (current-seconds) path ; where applicable, #f if not message ; string ) (define (make-event* type path message) (let ((now (current-seconds))) (printf "~A: ~A [~A] ~A\n" type (epochtime->string now) path message) (make-event type now path message))) ;; ;; THE ARCHIVE ;; This thing is becoming a bit of a God Object. Figure out how to ;; refactor it a bit, perhaps? ;; (define-record archive storage ; The storage instance we use check-correctness? ; boolean flag store-atime? ; boolean flag store-ctime? 
; boolean flag hash ; the hash function, u8vector+type symbol->hex string compress ; the compressor, u8vector->smaller u8vector decompress ; the decompressor, inverse of the above encrypt ; the encryptor, u8vector -> u8vector decrypt ; the decryptor, inverse of the above global-directory-rules ; top-level directory rules ; Snapshot counters (setter snapshot-blocks-stored) ; Blocks written to storage (setter snapshot-bytes-stored) ; Bytes written to storage (setter snapshot-blocks-skipped) ; Blocks already in storage and reused (not including file cache wins) (setter snapshot-bytes-skipped) ; Bytes already in storage and reused (not including file cache wins) ; File cache file-cache ; sqlite db storing filesystem cache (see store-file! procedure); #f if not enabled file-cache-get-query ; sqlite stored procedure file-cache-set-query ; sqlite stored procedure (setter file-cache-updates-uncommitted) ; count of updates since last commit (setter file-cache-hits) ; count of file cache hits (setter file-cache-bytes) ; count of file cache bytes saved ; Event log event-log ; a queue (see data-structures unit) of event records ) (define (archive-log! archive type path message) (queue-add! (archive-event-log archive) (make-event* type path message)) (void)) (define-syntax-rule (with-backend-logging archive body ...) (parameterize ((backend-log! (lambda (type message) (archive-log! archive type #f message) (void)))) body ...)) (define file-cache-commit-interval 1000) (define (file-cache-put! archive file-path mtime size key) (when (> file-cache-commit-interval (archive-file-cache-updates-uncommitted archive)) ((with-backend-logging archive (storage-flush! (archive-storage archive)))) ; Flush the storage before we commit our cache, for crash safety (exec (sql (archive-file-cache archive) "commit;")) (exec (sql (archive-file-cache archive) "begin;")) (set! (archive-file-cache-updates-uncommitted archive) 0)) (exec (archive-file-cache-set-query archive) file-path mtime size key) (inc! (archive-file-cache-updates-uncommitted archive))) (define (file-cache-get archive file-path mtime size) (let ((data (query fetch (archive-file-cache-get-query archive) file-path mtime size))) (if (pair? data) (car data) #f))) (define (prepend-type-byte b v) (let* ((v-len (u8vector-length v)) (v2 (make-u8vector (+ 1 v-len)))) (set! (u8vector-ref v2 0) b) (move-memory! 
v v2 v-len 0 1) v2)) (define (choose-compression-function config) (match config (#f (lambda (block) (prepend-type-byte 0 block))) ; No compression (('deflate) (lambda (block) (prepend-type-byte 1 (blob->u8vector/shared (string->blob (z3:encode-buffer (blob->string (u8vector->blob/shared block)))))))) ; deflate compression (('lzma) (lambda (block) (prepend-type-byte 2 (blob->u8vector/shared (lzma:compress (u8vector->blob/shared block)))))) (else (signal (make-property-condition 'exn 'location 'open-archive 'message "Unknown compression type" 'arguments (list config)))))) (define (decompress block) (case (u8vector-ref block 0) ((0) (subu8vector block 1 (u8vector-length block))) ; No compression ((1) (blob->u8vector/shared (string->blob (z3:decode-buffer (blob->string (u8vector->blob/shared (subu8vector block 1 (u8vector-length block)))))))) ; deflate ((2) (blob->u8vector/shared (lzma:decompress (u8vector->blob/shared (subu8vector block 1 (u8vector-length block)))))))) ; lzma #| function hmac (key, message) if (length(key) > blocksize) then key = hash(key) // keys longer than blocksize are shortened end if if (length(key) < blocksize) then key = key ∥ [0x00 * (blocksize - length(key))] // keys shorter than blocksize are zero-padded ('∥' is concatenation) end if o_key_pad = [0x5c * blocksize] ⊕ key // Where blocksize is that of the underlying hash function i_key_pad = [0x36 * blocksize] ⊕ key // Where ⊕ is exclusive or (XOR) return hash(o_key_pad ∥ hash(i_key_pad ∥ message)) // Where '∥' is concatenation end function |# (define (choose-hash-function config) (let ((make-basic-hash (lambda (prim) (lambda (block type) (message-digest-string (prim) (string-append (message-digest-u8vector (prim) block) (symbol->string type)))))) (make-keyed-hash (lambda (prim key) (lambda (block type) (message-digest-string (prim) (string-append key (message-digest-u8vector (prim) block) (symbol->string type))))))) (match config ((or #f ('tiger)) (make-basic-hash tiger192-primitive)) (('tiger key) (make-keyed-hash tiger192-primitive key)) (('sha256) (make-basic-hash sha256-primitive)) (('sha256 key) (make-keyed-hash sha256-primitive key)) (('sha384) (make-basic-hash sha384-primitive)) (('sha384 key) (make-keyed-hash sha384-primitive key)) (('sha512) (make-basic-hash sha512-primitive)) (('sha512 key) (make-keyed-hash sha512-primitive key)) (else (signal (make-property-condition 'exn 'location 'open-archive 'message "Unknown hash algorithm" 'arguments (list config))))))) (define (read-password prompt) (display prompt) (with-stty '(not echo) read-line)) ; Key specs are "hexhexhex" or (number-of-bytes "passphrase") (define (key->blob keyspec) (cond ((string? keyspec) (hexstring->blob keyspec)) ((pair? keyspec) (let* ((get-passphrase (lambda (maybe-passphrase) (if (eq? 
maybe-passphrase 'prompt) (read-password "Passphrase: ") maybe-passphrase))) (length (car keyspec)) (passphrase (get-passphrase (cadr keyspec))) (key (message-digest-string (sha512-primitive) passphrase 'string))) (if (> length 64) ; 512 bits = 64 bytes (signal (make-property-condition 'exn 'location 'open-archive 'message "Cannot generate a key that large due to a shortage of a big enough hash function (max 64)" 'arguments (list keyspec))) (string->blob (substring/shared key 0 length))))))) (define (choose-crypto-functions config) (match config (#f (values (lambda (block) block) (lambda (block) block))) ; No encryption (('aes keyspec) (let ((key (key->blob keyspec)) (iv (make-blob 16)) ; IV is pseudo-randomly generated based on the blocks we are fed as an entropy source (stir-iv! (lambda (iv block) (move-memory! (string->blob (message-digest-string (tiger192-primitive) (string-append (message-digest-string (tiger192-primitive) block 'string) (blob->string iv)) 'blob)) iv 16)))) ; Generate initial IV from the key and current time (move-memory! (string->blob (message-digest-string (tiger192-primitive) (string-append (blob->string key) (number->string (current-seconds))) 'blob)) iv 16) (let-values (((encryptor decryptor) (case (blob-size key) ((16) (values (make-aes128-encryptor key) (make-aes128-decryptor key))) ((24) (values (make-aes192-encryptor key) (make-aes192-decryptor key))) ((32) (values (make-aes256-encryptor key) (make-aes256-decryptor key))) (else (signal (make-property-condition 'exn 'location 'open-archive 'message "AES keys must be 16, 24, or 32 bytes long" 'arguments (list keyspec))))))) (let ((cbc-encryptor (make-cbc*-encryptor encryptor 16)) (cbc-decryptor (make-cbc*-decryptor decryptor 16))) (values (lambda (block) (stir-iv! iv block) (blob->u8vector/shared (cbc-encryptor (u8vector->blob/shared block) iv))) (lambda (block) (blob->u8vector/shared (cbc-decryptor (u8vector->blob/shared block))))))))) (else (signal (make-property-condition 'exn 'location 'open-archive 'message "Unknown encryption type" 'arguments (list config)))))) ; A config is an sexpr of the form: ; (( )|...) ; Valid keys: ; storage (expression to create a storage backend) ; compression algorithm name ; encryption (algorithm-name "key") ; Valid flags: ; double-check - check correctness lots, even if it costs efficiency (define (open-archive config store-atime? store-ctime?) (let ((*storage* #f) (*compression* #f) (*crypto* #f) (*hash* #f) (*double-check?* #f) (*file-cache* #f) (*global-rules* '()) (setup-log (make-queue))) (for-each (lambda (confentry) (match confentry ('double-check (set! *double-check?* #t)) (('storage command-line) (set! *storage* (parameterize ((backend-log! (lambda (type message) (queue-add! setup-log (make-event* type #f message)) (void)))) (import-storage command-line)))) (('hash . conf) (set! *hash* conf)) (('compression . conf) (set! *compression* conf)) (('encryption . conf) (set! *crypto* conf)) (('file-cache path) (set! *file-cache* (open-database path)) (change-file-mode path (bitwise-ior perm/irusr perm/iwusr)) (set-busy-handler! *file-cache* (busy-timeout 100000)) (when (null? (schema *file-cache*)) (exec (sql *file-cache* "CREATE TABLE files (path TEXT PRIMARY KEY, mtime INTEGER, size INTEGER, key TEXT);"))) (exec (sql *file-cache* "begin;"))) (('rule . conf) (set! 
*global-rules* (cons conf *global-rules*))) (_ (signal (make-property-condition 'exn 'location 'open-archive 'message "Unknown configuration entry" 'arguments (list confentry)))))) config) (if (not *storage*) (signal (make-property-condition 'exn 'location 'open-archive 'message "No archive storage was specified in the configuration!" 'arguments (list config)))) (let-values (((compress) (choose-compression-function *compression*)) ((hash) (choose-hash-function *hash*)) ((encrypt decrypt) (choose-crypto-functions *crypto*))) (make-archive *storage* *double-check?* store-atime? store-ctime? hash compress decompress encrypt decrypt *global-rules* ; Snapshot counters 0 0 0 0 ; File cache *file-cache* (if *file-cache* (sql *file-cache* "SELECT key FROM files WHERE path = ? AND mtime = ? AND size = ?") #f) (if *file-cache* (sql *file-cache* "INSERT OR REPLACE INTO files (path,mtime,size,key) VALUES (?,?,?,?)") #f) 0 0 0 ; event log setup-log)))) ; Take a block, and return a compressed and encrypted block (define (wrap-block archive block) ((archive-encrypt archive) ((archive-compress archive) block))) ;; Take a compressed and encrypted block, and recover the original data (define (unwrap-block archive block) ((archive-decompress archive) ((archive-decrypt archive) block))) (define (archive-max-block-size archive) (storage-max-block-size (archive-storage archive))) (define (archive-writable? archive) (storage-writable? (archive-storage archive))) (define (archive-unlinkable? archive) (storage-unlinkable? (archive-storage archive))) (define (check-archive-writable archive) (if (not (archive-writable? archive)) (signal (make-property-condition 'exn 'location 'check-archive-writable 'message "This isn't a writable archive")))) (define (check-archive-unlinkable archive) (if (not (archive-writable? archive)) (signal (make-property-condition 'exn 'location 'check-archive-unlinkable 'message "This isn't an unlinkable archive - it's append-only")))) (define (archive-log-reuse! archive data) (inc! (archive-snapshot-blocks-skipped archive)) (inc! (archive-snapshot-bytes-skipped archive) (u8vector-length data))) (define (epochtime->string e) (let ((localtime (seconds->local-time e))) (string-append (string-pad (number->string (+ 1900 (vector-ref localtime 5))) 4 #\0) "-" (string-pad (number->string (+ 1 (vector-ref localtime 4))) 2 #\0) "-" (string-pad (number->string (vector-ref localtime 3)) 2 #\0) " " (string-pad (number->string (vector-ref localtime 2)) 2 #\0) ":" (string-pad (number->string (vector-ref localtime 1)) 2 #\0) ":" (string-pad (number->string (vector-ref localtime 0)) 2 #\0)))) (define (archive-put! archive key data type) (unless (archive-writable? archive) (error 'archive-put! "This isn't a writable archive")) (with-backend-logging archive ((storage-put! (archive-storage archive)) key (wrap-block archive data) type)) (inc! (archive-snapshot-blocks-stored archive)) (inc! (archive-snapshot-bytes-stored archive) (u8vector-length data)) (void)) (define (archive-flush! archive) (with-backend-logging archive ((storage-flush! (archive-storage archive)))) ; Flush the storage first, to ensure crash safety (when (archive-file-cache archive) (exec (sql (archive-file-cache archive) "commit;")) (exec (sql (archive-file-cache archive) "begin;")) (set! (archive-file-cache-updates-uncommitted archive) 0))) (define (archive-exists? archive key) (with-backend-logging archive ((storage-exists? 
(archive-storage archive)) key))) (define (archive-get archive key type) (let* ((raw-data (with-backend-logging archive ((storage-get (archive-storage archive)) key))) (data (if raw-data (unwrap-block archive raw-data) (error 'archive-get (sprintf "Nonexistant block ~A ~A" key type))))) (unless (string=? key ((archive-hash archive) data type)) (error 'archive-get (sprintf "Consistency check failure: asked for ~A, got ~A" key ((archive-hash archive) data type)))) data)) (define (archive-link! archive key) (unless (archive-writable? archive) (error 'archive-link! "This isn't a writable archive")) (with-backend-logging archive ((storage-link! (archive-storage archive)) key))) (define (archive-unlink! archive key) (unless (archive-writable? archive) (error 'archive-unlink! "This isn't a writable archive")) (let ((result (with-backend-logging archive ((storage-unlink! (archive-storage archive)) key)))) (if result (unwrap-block archive result) #f))) (define (archive-admin! archive command) (with-backend-logging archive ((storage-admin! (archive-storage archive)) command))) (define (archive-set-tag! archive tag key) (unless (archive-writable? archive) (error 'archive-set-tag! "This isn't a writable archive")) (with-backend-logging archive ((storage-set-tag! (archive-storage archive)) tag key))) (define (archive-tag archive tag) (with-backend-logging archive ((storage-tag (archive-storage archive)) tag))) (define (archive-all-tags archive) (with-backend-logging archive ((storage-all-tags (archive-storage archive))))) (define (archive-remove-tag! archive tag) (unless (archive-writable? archive) (error 'archive-remove-tag! "This isn't a writable archive")) (with-backend-logging archive ((storage-remove-tag! (archive-storage archive)) tag))) (define (archive-lock-tag! archive tag) (unless (archive-writable? archive) (error 'archive-lock-tag! "This isn't a writable archive")) (let loop ((tries-left 10)) (if (zero? tries-left) (signal (make-property-condition 'exn 'location 'archive-lock-tag! 'message (sprintf "We timed out attempting to lock the tag '~A'" tag))) (let ((result (with-backend-logging archive ((storage-lock-tag! (archive-storage archive)) tag)))) (if result result ; Lock got! (begin (thread-sleep! 1) (loop (- tries-left 1)))))))) (define (archive-tag-locked? archive tag) (unless (archive-writable? archive) (error 'archive-tag-locked? "This isn't a writable archive")) (with-backend-logging archive ((storage-tag-locked? (archive-storage archive)) tag))) (define (archive-unlock-tag! archive tag) (unless (archive-writable? archive) (error 'archive-unlock-tag! "This isn't a writable archive")) (with-backend-logging archive ((storage-unlock-tag! (archive-storage archive)) tag))) (define (archive-close! archive) (with-backend-logging archive ((storage-close! (archive-storage archive)))) ;; This flushes the backend before we flush the file cache, for crash safety (when (archive-file-cache archive) (exec (sql (archive-file-cache archive) "commit;")) (close-database (archive-file-cache archive))) (void)) ;; ;; CORE ALGORITHMS ;; ;; Philosophy: insertion routines ;; Insertion routines insert an object into the archive, correctly ;; managing reference counts. In order to do this, they all return ;; two values: the key the object went in under, and a boolean flag ;; that is true if the object was already in the archive. This is so ;; that a parent object that calls that function can construct its ;; data block from the supplied child keys, then do an exists? 
check ;; to see if it already exists in the archive itself, if all of its ;; children were already in the archive. If it was, then it in turn ;; can just return the key and #t But if not, then it can link! every ;; child that WAS already in the archive, and then put! its own value ;; into the archive and return that with #f Thus, the reference counts ;; are maintained correctly. (define (reusing hash) ; (printf "REUSING: ~A\n" hash) hash) (define (virgin hash) ; (printf "CREATED: ~A\n" hash) hash) ;; BLOCKS OF RAW DATA THAT CANNOT CONTAIN CHILD KEYS ;; We never have any child keys to link!, so the not-reused case is simple. (define (archive-store-block! archive data type) (check-archive-writable archive) (let ((hash ((archive-hash archive) data type))) (if (archive-exists? archive hash) (begin (archive-log-reuse! archive data) (values (reusing hash) #t)) (begin (archive-put! archive hash data type) (values (virgin hash) #f))))) ;; GENERIC STREAMS OF KEYS ;; Both file and directory storage involve storing an arbitrary list of keys, in order ;; to string together a load of data blocks into one. ;; If they all fit into one block, then so be it. Otherwise, we have to split them ;; into blocks then create a higher-level stream of keys to store the keys of those blocks... (define-record key-stream-writer write! ;; Write a single string key to the stream. Accepts the key, and the already-existed boolean for proper reference counting. finish!) ;; Terminate the stream. Returns two values: key of the stream, and an already-existed boolean. (define (copy-string-into-place! u8v offset string string-offs string-len) (move-memory! string u8v (- string-len string-offs) string-offs offset) (void)) (define (serialise-strings! u8v offset strings) (if (null? strings) (void) (begin (let* ((string (blob->u8vector/shared (string->blob (string-append (car strings) "\n")))) (string-len (u8vector-length string))) (copy-string-into-place! u8v (- offset string-len) string 0 string-len) (serialise-strings! u8v (- offset string-len) (cdr strings)))))) (define (make-key-stream-writer* archive type) (check-archive-writable archive) (let* ((*key-buffer* '()) (*key-buffer-bytes* 0) (*key-buffer-reused?* #t) (*parent-stream* #f) (next-write-will-overflow? (lambda (key) (assert (< (string-length key) (archive-max-block-size archive))) (> (+ *key-buffer-bytes* (string-length key) 1) (archive-max-block-size archive)))) (flush! (lambda () (let ((keys-serialised (make-u8vector *key-buffer-bytes*))) (serialise-strings! keys-serialised *key-buffer-bytes* (map car *key-buffer*)) (let ((hash ((archive-hash archive) keys-serialised type))) (if (and *key-buffer-reused?* (archive-exists? archive hash)) (begin (set! *key-buffer* '()) (set! *key-buffer-bytes* 0) (set! *key-buffer-reused?* #t) (archive-log-reuse! archive keys-serialised) (values (reusing hash) #t)) ; We, too, are reused (begin ; We are unique and new and precious! (for-each (lambda (x) ; link! all reused children (let ((key (car x)) (reused? (cdr x))) (if reused? (archive-link! archive key)))) *key-buffer*) (archive-put! archive hash keys-serialised type) (set! *key-buffer* '()) (set! *key-buffer-bytes* 0) (set! *key-buffer-reused?* #t) (values (virgin hash) #f))))))) (write! (lambda (key reused?) (if (next-write-will-overflow? key) (let-values (((flush-key flush-reused?) (flush!))) (if (not *parent-stream*) (set! *parent-stream* (make-key-stream-writer* archive type))) ((key-stream-writer-write! 
*parent-stream*) flush-key flush-reused?))) ;; What happens if the same key comes up twice, eh? (set! *key-buffer* (cons (cons key reused?) *key-buffer*)) (set! *key-buffer-reused?* (and *key-buffer-reused?* reused?)) (set! *key-buffer-bytes* (+ *key-buffer-bytes* (string-length key) 1)) (void))) (finish! (lambda () (cond (*parent-stream* (begin (if (not (null? *key-buffer*)) (let-values (((flush-key flush-reused?) (flush!))) ((key-stream-writer-write! *parent-stream*) flush-key flush-reused?))) ((key-stream-writer-finish! *parent-stream*)))) ((null? *key-buffer*) ; Empty stream (archive-store-block! archive (make-u8vector 0) type)) ((null? (cdr *key-buffer*)) ; Single-element stream (values (caar *key-buffer*) (cdar *key-buffer*))) ; Just return the one element! (else ; More than one key, but not enough to have flushed before (flush!)))))) (make-key-stream-writer write! finish!))) (define (deserialise-key-stream block) ; Convert a key stream block to a list of key strings (string-split (blob->string (u8vector->blob/shared block)) "\n")) ;; kons is called on (key type accumulator) for every key in the stream, in order (define (fold-key-stream archive key ks-type kons knil) (let ((type (archive-exists? archive key))) (if (eq? ks-type type) ; Recurse (begin (let ((subkeys (deserialise-key-stream (archive-get archive key type)))) (fold (lambda (subkey acc) (fold-key-stream archive subkey ks-type kons acc)) knil subkeys))) ; Leaf node (kons key type knil)))) ; (child-unlink! archive key type) is called on every child key of a deleted block (define (unlink-key-stream! archive key type child-unlink!) (check-archive-unlinkable archive) (let ((result (archive-unlink! archive key))) (if result ; result is now list of keys, \n separated, to recursively unlink (for-each (lambda (subkey) (let ((child-type (archive-exists? archive subkey))) (if child-type ; The child may not actually exist any more, in which case, job done! (if (eq? child-type type) (unlink-key-stream! archive subkey type child-unlink!) (child-unlink! archive subkey child-type))))) (deserialise-key-stream result))))) ;; FILE STORAGE ;; Files are stored as either: ;; 1) A direct block of type "f" containing the file data ;; 2) An indirect block of type "fi" that's a keystream of keys of direct or indirect blocks ;; Uses standard input port for the file data ;; Returns key and reused? (define (store-file! archive file-path file-stat) (let* ((store-file-without-caching! (lambda () ;; Actually upload the file ;; FIXME: memory-map the file in 1MB chunks, and copy them into u8vectors? (letrec ((blocksize (archive-max-block-size archive)) (*buffer* (make-u8vector blocksize)) (ksw (make-key-stream-writer* archive 'fi)) (upload-file (lambda () (let ((bytes-read (read-u8vector! blocksize *buffer*))) (if (not (zero? bytes-read)) (let-values (((data-key data-reused?) (archive-store-block! archive (subu8vector *buffer* 0 bytes-read) 'f))) ((key-stream-writer-write! ksw) data-key data-reused?) (upload-file)) ((key-stream-writer-finish! ksw))))))) (upload-file)))) (store-file-and-cache! (lambda (mtime size) (let-values (((key reused?) (store-file-without-caching!))) (file-cache-put! 
archive file-path mtime size key) (values key reused?))))) (check-archive-writable archive) ;; Firstly, if we have an mtime cache, use it to see if the file is already in the archive ;; The cache is keyed on file paths, and the contents are ;; sexprs of the form (mtime hash) (if (archive-file-cache archive) (let* ((mtime (vector-ref file-stat 8)) ; Should have used and-let* (size (vector-ref file-stat 5)) (cache-result (file-cache-get archive file-path mtime size))) (if (and cache-result (archive-exists? archive cache-result)) (begin (inc! (archive-file-cache-hits archive)) (inc! (archive-file-cache-bytes archive) size) (values cache-result #t)) ; Found in cache! Woot! (store-file-and-cache! mtime size))) ; not in cache (store-file-without-caching!)))) ; no mtime cache ;; Call kons on each u8vector block of the file in turn ;; with an accumulator that starts as knil as a second argument (define (fold-file archive key kons knil) (fold-key-stream archive key 'fi (lambda (key type acc) (kons (archive-get archive key type) acc)) knil)) ;; Write the contents of the file to the standard output port (define (write-file-contents archive key) (fold-file archive key (lambda (block acc) (begin (write-u8vector block) #f)) #f)) (define (unlink-file! archive key) (check-archive-unlinkable archive) (unlink-key-stream! archive key 'fi (lambda (archive key type) (archive-unlink! archive key)))) ;; GENERIC STREAMS OF S-EXPRESSIONS ;; These are to be used to implement directories ;; But might be useful for other complex structures in future (define-record sexpr-stream-writer write! ;; Write an sexpr to the stream. Second argument is a list of pairs, one per key mentioned in the sexpr, car is the key and cdr is the reused? flag. finish!) ;; Return the key and reused? flag for the whole thing ;; FIXME: Examine this and make-key-stream-writer* ;; and try and merge them to use a common string-stream-writer abstraction ;; if it's worth it. They share a lot, yet also differ a lot. (define (make-sexpr-stream-writer* archive type ks-type) (check-archive-writable archive) (let* ((*sexpr-buffer* '()) ; List of strings (*sexpr-buffer-bytes* 0) ; Bytes used so far (*key-buffer* '()) ; List of key-reused? pairs (*key-buffer-reused?* #t) ; All reused in the buffer so far? (*parent-stream* #f) ; Key stream (flush! (lambda () (let ((serialised-buffer (make-u8vector *sexpr-buffer-bytes*))) (begin (serialise-strings! serialised-buffer *sexpr-buffer-bytes* *sexpr-buffer*) (let ((hash ((archive-hash archive) serialised-buffer type))) (begin (if (archive-check-correctness? archive) (if *key-buffer-reused?* (assert (every cdr *key-buffer*) "Key buffer thinks it's all reused, but it isn't:" *key-buffer*) ; else (assert (not (every cdr *key-buffer*)) "Key buffer thinks it's not all reused, but it is:" *key-buffer*))) (if (and *key-buffer-reused?* (archive-exists? archive hash)) (begin (set! *sexpr-buffer* '()) (set! *sexpr-buffer-bytes* 0) (set! *key-buffer* '()) (set! *key-buffer-reused?* #t) (archive-log-reuse! archive serialised-buffer) (values (reusing hash) #t)) ; We, too, are reused (begin ; We are unique and new and precious! (for-each (lambda (x) ; link! all reused children (let ((key (car x)) (reused? (cdr x))) (if reused? (archive-link! archive key)))) *key-buffer*) (archive-put! archive hash serialised-buffer type) (set! *sexpr-buffer* '()) (set! *sexpr-buffer-bytes* 0) (set! *key-buffer* '()) (set! *key-buffer-reused?* #t) (values (virgin hash) #f))))))))) (write! 
(lambda (sexpr keys) (let* ((sexpr-string (with-output-to-string (lambda () (write sexpr)))) (sexpr-len (string-length sexpr-string))) (assert (< sexpr-len (archive-max-block-size archive))) (if (> (+ *sexpr-buffer-bytes* sexpr-len 1) (archive-max-block-size archive)) (let-values (((flush-key flush-reused?) (flush!))) (if (not *parent-stream*) (set! *parent-stream* (make-key-stream-writer* archive ks-type))) ((key-stream-writer-write! *parent-stream*) flush-key flush-reused?))) (set! *sexpr-buffer* (cons sexpr-string *sexpr-buffer*)) (set! *key-buffer* (append keys *key-buffer*)) (set! *key-buffer-reused?* (and *key-buffer-reused?* (every cdr keys))) (set! *sexpr-buffer-bytes* (+ *sexpr-buffer-bytes* sexpr-len 1)) (void)))) (finish! (lambda () (cond (*parent-stream* (begin (if (not (null? *sexpr-buffer*)) (let-values (((flush-key flush-reused?) (flush!))) ((key-stream-writer-write! *parent-stream*) flush-key flush-reused?))) ((key-stream-writer-finish! *parent-stream*)))) ((null? *sexpr-buffer*) ; Empty stream (archive-store-block! archive (make-u8vector 0) type)) (else ; Some sexprs, but not enough to have flushed before (flush!)))))) (make-sexpr-stream-writer write! finish!))) (define (deserialise-sexpr-stream block) ; Convert a sexpr stream block to a list of sexprs (map (lambda (string) (with-input-from-string string read)) (string-split (blob->string (u8vector->blob/shared block)) "\n"))) (define (fold-sexpr-stream archive key leaf-type ks-type kons knil) (fold-key-stream archive key ks-type (lambda (key found-leaf-type acc) (assert (eq? found-leaf-type leaf-type)) (let ((sexprs (deserialise-sexpr-stream (archive-get archive key found-leaf-type)))) (fold kons acc sexprs))) knil)) (define (unlink-sexpr-stream-block! archive key sexpr-unlink!) (let ((result (archive-unlink! archive key))) (if result (for-each sexpr-unlink! (deserialise-sexpr-stream result))))) (define (unlink-sexpr-stream! archive key leaf-type ks-type sexpr-unlink!) (check-archive-unlinkable archive) (let ((type (archive-exists? archive key))) (cond ((eq? type ks-type) (unlink-key-stream! archive key ks-type (lambda (archive leaf-key found-leaf-type) (assert (eq? found-leaf-type leaf-type)) (unlink-sexpr-stream-block! archive leaf-key sexpr-unlink!)))) ((eq? type leaf-type) (unlink-sexpr-stream-block! archive key sexpr-unlink!)) (else (assert (or (eq? type leaf-type) (eq? type ks-type)) "unlink-sexpr-stream!: Invalid block type" (list 'expected leaf-type ks-type) type))))) ;; DIRECTORY STORAGE ;; Directories are stored as either; ;; 1) A direct block of type "d" containing a list of file/directory entries, each of which is an s-expr ;; The car of the s-expr is the file name ;; The cadr is a type symbol - file, dir, symlink, chardev, blockdev, fifo, socket ;; The cddr is an alist of other properties ;; Regular files have a 'content entry containing a key, for example. ;; Also look out for 'mode 'uid 'gid 'atime 'mtime 'ctime ;; Symlinks have 'target ;; Directories have 'content, too ;; Files with streams or forks or whatnot can have more than one content key, of course... ;; 2) An indirect block of type "di" that's a keystream of keys to direct or indirect blocks ;; Look for a .ugarit file in the given directory ;; If one is found, return its contents (define (read-local-rules archive path) (let ((conf-file (make-pathname path ".ugarit"))) (if (file-exists? conf-file) (with-input-from-file conf-file read-file) '()))) ;; Do the rules list say to ignore the file? 
;; Statements towards the head of the list take priority ;; And we want to accept the most recent 'ignore' or 'include', ;; defaulting to 'include' if neither is found (define (rules-say-ignore rules) (match rules ('() #f) ((('exclude) . _) #t) ((('include) . _) #f) ((_ . more) (rules-say-ignore more)))) ;; Store a directory ;; Returns the usual key and reused? values (define (store-directory! archive path) (call-with-context (read-local-rules archive path) path (lambda () (check-archive-writable archive) (let ((ssw (make-sexpr-stream-writer* archive 'd 'di)) (rules-checker (make-filesystem-object-pattern-checker path))) (for-each (lambda (filename) (handle-exceptions exn (archive-log! archive 'error (make-pathname path filename) (sprintf "Unable to store into the archive (~a)" ((condition-property-accessor 'exn 'message "Unknown error") exn))) (let* ((file-path (make-pathname path filename)) (stats (file-stat file-path #t)) (mode (bitwise-and (vector-ref stats 1) (bitwise-not stat/ifmt))) (uid (vector-ref stats 3)) (gid (vector-ref stats 4)) (atime (vector-ref stats 6)) (ctime (vector-ref stats 7)) (mtime (vector-ref stats 8)) (type (bitwise-and (vector-ref stats 1) stat/ifmt)) (standard-file-attributes (list (cons 'mode mode) (cons 'uid uid) (cons 'gid gid) (cons 'mtime mtime))) (file-rules (object-matches filename rules-checker))) (if (archive-store-ctime? archive) (set! standard-file-attributes (cons (cons 'ctime ctime) standard-file-attributes))) (if (archive-store-atime? archive) (set! standard-file-attributes (cons (cons 'atime atime) standard-file-attributes))) (if (not (rules-say-ignore file-rules)) (cond ((eq? type stat/ifsock) (archive-log! archive 'warning file-path "Ignoring a socket")) ((eq? type stat/ifreg) (let-values (((content-key content-reused?) (with-input-from-file file-path (lambda () (store-file! archive file-path stats))))) ((sexpr-stream-writer-write! ssw) (append (list filename 'file (cons 'contents content-key) (cons 'size (vector-ref stats 5))) standard-file-attributes) (list (cons content-key content-reused?))))) ((eq? type stat/ifdir) (let-values (((content-key content-reused?) (store-directory! archive file-path))) ((sexpr-stream-writer-write! ssw) (append (list filename 'dir (cons 'contents content-key)) standard-file-attributes) (list (cons content-key content-reused?))))) ((eq? type stat/iflnk) ((sexpr-stream-writer-write! ssw) (append (list filename 'symlink (cons 'target (read-symbolic-link file-path))) standard-file-attributes) '())) ((eq? type stat/ifblk) (let ((devnum (vector-ref stats 10))) ((sexpr-stream-writer-write! ssw) (append (list filename 'block-device (cons 'number devnum)) standard-file-attributes) '()))) ((eq? type stat/ifchr) (let ((devnum (vector-ref stats 10))) ((sexpr-stream-writer-write! ssw) (append (list filename 'character-device (cons 'number devnum)) standard-file-attributes) '()))) ((eq? type stat/ififo) ((sexpr-stream-writer-write! ssw) (append (list filename 'fifo) standard-file-attributes) '())) (else ; WTF? (archive-log! archive 'error file-path "Unable to store object of unknown type"))))))) (sort! (directory path #t) string>?)) ((sexpr-stream-writer-finish! ssw)))))) (define (unlink-directory! archive key) (check-archive-unlinkable archive) (unlink-sexpr-stream! archive key 'd 'di (lambda (dirent) (let ((type (cadr dirent)) (name (car dirent)) (props (cddr dirent))) (cond ((eq? type 'file) (unlink-file! archive (cdr (assq 'contents props)))) ((eq? type 'dir) (unlink-directory! 
archive (cdr (assq 'contents props))))))))) (define (set-standard-file-metadata! archive path props) (let ((mode (assq 'mode props)) (uid (assq 'uid props)) (gid (assq 'gid props)) (mtime (assq 'mtime props)) (atime (assq 'atime props))) (if mode (change-file-mode path (cdr mode))) (if (or uid gid) (handle-exceptions exn (archive-log! archive 'warning path "Unable to set the uid/gid") (change-file-owner path (if uid (cdr uid) (current-user-id)) (if gid (cdr gid) (current-group-id))))) (if (or mtime atime) (change-file-times path (if atime (cdr atime) (current-seconds)) (if mtime (cdr mtime) (current-seconds)))) (void))) (define (extract-file! archive props path) (let ((contents-key (cdr (assq 'contents props)))) (with-output-to-file path (lambda () (write-file-contents archive contents-key))) (set-standard-file-metadata! archive path props))) (define (extract-subdirectory! archive props path) (if (not (directory? path)) (create-directory path)) (let ((contents-key (cdr (assq 'contents props)))) (extract-directory! archive contents-key path) (set-standard-file-metadata! archive path props))) (define (extract-symlink! archive props path) (let ((target (cdr (assq 'target props))) (mode (assq 'mode props)) (uid (assq 'uid props)) (gid (assq 'gid props)) (mtime (assq 'mtime props)) (atime (assq 'atime props))) (create-symbolic-link target path) ;; Alas, there is no portable way to set the atime/mtime on a link. ;; I think, somehow, we will manage to live our lives without the atime and mtime on links... (if mode (change-link-mode path (cdr mode))) (if (or uid gid) (handle-exceptions exn (archive-log! archive 'warning path "Unable to set the uid/gid") (change-link-owner path (if uid (cdr uid) (current-user-id)) (if gid (cdr gid) (current-group-id))))))) (define (extract-fifo! archive props path) (create-fifo path) (set-standard-file-metadata! archive path props)) (define (extract-block-device! archive props path) (let ((number (cdr (assq 'number props)))) (handle-exceptions exn (archive-log! archive 'warning path "Unable to recreate block device") (create-special-file path stat/ifblk number) (set-standard-file-metadata! archive path props)))) (define (extract-character-device! archive props path) (let ((number (cdr (assq 'number props)))) (handle-exceptions exn (archive-log! archive 'warning path "Unable to recreate character device") (create-special-file path stat/ifchr number) (set-standard-file-metadata! archive path props)))) (define (extract-object! archive dirent target-path) (let ((type (cadr dirent)) (name (car dirent)) (props (cddr dirent))) (cond ((eq? type 'file) (extract-file! archive props (make-pathname target-path name))) ((eq? type 'dir) (extract-subdirectory! archive props (make-pathname target-path name))) ((eq? type 'symlink) (extract-symlink! archive props (make-pathname target-path name))) ((eq? type 'fifo) (extract-fifo! archive props (make-pathname target-path name))) ((eq? type 'block-device) (extract-block-device! archive props (make-pathname target-path name))) ((eq? type 'character-device) (extract-character-device! archive props (make-pathname target-path name))) (else (archive-log! archive 'error (make-pathname target-path name) (sprintf "Unable to extract an object of unknown type ~A" type)))))) (define (extract-directory! archive key target-path) (fold-sexpr-stream archive key 'd 'di (lambda (dirent acc) (handle-exceptions exn (archive-log! 
archive 'error (make-pathname target-path (car dirent)) (sprintf "Unable to extract from the archive (~a)" ((condition-property-accessor 'exn 'message "Unknown error") exn))) (extract-object! archive dirent target-path)) (void)) '())) ;; SINGLE SEXPRS ;; A sexpr in a block. Simple, really. ;; Given an sexpr, a type and a list of (key . reused?) pairs, returns a key and a reused? flag. (define (store-sexpr! archive sexpr type keys) (let* ((data (blob->u8vector/shared (string->blob (with-output-to-string (lambda () (write sexpr)))))) (hash ((archive-hash archive) data type))) (if (archive-exists? archive hash) (begin (archive-log-reuse! archive data) (values (reusing hash) #t)) (begin (for-each (lambda (key) (if (cdr key) ; reused? (archive-link! archive (car key)))) keys) (archive-put! archive hash data type) (values (virgin hash) #f))))) (define (read-sexpr archive key type) (let ((data (archive-get archive key type))) (with-input-from-string (blob->string (u8vector->blob/shared data)) (lambda () (read))))) ;; SNAPSHOT STORAGE ;; A snapshot is a single block containing an alist ;; Keys are 'ctime (in seconds since the epoch), ;; 'contents (hash of root directory), ;; 'hostname (name of host snapshotted) ;; 'prefix (prefix of filesystem on host) ;; 'notes (user-supplied notes) ;; 'previous (hash of previous snapshot) ;; 'stats (alist of stats: ;; 'blocks-stored ;; 'bytes-stored ;; 'blocks-skipped ;; 'bytes-skipped ;; 'file-cache-hits ;; 'file-cache-bytes ;; 'log (list of log events, each being a (type timestamp path message) list ;; Returns the snapshot's key. (define (tag-snapshot! archive tag contents-key contents-reused? snapshot-properties) (check-archive-writable archive) (archive-lock-tag! archive tag) ;; Lock BEFORE reading previous state of the tag, to avoid races. (let* ((previous (archive-tag archive tag)) (stats (list (cons 'blocks-stored (archive-snapshot-blocks-stored archive)) (cons 'bytes-stored (archive-snapshot-bytes-stored archive)) (cons 'blocks-skipped (archive-snapshot-blocks-skipped archive)) (cons 'bytes-skipped (archive-snapshot-bytes-skipped archive)) (cons 'file-cache-hits (archive-file-cache-hits archive)) (cons 'file-cache-bytes (archive-file-cache-bytes archive)))) (log (map (lambda (event) (list (event-type event) (event-time event) (event-path event) (event-message event))) (queue->list (archive-event-log archive)))) (snapshot (append (list (cons 'mtime (current-seconds)) (cons 'contents contents-key) (cons 'stats stats) (cons 'log log)) snapshot-properties)) (keys (list ; We do not list the previous snapshot - since we are about to overwrite the tag that points to it, which would be a decrement. (cons contents-key contents-reused?)))) (when previous (set! snapshot (cons (cons 'previous previous) snapshot))) (let-values (((snapshot-key snapshot-reused?) (store-sexpr! archive snapshot 'snapshot keys))) (archive-flush! archive) ; After this point we can be sure that the snapshot and all blocks it refers to are stably stored (archive-set-tag! archive tag snapshot-key) ; Therefore, we can be confident in saving it in a tag. (archive-unlock-tag! archive tag) (when snapshot-reused? ; Rare, but possible; fork a tag then snapshot the same FS state to both at the same second. (archive-link! 
archive snapshot-key))
      snapshot-key)))

(define (fold-history archive snapshot-key kons knil)
  (let ((snapshot (read-sexpr archive snapshot-key 'snapshot)))
    (if (assq 'previous snapshot)
        (kons snapshot-key snapshot
              (fold-history archive (cdr (assq 'previous snapshot)) kons knil))
        (kons snapshot-key snapshot knil))))

;; BRING IT ALL TOGETHER

(define (snapshot-directory-tree! archive tag path props)
  (check-archive-writable archive)
  (let-values (((root-key root-reused?)
                (call-with-context-support
                 (archive-global-directory-rules archive)
                 (lambda () (store-directory! archive path)))))
    (tag-snapshot! archive tag root-key root-reused?
                   (append
                    (list
                     (cons 'hostname (get-host-name))
                     (cons 'prefix path))
                    props))))

; If given '() as the directory-key, makes a list of all tags
; If given '(tag . "tag-name"), makes a list of snapshots of that tag
; If given a key that points to a directory, makes a list of the contents of that directory
; Either way, the list of results is folded into the provided kons and knil functions
; kons is called with three arguments: a directory-key for the object, a directory entry in the usual format, and the accumulator.
(define (fold-archive-node archive directory-key kons knil)
  (cond
   ((null? directory-key) ; List tags
    (fold
     (lambda (tag acc)
       (kons (cons 'tag tag)
             (list tag 'tag
                   (cons 'current (archive-tag archive tag))
                   (cons 'locked (archive-tag-locked? archive tag)))
             acc))
     knil
     (archive-all-tags archive)))
   ((and (pair? directory-key) (eq? (car directory-key) 'tag)) ; List a tag's snapshots
    (let* ((tag (cdr directory-key))
           (current (archive-tag archive tag))
           (current-contents (read-sexpr archive current 'snapshot)))
      (kons (cdr (assq 'contents current-contents))
            (cons "current" (cons 'snapshot current-contents))
            (fold-history archive current
                          (lambda (key snapshot acc)
                            (kons (cdr (assq 'contents snapshot))
                                  (append
                                   (list
                                    (epochtime->string (cdr (assq 'mtime snapshot)))
                                    'snapshot)
                                   snapshot)
                                  acc))
                          knil))))
   ((string? directory-key) ; List directory
    (fold-sexpr-stream archive directory-key 'd 'di
                       (lambda (dirent acc)
                         (let ((name (car dirent))
                               (type (cadr dirent))
                               (props (cddr dirent)))
                           (cond
                            ((eq? type 'file) (kons #f dirent acc))
                            ((eq? type 'dir) (kons (cdr (assq 'contents props)) dirent acc))
                            ((eq? type 'symlink) (kons #f dirent acc))
                            (else (kons #f dirent acc)))))
                       knil)))))
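
;; Worked example (not part of the module): a configuration sexpr of the shape
;; accepted by open-archive, plus the calls that snapshot a directory tree with
;; it. The backend command line, file paths, tag name and key material below
;; are hypothetical placeholders; only the shape of each entry follows the
;; match clauses in open-archive.
#|
(define example-config
  '((storage "backend-fs splitlog /backups/blocks /backups/meta") ; a ugarit-backend command line
    (hash sha256 "optional-keyed-hash-secret") ; or (hash tiger), (hash sha384 ...), (hash sha512 ...)
    (compression lzma)                         ; or (compression deflate); omit for no compression
    (encryption aes (32 prompt))               ; 32-byte key derived from a prompted passphrase,
                                               ; or (encryption aes "00112233445566778899aabbccddeeff")
    (file-cache "/var/cache/ugarit-file-cache.sqlite")
    double-check))                             ; flag: extra consistency checking

(define archive (open-archive example-config #f #f)) ; do not store atimes or ctimes
(snapshot-directory-tree! archive "home-backup" "/home/someuser"
                          (list (cons 'notes "example snapshot")))
(archive-close! archive)
|#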
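
;; Worked example (not part of the module): directory entries as written by
;; store-directory! into 'd blocks. The content keys, sizes, timestamps and
;; names are placeholders; the attribute order follows store-directory!
;; (optional atime/ctime entries appear before mode when enabled).
#|
("report.txt" file
 (contents . "0123456789abcdef...") ; key of the file's 'f / 'fi block
 (size . 12345)
 (mode . 420) (uid . 1000) (gid . 100) (mtime . 1300000000))

("photos" dir
 (contents . "fedcba9876543210...") ; key of the subdirectory's 'd / 'di block
 (mode . 493) (uid . 1000) (gid . 100) (mtime . 1300000000))

("latest" symlink
 (target . "photos/2011")
 (mode . 511) (uid . 1000) (gid . 100) (mtime . 1300000000))
|#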
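
;; Worked example (not part of the module): the snapshot alist built by
;; tag-snapshot!. Note that tag-snapshot! stores the timestamp under 'mtime,
;; although the comment above it lists 'ctime. All values shown here are
;; placeholders.
#|
((previous . "aaaa...")   ; only present when the tag already pointed at a snapshot
 (mtime . 1300000000)
 (contents . "bbbb...")   ; key of the root directory ('d / 'di block)
 (stats . ((blocks-stored . 42) (bytes-stored . 1048576)
           (blocks-skipped . 10) (bytes-skipped . 65536)
           (file-cache-hits . 100) (file-cache-bytes . 10485760)))
 (log . ((warning 1300000000 "/home/someuser/dead.sock" "Ignoring a socket")))
 (hostname . "examplehost")
 (prefix . "/home/someuser")
 (notes . "example snapshot"))
|#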
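
;; Worked example (not part of the module): how the key-stream writer is
;; driven, mirroring the upload loop in store-file!. store-chunks-example is a
;; hypothetical helper; chunks is assumed to be a list of u8vectors no larger
;; than the archive's maximum block size.
#|
(define (store-chunks-example archive chunks)
  (let ((ksw (make-key-stream-writer* archive 'fi)))
    (for-each
     (lambda (chunk)
       ;; Store the chunk as an 'f block, then record its key (and whether it
       ;; was already present, for reference counting) in the stream.
       (let-values (((key reused?) (archive-store-block! archive chunk 'f)))
         ((key-stream-writer-write! ksw) key reused?)))
     chunks)
    ;; Returns two values: the key of the whole stream, and a reused? flag.
    ((key-stream-writer-finish! ksw))))
|#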
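
;; Worked example (not part of the module): browsing an open archive with
;; fold-archive-node and fold-history. "some-tag" is a placeholder tag name,
;; and archive is assumed to be the result of open-archive.
#|
;; All tag names at the root of the archive:
(fold-archive-node archive '()
                   (lambda (key dirent acc) (cons (car dirent) acc))
                   '())

;; The snapshots under one tag ("current" plus one entry per historical snapshot):
(fold-archive-node archive (cons 'tag "some-tag")
                   (lambda (key dirent acc) (cons (car dirent) acc))
                   '())

;; Timestamps of every snapshot reachable from a tag, newest first:
(fold-history archive (archive-tag archive "some-tag")
              (lambda (key snapshot acc)
                (cons (epochtime->string (cdr (assq 'mtime snapshot))) acc))
              '())
|#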
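
;; Worked example (not part of the module): the HMAC construction from the
;; pseudocode comment above choose-hash-function, expressed as a sketch with
;; the same message-digest API used there. Note that choose-hash-function
;; itself uses a simpler keyed construction (the key is prepended to the inner
;; digest), not HMAC; this sketch is for reference only. The 64-byte block
;; size suits SHA-256, and hmac-string is a hypothetical name.
#|
(define (hmac-string prim key message #!optional (blocksize 64))
  (let* ((key (if (> (string-length key) blocksize)
                  (message-digest-string (prim) key 'string) ; keys longer than blocksize are shortened
                  key))
         (key (string-pad-right key blocksize (integer->char 0))) ; shorter keys are zero-padded
         (xor-key-with (lambda (byte)
                         (string-map
                          (lambda (c) (integer->char (bitwise-xor (char->integer c) byte)))
                          key)))
         (o-key-pad (xor-key-with #x5c))
         (i-key-pad (xor-key-with #x36)))
    (message-digest-string
     (prim)
     (string-append
      o-key-pad
      (message-digest-string (prim) (string-append i-key-pad message) 'string)))))

;; e.g. (hmac-string sha256-primitive "secret key" "message to authenticate")
|#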