;;;; mailbox.scm
;;;; Kon Lovett, Mar '09
;;;; Kon Lovett, Aug '17
;;;; From Chicken 3 "mailbox" by Felix & Kon

;; Issues
;;
;; - When the compile-time feature `unsafe-operations' is defined, inlined &
;; primitive routines are used.
;;
;; - Has explicit "unspecified" returns in some cases to avoid leaks of internal
;; objects.
;;
;; - 'wait-mailbox' may not return should a timeout exception occur.
;;
;; - Uses ##sys#thread-unblock!
;;
;; - Has knowledge of Unit srfi-18 time object internals.
;;
;; - Uses the Chicken extensions 'thread-suspend' & 'thread-resume'.
;;
;; - The thread waiting on a mailbox cursor may miss items since only
;; the end of the queue is available safely.
;;
;; - Probably should be rewritten to use a mutex & condition-variable rather than
;; disabling interrupts and having own thread waiting queue.

(module mailbox

(;export
  ;Mailbox Exception API
  mailbox-timeout-condition?
  mailbox-timeout-exception?
  ;Mailbox API
  make-mailbox
  mailbox?
  mailbox-name
  mailbox-empty?
  mailbox-count
  mailbox-waiting?
  mailbox-waiters
  mailbox-send!
  mailbox-wait!
  mailbox-receive!
  mailbox-push-back!
  mailbox-push-back-list!
  ;Mailbox Cursor API
  make-mailbox-cursor
  mailbox-cursor?
  mailbox-cursor-mailbox
  mailbox-cursor-next
  mailbox-cursor-rewind
  mailbox-cursor-rewound?
  mailbox-cursor-unwound?
  mailbox-cursor-extract-and-rewind!)

(import scheme)

(import chicken)

(import
  (only ports with-output-to-port)
  (only srfi-1 append! delete! list-copy last-pair)
  (only srfi-18
    current-thread
    thread-signal! thread-sleep! thread-suspend! thread-resume!
    time?) )
(require-library ports srfi-1 srfi-18)

(import
  (only type-errors define-error-type error-list)
  (only condition-utils make-exn-condition+ make-condition-predicate)
  record-variants )
(require-library type-errors condition-utils record-variants)

;yes, yes, not a module form
(declare
  (disable-interrupts) ;A MUST!
  (always-bound
    ##sys#primordial-thread)
  (bound-to-procedure
    ##sys#signal-hook
    ##sys#thread-unblock!) )

;;; Primitives

(include "chicken-primitive-object-inlines")
(include "chicken-thread-object-inlines")
(include "inline-type-checks")
(include "inline-queue")

;The `$name' forms below are the only spelling used by the rest of the file.
;With the `unsafe-operations' feature they expand to the `%name' routines from
;the include files above, otherwise to the ordinary checked procedures.

