;;;; mailbox.scm ;;;; Kon Lovett, Mar '09 (from Chicken 3 "mailbox" by Felix & Kon) ;; Issues ;; ;; - All operations inlined & primitive due to high-performance nature of IPC. ;; ;; - Uses ##sys#current-thread & ##sys#thread-unblock! ;; ;; - Has knowledge of Unit srfi-18 time object internals. ;;; Prelude (declare (usual-integrations) (disable-interrupts) (fixnum) (inline) (local) (no-procedure-checks) (bound-to-procedure ##sys#signal-hook ##sys#thread-unblock!) (always-bound ##sys#current-thread) ) ;; (include "chicken-primitive-object-inlines") (include "chicken-thread-object-inlines") ;; Queue Support (define-inline (%make-queue) (%make-structure 'queue '() '())) (define-inline (%queue? obj) (%structure-instance? obj 'queue)) (define-inline (%queue-first-pair q) (%structure-ref q 1)) (define-inline (%queue-last-pair q) (%structure-ref q 2)) (define-inline (%queue-valid? obj) (and #;(%queue? obj) (%fx= 3 (%structure-length obj)) (%list? (%queue-first-pair obj)) (%list? (%queue-last-pair obj)) ) ) (define-inline (%queue-empty? q) (%null? (%queue-first-pair q))) (define-inline (%queue-count q) (%length (%queue-first-pair q))) (define-inline (%queue-first-pair-set! q v) (%structure-set!/mutate q 1 v)) (define-inline (%queue-last-pair-set! q v) (%structure-set!/mutate q 2 v)) ;; Queue Operations (define-inline (%queue-last-pair-empty! q) (%structure-set!/immediate q 2 '())) (define-inline (%queue-add! q datum) (let ((new-pair (%cons datum '()))) (if (%null? (%queue-first-pair q)) (%queue-first-pair-set! q new-pair) (%set-cdr!/mutate (%queue-last-pair q) new-pair) ) (%queue-last-pair-set! q new-pair) ) ) (define-inline (%queue-remove! q) (let* ((first-pair (%queue-first-pair q)) (next-pair (%cdr first-pair))) (%queue-first-pair-set! q next-pair) (when (%null? next-pair) (%queue-last-pair-empty! q) ) (%car first-pair) ) ) (define-inline (%queue-push-back! q item) (let ((newlist (%cons item (%queue-first-pair q)))) (%queue-first-pair-set! q newlist) (when (%null? (%queue-last-pair q)) (%queue-last-pair-set! q newlist) ) ) ) (define-inline (%queue-push-back-list! q itemlist) (let ((newlist (%append! (%list-copy itemlist) (%queue-first-pair q)))) (%queue-first-pair-set! q newlist) (if (%null? newlist) (%queue-last-pair-empty! q) (%queue-last-pair-set! q (%last-pair newlist) ) ) ) ) (define-inline (%queue-extract-pair! q targ-pair) ; Scan queue list until we find the item to remove (let scanning ((this-pair (%queue-first-pair q)) (prev-pair '())) ; Found it? (if (%eq? this-pair targ-pair) ;then cut out the pair (let ((next-pair (%cdr this-pair))) ; At the head of the list, or in the body? (if (%null? prev-pair) (%queue-first-pair-set! q next-pair) (%set-cdr!/mutate prev-pair next-pair) ) ; When the cut pair is the last item update the last pair ref. (when (%eq? this-pair (%queue-last-pair q)) (%queue-last-pair-set! q prev-pair)) ) ;else keep looking for the pair (scanning (%cdr this-pair) this-pair) ) ) ) ;; Mailbox Support (define-inline (%make-mailbox nm) (%make-structure 'mailbox nm (%make-queue) '())) (define-inline (%mailbox? obj) (%structure-instance? obj 'mailbox)) (define-inline (%mailbox-name mb) (%structure-ref mb 1)) ;; Message queue (define-inline (%mailbox-queue mb) (%structure-ref mb 2)) (define-inline (%mailbox-queue-first-pair mb) (%queue-first-pair (%mailbox-queue mb))) (define-inline (%mailbox-queue-last-pair mb) (%queue-last-pair (%mailbox-queue mb))) (define-inline (%mailbox-queue-empty? mb) (%queue-empty? (%mailbox-queue mb))) (define-inline (%mailbox-queue-count mb) (%queue-count (%mailbox-queue mb))) (define-inline (%mailbox-queue-add! mb x) (%queue-add! (%mailbox-queue mb) x)) (define-inline (%mailbox-queue-remove! mb) (%queue-remove! (%mailbox-queue mb))) (define-inline (%mailbox-queue-push-back! mb x) (%queue-push-back! (%mailbox-queue mb) x)) (define-inline (%mailbox-queue-push-back-list! mb ls) (%queue-push-back-list! (%mailbox-queue mb) ls)) ;; Waiting threads (define-inline (%mailbox-waiters mb) (%structure-ref mb 3)) (define-inline (%mailbox-waiters-empty? mb) (%null? (%mailbox-waiters mb))) (define-inline (%mailbox-waiters-count mb) (%length (%mailbox-waiters mb))) (define-inline (%mailbox-waiters-set! mb v) (%structure-set!/mutate mb 3 v)) (define-inline (%mailbox-waiters-add! mb th) (%mailbox-waiters-set! mb (%append! (%mailbox-waiters mb) (%cons th '()))) ) (define-inline (%mailbox-waiters-delete! mb th) (%mailbox-waiters-set! mb (%delq! th (%mailbox-waiters mb))) ) (define-inline (%mailbox-waiters-pop! mb) (let ((ts (%mailbox-waiters mb))) (%mailbox-waiters-set! mb (%cdr ts)) (%car ts) ) ) ;; (define-inline (%mailbox-valid? obj) (and #;(%mailbox? obj) (%fx= 4 (%structure-length obj)) (%queue-valid? (%mailbox-queue obj)) (%list? (%mailbox-waiters obj)) ) ) ;; Mailbox Cursor Support (define-inline (%make-mailbox-cursor mb) (%make-structure 'mailbox-cursor '() #f mb)) (define-inline (%mailbox-cursor? obj) (%structure-instance? obj 'mailbox-cursor)) (define-inline (%mailbox-cursor-next-pair mbc) (%structure-ref mbc 1)) (define-inline (%mailbox-cursor-prev-pair mbc) (%structure-ref mbc 2)) (define-inline (%mailbox-cursor-mailbox mbc) (%structure-ref mbc 3)) (define-inline (%mailbox-cursor-valid? obj) (and #;(%mailbox-cursor? obj) (%fx= 4 (%structure-length obj)) (%mailbox-valid? (%mailbox-cursor-mailbox obj)) (%list? (%mailbox-cursor-next-pair obj)) (let ((pp (%mailbox-cursor-prev-pair obj))) (or (not pp) (%list? pp) ) ) ) ) (define-inline (%mailbox-cursor-winding? mbc) (%->boolean (%mailbox-cursor-prev-pair mbc))) (define-inline (%mailbox-cursor-next-pair-set! mbc v) (%structure-set!/mutate mbc 1 v)) (define-inline (%mailbox-cursor-next-pair-empty! mbc) (%structure-set!/immediate mbc 1 '())) (define-inline (%mailbox-cursor-prev-pair-set! mbc v) (%structure-set!/mutate mbc 2 v)) (define-inline (%mailbox-cursor-prev-pair-clear! mbc) (%structure-set!/immediate mbc 2 #f)) (define-inline (%mailbox-cursor-rewind! mbc) (%mailbox-cursor-next-pair-empty! mbc) (%mailbox-cursor-prev-pair-clear! mbc) ) (define-inline (%mailbox-cursor-extract! mbc) ; Unless 'mailbox-cursor-next' has been called don't remove (and-let* ((prev-pair (%mailbox-cursor-prev-pair mbc))) (%queue-extract-pair! (%mailbox-queue (%mailbox-cursor-mailbox mbc)) prev-pair) ) ) ;; Time Support (define-inline (%time? obj) (%structure-instance? obj 'time)) (define-inline (%time-valid? obj) (and #;(%time? obj) (%fx= 4 (%structure-length obj)) (%fixnum? (%structure-ref obj 1)) (%number? (%structure-ref obj 2)) (%fixnum? (%structure-ref obj 3)) ) ) (define-inline (%timeout? obj) (or (%number? obj) (%time? obj))) ;; Argument Checking (define-inline (%check-symbol loc obj) (unless (%symbol? obj) (error-type-symbol loc obj))) (define-inline (%check-list loc obj) (unless (%list? obj) (error-type-list obj loc))) (define-inline (%check-mailbox loc obj) (unless (%mailbox? obj) (error-type-mailbox loc obj)) (unless (%mailbox-valid? obj) (error-corrupted-mailbox loc obj)) ) (define-inline (%check-mailbox-cursor loc obj) (unless (%mailbox-cursor? obj) (error-type-mailbox-cursor loc obj)) (unless (%mailbox-cursor-valid? obj) (error-corrupted-mailbox-cursor loc obj)) ) (define-inline (%check-timeout loc obj) (unless (%timeout? obj) (error-type-timeout loc obj)) (unless (and (%time? obj) (%time-valid? obj)) (error-corrupted-time loc obj)) ) ;;; (require-library ports srfi-18) (module mailbox (;export ; mailbox-timeout-exception? ; make-mailbox mailbox? mailbox-name mailbox-empty? mailbox-count mailbox-waiting? mailbox-waiters mailbox-send! mailbox-wait! mailbox-receive! mailbox-push-back! mailbox-push-back-list! ; make-mailbox-cursor mailbox-cursor? mailbox-cursor-mailbox mailbox-cursor-next mailbox-cursor-rewind mailbox-cursor-rewound? mailbox-cursor-extract-and-rewind!) (import scheme (only chicken optional ;due to #!optional implementation let-optionals ;due to #!optional implementation handle-exceptions ;due to condition-case implementation with-exception-handler ;due to handle-exceptions implementation and-let* let-values unless when make-composite-condition make-property-condition condition-predicate condition-case error signal gensym define-record-printer) (only ports with-output-to-port) (only srfi-18 thread-signal! thread-resume! thread-sleep! thread-suspend!) ) ;;; Errors (define (error-type-list loc obj) (##sys#signal-hook #:type-error loc "bad argument type - not a list" obj) ) (define (error-type-symbol loc obj) (##sys#signal-hook #:type-error loc "bad argument type - not a symbol" obj) ) (define (error-type-mailbox loc obj) (##sys#signal-hook #:type-error loc "bad argument type - not a mailbox" obj) ) (define (error-type-mailbox-cursor loc obj) (##sys#signal-hook #:type-error loc "bad argument type - not a mailbox-cursor" obj) ) (define (error-type-timeout loc obj) (##sys#signal-hook #:type-error loc "bad argument type - not a timeout object" obj) ) (define (error-corrupted-mailbox loc obj) (##sys#signal-hook #:runtime-error loc "mailbox corrupted" obj) ) (define (error-corrupted-mailbox-cursor loc obj) (##sys#signal-hook #:runtime-error loc "mailbox-cursor corrupted" obj) ) (define (error-corrupted-time loc obj) (##sys#signal-hook #:runtime-error loc "time corrupted" obj) ) ;;; Mailbox Exceptions (define (make-mailbox-timeout-condition loc to-tim to-def) (make-composite-condition (make-property-condition 'exn 'location loc 'message "mailbox wait timeout occured" 'arguments (if (%undefined-value? to-def) (list to-tim) (list to-tim to-def))) (make-property-condition 'mailbox) (make-property-condition 'timeout)) ) ;;; Mailbox Threading (define UNBLOCKED-TAG (%make-unique-object 'unblocked)) (define (ready-mailbox! mb) ; Ready oldest waiting thread (unless (%mailbox-waiters-empty? mb) (let ((thread (%mailbox-waiters-pop! mb))) ; Ready the thread based on wait mode (if (not (%thread-blocked? thread)) (thread-resume! thread) ;else wake early if sleeping (when (%thread-blocked-for-timeout? thread) ; Ready the thread (##sys#thread-unblock! thread) ; Tell 'wait-mailbox!' we unblocked early (thread-signal! thread UNBLOCKED-TAG) ) ) ) ) ; Side-effect only (%undefined-value) ) (define MESSAGE-WAITING-TAG (%make-unique-object 'message-waiting)) (define (wait-mailbox! loc mb to-tim to-def) ; Push current thread on mailbox waiting queue (%mailbox-waiters-add! mb (%current-thread)) ; Waiting action (cond (to-tim ; Timeout wanted so sleep until something happens (let ((early? #f)) ; Sleep current thread until desired seconds elapsed (condition-case (thread-sleep! to-tim) (exn () ; Unless unblocked "early" then a real exception so propagate (if (%eq? UNBLOCKED-TAG exn) (set! early? #t) (signal exn) ) ) ) ; Awake (cond (early? ; Unblocked early so we have a message MESSAGE-WAITING-TAG ) (else ; Timedout ; Remove from wait queue (%mailbox-waiters-delete! mb (%current-thread)) ; Signal timeout when no default (when (%undefined-value? to-def) (thread-signal! (%current-thread) (make-mailbox-timeout-condition loc to-tim to-def)) ) ; No message waiting to-def ) ) ) ) (else ; Suspend until something delivered (thread-suspend! (%current-thread)) MESSAGE-WAITING-TAG ) ) ) (define (wait-mailbox-if-empty! loc mb to-tim to-def) (if (%mailbox-queue-empty? mb) (wait-mailbox! loc mb to-tim to-def) MESSAGE-WAITING-TAG ) ) ;;; Mailbox ;; Mailbox Exceptions (define mailbox-timeout-exception? (let ((exf (condition-predicate 'exn)) (mbf (condition-predicate 'mailbox)) (tmf (condition-predicate 'timeout))) (lambda (obj) (and (exf obj) (mbf obj) (tmf obj)) ) ) ) ;; Mailbox Constructor (define (make-mailbox #!optional (nm (gensym 'mailbox))) (%check-symbol 'make-mailbox nm) (%make-mailbox nm) ) (define (mailbox? obj) (%mailbox? obj) ) ;; Mailbox Properties (define (mailbox-name mb) (%check-mailbox 'mailbox-name mb) (%mailbox-name mb) ) (define (mailbox-empty? mb) (%check-mailbox 'mailbox-empty? mb) (%mailbox-queue-empty? mb) ) (define (mailbox-count mb) (%check-mailbox 'mailbox-count mb) (%mailbox-queue-count mb) ) (define (mailbox-waiting? mb) (%check-mailbox 'mailbox-waiting? mb) (not (%null? (%mailbox-waiters mb))) ) (define (mailbox-waiters mb) (%check-mailbox 'mailbox-waiters mb) (%list-copy (%mailbox-waiters mb)) ) ;; Mailbox Operations (define (mailbox-send! mb x) (%check-mailbox 'mailbox-send! mb) (%mailbox-queue-add! mb x) (ready-mailbox! mb) ) (define (mailbox-wait! mb #!optional to-tim) (%check-mailbox 'mailbox-wait! mb) (when to-tim (%check-timeout 'mailbox-wait! to-tim)) (wait-mailbox-if-empty! 'mailbox-wait! mb to-tim (%undefined-value)) ) (define (mailbox-receive! mb #!optional to-tim (to-def (%undefined-value))) (%check-mailbox 'mailbox-receive! mb) (when to-tim (%check-timeout 'mailbox-receive! to-tim)) (let ((res (wait-mailbox-if-empty! 'mailbox-receive! mb to-tim to-def))) ; Return next item in mailbox, if any (if (%eq? MESSAGE-WAITING-TAG res) (%mailbox-queue-remove! mb) ;else return the timeout default res ) ) ) (define (mailbox-push-back! mb x) (%check-mailbox 'mailbox-send! mb) (%mailbox-queue-push-back! mb x) (ready-mailbox! mb) ) (define (mailbox-push-back-list! mb ls) (%check-mailbox 'mailbox-send! mb) (%check-list ls 'mailbox-send!) (%mailbox-queue-push-back-list! mb ls) (ready-mailbox! mb) ) ;;; Mailbox Cursor ;; Mailbox Cursor Constructor (define (make-mailbox-cursor mb) (%check-mailbox 'make-mailbox-cursor mb) (%make-mailbox-cursor mb) ) ;; Mailbox Cursor Properties (define (mailbox-cursor? obj) (%mailbox-cursor? obj) ) (define (mailbox-cursor-mailbox mbc) (%check-mailbox-cursor 'mailbox-cursor-mailbox mbc) (%mailbox-cursor-mailbox mbc) ) (define (mailbox-cursor-rewound? mbc) (%check-mailbox-cursor 'mailbox-cursor-rewound? mbc) (not (%mailbox-cursor-winding? mbc)) ) ;; Mailbox Cursor Operations (define (mailbox-cursor-rewind mbc) (%check-mailbox-cursor 'mailbox-cursor-rewind mbc) (%mailbox-cursor-rewind! mbc) ) (define (mailbox-cursor-next mbc #!optional to-tim (to-def (%undefined-value))) (%check-mailbox-cursor 'mailbox-cursor-next mbc) (when to-tim (%check-timeout 'mailbox-cursor-next to-tim)) ; Waiting mailbox peek. (let ((mb (%mailbox-cursor-mailbox mbc))) (let-values (((mailbox-waiter cursor-pair-getter) (if (%mailbox-cursor-winding? mbc) ;then wait for something to be appended (values wait-mailbox! (lambda () (%mailbox-queue-last-pair mb))) ;else grab the start of a, probably, non-empty queue (values wait-mailbox-if-empty! (lambda () (%mailbox-queue-first-pair mb))) ) ) ) (let scanning ((next-pair (%mailbox-cursor-next-pair mbc))) ; Anything next? (if (not (%null? next-pair)) ;then peek into the queue for the next item (let ((item (%car next-pair))) (%mailbox-cursor-prev-pair-set! mbc next-pair) (%mailbox-cursor-next-pair-set! mbc (%cdr next-pair)) item ) ;else wait for something in the mailbox (let ((res (mailbox-waiter 'mailbox-cursor-next mb to-tim to-def))) (cond ((%eq? MESSAGE-WAITING-TAG res) ; so continue scanning (%mailbox-cursor-next-pair-set! mbc (cursor-pair-getter)) (scanning (%mailbox-cursor-next-pair mbc)) ) (else ; otherwise timedout res ) ) ) ) ) ) ) ) (define (mailbox-cursor-extract-and-rewind! mbc) (%check-mailbox-cursor 'mailbox-cursor-extract-and-rewind! mbc) (%mailbox-cursor-extract! mbc) (%mailbox-cursor-rewind! mbc) ) ;;; Read/Print Syntax (define-record-printer (mailbox mb out) (with-output-to-port out (lambda () (display "#") ) ) ) (define-record-printer (mailbox-cursor mbc out) (with-output-to-port out (lambda () (display "#") ) ) ) ) ;module mailbox