Round-trip demonstrator in C

//
// Round-trip demonstrator
//
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Client thread signals to
// main when it's ready.
//
#include "czmq.h"

static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
void *client = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
zsocket_connect (client, "tcp://localhost:5555");

printf ("Setting up test…\n");
zclock_sleep (100);

int requests;
int64_t start;

printf ("Synchronous round-trip test…\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d calls/second\n",
(1000 * 10000) / (int) (zclock_time () - start));

printf ("Asynchronous round-trip test…\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++)
zstr_send (client, "hello");
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d calls/second\n",
(1000 * 100000) / (int) (zclock_time () - start));

zstr_send (pipe, "done");
}

static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
zsocket_connect (worker, "tcp://localhost:5556");

while (1) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}

static void *
broker_task (void *args)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555");
zsocket_bind (backend, "tcp://*:5556");

// Initialize poll set
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
while (1) {
int rc = zmq_poll (items, 2, -1);
if (rc == -1)
break; // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (frontend);
zframe_t *address = zmsg_pop (msg);
zframe_destroy (&address);
zmsg_pushstr (msg, "W");
zmsg_send (&msg, backend);
}
if (items [1].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (backend);
zframe_t *address = zmsg_pop (msg);
zframe_destroy (&address);
zmsg_pushstr (msg, "C");
zmsg_send (&msg, frontend);
}
}
zctx_destroy (&ctx);
return NULL;
}

int main (void)
{
// Create threads
zctx_t *ctx = zctx_new ();
void *client = zthread_fork (ctx, client_task, NULL);
zthread_new (worker_task, NULL);
zthread_new (broker_task, NULL);

// Wait for signal on client pipe
char *signal = zstr_recv (client);
free (signal);

zctx_destroy (&ctx);
return 0;
}