Heartbeat code for cluster environment

Working in a cluster environment, I often need to check whether some of the nodes of my cluster are dead or alive. To do so, I have a class called Heartbeat in my Python toolbox. This simple heartbeat class pings the cluster node and returns True or False depending on the health of the targeted node. The class implements a stripped-down version of ping: it sends an ICMP_ECHO_REQUEST packet and waits for the answer.

To use it, I call the constructor with the node name, or IP address, followed by the number of seconds between heartbeats. Then every time I need to check if the node is still alive, I call the method is_alive(), which returns a Boolean.

Here is an example of how to use it, followed by the code.

How to use it

>>> import heartbeat
>>> master = heartbeat.Heartbeat('172.16.2.1', 5)
>>> master.is_alive()
True
>>> master.is_alive()
False
>>> master.is_alive()
True
>>>

>>> import heartbeat
>>> master = heartbeat.Heartbeat('172.16.2.1', 5)
>>> master.is_alive()
True
>>> master.is_alive()
False
>>> master.is_alive()
True
>>>

The code

#
# (c) 2009 - Fred Cirera http://blogmag.net/blog/fred/
#

import array
import os
import socket
import time
import select
from struct import pack, unpack, calcsize

# Fields of the ICMP header packed by _construct().
# Type field; 8 is the ICMP echo request type.
ICMP_TYPE = 8
# Code field.
ICMP_CODE = 0
# Placeholder checksum used while the real checksum is being computed.
ICMP_CHECKSUM = 0
# Identifier field.
ICMP_ID = 0
# Base sequence number; _construct() adds its argument to it.
ICMP_SEQ_NR = 0

# Number of payload bytes (timestamp plus padding) that follow the header.
PACKET_SIZE = 56
# Default number of seconds Heartbeat waits between two real probes.
HEARTBEAT_PROBE_TIME = 20
# Default node probed by Heartbeat.
HEARTBEAT_SERVER = 'homeserver01.us.archive.org'

# This version of ping is the stripped down version of the ping.py by
# Lars Strand <lars strand at gnist org> we only need to see if the
# network is up and running.

def _construct(id):
    """
    Construct an ICMP echo request packet.

    The packet is an 8-byte ICMP header followed by PACKET_SIZE bytes of
    payload: the current time packed as a C double, padded with 'X' bytes.

    id -- integer stored (added to ICMP_SEQ_NR) in the header's
          sequence-number field.  The field is a signed 16-bit value, so
          only the low 16 bits are kept: values outside -32768..32767
          (for example a large process id) wrap around instead of making
          struct.pack() fail.  Callers that compare the echoed value must
          apply the same truncation.

    Returns the packet as a byte string with the checksum filled in.
    """
    header_format = 'bbHHh'

    # Reduce the sequence number to what a signed 16-bit field can hold.
    seq_nr = (ICMP_SEQ_NR + id) & 0xffff
    if seq_nr > 0x7fff:
        seq_nr -= 0x10000

    # Payload: timestamp first, then padding up to PACKET_SIZE bytes.  The
    # padding is a byte string so it can be joined to the packed data on
    # interpreters where text and bytes are distinct types.
    padding = PACKET_SIZE - calcsize("d")
    data = pack("d", time.time()) + b'X' * padding

    # The checksum covers the whole packet with the checksum field set to
    # the ICMP_CHECKSUM placeholder, so build a provisional header first.
    provisional = pack(header_format, ICMP_TYPE, ICMP_CODE, ICMP_CHECKSUM,
                       ICMP_ID, seq_nr)
    checksum = _in_cksum(provisional + data)

    # Final header carrying the real checksum.
    header = pack(header_format, ICMP_TYPE, ICMP_CODE, checksum,
                  ICMP_ID, seq_nr)
    return header + data

def _in_cksum(packet):
    """THE RFC792 states: 'The 16 bit one's complement of
    the one's complement sum of all 16 bit words in the header.'

    Generates a checksum of a (ICMP) packet. Based on in_chksum found
    in ping.c on FreeBSD.
    """

    # add byte if not divisible by 2
    if len(packet) & 1:              
        packet = packet + '\0'

    # split into 16-bit word and insert into a binary array
    words = array.array('h', packet) 
    sum = 0

    # perform ones complement arithmetic on 16-bit words
    for word in words:
        sum += (word & 0xffff) 

    hi = sum >> 16 
    lo = sum & 0xffff 
    sum = hi + lo
    sum = sum + (sum >> 16)
    
    return (~sum) & 0xffff # return ones complement


