// Binary Star core class in C

/* =====================================================================
bstar - Binary Star reactor

---------------------------------------------------------------------
Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
Copyright other contributors as noted in the AUTHORS file.

This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or (at
your option) any later version.

This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this program. If not, see
<http://www.gnu.org/licenses/>.
=====================================================================
*/

#include "bstar.h"

// States we can be in at any point in time.
// The numeric value is what s_send_state() publishes to the peer, and
// s_recv_state() stores the number it receives as an event_t, so these
// values must stay aligned with the PEER_* constants of the same rank.
typedef enum {
STATE_PRIMARY = 1, // Primary, waiting for peer to connect
STATE_BACKUP = 2, // Backup, waiting for peer to connect
STATE_ACTIVE = 3, // Active - accepting connections
STATE_PASSIVE = 4 // Passive - not accepting connections
} state_t;

// Events fed to the finite state machine. The first four mirror the
// states our peer can report (same numeric values as state_t); the
// last one is raised locally when input arrives on the voter socket.
typedef enum {
PEER_PRIMARY = 1, // HA peer is pending primary
PEER_BACKUP = 2, // HA peer is pending backup
PEER_ACTIVE = 3, // HA peer is active
PEER_PASSIVE = 4, // HA peer is passive
CLIENT_REQUEST = 5 // Client makes request
} event_t;

// Heartbeat interval, in msecs: we publish our state to the peer this
// often. If the peer stays silent for two heartbeats, it is 'dead'.
#define BSTAR_HEARTBEAT 1000

// Structure of our class: the context and reactor we own, the two
// sockets used to exchange state with the peer, the state machine
// variables, and the handlers registered by the application.

struct _bstar_t {
zctx_t *ctx; // Our private context
zloop_t *loop; // Reactor loop
void *statepub; // State publisher (bound to our local endpoint)
void *statesub; // State subscriber (connected to the peer)
state_t state; // Current state
event_t event; // Current event, input to the state machine
int64_t peer_expiry; // Clock time after which peer is considered 'dead'
zloop_fn *voter_fn; // Voting socket handler
void *voter_arg; // Arguments for voting handler
zloop_fn *master_fn; // Call when become master
void *master_arg; // Arguments for handler
zloop_fn *slave_fn; // Call when become slave
void *slave_arg; // Arguments for handler
};

// ---------------------------------------------------------------------
// Binary Star finite state machine: applies self->event to self->state.
// May change self->state and invoke the registered master/slave
// handlers. Returns 0 if the event was accepted (or simply not relevant
// in the current state), -1 if it is rejected or signals a fatal
// condition (dual masters or dual slaves).

