;;;; mailbox.scm ;;;; Kon Lovett, Mar '09 ;;;; From Chicken 3 "mailbox" by Felix & Kon ;; Issues ;; ;; - 'wait-mailbox' may not return should a timeout exception occur. ;; ;; - All operations inlined & primitive due to high-performance nature of IPC. ;; For this reason the structures are "validated." ;; ;; - Uses ##sys#thread-unblock! ;; ;; - Has knowledge of Unit srfi-18 time object internals. ;; ;; - Uses the Chicken extensions 'thread-suspend' & 'thread-resume'. ;; ;; - Probably should be rewritten to use a mutex & condition-variable rather than ;; disabling interrupts and having own thread waiting queue. (declare (disable-interrupts) ;A MUST! (fixnum) (bound-to-procedure ##sys#signal-hook ##sys#thread-unblock!) ) (module mailbox (;export ; Mailbox Exception API mailbox-timeout-condition? mailbox-timeout-exception? ; Mailbox API make-mailbox mailbox? mailbox-name mailbox-empty? mailbox-count mailbox-waiting? mailbox-waiters mailbox-send! mailbox-wait! mailbox-receive! mailbox-push-back! mailbox-push-back-list! ; Mailbox Cursor API make-mailbox-cursor mailbox-cursor? mailbox-cursor-mailbox mailbox-cursor-next mailbox-cursor-rewind mailbox-cursor-rewound? mailbox-cursor-extract-and-rewind!) (import scheme chicken (only ports with-output-to-port) (only srfi-18 thread-signal! thread-resume! thread-sleep! thread-suspend!) (only type-errors define-error-type error-list error-symbol) (only conditions make-exn-condition+ make-condition-predicate)) (require-library ports srfi-18 type-errors conditions) ;;; Primitives (include "chicken-primitive-object-inlines") (include "chicken-thread-object-inlines") (include "inline-type-checks") (include "inline-queue") ;;; Mailbox Support (define-inline (%make-mailbox nm) (%make-structure 'mailbox nm (%make-queue) '())) (define-inline (%mailbox? obj) (%structure-instance? obj 'mailbox)) (define-inline (%mailbox-name mb) (%structure-ref mb 1)) (define-inline (%mailbox-queue mb) (%structure-ref mb 2)) (define-inline (%mailbox-waiters mb) (%structure-ref mb 3)) (define-inline (%valid-mailbox? obj) (and #;(%mailbox? obj) (%fx= 4 (%structure-length obj)) (%valid-queue? (%mailbox-queue obj)) (%list? (%mailbox-waiters obj)) ) ) ;; Message queue (define-inline (%mailbox-queue-first-pair mb) (%queue-first-pair (%mailbox-queue mb))) (define-inline (%mailbox-queue-last-pair mb) (%queue-last-pair (%mailbox-queue mb))) (define-inline (%mailbox-queue-empty? mb) (%queue-empty? (%mailbox-queue mb))) (define-inline (%mailbox-queue-count mb) (%queue-count (%mailbox-queue mb))) (define-inline (%mailbox-queue-add! mb x) (%queue-add! (%mailbox-queue mb) x)) (define-inline (%mailbox-queue-remove! mb) (%queue-remove! (%mailbox-queue mb))) (define-inline (%mailbox-queue-push-back! mb x) (%queue-push-back! (%mailbox-queue mb) x)) (define-inline (%mailbox-queue-push-back-list! mb ls) (%queue-push-back-list! (%mailbox-queue mb) ls)) ;; Waiting threads (define-inline (%mailbox-waiters-empty? mb) (%null? (%mailbox-waiters mb))) (define-inline (%mailbox-waiters-count mb) (%length (%mailbox-waiters mb))) (define-inline (%mailbox-waiters-set! mb v) (%structure-set!/mutate mb 3 v)) (define-inline (%mailbox-waiters-add! mb th) (%mailbox-waiters-set! mb (%append! (%mailbox-waiters mb) (%cons th '()))) ) (define-inline (%mailbox-waiters-delete! mb th) (%mailbox-waiters-set! mb (%delq! th (%mailbox-waiters mb))) ) (define-inline (%mailbox-waiters-pop! mb) (let ((ts (%mailbox-waiters mb))) (%mailbox-waiters-set! mb (%cdr ts)) (%car ts) ) ) ;;; Mailbox Cursor Support (define-inline (%make-mailbox-cursor mb) (%make-structure 'mailbox-cursor '() #f mb)) (define-inline (%mailbox-cursor? obj) (%structure-instance? obj 'mailbox-cursor)) (define-inline (%mailbox-cursor-next-pair mbc) (%structure-ref mbc 1)) (define-inline (%mailbox-cursor-prev-pair mbc) (%structure-ref mbc 2)) (define-inline (%mailbox-cursor-mailbox mbc) (%structure-ref mbc 3)) (define-inline (%valid-mailbox-cursor? obj) (and #;(%mailbox-cursor? obj) (%fx= 4 (%structure-length obj)) (%valid-mailbox? (%mailbox-cursor-mailbox obj)) (%list? (%mailbox-cursor-next-pair obj)) (let ((pp (%mailbox-cursor-prev-pair obj))) (or (not pp) (%list? pp) ) ) ) ) (define-inline (%mailbox-cursor-winding? mbc) (%->boolean (%mailbox-cursor-prev-pair mbc))) (define-inline (%mailbox-cursor-next-pair-set! mbc v) (%structure-set!/mutate mbc 1 v)) (define-inline (%mailbox-cursor-next-pair-empty! mbc) (%structure-set!/immediate mbc 1 '())) (define-inline (%mailbox-cursor-prev-pair-set! mbc v) (%structure-set!/mutate mbc 2 v)) (define-inline (%mailbox-cursor-prev-pair-clear! mbc) (%structure-set!/immediate mbc 2 #f)) (define-inline (%mailbox-cursor-rewind! mbc) (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-prev-pair-clear! mbc) ) (define-inline (%mailbox-cursor-extract! mbc) ; Unless 'mailbox-cursor-next' has been called don't remove (and-let* ((prev-pair (%mailbox-cursor-prev-pair mbc))) (%queue-extract-pair! (%mailbox-queue (%mailbox-cursor-mailbox mbc)) prev-pair) ) ) ;; Time Support ;Use of Unit srfi-18 implementation detail (define-inline (%time? obj) (%structure-instance? obj 'time)) ;Use of Unit srfi-18 implementation detail (define-inline (%valid-time? obj) (and #;(%time? obj) (%fx= 4 (%structure-length obj)) (%fixnum? (%structure-ref obj 1)) (%number? (%structure-ref obj 2)) (%fixnum? (%structure-ref obj 3)) ) ) ;Note that even though only fixnum arithmetic is compiled a Chicken ;generic number is acceptable as a timeout. (define-inline (%timeout? obj) (or (%number? obj) (%time? obj))) ;;; Argument Checking (define-inline (%check-mailbox loc obj) (unless (%mailbox? obj) (error-mailbox loc obj)) (unless (%valid-mailbox? obj) (error-corrupted-mailbox loc obj)) ) (define-inline (%check-mailbox-cursor loc obj) (unless (%mailbox-cursor? obj) (error-mailbox-cursor loc obj)) (unless (%valid-mailbox-cursor? obj) (error-corrupted-mailbox-cursor loc obj)) ) (define-inline (%check-timeout loc obj) (unless (%timeout? obj) (error-timeout loc obj)) (when (%time? obj) (unless (%valid-time? obj) (error-corrupted-time loc obj))) ) ;;; Errors (define-error-type mailbox) (define-error-type mailbox-cursor) (define-error-type timeout) (define (error-corrupted-mailbox loc obj) (##sys#signal-hook #:runtime-error loc "mailbox corrupted" obj) ) (define (error-corrupted-mailbox-cursor loc obj) (##sys#signal-hook #:runtime-error loc "mailbox-cursor corrupted" obj) ) (define (error-corrupted-time loc obj) (##sys#signal-hook #:runtime-error loc "time corrupted" obj) ) ;;; Mailbox Exceptions (define (make-mailbox-timeout-condition loc timout timout-value) (let ((args (if (%undefined-value? timout-value) (list timout) (list timout timout-value)))) (make-exn-condition+ loc "mailbox wait timeout occured" args 'mailbox 'timeout) ) ) ;;; Mailbox Threading ;Unique objects used as tags (define UNBLOCKED-TAG (%make-unique-object 'unblocked)) (define MESSAGE-WAITING-TAG (%make-unique-object 'message-waiting)) ;; Select next waiting thread for the mailbox (define (ready-mailbox! mb) ; Ready oldest waiting thread (unless (%mailbox-waiters-empty? mb) (let ((thread (%mailbox-waiters-pop! mb))) ; Ready the thread based on wait mode (if (not (%thread-blocked? thread)) (thread-resume! thread) ; else wake early if sleeping (when (%thread-blocked-for-timeout? thread) ; Ready the thread (##sys#thread-unblock! thread) ; Tell 'wait-mailbox!' we unblocked early (thread-signal! thread UNBLOCKED-TAG) ) ) ) ) ; Side-effect only (%undefined-value) ) ;; Sleep current thread until timeout, known condition, ;; or some other condition (define-inline (thread-sleep/maybe-unblock! tim tag) ; Sleep current thread for desired seconds, ; unless unblocked "early." ; Propagate any "real" exception. (call/cc (lambda (return) (with-exception-handler (lambda (exp) (if (%eq? tag exp) (return #f) (signal exp))) (lambda () (thread-sleep! tim) #t)))) ) ;; Wait current thread on the mailbox until timeout, available message ;; or some other condition (define (wait-mailbox! loc mb timout timout-value) ; Push current thread on mailbox waiting queue (%mailbox-waiters-add! mb (%current-thread)) ; Waiting action (cond ((not timout) ; No timeout so suspend until something delivered (thread-suspend! (%current-thread)) ; Were resumed! MESSAGE-WAITING-TAG ) (else ; Timeout wanted so sleep until something happens (cond ((not (thread-sleep/maybe-unblock! timout UNBLOCKED-TAG)) ; Unblocked early so we have a message MESSAGE-WAITING-TAG ) (else ; Timedout, so no message ; Remove from wait queue (%mailbox-waiters-delete! mb (%current-thread)) ; Indicate no available message (if (not (%undefined-value? timout-value)) timout-value (thread-signal! (%current-thread) (make-mailbox-timeout-condition loc timout timout-value)) ) ) ) ) ) ) ;; Wait current thread on the mailbox unless a message available (define-inline (wait-mailbox-if-empty! loc mb timout timout-value) (if (not (%mailbox-queue-empty? mb)) MESSAGE-WAITING-TAG (wait-mailbox! loc mb timout timout-value) ) ) ;;; Mailbox ;; Mailbox Exceptions (define mailbox-timeout-condition? (make-condition-predicate exn mailbox timeout)) (define mailbox-timeout-exception? mailbox-timeout-condition?) ;; Mailbox Constructor (define (make-mailbox #!optional (nm (gensym 'mailbox))) (%check-symbol 'make-mailbox nm) (%make-mailbox nm) ) (define (mailbox? obj) (%mailbox? obj)) ;; Mailbox Properties (define (mailbox-name mb) (%check-mailbox 'mailbox-name mb) (%mailbox-name mb) ) (define (mailbox-empty? mb) (%check-mailbox 'mailbox-empty? mb) (%mailbox-queue-empty? mb) ) (define (mailbox-count mb) (%check-mailbox 'mailbox-count mb) (%mailbox-queue-count mb) ) (define (mailbox-waiting? mb) (%check-mailbox 'mailbox-waiting? mb) (not (%null? (%mailbox-waiters mb))) ) (define (mailbox-waiters mb) (%check-mailbox 'mailbox-waiters mb) (%list-copy (%mailbox-waiters mb)) ) ;; Mailbox Operations (define (mailbox-send! mb x) (%check-mailbox 'mailbox-send! mb) (%mailbox-queue-add! mb x) (ready-mailbox! mb) ) (define (mailbox-wait! mb #!optional timout) (%check-mailbox 'mailbox-wait! mb) (when timout (%check-timeout 'mailbox-wait! timout)) (wait-mailbox-if-empty! 'mailbox-wait! mb timout (%undefined-value)) ) (define (mailbox-receive! mb #!optional timout (timout-value (%undefined-value))) (%check-mailbox 'mailbox-receive! mb) (when timout (%check-timeout 'mailbox-receive! timout)) (let ((res (wait-mailbox-if-empty! 'mailbox-receive! mb timout timout-value))) ; Return next item in mailbox, if any (if (%eq? MESSAGE-WAITING-TAG res) (%mailbox-queue-remove! mb) ; else return the timeout value res ) ) ) (define (mailbox-push-back! mb x) (%check-mailbox 'mailbox-send! mb) (%mailbox-queue-push-back! mb x) (ready-mailbox! mb) ) (define (mailbox-push-back-list! mb ls) (%check-mailbox 'mailbox-send! mb) (%check-list ls 'mailbox-send!) (%mailbox-queue-push-back-list! mb ls) (ready-mailbox! mb) ) ;;; Mailbox Cursor ;; Mailbox Cursor Constructor (define (make-mailbox-cursor mb) (%check-mailbox 'make-mailbox-cursor mb) (%make-mailbox-cursor mb) ) ;; Mailbox Cursor Properties (define (mailbox-cursor? obj) (%mailbox-cursor? obj) ) (define (mailbox-cursor-mailbox mbc) (%check-mailbox-cursor 'mailbox-cursor-mailbox mbc) (%mailbox-cursor-mailbox mbc) ) (define (mailbox-cursor-rewound? mbc) (%check-mailbox-cursor 'mailbox-cursor-rewound? mbc) (not (%mailbox-cursor-winding? mbc)) ) ;; Mailbox Cursor Operations (define (mailbox-cursor-rewind mbc) (%check-mailbox-cursor 'mailbox-cursor-rewind mbc) (%mailbox-cursor-rewind! mbc) ) (define (mailbox-cursor-next mbc #!optional timout (timout-value (%undefined-value))) (%check-mailbox-cursor 'mailbox-cursor-next mbc) (when timout (%check-timeout 'mailbox-cursor-next timout)) ; Waiting mailbox peek. (let ((mb (%mailbox-cursor-mailbox mbc))) (receive (mailbox-waiter cursor-pair-getter) (if (%mailbox-cursor-winding? mbc) ; then wait for something to be appended (values wait-mailbox! (lambda () (%mailbox-queue-last-pair mb))) ; else grab the start of a, probably, non-empty queue (values wait-mailbox-if-empty! (lambda () (%mailbox-queue-first-pair mb)))) (let scanning () (let ((next-pair (%mailbox-cursor-next-pair mbc))) ; Anything next? (if (not (%null? next-pair)) ; then peek into the queue for the next item (let ((item (%car next-pair))) (%mailbox-cursor-prev-pair-set! mbc next-pair) (%mailbox-cursor-next-pair-set! mbc (%cdr next-pair)) item ) ; else wait for something in the mailbox (let ((res (mailbox-waiter 'mailbox-cursor-next mb timout timout-value))) (cond ((%eq? MESSAGE-WAITING-TAG res) ; so continue scanning (%mailbox-cursor-next-pair-set! mbc (cursor-pair-getter)) (scanning) ) (else ; otherwise timedout res ) ) ) ) ) ) ) ) ) (define (mailbox-cursor-extract-and-rewind! mbc) (%check-mailbox-cursor 'mailbox-cursor-extract-and-rewind! mbc) (%mailbox-cursor-extract! mbc) (%mailbox-cursor-rewind! mbc) ) ;;; Read/Print Syntax (define-record-printer (mailbox mb out) (with-output-to-port out (lambda () (display "#") ) ) ) (define-record-printer (mailbox-cursor mbc out) (with-output-to-port out (lambda () (display "#") ) ) ) ) ;module mailbox