def pingNode(node, sock, timeout=1.0):
    """
    Send one ICMP echo request to a node and wait for the matching reply.

    node    -- host name or IP address of the node to ping.
    sock    -- raw ICMP socket used to send the request and read replies.
    timeout -- maximum number of seconds to wait for the reply; None
               waits until a matching reply arrives.

    Returns True when a matching echo reply is received in time, False
    when the request cannot be sent or no matching reply shows up.
    """
    ICMP_ECHO_REPLY = 0

    # The process id tags the request.  It travels in a signed 16-bit
    # header field, so keep only the bits that fit in it.
    ident = os.getpid() & 0x7fff
    packet = _construct(ident)

    try:
        sock.sendto(packet, (node, 1))
    except socket.error:
        return False

    deadline = None
    if timeout is not None:
        deadline = time.time() + timeout
    wait = timeout

    # A raw ICMP socket can deliver packets that are not the answer to
    # this request, so keep reading until the right one arrives or the
    # time budget is used up.
    while True:
        readable = select.select([sock], [], [], wait)[0]
        if not readable:
            return False    # timeout waiting for an answer

        pong = sock.recvfrom(PACKET_SIZE + 48)[0]
        if pong:
            # The ICMP header follows the IP header, whose length in
            # 32-bit words is stored in the low nibble of the first byte.
            ip_header_len = (bytearray(pong[:1])[0] & 0x0f) * 4
            icmp_header = pong[ip_header_len:ip_header_len + 8]
            if len(icmp_header) == 8:
                fields = unpack("bbHHh", icmp_header)
                pong_type = fields[0]
                pong_seqnr = fields[4]
                if pong_type == ICMP_ECHO_REPLY and pong_seqnr == ident:
                    return True

        if deadline is not None:
            wait = deadline - time.time()
            if wait <= 0:
                return False

class Heartbeat(object):
    """
    Liveness check for one node, with the result cached between probes.

    is_alive() pings the node at most once every probe_time seconds; calls
    made in between return the result of the latest probe.
    """
    # Time of the latest probe, in whole seconds; 0 means "never probed".
    last_check = 0
    # Result of the latest probe.
    status = False

    def __init__(self, node=HEARTBEAT_SERVER, probe_time=HEARTBEAT_PROBE_TIME):
        """
        node       -- host name or IP address of the node to watch.
        probe_time -- minimum number of seconds between two real probes.
        """
        self.node = node
        self.probe_time = probe_time

    def is_alive(self):
        """
        Return True when the node answered the latest ping, False otherwise.

        A node whose name cannot be resolved is reported as dead.  Other
        errors (for example socket.error raised while creating the raw
        socket) propagate to the caller.
        """
        now = int(time.time())
        if self.last_check + self.probe_time > now:
            # Probed recently enough: reuse the cached answer.
            return self.status

        self.last_check = now
        self.status = False
        try:
            host = socket.gethostbyname(self.node)
        except socket.gaierror:
            return self.status

        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                             socket.getprotobyname("icmp"))
        try:
            # Ping the address resolved above so the name is looked up once.
            self.status = pingNode(host, sock)
        finally:
            # Release the socket even when the ping raises.
            sock.close()

        return self.status

#
# (c) 2009 - Fred Cirera http://blogmag.net/blog/fred/
#

import array
import os
import socket
import time
import select
from struct import pack, unpack, calcsize

# Fields of the ICMP header packed by _construct().
# Type field; 8 is the ICMP echo request type.
ICMP_TYPE = 8
# Code field.
ICMP_CODE = 0
# Placeholder checksum used while the real checksum is being computed.
ICMP_CHECKSUM = 0
# Identifier field.
ICMP_ID = 0
# Base sequence number; _construct() adds its argument to it.
ICMP_SEQ_NR = 0

# Number of payload bytes (timestamp plus padding) that follow the header.
PACKET_SIZE = 56
# Default number of seconds Heartbeat waits between two real probes.
HEARTBEAT_PROBE_TIME = 20
# Default node probed by Heartbeat.
HEARTBEAT_SERVER = 'homeserver01.us.archive.org'

# This version of ping is the stripped down version of the ping.py by
# Lars Strand <lars strand at gnist org> we only need to see if the
# network is up and running.

def _construct(id):
    """
    Construct an ICMP echo request packet.

    The packet is an 8-byte ICMP header followed by PACKET_SIZE bytes of
    payload: the current time packed as a C double, padded with 'X' bytes.

    id -- integer stored (added to ICMP_SEQ_NR) in the header's
          sequence-number field.  The field is a signed 16-bit value, so
          only the low 16 bits are kept: values outside -32768..32767
          (for example a large process id) wrap around instead of making
          struct.pack() fail.  Callers that compare the echoed value must
          apply the same truncation.

    Returns the packet as a byte string with the checksum filled in.
    """
    header_format = 'bbHHh'

    # Reduce the sequence number to what a signed 16-bit field can hold.
    seq_nr = (ICMP_SEQ_NR + id) & 0xffff
    if seq_nr > 0x7fff:
        seq_nr -= 0x10000

    # Payload: timestamp first, then padding up to PACKET_SIZE bytes.  The
    # padding is a byte string so it can be joined to the packed data on
    # interpreters where text and bytes are distinct types.
    padding = PACKET_SIZE - calcsize("d")
    data = pack("d", time.time()) + b'X' * padding

    # The checksum covers the whole packet with the checksum field set to
    # the ICMP_CHECKSUM placeholder, so build a provisional header first.
    provisional = pack(header_format, ICMP_TYPE, ICMP_CODE, ICMP_CHECKSUM,
                       ICMP_ID, seq_nr)
    checksum = _in_cksum(provisional + data)

    # Final header carrying the real checksum.
    header = pack(header_format, ICMP_TYPE, ICMP_CODE, checksum,
                  ICMP_ID, seq_nr)
    return header + data