static int
s_execute_fsm (bstar_t *self)
{
    int rc = 0;

    switch (self->state) {
    // Primary server is waiting for its peer to connect;
    // CLIENT_REQUEST events are accepted in this state
    case STATE_PRIMARY:
        switch (self->event) {
        case PEER_BACKUP:
            zclock_log ("I: connected to backup (slave), ready as master");
            self->state = STATE_ACTIVE;
            if (self->master_fn)
                (self->master_fn) (self->loop, NULL, self->master_arg);
            break;
        case PEER_ACTIVE:
            zclock_log ("I: connected to backup (master), ready as slave");
            self->state = STATE_PASSIVE;
            if (self->slave_fn)
                (self->slave_fn) (self->loop, NULL, self->slave_arg);
            break;
        case CLIENT_REQUEST:
            zclock_log ("I: request from client, ready as master");
            self->state = STATE_ACTIVE;
            if (self->master_fn)
                (self->master_fn) (self->loop, NULL, self->master_arg);
            break;
        default:
            break;
        }
        break;

    // Backup server is waiting for its peer to connect;
    // CLIENT_REQUEST events are rejected in this state
    case STATE_BACKUP:
        if (self->event == PEER_ACTIVE) {
            zclock_log ("I: connected to primary (master), ready as slave");
            self->state = STATE_PASSIVE;
            if (self->slave_fn)
                (self->slave_fn) (self->loop, NULL, self->slave_arg);
        }
        else
        if (self->event == CLIENT_REQUEST)
            rc = -1;
        break;

    // Server is active and accepts CLIENT_REQUEST events;
    // the only way out of ACTIVE is death
    case STATE_ACTIVE:
        if (self->event == PEER_ACTIVE) {
            // Two masters would mean split-brain
            zclock_log ("E: fatal error - dual masters, aborting");
            rc = -1;
        }
        break;

    // Server is passive; a CLIENT_REQUEST can trigger failover
    // if the peer looks dead
    case STATE_PASSIVE:
        switch (self->event) {
        case PEER_PRIMARY:
            // Peer is restarting - become active, peer will go passive
            zclock_log ("I: primary (slave) is restarting, ready as master");
            self->state = STATE_ACTIVE;
            break;
        case PEER_BACKUP:
            // Peer is restarting - become active, peer will go passive
            zclock_log ("I: backup (slave) is restarting, ready as master");
            self->state = STATE_ACTIVE;
            break;
        case PEER_PASSIVE:
            // Two passives would mean cluster would be non-responsive
            zclock_log ("E: fatal error - dual slaves, aborting");
            rc = -1;
            break;
        case CLIENT_REQUEST:
            // We become master only once the peer's timeout has passed;
            // it is the client request that triggers the failover
            assert (self->peer_expiry > 0);
            if (zclock_time () < self->peer_expiry)
                // Peer is still alive, reject the request
                rc = -1;
            else {
                // Peer is dead, switch to the active state
                zclock_log ("I: failover successful, ready as master");
                self->state = STATE_ACTIVE;
            }
            break;
        default:
            break;
        }
        // Any of the transitions above that made us active is reported
        // to the application here, in one place
        if (self->state == STATE_ACTIVE && self->master_fn)
            (self->master_fn) (self->loop, NULL, self->master_arg);
        break;
    }
    return rc;
}

// ---------------------------------------------------------------------
// Reactor event handlers…

// Heartbeat handler: publish our current state to the peer, formatted
// as a decimal number. 'arg' is the bstar instance; 'loop' and 'poller'
// are not used. Always returns 0.
int s_send_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    bstar_t *self = arg;
    zstr_sendf (self->statepub, "%d", self->state);
    return 0;
}

// Receive state from peer, execute finite state machine.
// 'arg' is the bstar instance; the message is read from poller->socket.
// Only a well-formed peer state (PEER_PRIMARY..PEER_PASSIVE) counts as
// a heartbeat: it becomes the current event, refreshes the peer expiry
// and is run through the state machine, whose result is returned.
// If nothing could be read, or the message is malformed, the event and
// expiry are left untouched and 0 is returned, so a stale event is
// never replayed through the state machine.
int s_recv_state (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    bstar_t *self = (bstar_t *) arg;
    char *state = zstr_recv (poller->socket);
    if (!state)
        return 0;               // Nothing received, nothing to apply

    // Parse strictly: the whole string must be a number in the range
    // of states a peer can legitimately report
    char *end = NULL;
    long peer_state = strtol (state, &end, 10);
    int valid = end != state
             && *end == '\0'
             && peer_state >= PEER_PRIMARY
             && peer_state <= PEER_PASSIVE;
    free (state);

    if (!valid) {
        zclock_log ("E: invalid state received from peer, ignoring");
        return 0;
    }
    self->event = (event_t) peer_state;
    self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
    return s_execute_fsm (self);
}

// Input is waiting on the voter socket: raise CLIENT_REQUEST and let
// the state machine decide whether we may serve it.
// If the event is accepted, the application's voter handler is called
// with the reactor, the poll item that fired and the registered
// argument; the handler is a zloop_fn, so it gets the poll item itself
// rather than the bare socket. If the event is rejected, the waiting
// message is read and discarded. Always returns 0.
int s_voter_ready (zloop_t *loop, zmq_pollitem_t *poller, void *arg)
{
    bstar_t *self = (bstar_t *) arg;
    self->event = CLIENT_REQUEST;
    if (s_execute_fsm (self) == 0) {
        // Server can accept input now, call appl handler
        (self->voter_fn) (self->loop, poller, self->voter_arg);
    }
    else {
        // Destroy waiting message, no-one to read it
        zmsg_t *msg = zmsg_recv (poller->socket);
        zmsg_destroy (&msg);
    }
    return 0;
}

