(module ugarit-core
  (make-vault
   vault?
   vault-storage
   vault-hash
   vault-global-directory-rules
   vault-max-block-size
   vault-writable?
   vault-unlinkable?
   vault-exists?
   vault-get
   vault-put!
   vault-flush!
   vault-remove-tag!
   vault-set-tag!
   vault-tag
   vault-all-tags
   vault-lock-tag!
   vault-unlock-tag!
   vault-tag-locked?
   vault-link!
   vault-unlink!
   vault-admin!

   vault-cache
   vault-file-subsystem
   vault-snapshot-subsystem
   vault-archive-subsystem
   vault-cache-updated!
   vault-cache-flush!
   vault-format-version
   vault-conf-alist
   vault-format-version-set!
   vault-conf-get
   vault-conf-set!

   ensure-table
   check-vault-writable
   check-vault-unlinkable

   make-tag
   tag?
   tag-name
   tag-type
   tag-key

   make-event
   event?
   event-type
   event-time
   event-path
   event-message

   make-job
   job?
   current-job
   job-blocks-stored
   job-bytes-stored
   job-blocks-skipped
   job-bytes-skipped
   job-file-cache-hits
   job-file-cache-bytes
   job-event-log
   job-progress!
   job-log!
   job-store-atime?
   job-store-ctime?
   job-check-correctness?
   job-use-rules?
   job-stats-alist
   call-with-job-context
   with-backend-logging

   reusing
   virgin
   vault-log-reuse!
   vault-store-block!
   store-sexpr!
   read-sexpr
   epochtime->string
   serialise
   deserialise
   )

(import scheme)
(import chicken)
(use srfi-1)
(use srfi-4)
(use srfi-13)
(use srfi-18)
(use extras)
(use ports)
(use files)
(use lolevel)
(use data-structures)
(use miscmacros)
(use posix)
(use posix-extras)
(use matchable)
(use regex)
(use ugarit-backend)
(use sql-de-lite)
(use data-structures)

;;
;; LOG EVENTS
;;

(define-record-type event
  (make-event type time path message)
  event?
  (type event-type)        ; error/warning/note
  (time event-time)        ; timestamp (current-seconds)
  (path event-path)        ; where applicable, #f if not
  (message event-message)) ; a string

;; Build an event stamped with the current time.
(define (make-event* type path message)
  (let ((now (current-seconds)))
    (make-event type now path message)))

;; Events print as "TYPE: TIMESTAMP [PATH] MESSAGE"; the "[PATH]" part
;; is omitted when the event has no path.
(define-record-printer (event e out)
  (if (event-path e)
      (fprintf out "~A: ~A [~A] ~A"
               (event-type e)
               (epochtime->string (event-time e))
               (event-path e)
               (event-message e))
      (fprintf out "~A: ~A ~A"
               (event-type e)
               (epochtime->string (event-time e))
               (event-message e))))

;;
;; THE VAULT
;;

(define-record vault
  config     ; The configuration alist
  storage    ; The storage instance we use
  hash       ; the hash function, u8vector+type symbol->hex string
  compress   ; the compressor, u8vector->smaller u8vector
  decompress ; the decompressor, inverse of the above
  encrypt    ; the encryptor, u8vector -> u8vector
  decrypt    ; the decryptor, inverse of the above
  cache      ; sqlite db storing local cache
  (setter cache-updates-uncommitted) ; count of updates since last commit

  ;; State for subsystems
  file-subsystem
  snapshot-subsystem
  archive-subsystem

  global-directory-rules ; top-level directory rules

  format-version                 ; loaded from vault
  (setter conf-alist)            ; loaded from vault
  (setter conf-alist-changed?))  ; #t when conf-alist needs writing back (see vault-flush!)

;; The format string previously contained no directives, so the storage
;; and config arguments were never shown.
(define-record-printer (vault v out)
  (fprintf out "#<vault ~A ~A>"
           (vault-storage v)
           (vault-config v)))

;; The job record is the scope of a "job" performed on the vault,
;; possibly consisting of multiple different API operations.
;; It's placed in a parameter (current-job) and is a repository
;; for per-job configuration, event logging, and stats counters.
(define-record job
  (setter check-correctness?) ; boolean flag
  (setter store-atime?)       ; boolean flag
  (setter store-ctime?)       ; boolean flag
  (setter use-rules?)         ; boolean flag

  ;; Snapshot counters
  (setter blocks-stored)    ; Blocks written to storage
  (setter bytes-stored)     ; Bytes written to storage
  (setter blocks-skipped)   ; Blocks already in storage and reused (not including file cache wins)
  (setter bytes-skipped)    ; Bytes already in storage and reused (not including file cache wins)
  (setter file-cache-hits)  ; count of file cache hits
  (setter file-cache-bytes) ; count of file cache bytes saved

  progress-callback ; (lambda (type name files size) ...), or #f

  ;; Event log
  log-event! ; (lambda (event) ...), or #f
  event-log) ; a queue (see data-structures unit) of event records, or #f

;; Keep hold of the raw record constructor before make-job is redefined
;; below with a friendlier signature.
(define make-initialised-job make-job)

;; Create a job with zeroed counters.
;;   log-event!        - procedure called with each logged event, or #f
;;   queue-events?     - when true, logged events are also kept in a queue
;;   progress-callback - optional procedure invoked by job-progress!
(define (make-job log-event! queue-events? #!optional progress-callback)
  (make-initialised-job
   #f #f #f #t ; check correctness, store atime, store ctime, use rules
   0 0 0 0     ; blocks stored, bytes stored, blocks skipped, bytes skipped
   0 0         ; file cache hits, file cache bytes
   progress-callback
   log-event!
   (if queue-events?
       (make-queue) ; Initial empty log
       #f)))

(define *anonymous-job* (make-job #f #f))

(define current-job (make-parameter *anonymous-job*))

;; Run THUNK with JOB installed as (current-job).
;; The exception-logging branch is currently switched off by the
;; constant #t test, so exceptions propagate to the caller.
(define (call-with-job-context job thunk)
  (parameterize ((current-job job))
    (if #t ;; *show-traces*
        (thunk)
        (handle-exceptions exn
          (job-log! 'error #f
                    (sprintf "~a in ~a"
                             ((condition-property-accessor 'exn 'message "Unknown error") exn)
                             (cons ((condition-property-accessor 'exn 'location (void)) exn)
                                   ((condition-property-accessor 'exn 'arguments '()) exn))))
          (thunk)))))

;; Report progress to the current job's progress callback, if it has one.
(define (job-progress! type name files size)
  (let ((callback (job-progress-callback (current-job))))
    (when callback
      (callback type name files size))))

;; Record an event against the current job: add it to the job's event
;; queue (if any) and pass it to the job's log-event! procedure (if any).
;; Always returns an unspecified value.
(define (job-log! type path message)
  (let ((event (make-event* type path message)))
    (when (job-event-log (current-job))
      (queue-add! (job-event-log (current-job)) event))
    (when (job-log-event! (current-job))
      ((job-log-event! (current-job)) event)))
  (void))

;; Evaluate BODY with the backend-log! parameter bound to a procedure
;; that forwards backend messages into the current job's log (with no
;; path).  Backend calls must happen INSIDE the body to be covered.
(define-syntax-rule (with-backend-logging body ...)
  (parameterize ((backend-log!
                  (lambda (type message)
                    (job-log! type #f message)
                    (void))))
    body ...))

;; Create TABLE by executing CREATESQL unless sqlite_master already
;; lists a table of that name.
(define (ensure-table conn table createsql)
  (when (zero? (car (query fetch
                           (sql conn "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?")
                           table)))
    (exec (sql conn createsql))))

;; Numbers pass through unchanged; anything else is turned into its
;; WRITE representation as a string.
(define (serialise s)
  (cond
   ((number? s) s)
   (else (with-output-to-string (lambda () (write s))))))

;; Inverse of serialise: numbers pass through, strings are READ back.
(define (deserialise s)
  (cond
   ((number? s) s)
   (else (with-input-from-string s read))))

;; Flush the backend storage, then commit the cache transaction and
;; open a fresh one, resetting the uncommitted-update counter.
(define (vault-cache-flush! vault)
  ;; Flush the storage before we commit our cache, for crash safety.
  ;; The flush procedure is called inside with-backend-logging so that
  ;; anything the backend logs during the flush reaches the job log.
  (with-backend-logging
   ((storage-flush! (vault-storage vault))))
  (exec (sql (vault-cache vault) "commit;"))
  (exec (sql (vault-cache vault) "begin;"))
  (set! (vault-cache-updates-uncommitted vault) 0))

(define cache-commit-interval 1000)

;; Note that the cache has been updated; once more than
;; cache-commit-interval updates have piled up, flush and commit.
(define (vault-cache-updated! vault)
  (inc! (vault-cache-updates-uncommitted vault))
  (when (> (vault-cache-updates-uncommitted vault) cache-commit-interval)
    (vault-cache-flush! vault)))

;; Take a block, and return a compressed and encrypted block
(define (wrap-block vault block)
  ((vault-encrypt vault)
   ((vault-compress vault) block)))

;; Take a compressed and encrypted block, and recover the original data
(define (unwrap-block vault block)
  ((vault-decompress vault)
   ((vault-decrypt vault) block)))

(define (vault-max-block-size vault)
  (storage-max-block-size (vault-storage vault)))

(define (vault-writable? vault)
  (storage-writable? (vault-storage vault)))

(define (vault-unlinkable? vault)
  (storage-unlinkable? (vault-storage vault)))

;; Signal an exn condition unless the vault's storage is writable.
(define (check-vault-writable vault)
  (if (not (vault-writable? vault))
      (signal (make-property-condition
               'exn
               'location 'check-vault-writable
               'message "This isn't a writable vault"))))

;; Signal an exn condition unless the vault's storage is unlinkable.
(define (check-vault-unlinkable vault)
  (if (not (vault-unlinkable? vault))
      (signal (make-property-condition
               'exn
               'location 'check-vault-unlinkable
               'message "This isn't an unlinkable vault - it's append-only"))))

;; Count DATA (a u8vector) as a reused block in the current job's stats.
(define (vault-log-reuse! vault data)
  (inc! (job-blocks-skipped (current-job)))
  (inc! (job-bytes-skipped (current-job)) (u8vector-length data)))

;; Format an epoch time as "YYYY-MM-DD HH:MM:SS" in local time.
(define (epochtime->string e)
  (let ((localtime (seconds->local-time e))
        ;; Left-pad the decimal form of N with zeros to WIDTH characters.
        (pad (lambda (n width)
               (string-pad (number->string n) width #\0))))
    (string-append
     (pad (+ 1900 (vector-ref localtime 5)) 4)
     "-"
     (pad (+ 1 (vector-ref localtime 4)) 2)
     "-"
     (pad (vector-ref localtime 3) 2)
     " "
     (pad (vector-ref localtime 2) 2)
     ":"
     (pad (vector-ref localtime 1) 2)
     ":"
     (pad (vector-ref localtime 0) 2))))

;; Store DATA under KEY with the given TYPE, after compressing and
;; encrypting it, and count it in the current job's stats.
;; Raises an error if the vault is not writable.
(define (vault-put! vault key data type)
  (unless (vault-writable? vault)
    (error 'vault-put! "This isn't a writable vault"))
  (with-backend-logging
   ((storage-put! (vault-storage vault)) key (wrap-block vault data) type))
  (inc! (job-blocks-stored (current-job)))
  (inc! (job-bytes-stored (current-job)) (u8vector-length data))
  (void))

;; Look KEY up (by assq) in the vault's configuration alist, returning
;; DEFAULT when it is absent.
(define (vault-conf-get vault key default)
  (let ((result (assq key (vault-conf-alist vault))))
    (if result
        (cdr result)
        default)))

;; Set KEY to VALUE in the vault's configuration alist and mark the
;; configuration as needing to be written by vault-flush!.
(define (vault-conf-set! vault key value)
  (let ((new (alist-update! key value (vault-conf-alist vault))))
    (set! (vault-conf-alist vault) new)
    (set! (vault-conf-alist-changed? vault) #t)))

;; Write the configuration back to the vault if it has changed, then
;; flush the backend and commit the cache.
(define (vault-flush! vault)
  ;; Write configuration, if it's changed
  (when (vault-conf-alist-changed? vault)
    (receive (conf-key conf-reused?)
        (store-sexpr! vault
                      (cons (vault-format-version vault)
                            (vault-conf-alist vault))
                      'ugarit-vault-configuration
                      '())
      ;; Read old value and unlink the block it references
      (let ((old-conf (vault-tag vault "#ugarit-vault-configuration")))
        (when old-conf
          (vault-unlink! vault (tag-key old-conf))))
      ;; Set new value
      (vault-set-tag! vault
                      (make-tag "#ugarit-vault-configuration"
                                'ugarit-vault-configuration
                                conf-key))))
  (set! (vault-conf-alist-changed? vault) #f)
  ;; vault-cache-flush! also flushes the backend
  (vault-cache-flush! vault))

;; Ask the backend whether a block exists under KEY; returns whatever
;; the backend's exists? procedure returns.
(define (vault-exists? vault key)
  (with-backend-logging
   ((storage-exists? (vault-storage vault)) key)))

;; Fetch, decrypt and decompress the block stored under KEY, and verify
;; that hashing the result with TYPE reproduces KEY.
;; Raises an error if the block is missing or fails the check.
(define (vault-get vault key type)
  (let* ((raw-data (with-backend-logging
                    ((storage-get (vault-storage vault)) key)))
         (data (if raw-data
                   (unwrap-block vault raw-data)
                   (error 'vault-get
                          (sprintf "Nonexistant block ~A ~A" key type))))
         ;; Hash once; the value is needed both for the comparison and
         ;; for the error message.
         (actual-key ((vault-hash vault) data type)))
    (unless (string=? key actual-key)
      (error 'vault-get
             (sprintf "Consistency check failure: asked for ~A, got ~A"
                      key actual-key)))
    data))

;; Ask the backend to link the block under KEY.
;; Raises an error if the vault is not writable.
(define (vault-link! vault key)
  (unless (vault-writable? vault)
    (error 'vault-link! "This isn't a writable vault"))
  (with-backend-logging
   ((storage-link! (vault-storage vault)) key)))

;; Ask the backend to unlink the block under KEY.  If the backend
;; returns a block, it is decrypted and decompressed and returned;
;; otherwise #f is returned.
;; Raises an error if the vault is not writable.
(define (vault-unlink! vault key)
  (unless (vault-writable? vault)
    (error 'vault-unlink! "This isn't a writable vault"))
  (let ((result (with-backend-logging
                 ((storage-unlink! (vault-storage vault)) key))))
    (if result
        (unwrap-block vault result)
        #f)))

;; Pass an administrative COMMAND through to the backend.
(define (vault-admin! vault command)
  (with-backend-logging
   ((storage-admin! (vault-storage vault)) command)))

(define-record-type tag
  (make-tag name type key)
  tag?
  (name tag-name)
  (type tag-type)
  (key tag-key))

;; Store TAG in the backend under its name; the stored value is the
;; written form of the pair (type . key).
;; Raises an error if the vault is not writable.
(define (vault-set-tag! vault tag)
  (unless (vault-writable? vault)
    (error 'vault-set-tag! "This isn't a writable vault"))
  (with-backend-logging
   ((storage-set-tag! (vault-storage vault))
    (tag-name tag)
    (with-output-to-string
      (lambda ()
        (write (cons (tag-type tag) (tag-key tag))))))))

;; Look up the tag called TAG-NAME, returning a tag record or #f.
(define (vault-tag vault tag-name)
  (with-backend-logging
   (let ((tag-string ((storage-tag (vault-storage vault)) tag-name)))
     (cond
      ;; No tag found
      ((not tag-string) #f)
      ;; Tag, starts with (
      ((string-prefix? "(" tag-string)
       (let ((tag-sexpr (with-input-from-string tag-string read)))
         (make-tag tag-name (car tag-sexpr) (cdr tag-sexpr))))
      ;; Tag, not starts with (, old-style: the stored string is the
      ;; key itself, so the result of vault-exists? on it is used as
      ;; the type
      (else
       (make-tag tag-name (vault-exists? vault tag-string) tag-string))))))

;; List tag names.  Names starting with "#" are filtered out unless
;; INCLUDE-SYSTEM-TAGS? is true.
(define (vault-all-tags vault #!optional include-system-tags?)
  (let ((all-tags (with-backend-logging
                   ((storage-all-tags (vault-storage vault))))))
    (if include-system-tags?
        all-tags
        (filter! (lambda (tag-name)
                   (not (string-prefix? "#" tag-name)))
                 all-tags))))

;; Remove TAG from the backend.
;; Raises an error if the vault is not writable.
(define (vault-remove-tag! vault tag)
  (unless (vault-writable? vault)
    (error 'vault-remove-tag! "This isn't a writable vault"))
  (with-backend-logging
   ((storage-remove-tag! (vault-storage vault)) tag)))

;; Try to lock TAG, retrying up to 10 times with a one second sleep
;; between attempts.  Returns the backend's result on success and
;; signals an exn condition if every attempt fails.
;; Raises an error if the vault is not writable.
(define (vault-lock-tag! vault tag)
  (unless (vault-writable? vault)
    (error 'vault-lock-tag! "This isn't a writable vault"))
  (let loop ((tries-left 10))
    (if (zero? tries-left)
        (signal (make-property-condition
                 'exn
                 'location 'vault-lock-tag!
                 'message (sprintf "We timed out attempting to lock the tag '~A'" tag)))
        (let ((result (with-backend-logging
                       ((storage-lock-tag! (vault-storage vault)) tag))))
          (if result
              result ; Lock got!
              (begin
                (thread-sleep! 1)
                (loop (- tries-left 1))))))))

;; Ask the backend whether TAG is locked; always #f for a vault that
;; is not writable.
(define (vault-tag-locked? vault tag)
  (if (vault-writable? vault)
      (with-backend-logging
       ((storage-tag-locked? (vault-storage vault)) tag))
      #f))

;; Release the lock on TAG.
;; Raises an error if the vault is not writable.
(define (vault-unlock-tag! vault tag)
  (unless (vault-writable? vault)
    (error 'vault-unlock-tag! "This isn't a writable vault"))
  (with-backend-logging
   ((storage-unlock-tag! (vault-storage vault)) tag)))

;;
;; CORE ALGORITHMS
;;

;; Philosophy: insertion routines

;; Insertion routines insert an object into the vault, correctly
;; managing reference counts. In order to do this, they all return
;; two values: the key the object went in under, and a boolean flag
;; that is true if the object was already in the vault. This is so
;; that a parent object that calls that function can construct its
;; data block from the supplied child keys, then do an exists? check
;; to see if it already exists in the vault itself, if all of its
;; children were already in the vault. If it was, then it in turn
;; can just return the key and #t. But if not, then it can link! every
;; child that WAS already in the vault, and then put! its own value
;; into the vault and return that with #f. Thus, the reference counts
;; are maintained correctly.

;; Identity hooks marking where a key was reused / freshly created;
;; uncomment the printf for tracing.
(define (reusing hash)
  ;; (printf "REUSING: ~A\n" hash)
  hash)

(define (virgin hash)
  ;; (printf "CREATED: ~A\n" hash)
  hash)

;; BLOCKS OF RAW DATA THAT CANNOT CONTAIN CHILD KEYS
;; We never have any child keys to link!, so the not-reused case is simple.
;; Returns two values: the key, and whether the block was already present.
(define (vault-store-block! vault data type)
  (check-vault-writable vault)
  (let ((hash ((vault-hash vault) data type)))
    (if (vault-exists? vault hash)
        (begin
          (vault-log-reuse! vault data)
          (values (reusing hash) #t))
        (begin
          (vault-put! vault hash data type)
          (values (virgin hash) #f)))))

;; SINGLE SEXPRS
;; A sexpr in a block. Simple, really.
;; Given an sexpr, a type and a list of (key . reused?) pairs, returns
;; a key and a reused? flag.  When the block is new, every child key
;; flagged as reused is link!ed before the block is put!.
(define (store-sexpr! vault sexpr type keys)
  (let* ((data (blob->u8vector/shared
                (string->blob
                 (with-output-to-string
                   (lambda () (write sexpr))))))
         (hash ((vault-hash vault) data type)))
    (if (vault-exists? vault hash)
        (begin
          (vault-log-reuse! vault data)
          (values (reusing hash) #t))
        (begin
          (for-each (lambda (key)
                      (if (cdr key) ; reused?
                          (vault-link! vault (car key))))
                    keys)
          (vault-put! vault hash data type)
          (values (virgin hash) #f)))))

;; Fetch the block under KEY (checked against TYPE by vault-get) and
;; READ a single sexpr from its contents.
(define (read-sexpr vault key type)
  (let ((data (vault-get vault key type)))
    (with-input-from-string
        (blob->string (u8vector->blob/shared data))
      (lambda ()
        (read)))))

;; Return the job's counters as an alist keyed by counter name.
(define (job-stats-alist job)
  (list (cons 'blocks-stored (job-blocks-stored job))
        (cons 'bytes-stored (job-bytes-stored job))
        (cons 'blocks-skipped (job-blocks-skipped job))
        (cons 'bytes-skipped (job-bytes-skipped job))
        (cons 'file-cache-hits (job-file-cache-hits job))
        (cons 'file-cache-bytes (job-file-cache-bytes job))))

)