;; bokbok: request/response messaging over TCP or UNIX-domain stream
;; sockets. Packets are framed with a 3-byte big-endian length prefix.
;; When a username and key are supplied, the client sends its username
;; and a freshly generated session key (encrypted under the user's key);
;; every later packet is encrypted with that session key.
(module bokbok
 (passphrase->key
  open-connection
  close-connection!
  request!
  remote-error?
  remote-error-message
  connection?
  connection-user
  connection-addr
  start-server
  stop-server!
  wait-until-server-stopped
  )

 (import chicken scheme)
 (use extras)
 (use data-structures)
 (use ports)
 (use srfi-1)
 (use srfi-4)
 (use srfi-13)
 (use srfi-18)
 (use srfi-69)
 (use matchable)
 (use tweetnacl)
 (use socket)
 (use srfi-27)
 (use moa)          ;; from srfi-27
 (use entropy-unix) ;; from srfi-27
 (use bokbok-packet)

 ;; FIXME: Try-again-later responses to requests, caused by a
 ;; configurable concurrent request limit or a special condition
 ;; type. Client sleeps and retries (sleep time is listed in the
 ;; try-again-later response).

 ;; FIXME: TCP connection handshake to include option to redirect to
 ;; another server IP:port, controlled by the open handlers.
 ;; open-connection gets all A records and tries until it gets in.

 ;; FIXME: Accept a hostname for open-connection, and look it up

 ;; FIXME: Support sending creds over a unix-domain socket instead of a username.

 ;; FIXME: User data field in a connection

 ;; FIXME: Reduce timeouts to detect gone servers quickly

 ;; FIXME: Support IPv6

 ;; FIXME: Is the 60s receive timeout OK? Connections can't linger. Is
 ;; that a feature?

 ;; FIXME: Auto reconnect on request!

 ;; FIXME: Configurable maximum message size (client sends it in the connection
 ;; request and server sends its own back, and both ends use min of them, to
 ;; negotiate)

 ;; Verbose variant of debug, disabled with a datum comment. Remove the
 ;; `#;` (and the no-op definition below) to get trace output.
 #;
 (define (debug . args)
   (let ((str (apply sprintf args)))
     (printf "DEBUG: ~a\n" str)))

 ;; No-op debug: arguments are evaluated and ignored.
 (define (debug . args)
   (void))

 ;; Greeting the server writes once a connection has been accepted.
 (define *header* "BOKBOKBOK")

 ;; Largest payload expressible in the 3-byte length prefix (2^24 - 1).
 (define *max-packet-length* 16777215)

 ;;; ------------------------------------------------------------------
 ;;; Packet framing
 ;;; ------------------------------------------------------------------

 ;; Reads one length-prefixed packet from input port I.
 ;; Returns the payload string, or an EOF object if the stream ends
 ;; inside the length prefix or before the whole payload has arrived.
 (define (read-packet! i)
   (let ((l1 (read-byte i)))
     (if (eof-object? l1)
         l1
         (let ((l2 (read-byte i)))
           (if (eof-object? l2)
               l2
               (let ((l3 (read-byte i)))
                 (if (eof-object? l3)
                     l3
                     (let* ((len (+ (* 65536 l1) (* 256 l2) l3))
                            (p (read-string len i)))
                       (if (= (string-length p) len)
                           p
                           ;; Short read: reading from an empty string
                           ;; port is a portable way to get an EOF object.
                           (with-input-from-string "" read))))))))))

 ;; Writes packet P (a string) to output port O with its 3-byte
 ;; big-endian length prefix, without flushing.
 ;; Signals an error if P does not fit in the length prefix; the check
 ;; happens before anything is written, so the stream stays in sync.
 (define (write-packet-no-flush! o p)
   (let ((len (string-length p)))
     (when (> len *max-packet-length*)
       (error "Cannot send oversized packet, maximum is 16777215 bytes" len))
     (write-byte (quotient len 65536) o)
     (write-byte (remainder (quotient len 256) 256) o)
     (write-byte (remainder len 256) o)
     (write-string p #f o)))

 ;; Writes packet P to O and flushes the port.
 (define (write-packet! o p)
   (write-packet-no-flush! o p)
   (debug "Flushing packets")
   (flush-output o))

 ;;; ------------------------------------------------------------------
 ;;; Keys and encryption
 ;;; ------------------------------------------------------------------

 ;; A key is a pair (box-procedure . unbox-procedure), both closed over
 ;; the same key blob.
 (define (blob->key key)
   (cons (symmetric-box key) (symmetric-unbox key)))

 ;; Derives a key from a passphrase: the passphrase is hashed and the
 ;; first symmetric-box-keybytes bytes of the hash are used as the key.
 (define (passphrase->key key-string)
   (let ((key-hash (hash key-string)))
     (blob->key
      (string->blob (substring/shared key-hash 0 symmetric-box-keybytes)))))

 ;; Builds a key from raw key bytes (a string).
 ;; Returns #f when KEY-BYTES is not a string of exactly
 ;; symmetric-box-keybytes bytes, so callers can reject a malformed
 ;; session key instead of failing inside the crypto layer.
 (define (bytes->key key-bytes)
   (and (string? key-bytes)
        (= (string-length key-bytes) symmetric-box-keybytes)
        (blob->key (string->blob key-bytes))))

 ;; (get-random-bytes length) => u8vector of LENGTH random bytes.
 ;; The random source is seeded from the urandom entropy source once,
 ;; when this definition is evaluated; a mutex serialises access to it.
 (define get-random-bytes
   (let* ((entropy (make-entropy-source-urandom-device))
          (random (make-random-source-moa))
          (make-random-u8vector (random-source-make-u8vectors random))
          (mutex (make-mutex 'rpc-random-source)))
     (random-source-entropy-source-set! random entropy)
     (random-source-randomize! random entropy)
     (lambda (length)
       (mutex-lock! mutex)
       (let ((rb (make-random-u8vector length)))
         (mutex-unlock! mutex)
         rb))))

 ;; Generates a fresh random session key.
 ;; Returns two values: the key pair, and the raw key bytes as a string
 ;; (the form that is sent to the peer).
 (define (generate-session-key)
   (let* ((key-u8vector (get-random-bytes symmetric-box-keybytes))
          (key (u8vector->blob/shared key-u8vector)))
     (values (blob->key key)
             (blob->string key))))

 ;; Nonces are the leading bytes of hash(counter ++ seed): the counter
 ;; increases on every encryption and the seed is random per process.
 (define *nonce-counter* 0)
 (define *nonce-seed*
   (blob->string
    (u8vector->blob/shared (get-random-bytes symmetric-box-noncebytes))))

 ;; Encrypts PLAINTEXT with KEY. The result is the nonce followed by the
 ;; boxed ciphertext, so the receiver can recover the nonce.
 (define (encrypt key plaintext)
   (set! *nonce-counter* (+ *nonce-counter* 1))
   (let* ((nonce-hash (hash (string-append (number->string *nonce-counter*)
                                           *nonce-seed*)))
          (nonce (substring/shared nonce-hash 0 symmetric-box-noncebytes))
          (nonce-u8vector (blob->u8vector/shared (string->blob nonce))))
     (string-append nonce ((car key) plaintext nonce-u8vector))))

 ;; Decrypts C (nonce followed by ciphertext) with KEY.
 ;; Returns #f if the key was wrong (per the unbox procedure), and also
 ;; #f if C is too short to even contain a nonce. The data comes off the
 ;; network, so a truncated packet must be rejected like any other bad
 ;; ciphertext rather than raising in the reading thread.
 (define (decrypt key c)
   (and (>= (string-length c) symmetric-box-noncebytes)
        (let* ((nonce (substring/shared c 0 symmetric-box-noncebytes))
               (c-wo-n (substring/shared c symmetric-box-noncebytes))
               (nonce-u8vector (blob->u8vector/shared (string->blob nonce))))
          ((cdr key) c-wo-n nonce-u8vector))))

 ;;; ------------------------------------------------------------------
 ;;; Connections
 ;;; ------------------------------------------------------------------

 ;; addr:            peer address, (unix PATH) or (tcp HOST PORT)
 ;; user:            username for encrypted connections, else #f
 ;; socket:          underlying socket
 ;; input/output:    ports over the socket
 ;; key:             session key pair, or #f for plaintext connections
 ;; mutex:           guards writes to OUTPUT, the waiters table and counter
 ;; thread:          thread running handle-connection-thread!
 ;; waiters:         hash table: request id -> (mutex . response)
 ;; request-handler: called as (handler con body) for incoming requests
 ;; close-handler:   called as (handler con) when the connection ends
 ;; counter:         source of outgoing request ids
 (define-record-type connection
   (make-connection* addr user socket input output key
                     mutex thread waiters
                     request-handler close-handler counter)
   connection?
   (addr connection-addr)
   (user connection-user)
   (socket connection-socket)
   (input connection-input)
   (output connection-output)
   (key connection-key)
   (mutex connection-mutex)
   (thread connection-thread)
   (waiters connection-waiters)
   (request-handler connection-request-handler)
   (close-handler connection-close-handler)
   (counter connection-counter (setter connection-counter)))

 ;; Closes OUTPUT then INPUT, ignoring any error from either close.
 (define (close-ports-quietly! input output)
   (handle-exceptions exc (void) (close-output-port output))
   (handle-exceptions exc (void) (close-input-port input)))

 ;; Joins PACKET-PARTS into one packet, encrypts it when the connection
 ;; has a session key, and writes it while holding the connection mutex.
 ;; The mutex is released even when the write raises, so one failed send
 ;; cannot wedge every other sender on this connection.
 (define (connection-send! con packet-parts)
   (let* ((plain (join-packet packet-parts))
          (packet (if (connection-key con)
                      (encrypt (connection-key con) plain)
                      plain))
          (mutex (connection-mutex con)))
     (debug "MUTEX: send")
     (mutex-lock! mutex)
     (handle-exceptions exn
         (begin
           (debug "MUTEX: !send (after error)")
           (mutex-unlock! mutex)
           (abort exn)) ; re-raise the original condition
       (write-packet! (connection-output con) packet))
     (debug "MUTEX: !send")
     (mutex-unlock! mutex)))

 ;; Renders a condition as "<message> in (<location> . <arguments>)".
 (define (describe-condition exn)
   (sprintf "~s in ~s"
            ((condition-property-accessor 'exn 'message "Unknown error") exn)
            (cons ((condition-property-accessor 'exn 'location (void)) exn)
                  ((condition-property-accessor 'exn 'arguments '()) exn))))

 ;; Reports a request-handler failure on the current error port,
 ;; including the call chain when EXN is a condition.
 (define (connection-log-error! con id body exn)
   ;; FIXME: Allow for configuring this to output to other places..
   (if (condition? exn)
       (begin
         (fprintf (current-error-port)
                  "Error in request handler (request id:~a ~s): ~a\n"
                  id body (describe-condition exn))
         (for-each
          (lambda (cc)
            (fprintf (current-error-port) "\t~a ~s ~s\n"
                     (vector-ref cc 0) (vector-ref cc 1) (vector-ref cc 2)))
          ((condition-property-accessor 'exn 'call-chain '()) exn)))
       (fprintf (current-error-port)
                "Error in request handler (request id:~a ~s): ~s\n"
                id body exn)))

 ;; Runs the connection's request handler for request ID in a new
 ;; thread and sends back either ("ok" id . result) or
 ;; ("err" id message) if the handler raised.
 (define (handle-request! con id body)
   (debug "Handling request id:~a ~s" id body)
   (let ((thread
          (make-thread
           (lambda ()
             (let ((response
                    (handle-exceptions exn
                        (begin
                          (connection-log-error! con id body exn)
                          (list "err" id
                                (if (condition? exn)
                                    (describe-condition exn)
                                    (->string exn))))
                      (append (list "ok" id)
                              ((connection-request-handler con) con body)))))
               (debug "Sending response id:~a ~s" id response)
               (connection-send! con response)))
           `(bokbok-request-thread ,(connection-addr con) ,id))))
     (thread-start! thread)))

 ;; Delivers a response to the request! call waiting on ID.
 ;; The waiter is removed from the table as it is claimed, so the table
 ;; does not grow with every request and a duplicate response for the
 ;; same id is discarded instead of clobbering the first result.
 (define (handle-response! con id body)
   (debug "Handling response id:~a ~s" id body)
   (debug "MUTEX: handle-response")
   (mutex-lock! (connection-mutex con))
   (let ((waiter (hash-table-ref/default (connection-waiters con) id #f)))
     (when waiter
       (hash-table-delete! (connection-waiters con) id))
     (debug "MUTEX: !handle-response")
     (mutex-unlock! (connection-mutex con))
     (if waiter
         (begin
           (set-cdr! waiter body)
           ;; Wakes the requester blocked in request!
           (mutex-unlock! (car waiter)))
         (debug "Discarding response to unknown request id:~a" id))))

 ;; Body of the per-connection reader thread. The connection is found
 ;; in the thread's thread-specific slot. Reads packets until EOF or a
 ;; decryption failure, dispatching requests and responses, then calls
 ;; the close handler and closes the connection.
 (define (handle-connection-thread!)
   (let* ((con (thread-specific (current-thread)))
          (session-key (connection-key con)))
     (debug "Session thread starting")
     (let loop ()
       (debug "Session thread waiting for packet")
       ;; We are the only thing that reads from connection-input, so need no mutex!
       (let ((raw-request (read-packet! (connection-input con))))
         (if (eof-object? raw-request)
             ;; Terminate loop
             ((connection-close-handler con) con)
             ;; Handle request and loop
             (let ((request-bytes (if session-key
                                      (decrypt session-key raw-request)
                                      raw-request)))
               (if (not request-bytes)
                   ;; Close connection if decryption failed
                   ((connection-close-handler con) con)
                   (let ((request (split-packet request-bytes)))
                     (match request
                       (("req" id . body)
                        (handle-request! con id body))
                       (("ok" id . body)
                        (handle-response! con id (cons 'ok body)))
                       (("err" id . body)
                        (handle-response! con id (cons 'error body)))
                       ;; An unrecognised packet must not kill the
                       ;; reader thread with a match failure.
                       (else
                        (debug "Discarding unrecognised packet ~s" request)))
                     ;; Loop for next request
                     (loop)))))))
     ;; Connection loop has terminated, close connections - also kills
     ;; this thread, so do this last!
     (close-connection! con)))

 ;; Creates a connection record and starts its reader thread.
 (define (make-connection addr user socket input output key
                          request-handler close-handler)
   (let ((con (make-connection*
               addr user socket input output key
               (make-mutex `(bokbok-connection-mutex ,addr))
               (make-thread handle-connection-thread!
                            `(bokbok-connection-thread ,addr))
               (make-hash-table)
               request-handler close-handler
               0)))
     (thread-specific-set! (connection-thread con) con)
     (thread-start! (connection-thread con))
     con))

 ;; Parses "PATH" into (unix PATH) and "HOST:PORT" into (tcp HOST PORT).
 ;; Signals an error for any other shape or a non-numeric port.
 (define (parse-string-address addr)
   (match (string-split addr ":")
     ((path)
      `(unix ,path))
     ((host port)
      (let ((port-number (string->number port)))
        (unless port-number
          (error "Invalid port in address string" addr))
        `(tcp ,host ,port-number)))
     (else
      (error "Invalid address string" addr))))

 ;; Connects socket S to SOCKADDR and returns S; if connecting fails the
 ;; socket is closed before the error is re-raised.
 (define (connect-or-close! s sockaddr)
   (handle-exceptions exn
       (begin
         (handle-exceptions exc (void) (socket-close s))
         (abort exn))
     (socket-connect s sockaddr))
   s)

 ;; Opens a connected stream socket for ADDRESS, which is
 ;; (unix PATH) or (tcp HOST PORT). TCP uses the first IPv4 address
 ;; found for HOST and enables TCP_NODELAY.
 (define (connect-socket address)
   (match address
     (('unix path)
      (connect-or-close! (socket af/unix sock/stream) (unix-address path)))
     (('tcp host port)
      (let ((ai (filter (lambda (ai) (eq? (addrinfo-family ai) af/inet))
                        (address-information host port))))
        (when (null? ai)
          (error "No IPv4 address found for host" host port))
        (let ((s (connect-or-close! (socket af/inet sock/stream)
                                    (addrinfo-address (car ai)))))
          (set! (tcp-no-delay? s) #t)
          s)))
     (else
      (error "Unknown address" address))))

 ;; Opens a connection to ADDR (an address list, or a string accepted
 ;; by parse-string-address).
 ;;
 ;; When both USERNAME and KEY are given the connection is encrypted:
 ;; the username and a new session key (encrypted with KEY) are sent
 ;; first. Otherwise the connection is plaintext and has no user.
 ;; In both cases the server must then answer with *header*.
 ;;
 ;; REQUEST-HANDLER is called as (handler con body) for requests made
 ;; by the peer; CLOSE-HANDLER as (handler con) when the connection ends.
 ;;
 ;; Returns the connection. Signals "Invalid hello from server" if the
 ;; greeting is wrong; on any handshake failure the ports are closed
 ;; before the error propagates.
 (define (open-connection addr username key request-handler close-handler)
   (let* ((address (if (string? addr) (parse-string-address addr) addr))
          (s (connect-socket address)))
     (receive (input output)
         (parameterize ((socket-send-buffer-size 4096)
                        (socket-send-size 16384))
           (socket-i/o-ports s))
       (handle-exceptions exn
           (begin
             (close-ports-quietly! input output)
             (abort exn))
         (let ((session-key
                (and username key
                     (receive (session-key session-key-bytes)
                         (generate-session-key)
                       (debug "Client writing username and session key")
                       (write-packet-no-flush! output username)
                       (write-packet! output (encrypt key session-key-bytes))
                       session-key))))
           (debug "Client waiting for header")
           (let ((header (read-string (string-length *header*) input)))
             (debug "Got header bytes: ~s" header)
             (if (and (string? header) (string=? header *header*))
                 (make-connection address
                                  (and session-key username)
                                  s input output
                                  session-key
                                  request-handler close-handler)
                 (error "Invalid hello from server" header))))))))

 ;; Closes both ports of the connection (under the connection mutex, so
 ;; no send is in progress) and terminates its reader thread. When
 ;; called from the reader thread itself this does not return.
 (define (close-connection! con)
   ;; FIXME: Set a flag in the connection so we quietly discard any
   ;; further responses from still-running handlers.
   ;; FIXME: Return an error to all pending waiters.
   (mutex-lock! (connection-mutex con))
   (close-ports-quietly! (connection-input con) (connection-output con))
   (mutex-unlock! (connection-mutex con))
   (thread-terminate! (connection-thread con)))

 ;; Sends PACKET-PARTS (a list of strings) as a request over CON and
 ;; blocks until the matching response arrives.
 ;; Returns the response body on success. If the peer answered with an
 ;; error, signals a condition of kind bokbok-remote whose message
 ;; property holds the peer's error string (see remote-error? and
 ;; remote-error-message).
 (define (request! con packet-parts)
   (debug "MUTEX: request!")
   (mutex-lock! (connection-mutex con))
   (let* ((id (number->string (connection-counter con)))
          (waiter (cons
                   (make-mutex
                    `(bokbok-request-mutex ,(connection-addr con) ,id))
                   #f)))
     ;; Mutex starts life locked by the connection thread
     (mutex-lock! (car waiter) #f (connection-thread con))
     (hash-table-set! (connection-waiters con) id waiter)
     (set! (connection-counter con) (+ (connection-counter con) 1))
     (debug "MUTEX: !request!")
     (mutex-unlock! (connection-mutex con))
     ;; If the request cannot be sent nobody will ever answer it, so
     ;; forget the waiter before passing the error on.
     (handle-exceptions exn
         (begin
           (mutex-lock! (connection-mutex con))
           (hash-table-delete! (connection-waiters con) id)
           (mutex-unlock! (connection-mutex con))
           (abort exn))
       (connection-send! con (cons "req" (cons id packet-parts))))
     ;; Wait for response, when connection thread unlocks the mutex
     (mutex-lock! (car waiter))
     ;; Return response
     (match (cdr waiter)
       (('ok . body)
        body)
       (('error error-string)
        (signal (make-property-condition 'bokbok-remote
                                         'message error-string)))
       (else
        (error "Invalid response" (cdr waiter))))))

 ;; Predicate and accessor for the condition request! signals when the
 ;; peer reports an error.
 (define remote-error? (condition-predicate 'bokbok-remote))
 (define remote-error-message
   (condition-property-accessor 'bokbok-remote 'message))

 ;;; ------------------------------------------------------------------
 ;;; Server
 ;;; ------------------------------------------------------------------

 ;; Creates, binds and starts listening on a socket for BIND-ADDR,
 ;; which is (unix PATH) or (tcp ADDR PORT).
 (define (make-listening-socket bind-addr backlog)
   (match bind-addr
     (('unix path) ;; UNIX domain
      (let ((s (socket af/unix sock/stream)))
        (socket-bind s (unix-address path))
        (socket-listen s backlog)
        s))
     (('tcp addr port) ;; TCP domain
      (let ((s (socket af/inet sock/stream)))
        ;; Must be set before binding to affect the bind.
        (set! (so-reuse-address? s) #t)
        (socket-bind s (inet-address addr port))
        (socket-listen s backlog)
        s))
     (else
      (error "Unknown bind address" bind-addr))))

 ;; Performs the server side of the handshake on accepted socket CS.
 ;; With USER->KEY, reads the username and encrypted session key,
 ;; looks up the user's key and decrypts the session key; the
 ;; connection is rejected (both ports closed) on EOF, unknown user,
 ;; failed decryption or a malformed session key. Without USER->KEY the
 ;; connection is plaintext. On success writes *header*, creates the
 ;; connection and passes it to OPEN-HANDLER.
 (define (server-handshake! cs tcp? user->key
                            open-handler request-handler close-handler)
   (when tcp?
     (set! (tcp-no-delay? cs) #t))
   (receive (input output)
       (parameterize ((socket-send-buffer-size 4096)
                      (socket-send-size 16384))
         (socket-i/o-ports cs))
     (let* ((peer (socket-peer-name cs))
            (peer-addr (if tcp?
                           (list 'tcp
                                 (sockaddr-address peer)
                                 (sockaddr-port peer))
                           (list 'unix (sockaddr-path peer)))))
       (define (accept! user-name session-key)
         (debug "Writing header")
         (write-string *header* #f output)
         (flush-output output)
         (open-handler
          (make-connection peer-addr user-name cs input output
                           session-key request-handler close-handler)))
       (define (reject! reason)
         (debug "Rejecting connection due to ~a" reason)
         (close-output-port output)
         (close-input-port input))
       (debug "Handshake thread started for ~s" peer-addr)
       (if (not user->key)
           ;; Plaintext connection
           (accept! #f #f)
           ;; Encrypted connection
           (let* ((user-name (read-packet! input))
                  (encrypted-session-key (read-packet! input)))
             (cond
              ((or (eof-object? user-name)
                   (eof-object? encrypted-session-key))
               (reject! "EOF in session setup"))
              ((user->key user-name)
               => (lambda (user-key)
                    (let ((session-key-bytes
                           (decrypt user-key encrypted-session-key)))
                      (cond
                       ((not session-key-bytes)
                        (reject! "invalid user key"))
                       ((bytes->key session-key-bytes)
                        => (lambda (session-key)
                             (accept! user-name session-key)))
                       (else
                        (reject! "invalid session key"))))))
              (else
               (reject! "unknown user"))))))))

 ;; Starts a server listening on BIND-ADDR, (unix PATH) or
 ;; (tcp ADDR PORT), with listen backlog BACKLOG.
 ;;
 ;; USER->KEY is #f for plaintext connections, or a procedure mapping a
 ;; username to that user's key (or #f for unknown users).
 ;; OPEN-HANDLER is called with each new connection; REQUEST-HANDLER
 ;; and CLOSE-HANDLER are installed on every connection.
 ;;
 ;; Each accepted socket gets its own handshake thread. Returns the
 ;; listener thread, which is the server handle taken by stop-server!
 ;; and wait-until-server-stopped; the listening socket is kept in its
 ;; thread-specific slot.
 (define (start-server bind-addr backlog user->key
                       open-handler request-handler close-handler)
   (let* ((s (make-listening-socket bind-addr backlog))
          (tcp? (match bind-addr
                  (('tcp . any) #t)
                  (else #f)))
          (thread
           (make-thread
            (lambda ()
              (let loop ()
                (debug "Listener thread calling accept")
                (let* ((cs (socket-accept s))
                       (handler-thread
                        (make-thread
                         (lambda ()
                           ;; Connection setup handler thread
                           ;; Hands over to connection thread when
                           ;; make-connection is called
                           (server-handshake! cs tcp? user->key
                                              open-handler
                                              request-handler
                                              close-handler)
                           cs)
                         `(bokbok-handshake-handler ,bind-addr))))
                  (debug "Listener thread starting handshake thread")
                  (thread-start! handler-thread)
                  (loop))))
            `(bokbok-listen-handler ,bind-addr))))
     (thread-specific-set! thread s)
     (thread-start! thread)
     thread))

 ;; Terminates the listener thread. The listening socket itself is
 ;; closed by wait-until-server-stopped.
 (define (stop-server! server)
   (thread-terminate! server))

 ;; Blocks until the listener thread has ended, then closes the
 ;; listening socket. Termination via stop-server! is expected and
 ;; ignored; any other exception from the listener is re-raised.
 (define (wait-until-server-stopped server)
   (let ((s (thread-specific server)))
     (handle-exceptions exc
         (if (and (uncaught-exception? exc)
                  (terminated-thread-exception?
                   (uncaught-exception-reason exc)))
             (void)
             (abort exc)) ; unexpected exceptions are rethrown
       (debug "Waiting for listener thread death...")
       (thread-join! server)
       (void))
     (socket-close s))))