;;;; mailbox.scm -*- Scheme -*- ;;;; Kon Lovett, Jul '18 ;;;; Kon Lovett, Aug '17 ;;;; Kon Lovett, Mar '09 ;;;; From Chicken 3 "mailbox" by Felix & Kon ;; Issues ;; ;; - Has explicit "unspecified" returns in some cases to avoid leaks of internal ;; objects. ;; ;; - 'wait-mailbox' may not return should a timeout exception occur. ;; ;; - Uses ##sys#thread-unblock! ;; ;; - Has knowledge of Unit srfi-18 time object internals. ;; ;; - Uses the Chicken extensions 'thread-suspend' & 'thread-resume'. ;; ;; - The thread waiting on a mailbox cursor may miss items since only ;; the end of the queue is available safely. ;; ;; - Probably should be rewritten to use a mutex & condition-variable rather than ;; disabling interrupts and having own thread waiting queue. ;; ;; - (declare (disable-interrupts) ;REQUIRED - see Issues above (always-bound ##sys#primordial-thread) (bound-to-procedure ##sys#signal-hook ##sys#thread-unblock!)) (module mailbox (;export ;Mailbox Exception API mailbox-timeout-condition? ;Mailbox API make-mailbox mailbox? mailbox-name mailbox-empty? mailbox-count mailbox-waiting? mailbox-waiters mailbox-send! mailbox-wait! mailbox-receive! mailbox-push-back! mailbox-push-back-list! ;Mailbox Cursor API make-mailbox-cursor mailbox-cursor? mailbox-cursor-mailbox mailbox-cursor-next mailbox-cursor-rewind mailbox-cursor-rewound? mailbox-cursor-unwound? mailbox-cursor-extract-and-rewind!) (import scheme (chicken base) (chicken syntax) (chicken condition) (chicken type) (only (chicken port) with-output-to-port) (only (chicken format) printf) (only (chicken string) ->string) (only (srfi 1) append! delete! list-copy last-pair) (only (srfi 18) time? current-thread thread-signal! thread-sleep! thread-suspend! thread-resume!)) ;;; Support ;;record-variants (define-syntax define-record-type-variant (er-macro-transformer (lambda (form r c) (define (any p L) (and (pair? L) (or (p (car L)) (any p (cdr L))))) (##sys#check-syntax 'define-record-type-variant form '(_ _ #(variable 0) #(variable 1) _ . _)) (let* ((name-spec (cadr form)) (name (if (pair? name-spec) (car name-spec) name-spec)) (t (if (pair? name-spec) (cadr name-spec) name-spec)) (variant? (lambda (type) (any (lambda (x) (c x (r type))) (caddr form)))) (unsafe? (variant? 'unsafe)) (unchecked? (variant? 'unchecked)) (inline? (variant? 'inline)) (constructor? (eq? name t)) (conser (cadddr form)) (predspec (car (cddddr form))) (pred (if (pair? predspec) (car predspec) predspec)) (checker (if (and (pair? predspec) (pair? (cdr predspec))) (cadr predspec) #f)) (slots (cdr (cddddr form))) (%begin (r 'begin)) (%lambda (r 'lambda)) (%define (if inline? (r 'define-inline) (r 'define))) (vars (cdr conser)) (x (r 'x)) (y (r 'y)) (%getter-with-setter (r 'getter-with-setter)) (slotnames (map car slots))) `(,%begin ,(if constructor? `(,%define ,conser (##sys#make-structure ,t ,@(map (lambda (sname) (if (memq sname vars) sname '(##core#undefined))) slotnames))) `(,%begin)) (,%define (,pred ,x) (##sys#structure? ,x ,t)) ,(if checker `(,%define (,checker ,x) (##core#check (##sys#check-structure ,x ,t))) `(,%begin)) ,@(let loop ([slots slots] [i 1]) (if (null? slots) '() (let* ([slot (car slots)] (setters (memq #:record-setters ##sys#features)) (setr? (pair? (cddr slot))) (getr `(,%lambda (,x) ,(if unchecked? `(,%begin) `(##core#check (##sys#check-structure ,x ,t))) ,(if unsafe? `(##sys#slot ,x ,i) `(##sys#block-ref ,x ,i))))) `(,@(if setr? `((,%define (,(caddr slot) ,x ,y) ,(if unchecked? `(,%begin) `(##core#check (##sys#check-structure ,x ,t))) ,(if unsafe? `(##sys#setslot ,x ,i ,y) `(##sys#block-set! ,x ,i ,y)))) '()) (,%define ,(cadr slot) ,(if (and setr? setters) `(,%getter-with-setter ,getr ,(caddr slot)) getr) ) ,@(loop (cdr slots) (add1 i))))))))))) ;;miscmacros ;; evaluates body with an explicit exit continuation ;; (define-syntax let/cc (syntax-rules () ((let/cc k e0 e1 ...) (call-with-current-continuation (lambda (k) e0 e1 ...))))) ;;(only type-errors define-error-type) (define (make-bad-argument-message #!optional argnam) (if (not argnam) "bad argument" (string-append "bad `" (->string argnam) "' argument") ) ) (define (make-type-name-message typnam) (string-append "a " (->string typnam)) ) (define (make-error-type-message typnam #!optional argnam) (string-append (make-bad-argument-message argnam) " type - not " (make-type-name-message typnam)) ) (define (error-list loc obj #!optional argnam) (##sys#signal-hook #:type-error loc obj (make-error-type-message 'list argnam) obj) ) (include-relative "inline-type-checks") ;; (define-inline (%thread-blocked? th) (eq? 'blocked (##sys#slot th 3))) (define-inline (%thread-blocked-for-timeout? th) (and (##sys#slot th 4) (not (##sys#slot th 11)))) (define-inline (%->boolean obj) (and obj #t)) (define-inline (%make-unique-object #!optional (id 'unique)) (vector id)) ;; Time Support (define-inline (%time-number? obj) (or (fixnum? obj) (flonum? obj)) ) (define-inline (%timeout? obj) (or (%time-number? obj) (time? obj)) ) (define (error-timeout loc obj #!optional argnam) (##sys#signal-hook #:type-error loc (make-error-type-message 'timeout argnam) obj) ) (define (timeout? obj) (%timeout? obj)) (define-inline-check-type timeout) ;; Queue Support (include-relative "inline-queue") ;;; Typoes (define-type srfi-18-time (struct time)) (define-type mailbox (struct mailbox)) (define-type mailbox-cursor (struct mailbox-cursor)) (define-type time-number (or fixnum float)) (define-type timeout (or time-number srfi-18-time)) (define-type unique-object (vector-of symbol)) (: mailbox-timeout-condition? (* -> boolean : condition)) (: make-mailbox (#!optional * -> mailbox)) (: mailbox? (* -> boolean : mailbox)) (: mailbox-name (mailbox --> *)) (: mailbox-empty? (mailbox -> boolean)) (: mailbox-count (mailbox -> fixnum)) (: mailbox-waiting? (mailbox -> boolean)) (: mailbox-waiters (mailbox -> list)) (: mailbox-send! (mailbox * -> void)) (: mailbox-wait! (mailbox #!optional timeout -> void)) (: mailbox-receive! (mailbox #!optional timeout * -> *)) (: mailbox-push-back! (mailbox * -> void)) (: mailbox-push-back-list! (mailbox list -> void)) (: make-mailbox-cursor (mailbox -> mailbox-cursor)) (: mailbox-cursor? (* -> boolean : mailbox-cursor)) (: mailbox-cursor-mailbox (mailbox-cursor --> mailbox)) (: mailbox-cursor-rewound? (mailbox-cursor -> boolean)) (: mailbox-cursor-unwound? (mailbox-cursor -> boolean)) (: mailbox-cursor-rewind (mailbox-cursor -> void)) (: mailbox-cursor-next (mailbox-cursor #!optional timeout * -> *)) (: mailbox-cursor-extract-and-rewind! (mailbox-cursor -> void)) ;;; Mailbox ;the identifier needs to be defined by somebody (define mailbox 'mailbox) (define-record-type-variant mailbox (unsafe unchecked inline) (%raw-make-mailbox nm qu wt) (%mailbox?) (nm %mailbox-name) (qu %mailbox-queue) (wt %mailbox-waiters %mailbox-waiters-set!) ) (define-inline (%make-mailbox nm) (%raw-make-mailbox nm (%make-empty-queue) '()) ) (define (error-mailbox loc obj #!optional argnam) (##sys#signal-hook #:type-error loc (make-error-type-message 'mailbox argnam) obj) ) (define-inline-check-type mailbox) ;; Message queue (define-inline (%mailbox-queue-first-pair mb) (%queue-first-pair (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-last-pair mb) (%queue-last-pair (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-empty? mb) (%queue-empty? (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-count mb) (%queue-count (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-add! mb x) (%queue-add! (%mailbox-queue mb) x) ) (define-inline (%mailbox-queue-remove! mb) (%queue-remove! (%mailbox-queue mb)) ) (define-inline (%mailbox-queue-push-back! mb x) (%queue-push-back! (%mailbox-queue mb) x) ) (define-inline (%mailbox-queue-push-back-list! mb ls) (%queue-push-back-list! (%mailbox-queue mb) ls) ) ;; Waiting threads (define-inline (%mailbox-waiters-empty? mb) (null? (%mailbox-waiters mb)) ) (define-inline (%mailbox-waiters-count mb) (length (%mailbox-waiters mb)) ) (define-inline (%mailbox-waiters-add! mb th) (%mailbox-waiters-set! mb (append! (%mailbox-waiters mb) (cons th '()))) ) (define-inline (%mailbox-waiters-delete! mb th) (%mailbox-waiters-set! mb (delete! th (%mailbox-waiters mb))) ) (define-inline (%mailbox-waiters-pop! mb) (let ((ts (%mailbox-waiters mb))) (%mailbox-waiters-set! mb (cdr ts)) (car ts) ) ) ;;; Mailbox Cursor Support ;the identifier needs to be defined by somebody (define mailbox-cursor 'mailbox-cursor) (define-record-type-variant mailbox-cursor (unsafe unchecked inline) (%raw-make-mailbox-cursor np pp mb) (%mailbox-cursor?) (np %mailbox-cursor-next-pair %mailbox-cursor-next-pair-set!) (pp %mailbox-cursor-prev-pair %mailbox-cursor-prev-pair-set!) (mb %mailbox-cursor-mailbox) ) (define-inline (%make-mailbox-cursor mb) (%raw-make-mailbox-cursor '() #f mb) ) (define (error-mailbox-cursor loc obj #!optional argnam) (##sys#signal-hook #:type-error loc (make-error-type-message 'mailbox-cursor argnam) obj) ) (define-inline-check-type mailbox-cursor) (define-inline (%mailbox-cursor-winding? mbc) (%->boolean (%mailbox-cursor-prev-pair mbc)) ) (define-inline (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-next-pair-set! mbc '()) ) (define-inline (%mailbox-cursor-prev-pair-clear! mbc) (%mailbox-cursor-prev-pair-set! mbc #f) ) (define-inline (%mailbox-cursor-rewind! mbc) (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-prev-pair-clear! mbc) ) (define-inline (%mailbox-cursor-extract! mbc) ;unless 'mailbox-cursor-next' has been called don't remove (and-let* ((prev-pair (%mailbox-cursor-prev-pair mbc))) (%queue-extract-pair! (%mailbox-queue (%mailbox-cursor-mailbox mbc)) prev-pair) ) ) ;;; ;Unique objects used as tags (define UNBLOCKED-TAG (%make-unique-object 'unblocked)) (define SEQ-FAIL-TAG (%make-unique-object 'seq-fail)) (define NO-TOVAL-TAG (%make-unique-object 'timeout-value)) #; ;XXX (define MESSAGE-WAITING-TAG (%make-unique-object 'message-waiting)) ;;; Mailbox Exceptions (define-inline (optional-timeout-value x #!optional (def (void))) (if (eq? x NO-TOVAL-TAG) def x) ) (define (make-mailbox-timeout-condition loc mb timout timout-value) (let ((tv (optional-timeout-value timout-value))) (make-composite-condition (make-property-condition 'exn 'location loc 'message "mailbox wait timeout occurred" 'arguments (list timout tv)) (make-property-condition 'mailbox 'box mb) (make-property-condition 'timeout 'time timout 'value tv)) ) ) ;;; Mailbox Threading ;; Select next waiting thread for the mailbox (define-inline (%mailbox-waiters-pop!? mb) (and (not (%mailbox-waiters-empty? mb)) (%mailbox-waiters-pop! mb)) ) (define (ready-mailbox-thread! mb) ;ready oldest waiting thread (and-let* ((th (%mailbox-waiters-pop!? mb))) ;ready the thread based on wait mode (if (not (%thread-blocked? th)) ;then restart (thread-resume! th) ;else wake early if sleeping ;all others dropped on the floor (when (%thread-blocked-for-timeout? th) ;ready the thread (##sys#thread-unblock! th) ;tell 'wait-mailbox-thread!' we unblocked early (thread-signal! th UNBLOCKED-TAG) ) ) ) (void) ) ;; Sleep current thread until timeout, known condition, ;; or some other condition (define (thread-sleep/maybe-unblock! tim unblocked-tag) ;(print "mailbox sleep/maybe-unblock!: " tim " " unblocked-tag) ;sleep current thread for desired seconds, unless unblocked "early". (let/cc return (with-exception-handler (lambda (exp) (if (eq? unblocked-tag exp) (return #f) ;propagate any "real" exception. (signal exp) ) ) (lambda () (thread-sleep! tim) #t) ) ) ) ;; Wait current thread on the mailbox until timeout, available message ;; or some other condition (define (wait-mailbox-thread! loc mb timout timout-value) ; ;no available message due to timeout (define (timeout-exit!) (if (not (eq? timout-value NO-TOVAL-TAG)) timout-value (begin (thread-signal! (current-thread) (make-mailbox-timeout-condition loc mb timout timout-value)) SEQ-FAIL-TAG ) ) ) ; ;push current thread on mailbox waiting queue (%mailbox-waiters-add! mb (current-thread)) ;waiting action (cond ;timeout wanted so sleep until something happens (timout (cond-expand (sleep-primordial-thread ; (cond ((thread-sleep/maybe-unblock! timout UNBLOCKED-TAG) ;timed-out, so no message ;remove from wait queue (%mailbox-waiters-delete! mb (current-thread)) ;indicate no available message (timeout-exit!) ) (else ;unblocked early UNBLOCKED-TAG ) ) ) (else ; (if (eq? (current-thread) ##sys#primordial-thread) (begin (%mailbox-waiters-delete! mb (current-thread)) (warning "mailbox attempt to sleep primordial-thread" mb) (timeout-exit!) ) (cond ((thread-sleep/maybe-unblock! timout UNBLOCKED-TAG) ;timed-out, so no message ;remove from wait queue (%mailbox-waiters-delete! mb (current-thread)) ;indicate no available message (timeout-exit!) ) (else ;unblocked early UNBLOCKED-TAG ) ) ) ) ) ) ;no timeout so suspend until something delivered (else (thread-suspend! (current-thread)) ;we're resumed UNBLOCKED-TAG ) ) ) ;; Wait current thread on the mailbox unless a message available ;Note that the arguments, except the ?expr0 ..., must be base values. (define-syntax on-mailbox-available (syntax-rules () ((_ ?loc ?mb ?timout ?timout-value ?expr0 ...) (let ((_mb ?mb) (_to ?timout) (_tv ?timout-value)) (let waiting () (cond ((%mailbox-queue-empty? _mb) (let ((res (wait-mailbox-thread! ?loc _mb _to _tv))) ;when a thread ready then check mailbox again, could be empty. (if (eq? UNBLOCKED-TAG res) (waiting) ;else some sort of problem res ) ) ) (else ?expr0 ... ) ) ) ) ) ) ) #; ;XXX (define (wait-mailbox-if-empty! loc mb timout timout-value) (on-mailbox-available loc mb timout timout-value MESSAGE-WAITING-TAG ) ) ;;; Mailbox ;; Mailbox Exceptions (define (mailbox-timeout-condition? obj) (and ((condition-predicate 'exn) obj) ((condition-predicate 'mailbox) obj) ((condition-predicate 'timeout) obj) ) ) ;; Mailbox Constructor (define (make-mailbox #!optional (nm (gensym 'mailbox))) (%make-mailbox nm) ) (define (mailbox? obj) (%mailbox? obj) ) ;; Mailbox Properties (define (mailbox-name mb) (%mailbox-name (%check-mailbox 'mailbox-name mb)) ) (define (mailbox-empty? mb) (%mailbox-queue-empty? (%check-mailbox 'mailbox-empty? mb)) ) (define (mailbox-count mb) (%mailbox-queue-count (%check-mailbox 'mailbox-count mb)) ) (define (mailbox-waiting? mb) (not (null? (%mailbox-waiters (%check-mailbox 'mailbox-waiting? mb)))) ) (define (mailbox-waiters mb) (list-copy (%mailbox-waiters (%check-mailbox 'mailbox-waiters mb))) ) ;; Mailbox Operations (define (mailbox-send! mb x) (%mailbox-queue-add! (%check-mailbox 'mailbox-send! mb) x) (ready-mailbox-thread! mb) ) (define (mailbox-wait! mb #!optional timout) (when timout (%check-timeout 'mailbox-wait! timout)) (on-mailbox-available 'mailbox-wait! (%check-mailbox 'mailbox-wait! mb) timout NO-TOVAL-TAG (void) ) ) (define (mailbox-receive! mb #!optional timout (timout-value NO-TOVAL-TAG)) (when timout (%check-timeout 'mailbox-receive! timout)) (on-mailbox-available 'mailbox-receive! (%check-mailbox 'mailbox-receive! mb) timout timout-value (%mailbox-queue-remove! mb) ) ) (define (mailbox-push-back! mb x) (%mailbox-queue-push-back! (%check-mailbox 'mailbox-send! mb) x) (ready-mailbox-thread! mb) ) (define (mailbox-push-back-list! mb ls) (%mailbox-queue-push-back-list! (%check-mailbox 'mailbox-send! mb) (%check-list 'mailbox-push-back-list! ls 'mailbox-send!)) (ready-mailbox-thread! mb) ) ;; Read/Print Syntax (define (mailbox-print mb out) (with-output-to-port out (lambda () (printf "#" (%mailbox-name mb) (%mailbox-queue-count mb) (%mailbox-waiters-count mb)) ) ) ) ;;; Mailbox Cursor ;; Mailbox Cursor Constructor (define (make-mailbox-cursor mb) (%make-mailbox-cursor (%check-mailbox 'make-mailbox-cursor mb)) ) ;; Mailbox Cursor Properties (define (mailbox-cursor? obj) (%mailbox-cursor? obj) ) (define (mailbox-cursor-mailbox mbc) (%mailbox-cursor-mailbox (%check-mailbox-cursor 'mailbox-cursor-mailbox mbc)) ) (define (mailbox-cursor-rewound? mbc) (not (%mailbox-cursor-winding? (%check-mailbox-cursor 'mailbox-cursor-rewound? mbc))) ) (define (mailbox-cursor-unwound? mbc) (null? (%mailbox-cursor-next-pair (%check-mailbox-cursor 'mailbox-cursor-unwound? mbc))) ) ;; Mailbox Cursor Operations (define (mailbox-cursor-rewind mbc) (%mailbox-cursor-rewind! (%check-mailbox-cursor 'mailbox-cursor-rewind mbc)) ) (define (mailbox-cursor-next mbc #!optional timout (timout-value NO-TOVAL-TAG)) (when timout (%check-timeout 'mailbox-cursor-next timout)) (let ((mb (%mailbox-cursor-mailbox (%check-mailbox-cursor 'mailbox-cursor-next mbc)))) ;seed rewound cursor (unless (%mailbox-cursor-winding? mbc) (%mailbox-cursor-next-pair-set! mbc (%mailbox-queue-first-pair mb)) ) ;pull next item from queue at cursor (let scanning () (let ((curr-pair (%mailbox-cursor-next-pair mbc))) ;anything next? (if (not (null? curr-pair)) ;then peek into the queue for the next item (let ((item (car curr-pair))) (%mailbox-cursor-prev-pair-set! mbc curr-pair) (%mailbox-cursor-next-pair-set! mbc (cdr curr-pair)) item ) ;else wait for something in the mailbox (let ((res (wait-mailbox-thread! 'mailbox-cursor-next mb timout timout-value))) (cond ;continue scanning? ((eq? UNBLOCKED-TAG res) (%mailbox-cursor-next-pair-set! mbc (%mailbox-queue-last-pair mb)) (scanning) ) ;some problem (timeout maybe) (else res ) ) ) ) ) ) ) ) (define (mailbox-cursor-extract-and-rewind! mbc) (%mailbox-cursor-extract! (%check-mailbox-cursor 'mailbox-cursor-extract-and-rewind! mbc)) (%mailbox-cursor-rewind! mbc) ) ;; Read/Print Syntax (define (mailbox-cursor-print mbc out) (with-output-to-port out (lambda () (printf "#" (%mailbox-name (%mailbox-cursor-mailbox mbc)) (if (%mailbox-cursor-winding? mbc) "winding" "rewound")) ) ) ) ;;; (set! (record-printer mailbox) mailbox-print) (set! (record-printer mailbox-cursor) mailbox-cursor-print) ) ;module mailbox