Top

forestbus.cbor_rpc module

The cbor_rpc module provides a multi-thread safe client API for making RPC calls encoded with CBOR. The RPC protocol supported is that defined by Go (see http://golang.org/pkg/net/rpc/), encoded with the CBOR (see http://cbor.io) codec. This package is used internally by the Forest Bus client.

"""
The cbor_rpc module provides a multi-thread safe client API for making RPC calls encoded with CBOR.  The RPC protocol supported is that defined by Go (see http://golang.org/pkg/net/rpc/), encoded with the CBOR (see http://cbor.io) codec.  This package is used internally by the Forest Bus client.
"""

import cbor
import threading
import io
import socket

RpcIOException = Exception ("Connection failed while waiting for RPC response")
""" RpcIOException is thrown by Call when the connection to the RPC server failed before recieving a response."""

class _Response:
	"""
	An internal Response class used by the library to hold an outstanding RPC call and capture the result of the call.
	"""
	def __init__ (self, id):
		self.id = id
		self.cond = threading.Condition()
		self.receivedResponse = False
		self.linkBroken = False
		self.response = None
		self.errorMsg = None

	def getResponse (self):
		self.cond.acquire()
		# Double check to see if we already have a response before waiting
		if not self.receivedResponse:
			self.cond.wait()
		self.cond.release()
		# If the link is broken, raise an exception.
		if self.linkBroken:
			raise RpcIOException
		return (self.response, self.errorMsg)

	def setReponse (self, errorMsg, result):
		self.cond.acquire()
		self.response = result
		self.errorMsg = errorMsg
		self.receivedResponse = True
		self.cond.notify()
		self.cond.release()

	def setIOError (self):
		self.cond.acquire()
		self.receivedResponse = True
		self.linkBroken = True
		self.cond.notify()
		self.cond.release()



class Client:
	"""
	The RPC Client object wraps an existing socket connection and provides an RPC interface.  This is used by the forest.Client class to establish connections to each node in the cluster as required.

	The RPC protocol consists of sending two objects to make an RPC call:

 * A header object: {"ServiceMethod": "service.method", "Seq": sequenceNumber} where the service.method is the name of the service being called and sequenceNumber is a 64 bit number that is unique to this TCP session.
 * The parameters object required by the service.

The RPC response also consists of two objects:
 * A response object: {"ServiceMethod": "service.method", "Seq": sequenceNumber, "Error": errorMsg}.  The sequenceNumber is used to relate this RPC response to the call that was made and allows for multiple calls to be interleaved and returned in any order.  The Error field is an empty string for a successful call, or the error message if one occured.
 * The response object returned by the service.  This will be sent even if Error is not empty, although values in the response object may not have been populated depending on the error.

	"""
	def __init__ (self, connection):
		""" Create a Client using the given socket connection. """
		self._Closed = True
		self._Connection = connection
		self._Lock = threading.Lock()
		#self._Reader = io.BufferedReader (connection)
		#self._Writer = io.BufferedWriter (connection)
		fileobj = connection.makefile(mode='rwb')
		self._Reader = fileobj
		self._Writer = fileobj
		self._InFlightCalls = {}
		self._RPCIndex = 1
		self._Closed = False

		# Start a thread to manage reading responses
		self._ReadThread = threading.Thread (target = self.ReadResponses)
		self._ReadThread.start()

	def Call (self, serviceName, args):
		"""
		Call sends the RPC request to the serviceName given and with the given args object.

		This method is multi-thread safe and will block until there is either an error or a response from the service.

		Exceptions can be thrown if the client object does not have a connection to the RPC server.
		"""
		self._Lock.acquire()
		if self._Closed:
			self._Lock.release()
			raise Exception ("Client connection closed.")
		# Write the RPC header out
		cbor.dump({"ServiceMethod": serviceName, "Seq": self._RPCIndex}, self._Writer)
		# Now write out the arguments
		cbor.dump(args, self._Writer)
		resp = _Response(self._RPCIndex)
		self._InFlightCalls[self._RPCIndex] = resp
		self._RPCIndex += 1
		self._Writer.flush()
		self._Lock.release()
		# Wait on the reader notifying us that this call has completed.
		return resp.getResponse()

	def Close (self):
		"""
		Close shutsdown the RPC client.  This closes the wrapped connection, which triggers the end of the recieve thread and notifies any outstanding RPC client calls of the failure to complete. 
		"""
		self._Lock.acquire()
		self._Closed = True
		self._Connection.shutdown(socket.SHUT_RDWR)
		self._Connection.close()
		self._Lock.release()

	def ReadResponses (self):
		"""
		The ReadResposnes method is called automatically by the library to handle incoming RPC responses.  These will be handed off to the correct calling thread.  When the Client is closed or the socket connection fails any outstanding requests are notified of their failure.
		"""
		running = True
		while (running):
			# Get a response header
			try:
				resp = cbor.load (self._Reader)
				reply = cbor.load (self._Reader)
			except:
				running = False

			if running:
				self._Lock.acquire()
				waitingResponse = self._InFlightCalls[resp["Seq"]]
				del self._InFlightCalls[resp["Seq"]]
				self._Lock.release()
				waitingResponse.setReponse(resp["Error"], reply)
		# Tell any waiting clients that we have an exception
		self._Lock.acquire()
		self._Closed = True
		for seq in self._InFlightCalls:
			waitingResponse = self._InFlightCalls[seq]
			waitingResponse.setIOError()
		self._InFlightCalls = {}
		self._Lock.release()

		



Module variables

var RpcIOException

RpcIOException is thrown by Call when the connection to the RPC server failed before recieving a response.

Classes

class Client

The RPC Client object wraps an existing socket connection and provides an RPC interface. This is used by the forest.Client class to establish connections to each node in the cluster as required.

    The RPC protocol consists of sending two objects to make an RPC call:
  • A header object: {"ServiceMethod": "service.method", "Seq": sequenceNumber} where the service.method is the name of the service being called and sequenceNumber is a 64 bit number that is unique to this TCP session.
  • The parameters object required by the service.

The RPC response also consists of two objects: * A response object: {"ServiceMethod": "service.method", "Seq": sequenceNumber, "Error": errorMsg}. The sequenceNumber is used to relate this RPC response to the call that was made and allows for multiple calls to be interleaved and returned in any order. The Error field is an empty string for a successful call, or the error message if one occured. * The response object returned by the service. This will be sent even if Error is not empty, although values in the response object may not have been populated depending on the error.

class Client:
	"""
	The RPC Client object wraps an existing socket connection and provides an RPC interface.  This is used by the forest.Client class to establish connections to each node in the cluster as required.

	The RPC protocol consists of sending two objects to make an RPC call:

 * A header object: {"ServiceMethod": "service.method", "Seq": sequenceNumber} where the service.method is the name of the service being called and sequenceNumber is a 64 bit number that is unique to this TCP session.
 * The parameters object required by the service.

The RPC response also consists of two objects:
 * A response object: {"ServiceMethod": "service.method", "Seq": sequenceNumber, "Error": errorMsg}.  The sequenceNumber is used to relate this RPC response to the call that was made and allows for multiple calls to be interleaved and returned in any order.  The Error field is an empty string for a successful call, or the error message if one occured.
 * The response object returned by the service.  This will be sent even if Error is not empty, although values in the response object may not have been populated depending on the error.

	"""
	def __init__ (self, connection):
		""" Create a Client using the given socket connection. """
		self._Closed = True
		self._Connection = connection
		self._Lock = threading.Lock()
		#self._Reader = io.BufferedReader (connection)
		#self._Writer = io.BufferedWriter (connection)
		fileobj = connection.makefile(mode='rwb')
		self._Reader = fileobj
		self._Writer = fileobj
		self._InFlightCalls = {}
		self._RPCIndex = 1
		self._Closed = False

		# Start a thread to manage reading responses
		self._ReadThread = threading.Thread (target = self.ReadResponses)
		self._ReadThread.start()

	def Call (self, serviceName, args):
		"""
		Call sends the RPC request to the serviceName given and with the given args object.

		This method is multi-thread safe and will block until there is either an error or a response from the service.

		Exceptions can be thrown if the client object does not have a connection to the RPC server.
		"""
		self._Lock.acquire()
		if self._Closed:
			self._Lock.release()
			raise Exception ("Client connection closed.")
		# Write the RPC header out
		cbor.dump({"ServiceMethod": serviceName, "Seq": self._RPCIndex}, self._Writer)
		# Now write out the arguments
		cbor.dump(args, self._Writer)
		resp = _Response(self._RPCIndex)
		self._InFlightCalls[self._RPCIndex] = resp
		self._RPCIndex += 1
		self._Writer.flush()
		self._Lock.release()
		# Wait on the reader notifying us that this call has completed.
		return resp.getResponse()

	def Close (self):
		"""
		Close shutsdown the RPC client.  This closes the wrapped connection, which triggers the end of the recieve thread and notifies any outstanding RPC client calls of the failure to complete. 
		"""
		self._Lock.acquire()
		self._Closed = True
		self._Connection.shutdown(socket.SHUT_RDWR)
		self._Connection.close()
		self._Lock.release()

	def ReadResponses (self):
		"""
		The ReadResposnes method is called automatically by the library to handle incoming RPC responses.  These will be handed off to the correct calling thread.  When the Client is closed or the socket connection fails any outstanding requests are notified of their failure.
		"""
		running = True
		while (running):
			# Get a response header
			try:
				resp = cbor.load (self._Reader)
				reply = cbor.load (self._Reader)
			except:
				running = False

			if running:
				self._Lock.acquire()
				waitingResponse = self._InFlightCalls[resp["Seq"]]
				del self._InFlightCalls[resp["Seq"]]
				self._Lock.release()
				waitingResponse.setReponse(resp["Error"], reply)
		# Tell any waiting clients that we have an exception
		self._Lock.acquire()
		self._Closed = True
		for seq in self._InFlightCalls:
			waitingResponse = self._InFlightCalls[seq]
			waitingResponse.setIOError()
		self._InFlightCalls = {}
		self._Lock.release()

Ancestors (in MRO)

Methods

def __init__(

self, connection)

Create a Client using the given socket connection.

def __init__ (self, connection):
	""" Create a Client using the given socket connection. """
	self._Closed = True
	self._Connection = connection
	self._Lock = threading.Lock()
	#self._Reader = io.BufferedReader (connection)
	#self._Writer = io.BufferedWriter (connection)
	fileobj = connection.makefile(mode='rwb')
	self._Reader = fileobj
	self._Writer = fileobj
	self._InFlightCalls = {}
	self._RPCIndex = 1
	self._Closed = False
	# Start a thread to manage reading responses
	self._ReadThread = threading.Thread (target = self.ReadResponses)
	self._ReadThread.start()

def Call(

self, serviceName, args)

Call sends the RPC request to the serviceName given and with the given args object.

This method is multi-thread safe and will block until there is either an error or a response from the service.

Exceptions can be thrown if the client object does not have a connection to the RPC server.

def Call (self, serviceName, args):
	"""
	Call sends the RPC request to the serviceName given and with the given args object.
	This method is multi-thread safe and will block until there is either an error or a response from the service.
	Exceptions can be thrown if the client object does not have a connection to the RPC server.
	"""
	self._Lock.acquire()
	if self._Closed:
		self._Lock.release()
		raise Exception ("Client connection closed.")
	# Write the RPC header out
	cbor.dump({"ServiceMethod": serviceName, "Seq": self._RPCIndex}, self._Writer)
	# Now write out the arguments
	cbor.dump(args, self._Writer)
	resp = _Response(self._RPCIndex)
	self._InFlightCalls[self._RPCIndex] = resp
	self._RPCIndex += 1
	self._Writer.flush()
	self._Lock.release()
	# Wait on the reader notifying us that this call has completed.
	return resp.getResponse()

def Close(

self)

Close shutsdown the RPC client. This closes the wrapped connection, which triggers the end of the recieve thread and notifies any outstanding RPC client calls of the failure to complete.

def Close (self):
	"""
	Close shutsdown the RPC client.  This closes the wrapped connection, which triggers the end of the recieve thread and notifies any outstanding RPC client calls of the failure to complete. 
	"""
	self._Lock.acquire()
	self._Closed = True
	self._Connection.shutdown(socket.SHUT_RDWR)
	self._Connection.close()
	self._Lock.release()

def ReadResponses(

self)

The ReadResposnes method is called automatically by the library to handle incoming RPC responses. These will be handed off to the correct calling thread. When the Client is closed or the socket connection fails any outstanding requests are notified of their failure.

def ReadResponses (self):
	"""
	The ReadResposnes method is called automatically by the library to handle incoming RPC responses.  These will be handed off to the correct calling thread.  When the Client is closed or the socket connection fails any outstanding requests are notified of their failure.
	"""
	running = True
	while (running):
		# Get a response header
		try:
			resp = cbor.load (self._Reader)
			reply = cbor.load (self._Reader)
		except:
			running = False
		if running:
			self._Lock.acquire()
			waitingResponse = self._InFlightCalls[resp["Seq"]]
			del self._InFlightCalls[resp["Seq"]]
			self._Lock.release()
			waitingResponse.setReponse(resp["Error"], reply)
	# Tell any waiting clients that we have an exception
	self._Lock.acquire()
	self._Closed = True
	for seq in self._InFlightCalls:
		waitingResponse = self._InFlightCalls[seq]
		waitingResponse.setIOError()
	self._InFlightCalls = {}
	self._Lock.release()