Majordomo asynchronous client API in C

/* =====================================================================

Majordomo Protocol Client API (async version)
Implements the MDP/Worker spec at

Copyright (c) 1991-2011 iMatix Corporation <>
Copyright other contributors as noted in the AUTHORS file.

This file is part of the ZeroMQ Guide:

This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.

This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see

#include "mdcliapi2.h"

// Structure of our class
// We access these properties only via class methods

struct _mdcli_t {
zctx_t *ctx; // Our context
char *broker;
void *client; // Socket to broker
int verbose; // Print activity to stdout
int timeout; // Request timeout

// ---------------------------------------------------------------------
// Connect or reconnect to broker

void s_mdcli_connect_to_broker (mdcli_t *self)
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_DEALER);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s…", self->broker);

// ---------------------------------------------------------------------
// Constructor

mdcli_t *
mdcli_new (char *broker, int verbose)
assert (broker);

mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // msecs

s_mdcli_connect_to_broker (self);
return self;

// ---------------------------------------------------------------------
// Destructor

mdcli_destroy (mdcli_t **self_p)
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;

// ---------------------------------------------------------------------
// Set request timeout

mdcli_set_timeout (mdcli_t *self, int timeout)
assert (self);
self->timeout = timeout;

// ---------------------------------------------------------------------
// Send request to broker
// Takes ownership of request message and destroys it when sent.

mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
assert (self);
assert (request_p);
zmsg_t *request = *request_p;

// Prefix request with protocol frames
// Frame 0: empty (REQ emulation)
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
zmsg_pushstr (request, "");
if (self->verbose) {
zclock_log ("I: send request to '%s' service:", service);
zmsg_dump (request);
zmsg_send (&request, self->client);
return 0;

// ---------------------------------------------------------------------
// Returns the reply message or NULL if there was no reply. Does not
// attempt to recover from a broker failure, this is not possible
// without storing all unanswered requests and resending them all…

zmsg_t *
mdcli_recv (mdcli_t *self)
assert (self);

// Poll socket for a reply, with timeout
zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
return NULL; // Interrupted

// If we got a reply, process it
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
// Don't try to handle errors, just assert noisily
assert (zmsg_size (msg) >= 4);

zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
zframe_destroy (&empty);

zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);

zframe_t *service = zmsg_pop (msg);
zframe_destroy (&service);

return msg; // Success
if (zctx_interrupted)
printf ("W: interrupt received, killing client…\n");
if (self->verbose)
zclock_log ("W: permanent error, abandoning request");

return NULL;