// Majordomo broker in C

//
// Majordomo Protocol broker
// A minimal implementation of http://rfc.zeromq.org/spec:7 and spec:8
//
#include "czmq.h"
#include "mdp.h"

// Heartbeat tuning. We'd normally pull these from config data.

#define HEARTBEAT_LIVENESS  3       // 3-5 is reasonable
#define HEARTBEAT_INTERVAL  2500    // msecs
// Msecs a worker may stay silent before it is purged. Parenthesized so the
// product stays one operand wherever the macro expands (e.g. x / EXPIRY).
#define HEARTBEAT_EXPIRY    (HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS)

// This defines a single broker: one ROUTER socket shared by clients and
// workers, plus the service/worker bookkeeping built on top of it.
typedef struct {
    zctx_t *ctx;                // Our context
    void *socket;               // Socket for clients & workers
    int verbose;                // Print activity to stdout
    char *endpoint;             // Broker binds to this endpoint
                                // NOTE(review): not assigned anywhere in this file
    zhash_t *services;          // Hash of known services (name -> service_t)
    zhash_t *workers;           // Hash of known workers (hex identity -> worker_t)
    zlist_t *waiting;           // Idle workers, oldest first
    int64_t heartbeat_at;       // When to send HEARTBEAT, in zclock_time() msecs;
                                // signed to match zclock_time() and worker_t.expiry
} broker_t;

// A single service: its name, the client requests queued for it, and the
// idle workers currently able to serve it.
typedef struct {
char *name; // Service name; heap copy made in s_service_require, freed in s_service_destroy
zlist_t *requests; // Queued client requests (zmsg_t *) not yet handed to a worker
zlist_t *waiting; // Idle workers (worker_t *) registered for this service
size_t workers; // Workers attached via READY; mmi.service answers "404" while this is zero
} service_t;

// One worker known to the broker, idle or busy.
typedef struct {
char *identity; // Hex string of the worker's routing frame; also its key in broker->workers
zframe_t *address; // Copy of the routing frame, prepended to every message sent to this worker
service_t *service; // Service declared in READY; NULL before that (assumes zmalloc zero-fills)
int64_t expiry; // zclock_time() deadline; an idle worker past it is purged
} worker_t;

// ---------------------------------------------------------------------
// Forward declarations. Every helper below is file-local.

// Broker lifecycle and housekeeping
static broker_t *s_broker_new (int verbose);
static void s_broker_destroy (broker_t **self_p);
static void s_broker_bind (broker_t *self, char *endpoint);
static void s_broker_purge_workers (broker_t *self);

// Service bookkeeping and request routing
static service_t *s_service_require (broker_t *self, zframe_t *service_frame);
static void s_service_destroy (void *argument);
static void s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);

// Worker bookkeeping and messaging
static worker_t *s_worker_require (broker_t *self, zframe_t *address);
static void s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void s_worker_destroy (void *argument);
static void s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void s_worker_send (broker_t *self, worker_t *worker, char *command, char *option, zmsg_t *msg);
static void s_worker_waiting (broker_t *self, worker_t *worker);

// Client messaging
static void s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);