// ---------------------------------------------------------------------
// Constructor

bstar_t *
bstar_new (int primary, char *local, char *remote)
{
bstar_t
*self;

self = (bstar_t *) zmalloc (sizeof (bstar_t));

// Initialize the Binary Star
self->ctx = zctx_new ();
self->loop = zloop_new ();
self->state = primary? STATE_PRIMARY: STATE_BACKUP;

// Create publisher for state going to peer
self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
zsocket_bind (self->statepub, local);

// Create subscriber for state coming from peer
self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
zsocket_connect (self->statesub, remote);

// Set-up basic reactor events
zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
zmq_pollitem_t poller = { self->statesub, 0, ZMQ_POLLIN };
zloop_poller (self->loop, &poller, s_recv_state, self);
return self;
}

// ---------------------------------------------------------------------
// Destructor
// Destroys the reactor and the context, frees the instance and sets
// the caller's pointer to NULL. Does nothing if *self_p is already NULL.

void
bstar_destroy (bstar_t **self_p)
{
    assert (self_p);
    bstar_t *self = *self_p;
    if (!self)
        return;

    zloop_destroy (&self->loop);
    zctx_destroy (&self->ctx);
    free (self);
    *self_p = NULL;
}

// ---------------------------------------------------------------------
// Accessor for the underlying zloop reactor, so the application can
// register additional timers and readers on it. The reactor remains
// owned by the bstar instance.

zloop_t *
bstar_zloop (bstar_t *self)
{
    zloop_t *reactor = self->loop;
    return reactor;
}

// ---------------------------------------------------------------------
// Create a socket of the given type, bind it to 'endpoint', and
// register it on the reactor as the voting socket. Input on the socket
// acts as a "vote" in the Binary Star scheme: 'handler' is called with
// 'arg' only when the state machine allows the request. Exactly one
// voter may be registered per bstar instance (asserted).
// Returns the result of zloop_poller ().

int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
             void *arg)
{
    void *voter = zsocket_new (self->ctx, type);
    zsocket_bind (voter, endpoint);

    // Remember the application's handler+arg; s_voter_ready calls them
    // later, once the state machine has accepted a request
    assert (!self->voter_fn);
    self->voter_fn = handler;
    self->voter_arg = arg;

    zmq_pollitem_t voter_input = {
        .socket = voter,
        .fd = 0,
        .events = ZMQ_POLLIN
    };
    return zloop_poller (self->loop, &voter_input, s_voter_ready, self);
}

// ---------------------------------------------------------------------
// Register state change handlers
// Sets the handler (and its argument) that the state machine calls when
// this instance becomes master. May be registered only once (asserted).

void
bstar_new_master (bstar_t *self, zloop_fn handler, void *arg)
{
    assert (!self->master_fn);
    self->master_arg = arg;
    self->master_fn = handler;
}

// Sets the handler (and its argument) that the state machine calls when
// this instance becomes slave. May be registered only once (asserted).
void
bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg)
{
    assert (!self->slave_fn);
    self->slave_arg = arg;
    self->slave_fn = handler;
}

// ---------------------------------------------------------------------
// Enable/disable verbose tracing
// Forwards the flag to the underlying reactor.

void
bstar_set_verbose (bstar_t *self, Bool verbose)
{
    zloop_set_verbose (self->loop, verbose);
}

// ---------------------------------------------------------------------
// Start the reactor; ends if a callback function returns -1, or the
// process received SIGINT or SIGTERM. A voter must have been registered
// with bstar_voter () beforehand (asserted).
// Returns the result of zloop_start ().

int
bstar_start (bstar_t *self)
{
    assert (self->voter_fn);
    int rc = zloop_start (self->loop);
    return rc;
}