;; based on https://github.com/ztellman/lamina/
;;
;; Message channels: a channel is a queue of messages plus the callbacks
;; (persistent receivers, one-time receivers, close/drain/error handlers)
;; that consume them.  Channels can be forked, siphoned into one another
;; and mapped/filtered/folded into derived channels.

(module channel

(make-channel

 ;; derivators
 fork-channel
 fold-channel
 map-channel
 filter-channel
 siphon-channel
 siphon-input-port
 flush-channel-to-output-port

 ;; operations
 channel-enqueue
 channel-receive
 channel-receive/delay
 channel-remove-receiver
 close-channel

 ;; callbacks
 on-channel-receive
 on-channel-close
 on-channel-drain
 on-channel-error

 ;; accessors
 channel-messages
 channel-forks

 ;; predicates
 channel-empty?
 channel-closed?
 channel-drained?)

(import chicken scheme)
(use data-structures srfi-18 srfi-1
     (only srfi-1 filter)
     (only miscmacros push!)
     (only lolevel make-weak-locative locative->object))

;; Provides the set operations used below (make-set, set-empty?,
;; set-insert!, set-remove!, set-fold, set-clear!).
(include "set.scm")

;; Channel record.  The fields declared with `setter` are assigned below
;; through (set! (channel-FIELD c) value).
;;
;;   closed?            #t once close-channel has been called
;;   receivers          list of persistent receiver procedures
;;   forks              list of weak locatives pointing at forked channels
;;   onetime-receivers  set of receivers that are dropped after one run
;;   queue              pending messages
;;   on-close-handlers  set of thunks run by close-channel
;;   on-drain-handlers  set of thunks run when the channel is closed and empty
;;   on-error-handlers  set of procedures called with (exn message) when a
;;                      receiver raises
;;   mutex, cvar        lock guarding the channel and the condition variable
;;                      that blocking receivers wait on
(define-record channel
  (setter closed?)
  (setter receivers)
  (setter forks)
  onetime-receivers
  queue
  on-close-handlers
  on-drain-handlers
  on-error-handlers
  mutex
  cvar)

;; Prints a channel as its list of pending messages.
;; NOTE(review): the "#<channel " prefix was reconstructed from a garbled
;; source line -- confirm the exact text against upstream.
(define-record-printer (channel c out)
  (display "#<channel " out)
  ;; Write to the port handed to the printer, not the current output port.
  (display (queue->list (channel-queue c)) out)
  (display ">" out))

;; semaphore implementation stolen from the 9p egg
;;
;; The mutex's "specific" slot is used as a re-entrancy counter so that the
;; thread already owning the mutex can lock it again without blocking.

;; #t when the current thread is the owner of MUTEX.
(define (have-lock? mutex)
  (eq? (mutex-state mutex) (current-thread)))

;; Locks MUTEX, or bumps the nesting counter when the current thread
;; already owns it.
(define (semaphore-lock! mutex)
  (if (have-lock? mutex)
      (mutex-specific-set! mutex (add1 (mutex-specific mutex)))
      (begin
        (mutex-lock! mutex)
        (mutex-specific-set! mutex 0))))

;; Undoes one semaphore-lock!.  The mutex is really unlocked only when the
;; nesting counter is back at 0.  Signals an error when the current thread
;; is not the owner.
(define (semaphore-unlock! mutex)
  (cond ((not (have-lock? mutex))
         (error "The current thread does not own the mutex!"))
        ((> (mutex-specific mutex) 0)
         (mutex-specific-set! mutex (sub1 (mutex-specific mutex))))
        (else
         (mutex-unlock! mutex))))

;; Runs THUNK with MUTEX held (re-entrantly); the lock is released again
;; even when THUNK exits non-locally.
(define (with-semaphore mutex thunk)
  (dynamic-wind
      (lambda () (semaphore-lock! mutex))
      thunk
      (lambda () (semaphore-unlock! mutex))))

;; Runs THUNK while holding CHANNEL's mutex.
(define (with-locked-channel channel thunk)
  (with-semaphore (channel-mutex channel) thunk))

;; Keep the raw record constructor reachable; `make-channel' is redefined
;; below with a friendlier signature.
(define %make-channel make-channel)

;; Appends every element of ITEMS to QUEUE, in order.
(define (queue-add-list! queue items)
  (for-each (lambda (item) (queue-add! queue item))
            items))

;; Applies PROC to ARGS.  Returns #t when PROC returned normally.  When
;; PROC raises, ON-ERROR is applied to the exception followed by ARGS and
;; #f is returned.
(define (safe-apply on-error proc args)
  (handle-exceptions exn
    (begin (apply on-error exn args) #f)
    (begin (apply proc args) #t)))

;; Applies every procedure in the list PROCS to ARGS via safe-apply.
;; Exceptions go to the callbacks in the set ON-ERROR-HANDLERS, or are
;; printed with print-exn when that set is empty.  Returns #t when at
;; least one procedure completed without raising, #f otherwise (including
;; when PROCS is empty).
(define (safe-apply-some on-error-handlers procs . args)
  (let ((on-error (if (set-empty? on-error-handlers)
                      print-exn
                      (lambda args
                        (run-callbacks on-error-handlers args)))))
    (fold (lambda (proc handled?)
            (or (safe-apply on-error proc args) handled?))
          #f
          procs)))

;; Default error handler: prints EXN to the current error port and ignores
;; any further arguments.
(define (print-exn exn . _)
  (print-error-message exn (current-error-port)))

;; Applies every callback in the set CALLBACKS to the list ARGS; exceptions
;; are printed.  Returns #t when at least one callback completed without
;; raising.
(define (run-callbacks callbacks args)
  (set-fold callbacks
            (lambda (callback handled?)
              (or (safe-apply print-exn callback args) handled?))
            #f))

;; Like run-callbacks (with ARGS given as rest arguments), but empties the
;; set CALLBACKS afterwards so each callback runs at most once.
(define (run-callbacks! callbacks . args)
  (define handled? (run-callbacks callbacks args))
  (set-clear! callbacks)
  handled?)

;; Creates an open channel whose queue initially holds MESSAGES.
(define (make-channel . messages)
  (%make-channel #f                        ; closed?
                 '()                       ; receivers
                 '()                       ; forks
                 (make-set eq?)            ; onetime-receivers
                 (list->queue messages)    ; queue
                 (make-set eq?)            ; on-close-handlers
                 (make-set eq?)            ; on-drain-handlers
                 (make-set eq?)            ; on-error-handlers
                 (make-mutex)
                 (make-condition-variable)))

;; Returns the pending messages of CHANNEL as a list.
(define (channel-messages channel)
  (queue->list (channel-queue channel)))

;; #t when CHANNEL has at least one persistent or one-time receiver.
(define (channel-has-receivers? channel)
  (not (and (null? (channel-receivers channel))
            (set-empty? (channel-onetime-receivers channel)))))

;; Delivers queued messages to the receivers of CHANNEL for as long as
;; there are both messages and receivers.  For each message the one-time
;; receivers run first (and are then discarded, waking threads blocked in
;; channel-receive), followed by the persistent receivers.  A message is
;; removed from the queue only when at least one receiver handled it
;; without raising; otherwise delivery stops.  Finally, when the channel
;; is closed and empty, the drain handlers are run and cleared.
(define (flush-channel channel)
  (let ((queue (channel-queue channel))
        (receivers (channel-receivers channel))
        (onetime-receivers (channel-onetime-receivers channel)))
    (with-locked-channel
     channel
     (lambda ()
       (let loop ()
         (when (and (not (queue-empty? queue))
                    (channel-has-receivers? channel))
           (let ((message (queue-first queue))
                 (handled? #f))
             (unless (set-empty? onetime-receivers)
               (when (run-callbacks! onetime-receivers message)
                 (set! handled? #t)
                 (condition-variable-broadcast! (channel-cvar channel))))
             ;; safe-apply-some comes first in the `or' so the persistent
             ;; receivers always see the message, even when a one-time
             ;; receiver already handled it.
             (when (or (safe-apply-some (channel-on-error-handlers channel)
                                        receivers
                                        message)
                       handled?)
               (queue-remove! queue)
               (loop)))))
       (when (channel-drained? channel)
         (run-callbacks! (channel-on-drain-handlers channel)))))))

