Peering1

//
// Broker peering simulation (part 1)
// Prototypes the state flow
//

#include "czmq.h"

int main (int argc, char *argv [])
{
// First argument is this broker's name
// Other arguments are our peers' names
//
if (argc < 2) {
printf ("syntax: peering1 me {you}…\n");
exit (EXIT_FAILURE);
}
char *self = argv [1];
printf ("I: preparing broker at %s…\n", self);
srandom ((unsigned) time (NULL));

// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *statebe = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (statebe, "ipc://%s-state.ipc", self);

// Connect statefe to all peers
void *statefe = zsocket_new (ctx, ZMQ_SUB);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to state backend at '%s'\n", peer);
zsocket_connect (statefe, "ipc://%s-state.ipc", peer);
}
// Send out status messages to peers, and collect from peers
// The zmq_poll timeout defines our own heartbeating
//
while (1) {
// Initialize poll set
zmq_pollitem_t items [] = {
{ statefe, 0, ZMQ_POLLIN, 0 }
};
// Poll for activity, or 1 second timeout
int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// Handle incoming status message
if (items [0].revents & ZMQ_POLLIN) {
char *peer_name = zstr_recv (statefe);
char *available = zstr_recv (statefe);
printf ("%s - %s workers free\n", peer_name, available);
free (peer_name);
free (available);
}
else {
// Send random value for worker availability
zstr_sendm (statebe, self);
zstr_sendf (statebe, "%d", randof (10));
}
}
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}