Binary Star server in C

//
// Binary Star server
//

#include "czmq.h"

// We send state information every this often
// If peer doesn't respond in two heartbeats, it is 'dead'

#define HEARTBEAT 1000 // In msecs

// States we can be in at any point in time
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events, which start with the states our peer can be in
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Our finite state machine
typedef struct {
state_t state; // Current state
event_t event; // Current event
int64_t peer_expiry; // When peer is considered 'dead'
} bstar_t;

// Execute finite state machine (apply event to state)
// Returns TRUE if there was an exception

static Bool
s_state_machine (bstar_t *fsm)
{
Bool exception = FALSE;
// Primary server is waiting for peer to connect
// Accepts CLIENT_REQUEST events in this state
if (fsm->state == STATE_PRIMARY) {
if (fsm->event == PEER_BACKUP) {
printf ("I: connected to backup (slave), ready as master\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_ACTIVE) {
printf ("I: connected to backup (master), ready as slave\n");
fsm->state = STATE_PASSIVE;
}
}
else
// Backup server is waiting for peer to connect
// Rejects CLIENT_REQUEST events in this state
if (fsm->state == STATE_BACKUP) {
if (fsm->event == PEER_ACTIVE) {
printf ("I: connected to primary (master), ready as slave\n");
fsm->state = STATE_PASSIVE;
}
else
if (fsm->event == CLIENT_REQUEST)
exception = TRUE;
}
else
// Server is active
// Accepts CLIENT_REQUEST events in this state
if (fsm->state == STATE_ACTIVE) {
if (fsm->event == PEER_ACTIVE) {
// Two masters would mean split-brain
printf ("E: fatal error - dual masters, aborting\n");
exception = TRUE;
}
}
else
// Server is passive
// CLIENT_REQUEST events can trigger failover if peer looks dead
if (fsm->state == STATE_PASSIVE) {
if (fsm->event == PEER_PRIMARY) {
// Peer is restarting - become active, peer will go passive
printf ("I: primary (slave) is restarting, ready as master\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_BACKUP) {
// Peer is restarting - become active, peer will go passive
printf ("I: backup (slave) is restarting, ready as master\n");
fsm->state = STATE_ACTIVE;
}
else
if (fsm->event == PEER_PASSIVE) {
// Two passives would mean cluster would be non-responsive
printf ("E: fatal error - dual slaves, aborting\n");
exception = TRUE;
}
else
if (fsm->event == CLIENT_REQUEST) {
// Peer becomes master if timeout has passed
// It's the client request that triggers the failover
assert (fsm->peer_expiry > 0);
if (zclock_time () >= fsm->peer_expiry) {
// If peer is dead, switch to the active state
printf ("I: failover successful, ready as master\n");
fsm->state = STATE_ACTIVE;
}
else
// If peer is alive, reject connections
exception = TRUE;
}
}
return exception;
}

int main (int argc, char *argv [])
{
// Arguments can be either of:
// -p primary server, at tcp://localhost:5001
// -b backup server, at tcp://localhost:5002
zctx_t *ctx = zctx_new ();
void *statepub = zsocket_new (ctx, ZMQ_PUB);
void *statesub = zsocket_new (ctx, ZMQ_SUB);
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
bstar_t fsm = { 0 };

if (argc == 2 && streq (argv [1], "-p")) {
printf ("I: Primary master, waiting for backup (slave)\n");
zsocket_bind (frontend, "tcp://*:5001");
zsocket_bind (statepub, "tcp://*:5003");
zsocket_connect (statesub, "tcp://localhost:5004");
fsm.state = STATE_PRIMARY;
}
else
if (argc == 2 && streq (argv [1], "-b")) {
printf ("I: Backup slave, waiting for primary (master)\n");
zsocket_bind (frontend, "tcp://*:5002");
zsocket_bind (statepub, "tcp://*:5004");
zsocket_connect (statesub, "tcp://localhost:5003");
fsm.state = STATE_BACKUP;
}
else {
printf ("Usage: bstarsrv { -p | -b }\n");
zctx_destroy (&ctx);
exit (0);
}
// Set timer for next outgoing state message
int64_t send_state_at = zclock_time () + HEARTBEAT;

while (!zctx_interrupted) {
zmq_pollitem_t items [] = {
{ frontend, 0, ZMQ_POLLIN, 0 },
{ statesub, 0, ZMQ_POLLIN, 0 }
};
int time_left = (int) ((send_state_at - zclock_time ()));
if (time_left < 0)
time_left = 0;
int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Context has been shut down

if (items [0].revents & ZMQ_POLLIN) {
// Have a client request
zmsg_t *msg = zmsg_recv (frontend);
fsm.event = CLIENT_REQUEST;
if (s_state_machine (&fsm) == FALSE)
// Answer client by echoing request back
zmsg_send (&msg, frontend);
else
zmsg_destroy (&msg);
}
if (items [1].revents & ZMQ_POLLIN) {
// Have state from our peer, execute as event
char *message = zstr_recv (statesub);
fsm.event = atoi (message);
free (message);
if (s_state_machine (&fsm))
break; // Error, so exit
fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
}
// If we timed-out, send state to peer
if (zclock_time () >= send_state_at) {
char message [2];
sprintf (message, "%d", fsm.state);
zstr_send (statepub, message);
send_state_at = zclock_time () + HEARTBEAT;
}
}
if (zctx_interrupted)
printf ("W: interrupted\n");

// Shutdown sockets and context
zctx_destroy (&ctx);
return 0;
}