// ---------------------------------------------------------------------
// Main broker work happens here.
// Usage: pass "-v" as the first argument to log every message.

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));

    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");

    // Get and process messages forever or until interrupted
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->socket, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              // Interrupted

        // Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          // Interrupted
            if (self->verbose) {
                zclock_log ("I: received message:");
                zmsg_dump (msg);
            }
            // Expected envelope: [sender][empty][protocol header][...].
            // A short message from the network leaves trailing pops NULL.
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);

            // Never hand a NULL frame to zframe_streq; malformed input
            // falls through to the "invalid message" branch instead.
            if (header && zframe_streq (header, MDPC_CLIENT))
                s_client_process (self, sender, msg);
            else
            if (header && zframe_streq (header, MDPW_WORKER))
                s_worker_process (self, sender, msg);
            else {
                zclock_log ("E: invalid message:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        // Disconnect and delete any expired workers
        // Send heartbeats to idle workers if needed
        if (zclock_time () > self->heartbeat_at) {
            s_broker_purge_workers (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {
                s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupt received, shutting down…\n");

    s_broker_destroy (&self);
    return 0;
}

// ---------------------------------------------------------------------
// Constructor for broker object

static broker_t *
s_broker_new (int verbose)
{
broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));

// Initialize broker state
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
self->verbose = verbose;
self->services = zhash_new ();
self->workers = zhash_new ();
self->waiting = zlist_new ();
self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
return self;
}

// ---------------------------------------------------------------------
// Destructor for broker object.
// Safe to call on an already-destroyed broker; clears *self_p on return.
// Destroying the services table runs s_service_destroy on each entry, and
// destroying the workers table runs s_worker_destroy on each entry.

static void
s_broker_destroy (broker_t **self_p)
{
    assert (self_p);
    broker_t *self = *self_p;
    if (self == NULL)
        return;                 // Nothing left to tear down

    zctx_destroy (&self->ctx);
    zhash_destroy (&self->services);
    zhash_destroy (&self->workers);
    zlist_destroy (&self->waiting);
    free (self);
    *self_p = NULL;
}

// ---------------------------------------------------------------------
// Bind broker to endpoint, can call this multiple times.
// We use a single socket for both clients and workers.
// Logs the "active" banner only when the bind succeeds; otherwise logs an
// error and leaves the socket as it was.

static void
s_broker_bind (broker_t *self, char *endpoint)
{
    // zsocket_bind takes a printf-style format; never pass the endpoint
    // itself as the format string. Returns -1 on failure.
    int rc = zsocket_bind (self->socket, "%s", endpoint);
    if (rc == -1) {
        zclock_log ("E: MDP broker failed to bind to %s", endpoint);
        return;
    }
    zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}

// ---------------------------------------------------------------------
// Delete any idle workers that haven't pinged us in a while.
// self->waiting is ordered oldest to most recent, so the scan stops at the
// first worker whose expiry is still in the future.

static void
s_broker_purge_workers (broker_t *self)
{
    worker_t *oldest;
    while ((oldest = (worker_t *) zlist_first (self->waiting)) != NULL) {
        if (zclock_time () < oldest->expiry)
            break;              // Everyone from here on is still alive

        if (self->verbose)
            zclock_log ("I: deleting expired worker: %s", oldest->identity);
        s_worker_delete (self, oldest, 0);
    }
}

// ---------------------------------------------------------------------
// Locate or create new service entry.
// Returns the service named by service_frame, creating and registering an
// empty one (owned by self->services) if it is not yet known.

static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
    assert (service_frame);
    char *name = zframe_strdup (service_frame);

    service_t *service = (service_t *) zhash_lookup (self->services, name);
    if (service) {
        free (name);            // Known service: the temporary copy is not needed
        return service;
    }

    service = (service_t *) zmalloc (sizeof (service_t));
    service->name = name;       // The new service keeps this copy
    service->requests = zlist_new ();
    service->waiting = zlist_new ();
    zhash_insert (self->services, name, service);
    // Removing the entry from the table will run s_service_destroy
    zhash_freefn (self->services, name, s_service_destroy);
    if (self->verbose)
        zclock_log ("I: added service: %s", name);
    return service;
}

// ---------------------------------------------------------------------
// Destroy service object, called when service is removed from
// broker->services.

static void
s_service_destroy (void *argument)
{
service_t *service = (service_t *) argument;
// Destroy all queued requests
while (zlist_size (service->requests)) {
zmsg_t *msg = zlist_pop (service->requests);
zmsg_destroy (&msg);
}
zlist_destroy (&service->requests);
zlist_destroy (&service->waiting);
free (service->name);
free (service);
}

// ---------------------------------------------------------------------
// Dispatch requests to waiting workers as possible.
// If msg is non-NULL it is queued on the service (the service takes it).
// Expired workers are purged first, then queued requests are paired with
// idle workers in list order until one of the two queues is empty.

static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{
    assert (service);
    if (msg != NULL)
        zlist_append (service->requests, msg);

    s_broker_purge_workers (self);
    while (zlist_size (service->waiting) > 0 && zlist_size (service->requests) > 0) {
        worker_t *worker = (worker_t *) zlist_pop (service->waiting);
        zlist_remove (self->waiting, worker);     // Worker is busy from now on
        zmsg_t *request = (zmsg_t *) zlist_pop (service->requests);
        s_worker_send (self, worker, MDPW_REQUEST, NULL, request);
        zmsg_destroy (&request);                  // s_worker_send sent its own copy
    }
}

// ---------------------------------------------------------------------
// Handle internal service according to 8/MMI specification.
// msg is [client envelope][body...]; its last frame is overwritten with a
// status code and the message is sent back to the client (msg is consumed).
// "mmi.service" answers "200" if the named service has workers, else
// "404"; any other mmi.* name answers "501".

static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
    const char *return_code = "501";     // Unrecognized mmi.* request
    if (zframe_streq (service_frame, "mmi.service")) {
        char *name = zframe_strdup (zmsg_last (msg));
        service_t *service = (service_t *) zhash_lookup (self->services, name);
        free (name);
        return_code = (service != NULL && service->workers > 0) ? "200" : "404";
    }
    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));

    // Lift off the client envelope, prepend the MDP client header and the
    // service name, then put the envelope back on top and reply.
    zframe_t *client = zmsg_unwrap (msg);
    zmsg_push (msg, zframe_dup (service_frame));
    zmsg_pushstr (msg, MDPC_CLIENT);
    zmsg_wrap (msg, client);
    zmsg_send (&msg, self->socket);
}