(cond-expand
  (unsafe-operations
    (define-syntax $eq? (syntax-rules () ((_ ?arg0 ...) (%eq? ?arg0 ...))))
    (define-syntax $null? (syntax-rules () ((_ ?arg0 ...) (%null? ?arg0 ...))))
    (define-syntax $list? (syntax-rules () ((_ ?arg0 ...) (%list? ?arg0 ...))))
    (define-syntax $length (syntax-rules () ((_ ?arg0 ...) (%length ?arg0 ...))))
    (define-syntax $append! (syntax-rules () ((_ ?arg0 ...) (%append! ?arg0 ...))))
    (define-syntax $delq! (syntax-rules () ((_ ?arg0 ...) (%delq! ?arg0 ...))))
    (define-syntax $cons (syntax-rules () ((_ ?arg0 ...) (%cons ?arg0 ...))))
    (define-syntax $car (syntax-rules () ((_ ?arg0 ...) (%car ?arg0 ...))))
    (define-syntax $cdr (syntax-rules () ((_ ?arg0 ...) (%cdr ?arg0 ...))))
    (define-syntax $set-car! (syntax-rules () ((_ ?arg0 ...) (%set-car! ?arg0 ...))))
    (define-syntax $set-cdr! (syntax-rules () ((_ ?arg0 ...) (%set-cdr! ?arg0 ...))))
    (define-syntax $list-copy (syntax-rules () ((_ ?arg0 ...) (%list-copy ?arg0 ...))))
    (define-syntax $last-pair (syntax-rules () ((_ ?arg0 ...) (%last-pair ?arg0 ...))))
    (define-syntax $current-thread
      (syntax-rules () ((_ ?arg0 ...) (%current-thread ?arg0 ...))))
    (define-syntax $thread-blocked?
      (syntax-rules () ((_ ?arg0 ...) (%thread-blocked? ?arg0 ...))))
    (define-syntax $thread-blocked-for-timeout?
      (syntax-rules () ((_ ?arg0 ...) (%thread-blocked-for-timeout? ?arg0 ...)))) )
  (else
    (define-syntax $eq? (syntax-rules () ((_ ?arg0 ...) (eq? ?arg0 ...))))
    (define-syntax $null? (syntax-rules () ((_ ?arg0 ...) (null? ?arg0 ...))))
    (define-syntax $list? (syntax-rules () ((_ ?arg0 ...) (list? ?arg0 ...))))
    (define-syntax $length (syntax-rules () ((_ ?arg0 ...) (length ?arg0 ...))))
    (define-syntax $append! (syntax-rules () ((_ ?arg0 ...) (append! ?arg0 ...))))
    ;SRFI-1 `delete!' compares with `equal?' unless told otherwise; a "delq"
    ;must compare by identity, so `eq?' is passed explicitly.
    (define-syntax $delq! (syntax-rules () ((_ ?obj ?ls) (delete! ?obj ?ls eq?))))
    (define-syntax $cons (syntax-rules () ((_ ?arg0 ...) (cons ?arg0 ...))))
    (define-syntax $car (syntax-rules () ((_ ?arg0 ...) (car ?arg0 ...))))
    (define-syntax $cdr (syntax-rules () ((_ ?arg0 ...) (cdr ?arg0 ...))))
    (define-syntax $set-car! (syntax-rules () ((_ ?arg0 ...) (set-car! ?arg0 ...))))
    (define-syntax $set-cdr! (syntax-rules () ((_ ?arg0 ...) (set-cdr! ?arg0 ...))))
    (define-syntax $list-copy (syntax-rules () ((_ ?arg0 ...) (list-copy ?arg0 ...))))
    (define-syntax $last-pair (syntax-rules () ((_ ?arg0 ...) (last-pair ?arg0 ...))))
    (define-syntax $current-thread
      (syntax-rules () ((_ ?arg0 ...) (current-thread ?arg0 ...))))
    ;True when slot 3 of the thread object holds the symbol `blocked'.
    (define ($thread-blocked? th)
      (eq? 'blocked (##sys#slot th 3)))
    ;True when slot 4 of the thread object is set and slot 11 is not.
    ;NOTE(review): slot meanings come from the runtime's thread layout - confirm
    ;against the Chicken version in use.
    (define ($thread-blocked-for-timeout? th)
      (and (##sys#slot th 4) (not (##sys#slot th 11)))) ) )

;;; Support

;Collapse any object to #t or #f
(define-inline (->boolean obj) (and obj #t) )

;;; Mailbox Support

;; Mailbox

;Fields: nm - name, qu - message queue, wt - list of waiting threads
(define-record-type-variant mailbox (unsafe unchecked inline)
  (%%make-mailbox nm qu wt)
  %mailbox?
  (nm %mailbox-name)
  (qu %mailbox-queue)
  (wt %mailbox-waiters %mailbox-waiters-set!) )

;New mailbox with an empty message queue & no waiting threads
(define-inline (%make-mailbox nm)
  (%%make-mailbox nm (%make-queue) '()) )

(define-error-type mailbox)
(define-inline-check-type mailbox)

;; Message queue

(define-inline (%mailbox-queue-first-pair mb)
  (%queue-first-pair (%mailbox-queue mb)) )

(define-inline (%mailbox-queue-last-pair mb)
  (%queue-last-pair (%mailbox-queue mb)) )

(define-inline (%mailbox-queue-empty? mb)
  (%queue-empty? (%mailbox-queue mb)) )

(define-inline (%mailbox-queue-count mb)
  (%queue-count (%mailbox-queue mb)) )

(define-inline (%mailbox-queue-add! mb x)
  (%queue-add! (%mailbox-queue mb) x) )

(define-inline (%mailbox-queue-remove! mb)
  (%queue-remove! (%mailbox-queue mb)) )

(define-inline (%mailbox-queue-push-back! mb x)
  (%queue-push-back! (%mailbox-queue mb) x) )

(define-inline (%mailbox-queue-push-back-list! mb ls)
  (%queue-push-back-list! (%mailbox-queue mb) ls) )

;; Waiting threads

(define-inline (%mailbox-waiters-empty? mb)
  ($null? (%mailbox-waiters mb)) )

(define-inline (%mailbox-waiters-count mb)
  ($length (%mailbox-waiters mb)) )

;Append the thread, so the waiters list is oldest first
(define-inline (%mailbox-waiters-add! mb th)
  (%mailbox-waiters-set! mb ($append! (%mailbox-waiters mb) ($cons th '()))) )

(define-inline (%mailbox-waiters-delete! mb th)
  (%mailbox-waiters-set! mb ($delq! th (%mailbox-waiters mb))) )

;Remove & return the first waiter; caller must ensure the list is not empty
(define-inline (%mailbox-waiters-pop! mb)
  (let ((ts (%mailbox-waiters mb)))
    (%mailbox-waiters-set! mb ($cdr ts))
    ($car ts) ) )

;;; Mailbox Cursor Support

;Fields: np - next queue pair to visit ('() when none),
;pp - pair of the last item returned (#f when rewound), mb - the mailbox
(define-record-type-variant mailbox-cursor (unsafe unchecked inline)
  (%%make-mailbox-cursor np pp mb)
  %mailbox-cursor?
  (np %mailbox-cursor-next-pair %mailbox-cursor-next-pair-set!)
  (pp %mailbox-cursor-prev-pair %mailbox-cursor-prev-pair-set!)
  (mb %mailbox-cursor-mailbox) )

;New cursor in the rewound state
(define-inline (%make-mailbox-cursor mb)
  (%%make-mailbox-cursor '() #f mb) )

(define-error-type mailbox-cursor)
(define-inline-check-type mailbox-cursor)

(define-inline (%mailbox-cursor-winding? mbc)
  (->boolean (%mailbox-cursor-prev-pair mbc)) )

(define-inline (%mailbox-cursor-next-pair-empty! mbc)
  (%mailbox-cursor-next-pair-set! mbc '()) )

(define-inline (%mailbox-cursor-prev-pair-clear! mbc)
  (%mailbox-cursor-prev-pair-set! mbc #f) )

(define-inline (%mailbox-cursor-rewind! mbc)
  (%mailbox-cursor-next-pair-empty! mbc)
  (%mailbox-cursor-prev-pair-clear! mbc) )

(define-inline (%mailbox-cursor-extract! mbc)
  ;Unless 'mailbox-cursor-next' has been called don't remove
  (and-let* ((prev-pair (%mailbox-cursor-prev-pair mbc)))
    (%queue-extract-pair! (%mailbox-queue (%mailbox-cursor-mailbox mbc)) prev-pair) ) )

;; Time Support

;A timeout is a number or a srfi-18 time object
(define-inline (%timeout? obj)
  (or (number? obj) (time? obj)) )

(define-error-type timeout)
(define-inline-check-type timeout)

;;;

;Unique objects used as tags
(define UNBLOCKED-TAG (%make-unique-object 'unblocked))
(define SEQ-FAIL-TAG (%make-unique-object 'seq-fail))
(define NO-TOVAL-TAG (%make-unique-object 'timeout-value))
#; ;XXX
(define MESSAGE-WAITING-TAG (%make-unique-object 'message-waiting))

;;; Mailbox Exceptions

;Build the (exn mailbox timeout) condition for a wait that timed out.
;A missing timeout value (NO-TOVAL-TAG) is reported as unspecified.
(define (make-mailbox-timeout-condition loc mb timout timout-value)
  (let ((tv (if ($eq? timout-value NO-TOVAL-TAG) (void) timout-value)))
    (make-exn-condition+ loc
      "mailbox wait timeout occurred"
      (list timout tv)
      `(mailbox box ,mb)
      `(timeout time ,timout value ,tv) ) ) )

;;; Mailbox Threading

;; Select next waiting thread for the mailbox

(define (ready-mailbox-thread! mb)
  ;Ready oldest waiting thread
  (unless (%mailbox-waiters-empty? mb)
    (let ((thread (%mailbox-waiters-pop! mb)))
      ;Ready the thread based on wait mode
      (if (not ($thread-blocked? thread))
        (thread-resume! thread)
        ;else wake early if sleeping
        (when ($thread-blocked-for-timeout? thread)
          ;Ready the thread
          (##sys#thread-unblock! thread)
          ;Tell 'wait-mailbox-thread!' we unblocked early
          (thread-signal! thread UNBLOCKED-TAG) ) ) )
    ;Do not leak an internal object as the result
    (void) ) )

;; Sleep current thread until timeout, known condition,
;; or some other condition

;Returns #t when the sleep ran to completion, #f when `unblocked-tag' was
;signaled in the meantime. Anything else signaled is re-signaled.
(define (thread-sleep/maybe-unblock! tim unblocked-tag)
  ;Sleep current thread for desired seconds, unless unblocked "early".
  (call/cc
    (lambda (return)
      (with-exception-handler
        (lambda (exp)
          (if ($eq? unblocked-tag exp)
            (return #f)
            ;Propagate any "real" exception.
            (signal exp) ) )
        (lambda () (thread-sleep! tim) #t) ) ) ) )

;; Wait current thread on the mailbox until timeout, available message
;; or some other condition

;Returns UNBLOCKED-TAG when woken, otherwise the result of the timeout exit:
;the caller's `timout-value', or SEQ-FAIL-TAG after signaling a timeout
;condition at the current thread when no `timout-value' was supplied.
(define (wait-mailbox-thread! loc mb timout timout-value)
  ;
  ;no available message due to timeout
  (define (timeout-exit!)
    (if (not ($eq? timout-value NO-TOVAL-TAG))
      timout-value
      (begin
        (thread-signal!
          ($current-thread)
          (make-mailbox-timeout-condition loc mb timout timout-value))
        SEQ-FAIL-TAG ) ) )
  ;
  ;Push current thread on mailbox waiting queue
  (%mailbox-waiters-add! mb ($current-thread))
  ;
  ;Waiting action
  (cond
    ;Timeout wanted so sleep until something happens
    (timout
      (cond
        ((thread-sleep/maybe-unblock! timout UNBLOCKED-TAG)
          ;Timed-out, so no message
          ;Remove from wait queue
          (%mailbox-waiters-delete! mb ($current-thread))
          ;Indicate no available message
          (timeout-exit!) )
        (else
          ;Unblocked early
          UNBLOCKED-TAG ) )
      #; ;NOT YET
      (if (eq? ($current-thread) ##sys#primordial-thread)
        (begin
          (warning "mailbox attempt to sleep primordial-thread" mb)
          (timeout-exit!) )
        (cond
          ((thread-sleep/maybe-unblock! timout UNBLOCKED-TAG)
            ;Timed-out, so no message
            ;Remove from wait queue
            (%mailbox-waiters-delete! mb ($current-thread))
            ;Indicate no available message
            (timeout-exit!) )
          (else
            ;Unblocked early
            UNBLOCKED-TAG ) ) ) )
    ;No timeout so suspend until something delivered
    (else
      (thread-suspend! ($current-thread))
      ;We're resumed
      UNBLOCKED-TAG ) ) )

;; Wait current thread on the mailbox unless a message available

;Note that the arguments, except the ?expr0 ..., must be base values.
(define-syntax on-mailbox-available
  (syntax-rules ()
    ((_ ?loc ?mb ?timout ?timout-value ?expr0 ...)
      (let waiting ()
        (cond
          ((%mailbox-queue-empty? ?mb)
            (let ((res (wait-mailbox-thread! ?loc ?mb ?timout ?timout-value)))
              ;When a thread ready then check mailbox again, could be empty.
              (if ($eq? UNBLOCKED-TAG res)
                (waiting)
                ;else some sort of problem
                res ) ) )
          (else
            ?expr0 ... ) ) ) ) ) )

#; ;XXX
(define (wait-mailbox-if-empty! loc mb timout timout-value)
  (on-mailbox-available loc mb timout timout-value
    MESSAGE-WAITING-TAG ) )

;;; Mailbox

;; Mailbox Exceptions

(define mailbox-timeout-condition? (make-condition-predicate exn mailbox timeout))

;DEPRECATE
(define mailbox-timeout-exception? mailbox-timeout-condition?)
;; Mailbox Constructor

;Create a mailbox; the name defaults to a fresh gensym
(define (make-mailbox #!optional (nm (gensym 'mailbox)))
  (%make-mailbox nm) )

(define (mailbox? obj)
  (%mailbox? obj) )

;; Mailbox Properties

(define (mailbox-name mb)
  (%mailbox-name (%check-mailbox 'mailbox-name mb)) )

(define (mailbox-empty? mb)
  (%mailbox-queue-empty? (%check-mailbox 'mailbox-empty? mb)) )

(define (mailbox-count mb)
  (%mailbox-queue-count (%check-mailbox 'mailbox-count mb)) )

(define (mailbox-waiting? mb)
  (not ($null? (%mailbox-waiters (%check-mailbox 'mailbox-waiting? mb)))) )

;Returns a copy so callers cannot mutate the internal waiters list
(define (mailbox-waiters mb)
  ($list-copy (%mailbox-waiters (%check-mailbox 'mailbox-waiters mb))) )

;; Mailbox Operations

;Queue the message `x' & ready the oldest waiting thread, if any
(define (mailbox-send! mb x)
  (%mailbox-queue-add! (%check-mailbox 'mailbox-send! mb) x)
  (ready-mailbox-thread! mb) )

;Block until the mailbox has a message, without removing it.
;`timout', when given, must be a number or a time object.
(define (mailbox-wait! mb #!optional timout)
  (when timout (%check-timeout 'mailbox-wait! timout))
  ;Checked once here since 'on-mailbox-available' wants base values
  (%check-mailbox 'mailbox-wait! mb)
  (on-mailbox-available 'mailbox-wait! mb timout NO-TOVAL-TAG
    (void) ) )

;Block until the mailbox has a message, then remove & return it.
;On timeout `timout-value' is returned when supplied.
(define (mailbox-receive! mb #!optional timout (timout-value NO-TOVAL-TAG))
  (when timout (%check-timeout 'mailbox-receive! timout))
  ;Checked once here since 'on-mailbox-available' wants base values
  (%check-mailbox 'mailbox-receive! mb)
  (on-mailbox-available 'mailbox-receive! mb timout timout-value
    (%mailbox-queue-remove! mb) ) )

;Push the message `x' back on the queue & ready the oldest waiting thread
(define (mailbox-push-back! mb x)
  ;Errors are reported against this procedure, not 'mailbox-send!'
  (%mailbox-queue-push-back! (%check-mailbox 'mailbox-push-back! mb) x)
  (ready-mailbox-thread! mb) )

;Push the list of messages `ls' back on the queue & ready the oldest waiting
;thread
(define (mailbox-push-back-list! mb ls)
  ;The check routines take the location first, then the object
  (%mailbox-queue-push-back-list!
    (%check-mailbox 'mailbox-push-back-list! mb)
    (%check-list 'mailbox-push-back-list! ls))
  (ready-mailbox-thread! mb) )

;;; Mailbox Cursor

;; Mailbox Cursor Constructor

(define (make-mailbox-cursor mb)
  (%make-mailbox-cursor (%check-mailbox 'make-mailbox-cursor mb)) )

;; Mailbox Cursor Properties

(define (mailbox-cursor? obj)
  (%mailbox-cursor? obj) )

(define (mailbox-cursor-mailbox mbc)
  (%mailbox-cursor-mailbox (%check-mailbox-cursor 'mailbox-cursor-mailbox mbc)) )

;True when no item has been returned since creation or the last rewind
(define (mailbox-cursor-rewound? mbc)
  (not (%mailbox-cursor-winding? (%check-mailbox-cursor 'mailbox-cursor-rewound? mbc))) )

;True when the cursor has no next queue pair to visit
(define (mailbox-cursor-unwound? mbc)
  ($null? (%mailbox-cursor-next-pair (%check-mailbox-cursor 'mailbox-cursor-unwound? mbc))) )

;; Mailbox Cursor Operations

(define (mailbox-cursor-rewind mbc)
  (%mailbox-cursor-rewind! (%check-mailbox-cursor 'mailbox-cursor-rewind mbc)) )

#; ;XXX
(define (mailbox-cursor-next mbc #!optional timout (timout-value NO-TOVAL-TAG))
  (%check-mailbox-cursor 'mailbox-cursor-next mbc)
  (when timout (%check-timeout 'mailbox-cursor-next timout))
  ;Waiting mailbox peek.
  (let ((mb (%mailbox-cursor-mailbox mbc)))
    (receive (mailbox-waiter cursor-pair-getter)
        (if (%mailbox-cursor-winding? mbc)
          ;then unconditionally wait until something added
          (values wait-mailbox-thread! (lambda () (%mailbox-queue-last-pair mb)))
          ;else grab the start of a, probably, non-empty queue
          (values wait-mailbox-if-empty! (lambda () (%mailbox-queue-first-pair mb))))
      (let scanning ()
        (let ((next-pair (%mailbox-cursor-next-pair mbc)))
          ;Anything next?
          (if (not (%null? next-pair))
            ;then peek into the queue for the next item
            (let ((item (%car next-pair)))
              (%mailbox-cursor-prev-pair-set! mbc next-pair)
              (%mailbox-cursor-next-pair-set! mbc (%cdr next-pair))
              item )
            ;else wait for something in the mailbox
            (let ((res (mailbox-waiter 'mailbox-cursor-next mb timout timout-value)))
              (cond
                ;continue scanning?
                ((or ($eq? MESSAGE-WAITING-TAG res) ($eq? UNBLOCKED-TAG res))
                  (%mailbox-cursor-next-pair-set! mbc (cursor-pair-getter))
                  (scanning) )
                ;otherwise timed-out
                (else
                  res ) ) ) ) ) ) ) ) )

;Return the next item under the cursor without removing it from the mailbox,
;waiting (subject to `timout') when the cursor has reached the end.
;On timeout `timout-value' is returned when supplied.
(define (mailbox-cursor-next mbc #!optional timout (timout-value NO-TOVAL-TAG))
  (when timout (%check-timeout 'mailbox-cursor-next timout))
  (let ((mb (%mailbox-cursor-mailbox (%check-mailbox-cursor 'mailbox-cursor-next mbc))))
    ;Seed rewound cursor
    (unless (%mailbox-cursor-winding? mbc)
      (%mailbox-cursor-next-pair-set! mbc (%mailbox-queue-first-pair mb)) )
    ;Pull next item from queue at cursor
    (let scanning ()
      (let ((curr-pair (%mailbox-cursor-next-pair mbc)))
        ;Anything next?
        (if (not ($null? curr-pair))
          ;then peek into the queue for the next item
          (let ((item ($car curr-pair)))
            (%mailbox-cursor-prev-pair-set! mbc curr-pair)
            (%mailbox-cursor-next-pair-set! mbc ($cdr curr-pair))
            item )
          ;else wait for something in the mailbox
          (let ((res (wait-mailbox-thread! 'mailbox-cursor-next mb timout timout-value)))
            (cond
              ;continue scanning?
              (($eq? UNBLOCKED-TAG res)
                (%mailbox-cursor-next-pair-set! mbc (%mailbox-queue-last-pair mb))
                (scanning) )
              ;some problem (timeout maybe)
              (else
                res ) ) ) ) ) ) ) )

;Remove the item last returned by 'mailbox-cursor-next', if any, from the
;mailbox & put the cursor back in the rewound state
(define (mailbox-cursor-extract-and-rewind! mbc)
  (%mailbox-cursor-extract! (%check-mailbox-cursor 'mailbox-cursor-extract-and-rewind! mbc))
  (%mailbox-cursor-rewind! mbc) )

;;; Read/Print Syntax

;NOTE(review): the printers previously emitted only "#"; the rest of the
;literal appears to have been lost. The layout below is a reconstruction -
;confirm against the released egg if the exact text matters.

(define-record-printer (mailbox mb out)
  (with-output-to-port out
    (lambda ()
      (display "#<mailbox ")
      (display (%mailbox-name mb))
      (display " queued = ")
      (display (%mailbox-queue-count mb))
      (display " waiters = ")
      (display (%mailbox-waiters-count mb))
      (display ">") ) ) )

(define-record-printer (mailbox-cursor mbc out)
  (with-output-to-port out
    (lambda ()
      (display "#<mailbox-cursor mailbox = ")
      (display (%mailbox-name (%mailbox-cursor-mailbox mbc)))
      (display " status = ")
      (display (if (%mailbox-cursor-winding? mbc) "winding" "rewound"))
      (display ">") ) ) )

) ;module mailbox