;;;; thread-reaper.scm ;;;; Kon Lovett, Oct '09 ;; Used by threads that are cleanly terminating and wish to 'join' the ;; primordial thread w/o any user intervention. (A thread that attempts ;; to 'join' itself will cause a deadlock.) ;; ;; The "reaped" thread's end-exception, if any, is printed as a warning. ;; ;; The reaper can be stopped at any time ;; Issues ;; ;; - What's up w/ thread-yield! ;; ;; - Could allow the stopping of an existing reaper and the startup of another. ;; Make '*stopping?*' thread thunk local w/ a set/get behavior. (module thread-reaper (;export zombie-threads thread-reaper-shutdown? thread-reap! thread-reaper-stop! ; thread-reaper-greedy thread-reaper-greedy-set! thread-reaper-quantum thread-reaper-quantum-set! thread-reaper-timeout thread-reaper-timeout-set! thread-reaper-retries thread-reaper-retries-set!) (import scheme (chicken base) (chicken type) (only (chicken condition) handle-exceptions) (only queues queue-empty? queue-remove! make-queue queue-add! queue->list) (only (srfi 18) thread-name thread-sleep! thread-join! thread-yield! thread-start! make-thread thread-quantum-set! thread-quantum terminated-thread-exception? uncaught-exception?) (only miscmacros until) (only synch-object make-synch-with-object) (prefix (only synch-dyn synch-with) dyn:) (only synch-open %synch-with) (only record-variants define-record-type-variant) (only thread-utils check-thread print-exception-warning check-thread-timeout) (only (check-errors sys) check-fixnum-in-range)) (define (check-natural-fixnum loc obj) (import (only (chicken fixnum) most-positive-fixnum)) (check-fixnum-in-range loc obj 0 most-positive-fixnum) ) (define-type real (or integer float ratnum)) (define-type time (struct time)) (define-type thread-timeout (or false time real)) (: zombie-threads (-> list)) (: thread-reaper-shutdown? (-> boolean)) (: thread-reap! (thread -> void)) (: thread-reaper-stop! (-> void)) (: thread-reaper-greedy (-> boolean)) (: thread-reaper-greedy-set! (boolean -> void)) (: thread-reaper-quantum (-> fixnum)) (: thread-reaper-quantum-set! (fixnum -> void)) (: thread-reaper-timeout (-> thread-timeout)) (: thread-reaper-timeout-set! (thread-timeout -> void)) (: thread-reaper-retries (-> fixnum)) (: thread-reaper-retries-set! (fixnum -> void)) ;; (define (->boolean x) (and x #t)) ;; ;modes: normal & stopping (during reaper termination) (define-constant NORMAL-REAPER-QUANTUM 10) ;reaper thread normal (define-constant STOPPING-REAPER-QUANTUM 100) (define-constant NORMAL-REAPER-TIMEOUT #f) ;reaped thread join wait (define-constant STOPPING-REAPER-TIMEOUT 1.0) (define-constant NORMAL-REAPER-RETRIES 1) ;reaped thread reap retries (define-constant STOPPING-REAPER-RETRIES 0) ;; ;Local to this module (define-constant reap-item 'reap-item) ;type tag variable (define-record-type-variant reap-item (unsafe unchecked inline) (make-reap-item th to rt) reap-item? (th reap-item-thread) (to reap-item-timeout set-reap-item-timeout!) (rt reap-item-retries set-reap-item-retries!) ) ;; (define *zombie-threads* (make-queue)) ;queue of threads cannot reap (define-inline (empty-zombies) (set! *zombie-threads* (make-queue))) (define (zombie-threads) (queue->list *zombie-threads*)) (define REAP-TIMED-OUT '(reap-timed-out)) (define (reap-queue-thread thq) (let* ((ri (queue-remove! thq)) (th (reap-item-thread ri)) ) ;unhandled-exception (handle-exceptions exn (begin ;(queue-add! thq ri) ;∞-loop possible if put back offender (queue-add! *zombie-threads* th) (print-exception-warning exn) ) (let ((res (thread-join! th (reap-item-timeout ri) REAP-TIMED-OUT))) ;timed-out? (if (not (eq? REAP-TIMED-OUT res)) res ;try again? (let ((rt (reap-item-retries ri))) (if (zero? rt) (warning "cannot reap thread" (reap-item-thread ri)) (begin (set-reap-item-retries! ri (sub1 rt)) (queue-add! thq ri) ) ) ) ) ) ) ) ) (define-inline (reap-thread-queue-top thq) (empty-zombies) (unless (queue-empty? thq) (reap-queue-thread thq)) ) (define-inline (reap-thread-queue thq) (empty-zombies) (until (queue-empty? thq) (reap-queue-thread thq)) ) ;; (define *threads* #f) ;queue of threads to reap (define *reaper-thread* #f) ;needs a separate thread since asynch (define *greedy?* #f) ;reaper should empty the queue each time-slice (define *stopping?* #f) ;reaper should cleanly stop (define *shutdown?* #f) ;program terminating (define *timeout* NORMAL-REAPER-TIMEOUT) ;reaped thread join timeout (define *retries* NORMAL-REAPER-RETRIES) ;reaped thread join attempts ; Reapers ;NOTE `reap-queue-thread' is under `handle-exceptions' so only synch-dyn ;needed (define-inline (reap-all) (dyn:synch-with *threads* threads (reap-thread-queue threads))) (define-inline (reap-top) (dyn:synch-with *threads* threads (reap-thread-queue-top threads))) (define-inline (reap) (if *greedy?* (reap-all) (reap-top))) ; Reaper thread thunk (define (reaper) (let loop () (if *stopping?* (reap-all) (begin (reap) #; ;FIXME this causes busy loop! (thread-yield!) (thread-sleep! 1.0) (loop) ) ) ) ) (define (adjust-reap-items-for-stopping) (%synch-with *threads* threads (for-each (lambda (ri) (set-reap-item-retries! ri STOPPING-REAPER-RETRIES) (set-reap-item-timeout! ri STOPPING-REAPER-TIMEOUT) ) (queue->list threads)) ) ) ;; (define (thread-reaper-shutdown!) (set! *shutdown?* #t) (thread-reaper-stop!) ) (define (thread-reaper-start!) ;ensure reasonable state anyway (unless *threads* ;only done once (set! *threads* (make-synch-with-object (make-queue) '(queue/synch-))) ;clean shutdown (on-exit thread-reaper-shutdown!) ) ;no reaper? (unless *reaper-thread* (set! *stopping?* #f) (set! *reaper-thread* (make-thread reaper 'thread-reaper)) (thread-quantum-set! *reaper-thread* NORMAL-REAPER-QUANTUM) (thread-start! *reaper-thread*) ) ) (define-inline (ensure-reaper) (unless *reaper-thread* (thread-reaper-start!))) ;;; Public (define (thread-reaper-shutdown?) (or *stopping?* *shutdown?*)) (define (thread-reap! th) (check-thread 'thread-reap! th) ;ignore request when cannot fulfill (if (thread-reaper-shutdown?) (warning "attempt to reap a thread as reaper winding up" th) (begin (ensure-reaper) (%synch-with *threads* threads (queue-add! threads (make-reap-item th *timeout* *retries*))) ) ) ) (define (thread-reaper-stop!) (when (and *reaper-thread* (not *stopping?*)) (let ((th *reaper-thread*)) ;bump up the time-slice so queue clears faster (thread-quantum-set! th (max (thread-quantum th) STOPPING-REAPER-QUANTUM)) ;no long waits or retries (adjust-reap-items-for-stopping) ;tell reaper we're quits (set! *stopping?* #t) ;waits until queue empty ;FIXME Timeout? Assuming each item joins/timed-out then not needed. (thread-join! th) ;no more reaping with this thread (set! *reaper-thread* #f) ) ) ) ;; (define (thread-reaper-greedy-set! flag) (set! *greedy?* (->boolean flag))) (define (thread-reaper-quantum-set! qt) (unless *reaper-thread* (error 'thread-reaper-quantum-set! "reaper is not running")) (unless (thread-reaper-shutdown?) (thread-quantum-set! *reaper-thread* qt)) ) (define (thread-reaper-timeout-set! to) (set! *timeout* (and to (check-thread-timeout 'thread-reaper-timeout to))) ) (define (thread-reaper-retries-set! rt) (set! *retries* (check-natural-fixnum 'thread-reaper-retries rt)) ) ;; ;"location" style calling (define thread-reaper-greedy (getter-with-setter (lambda args *greedy?*) thread-reaper-greedy-set!)) (define thread-reaper-quantum (getter-with-setter (lambda args (unless *reaper-thread* (error 'thread-reaper-quantum "reaper is not running")) (thread-quantum *reaper-thread*)) thread-reaper-quantum-set!)) (define thread-reaper-timeout (getter-with-setter (lambda args *timeout*) thread-reaper-timeout-set!)) (define thread-reaper-retries (getter-with-setter (lambda args *retries*) thread-reaper-retries-set!)) ) ;module thread-reaper