;; Appends MESSAGE and MESSAGES to CHANNEL and to all of its live forks,
;; then flushes CHANNEL.  Returns #t, or #f without doing anything when
;; CHANNEL is closed.
(define (channel-enqueue channel message . messages)
  (and (not (channel-closed? channel))
       (let ((queue (channel-queue channel))
             (messages (cons message messages)))
         (with-locked-channel
          channel
          (lambda ()
            (queue-add-list! queue messages)
            ;; Forward to the forks and prune the fork list in one pass:
            ;; an entry is kept only when its locative still yields a
            ;; channel and the enqueue on that fork returned true (i.e.
            ;; the fork is not closed).
            (set! (channel-forks channel)
                  (filter (lambda (fork)
                            (and-let* ((fork (locative->object fork)))
                              (unless (null? messages)
                                (apply channel-enqueue fork messages))))
                          (channel-forks channel)))
            (flush-channel channel)))
         #t)))

;; Registers RECEIVER as a persistent receiver of CHANNEL and immediately
;; flushes any pending messages.
(define (on-channel-receive channel receiver)
  (with-locked-channel
   channel
   (lambda ()
     (push! receiver (channel-receivers channel))
     (flush-channel channel))))

;; Removes the persistent receiver RECEIVER (compared with eq?) from
;; CHANNEL.
(define (channel-remove-receiver channel receiver)
  (with-locked-channel
   channel
   (lambda ()
     (set! (channel-receivers channel)
           (remove (lambda (r) (eq? r receiver))
                   (channel-receivers channel))))))

;; Receives one message from CHANNEL.  The mode depends on ARGS:
;;
;;   (channel-receive channel)
;;   (channel-receive channel TIMEOUT [DEFAULT-THUNK])
;;       Synchronous.  Returns the first queued message, or blocks until
;;       one is delivered.  TIMEOUT (a number) is handed to mutex-unlock!;
;;       when the wait times out the result is (DEFAULT-THUNK) if a
;;       default was given and #f otherwise.  A first argument that is
;;       neither a number nor a procedure also selects this mode, without
;;       timeout.
;;
;;   (channel-receive channel RECEIVER ...)
;;       Asynchronous.  When a message is queued, the receivers are
;;       applied to it right away, the message is removed if at least one
;;       of them did not raise, and #t is returned.  Otherwise each
;;       receiver is registered as a one-time receiver and #f is returned.
;;
;; Signals an error when the current thread already holds the channel's
;; mutex.
(define (channel-receive channel . args)
  (let* ((mutex (channel-mutex channel))
         (queue (channel-queue channel))
         (otrec (channel-onetime-receivers channel))
         (cvar (channel-cvar channel))
         (timeout (and (pair? args)
                       (let ((t (car args)))
                         (and (number? t) t))))
         (default (and timeout
                       (pair? (cdr args))
                       (cadr args)))
         (sync? (or timeout
                    (null? args)
                    (not (procedure? (car args))))))
    (when (have-lock? mutex)
      (error "can't receive from already locked channel"))
    (mutex-lock! mutex)
    (if sync?
        (if (queue-empty? queue)
            (let ((message #f))
              (let ((receiver (lambda (x) (set! message x))))
                (set-insert! otrec receiver)
                ;; Atomically release the mutex and wait on the condition
                ;; variable; a false result means the wait timed out.
                (if (mutex-unlock! mutex cvar timeout)
                    message
                    (with-locked-channel
                     channel
                     (lambda ()
                       ;; A message may have arrived between the timeout
                       ;; and re-acquiring the lock.
                       (or message
                           (begin
                             (set-remove! otrec receiver)
                             (and default (default)))))))))
            (let ((message (queue-remove! queue)))
              (mutex-unlock! mutex)
              message))
        (if (queue-empty? queue)
            (begin
              (for-each (lambda (receiver)
                          (set-insert! otrec (lambda (m) (receiver m))))
                        args)
              (mutex-unlock! mutex)
              #f)
            (let ((message (queue-first queue)))
              (when (safe-apply-some (channel-on-error-handlers channel)
                                     args
                                     message)
                (queue-remove! queue))
              (mutex-unlock! mutex)
              #t)))))

;; Starts a synchronous channel-receive in a new thread and returns a
;; promise; forcing it joins that thread and yields the received value.
(define (channel-receive/delay channel #!optional timeout default)
  (let ((result (thread-start!
                 (lambda ()
                   (channel-receive channel timeout default)))))
    ;; Give the receiving thread a chance to run before returning.
    (thread-yield!)
    (delay (thread-join! result))))

;; Marks CHANNEL as closed, then runs and clears its close handlers.
;; NOTE(review): drain handlers are only run from flush-channel, so
;; closing an already empty channel does not run them here -- confirm that
;; this is intended.
(define (close-channel channel)
  (set! (channel-closed? channel) #t)
  (run-callbacks! (channel-on-close-handlers channel)))

;; #t when CHANNEL has no pending messages.
(define (channel-empty? channel)
  (queue-empty? (channel-queue channel)))

;; #t when CHANNEL is closed and has no pending messages.
(define (channel-drained? channel)
  (and (channel-closed? channel)
       (channel-empty? channel)))

;; Returns a new channel that starts with a copy of CHANNEL's pending
;; messages and receives every message enqueued on CHANNEL afterwards.
;; CHANNEL only holds a weak locative to the fork.  The fork is closed
;; when CHANNEL is closed.
(define (fork-channel channel)
  (with-locked-channel
   channel
   (lambda ()
     (let* ((fork (apply make-channel (channel-messages channel)))
            (fork* (make-weak-locative fork)))
       (push! fork* (channel-forks channel))
       (on-channel-close
        channel
        (lambda ()
          (and-let* ((fork (locative->object fork*)))
            (close-channel fork))))
       fork))))

;; Runs THUNK when CHANNEL is closed; runs it immediately when CHANNEL is
;; already closed.
(define (on-channel-close channel thunk)
  (if (channel-closed? channel)
      (thunk)
      (set-insert! (channel-on-close-handlers channel) thunk)))

;; Registers THUNK as a drain handler of CHANNEL; runs it immediately when
;; CHANNEL is already closed and empty.
(define (on-channel-drain channel thunk)
  (if (channel-drained? channel)
      (thunk)
      (set-insert! (channel-on-drain-handlers channel) thunk)))

;; Registers PROC as an error handler of CHANNEL.  Error handlers are
;; called with the exception followed by the message whose receiver raised.
(define (on-channel-error channel proc)
  (set-insert! (channel-on-error-handlers channel) proc))

;; Forwards every message of SOURCE-CHANNEL to DESTINATION-CHANNEL by
;; calling (ON-RECEIVE destination-channel message); ON-RECEIVE defaults to
;; channel-enqueue.  The receiver only holds a weak locative to the
;; destination.  Closing the destination removes the receiver from the
;; source and closes the source when it has no receivers left; a finalizer
;; on the destination removes the receiver as well.
;; The result is whatever set-finalizer! returns -- presumably
;; DESTINATION-CHANNEL, which is what the derivators below rely on;
;; confirm against the CHICKEN documentation.
(define (siphon-channel source-channel
                        #!optional
                        (destination-channel (make-channel))
                        (on-receive channel-enqueue))
  (let ((receiver (let ((rc* (make-weak-locative destination-channel)))
                    (lambda (message)
                      (and-let* ((rc (locative->object rc*)))
                        (on-receive rc message))))))
    (on-channel-receive source-channel receiver)
    (on-channel-close
     destination-channel
     (lambda ()
       (with-locked-channel
        source-channel
        (lambda ()
          (channel-remove-receiver source-channel receiver)
          (unless (channel-has-receivers? source-channel)
            (close-channel source-channel))))))
    (set-finalizer!
     destination-channel
     (lambda (x)
       (channel-remove-receiver source-channel receiver)))))

;; Siphons CHANNEL into a new channel that carries the running
;; accumulation: for each MESSAGE the new accumulator is
;; (PROC message accumulator), starting from SEED, and is enqueued on the
;; resulting channel.
(define (fold-channel channel proc seed)
  ;; The accumulator lives in the mutex's specific slot; the mutex
  ;; serializes the read-modify-write of that slot.
  (let ((accumulator (make-mutex)))
    (mutex-specific-set! accumulator seed)
    (siphon-channel
     channel
     (make-channel)
     (lambda (folding-channel message)
       ;; dynamic-wind guarantees the mutex is released when PROC raises;
       ;; otherwise the next message would block on it forever.
       (dynamic-wind
           (lambda () (mutex-lock! accumulator))
           (lambda ()
             (let ((acc (proc message (mutex-specific accumulator))))
               (mutex-specific-set! accumulator acc)
               (channel-enqueue folding-channel acc)))
           (lambda () (mutex-unlock! accumulator)))))))

;; Siphons CHANNEL into a new channel carrying (PROC message) for every
;; message.
(define (map-channel channel proc)
  (siphon-channel
   channel
   (make-channel)
   (lambda (mapping-channel message)
     (channel-enqueue mapping-channel (proc message)))))

;; Siphons CHANNEL into a new channel carrying only the messages for which
;; PRED? returns true.
(define (filter-channel channel pred?)
  (siphon-channel
   channel
   (make-channel)
   (lambda (filtering-channel message)
     (when (pred? message)
       (channel-enqueue filtering-channel message)))))

;; Returns two values: a thunk and CHANNEL.  Each call of the thunk reads
;; one message from PORT with (READ port) and enqueues it on CHANNEL; it
;; returns #f on end of file, and the result of channel-enqueue otherwise.
(define (siphon-input-port port read #!optional (channel (make-channel)))
  (values (lambda ()
            (let ((message (read port)))
              (and (not (eof-object? message))
                   (channel-enqueue channel message))))
          channel))

;; Registers a persistent receiver on CHANNEL that writes every message to
;; PORT with (WRITE message port).
(define (flush-channel-to-output-port channel port write)
  (on-channel-receive
   channel
   (lambda (message)
     (write message port))))

)