;;;; thread-reaper.scm
;;;; Kon Lovett, Oct '09

;; Issues
;;
;; - What's up w/ thread-yield! ? (Yielding in the reaper loop instead of
;;   sleeping causes a busy loop.)
;;
;; - Could allow the stopping of an existing reaper and the startup of another.
;;   Make '*stopping?*' thread thunk local w/ a set/get behavior.

; Used by threads that are cleanly terminating and wish to 'join' the
; primordial thread w/o any user intervention. (A thread that attempts
; to 'join' itself will cause a deadlock.)
;
; The "reaped" thread's end-exception, if any, is printed as a warning.
;
; The reaper can be stopped at any time.

(module thread-reaper

(;export
  zombie-threads
  thread-reaper-shutdown?
  thread-reap!
  thread-reaper-stop!
  ;
  thread-reaper-greedy thread-reaper-greedy-set!
  thread-reaper-quantum thread-reaper-quantum-set!
  thread-reaper-wait-seconds thread-reaper-wait-seconds-set!
  thread-reaper-timeout thread-reaper-timeout-set!
  thread-reaper-retries thread-reaper-retries-set!)

(import scheme)
(import (chicken base))
(import (chicken type))
(import (only (chicken condition) handle-exceptions))
(import (only queues queue-empty? queue-remove! make-queue queue-add! queue->list))
(import (only (srfi 18)
  thread-name
  thread-sleep! thread-join! thread-yield! thread-start! make-thread
  thread-quantum-set! thread-quantum
  terminated-thread-exception? uncaught-exception?))
(import (only miscmacros until))
(import (only synch-object make-synch-with-object))
(import (prefix (only synch-dyn synch-with) dyn:))
(import (only synch-open %synch-with))
(import (only record-variants define-record-type-variant))
(import (only thread-utils check-thread print-exception-warning))
(import (only type-checks check-positive-number check-natural-fixnum))

;;

;Coerce any object to a strict boolean: #f stays #f, everything else is #t.
(define (->boolean x) (and x #t))

;;

;modes: normal & stopping (during reaper termination)

(define-constant REAPER-WAIT-SECONDS 1.0) ;reaper wait time (default)

(define-constant NORMAL-REAPER-QUANTUM 10)  ;reaper thread normal
(define-constant STOPPING-REAPER-QUANTUM 100)

(define-constant NORMAL-REAPER-TIMEOUT #f)  ;reaped thread join wait
(define-constant STOPPING-REAPER-TIMEOUT 1.0)

(define-constant NORMAL-REAPER-RETRIES 1) ;reaped thread reap retries
(define-constant STOPPING-REAPER-RETRIES 0)

;;

;Local to this module

;One queued reap request:
;  th - the thread to join
;  to - timeout handed to `thread-join!' (#f is the normal-mode value)
;  rt - number of further attempts allowed after a join times out
(define-constant reap-item 'reap-item)  ;type tag variable
(define-record-type-variant reap-item (unsafe unchecked inline)
  (make-reap-item th to rt)
  reap-item?
  (th reap-item-thread)
  (to reap-item-timeout set-reap-item-timeout!)
  (rt reap-item-retries set-reap-item-retries!) )

;;

(define *zombie-threads* (make-queue))  ;queue of threads cannot reap

;Discard the current zombie queue by installing a fresh, empty one.
(define-inline (empty-zombies) (set! *zombie-threads* (make-queue)))

;Returns the current zombie queue contents as a list.
;Both queue reapers below call `empty-zombies' before they start, so the list
;only holds threads recorded since the most recent reap pass began.
(define (zombie-threads) (queue->list *zombie-threads*))

;Value passed to `thread-join!' as its timeout result; recognised with `eq?'.
(define REAP-TIMED-OUT '(reap-timed-out))

;Remove the first reap-item from the queue `thq' and try to join its thread.
;
; - join returns something other than REAP-TIMED-OUT: that value is returned.
; - join timed out, retries left: the retry count is decremented and the item
;   is put back on `thq'.
; - join timed out, no retries left: a "cannot reap thread" warning is issued.
;   NOTE(review): the thread is not added to `*zombie-threads*' in this case;
;   confirm whether that is intended.
; - any exception raised in the body: the thread is added to
;   `*zombie-threads*' and the exception is handed to
;   `print-exception-warning'.
;
;The caller must ensure `thq' is not empty.
(define (reap-queue-thread thq)
  (let* ((ri (queue-remove! thq))
         (th (reap-item-thread ri)) )
    ;unhandled-exception
    (handle-exceptions exn
        (let ()
          ;(queue-add! thq ri) ;∞-loop possible if put back offender
          (queue-add! *zombie-threads* th)
          (print-exception-warning exn) )
      (let ((res (thread-join! th (reap-item-timeout ri) REAP-TIMED-OUT)))
        ;timed-out?
        (if (not (eq? REAP-TIMED-OUT res))
          res
          ;try again?
          (let ((rt (reap-item-retries ri)))
            (if (zero? rt)
              (warning "cannot reap thread" (reap-item-thread ri))
              (begin
                (set-reap-item-retries! ri (sub1 rt))
                (queue-add! thq ri) ) ) ) ) ) ) ) )

;Reset the zombie queue, then reap at most one item from `thq'.
(define-inline (reap-thread-queue-top thq)
  (empty-zombies)
  (unless (queue-empty? thq) (reap-queue-thread thq)) )

;Reset the zombie queue, then reap items until `thq' is empty.
;Items put back for a retry are processed again within the same call.
(define-inline (reap-thread-queue thq)
  (empty-zombies)
  (until (queue-empty? thq) (reap-queue-thread thq)) )

;;

(define *threads* #f)                         ;queue of threads to reap
(define *reaper-thread* #f)                   ;needs a separate thread since asynch
(define *greedy?* #f)                         ;reaper should empty the queue each time-slice
(define *wait-seconds* REAPER-WAIT-SECONDS)   ;reaper sleep time
(define *stopping?* #f)                       ;reaper should cleanly stop
(define *shutdown?* #f)                       ;program terminating
(define *timeout* NORMAL-REAPER-TIMEOUT)      ;reaped thread join timeout
(define *retries* NORMAL-REAPER-RETRIES)      ;reaped thread join attempts

; Reapers

;NOTE `reap-queue-thread' is under `handle-exceptions' so only synch-dyn
;needed

;Reap every queued item while holding the synch on `*threads*'.
(define-inline (reap-all)
  (dyn:synch-with *threads* threads (reap-thread-queue threads)))

;Reap at most one queued item while holding the synch on `*threads*'.
(define-inline (reap-top)
  (dyn:synch-with *threads* threads (reap-thread-queue-top threads)))

;One reaper pass: the whole queue when greedy, otherwise only the first item.
(define-inline (reap)
  (if *greedy?* (reap-all) (reap-top)))

; Reaper thread thunk

;Loops: reap, sleep `*wait-seconds*', repeat. Once `*stopping?*' is seen the
;remaining queue is drained with `reap-all' and the thunk returns, which ends
;the reaper thread.
(define (reaper)
  (let loop ()
    (if *stopping?*
      (reap-all)
      (begin
        (reap)
        ;FIXME yielding here instead of sleeping causes a busy loop
        #;(thread-yield!)
        ;honour the configured wait (see `thread-reaper-wait-seconds-set!')
        ;rather than a fixed interval
        (thread-sleep! *wait-seconds*)
        (loop) ) ) ) )

;Switch every queued reap-item to the stopping-mode retry count and timeout,
;so the final `reap-all' does not wait long or retry.
(define (adjust-reap-items-for-stopping)
  (%synch-with *threads* threads
    (for-each
      (lambda (ri)
        (set-reap-item-retries! ri STOPPING-REAPER-RETRIES)
        (set-reap-item-timeout! ri STOPPING-REAPER-TIMEOUT) )
      (queue->list threads)) ) )

;;

;Exit handler: mark the program as terminating and stop the reaper.
(define (thread-reaper-shutdown!)
  (set! *shutdown?* #t)
  (thread-reaper-stop!) )

;Create the shared reap queue (first call only) and start the reaper thread
;when none is running.
(define (thread-reaper-start!)
  ;ensure reasonable state anyway
  (unless *threads*
    ;only done once
    (set! *threads* (make-synch-with-object (make-queue) '(queue/synch-)))
    ;clean shutdown
    (on-exit thread-reaper-shutdown!) )
  ;no reaper?
  (unless *reaper-thread*
    (set! *stopping?* #f)
    (set! *reaper-thread* (make-thread reaper 'thread-reaper))
    (thread-quantum-set! *reaper-thread* NORMAL-REAPER-QUANTUM)
    (thread-start! *reaper-thread*) ) )

;Start the reaper unless a reaper thread already exists.
(define-inline (ensure-reaper)
  (unless *reaper-thread* (thread-reaper-start!)))

;;; Public

;Is the reaper stopping (or stopped), or the program terminating?
(define (thread-reaper-shutdown?)
  (or *stopping?* *shutdown?*))

;Queue the thread `th' to be joined by the reaper, starting the reaper when
;needed. The item takes the timeout and retry settings current at this call.
;
;When `thread-reaper-shutdown?' is true the request is not queued and a
;warning is issued instead. `*stopping?*' is only reset by
;`thread-reaper-start!', which is reached from here only after that check, so
;requests keep being refused after `thread-reaper-stop!' (see Issues above).
(define (thread-reap! th)
  (check-thread 'thread-reap! th)
  ;ignore request when cannot fulfill
  (if (thread-reaper-shutdown?)
    (warning "attempt to reap a thread as reaper winding up" th)
    (begin
      (ensure-reaper)
      (%synch-with *threads* threads
        (queue-add! threads (make-reap-item th *timeout* *retries*))) ) ) )

;Stop the reaper thread, waiting for it to drain the queue.
;Does nothing when no reaper is running or a stop is already in progress.
(define (thread-reaper-stop!)
  (when (and *reaper-thread* (not *stopping?*))
    (let ((th *reaper-thread*))
      ;bump up the time-slice so queue clears faster
      (thread-quantum-set! th (max (thread-quantum th) STOPPING-REAPER-QUANTUM))
      ;no long waits or retries
      (adjust-reap-items-for-stopping)
      ;tell reaper we're quits
      (set! *stopping?* #t)
      ;waits until queue empty
      ;FIXME Timeout? Assuming each item joins/timed-out then not needed.
      (thread-join! th)
      ;no more reaping with this thread
      (set! *reaper-thread* #f) ) ) )

;;

;"location" style calling
;
;Each setting below has a `-set!' procedure and a getter built with
;`getter-with-setter'. Calling a getter with an argument still sets the value
;but is deprecated and issues a warning.

;Greedy mode: reap the whole queue on each pass instead of one item.
;`flag' is coerced to a boolean.
(define (thread-reaper-greedy-set! flag)
  (set! *greedy?* (->boolean flag)))

(define thread-reaper-greedy
  (getter-with-setter
    (lambda args
      (if (null? args)
        *greedy?*
        (begin
          (warning 'thread-reaper-greedy "mutation deprecated")
          (thread-reaper-greedy-set! (car args)) ) ) )
    thread-reaper-greedy-set!))

;Quantum of the running reaper thread.
;Signals an error when no reaper is running; the setter is silently ignored
;while the reaper is shutting down.
(define (thread-reaper-quantum-set! qt)
  (unless *reaper-thread*
    (error 'thread-reaper-quantum-set! "reaper is not running"))
  (unless (thread-reaper-shutdown?)
    (thread-quantum-set! *reaper-thread* qt)) )

(define thread-reaper-quantum
  (getter-with-setter
    (lambda args
      (unless *reaper-thread*
        (error 'thread-reaper-quantum "reaper is not running"))
      (if (null? args)
        (thread-quantum *reaper-thread*)
        (begin
          (warning 'thread-reaper-quantum "mutation deprecated")
          (thread-reaper-quantum-set! (car args)) ) ) )
    thread-reaper-quantum-set!))

;Seconds the reaper sleeps between passes; checked as a positive number.
(define (thread-reaper-wait-seconds-set! to)
  (set! *wait-seconds* (check-positive-number 'thread-reaper-wait-seconds to)) )

(define thread-reaper-wait-seconds
  (getter-with-setter
    (lambda args
      (if (null? args)
        *wait-seconds*
        (begin
          (warning 'thread-reaper-wait-seconds "mutation deprecated")
          (thread-reaper-wait-seconds-set! (car args)) ) ) )
    thread-reaper-wait-seconds-set!))

;Join timeout given to newly queued threads: #f or a positive number.
(define (thread-reaper-timeout-set! to)
  (set! *timeout* (and to (check-positive-number 'thread-reaper-timeout to))) )

(define thread-reaper-timeout
  (getter-with-setter
    (lambda args
      (if (null? args)
        *timeout*
        (begin
          ;report this accessor's own name, as the sibling accessors do
          (warning 'thread-reaper-timeout "mutation deprecated")
          (thread-reaper-timeout-set! (car args)) ) ) )
    thread-reaper-timeout-set!))

;Retry count given to newly queued threads; checked as a natural fixnum.
(define (thread-reaper-retries-set! rt)
  (set! *retries* (check-natural-fixnum 'thread-reaper-retries rt)) )

(define thread-reaper-retries
  (getter-with-setter
    (lambda args
      (if (null? args)
        *retries*
        (begin
          (warning 'thread-reaper-retries "mutation deprecated")
          (thread-reaper-retries-set! (car args)) ) ) )
    thread-reaper-retries-set!))

) ;module thread-reaper