Taskwork2

//
// Task worker - design 2
// Adds pub-sub flow to receive and respond to kill signal
//

#include "zhelpers.h"

int main (void)
{
void *context = zmq_init (1);

// Socket to receive messages on
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");

// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");

// Socket for control input
void *controller = zmq_socket (context, ZMQ_SUB);
zmq_connect (controller, "tcp://localhost:5559");
zmq_setsockopt (controller, ZMQ_SUBSCRIBE, "", 0);

// Process messages from receiver and controller
zmq_pollitem_t items [] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
// Process messages from both sockets
while (1) {
zmq_msg_t message;
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN) {
zmq_msg_init (&message);
zmq_recv (receiver, &message, 0);

// Do the work
s_sleep (atoi ((char *) zmq_msg_data (&message)));

// Send results to sink
zmq_msg_init (&message);
zmq_send (sender, &message, 0);

// Simple progress indicator for the viewer
printf (".");
fflush (stdout);

zmq_msg_close (&message);
}
// Any waiting controller command acts as 'KILL'
if (items [1].revents & ZMQ_POLLIN)
break; // Exit loop
}
// Finished
zmq_close (receiver);
zmq_close (sender);
zmq_close (controller);
zmq_term (context);
return 0;
}