;; remote-mailbox-threads for Chicken Scheme ;; ;; Copyright (c) 2010, Moritz Heidkamp ;; All rights reserved. ;; ;; Redistribution and use in source and binary forms, with or without ;; modification, are permitted provided that the following conditions are met: ;; ;; 1. Redistributions of source code must retain the above copyright ;; notice, this list of conditions and the following disclaimer. ;; ;; 2. Redistributions in binary form must reproduce the above copyright ;; notice, this list of conditions and the following disclaimer in the ;; documentation and/or other materials provided with the distribution. ;; ;; 3. Neither the name of the nor the ;; names of its contributors may be used to endorse or promote products ;; derived from this software without specific prior written permission. ;; ;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ;; ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE ;; DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY ;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES ;; (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; ;; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ;; ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT ;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. (module remote-mailbox-threads (thread-send publish-thread! connect-remote-thread serializer deserializer) (import chicken scheme) (use zmq ports srfi-18 mailbox-threads) (reexport (except mailbox-threads thread-send)) (define-record remote-thread socket) (define serializer (make-parameter write)) (define deserializer (make-parameter read)) (define local-thread-send thread-send) (define (connect-remote-thread . endpoints) (let ((s (make-socket 'push))) (for-each (cut connect-socket s <>) endpoints) (make-remote-thread s))) (define (remote-thread-send thread message) (send-message (remote-thread-socket thread) (with-output-to-string (cut (serializer) message)) non-blocking: #t)) (define (thread-send thread message) (if (remote-thread? thread) (remote-thread-send thread message) (local-thread-send thread message))) (define (receive-message/serialized socket) (with-input-from-string (let loop () (let ((msg (receive-message socket non-blocking: #t))) (if msg (message->string msg) (begin (thread-wait-for-i/o! (socket-fd socket) #:input) (loop))))) (deserializer))) (define (publish-thread! thread . endpoints) (let ((s (make-socket 'pull))) (for-each (cut bind-socket s <>) endpoints) (thread-start! (lambda () (let loop () (thread-send thread (receive-message/serialized s)) (loop)))))) )