;;;; thread-reaper.scm
;;;; Kon Lovett, Oct '09

;; Issues
;;
;; - Could allow the stopping of an existing reaper and the startup of another.
;; Make '+stopping?+' thread thunk local w/ a set/get behavior.

; Used by threads that are cleanly terminating and wish to 'join' the
; primordial thread w/o any user intervention. (A thread that attempts
; to 'join' itself will cause a deadlock.)
;
; The "reaped" thread's end-exception, if any, is printed as a warning.
;
; The reaper can be stopped at any time

(module thread-reaper

(;export
  thread-reaper-shutdown?
  thread-reap!
  thread-reaper-stop!
  thread-reaper-greedy
  thread-reaper-quantum
  thread-reaper-timeout
  thread-reaper-retries)

(import
  scheme
  chicken
  (only data-structures
    queue-empty? queue-remove! make-queue queue-add! queue->list)
  (only srfi-18
    thread-join! thread-yield! thread-start! make-thread
    thread-quantum-set! thread-quantum
    terminated-thread-exception? uncaught-exception?)
  (only miscmacros until)
  (only synch make-object/synch synch-with %synch-with)
  (only record-variants define-record-type-variant)
  (only thread-utils check-thread print-exception-warning)
  (only type-checks check-positive-number check-natural-fixnum))

;'import' only makes the bindings visible, the extension code must also be
;loaded. Every extension imported above is listed here, 'type-checks'
;included, so 'thread-reaper-timeout' & 'thread-reaper-retries' do not depend
;on some other unit having loaded it first.
(require-library
  data-structures srfi-18
  record-variants miscmacros synch thread-utils type-checks)

;;

;Local to this module

;; One queued reap request:
;;   th - the thread to join
;;   to - timeout handed to 'thread-join!' (mutable)
;;   rt - number of further attempts allowed after a timed-out join (mutable)
(define-record-type-variant reap-item (unsafe unchecked inline)
  (make-reap-item th to rt)
  reap-item?
  (th reap-item-thread)
  (to reap-item-timeout set-reap-item-timeout!)
  (rt reap-item-retries set-reap-item-retries!) )

;;

;; Sentinel passed to 'thread-join!' as its timeout value; recognized with
;; 'eq?' to tell a timed-out join from a normal thread result.
(define REAP-TIMED-OUT '#(reap-timedout))

;; Removes the head item of the queue 'thq' and joins its thread, using the
;; item's own timeout.
;;
;; - The join timed out & the item has retries left: the retry count is
;;   decremented and the item is put back at the tail of 'thq'.
;; - The join timed out & the retry count is 0: a "cannot reap thread"
;;   warning is issued and the item is dropped.
;; - The join raised a condition: it is reported via
;;   'print-exception-warning' and the item is dropped.
;;
;; The caller must ensure 'thq' is not empty.
(define (reap-queue-thread thq)
  ;In case an unhandled-exception
  (let* ((ri (queue-remove! thq))
         (th (reap-item-thread ri))
         (to (reap-item-timeout ri)) )
    (handle-exceptions exn
        (begin
          ;The '#;' datum comment disables the 'queue-add!' form that follows
          ;it, so the offending item is currently NOT re-queued.
          #; ;FIXME should we put the offender back?
          (queue-add! thq ri)
          (print-exception-warning exn) )
      (let ((res (thread-join! th to REAP-TIMED-OUT)))
        ;Try again if it just timed-out
        (when (eq? REAP-TIMED-OUT res)
          (let ((rt (reap-item-retries ri)))
            (if (fx= 0 rt)
                (warning "cannot reap thread" th)
                (begin
                  (set-reap-item-retries! ri (fx- rt 1))
                  (queue-add! thq ri) ) ) ) ) ) ) ) )

;; Reaps the head item of 'thq', if there is one.
(define-inline (reap-thread-queue-top thq)
  (unless (queue-empty? thq) (reap-queue-thread thq)) )

;; Reaps items until 'thq' is empty. Timed-out items with retries left are
;; re-queued by 'reap-queue-thread', so they are attempted again in this loop.
(define-inline (reap-thread-queue thq)
  (until (queue-empty? thq) (reap-queue-thread thq)) )

;;

;; DEFAULT-* values are used in normal operation, STOPPING-* values are
;; applied by 'thread-reaper-stop!' while the reaper winds up.
(define-constant DEFAULT-REAPER-QUANTUM 2500)
(define-constant STOPPING-REAPER-QUANTUM 10000)

(define-constant DEFAULT-REAPER-TIMEOUT #f)
(define-constant STOPPING-REAPER-TIMEOUT 1.0)

(define-constant DEFAULT-REAPER-RETRIES 1)
(define-constant STOPPING-REAPER-RETRIES 0)

(define +threads+ #f)                       ;Queue of threads to reap
(define +reaper-thread+ #f)                 ;Needs a separate thread since asynch
(define +greedy?+ #f)                       ;Reaper should empty the queue each time-slice
(define +timeout+ DEFAULT-REAPER-TIMEOUT)   ;Reaper join timeout
(define +retries+ DEFAULT-REAPER-RETRIES)   ;Reaper join attempts
(define +stopping?+ #f)                     ;Reaper should cleanly stop
(define +shutdown?+ #f)                     ;Program terminating

;

;; The three forms below run inside '%synch-with' on '+threads+', with the
;; queue bound to 'threads'.
;; NOTE(review): presumably '%synch-with' holds the object's mutex for the
;; whole body, i.e. also while 'thread-join!' is waiting - confirm against the
;; synch egg.

;; Drains the whole queue.
(define-inline (reap-all)
  (%synch-with +threads+ threads
    (reap-thread-queue threads) ) )

;; Reaps at most one item. (Not referenced elsewhere in this module.)
(define-inline (reap-top)
  (%synch-with +threads+ threads
    (reap-thread-queue-top threads) ) )

;; One reaper step: drains the queue when '+greedy?+', otherwise reaps at most
;; one item.
(define-inline (reap)
  (%synch-with +threads+ threads
    (if +greedy?+
        (reap-thread-queue threads)
        (reap-thread-queue-top threads) ) ) )

; Reaper thread thunk
;; Loops "reap, yield" until '+stopping?+' is observed, then drains whatever
;; is still queued and returns, which terminates the reaper thread.
;; NOTE(review): there is no blocking wait on an empty queue; the loop only
;; yields between polls.
(define (reaper)
  (if +stopping?+
      (reap-all)
      (begin
        (reap)
        (thread-yield!)
        (reaper) ) ) )

;; Overwrites retries & timeout of every item currently queued with the
;; STOPPING-* values, so the final drain performs bounded joins only.
;; An item already removed from the queue by the reaper is not affected.
(define (adjust-reap-items-for-stopping)
  (%synch-with +threads+ threads
    (for-each
      (lambda (ri)
        (set-reap-item-retries! ri STOPPING-REAPER-RETRIES)
        (set-reap-item-timeout! ri STOPPING-REAPER-TIMEOUT) )
      (queue->list threads)) ) )

;;

;; Exit handler (registered with 'on-exit' by 'thread-reaper-start!'): marks
;; the program as terminating and stops the reaper.
(define (thread-reaper-shutdown!)
  (set! +shutdown?+ #t)
  (thread-reaper-stop!) )

;; Lazily creates the synchronized queue & the exit handler (first call only)
;; and starts a reaper thread when none is recorded in '+reaper-thread+'.
(define (thread-reaper-start!)
  ;ensure reasonable state anyway.
  (unless +threads+
    ;Only done once
    (set! +threads+ (make-object/synch (make-queue) '(queue/synch-)))
    ;Clean shutdown
    (on-exit thread-reaper-shutdown!) )
  (unless +reaper-thread+
    ;Whenever no reaper
    (set! +stopping?+ #f)
    (set! +reaper-thread+ (make-thread reaper 'thread-reaper))
    (thread-quantum-set! +reaper-thread+ DEFAULT-REAPER-QUANTUM)
    (thread-start! +reaper-thread+) ) )

;; Starts the reaper on demand.
(define-inline (ensure-reaper)
  (unless +reaper-thread+ (thread-reaper-start!)) )

;;;

;; Returns #t once 'thread-reaper-shutdown!' has run, #f otherwise.
(define (thread-reaper-shutdown?) +shutdown?+)

;; Stops a running reaper and waits for it to terminate. Does nothing when no
;; reaper is running or a stop is already in progress.
;;
;; '+stopping?+' is left set afterwards, so later 'thread-reap!' calls are
;; refused with a warning rather than starting a new reaper (see "Issues" at
;; the top of the file).
;;
;; NOTE(review): a 'thread-reap!' that passes its '+stopping?+' test before
;; the flag is set below, but queues its item after
;; 'adjust-reap-items-for-stopping' has run, keeps the normal timeout &
;; retries - confirm whether that window matters to callers.
(define (thread-reaper-stop!)
  (when (and +reaper-thread+ (not +stopping?+))
    (let ((th +reaper-thread+))
      ;Bump up the time-slice so queue clears faster
      (thread-quantum-set! th (fxmax (thread-quantum th) STOPPING-REAPER-QUANTUM))
      ;No long waits or retries
      (adjust-reap-items-for-stopping)
      ;Tell reaper we're quits
      (set! +stopping?+ #t)
      ;Waits until queue empty
      ;FIXME Timeout? Assuming each item joins/timed-out then not needed.
      (thread-join! th)
      ;No more reaping with this thread
      (set! +reaper-thread+ #f) ) ) )

;; Getter/setter for the greedy flag.
;;   (thread-reaper-greedy)     => current flag
;;   (thread-reaper-greedy obj) sets the flag; any true 'obj' is stored as #t
(define (thread-reaper-greedy . args)
  (if (null? args)
      +greedy?+
      (set! +greedy?+ (and (car args) #t)) ) )

;; Getter/setter for the reaper thread's quantum.
;;   (thread-reaper-quantum)   => quantum of the running reaper thread
;;   (thread-reaper-quantum q) sets it; ignored while stopping or shutting down
;; Signals an error, for the getter as well, when no reaper thread is running.
(define (thread-reaper-quantum . args)
  (cond
    (+reaper-thread+
      (if (null? args)
          (thread-quantum +reaper-thread+)
          (unless (or +stopping?+ +shutdown?+)
            (thread-quantum-set! +reaper-thread+ (car args)) ) ) )
    (else
      (error 'thread-reaper-quantum "reaper is not running") ) ) )

;; Getter/setter for the join timeout given to subsequently queued items.
;;   (thread-reaper-timeout)    => current timeout
;;   (thread-reaper-timeout to) 'to' is #f or is validated with
;;                              'check-positive-number'
(define (thread-reaper-timeout . args)
  (if (null? args)
      +timeout+
      (let ((to (car args)))
        (set! +timeout+ (and to (check-positive-number 'thread-reaper-timeout to))) ) ) )

;; Getter/setter for the retry count given to subsequently queued items.
;;   (thread-reaper-retries)    => current count
;;   (thread-reaper-retries rt) 'rt' is validated with 'check-natural-fixnum'
(define (thread-reaper-retries . args)
  (if (null? args)
      +retries+
      (let ((rt (car args)))
        (set! +retries+ (check-natural-fixnum 'thread-reaper-retries rt)) ) ) )

;; Queues the thread 'th' to be joined by the reaper, starting the reaper
;; first when necessary. The item takes the timeout & retries configured at
;; the time of the call. While the reaper is stopping, or the program is
;; shutting down, the request is not queued and a warning is issued instead.
(define (thread-reap! th)
  (check-thread 'thread-reap! th)
  ;Ignore request when cannot fulfill
  (if (or +stopping?+ +shutdown?+)
      (warning "attempt to reap a thread as reaper winding up" th)
      (begin
        (ensure-reaper)
        (%synch-with +threads+ threads
          (queue-add! threads (make-reap-item th +timeout+ +retries+))) ) ) )

) ;module thread-reaper