(module zmq (zmq-default-context zmq-io-threads zmq-version make-context terminate-context context? make-message message? message-size message->string copy-message make-socket socket? close-socket bind-socket connect-socket socket-option-set! socket-option socket-fd send-message receive-message make-poll-item poll poll-item-socket poll-item-fd poll-item-in? poll-item-out? poll-item-error?) (import chicken scheme foreign data-structures) (use lolevel foreigners srfi-1 srfi-18) (foreign-declare "#include ") (foreign-declare "#include ") (define-record context pointer sockets) (define-foreign-type context c-pointer) (define-record message pointer mutex) (define-foreign-type message (c-pointer "zmq_msg_t")) (define-record socket pointer) (define-foreign-type socket c-pointer) (define-foreign-enum-type (socket-type int) (socket-type->int int->socket-type) ((pair) ZMQ_PAIR) ((pub) ZMQ_PUB) ((sub) ZMQ_SUB) ((req) ZMQ_REQ) ((rep) ZMQ_REP) ((xreq) ZMQ_XREQ) ((xrep) ZMQ_XREP) ((pull) ZMQ_PULL) ((push) ZMQ_PUSH)) (define-foreign-enum-type (socket-option int) (socket-option->int int->socket-option) ((hwm) ZMQ_HWM) ((swap) ZMQ_SWAP) ((affinity) ZMQ_AFFINITY) ((identity) ZMQ_IDENTITY) ((subscribe) ZMQ_SUBSCRIBE) ((unsubscribe) ZMQ_UNSUBSCRIBE) ((rate) ZMQ_RATE) ((recovery-ivl) ZMQ_RECOVERY_IVL) ((mcast-loop) ZMQ_MCAST_LOOP) ((sndbuf) ZMQ_SNDBUF) ((rcvbuf) ZMQ_RCVBUF) ((rcvmore) ZMQ_RCVMORE)) (define socket-options '((integer hwm swap affinity rate recovery-ivl sndbuf rcvbuf) (boolean rcvmore mcast-loop) (string subscribe unsubscribe identity))) (define-foreign-enum-type (socket-flag int) (socket-flag->int int->socket-flag) ((noblock zmq/noblock) ZMQ_NOBLOCK) ((sndmore zmq/sndmore) ZMQ_SNDMORE)) (define-foreign-enum-type (poll-flag short) (poll-flat->int short->poll-int) ((in zmq/pollin) ZMQ_POLLIN) ((out zmq/pollout) ZMQ_POLLOUT) ((err zmq/pollerr) ZMQ_POLLERR)) (define-record poll-item pointer socket in out) (define-foreign-record-type (poll-item zmq_pollitem_t) (constructor: make-foreign-poll-item) (destructor: free-foreign-poll-item) (socket socket %poll-item-socket %poll-item-socket-set!) (int fd %poll-item-fd %poll-item-fd-set!) (short events %poll-item-events %poll-item-events-set!) (short revents %poll-item-revents %poll-item-revents-set!)) (define-foreign-enum-type (errno int) (errno->int int->errno) ((again) EAGAIN) ((term) ETERM)) ;; helpers (define (zmq-error location) (error location ((foreign-lambda c-string zmq_strerror int) (foreign-value errno int)))) (define (errno) (foreign-value errno errno)) (define (type-error value expected-type) (error (format "invalid value: ~S (expected ~A)" value expected-type))) (define (zmq-version) (let-location ((major int) (minor int) (patch int)) ((foreign-lambda void zmq_version (c-pointer int) (c-pointer int) (c-pointer int)) (location major) (location minor) (location patch)) (list major minor patch))) ;; contexts (define zmq-io-threads (make-parameter 1)) (define zmq-default-context (make-parameter #f)) (define (zmq-default-context/initialize) (or (zmq-default-context) (begin (zmq-default-context (make-context (zmq-io-threads))) (zmq-default-context)))) (define %make-context make-context) (define (make-context io-threads) (let ((c (%make-context ((foreign-lambda context zmq_init int) io-threads) (make-mutex)))) (if (or (not (context-pointer c)) (null-pointer? (context-pointer c))) (zmq-error 'make-context) (begin (mutex-specific-set! (context-sockets c) '()) (set-finalizer! c (lambda (c) (for-each close-socket (mutex-specific (context-sockets c))) (terminate-context c))))))) (define (terminate-context ctx) (or (zero? ((foreign-lambda int zmq_term context) (context-pointer ctx))) (zmq-error 'terminate-context))) ;; messages (define %make-message make-message) (define (make-message #!optional (data-or-size #f)) (let-location ((m message)) (if (zero? (cond ((string? data-or-size) ((foreign-lambda int zmq_msg_init_data message blob unsigned-int c-pointer c-pointer) (location m) data-or-size (number-of-bytes data-or-size) (null-pointer) (null-pointer))) ((integer? data-or-size) ((foreign-lambda int zmq_msg_init_size message unsigned-int) (location m) data-or-size)) (else ((foreign-lambda int zmq_msg_init message) (location m))))) (set-finalizer! (%make-message (location m) (make-mutex)) close-message) (zmq-error 'make-message)))) (define (close-message m) (let ((mx (message-mutex m))) (mutex-lock! mx) (when (message-pointer m) (if (zero? ((foreign-lambda int zmq_msg_close message) (message-pointer m))) (message-pointer-set! m #f) (zmq-error 'close-message))) (mutex-unlock! mx))) (define (message-size m) ((foreign-lambda unsigned-integer zmq_msg_size message) (message-pointer m))) (define (message->string m) (let* ((size (message-size m)) (result (make-string size))) (move-memory! ((foreign-lambda c-pointer zmq_msg_data message) (message-pointer m)) result size) result)) (define (copy-message m) (make-message (message->string m))) ;; sockets (define %make-socket make-socket) (define (make-socket type #!optional (context (zmq-default-context/initialize))) (let ((sp ((foreign-lambda socket zmq_socket context socket-type) (context-pointer context) type))) (if (null-pointer? sp) (zmq-error 'make-socket) (let ((m (context-sockets context)) (s (%make-socket sp))) (mutex-lock! m) (mutex-specific-set! m (cons sp (mutex-specific m))) (mutex-unlock! m) (set-finalizer! s close-socket))))) (define (close-socket socket) (let ((sp (cond ((socket? socket) (socket-pointer socket)) ((pointer? socket) socket) (else (type-error socket 'socket))))) (when sp (if (zero? ((foreign-lambda int zmq_close socket) sp)) (when (socket? socket) (socket-pointer-set! socket #f)) (zmq-error 'close-socket))))) (define (bind-socket socket endpoint) (or (zero? ((foreign-lambda int zmq_bind socket c-string) (socket-pointer socket) endpoint)) (zmq-error 'bind-socket))) (define (connect-socket socket endpoint) (or (zero? ((foreign-lambda int zmq_connect socket c-string) (socket-pointer socket) endpoint)) (zmq-error 'connect-socket))) ;; integer64 is used instead of unsigned-integer64 for uint64_t ;; options since the latter has only been added to the experimental ;; branch recently. Also, we must use foreign-lambda* to be able to ;; pass in integer64 values because let-location doesn't accept ;; integer64 (also fixed in experimental) (define (socket-option-set! socket option value) (or (zero? (case option ((hwm affinity sndbuf rcvbuf swap rate recovery-ivl mcast-loop) (if (integer? value) ((foreign-safe-lambda* int ((scheme-object error) (scheme-object error_location) (socket socket) (socket-option option) (integer64 value)) "size_t size = sizeof(value); if (0 == zmq_setsockopt(socket, option, &value, size)) { C_return(0); } else { C_save(error_location); C_callback(error, 1); }") zmq-error 'socket-option-set! (socket-pointer socket) option value) (type-error value 'integer))) ((identity subscribe unsubscribe) (if (string? value) (or ((foreign-lambda int zmq_setsockopt socket socket-option c-string unsigned-int) (socket-pointer socket) option value (number-of-bytes value)) (zmq-error 'socket-option-set!)) (type-error value 'string))) (else (error (format "unknown socket option: ~A" option))))) (zmq-error 'socket-option-set!))) (define (%socket-option socket option value size) (or (zero? ((foreign-lambda int zmq_getsockopt socket int c-pointer c-pointer) (socket-pointer socket) option value size)) (zmq-error 'socket-option))) (define (socket-option socket option) (cond ((eq? option 'identity) (let-location ((value c-pointer) (size unsigned-int)) (%socket-option socket (socket-option->int option) value (location size)) (let ((str (make-string size))) (move-memory! value str size) str))) ((memq option (alist-ref 'integer socket-options)) ((foreign-safe-lambda* integer64 ((scheme-object error) (scheme-object error_location) (socket socket) (socket-option option)) "uint64_t value; size_t size = sizeof(value); if (0 == zmq_getsockopt(socket, option, &value, &size)) { C_return(value); } else { C_save(error_location); C_callback(error, 1); }") zmq-error 'socket-option (socket-pointer socket) option)) ((memq option (alist-ref 'boolean socket-options)) (let-location ((value bool) (size unsigned-int)) (%socket-option socket (socket-option->int option) (location value) (location size)) (let ((value value)) value))) (else (error (format "socket option ~A is not retrievable" option))))) (define (socket-fd socket) (let-location ((fd integer) (size unsigned-int)) (%socket-option socket (foreign-value "ZMQ_FD" int) (location fd) (location size)) (let ((fd fd)) fd))) ;; communication (define (send-message socket message #!key non-blocking send-more) (or (zero? ((foreign-lambda int zmq_send socket message int) (socket-pointer socket) (if (string? message) (message-pointer (make-message message)) (message-pointer message)) (bitwise-ior (if non-blocking zmq/noblock 0) (if send-more zmq/sndmore 0)))) (zmq-error 'send-message))) (define (receive-message socket #!key non-blocking) (let ((message (make-message))) (if (zero? ((foreign-lambda int zmq_recv socket message int) (socket-pointer socket) (message-pointer message) (if non-blocking zmq/noblock 0))) message (if (memq (errno) '(again term)) #f (zmq-error 'receive-message))))) ;; polling (define %make-poll-item make-poll-item) (define (make-poll-item socket/fd #!key in out) (let ((item (%make-poll-item (make-foreign-poll-item) (and (socket? socket/fd) socket/fd) in out))) (if (socket? socket/fd) (%poll-item-socket-set! (poll-item-pointer item) (socket-pointer socket/fd)) (%poll-item-fd-set! (poll-item-pointer item) socket/fd)) (%poll-item-events-set! (poll-item-pointer item) (bitwise-ior (if in zmq/pollin 0) (if out zmq/pollout 0))) (%poll-item-revents-set! (poll-item-pointer item) 0) (set-finalizer! item (lambda (i) (free-foreign-poll-item (poll-item-pointer i)))))) (define (poll-item-fd item) (%poll-item-fd (poll-item-pointer item))) (define (poll-item-revents item) (%poll-item-revents (poll-item-pointer item))) (define (poll-item-in? item) (not (zero? (bitwise-and zmq/pollin (poll-item-revents item))))) (define (poll-item-out? item) (not (zero? (bitwise-and zmq/pollout (poll-item-revents item))))) (define (poll-item-error? item) (not (zero? (bitwise-and zmq/pollerr (poll-item-revents item))))) (define-external (zmq_poll_item_ref (scheme-object items) (unsigned-int i)) (c-pointer poll-item) (noop) (list-ref items i)) (define %poll-sockets (foreign-safe-lambda* int ((scheme-object poll_items) (unsigned-int length) (long timeout)) "zmq_pollitem_t items[length]; zmq_pollitem_t *item_ptrs[length]; int i; for (i = 0; i < length; i++) { item_ptrs[i] = (zmq_pollitem_t *)zmq_poll_item_ref(poll_items, i); } for (i = 0; i < length; i++) { items[i] = *item_ptrs[i]; } int rc = zmq_poll(items, length, timeout); if (rc != -1) { for (i = 0; i < length; i++) { (*item_ptrs[i]).revents = items[i].revents; } } C_return(rc);")) (define (poll poll-items timeout/block) (if (null? poll-items) (error 'poll-sockets "null list passed for poll-items") (let ((result (%poll-sockets (map poll-item-pointer poll-items) (length poll-items) (case timeout/block ((#f) 0) ((#t) -1) (else timeout/block))))) (if (= result -1) (zmq-error 'poll-sockets) result)))) )