Lruqueue3

//
// Least-recently used (LRU) queue device
// Demonstrates use of the libczmq API and reactor style
//
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.
//
#include "czmq.h"

#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Signals worker is ready

// Basic request-reply client using REQ socket
//

static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://frontend.ipc");

// Send request, get reply
while (1) {
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break;
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}

// Worker using REQ socket to do LRU routing
//

static void *
worker_task (void *arg_ptr)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://backend.ipc");

// Tell broker we're ready for work
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);

// Process messages as they arrive
while (1) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
//zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

// Our LRU queue structure, passed to reactor handlers
typedef struct {
void *frontend; // Listen to clients
void *backend; // Listen to workers
zlist_t *workers; // List of ready workers
} lruqueue_t;

// Handle input from client, on frontend
int s_handle_frontend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->frontend);
if (msg) {
zmsg_wrap (msg, (zframe_t *) zlist_pop (self->workers));
zmsg_send (&msg, self->backend);

// Cancel reader on frontend if we went from 1 to 0 workers
if (zlist_size (self->workers) == 0) {
zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
zloop_poller_end (loop, &poller);
}
}
return 0;
}

// Handle input from worker, on backend
int s_handle_backend (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
// Use worker address for LRU routing
lruqueue_t *self = (lruqueue_t *) arg;
zmsg_t *msg = zmsg_recv (self->backend);
if (msg) {
zframe_t *address = zmsg_unwrap (msg);
zlist_append (self->workers, address);

// Enable reader on frontend if we went from 0 to 1 workers
if (zlist_size (self->workers) == 1) {
zmq_pollitem_t poller = { self->frontend, 0, ZMQ_POLLIN };
zloop_poller (loop, &poller, s_handle_frontend, self);
}
// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
else
zmsg_send (&msg, self->frontend);
}
return 0;
}

int main (void)
{
zctx_t *ctx = zctx_new ();
lruqueue_t *self = (lruqueue_t *) zmalloc (sizeof (lruqueue_t));
self->frontend = zsocket_new (ctx, ZMQ_ROUTER);
self->backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (self->frontend, "ipc://frontend.ipc");
zsocket_bind (self->backend, "ipc://backend.ipc");

int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);

// Queue of available workers
self->workers = zlist_new ();

// Prepare reactor and fire it up
zloop_t *reactor = zloop_new ();
zmq_pollitem_t poller = { self->backend, 0, ZMQ_POLLIN };
zloop_poller (reactor, &poller, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);

// When we're done, clean up properly
while (zlist_size (self->workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (self->workers);
zframe_destroy (&frame);
}
zlist_destroy (&self->workers);
zctx_destroy (&ctx);
free (self);
return 0;
}