def _in_cksum(packet):
"""THE RFC792 states: 'The 16 bit one's complement of
the one's complement sum of all 16 bit words in the header.'

Generates a checksum of a (ICMP) packet. Based on in_chksum found
in ping.c on FreeBSD.
"""

# add byte if not divisible by 2
if len(packet) & 1:
packet = packet + '\0'

# split into 16-bit word and insert into a binary array
words = array.array('h', packet)
sum = 0

# perform ones complement arithmetic on 16-bit words
for word in words:
sum += (word & 0xffff)

hi = sum >> 16
lo = sum & 0xffff
sum = hi + lo
sum = sum + (sum >> 16)

return (~sum) & 0xffff # return ones complement


def pingNode(node, sock, timeout=1.0):
    """
    Send one ICMP echo request to a node and wait for the matching reply.

    node    -- host name or IP address of the node to ping.
    sock    -- raw ICMP socket used to send the request and read replies.
    timeout -- maximum number of seconds to wait for the reply; None
               waits until a matching reply arrives.

    Returns True when a matching echo reply is received in time, False
    when the request cannot be sent or no matching reply shows up.
    """
    ICMP_ECHO_REPLY = 0

    # The process id tags the request.  It travels in a signed 16-bit
    # header field, so keep only the bits that fit in it.
    ident = os.getpid() & 0x7fff
    packet = _construct(ident)

    try:
        sock.sendto(packet, (node, 1))
    except socket.error:
        return False

    deadline = None
    if timeout is not None:
        deadline = time.time() + timeout
    wait = timeout

    # A raw ICMP socket can deliver packets that are not the answer to
    # this request, so keep reading until the right one arrives or the
    # time budget is used up.
    while True:
        readable = select.select([sock], [], [], wait)[0]
        if not readable:
            return False    # timeout waiting for an answer

        pong = sock.recvfrom(PACKET_SIZE + 48)[0]
        if pong:
            # The ICMP header follows the IP header, whose length in
            # 32-bit words is stored in the low nibble of the first byte.
            ip_header_len = (bytearray(pong[:1])[0] & 0x0f) * 4
            icmp_header = pong[ip_header_len:ip_header_len + 8]
            if len(icmp_header) == 8:
                fields = unpack("bbHHh", icmp_header)
                pong_type = fields[0]
                pong_seqnr = fields[4]
                if pong_type == ICMP_ECHO_REPLY and pong_seqnr == ident:
                    return True

        if deadline is not None:
            wait = deadline - time.time()
            if wait <= 0:
                return False

class Heartbeat(object):
    """
    Liveness check for one node, with the result cached between probes.

    is_alive() pings the node at most once every probe_time seconds; calls
    made in between return the result of the latest probe.
    """
    # Time of the latest probe, in whole seconds; 0 means "never probed".
    last_check = 0
    # Result of the latest probe.
    status = False

    def __init__(self, node=HEARTBEAT_SERVER, probe_time=HEARTBEAT_PROBE_TIME):
        """
        node       -- host name or IP address of the node to watch.
        probe_time -- minimum number of seconds between two real probes.
        """
        self.node = node
        self.probe_time = probe_time

    def is_alive(self):
        """
        Return True when the node answered the latest ping, False otherwise.

        A node whose name cannot be resolved is reported as dead.  Other
        errors (for example socket.error raised while creating the raw
        socket) propagate to the caller.
        """
        now = int(time.time())
        if self.last_check + self.probe_time > now:
            # Probed recently enough: reuse the cached answer.
            return self.status

        self.last_check = now
        self.status = False
        try:
            host = socket.gethostbyname(self.node)
        except socket.gaierror:
            return self.status

        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                             socket.getprotobyname("icmp"))
        try:
            # Ping the address resolved above so the name is looked up once.
            self.status = pingNode(host, sock)
        finally:
            # Release the socket even when the ping raises.
            sock.close()

        return self.status
 

Leave a message

(Required)
(Required and not displayed)
(Optional)
obfuscated letters Enter the text shown in the image