// Majordomo worker API in C

/* =====================================================================

Majordomo Protocol Worker API
Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.

This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.

This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

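You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/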

#include "mdwrkapi.h"

// Reliability parameters
// Starting value of the worker's liveness counter: the counter is set to
// this value when we (re)connect and when a message arrives from the
// broker, and mdwrk_recv reconnects once it has counted down to zero.
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable

// Structure of our class
// We access these properties only via class methods

struct _mdwrk_t {
    zctx_t *ctx;                // Our context
    char *broker;               // Broker endpoint; private copy, freed in destructor
    char *service;              // Service name; private copy, freed in destructor
    void *worker;               // Socket to broker
    int verbose;                // Print activity to stdout

    // Heartbeat management
    uint64_t heartbeat_at;      // When to send HEARTBEAT
    size_t liveness;            // How many attempts left
    int heartbeat;              // Heartbeat delay, msecs
    int reconnect;              // Reconnect delay, msecs

    // Internal state
    int expect_reply;           // Zero only at start

    // Return address, if any
    // Set when a REQUEST arrives; consumed when the reply is sent
    zframe_t *reply_to;
};

// ---------------------------------------------------------------------
// Send message to broker
// If no msg is provided, creates one internally
//
// self     worker instance; its socket and verbose flag are used
// command  MDP/Worker command string; its first byte also indexes
//          mdps_commands to get a printable name for logging
// option   optional frame placed right after the command, or NULL
// msg      optional message body, or NULL; we send a duplicate, so the
//          caller's message is left untouched and still owned by the caller
//
// Frames on the wire, first to last: empty frame, MDPW_WORKER, command,
// option (if any), then the body frames.

static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();

    // Stack protocol envelope to start of message
    // Each push goes in front, so we push in reverse wire order
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");

    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    // The send must happen whether or not we are verbose
    zmsg_send (&msg, self->worker);
}

// ---------------------------------------------------------------------
// Connect or reconnect to broker
//
// Destroys the current socket (if any), opens a fresh DEALER socket,
// connects it to self->broker, registers self->service with a READY
// command, and resets the liveness counter and the next-heartbeat time.
//
// NOTE(review): this helper carries the s_ prefix but, unlike
// s_mdwrk_send_to_broker, is not declared static; linkage is left as is
// in case something outside this file refers to it.

void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
    if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    // Report a failed connect instead of ignoring it; we still carry on
    // exactly as before so the liveness logic can trigger another attempt
    if (zmq_connect (self->worker, self->broker) != 0)
        zclock_log ("E: failed to connect to broker at %s", self->broker);
    if (self->verbose)
        zclock_log ("I: connecting to broker at %s…", self->broker);

    // Register service with broker
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);

    // If liveness hits zero, queue is considered disconnected
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;
}

// ---------------------------------------------------------------------
// Constructor

mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
assert (broker);
assert (service);

mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->service = strdup (service);
self->verbose = verbose;
self->heartbeat = 2500; // msecs
self->reconnect = 2500; // msecs

s_mdwrk_connect_to_broker (self);
return self;

// ---------------------------------------------------------------------
// Destructor
//
// self_p  address of the caller's worker pointer; must not be NULL.
//         If *self_p is NULL this does nothing. Otherwise the context,
//         the broker and service strings and the object itself are
//         released, and *self_p is set to NULL.
//
// NOTE(review): the worker socket is not destroyed explicitly here;
// presumably zctx_destroy closes it -- confirm. reply_to is not released
// here either; check whether a pending return address can leak.

void
mdwrk_destroy (mdwrk_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}

// ---------------------------------------------------------------------
// Set heartbeat delay
//
// heartbeat  new delay in msecs; it is used as the poll timeout in
//            mdwrk_recv and as the interval until the next HEARTBEAT
//            is due

void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
    self->heartbeat = heartbeat;
}

// ---------------------------------------------------------------------
// Set reconnect delay
//
// reconnect  new delay in msecs; mdwrk_recv sleeps this long before
//            reconnecting once the liveness counter reaches zero

void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
    self->reconnect = reconnect;
}

// ---------------------------------------------------------------------
// Send reply, if any, to broker and wait for next request.
//
// self     worker instance
// reply_p  address of the reply message pointer; must not be NULL.
//          *reply_p may be NULL only on the first call. A non-NULL reply
//          is wrapped with the saved return address, sent to the broker
//          and destroyed through reply_p.
//
// Returns the next request message, with the protocol envelope and the
// return address already removed, or NULL if polling or receiving was
// interrupted.
//
// While waiting, this also maintains the connection: any message from
// the broker restores liveness, each poll timeout uses up one attempt,
// and when none are left we sleep for the reconnect delay and reconnect.
// A HEARTBEAT is sent to the broker whenever one is due.

zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    // Format and send the reply if we were provided one
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {
        assert (self->reply_to);
        zmsg_wrap (reply, self->reply_to);
        // The reply message now holds the return-address frame and it is
        // released together with the reply below, so drop our pointer
        // instead of keeping a dangling one
        self->reply_to = NULL;
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;

    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->worker, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break; // Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break; // Interrupted
            if (self->verbose) {
                zclock_log ("I: received message from broker:");
                zmsg_dump (msg);
            }
            // Any traffic from the broker proves it is still alive
            self->liveness = HEARTBEAT_LIVENESS;

            // Don't try to handle errors, just assert noisily
            assert (zmsg_size (msg) >= 3);

            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);

            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);

            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                // We should pop and save as many addresses as there are
                // up to a null part, but for now, just save one…
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg; // We have a request to process
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ; // Do nothing for heartbeats
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {
                zclock_log ("E: invalid input message");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        // Poll timed out with nothing from the broker: one attempt used
        if (--self->liveness == 0) {
            if (self->verbose)
                zclock_log ("W: disconnected from broker - retrying…");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        // Send HEARTBEAT if it's time
        if (zclock_time () > self->heartbeat_at) {
            s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupt received, killing worker…\n");
    return NULL;
}