;;;; mailbox.scm ;;;; Kon Lovett, Mar '09 (from Chicken 3 "mailbox" by Felix & Kon) ;; Issues ;; ;; - All operations inlined & primitive due to high-performance nature of IPC. ;; ;; - Uses ##sys#current-thread & ##sys#thread-unblock! ;; ;; - Has knowledge of Unit srfi-18 time object internals. ;;; Prelude (declare (usual-integrations) (disable-interrupts) (fixnum) (inline) (local) (no-procedure-checks) (bound-to-procedure ##sys#signal-hook ##sys#thread-unblock!) (always-bound ##sys#current-thread) ) ;; (include "chicken-primitive-object-inlines") (include "chicken-thread-object-inlines") (include "inline-type-checks") ;; Queue Support (define-inline (%make-queue) (%make-structure 'queue '() '())) (define-inline (%queue? obj) (%structure-instance? obj 'queue)) (define-inline (%queue-first-pair q) (%structure-ref q 1)) (define-inline (%queue-last-pair q) (%structure-ref q 2)) (define-inline (%queue-valid? obj) (and #;(%queue? obj) (%fx= 3 (%structure-length obj)) (%list? (%queue-first-pair obj)) (%list? (%queue-last-pair obj)) ) ) (define-inline (%queue-empty? q) (%null? (%queue-first-pair q))) (define-inline (%queue-count q) (%length (%queue-first-pair q))) (define-inline (%queue-first-pair-set! q v) (%structure-set!/mutate q 1 v)) (define-inline (%queue-last-pair-set! q v) (%structure-set!/mutate q 2 v)) ;; Queue Operations (define-inline (%queue-last-pair-empty! q) (%structure-set!/immediate q 2 '())) (define-inline (%queue-add! q datum) (let ((new-pair (%cons datum '()))) (if (%null? (%queue-first-pair q)) (%queue-first-pair-set! q new-pair) (%set-cdr!/mutate (%queue-last-pair q) new-pair) ) (%queue-last-pair-set! q new-pair) ) ) (define-inline (%queue-remove! q) (let* ((first-pair (%queue-first-pair q)) (next-pair (%cdr first-pair))) (%queue-first-pair-set! q next-pair) (when (%null? next-pair) (%queue-last-pair-empty! q) ) (%car first-pair) ) ) (define-inline (%queue-push-back! q item) (let ((newlist (%cons item (%queue-first-pair q)))) (%queue-first-pair-set! q newlist) (when (%null? (%queue-last-pair q)) (%queue-last-pair-set! q newlist) ) ) ) (define-inline (%queue-push-back-list! q itemlist) (let ((newlist (%append! (%list-copy itemlist) (%queue-first-pair q)))) (%queue-first-pair-set! q newlist) (if (%null? newlist) (%queue-last-pair-empty! q) (%queue-last-pair-set! q (%last-pair newlist) ) ) ) ) (define-inline (%queue-extract-pair! q targ-pair) ; Scan queue list until we find the item to remove (let scanning ((this-pair (%queue-first-pair q)) (prev-pair '())) ; Keep scanning until found (if (not (%eq? this-pair targ-pair)) (scanning (%cdr this-pair) this-pair) ;found so cut out the pair (let ((next-pair (%cdr this-pair))) ; At the head of the list, or in the body? (if (%null? prev-pair) (%queue-first-pair-set! q next-pair) (%set-cdr!/mutate prev-pair next-pair) ) ; When the cut pair is the last item update the last pair ref. (when (%eq? this-pair (%queue-last-pair q)) (%queue-last-pair-set! q prev-pair)) ) ) ) ) ;; Mailbox Support (define-inline (%make-mailbox nm) (%make-structure 'mailbox nm (%make-queue) '())) (define-inline (%mailbox? obj) (%structure-instance? obj 'mailbox)) (define-inline (%mailbox-name mb) (%structure-ref mb 1)) ;; Message queue (define-inline (%mailbox-queue mb) (%structure-ref mb 2)) (define-inline (%mailbox-queue-first-pair mb) (%queue-first-pair (%mailbox-queue mb))) (define-inline (%mailbox-queue-last-pair mb) (%queue-last-pair (%mailbox-queue mb))) (define-inline (%mailbox-queue-empty? mb) (%queue-empty? (%mailbox-queue mb))) (define-inline (%mailbox-queue-count mb) (%queue-count (%mailbox-queue mb))) (define-inline (%mailbox-queue-add! mb x) (%queue-add! (%mailbox-queue mb) x)) (define-inline (%mailbox-queue-remove! mb) (%queue-remove! (%mailbox-queue mb))) (define-inline (%mailbox-queue-push-back! mb x) (%queue-push-back! (%mailbox-queue mb) x)) (define-inline (%mailbox-queue-push-back-list! mb ls) (%queue-push-back-list! (%mailbox-queue mb) ls)) ;; Waiting threads (define-inline (%mailbox-waiters mb) (%structure-ref mb 3)) (define-inline (%mailbox-waiters-empty? mb) (%null? (%mailbox-waiters mb))) (define-inline (%mailbox-waiters-count mb) (%length (%mailbox-waiters mb))) (define-inline (%mailbox-waiters-set! mb v) (%structure-set!/mutate mb 3 v)) (define-inline (%mailbox-waiters-add! mb th) (%mailbox-waiters-set! mb (%append! (%mailbox-waiters mb) (%cons th '()))) ) (define-inline (%mailbox-waiters-delete! mb th) (%mailbox-waiters-set! mb (%delq! th (%mailbox-waiters mb))) ) (define-inline (%mailbox-waiters-pop! mb) (let ((ts (%mailbox-waiters mb))) (%mailbox-waiters-set! mb (%cdr ts)) (%car ts) ) ) ;; (define-inline (%mailbox-valid? obj) (and #;(%mailbox? obj) (%fx= 4 (%structure-length obj)) (%queue-valid? (%mailbox-queue obj)) (%list? (%mailbox-waiters obj)) ) ) ;; Mailbox Cursor Support (define-inline (%make-mailbox-cursor mb) (%make-structure 'mailbox-cursor '() #f mb)) (define-inline (%mailbox-cursor? obj) (%structure-instance? obj 'mailbox-cursor)) (define-inline (%mailbox-cursor-next-pair mbc) (%structure-ref mbc 1)) (define-inline (%mailbox-cursor-prev-pair mbc) (%structure-ref mbc 2)) (define-inline (%mailbox-cursor-mailbox mbc) (%structure-ref mbc 3)) (define-inline (%mailbox-cursor-valid? obj) (and #;(%mailbox-cursor? obj) (%fx= 4 (%structure-length obj)) (%mailbox-valid? (%mailbox-cursor-mailbox obj)) (%list? (%mailbox-cursor-next-pair obj)) (let ((pp (%mailbox-cursor-prev-pair obj))) (or (not pp) (%list? pp) ) ) ) ) (define-inline (%mailbox-cursor-winding? mbc) (%->boolean (%mailbox-cursor-prev-pair mbc))) (define-inline (%mailbox-cursor-next-pair-set! mbc v) (%structure-set!/mutate mbc 1 v)) (define-inline (%mailbox-cursor-next-pair-empty! mbc) (%structure-set!/immediate mbc 1 '())) (define-inline (%mailbox-cursor-prev-pair-set! mbc v) (%structure-set!/mutate mbc 2 v)) (define-inline (%mailbox-cursor-prev-pair-clear! mbc) (%structure-set!/immediate mbc 2 #f)) (define-inline (%mailbox-cursor-rewind! mbc) (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-prev-pair-clear! mbc) ) (define-inline (%mailbox-cursor-extract! mbc) ; Unless 'mailbox-cursor-next' has been called don't remove (and-let* ((prev-pair (%mailbox-cursor-prev-pair mbc))) (%queue-extract-pair! (%mailbox-queue (%mailbox-cursor-mailbox mbc)) prev-pair) ) ) ;; Time Support (define-inline (%time? obj) (%structure-instance? obj 'time)) (define-inline (%time-valid? obj) (and #;(%time? obj) (%fx= 4 (%structure-length obj)) (%fixnum? (%structure-ref obj 1)) (%number? (%structure-ref obj 2)) (%fixnum? (%structure-ref obj 3)) ) ) (define-inline (%timeout? obj) (or (%number? obj) (%time? obj))) ;; Argument Checking (define-inline (%check-mailbox loc obj) (unless (%mailbox? obj) (error-mailbox loc obj)) (unless (%mailbox-valid? obj) (error-corrupted-mailbox loc obj)) ) (define-inline (%check-mailbox-cursor loc obj) (unless (%mailbox-cursor? obj) (error-mailbox-cursor loc obj)) (unless (%mailbox-cursor-valid? obj) (error-corrupted-mailbox-cursor loc obj)) ) (define-inline (%check-timeout loc obj) (unless (%timeout? obj) (error-timeout loc obj)) (when (%time? obj) (unless (%time-valid? obj) (error-corrupted-time loc obj))) ) ;;; (module mailbox (;export ; mailbox-timeout-condition? mailbox-timeout-exception? ; make-mailbox mailbox? mailbox-name mailbox-empty? mailbox-count mailbox-waiting? mailbox-waiters mailbox-send! mailbox-wait! mailbox-receive! mailbox-push-back! mailbox-push-back-list! ; make-mailbox-cursor mailbox-cursor? mailbox-cursor-mailbox mailbox-cursor-next mailbox-cursor-rewind mailbox-cursor-rewound? mailbox-cursor-extract-and-rewind!) (import scheme chicken (only ports with-output-to-port) (only srfi-18 thread-signal! thread-resume! thread-sleep! thread-suspend!) (only type-errors define-error-type error-list error-symbol) (only conditions make-exn-condition+ make-condition-predicate)) (require-library ports srfi-18 type-errors conditions) ;;; Errors (define-error-type mailbox) (define-error-type mailbox-cursor) (define-error-type timeout) (define (error-corrupted-mailbox loc obj) (##sys#signal-hook #:runtime-error loc "mailbox corrupted" obj) ) (define (error-corrupted-mailbox-cursor loc obj) (##sys#signal-hook #:runtime-error loc "mailbox-cursor corrupted" obj) ) (define (error-corrupted-time loc obj) (##sys#signal-hook #:runtime-error loc "time corrupted" obj) ) ;;; Mailbox Exceptions (define (make-mailbox-timeout-condition loc to-tim to-def) (let ((args (if (%undefined-value? to-def) (list to-tim) (list to-tim to-def)))) (make-exn-condition+ loc "mailbox wait timeout occured" args 'mailbox 'timeout) ) ) ;;; Mailbox Threading (define UNBLOCKED-TAG (%make-unique-object 'unblocked)) (define (ready-mailbox! mb) ; Ready oldest waiting thread (unless (%mailbox-waiters-empty? mb) (let ((thread (%mailbox-waiters-pop! mb))) ; Ready the thread based on wait mode (if (not (%thread-blocked? thread)) (thread-resume! thread) ; else wake early if sleeping (when (%thread-blocked-for-timeout? thread) ; Ready the thread (##sys#thread-unblock! thread) ; Tell 'wait-mailbox!' we unblocked early (thread-signal! thread UNBLOCKED-TAG) ) ) ) ) ; Side-effect only (%undefined-value) ) (define MESSAGE-WAITING-TAG (%make-unique-object 'message-waiting)) (define (wait-mailbox! loc mb to-tim to-def) ; Push current thread on mailbox waiting queue (%mailbox-waiters-add! mb (%current-thread)) ; Waiting action (cond (to-tim ; Timeout wanted so sleep until something happens (let ((early? #f)) ; Sleep current thread until desired seconds elapsed (condition-case (thread-sleep! to-tim) (exn () ; Unless unblocked "early" then a real exception so propagate (if (%eq? UNBLOCKED-TAG exn) (set! early? #t) (signal exn) ) ) ) ; Awake (cond (early? ; Unblocked early so we have a message MESSAGE-WAITING-TAG ) (else ; Timedout ; Remove from wait queue (%mailbox-waiters-delete! mb (%current-thread)) ; Signal timeout when no default (when (%undefined-value? to-def) (thread-signal! (%current-thread) (make-mailbox-timeout-condition loc to-tim to-def)) ) ; No message waiting to-def ) ) ) ) (else ; Suspend until something delivered (thread-suspend! (%current-thread)) MESSAGE-WAITING-TAG ) ) ) (define (wait-mailbox-if-empty! loc mb to-tim to-def) (if (%mailbox-queue-empty? mb) (wait-mailbox! loc mb to-tim to-def) MESSAGE-WAITING-TAG ) ) ;;; Mailbox ;; Mailbox Exceptions (define mailbox-timeout-condition? (make-condition-predicate exn mailbox timeout)) (define mailbox-timeout-exception? mailbox-timeout-condition?) ;; Mailbox Constructor (define (make-mailbox #!optional (nm (gensym 'mailbox))) (%check-symbol 'make-mailbox nm) (%make-mailbox nm) ) (define (mailbox? obj) (%mailbox? obj) ) ;; Mailbox Properties (define (mailbox-name mb) (%check-mailbox 'mailbox-name mb) (%mailbox-name mb) ) (define (mailbox-empty? mb) (%check-mailbox 'mailbox-empty? mb) (%mailbox-queue-empty? mb) ) (define (mailbox-count mb) (%check-mailbox 'mailbox-count mb) (%mailbox-queue-count mb) ) (define (mailbox-waiting? mb) (%check-mailbox 'mailbox-waiting? mb) (not (%null? (%mailbox-waiters mb))) ) (define (mailbox-waiters mb) (%check-mailbox 'mailbox-waiters mb) (%list-copy (%mailbox-waiters mb)) ) ;; Mailbox Operations (define (mailbox-send! mb x) (%check-mailbox 'mailbox-send! mb) (%mailbox-queue-add! mb x) (ready-mailbox! mb) ) (define (mailbox-wait! mb #!optional to-tim) (%check-mailbox 'mailbox-wait! mb) (when to-tim (%check-timeout 'mailbox-wait! to-tim)) (wait-mailbox-if-empty! 'mailbox-wait! mb to-tim (%undefined-value)) ) (define (mailbox-receive! mb #!optional to-tim (to-def (%undefined-value))) (%check-mailbox 'mailbox-receive! mb) (when to-tim (%check-timeout 'mailbox-receive! to-tim)) (let ((res (wait-mailbox-if-empty! 'mailbox-receive! mb to-tim to-def))) ; Return next item in mailbox, if any (if (%eq? MESSAGE-WAITING-TAG res) (%mailbox-queue-remove! mb) ; else return the timeout default res ) ) ) (define (mailbox-push-back! mb x) (%check-mailbox 'mailbox-send! mb) (%mailbox-queue-push-back! mb x) (ready-mailbox! mb) ) (define (mailbox-push-back-list! mb ls) (%check-mailbox 'mailbox-send! mb) (%check-list ls 'mailbox-send!) (%mailbox-queue-push-back-list! mb ls) (ready-mailbox! mb) ) ;;; Mailbox Cursor ;; Mailbox Cursor Constructor (define (make-mailbox-cursor mb) (%check-mailbox 'make-mailbox-cursor mb) (%make-mailbox-cursor mb) ) ;; Mailbox Cursor Properties (define (mailbox-cursor? obj) (%mailbox-cursor? obj) ) (define (mailbox-cursor-mailbox mbc) (%check-mailbox-cursor 'mailbox-cursor-mailbox mbc) (%mailbox-cursor-mailbox mbc) ) (define (mailbox-cursor-rewound? mbc) (%check-mailbox-cursor 'mailbox-cursor-rewound? mbc) (not (%mailbox-cursor-winding? mbc)) ) ;; Mailbox Cursor Operations (define (mailbox-cursor-rewind mbc) (%check-mailbox-cursor 'mailbox-cursor-rewind mbc) (%mailbox-cursor-rewind! mbc) ) (define (mailbox-cursor-next mbc #!optional to-tim (to-def (%undefined-value))) (%check-mailbox-cursor 'mailbox-cursor-next mbc) (when to-tim (%check-timeout 'mailbox-cursor-next to-tim)) ; Waiting mailbox peek. (let ((mb (%mailbox-cursor-mailbox mbc))) (receive (mailbox-waiter cursor-pair-getter) (if (%mailbox-cursor-winding? mbc) ; then wait for something to be appended (values wait-mailbox! (lambda () (%mailbox-queue-last-pair mb))) ; else grab the start of a, probably, non-empty queue (values wait-mailbox-if-empty! (lambda () (%mailbox-queue-first-pair mb))) ) (let scanning ((next-pair (%mailbox-cursor-next-pair mbc))) ; Anything next? (if (not (%null? next-pair)) ; then peek into the queue for the next item (let ((item (%car next-pair))) (%mailbox-cursor-prev-pair-set! mbc next-pair) (%mailbox-cursor-next-pair-set! mbc (%cdr next-pair)) item ) ; else wait for something in the mailbox (let ((res (mailbox-waiter 'mailbox-cursor-next mb to-tim to-def))) (cond ((%eq? MESSAGE-WAITING-TAG res) ; so continue scanning (%mailbox-cursor-next-pair-set! mbc (cursor-pair-getter)) (scanning (%mailbox-cursor-next-pair mbc)) ) (else ; otherwise timedout res ) ) ) ) ) ) ) ) (define (mailbox-cursor-extract-and-rewind! mbc) (%check-mailbox-cursor 'mailbox-cursor-extract-and-rewind! mbc) (%mailbox-cursor-extract! mbc) (%mailbox-cursor-rewind! mbc) ) ;;; Read/Print Syntax (define-record-printer (mailbox mb out) (with-output-to-port out (lambda () (display "#") ) ) ) (define-record-printer (mailbox-cursor mbc out) (with-output-to-port out (lambda () (display "#") ) ) ) ) ;module mailbox