(module ugarit-streams
 (make-key-stream-writer*
  key-stream-writer?
  key-stream-writer-write!
  key-stream-writer-finish!
  unlink-key-stream!
  fold-key-stream
  make-sexpr-stream-writer*
  sexpr-stream-writer?
  sexpr-stream-writer-write!
  sexpr-stream-writer-finish!
  unlink-sexpr-stream!
  fold-sexpr-stream
  store-job-log!)

(import scheme)
(import chicken)
(use data-structures)
(use ugarit-core)
(use srfi-4)
(use lolevel)
(use ports)
(use srfi-1)

;; ---------------------------------------------------------------------
;; GENERIC STREAMS OF KEYS
;;
;; Both file and directory storage need to store an arbitrary ordered
;; list of keys, so that a number of data blocks can be strung together
;; into one logical object.  If all the keys fit into a single block,
;; that block is the stream.  Otherwise the keys are split across
;; several blocks, and the keys of *those* blocks are written to a
;; higher-level key stream, recursively.
;; ---------------------------------------------------------------------

;; A key stream writer is a pair of closures:
;;   write!  - (write! key reused?) appends one string key to the stream;
;;             reused? is the "already existed" flag for that key, used
;;             for reference counting.
;;   finish! - (finish!) terminates the stream and returns two values:
;;             the key of the stream and an "already existed" flag.
(define-record key-stream-writer
  write!
  finish!)

;; Copy bytes string-offs..string-len of the u8vector `src` into `u8v`,
;; starting at index `offset` of `u8v`.  Returns an unspecified value.
(define (copy-string-into-place! u8v offset src string-offs string-len)
  (let ((byte-count (- string-len string-offs)))
    (move-memory! src u8v byte-count string-offs offset)
    (void)))

;; Write every string in `strings`, each followed by a newline, into
;; `u8v`.  The first string of the list is placed so that it ends at
;; `offset`, the second immediately before it, and so on -- so a list
;; held newest-first comes out oldest-first in the vector.
(define (serialise-strings! u8v offset strings)
  (let loop ((end offset)
             (remaining strings))
    (if (null? remaining)
        (void)
        (let* ((line (blob->u8vector/shared
                      (string->blob (string-append (car remaining) "\n"))))
               (line-len (u8vector-length line))
               (start (- end line-len)))
          (copy-string-into-place! u8v start line 0 line-len)
          (loop start (cdr remaining))))))

;; Create a key stream writer that stores its blocks in `vault` with
;; block type `type`.  Calls check-vault-writable on the vault first.
(define (make-key-stream-writer* vault type)
  (check-vault-writable vault)
  (let ((pending '())       ; (key . reused?) pairs, most recent first
        (pending-bytes 0)   ; serialised size of `pending`, newlines included
        (all-reused? #t)    ; has every pending key been flagged reused?
        (parent #f))        ; higher-level key stream writer, made on demand

    ;; Forget everything buffered so far.
    (define (reset-buffer!)
      (set! pending '())
      (set! pending-bytes 0)
      (set! all-reused? #t))

    ;; Would adding `key` (plus its newline) exceed the vault's maximum
    ;; block size?  A key must always be shorter than a whole block.
    (define (next-write-will-overflow? key)
      (assert (< (string-length key) (vault-max-block-size vault)))
      (> (+ pending-bytes (string-length key) 1)
         (vault-max-block-size vault)))

    ;; Serialise the buffered keys into one block and empty the buffer.
    ;; Returns two values: the block's key and whether it was reused.
    (define (flush!)
      (let ((keys-serialised (make-u8vector pending-bytes)))
        (serialise-strings! keys-serialised pending-bytes (map car pending))
        (let ((hash ((vault-hash vault) keys-serialised type)))
          (cond
           ;; Every child was reused and this exact block is already
           ;; in the vault: this block is reused too.
           ((and all-reused? (vault-exists? vault hash))
            (reset-buffer!)
            (vault-log-reuse! vault keys-serialised)
            (values (reusing hash) #t))
           ;; Otherwise store a new block, after link!ing every child
           ;; that was flagged as reused.
           (else
            (for-each (lambda (entry)
                        (when (cdr entry)
                          (vault-link! vault (car entry))))
                      pending)
            (vault-put! vault hash keys-serialised type)
            (reset-buffer!)
            (values (virgin hash) #f))))))

    ;; Flush the buffer and append the resulting block key to the
    ;; parent stream, creating the parent after the flush if there is
    ;; none yet.
    (define (push-block-to-parent!)
      (let-values (((flush-key flush-reused?) (flush!)))
        (unless parent
          (set! parent (make-key-stream-writer* vault type)))
        ((key-stream-writer-write! parent) flush-key flush-reused?)))

    ;; Append one key, spilling the current buffer to the parent stream
    ;; first if the key would not fit.
    ;; (Open question carried over from the original author: what
    ;; happens if the same key comes up twice?)
    (define (write! key reused?)
      (when (next-write-will-overflow? key)
        (push-block-to-parent!))
      (set! pending (cons (cons key reused?) pending))
      (set! all-reused? (and all-reused? reused?))
      (set! pending-bytes (+ pending-bytes (string-length key) 1))
      (void))

    ;; Terminate the stream; returns (values key reused?).
    (define (finish!)
      (cond
       ;; Blocks have already been spilled: push any remainder up and
       ;; let the parent stream produce the final key.
       (parent
        (unless (null? pending)
          (push-block-to-parent!))
        ((key-stream-writer-finish! parent)))
       ;; Empty stream: store an empty block.
       ((null? pending)
        (vault-store-block! vault (make-u8vector 0) type))
       ;; Single-element stream: just return that one element.
       ((null? (cdr pending))
        (values (caar pending) (cdar pending)))
       ;; More than one key, but never enough to have flushed before.
       (else
        (flush!))))

    (make-key-stream-writer write! finish!)))

;; Convert a key stream block into the list of key strings it holds.
(define (deserialise-key-stream block)
  (let ((text (blob->string (u8vector->blob/shared block))))
    (string-split text "\n")))

;; Fold over the stream rooted at `key`.  Blocks whose type is `ks-type`
;; are key stream blocks and are descended into; for every other key,
;; kons is called as (kons key type accumulator), in stream order.
(define (fold-key-stream vault key ks-type kons knil)
  (let ((type (vault-exists? vault key)))
    (if (not (eq? ks-type type))
        ;; Leaf node
        (kons key type knil)
        ;; Key stream block: recurse into each subkey in order
        (let loop ((subkeys (deserialise-key-stream (vault-get vault key type)))
                   (acc knil))
          (if (null? subkeys)
              acc
              (loop (cdr subkeys)
                    (fold-key-stream vault (car subkeys) ks-type kons acc)))))))

;; Unlink the key stream block `key`.  If vault-unlink! returns a block
;; (a newline-separated list of keys), every child that still exists is
;; unlinked as well: children of type `type` recursively as key stream
;; blocks, all others via (child-unlink! vault key child-type).
(define (unlink-key-stream! vault key type child-unlink!)
  (check-vault-unlinkable vault)
  (let ((result (vault-unlink! vault key)))
    (when result
      (for-each
       (lambda (subkey)
         (let ((child-type (vault-exists? vault subkey)))
           (cond
            ;; The child may not exist any more, in which case: job done.
            ((not child-type)
             (void))
            ((eq? child-type type)
             (unlink-key-stream! vault subkey type child-unlink!))
            (else
             (child-unlink! vault subkey child-type)))))
       (deserialise-key-stream result)))))

;; ---------------------------------------------------------------------
;; GENERIC STREAMS OF S-EXPRESSIONS
;;
;; These are used to implement directories, but might be useful for
;; other complex structures in future.
;; ---------------------------------------------------------------------

;; A sexpr stream writer is a pair of closures:
;;   write!  - (write! sexpr keys) appends an s-expression.  `keys` is a
;;             list of pairs, one per key mentioned in the sexpr: the
;;             car is the key and the cdr is its reused? flag.
;;   finish! - (finish!) returns the key and reused? flag for the whole
;;             stream.
(define-record sexpr-stream-writer
  write!
  finish!)

;; IDEA: Examine this and make-key-stream-writer* and try to merge them
;; onto a common string-stream-writer abstraction, if it is worth it.
;; They share a lot, yet also differ a lot.

;; Create a sexpr stream writer on `vault`.  Blocks of s-expressions are
;; stored with type `type`; if more than one block is needed, their keys
;; go into a key stream whose blocks have type `ks-type`.
(define (make-sexpr-stream-writer* vault type ks-type)
  (check-vault-writable vault)
  (let ((lines '())          ; written sexprs as strings, most recent first
        (line-bytes 0)       ; serialised size of `lines`, newlines included
        (child-keys '())     ; (key . reused?) pairs referenced by `lines`
        (all-reused? #t)     ; has every child key been flagged reused?
        (parent #f))         ; key stream writer, made on demand

    ;; Forget everything buffered so far.
    (define (reset-buffers!)
      (set! lines '())
      (set! line-bytes 0)
      (set! child-keys '())
      (set! all-reused? #t))

    ;; When the current job asks for correctness checks, verify that
    ;; the all-reused? flag agrees with the buffered child keys.
    (define (check-reuse-flag!)
      (when (job-check-correctness? (current-job))
        (if all-reused?
            (assert (every cdr child-keys)
                    "Key buffer thinks it's all reused, but it isn't:"
                    child-keys)
            (assert (not (every cdr child-keys))
                    "Key buffer thinks it's not all reused, but it is:"
                    child-keys))))

    ;; Serialise the buffered sexprs into one block and empty the
    ;; buffers.  Returns two values: the block's key and whether it was
    ;; reused.
    (define (flush!)
      (let ((serialised-buffer (make-u8vector line-bytes)))
        (serialise-strings! serialised-buffer line-bytes lines)
        (let ((hash ((vault-hash vault) serialised-buffer type)))
          (check-reuse-flag!)
          (cond
           ;; Every child was reused and this exact block is already
           ;; in the vault: this block is reused too.
           ((and all-reused? (vault-exists? vault hash))
            (reset-buffers!)
            (vault-log-reuse! vault serialised-buffer)
            (values (reusing hash) #t))
           ;; Otherwise store a new block, after link!ing every child
           ;; that was flagged as reused.
           (else
            (for-each (lambda (entry)
                        (when (cdr entry)
                          (vault-link! vault (car entry))))
                      child-keys)
            (vault-put! vault hash serialised-buffer type)
            (reset-buffers!)
            (values (virgin hash) #f))))))

    ;; Flush the buffers and append the resulting block key to the
    ;; parent key stream, creating the parent after the flush if there
    ;; is none yet.
    (define (push-block-to-parent!)
      (let-values (((flush-key flush-reused?) (flush!)))
        (unless parent
          (set! parent (make-key-stream-writer* vault ks-type)))
        ((key-stream-writer-write! parent) flush-key flush-reused?)))

    ;; Append one sexpr, spilling the current buffer to the parent key
    ;; stream first if the sexpr would not fit.  A single serialised
    ;; sexpr must always be shorter than a whole block.
    (define (write! sexpr keys)
      (let* ((sexpr-string (with-output-to-string (lambda () (write sexpr))))
             (sexpr-len (string-length sexpr-string)))
        (assert (< sexpr-len (vault-max-block-size vault)))
        (when (> (+ line-bytes sexpr-len 1) (vault-max-block-size vault))
          (push-block-to-parent!))
        (set! lines (cons sexpr-string lines))
        (set! child-keys (append keys child-keys))
        (set! all-reused? (and all-reused? (every cdr keys)))
        (set! line-bytes (+ line-bytes sexpr-len 1))
        (void)))

    ;; Terminate the stream; returns (values key reused?).
    (define (finish!)
      (cond
       ;; Blocks have already been spilled: push any remainder up and
       ;; let the parent key stream produce the final key.
       (parent
        (unless (null? lines)
          (push-block-to-parent!))
        ((key-stream-writer-finish! parent)))
       ;; Empty stream: store an empty block.
       ((null? lines)
        (vault-store-block! vault (make-u8vector 0) type))
       ;; Some sexprs, but never enough to have flushed before.
       (else
        (flush!))))

    (make-sexpr-stream-writer write! finish!)))

;; Convert a sexpr stream block into the list of sexprs it holds, by
;; reading each newline-separated line.
(define (deserialise-sexpr-stream block)
  (let ((text-lines (string-split (blob->string (u8vector->blob/shared block))
                                  "\n")))
    (map (lambda (line) (with-input-from-string line read))
         text-lines)))

;; Fold kons over every sexpr in the stream rooted at `key`, in order;
;; kons is called as (kons sexpr accumulator).  Every leaf block reached
;; must have type `leaf-type`.
(define (fold-sexpr-stream vault key leaf-type ks-type kons knil)
  (define (fold-leaf key found-leaf-type acc)
    (assert (eq? found-leaf-type leaf-type))
    (let ((sexprs (deserialise-sexpr-stream
                   (vault-get vault key found-leaf-type))))
      (fold kons acc sexprs)))
  (fold-key-stream vault key ks-type fold-leaf knil))

;; Unlink one block of sexprs.  If vault-unlink! returns the block's
;; contents, sexpr-unlink! is called on each sexpr in it.
(define (unlink-sexpr-stream-block! vault key sexpr-unlink!)
  (let ((result (vault-unlink! vault key)))
    (when result
      (for-each sexpr-unlink! (deserialise-sexpr-stream result)))))

;; Unlink the sexpr stream rooted at `key`, which must be either a key
;; stream block (type `ks-type`) or a single leaf block (type
;; `leaf-type`); any other type is reported as an error.
(define (unlink-sexpr-stream! vault key leaf-type ks-type sexpr-unlink!)
  (check-vault-unlinkable vault)
  (let ((type (vault-exists? vault key)))
    (cond
     ((eq? type ks-type)
      (unlink-key-stream!
       vault key ks-type
       (lambda (vault leaf-key found-leaf-type)
         (assert (eq? found-leaf-type leaf-type))
         (unlink-sexpr-stream-block! vault leaf-key sexpr-unlink!))))
     ((eq? type leaf-type)
      (unlink-sexpr-stream-block! vault key sexpr-unlink!))
     (else
      ;; Neither test above held, so this assertion always fails here;
      ;; it is used to raise the error.
      (assert (or (eq? type leaf-type) (eq? type ks-type))
              "unlink-sexpr-stream!: Invalid block type"
              (list 'expected leaf-type ks-type)
              type)))))

;; A little utility that writes the event log of a ugarit job into a
;; sexpr stream.  Each event is stored as the list
;; (type time path message) with no referenced keys.
;; Returns (values key reused?).
(define (store-job-log! vault job)
  (let* ((ssw (make-sexpr-stream-writer* vault 'f 'fi))
         (write-event! (sexpr-stream-writer-write! ssw)))
    (for-each
     (lambda (event)
       (write-event! (list (event-type event)
                           (event-time event)
                           (event-path event)
                           (event-message event))
                     '()))
     (queue->list (job-event-log job)))
    ((sexpr-stream-writer-finish! ssw))))

)