# Authors:
# 2016: Jayamine Alupotha
# 2020: Wolfgang Fahl

import socket
import json
import os
import struct
import threading
import time
import uuid
from enum import IntEnum
from queue import Queue, Empty
from subprocess import *
from threading import Timer
from threading import Thread

[docs]class State(IntEnum): """ Eventbus state see""" CONNECTING=0 OPEN=1 CLOSING=2 CLOSED=3
[docs]class RepeatTimer(Timer): """ repeating timer """
[docs] def run(self): while not self.finished.wait(self.interval): self.function(*self.args, **self.kwargs)
[docs]class TcpEventBusBridgeStarter(): """ starter for the java based TcpEventBusBridge and test EchoVerticle """
[docs] def __init__(self,port,jar=None,waitFor="EchoVerticle started",debug=False): """ construct me Args: port(int): the port to listen to jar(str): the path to the TcpEventBusBridge jar file waitFor(str): the output string on stderr of the java process to waitFor debug(bool): True if debugging output should be shown else False - default: False """ self.port=port self.waitFor=waitFor self.debug=debug if jar is None: scriptpath=os.path.dirname(os.path.abspath(__file__)) if self.debug: print("scriptpath is %s" % scriptpath) self.jar=scriptpath+"/TcpEventBusBridge.jar" else: self.jar=jar self.started=False
[docs] def checkPort(self): """ check that a socket connection is possible on the given port Args: port(int): the port to check Returns: bool: True if the port is available else False """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) host='localhost' check=None try: sock.connect((host, self.port)) check=True except ConnectionRefusedError: check=False finally: sock.close() return check
[docs] def start(self): """ start the jar file""" self._javaStart()
[docs] def wait(self,timeOut=30.0,timeStep=0.1): """ wait for the java server to be started Args: timeOut(float): the timeOut in secs after which the wait fails with an Exception timeStep(float): the timeStep in secs in which the state should be regularly checked :raise: :Exception: wait timed out """ timeLeft=timeOut; while not self.started and timeLeft>0: time.sleep(timeStep) timeLeft=timeLeft-timeStep if timeLeft<=0: raise Exception("wait for start timedOut after %.3f secs" % (timeOut)) if self.debug: print("wait for start successful after %.3f secs" % (timeOut-timeLeft))
[docs] def stop(self): """ stop the jar file""" self.process.kill() self.started=False
def _handleJavaOutput(self): """ handle the output of the java program""" out=self.process.stderr for bline in iter(out.readline, b''): line=bline.decode('utf8') if self.debug: print("java: %s" % line) if self.waitFor in line: self.started=True out.close() def _javaStart(self): """ call java jar """ if self.debug: print ("starting java -jar %s" % self.jar) self.process = Popen(['java', '-jar' , self.jar, "--port",str(self.port)], stderr=PIPE) t = Thread(target=self._handleJavaOutput) t.daemon = True # thread dies with the program t.start()
[docs]class Eventbus(object): """ Vert.x TCP eventbus client for python :ivar headers: any headers to be sent as per the vertx-tcp-eventbus-bridge specification :ivar state: the state of the the eventbus :vartype state: State.CONNECTING: State :ivar host: 'localhost' the host the eventbus is connected to :vartype host: str :ivar port: 7000 : the port to be used for the socket connection :vartype port: int :ivar pingInterval:5000:the ping interval in millisecs :vartype pingInterval: int :ivar pongCount:0:the number of pongs received :vartype pongCount: int :ivar timeOut: DEFAULT_TIMEOUT:time in secs to be used as the socket timeout :vartype timeOut: float :ivar debug: False: True if debugging should be enabled :vartype debug: bool :ivar onError:onError:the function to handle errors messages with no address :vartype onError: function :ivar handlers:{}: the dict of handlers for incoming messages :vartype handlers: dict :ivar replyHandler:{}: the dict of handlers for reply messages :vartype replyHandlers: dict """ DEFAULT_TIMEOUT=60.0
[docs] def __init__(self, host='localhost', port=7000,options=None, onError=None,timeOut=None,connect=True,debug=False): """ constructor Args: host(str): the host to connect to - default: 'localhost' port(int): the port to use - default: 7000 options(dict): e.g. { vertxbus_ping_interval=5000 } onError(function): the handler to use for erromessages with no address- default: None will be replaced by default onError timeOut(float): time in secs to be used as the socket timeout - default: 60 secs - the minimium timeOut is 10 msecs and will be enforced connect(bool): True if the eventbus should automatically be opened - default: True debug(bool): True if debugging should be enabled - default: False :raise: :IOError: - the socket could not be opened :Exception: - some other issue e.g. with starting the listening thread """ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.handlers = {} self.replyHandler = {} self.headers = {} = host self.port = port self.debug=debug if options is None: self.pingInterVal=5000; else: if "vertxbus_ping_interval" in options: self.pingInterVal=options["vertxbus_ping_interval"]; if onError is None: self.onError=self.onErrorHandler else: self.onError=onError self.pongCount=0 self.pingTimer=RepeatTimer(self.pingInterVal/1000, if timeOut is None: timeOut=Eventbus.DEFAULT_TIMEOUT if timeOut < 0.01: self.timeOut = 0.01 else: self.timeOut = timeOut self.state = State.CONNECTING if connect: # connect
[docs] def open(self): """ open the eventbus by connecting the eventbus socket and starting a listening thread by default the connection is opened on construction of an Eventbus instance :raise: :IOError: - the socket could not be opened :Exception: - some other issue e.g. with starting the listening thread """ try: self._connect() t1 = threading.Thread(target=self._receivingThread) t1.start() except IOError as e: self.close() raise e except Exception as e: self.close() raise e
def _connect(self): """ connect my socket """ self.sock.connect((, self.port)) self.sock.settimeout(self.timeOut)
[docs] def wait(self,state=State.OPEN,timeOut=5.0, timeStep=0.01): """ wait for the eventbus to reach the given state Args: state(State): the state to wait for - default: State.OPEN timeOut(float): the timeOut in secs after which the wait fails with an Exception timeStep(float): the timeStep in secs in which the state should be regularly checked :raise: :Exception: wait timed out """ timeLeft=timeOut; while not self.state is state and timeLeft>0: time.sleep(timeStep) timeLeft=timeLeft-timeStep if timeLeft<=0: raise Exception("wait for %s timedOut after %.3f secs" % (,timeOut)) if self.debug: print("wait for %s successful after %.3f secs" % (,timeOut-timeLeft))
[docs] def addHeader(self, header, value): """ add a header with the given header key and value Args: header(str): the key of the header value to add value(object): the value of the header value to add """ self.headers[header] = value
[docs] def isOpen(self): """ Checks if the eventbus state is OPEN. Returns: bool: True if State is OPEN else False """ if self.state is State.OPEN: return True return False
[docs] def onErrorHandler(self,message): """ default onError Handler - only gives debug output """ if self.debug: print("error message '%s' not handled" % message)
[docs] def pongHandler(self): """ default pong Handler - counts the number of pongs Received """ self.pongCount=self.pongCount+1 if self.debug: print("pong %d received" %self.pongCount)
def _sendFrame(self, message_s): """ send the given message Args: message_s (str): the message to be sent. """ message = message_s.encode('utf-8') msgLen=len(message) frame = struct.pack('!I', msgLen) + message if self.debug: print("sending %d bytes '%s'" % (msgLen,message_s)) self.sock.sendall(frame) def _receive(self): """ receive a message as specified in <Length: uInt32><{ type: String, address: String, (replyAddress: String)?, headers: JsonObject, body: JsonObject }: JsonObject> """ if (self.debug): print ("trying to receive a message in state %s" % # this is a blocking call which should run in separate thread # receive the first uInt32 4 bytes if self.state < State.CLOSING: # closing socket len_str = self.sock.recv(4) else: raise Exception("eventbus is closed while trying to receive first 4 bytes of message/Length") len1 = struct.unpack("!i", len_str)[0] if (self.debug): print ("trying to receive %d bytes in state %s" % (len1, if self.state < State.CLOSING: # closing socket payload = self.sock.recv(len1) else: raise Exception("eventbus is closed while trying to receive payload of %d bytes" % (len1)) json_message = payload.decode('utf-8') message = json.loads(json_message) debugInfo="%d message bytes with payload %s" % (len1,message) # check if (self.debug): print(debugInfo) if not 'type' in message: raise Exception("invalid message - type missing in: '%s'" % debugInfo) msgType=message['type']; if msgType == 'message': if 'address' not in message: raise Exception("invalid message - address missing in '%s'" % debugInfo) address=message['address'] if address in self.handlers: for handler in self.handlers[address]: handler(None,message) elif address in self.replyHandler: handler=self.replyHandler[address] handler(None,message) del self.replyHandler[address] else: raise Exception("no handler for address %s" % debugInfo) elif msgType == 'err': self.onError(message) elif msgType == 'pong': self.pongHandler() else: raise Exception("invalid message type %s in '%s'" %(msgType,debugInfo) ) def _receivingThread(self): """ receive loop to be started in separate Thread """ self.state = State.OPEN self.pingTimer.start() # debug message after open .. if self.debug: print ("starting receiving thread") while self.state < State.CLOSING: # CONNECTING=0, OPEN=1 try: self._receive() except Exception as e: if self.debug: print(e) if self.debug: print ("receiving thread finished in state %s" % self.sock.close() self.state = State.CLOSED
[docs] def close(self): """ close the eventbus connection after staying in the CLOSING state for the given timeInterval Args: timeInterval(float): the number of seconds to sleep before actually closing the eventbus - default: 30 seconds """ if self.state == State.CONNECTING: self.sock.close() return self.pingTimer.cancel() self.state = State.CLOSING # wait for the socket timeout self.wait(State.CLOSED,timeOut=self.timeOut)
def _mergeHeaders(self,headers=None): """ merge the given headers with the default headers Args: headers(dict): the headers to merge - default:None Returns: dict: the merged headers dict """ if headers is None: return self.headers else: # mergedHeaders=self.headers.copy() mergedHeaders.update(headers) return mergedHeaders def _send(self,msgType,address,replyAddress=None,body=None, headers=None): """ send a message of the given message type to the given address with the givne body Args: msgType(str): the type of the message publish, send or ping address(str): the target address to send the message to body(str): the body of the message e.g. a JSON object headers(dict): headers to be added - default: None :raise: :Exception: - eventbus is not open """ if not self.isOpen(): raise Exception("eventbus is not open when trying to %s to %s" % (msgType,address)) headers=self._mergeHeaders(headers) if msgType=='send' and replyAddress is not None: message = json.dumps( {'type': msgType, 'address': address, 'replyAddress':replyAddress, 'headers': headers, 'body': body }) else: message = json.dumps( {'type': msgType, 'address': address, 'headers': headers, 'body': body }) self._sendFrame(message)
[docs] def ping(self): """ send a ping :raise: :Exception: - eventbus is not open """ msgType='ping' if not self.isOpen(): raise Exception("eventbus is not open when trying to %s" % (msgType)) message = json.dumps( {'type': msgType}) self._sendFrame(message)
[docs] def send(self, address, body=None, callback=None, headers=None): """ send a message Args: address(str): the target address to send the message to body(str): the body of the message e.g. a JSON object- default: None headers(dict): headers to be added - default: None :raise: :Exception: - eventbus is not open """ replyAddress=None if callback is not None: replyAddress=str(uuid.uuid4()) self.replyHandler[replyAddress]=callback self._send('send',address,replyAddress=replyAddress,body=body,headers=headers)
[docs] def publish(self, address, body=None,headers=None): """ publish a message Args: address(str): the target address to send the message to body(str): the body of the message e.g. a JSON object headers(dict): headers to be added - default: None :raise: :Exception: - eventbus is not open """ self._send('publish',address,body=body)
[docs] def registerHandler(self, address, callback, headers=None): """ register a handler Args: address(str): the address to register a handler for callback(function): a callback for the address headers(dict): headers to be added - default: None :raise: :Exception: - eventbus is not open - callback not callable """ if not self.isOpen(): raise Exception("eventbus is not open when trying to register Handler for %s" % address) if not callable(callback): raise Exception("callback for registerHandler must be callable") if not address in self.handlers: self.handlers[address]=[] self._send('register', address, headers=headers) self.handlers[address].append(callback)
[docs] def unregisterHandler(self, address,callback,headers=None): """ unregister a callback for a given address if there is more than one callback for the address it will be remove from the handler list if there is only one callback left an unregister message will be sent over the bus and then the address is fully removed Args: address(str): the address to unregister the handler for callback(function): the callback to unregister headers(dict): headers to be added - default: None :raise: :Exception: - eventbus is not open - address not registered - callback not registered """ if not self.isOpen(): raise Exception("eventbus is not open when trying to unregister handler for %s" % (address)) if address not in self.handlers: raise Exception("can't unregister address %s - address not registered" % (address)) callbacks=self.handlers[address] if callback not in callbacks: raise Exception("can't unregister callback for %s - callback not registered" % (address)) callbacks.remove(callback) if len(callbacks) == 0: self._send('unregister', address, body=None, headers=headers) del self.handlers[address]