// ---------------------------------------------------------------------
// Creates worker if necessary.
// Workers are keyed by the hex rendering of their routing frame. Returns
// the existing worker, or a newly registered one (owned by self->workers).

static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
    assert (address);

    char *identity = zframe_strhex (address);
    worker_t *worker = (worker_t *) zhash_lookup (self->workers, identity);
    if (worker != NULL) {
        free (identity);        // Already registered
        return worker;
    }

    worker = (worker_t *) zmalloc (sizeof (worker_t));
    worker->identity = identity;
    worker->address = zframe_dup (address);
    zhash_insert (self->workers, identity, worker);
    // Removing the entry from the table will run s_worker_destroy
    zhash_freefn (self->workers, identity, s_worker_destroy);
    if (self->verbose)
        zclock_log ("I: registering new worker: %s", identity);
    return worker;
}

// ---------------------------------------------------------------------
// Deletes worker from all data structures, and destroys worker.
// If disconnect is non-zero, a DISCONNECT command is sent to it first.
// The worker pointer is invalid after this returns.

static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{
    assert (worker);
    if (disconnect)
        s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);

    service_t *service = worker->service;
    if (service != NULL) {
        zlist_remove (service->waiting, worker);
        service->workers--;
    }
    zlist_remove (self->waiting, worker);
    // Dropping the hash entry runs s_worker_destroy, which frees the worker
    zhash_delete (self->workers, worker->identity);
}

// ---------------------------------------------------------------------
// Destroy worker object, called when worker is removed from
// broker->workers.

static void
s_worker_destroy (void *argument)
{
worker_t *worker = (worker_t *) argument;
zframe_destroy (&worker->address);
free (worker->identity);
free (worker);
}

// ---------------------------------------------------------------------
// Process message sent to us by a worker.
// sender: the worker's routing frame (borrowed; the caller destroys it).
// msg:    [command][command-specific frames...]; always consumed here,
//         either sent on or destroyed.

