Peering3

//
// Broker peering simulation (part 3)
// Prototypes the full flow of status and tasks
//
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.
//
#include "czmq.h"

#define NBR_CLIENTS 10
#define NBR_WORKERS 5
#define LRU_READY "\001" // Signals worker is ready
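// A real task ID is four hex digits, so its first byte is never
// \001; the broker uses this to tell READY signals from replies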

// Our own name; in practice this would be configured per node
static char *self;

// Request-reply client using REQ socket
// To simulate load, clients issue a burst of requests and then
// sleep for a random period.
//
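// Note that a REQ socket enforces a strict send/recv alternation, so
// a lost reply would wedge the client forever; hence the poll with a
// timeout below.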

static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, "ipc://%s-localfe.ipc", self);
    void *monitor = zsocket_new (ctx, ZMQ_PUSH);
    zsocket_connect (monitor, "ipc://%s-monitor.ipc", self);

    while (1) {
        sleep (randof (5));
        int burst = randof (15);
        while (burst--) {
            char task_id [5];
            sprintf (task_id, "%04X", randof (0x10000));

            // Send request with random hex ID
            zstr_send (client, task_id);

            // Wait max ten seconds for a reply, then complain
            zmq_pollitem_t pollset [1] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (pollset, 1, 10 * 1000 * ZMQ_POLL_MSEC);
            if (rc == -1)
                break; // Interrupted

            if (pollset [0].revents & ZMQ_POLLIN) {
                char *reply = zstr_recv (client);
                if (!reply)
                    break; // Interrupted
                // Worker is supposed to answer us with our task id
                assert (streq (reply, task_id));
                zstr_sendf (monitor, "%s", reply);
                free (reply);
            }
            else {
                zstr_sendf (monitor,
                    "E: CLIENT EXIT - lost task %s", task_id);
                return NULL;
            }
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}

// Worker using REQ socket to do LRU routing
//
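// The broker reads the worker's address off every message the worker
// sends (the READY signal or a reply) and appends it to a list; since
// requests pop from the head, the longest-idle worker is used first.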

static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (worker, "ipc://%s-localbe.ipc", self);

    // Tell broker we're ready for work
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);

    // Process messages as they arrive
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break; // Interrupted
        // Workers are busy for zero or one second
        sleep (randof (2));
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}

int main (int argc, char *argv [])
{
    // First argument is this broker's name
    // Other arguments are our peers' names
    //
    if (argc < 2) {
        printf ("syntax: peering3 me {you}...\n");
        exit (EXIT_FAILURE);
    }
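    // For example, a three-node cluster (the names are arbitrary) is
    // started as one instance per node:
    //     peering3 DC1 DC2 DC3
    //     peering3 DC2 DC1 DC3
    //     peering3 DC3 DC1 DC2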
    self = argv [1];
    printf ("I: preparing broker at %s...\n", self);
    srandom ((unsigned) time (NULL));

    // Prepare our context and sockets
    zctx_t *ctx = zctx_new ();

    // Bind cloud frontend to endpoint
    void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsockopt_set_identity (cloudfe, self);
    zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);

    // Bind state backend / publisher to endpoint
    void *statebe = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (statebe, "ipc://%s-state.ipc", self);

    // Connect cloud backend to all peers
    void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsockopt_set_identity (cloudbe, self);
    int argn;
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: connecting to cloud frontend at '%s'\n", peer);
        zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
    }

    // Connect statefe to all peers; subscribe to everything, since
    // a SUB socket with no subscription delivers nothing
    void *statefe = zsocket_new (ctx, ZMQ_SUB);
    zsockopt_set_subscribe (statefe, "");
    for (argn = 2; argn < argc; argn++) {
        char *peer = argv [argn];
        printf ("I: connecting to state backend at '%s'\n", peer);
        zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
    }
    // Prepare local frontend and backend
    void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);

    void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);

    // Prepare monitor socket
    void *monitor = zsocket_new (ctx, ZMQ_PULL);
    zsocket_bind (monitor, "ipc://%s-monitor.ipc", self);

    // Start local workers
    int worker_nbr;
    for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
        zthread_new (worker_task, NULL);

    // Start local clients
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);

    // Interesting part
    // -------------------------------------------------------------
    // Publish-subscribe flow
    // - Poll statefe and process capacity updates
    // - Each time capacity changes, broadcast new value
    // Request-reply flow
    // - Poll primary and process local/cloud replies
    // - While worker available, route localfe to local or cloud
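    // A sketch of the envelopes in play (frames, top to bottom),
    // assuming a request body of a single task-id frame:
    // - request read from localfe:  [client id][empty][task id]
    // - same, written to localbe:   [worker id][empty][client id][empty][task id]
    // - same, written to cloudbe:   [peer name][client id][empty][task id]
    // - reply read from localbe, after unwrap: [client id][empty][task id]
    // A ROUTER socket prepends the sender's identity on recv and pops
    // the first frame on send to pick the destination.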

    // Queue of available workers
    int local_capacity = 0;
    int cloud_capacity = 0;
    zlist_t *workers = zlist_new ();

    while (1) {
        zmq_pollitem_t primary [] = {
            { localbe, 0, ZMQ_POLLIN, 0 },
            { cloudbe, 0, ZMQ_POLLIN, 0 },
            { statefe, 0, ZMQ_POLLIN, 0 },
            { monitor, 0, ZMQ_POLLIN, 0 }
        };
        // If we have no workers anyhow, wait indefinitely
        int rc = zmq_poll (primary, 4,
            local_capacity? 1000 * ZMQ_POLL_MSEC: -1);
        if (rc == -1)
            break; // Interrupted

        // Track if capacity changes during this iteration
        int previous = local_capacity;

        // Handle reply from local worker
        zmsg_t *msg = NULL;

        if (primary [0].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (localbe);
            if (!msg)
                break; // Interrupted
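            // zmsg_unwrap pops the address frame, plus the empty
            // delimiter that follows it, if there is one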
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
            local_capacity++;

            // If it's READY, don't route the message any further
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
        }
        // Or handle reply from peer broker
        else
        if (primary [1].revents & ZMQ_POLLIN) {
            msg = zmsg_recv (cloudbe);
            if (!msg)
                break; // Interrupted
            // We don't use peer broker address for anything
            zframe_t *address = zmsg_unwrap (msg);
            zframe_destroy (&address);
        }
        // Route reply to cloud if it's addressed to a broker
        for (argn = 2; msg && argn < argc; argn++) {
            char *data = (char *) zframe_data (zmsg_first (msg));
            size_t size = zframe_size (zmsg_first (msg));
            if (size == strlen (argv [argn])
            &&  memcmp (data, argv [argn], size) == 0)
                zmsg_send (&msg, cloudfe);
        }
        // Route reply to client if we still need to
        if (msg)
            zmsg_send (&msg, localfe);

        // Handle capacity updates; the state message has two frames,
        // the peer's name and then its capacity
        if (primary [2].revents & ZMQ_POLLIN) {
            char *peer = zstr_recv (statefe);
            free (peer);
            char *status = zstr_recv (statefe);
            cloud_capacity = atoi (status);
            free (status);
        }
        // Handle monitor message
        if (primary [3].revents & ZMQ_POLLIN) {
            char *status = zstr_recv (monitor);
            printf ("%s\n", status);
            free (status);
        }

        // Now route as many client requests as we can handle
        // - If we have local capacity, we poll both localfe and cloudfe
        // - If we have cloud capacity only, we poll just localfe
        // - Route any request locally if we can, else to cloud
        //
        while (local_capacity + cloud_capacity) {
            zmq_pollitem_t secondary [] = {
                { localfe, 0, ZMQ_POLLIN, 0 },
                { cloudfe, 0, ZMQ_POLLIN, 0 }
            };
            if (local_capacity)
                rc = zmq_poll (secondary, 2, 0);
            else
                rc = zmq_poll (secondary, 1, 0);
            assert (rc >= 0);

            if (secondary [0].revents & ZMQ_POLLIN)
                msg = zmsg_recv (localfe);
            else
            if (secondary [1].revents & ZMQ_POLLIN)
                msg = zmsg_recv (cloudfe);
            else
                break; // No work, go back to primary

            if (local_capacity) {
                zframe_t *frame = (zframe_t *) zlist_pop (workers);
                zmsg_wrap (msg, frame);
                zmsg_send (&msg, localbe);
                local_capacity--;
            }
            else {
                // Route to random broker peer
                int random_peer = randof (argc - 2) + 2;
                zmsg_pushmem (msg, argv [random_peer], strlen (argv [random_peer]));
                zmsg_send (&msg, cloudbe);
            }
        }
        if (local_capacity != previous) {
            // We stick our own address onto the envelope
            zstr_sendm (statebe, self);
            // Broadcast new capacity
            zstr_sendf (statebe, "%d", local_capacity);
        }
}
    // When we're done, clean up properly
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return EXIT_SUCCESS;
}