;;; Test suite for the `channel` module, written against the CHICKEN
;;; `test` egg.
;;;
;;; Convention used throughout: `(test expected expr)` -- the expected
;;; value comes first and the expression under test second, so that
;;; failure reports label the two values correctly.

(load-relative "../channel")
(load-relative "../channel.import")
(import channel)
(use srfi-18 test data-structures miscmacros)

(test-begin)

;; Run a major garbage collection five times in a row.
(define (gc!!)
  (repeat 5 (gc #t)))

;; Build a recording receiver backed by a fresh queue:
;;   (r x) appends x to the queue,
;;   (r)   returns everything recorded so far as a list.
(define (make-receiver)
  (let ((q (make-queue)))
    (case-lambda
      (() (queue->list q))
      ((x) (queue-add! q x)))))

(test-group "channel-receive"
  (define c (make-channel))
  (channel-enqueue c 1)
  (define results (make-receiver))
  (test-assert (channel-receive c results))
  (test '(1) (results))
  (test-assert (not (channel-receive c results)))
  (channel-enqueue c 2)
  (test '(1 2) (results))
  (define results (make-receiver))
  (channel-enqueue c 1 2)
  (channel-receive c results results)
  (channel-receive c results results)
  (test '(1 1 2 2) (results)))

(test-group "channel receivers"
  (define c (make-channel 1 2 3))
  (define results-1 (make-receiver))
  (define results-2 (make-receiver))
  (define results-3 (make-receiver))
  (on-channel-receive c results-1)
  ;; using the same receiver multiple times will register it only once
  (channel-receive c results-2 results-2)
  (channel-enqueue c 4)
  (on-channel-receive c results-3)
  (channel-enqueue c 5)
  (test '(1 2 3 4 5) (results-1))
  (test '(4 4) (results-2))
  (test '(5) (results-3))
  (channel-remove-receiver c results-1)
  (channel-enqueue c 6)
  (test '(1 2 3 4 5) (results-1))
  (test '(5 6) (results-3)))

(test-group "parallel channel receivers"
  (define results-receiver-t (make-queue))
  (define results-receiver-p (make-queue))
  (define results-receive-1 (make-queue))
  (define results-receive-2 (make-queue))

  ;; Print the messages currently held by channel `c`.
  (define (print-channel-status c)
    (display "> ")
    (pp (channel-messages c)))

  ;; Enqueue with tracing: log the messages, enqueue them, dump the channel.
  (define (enqueue c . x)
    (print ">> enqueuing " x)
    (apply channel-enqueue c x)
    (print-channel-status c))

  ;; Build a receiver that logs the thread that created it and the thread
  ;; it runs on, then records the message in `results-queue`.
  (define (test-receiver name results-queue)
    (let ((source-thread (thread-name (current-thread))))
      (print "<< " source-thread " " name)
      (lambda (x)
        (print "< " source-thread "\t" (thread-name (current-thread)) "\t" name "\t" x)
        (queue-add! results-queue x))))

  (define channel (make-channel 'foo))
  (print-channel-status channel)

  (define m (make-mutex))
  (define cv (make-condition-variable))

  ;; Run `body ...` while holding `m`, signal `cv`, then unlock `m` and
  ;; wait on `cv` (SRFI-18 `mutex-unlock!` with a condition variable).
  ;; The primordial thread and the spawned thread use this to take turns.
  (define-syntax sync
    (syntax-rules ()
      ((_ body ...)
       (begin
         (mutex-lock! m)
         body ...
         (condition-variable-signal! cv)
         (mutex-unlock! m cv)))))

  (sync
   (define thread
     (thread-start!
      (lambda ()
        (sync (channel-receive channel (test-receiver 'receive-1 results-receive-1)))
        (sync (on-channel-receive channel (test-receiver 'receiver-t results-receiver-t)))
        (sync (channel-receive channel
                               (test-receiver 'receive-2 results-receive-2)
                               (test-receiver 'receive-2 results-receive-2)))
        (enqueue channel 'frob)))))

  (sync (enqueue channel 'bar))
  (sync (on-channel-receive channel (test-receiver 'receiver-p results-receiver-p)))
  (enqueue channel 'baz 'qux 'quux)
  (condition-variable-signal! cv)
  (thread-join! thread)

  (test '(foo) (queue->list results-receive-1))
  (test '(bar baz qux quux frob) (queue->list results-receiver-t))
  (test '(baz baz) (queue->list results-receive-2))
  (test '(baz qux quux frob) (queue->list results-receiver-p)))

(test-group "closing, emptying and draining channels"
  (define c (make-channel))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-drained? c)))
  (test-assert (channel-empty? c))
  (test-assert (channel-enqueue c 1))
  (test-assert (not (channel-empty? c)))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-drained? c)))
  (channel-receive c void)
  (test-assert (channel-empty? c))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-drained? c)))
  (test-assert (channel-enqueue c 1))
  (close-channel c)
  (test-assert (channel-closed? c))
  (test-assert (not (channel-enqueue c 1)))
  (test-assert (not (channel-empty? c)))
  (channel-receive c void)
  (test-assert (channel-closed? c))
  (test-assert (channel-empty? c))
  (test-assert (channel-drained? c)))

(test-group "synchronous channel-receive"
  (define c (make-channel 1))
  (test 1 (channel-receive c))
  (define consumer (thread-start! (lambda () (channel-receive c))))
  (define producer (thread-start! (lambda () (channel-enqueue c 'foo))))
  (test 'foo (thread-join! consumer))
  (test-group "with timeout"
    (define c (make-channel 'foo))
    (test 'foo (channel-receive c 3))
    (test-assert (not (channel-receive c 0.2)))
    (test 'none (channel-receive c 0.2 (lambda () 'none)))))

(test-group "fork-channel"
  (define r1 (make-receiver))
  (define c1 (make-channel 1 2))
  (define r2 (make-receiver))
  (define c2 (fork-channel c1))
  (on-channel-receive c1 r1)
  (on-channel-receive c2 r2)
  (test '(1 2) (r1))
  (test '(1 2) (r2))
  (channel-enqueue c1 3)
  (channel-enqueue c2 4)
  (test '(1 2 3) (r1))
  (test '(1 2 3 4) (r2))
  (test-assert (not (channel-closed? c2)))
  (close-channel c1)
  (test-assert (channel-closed? c2))
  (test-group "gc"
    (define c (make-channel))
    (define f (fork-channel c))
    (test 1 (length (channel-forks c)))
    (set! f #f)
    (gc!!)
    ;; The fork count is still 1 after collection and only drops to 0
    ;; after the next enqueue on the parent.
    (test 1 (length (channel-forks c)))
    (channel-enqueue c 'x)
    (test 0 (length (channel-forks c)))))

(test-group "on-channel-close, on-channel-drain"
  (define close (make-receiver))
  (define drain (make-receiver))
  (define c (make-channel 1))
  (on-channel-close c (lambda () (close 'close)))
  (on-channel-drain c (lambda () (drain 'drain)))
  (test '() (close))
  (test '() (drain))
  (close-channel c)
  (test '(close) (close))
  (test '() (drain))
  (on-channel-receive c void)
  (test '(close) (close))
  (test '(drain) (drain))
  ;; Handlers registered after the channel is closed and drained are
  ;; expected to have fired by the time of the assertions below.
  (on-channel-close c (lambda () (close 'close)))
  (on-channel-drain c (lambda () (drain 'drain)))
  (test '(close close) (close))
  (test '(drain drain) (drain)))

(test-group "errors"
  ;; Receivers that raise must not prevent the other receivers from
  ;; getting their messages.
  (define c (make-channel 1))
  (define r1 (make-receiver))
  (define r2 (make-receiver))
  (on-channel-receive c (lambda (x) (error "boom")))
  (channel-receive c r1)
  (on-channel-receive c r1)
  (channel-enqueue c 2)
  (channel-receive c (lambda (x) (error "kaboom")))
  (channel-receive c r2)
  (channel-enqueue c 3)
  (test '(1 2 3) (r1))
  (test '(3) (r2)))

(test-group "on-channel-error"
  (define channel (make-channel))
  (define err #f)
  (define msg #f)
  (on-channel-receive channel (lambda _ (error 'in-the-error-handler "explosion")))
  (on-channel-error channel (lambda (e m) (set! err e) (set! msg m)))
  (channel-enqueue channel 'foo)
  (test-assert (condition? err))
  (test 'in-the-error-handler (get-condition-property err 'exn 'location))
  (test 'foo msg))

(test-group "siphon-channel"
  (define c (make-channel 1 2 3))
  (define sc (make-channel))
  (siphon-channel c sc (lambda (sc m) (channel-enqueue sc (+ 1 m))))
  (test '(2 3 4) (channel-messages sc))
  (test '() (channel-messages c))
  (channel-enqueue c 4)
  (test '(2 3 4 5) (channel-messages sc))
  (test '() (channel-messages c))
  (test-assert (not (channel-closed? c)))
  (test-assert (not (channel-closed? sc)))
  (test-group "gc"
    (define c (make-channel 1 2))
    (define result (make-receiver))
    (define rc (siphon-channel c (make-channel) (lambda (_ m) (result m))))
    (test '(1 2) (result))
    (set! rc #f)
    (gc!!)
    ;; After the siphon target is dropped and collected, new messages
    ;; must no longer reach the siphon procedure.
    (channel-enqueue c 3)
    (test '(1 2) (result)))
  (test-group "close propagation"
    (define c1 (make-channel))
    (define c1.1 (siphon-channel c1))
    (define c1.2 (siphon-channel c1))
    (define c1.2.1 (siphon-channel c1.2))
    (close-channel c1.2.1)
    (test-assert (channel-closed? c1.2.1))
    (test-assert (channel-closed? c1.2))
    (test-assert (not (channel-closed? c1.1)))
    (test-assert (not (channel-closed? c1)))
    (close-channel c1.1)
    (test-assert (channel-closed? c1.1))
    (test-assert (channel-closed? c1))))

(test-group "fold-channel"
  (define c (make-channel 1 2))
  (define cf (fold-channel c cons '()))
  (test '((1) (2 1)) (channel-messages cf))
  (channel-enqueue c 3)
  (test '((1) (2 1) (3 2 1)) (channel-messages cf))
  (test '() (channel-messages c)))

(test-group "map-channel"
  (define c (make-channel 1 2))
  (define m (map-channel c add1))
  (test '(2 3) (channel-messages m))
  (channel-enqueue c 3)
  (test '(2 3 4) (channel-messages m))
  (test '() (channel-messages c)))

(test-group "filter-channel"
  (define c (make-channel 1 2 3 4))
  (define m (filter-channel c even?))
  (test '(2 4) (channel-messages m))
  (channel-enqueue c 5 6)
  (test '(2 4 6) (channel-messages m))
  (test '() (channel-messages c)))

(test-group "channel-receive/delay"
  (define c (make-channel))
  (define p1 (channel-receive/delay c))
  (define p2 (channel-receive/delay c))
  (test-assert (promise? p1))
  (test-assert (promise? p2))
  (channel-enqueue c 'foo)
  (test 'foo (force p1))
  (test 'foo (force p2))
  (define p3 (channel-receive/delay c 0.5))
  (test-assert (not (force p3))))

(test-group "siphon-input-port"
  (define port (open-input-string "this is a test"))
  (define-values (next channel) (siphon-input-port port read))
  ;; Keep calling `next` until it returns #f.
  (let loop ()
    (when (next)
      (loop)))
  (test '(this is a test) (channel-messages channel)))

(test-group "flush-channel-to-output-port"
  (define port (open-output-string))
  (define channel (make-channel 'foo 'bar))
  (flush-channel-to-output-port channel port write)
  (test "foobar" (get-output-string port))
  (channel-enqueue channel 'baz)
  (test "foobarbaz" (get-output-string port))
  ;; what's sensible in this case? detaching the receiver?
  ;; (test-group "closing the port"
  ;;   (close-output-port port)
  ;;   (channel-enqueue channel 'boom))
  )

(test-end)
(test-exit)