;;;; mailbox.scm -*- Scheme -*- ;;;; Kon Lovett, Jul '18 ;;;; Kon Lovett, Aug '17 ;;;; Kon Lovett, Mar '09 ;;;; From Chicken 3 "mailbox" by Felix & Kon ;; Issues ;; ;; - When compile-time feature `unsafe-operations' inlined & primitive routines used. ;; ;; - Has explicit "unspecified" returns in some cases to avoid leaks of internal ;; objects. ;; ;; - 'wait-mailbox' may not return should a timeout exception occur. ;; ;; - Uses ##sys#thread-unblock! ;; ;; - Has knowledge of Unit srfi-18 time object internals. ;; ;; - Uses the Chicken extensions 'thread-suspend' & 'thread-resume'. ;; ;; - The thread waiting on a mailbox cursor may miss items since only ;; the end of the queue is available safely. ;; ;; - Probably should be rewritten to use a mutex & condition-variable rather than ;; disabling interrupts and having own thread waiting queue. ;; ;; - (declare (disable-interrupts) ;REQUIRED - see Issues above (always-bound ##sys#primordial-thread) (bound-to-procedure ##sys#signal-hook ##sys#thread-unblock!)) (module mailbox (;export ;Mailbox Exception API mailbox-timeout-condition? ;Mailbox API make-mailbox mailbox? mailbox-name mailbox-empty? mailbox-count mailbox-waiting? mailbox-waiters mailbox-send! mailbox-wait! mailbox-receive! mailbox-push-back! mailbox-push-back-list! ;Mailbox Cursor API make-mailbox-cursor mailbox-cursor? mailbox-cursor-mailbox mailbox-cursor-next mailbox-cursor-rewind mailbox-cursor-rewound? mailbox-cursor-unwound? mailbox-cursor-extract-and-rewind!) (import scheme (chicken base) (chicken syntax) (chicken condition) (chicken type) (only (chicken port) with-output-to-port) (only (chicken format) printf) (only (chicken string) ->string) (only (srfi 1) append! delete! list-copy last-pair) (only (srfi 18) time? current-thread thread-signal! thread-sleep! thread-suspend! thread-resume!) record-variants) ;;; Support ;;miscmacros, Felix Winkelmann ;; evaluates body with an explicit exit continuation ;; (define-syntax let/cc (syntax-rules () ((let/cc k e0 e1 ...) (call-with-current-continuation (lambda (k) e0 e1 ...))))) ;;record-variants ;the identifier needs to be defined by somebody (define queue) (define mailbox) (define mailbox-cursor) ;; (define-type srfi-18-time (struct time)) ;; (define-inline (->boolean obj) (and obj #t)) ;;(only type-errors define-error-type) ;; (define (make-bad-argument-message #!optional argnam) (if (not argnam) "bad argument" (string-append "bad `" (->string argnam) "' argument") ) ) (define (make-type-name-message typnam) (string-append "a " (->string typnam)) ) (define (make-error-type-message typnam #!optional argnam) (string-append (make-bad-argument-message argnam) " type - not " (make-type-name-message typnam)) ) ;; (define (error-list loc obj #!optional argnam) (##sys#signal-hook #:type-error loc obj (make-error-type-message 'list argnam) obj) ) ;;; Primitives (include "chicken-primitive-object-inlines") (include "chicken-thread-object-inlines") (include "inline-type-checks") (include "inline-queue") (cond-expand (unsafe-operations (define-syntax $eq? (syntax-rules () ((_ ?arg0 ...) (%eq? ?arg0 ...)))) (define-syntax $null? (syntax-rules () ((_ ?arg0 ...) (%null? ?arg0 ...)))) (define-syntax $list? (syntax-rules () ((_ ?arg0 ...) (%list? ?arg0 ...)))) (define-syntax $length (syntax-rules () ((_ ?arg0 ...) (%length ?arg0 ...)))) (define-syntax $append! (syntax-rules () ((_ ?arg0 ...) (%append! ?arg0 ...)))) (define-syntax $delq! (syntax-rules () ((_ ?arg0 ...) (%delq! ?arg0 ...)))) (define-syntax $cons (syntax-rules () ((_ ?arg0 ...) (%cons ?arg0 ...)))) (define-syntax $car (syntax-rules () ((_ ?arg0 ...) (%car ?arg0 ...)))) (define-syntax $cdr (syntax-rules () ((_ ?arg0 ...) (%cdr ?arg0 ...)))) (define-syntax $set-car! (syntax-rules () ((_ ?arg0 ...) (%set-car! ?arg0 ...)))) (define-syntax $set-cdr! (syntax-rules () ((_ ?arg0 ...) (%set-cdr! ?arg0 ...)))) (define-syntax $list-copy (syntax-rules () ((_ ?arg0 ...) (%list-copy ?arg0 ...)))) (define-syntax $last-pair (syntax-rules () ((_ ?arg0 ...) (%last-pair ?arg0 ...)))) (define-syntax $current-thread (syntax-rules () ((_ ?arg0 ...) (%current-thread ?arg0 ...)))) (define-syntax $thread-blocked? (syntax-rules () ((_ ?arg0 ...) (%thread-blocked? ?arg0 ...)))) (define-syntax $thread-blocked-for-timeout? (syntax-rules () ((_ ?arg0 ...) (%thread-blocked-for-timeout? ?arg0 ...)))) ) (else (define-syntax $eq? (syntax-rules () ((_ ?arg0 ...) (eq? ?arg0 ...)))) (define-syntax $null? (syntax-rules () ((_ ?arg0 ...) (null? ?arg0 ...)))) (define-syntax $list? (syntax-rules () ((_ ?arg0 ...) (list? ?arg0 ...)))) (define-syntax $length (syntax-rules () ((_ ?arg0 ...) (length ?arg0 ...)))) (define-syntax $append! (syntax-rules () ((_ ?arg0 ...) (append! ?arg0 ...)))) (define-syntax $delq! (syntax-rules () ((_ ?arg0 ...) (delete! ?arg0 ...)))) (define-syntax $cons (syntax-rules () ((_ ?arg0 ...) (cons ?arg0 ...)))) (define-syntax $car (syntax-rules () ((_ ?arg0 ...) (car ?arg0 ...)))) (define-syntax $cdr (syntax-rules () ((_ ?arg0 ...) (cdr ?arg0 ...)))) (define-syntax $set-car! (syntax-rules () ((_ ?arg0 ...) (set-car! ?arg0 ...)))) (define-syntax $set-cdr! (syntax-rules () ((_ ?arg0 ...) (set-cdr! ?arg0 ...)))) (define-syntax $list-copy (syntax-rules () ((_ ?arg0 ...) (list-copy ?arg0 ...)))) (define-syntax $last-pair (syntax-rules () ((_ ?arg0 ...) (last-pair ?arg0 ...)))) (define-syntax $current-thread (syntax-rules () ((_ ?arg0 ...) (current-thread ?arg0 ...)))) (define ($thread-blocked? th) (eq? 'blocked (##sys#slot th 3))) (define ($thread-blocked-for-timeout? th) (and (##sys#slot th 4) (not (##sys#slot th 11)))) ) ) ;;; Mailbox Support ;; Mailbox (define-type mailbox (struct mailbox)) (define-record-type-variant mailbox (unsafe unchecked inline) (%raw-make-mailbox nm qu wt) %mailbox? (nm %mailbox-name) (qu %mailbox-queue) (wt %mailbox-waiters %mailbox-waiters-set!) ) (define-inline (%make-mailbox nm) (%raw-make-mailbox nm (%make-queue) '()) ) (define (error-mailbox loc obj #!optional argnam) (##sys#signal-hook #:type-error loc (make-error-type-message 'mailbox argnam) obj) ) (define-inline-check-type mailbox) ;; Message queue (define-inline (%mailbox-queue-first-pair mb) (%queue-first-pair (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-last-pair mb) (%queue-last-pair (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-empty? mb) (%queue-empty? (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-count mb) (%queue-count (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-add! mb x) (%queue-add! (%mailbox-queue mb) x) ) (define-inline (%mailbox-queue-remove! mb) (%queue-remove! (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-push-back! mb x) (%queue-push-back! (%mailbox-queue mb) x) ) (define-inline (%mailbox-queue-push-back-list! mb ls) (%queue-push-back-list! (%mailbox-queue mb) ls) ) ;; Waiting threads (define-inline (%mailbox-waiters-empty? mb) ($null? (%mailbox-waiters mb)) ) (define-inline (%mailbox-waiters-count mb) ($length (%mailbox-waiters mb)) ) (define-inline (%mailbox-waiters-add! mb th) (%mailbox-waiters-set! mb ($append! (%mailbox-waiters mb) ($cons th '()))) ) (define-inline (%mailbox-waiters-delete! mb th) (%mailbox-waiters-set! mb ($delq! th (%mailbox-waiters mb))) ) (define-inline (%mailbox-waiters-pop! mb) (let ((ts (%mailbox-waiters mb))) (%mailbox-waiters-set! mb ($cdr ts)) ($car ts) ) ) ;;; Mailbox Cursor Support (define-type mailbox-cursor (struct mailbox-cursor)) (define-record-type-variant mailbox-cursor (unsafe unchecked inline) (%raw-make-mailbox-cursor np pp mb) %mailbox-cursor? (np %mailbox-cursor-next-pair %mailbox-cursor-next-pair-set!) (pp %mailbox-cursor-prev-pair %mailbox-cursor-prev-pair-set!) (mb %mailbox-cursor-mailbox) ) (define-inline (%make-mailbox-cursor mb) (%raw-make-mailbox-cursor '() #f mb) ) (define (error-mailbox-cursor loc obj #!optional argnam) (##sys#signal-hook #:type-error loc (make-error-type-message 'mailbox-cursor argnam) obj) ) (define-inline-check-type mailbox-cursor) (define-inline (%mailbox-cursor-winding? mbc) (->boolean (%mailbox-cursor-prev-pair mbc)) ) (define-inline (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-next-pair-set! mbc '()) ) (define-inline (%mailbox-cursor-prev-pair-clear! mbc) (%mailbox-cursor-prev-pair-set! mbc #f) ) (define-inline (%mailbox-cursor-rewind! mbc) (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-prev-pair-clear! mbc) ) (define-inline (%mailbox-cursor-extract! mbc) ;unless 'mailbox-cursor-next' has been called don't remove (and-let* ((prev-pair (%mailbox-cursor-prev-pair mbc))) (%queue-extract-pair! (%mailbox-queue (%mailbox-cursor-mailbox mbc)) prev-pair) ) ) ;; Time Support (define-type time-number (or fixnum float)) (define-inline (%time-number? obj) (or (fixnum? obj) (flonum? obj)) ) (define-type timeout (or time-number srfi-18-time)) (define-inline (%timeout? obj) (or (%time-number? obj) (time? obj)) ) (define (error-timeout loc obj #!optional argnam) (##sys#signal-hook #:type-error loc (make-error-type-message 'timeout argnam) obj) ) (define-inline-check-type timeout) ;;; (define-type unique-object (vector-of symbol)) ;Unique objects used as tags (define UNBLOCKED-TAG (%make-unique-object 'unblocked)) (define SEQ-FAIL-TAG (%make-unique-object 'seq-fail)) (define NO-TOVAL-TAG (%make-unique-object 'timeout-value)) #; ;XXX (define MESSAGE-WAITING-TAG (%make-unique-object 'message-waiting)) ;;; Mailbox Exceptions (define-inline (optional-timeout-value x #!optional (def (void))) (if ($eq? x NO-TOVAL-TAG) def x) ) (define (make-mailbox-timeout-condition loc mb timout timout-value) (let ((tv (optional-timeout-value timout-value))) (make-composite-condition (make-property-condition 'exn 'location loc 'message "mailbox wait timeout occurred" 'arguments (list timout tv)) (make-property-condition 'mailbox 'box mb) (make-property-condition 'timeout 'time timout 'value tv)) ) ) ;;; Mailbox Threading ;; Select next waiting thread for the mailbox (define-inline (%mailbox-waiters-pop!? mb) (and (not (%mailbox-waiters-empty? mb)) (%mailbox-waiters-pop! mb)) ) (define (ready-mailbox-thread! mb) ;ready oldest waiting thread (and-let* ((th (%mailbox-waiters-pop!? mb))) ;ready the thread based on wait mode (if (not ($thread-blocked? th)) ;then restart (thread-resume! th) ;else wake early if sleeping ;all others dropped on the floor (when ($thread-blocked-for-timeout? th) ;ready the thread (##sys#thread-unblock! th) ;tell 'wait-mailbox-thread!' we unblocked early (thread-signal! th UNBLOCKED-TAG) ) ) ) (void) ) ;; Sleep current thread until timeout, known condition, ;; or some other condition (define (thread-sleep/maybe-unblock! tim unblocked-tag) ;(print "mailbox sleep/maybe-unblock!: " tim " " unblocked-tag) ;sleep current thread for desired seconds, unless unblocked "early". (let/cc return (with-exception-handler (lambda (exp) (if ($eq? unblocked-tag exp) (return #f) ;propagate any "real" exception. (signal exp) ) ) (lambda () (thread-sleep! tim) #t) ) ) ) ;; Wait current thread on the mailbox until timeout, available message ;; or some other condition (define (wait-mailbox-thread! loc mb timout timout-value) ; ;no available message due to timeout (define (timeout-exit!) (if (not ($eq? timout-value NO-TOVAL-TAG)) timout-value (begin (thread-signal! ($current-thread) (make-mailbox-timeout-condition loc mb timout timout-value)) SEQ-FAIL-TAG ) ) ) ; ;push current thread on mailbox waiting queue (%mailbox-waiters-add! mb ($current-thread)) ;waiting action (cond ;timeout wanted so sleep until something happens (timout (cond-expand (sleep-primordial-thread ; (cond ((thread-sleep/maybe-unblock! timout UNBLOCKED-TAG) ;timed-out, so no message ;remove from wait queue (%mailbox-waiters-delete! mb ($current-thread)) ;indicate no available message (timeout-exit!) ) (else ;unblocked early UNBLOCKED-TAG ) ) ) (else ; (if (eq? ($current-thread) ##sys#primordial-thread) (begin (%mailbox-waiters-delete! mb ($current-thread)) (warning "mailbox attempt to sleep primordial-thread" mb) (timeout-exit!) ) (cond ((thread-sleep/maybe-unblock! timout UNBLOCKED-TAG) ;timed-out, so no message ;remove from wait queue (%mailbox-waiters-delete! mb ($current-thread)) ;indicate no available message (timeout-exit!) ) (else ;unblocked early UNBLOCKED-TAG ) ) ) ) ) ) ;no timeout so suspend until something delivered (else (thread-suspend! ($current-thread)) ;we're resumed UNBLOCKED-TAG ) ) ) ;; Wait current thread on the mailbox unless a message available ;Note that the arguments, except the ?expr0 ..., must be base values. (define-syntax on-mailbox-available (syntax-rules () ((_ ?loc ?mb ?timout ?timout-value ?expr0 ...) (let ((_mb ?mb) (_to ?timout) (_tv ?timout-value)) (let waiting () (cond ((%mailbox-queue-empty? _mb) (let ((res (wait-mailbox-thread! ?loc _mb _to _tv))) ;when a thread ready then check mailbox again, could be empty. (if ($eq? UNBLOCKED-TAG res) (waiting) ;else some sort of problem res ) ) ) (else ?expr0 ... ) ) ) ) ) ) ) #; ;XXX (define (wait-mailbox-if-empty! loc mb timout timout-value) (on-mailbox-available loc mb timout timout-value MESSAGE-WAITING-TAG ) ) ;;; Mailbox ;; Mailbox Exceptions (: mailbox-timeout-condition? (* -> boolean : condition)) ; (define (mailbox-timeout-condition? obj) (and ((condition-predicate 'exn) obj) ((condition-predicate 'mailbox) obj) ((condition-predicate 'timeout) obj) ) ) ;; Mailbox Constructor (: make-mailbox (#!optional * -> mailbox)) ; (define (make-mailbox #!optional (nm (gensym 'mailbox))) (%make-mailbox nm) ) (: mailbox? (* -> boolean : mailbox)) ; (define (mailbox? obj) (%mailbox? obj) ) ;; Mailbox Properties (: mailbox-name (mailbox --> *)) ; (define (mailbox-name mb) (%mailbox-name (%check-mailbox 'mailbox-name mb)) ) (: mailbox-empty? (mailbox -> boolean)) ; (define (mailbox-empty? mb) (%mailbox-queue-empty? (%check-mailbox 'mailbox-empty? mb)) ) (: mailbox-count (mailbox -> fixnum)) ; (define (mailbox-count mb) (%mailbox-queue-count (%check-mailbox 'mailbox-count mb)) ) (: mailbox-waiting? (mailbox -> boolean)) ; (define (mailbox-waiting? mb) (not ($null? (%mailbox-waiters (%check-mailbox 'mailbox-waiting? mb)))) ) (: mailbox-waiters (mailbox -> list)) ; (define (mailbox-waiters mb) ($list-copy (%mailbox-waiters (%check-mailbox 'mailbox-waiters mb))) ) ;; Mailbox Operations (: mailbox-send! (mailbox * -> void)) ; (define (mailbox-send! mb x) (%mailbox-queue-add! (%check-mailbox 'mailbox-send! mb) x) (ready-mailbox-thread! mb) ) (: mailbox-wait! (mailbox #!optional timeout -> void)) ; (define (mailbox-wait! mb #!optional timout) (when timout (%check-timeout 'mailbox-wait! timout)) (on-mailbox-available 'mailbox-wait! (%check-mailbox 'mailbox-wait! mb) timout NO-TOVAL-TAG (void) ) ) (: mailbox-receive! (mailbox #!optional timeout * -> *)) ; (define (mailbox-receive! mb #!optional timout (timout-value NO-TOVAL-TAG)) (when timout (%check-timeout 'mailbox-receive! timout)) (on-mailbox-available 'mailbox-receive! (%check-mailbox 'mailbox-receive! mb) timout timout-value (%mailbox-queue-remove! mb) ) ) (: mailbox-push-back! (mailbox * -> void)) ; (define (mailbox-push-back! mb x) (%mailbox-queue-push-back! (%check-mailbox 'mailbox-send! mb) x) (ready-mailbox-thread! mb) ) (: mailbox-push-back-list! (mailbox list -> void)) ; (define (mailbox-push-back-list! mb ls) (%mailbox-queue-push-back-list! (%check-mailbox 'mailbox-send! mb) (%check-list ls 'mailbox-send!)) (ready-mailbox-thread! mb) ) ;; Read/Print Syntax (define-record-printer (mailbox mb out) (with-output-to-port out (lambda () (printf "#" (%mailbox-name mb) (%mailbox-queue-count mb) (%mailbox-waiters-count mb)) ) ) ) ;;; Mailbox Cursor ;; Mailbox Cursor Constructor (: make-mailbox-cursor (mailbox -> mailbox-cursor)) ; (define (make-mailbox-cursor mb) (%make-mailbox-cursor (%check-mailbox 'make-mailbox-cursor mb)) ) ;; Mailbox Cursor Properties (: mailbox-cursor? (* -> boolean : mailbox-cursor)) ; (define (mailbox-cursor? obj) (%mailbox-cursor? obj) ) (: mailbox-cursor-mailbox (mailbox-cursor --> mailbox)) ; (define (mailbox-cursor-mailbox mbc) (%mailbox-cursor-mailbox (%check-mailbox-cursor 'mailbox-cursor-mailbox mbc)) ) (: mailbox-cursor-rewound? (mailbox-cursor -> boolean)) ; (define (mailbox-cursor-rewound? mbc) (not (%mailbox-cursor-winding? (%check-mailbox-cursor 'mailbox-cursor-rewound? mbc))) ) (: mailbox-cursor-unwound? (mailbox-cursor -> boolean)) ; (define (mailbox-cursor-unwound? mbc) ($null? (%mailbox-cursor-next-pair (%check-mailbox-cursor 'mailbox-cursor-unwound? mbc))) ) ;; Mailbox Cursor Operations (: mailbox-cursor-rewind (mailbox-cursor -> void)) ; (define (mailbox-cursor-rewind mbc) (%mailbox-cursor-rewind! (%check-mailbox-cursor 'mailbox-cursor-rewind mbc)) ) (: mailbox-cursor-next (mailbox-cursor #!optional timeout * -> *)) ; (define (mailbox-cursor-next mbc #!optional timout (timout-value NO-TOVAL-TAG)) (when timout (%check-timeout 'mailbox-cursor-next timout)) (let ((mb (%mailbox-cursor-mailbox (%check-mailbox-cursor 'mailbox-cursor-next mbc)))) ;seed rewound cursor (unless (%mailbox-cursor-winding? mbc) (%mailbox-cursor-next-pair-set! mbc (%mailbox-queue-first-pair mb)) ) ;pull next item from queue at cursor (let scanning () (let ((curr-pair (%mailbox-cursor-next-pair mbc))) ;anything next? (if (not ($null? curr-pair)) ;then peek into the queue for the next item (let ((item ($car curr-pair))) (%mailbox-cursor-prev-pair-set! mbc curr-pair) (%mailbox-cursor-next-pair-set! mbc ($cdr curr-pair)) item ) ;else wait for something in the mailbox (let ((res (wait-mailbox-thread! 'mailbox-cursor-next mb timout timout-value))) (cond ;continue scanning? (($eq? UNBLOCKED-TAG res) (%mailbox-cursor-next-pair-set! mbc (%mailbox-queue-last-pair mb)) (scanning) ) ;some problem (timeout maybe) (else res ) ) ) ) ) ) ) ) (: mailbox-cursor-extract-and-rewind! (mailbox-cursor -> void)) ; (define (mailbox-cursor-extract-and-rewind! mbc) (%mailbox-cursor-extract! (%check-mailbox-cursor 'mailbox-cursor-extract-and-rewind! mbc)) (%mailbox-cursor-rewind! mbc) ) ;; Read/Print Syntax (define-record-printer (mailbox-cursor mbc out) (with-output-to-port out (lambda () (printf "#" (%mailbox-name (%mailbox-cursor-mailbox mbc)) (if (%mailbox-cursor-winding? mbc) "winding" "rewound")) ) ) ) ) ;module mailbox