static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    // At least a command frame is required. This is network input, so
    // reject it instead of asserting.
    if (zmsg_size (msg) < 1) {
        zclock_log ("E: invalid input message");
        zmsg_destroy (&msg);
        return;
    }
    zframe_t *command = zmsg_pop (msg);

    // "Ready" means already registered AND attached to a service by an
    // earlier READY. A worker registered without a service (e.g. after an
    // unrecognized command) is not ready; treating it as ready would
    // dereference a NULL worker->service on REPLY.
    char *identity = zframe_strhex (sender);
    worker_t *known = (worker_t *) zhash_lookup (self->workers, identity);
    int worker_ready = (known != NULL && known->service != NULL);
    free (identity);
    worker_t *worker = s_worker_require (self, sender);

    if (zframe_streq (command, MDPW_READY)) {
        // READY carries the service name; that name (not the sender's
        // identity) is what must be checked against the reserved prefix.
        zframe_t *service_frame = zmsg_pop (msg);
        if (worker_ready)               // Not first command in session
            s_worker_delete (self, worker, 1);
        else
        if (service_frame == NULL)      // READY without a service name
            s_worker_delete (self, worker, 1);
        else
        if (zframe_size (service_frame) >= 4   // Reserved service name
        &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
            s_worker_delete (self, worker, 1);
        else {
            // Attach worker to service and mark as idle
            worker->service = s_service_require (self, service_frame);
            worker->service->workers++;
            s_worker_waiting (self, worker);
        }
        zframe_destroy (&service_frame);
    }
    else
    if (zframe_streq (command, MDPW_REPLY)) {
        if (worker_ready) {
            // Remove & save client return envelope and insert the
            // protocol header and service name, then rewrap envelope.
            zframe_t *client = zmsg_unwrap (msg);
            if (client) {
                zmsg_pushstr (msg, worker->service->name);
                zmsg_pushstr (msg, MDPC_CLIENT);
                zmsg_wrap (msg, client);
                zmsg_send (&msg, self->socket);
            }
            else
                zclock_log ("E: worker reply has no client envelope");
            s_worker_waiting (self, worker);
        }
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_HEARTBEAT)) {
        if (worker_ready)
            worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_DISCONNECT))
        s_worker_delete (self, worker, 0);
    else {
        zclock_log ("E: invalid input message");
        zmsg_dump (msg);
    }
    // command is a zframe_t: free() alone would leak the frame's contents
    zframe_destroy (&command);
    zmsg_destroy (&msg);        // msg is already NULL here if it was sent
}

// ---------------------------------------------------------------------
// Send message to worker.
// Builds [worker address][empty][MDPW_WORKER][command][option?][body...]
// from a copy of msg (or an empty message when msg is NULL). The caller
// keeps ownership of msg and must destroy it.

static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
               char *option, zmsg_t *msg)
{
    zmsg_t *outgoing = (msg != NULL) ? zmsg_dup (msg) : zmsg_new ();

    // Protocol envelope, pushed innermost first
    if (option != NULL)
        zmsg_pushstr (outgoing, option);
    zmsg_pushstr (outgoing, command);
    zmsg_pushstr (outgoing, MDPW_WORKER);

    // Routing envelope so the ROUTER socket delivers to this worker
    zmsg_wrap (outgoing, zframe_dup (worker->address));

    if (self->verbose) {
        // The first command byte indexes the printable command-name table
        zclock_log ("I: sending %s to worker",
            mdps_commands [(int) *command]);
        zmsg_dump (outgoing);
    }
    zmsg_send (&outgoing, self->socket);
}

// ---------------------------------------------------------------------
// This worker is now waiting for work.
// Appends it to both the broker-wide and the per-service idle lists,
// refreshes its expiry, and tries to hand it a queued request right away.

static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
    service_t *service = worker->service;

    // Appending at the tail keeps self->waiting ordered oldest to most
    // recent, which s_broker_purge_workers depends on.
    zlist_append (self->waiting, worker);
    zlist_append (service->waiting, worker);
    worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;

    s_service_dispatch (self, service, NULL);
}

// ---------------------------------------------------------------------
// Process a request coming from a client.
// sender: the client's routing frame (borrowed; the caller destroys it).
// msg:    [service name][body...]; consumed here (queued, sent, or destroyed).
// "mmi.*" names are answered internally; others are queued for workers.

static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    // Need a service name plus a body. This is network input, so reject
    // malformed requests instead of asserting and taking the broker down.
    if (zmsg_size (msg) < 2) {
        zclock_log ("E: invalid client request");
        zmsg_dump (msg);
        zmsg_destroy (&msg);
        return;
    }
    zframe_t *service_frame = zmsg_pop (msg);

    // Set reply return address to client sender
    zmsg_wrap (msg, zframe_dup (sender));
    if (zframe_size (service_frame) >= 4
    &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
        // Internal services are answered directly and are never
        // registered as real services in self->services.
        s_service_internal (self, service_frame, msg);
    else
        s_service_dispatch (self, s_service_require (self, service_frame), msg);
    zframe_destroy (&service_frame);
}