(module ugarit-backend (make-storage ; Storage records storage? storage-max-block-size storage-writable? storage-unlinkable? storage-put! storage-flush! storage-exists? storage-get storage-link! storage-unlink! storage-set-tag! storage-tag storage-all-tags storage-remove-tag! storage-lock-tag! storage-tag-locked? storage-unlock-tag! storage-admin! storage-close! backend-log! export-storage! ; Export a storage via stdin/stdout export-storage-error! import-storage ; Create a storage from a command line ) (import scheme) (import chicken) (use ports) (use matchable) (use posix) (use srfi-4) (use data-structures) (use miscmacros) ; Backends can call the procedure found in this paramter to log ; things. type should be 'warning, 'error or 'info. message should ; be any string. (define backend-log! (make-parameter (lambda (type message) (error "No backend log handler has been defined")))) (define-record storage max-block-size ; Integer: largest size of block we can store writable? ; Boolean: Can we call put!, link!, unlink!, set-tag!, lock-tag!, unlock-tag!? unlinkable? ; Boolean: Can we call unlink? put! ; Procedure: (put! key data type) - stores the data (u8vector) under the key (string) with the given type tag (symbol) and a refcount of 1. Does nothing of the key is already in use. flush! ; Procedure: (flush!) - all previous changes must be flushed to disk by the time the continuation is applied. exists? ; Procedure: (exists? key) - returns the type of the block with the given key if it exists, or #f otherwise get ; Procedure: (get key) - returns the contents (u8vector) of the block with the given key (string) if it exists, or #f otherwise link! ; Procedure: (link key) - increments the refcount of the block unlink! ; Procedure: (unlink key) - decrements the refcount of the block. If it's now zero, deletes it but returns its value as a u8vector. If not, returns #f. set-tag! ; Procedure: (set-tag! name key) - assigns the given key (string) to the given tag (named with a string). Creates a new tag if the name has not previously been used, otherwise updates an existing tag tag ; Procedure: (tag name) - returns the key assigned to the given tag, or #f if it does not exist. all-tags ; Procedure: (all-tags) - returns a list of all existing tag names remove-tag! ; Procedure: (remove-tag! name) - removes the named tag lock-tag! ; Procedure: (lock-tag! name) - locks the named tag, returning #t if all went well, or #f if it can't be locked. tag-locked? ; Procedure: (tag-locked? name) - returns #t if the tag is locked, #f otherwise unlock-tag! ; Procedure: (unlock-tag! name) - unlocks the named tag admin! ; Procedure: (admin! command) - returns an alist close!) ; Procedure: (close!) - closes the storage engine (define *magic-v1* 'ugarit-backend-protocol-1) (define *magic-v2* 'ugarit-backend-protocol-2) ;; Exporting a storage - taking a storage record and making it available ;; as a backend protocol server (define (describe-exception exn) (sprintf "~a: ~s in ~a" ((condition-property-accessor 'exn 'message "Unknown error") exn) ((condition-property-accessor 'exn 'arguments '()) exn) ((condition-property-accessor 'exn 'location (void)) exn))) ; Return the result of the body, and any logs (define-syntax-rule (with-error-reporting-and-result body ...) (handle-exceptions exn (write (list "error" (describe-exception exn))) (let ((log (make-queue))) (parameterize ((backend-log! (lambda (type message) (queue-add! log (cons type message)) (void)))) (let ((result (begin body ...))) (write (list (queue->list log) result))))))) ; Return the result of the body as a data block, and any logs (define-syntax-rule (with-error-reporting-and-block body ...) (handle-exceptions exn (write (list "error" (describe-exception exn))) (let ((log (make-queue))) (parameterize ((backend-log! (lambda (type message) (queue-add! log (cons type message)) (void)))) (let ((result (begin body ...))) (if result (begin (write (list (queue->list log) (u8vector-length result))) (write-u8vector result)) (write (list (queue->list log) #f)))))))) ; Return any logs (define-syntax-rule (with-error-reporting body ...) (handle-exceptions exn (write (list "error" (describe-exception exn))) (let ((log (make-queue))) (parameterize ((backend-log! (lambda (type message) (queue-add! log (cons type message)) (void)))) (let ((result (begin body ...))) (write (list (queue->list log)))))))) (define (export-storage-error! message) (set-buffering-mode! (current-output-port) #:none) ; Write the error header (write *magic-v2*) (newline) (write (list "error" message))) ;; Given a storage object, provide the storage remote access protocol ;; via current-input-port / current-output-port until the storage is closed ;; via the protocol. (define (export-storage! storage-thunk) (set-buffering-mode! (current-output-port) #:none) ; Write the header (write *magic-v2*) (newline) (let ((storage #f)) (with-error-reporting-and-result ; Initialise and send the header (let ((storage* (storage-thunk))) (set! storage storage*) ; This feels hacky (list (storage-max-block-size storage) (storage-writable? storage) (storage-unlinkable? storage)))) ; Engage command loop (if storage (let loop () (newline) (let ((command (read))) (if (eof-object? command) (begin (with-error-reporting ((storage-close! storage))) (void)) (match command (('put! key type length) (let ((data (read-u8vector length))) (with-error-reporting ((storage-put! storage) key data type))) (loop)) (('flush!) (with-error-reporting ((storage-flush! storage))) (loop)) (('exists? key) (with-error-reporting-and-result ((storage-exists? storage) key)) (loop)) (('get key) (with-error-reporting-and-block ((storage-get storage) key)) (loop)) (('link! key) (with-error-reporting ((storage-link! storage) key)) (loop)) (('unlink! key) (with-error-reporting-and-block ((storage-unlink! storage) key)) (loop)) (('set-tag! name key) (with-error-reporting ((storage-set-tag! storage) name key)) (loop)) (('tag name) (with-error-reporting-and-result ((storage-tag storage) name)) (loop)) (('all-tags) (with-error-reporting-and-result ((storage-all-tags storage))) (loop)) (('remove-tag! name) (with-error-reporting ((storage-remove-tag! storage) name)) (loop)) (('lock-tag! name) (with-error-reporting-and-result ((storage-lock-tag! storage) name)) (loop)) (('tag-locked? name) (with-error-reporting-and-result ((storage-tag-locked? storage) name)) (loop)) (('unlock-tag! name) (with-error-reporting ((storage-unlock-tag! storage) name)) (loop)) (('admin! command) (with-error-reporting-and-result ((storage-admin! storage) command)) (loop)) (('close!) (with-error-reporting ((storage-close! storage))) (void)) (else (write (list "error" (sprintf "Bad command ~s" command))) (loop))))))))) ;; Importing a storage - taking a command line to a backend protocol ;; server and turning it into a storage record (define (protocol-error message backend operation . irritants) (abort (make-composite-condition (make-property-condition 'exn 'message (sprintf "~A:~A:~A" backend operation message) 'irritants irritants) (make-property-condition 'ugarit-backend 'backend backend 'operation operation)))) ;; Rewrite I/O errors in the thunk as protocol errors (define (rewrite-i/o-errors backend operation thunk) (condition-case (thunk) (exn (exn i/o) (protocol-error ((condition-property-accessor 'exn 'message "Unknown error") exn) backend operation)))) (define (read-response-v1 port backend operation) (let ((response (read port))) (match response (("error" err) (protocol-error "Backend protocol error" backend operation err)) (else response)))) (define (read-response-v1-body port backend operation) (let ((response (read-response-v1 port backend operation))) (if response (read-u8vector (car response) port) #f))) (define (import-storage-v1 command-line debug responses commands pid) (let ((header (rewrite-i/o-errors command-line 'read-header (lambda () (read responses))))) (if debug (print "~a: read header" command-line header)) (if (not (list? header)) (protocol-error "Invalid backend protocol header" command-line 'read-header header)) (if (not (= (length header) 3)) (protocol-error "Invalid backend protocol header" command-line 'read-header header)) (let ((max-block-size (car header)) (writable? (cadr header)) (unlinkable? (caddr header))) (make-storage max-block-size writable? unlinkable? (lambda (key data type) ; put! (rewrite-i/o-errors command-line 'put! (lambda () (if debug (printf "~a: put!" command-line)) (write `(put! ,key ,type ,(u8vector-length data)) commands) (write-u8vector data commands) (read-response-v1 responses command-line 'put!) (void)))) (lambda () ; flush! (rewrite-i/o-errors command-line 'flush! (lambda () (if debug (printf "~a: flush!" command-line)) (write `(flush!) commands) (read-response-v1 responses command-line 'flush!) (void)))) (lambda (key) ; exists? (rewrite-i/o-errors command-line 'exists? (lambda () (if debug (printf "~a: exists?" command-line)) (write `(exists? ,key) commands) (read-response-v1 responses command-line 'exists?)))) (lambda (key) ; get (rewrite-i/o-errors command-line 'get (lambda () (if debug (printf "~a: get" command-line)) (write `(get ,key) commands) (read-response-v1-body responses command-line 'get)))) (lambda (key) ; link! (rewrite-i/o-errors command-line 'link! (lambda () (if debug (printf "~a: link!" command-line)) (write `(link! ,key) commands) (read-response-v1 responses command-line 'link!) (void)))) (lambda (key) ; unlink! (rewrite-i/o-errors command-line 'unlink! (lambda () (if debug (printf "~a: unlink! ~s" command-line key)) (write `(unlink! ,key) commands) (read-response-v1-body responses command-line 'unlink!)))) (lambda (name key) ; set-tag! (rewrite-i/o-errors command-line 'set-tag! (lambda () (if debug (printf "~a: set-tag!" command-line)) (write `(set-tag! ,name ,key) commands) (read-response-v1 responses command-line 'set-tag!) (void)))) (lambda (name) ; tag (rewrite-i/o-errors command-line 'tag (lambda () (if debug (printf "~a: tag" command-line)) (write `(tag ,name) commands) (read-response-v1 responses command-line 'tag)))) (lambda () ; all-tags (rewrite-i/o-errors command-line 'all-tags (lambda () (if debug (printf "~a: all-tags" command-line)) (write `(all-tags) commands) (read-response-v1 responses command-line 'all-tags)))) (lambda (name) ; remove-tag! (rewrite-i/o-errors command-line 'remove-tag! (lambda () (if debug (printf "~a: remove-tag!" command-line)) (write `(remove-tag! ,name) commands) (read-response-v1 responses command-line 'remove-tag!) (void)))) (lambda (name) ; lock-tag! (rewrite-i/o-errors command-line 'lock-tag! (lambda () (if debug (printf "~a: lock-tag!" command-line)) (write `(lock-tag! ,name) commands) (read-response-v1 responses command-line 'lock-tag!)))) (lambda (name) ; tag-locked? (rewrite-i/o-errors command-line 'tag-locked? (lambda () (if debug (printf "~a: tag-locked?" command-line)) (write `(tag-locked? ,name) commands) (read-response-v1 responses command-line 'tag-locked?)))) (lambda (name) ; unlock-tag! (rewrite-i/o-errors command-line 'unlock-tag! (lambda () (if debug (printf "~a: unlock-tag!" command-line)) (write `(unlock-tag! ,name) commands) (read-response-v1 responses command-line 'unlock-tag!) (void)))) (lambda (command) ; admin! (protocol-error "Version 1 backends do not support administration" command-line 'admin! command)) (lambda () ; close! (rewrite-i/o-errors command-line 'close! (lambda () (if debug (printf "~a: close!!" command-line)) (write '(close!) commands) (read-response-v1 responses command-line 'close!) (close-input-port responses) (close-output-port commands) (void)))))))) (define (read-response-v2 port backend operation) (let ((response (read port))) (match response (("error" err) (protocol-error (sprintf "Error from backend: ~s" err) backend operation)) ((log) (for-each (lambda (logentry) ((backend-log!) (car logentry) (cdr logentry))) log) (void)) (else (error "Malformed response from backend" response))))) (define (read-response-v2-result port backend operation) (let ((response (read port))) (match response (("error" err) (protocol-error (sprintf "Error from backend: ~s" err) backend operation)) ((log value) (for-each (lambda (logentry) ((backend-log!) (car logentry) (cdr logentry))) log) value) (else (error "Malformed response from backend" response))))) (define (read-response-v2-body port backend operation) (let ((response-length (read-response-v2-result port backend operation))) (if response-length (read-u8vector response-length port) #f))) (define (import-storage-v2 command-line debug responses commands pid) (let ((header (rewrite-i/o-errors command-line 'read-header (lambda () (read-response-v2-result responses command-line 'read-header))))) (if debug (print "~a: read header" command-line header)) (if (not (list? header)) (protocol-error "Invalid backend protocol header" command-line 'read-header header)) (if (not (= (length header) 3)) (protocol-error "Invalid backend protocol header" command-line 'read-header header)) (let ((max-block-size (car header)) (writable? (cadr header)) (unlinkable? (caddr header))) (make-storage max-block-size writable? unlinkable? (lambda (key data type) ; put! (rewrite-i/o-errors command-line 'put! (lambda () (if debug (printf "~a: put!" command-line)) (write `(put! ,key ,type ,(u8vector-length data)) commands) (write-u8vector data commands) (read-response-v2 responses command-line 'put!) (void)))) (lambda () ; flush! (rewrite-i/o-errors command-line 'flush! (lambda () (if debug (printf "~a: flush!" command-line)) (write `(flush!) commands) (read-response-v2 responses command-line 'flush!) (void)))) (lambda (key) ; exists? (rewrite-i/o-errors command-line 'exists? (lambda () (if debug (printf "~a: exists?" command-line)) (write `(exists? ,key) commands) (read-response-v2-result responses command-line 'exists?)))) (lambda (key) ; get (rewrite-i/o-errors command-line 'get (lambda () (if debug (printf "~a: get" command-line)) (write `(get ,key) commands) (read-response-v2-body responses command-line 'get)))) (lambda (key) ; link! (rewrite-i/o-errors command-line 'link! (lambda () (if debug (printf "~a: link!" command-line)) (write `(link! ,key) commands) (read-response-v2 responses command-line 'link!) (void)))) (lambda (key) ; unlink! (rewrite-i/o-errors command-line 'unlink! (lambda () (if debug (printf "~a: unlink! ~s" command-line key)) (write `(unlink! ,key) commands) (read-response-v2-body responses command-line 'unlink!)))) (lambda (name key) ; set-tag! (rewrite-i/o-errors command-line 'set-tag! (lambda () (if debug (printf "~a: set-tag!" command-line)) (write `(set-tag! ,name ,key) commands) (read-response-v2 responses command-line 'set-tag!) (void)))) (lambda (name) ; tag (rewrite-i/o-errors command-line 'tag (lambda () (if debug (printf "~a: tag" command-line)) (write `(tag ,name) commands) (read-response-v2-result responses command-line 'tag)))) (lambda () ; all-tags (rewrite-i/o-errors command-line 'all-tags (lambda () (if debug (printf "~a: all-tags" command-line)) (write `(all-tags) commands) (read-response-v2-result responses command-line 'all-tags)))) (lambda (name) ; remove-tag! (rewrite-i/o-errors command-line 'remove-tag! (lambda () (if debug (printf "~a: remove-tag!" command-line)) (write `(remove-tag! ,name) commands) (read-response-v2 responses command-line 'remove-tag!) (void)))) (lambda (name) ; lock-tag! (rewrite-i/o-errors command-line 'lock-tag! (lambda () (if debug (printf "~a: lock-tag!" command-line)) (write `(lock-tag! ,name) commands) (read-response-v2-result responses command-line 'lock-tag!)))) (lambda (name) ; tag-locked? (rewrite-i/o-errors command-line 'tag-locked? (lambda () (if debug (printf "~a: tag-locked?" command-line)) (write `(tag-locked? ,name) commands) (read-response-v2-result responses command-line 'tag-locked?)))) (lambda (name) ; unlock-tag! (rewrite-i/o-errors command-line 'unlock-tag! (lambda () (if debug (printf "~a: unlock-tag!" command-line)) (write `(unlock-tag! ,name) commands) (read-response-v2 responses command-line 'unlock-tag!) (void)))) (lambda (command) ; admin! (rewrite-i/o-errors command-line 'admin! (lambda () (if debug (printf "~a: admin!" command-line)) (write `(admin! ,command) commands) (read-response-v2-result responses command-line 'admin!)))) (lambda () ; close! (rewrite-i/o-errors command-line 'close! (lambda () (if debug (printf "~a: close!!" command-line)) (write '(close!) commands) (read-response-v2 responses command-line 'close!) (close-input-port responses) (close-output-port commands) (void)))))))) ;; Given the command line to a storage remote access protocol server, ;; activate it and return a storage object providing access to the ;; server. (define (import-storage command-line . args) (let-optionals args ((debug #f)) (let-values (((responses commands pid) (process command-line))) #;(set-buffering-mode! commands #:none) (if debug (print "~a: process opened" command-line)) (let ((magic (rewrite-i/o-errors command-line 'read-magic (lambda () (read responses))))) (if debug (print "~a: read magic ~a" command-line magic)) (cond ((equal? magic *magic-v1*) (import-storage-v1 command-line debug responses commands pid)) ((equal? magic *magic-v2*) (import-storage-v2 command-line debug responses commands pid)) (else (protocol-error "Unrecognised backend protocol header magic" command-line 'read-magic magic))))))) )