'''
Created on 2020-02-01
@author: wf
'''
import unittest
import socket
import json
import time
import getpass
from Vertx.eventbus import Eventbus, TcpEventBusBridgeStarter
from Vertx.eventbus import State
[docs]class EchoCommand(dict):
""" an Echo Command object """
[docs] def __init__(self,cmd,msgType,address):
""" construct me
Args:
cmd(str): a command either "time" or "counter"
msgType(str): a message type either "send" or "publish"
address(str): an address to be used for the echo
"""
dict.__init__(self, cmd=cmd,msgType=msgType,address=address)
self.cmd=cmd;
self.msgType=msgType;
self.address=address;
[docs] def asJson(self):
"""
return me as a json String
Returns:
str: the json representation of the EchoCommand
"""
return json.dumps(self.__dict__)
[docs]class Handler(object):
""" a Handler for messages"""
[docs] def __init__(self,debug=False):
"""
construct me
Args:
debug(bool): if True show debug messages - default: True
"""
self.debug=debug
self.err=None
self.result=None
self.headers=None
[docs] def handle(self, err, message=None):
"""
handle the given vert.x tcp-event bus message
Args:
err(dict): potential error message
message(dict): the message dict to handle
it may contain a body and headers
"""
if err !=None:
self.err = err
if message != None:
self.result = message['body']
if 'headers' in message:
self.headers= message['headers']
if self.debug:
print("handler received "+str(self.result))
if self.headers:
print(" headers: "+str(self.headers))
RECEIVE_WAIT=0.4 # number of seconds to wait for result to be received
[docs]class TestEventbus(unittest.TestCase):
"""
test the Eventbus for the vert.x tcp eventbus bridge
"""
[docs] def __init__(self, *args, **kwargs):
""" construct me """
# https://stackoverflow.com/a/19102520/1497139
super(TestEventbus, self).__init__(*args, **kwargs)
self.debug=True
if getpass.getuser()=="travis":
Eventbus.DEFAULT_TIMEOUT=10.0
elif getpass.getuser()=="wf":
# could be as low as 0.1 secs on a quick computer ...
Eventbus.DEFAULT_TIMEOUT=0.1
else:
Eventbus.DEFAULT_TIMEOUT=1.0
[docs] def testCmd(self):
""" test json encoding of a Cmd"""
cmd=EchoCommand("time","send","me")
json=cmd.asJson()
if self.debug:
print("echo command as json='%s'" % json)
assert json=='{"cmd": "time", "msgType": "send", "address": "me"}'
[docs] def testCreateWithInvalidPort(self):
"""
test creating an event bus for an invalid port
"""
hasErr=False
try:
eb=Eventbus(port=9999,debug=self.debug)
assert eb is not None
except Exception as e:
if self.debug:
print(e)
hasErr=e is not None
pass
assert hasErr,"There should be an exception that eventbus is not expected at this port"
[docs] def testRegisterWithClosedBus(self):
"""
try registering an event bus for a closed port
"""
eb=Eventbus(port=7001,debug=self.debug)
eb.close();
handler=Handler(self.debug);
err=False
try:
eb.registerHandler("address1", handler.handle)
except Exception as e:
if self.debug:
print (e)
err=e is not None
pass
assert err, "It should not be possible to register a handler while the eventbus is closed"
[docs] def test_registerHandler(self):
""" test registering a handler"""
eb = Eventbus(port=7001,debug=self.debug)
handler=Handler(self.debug);
eb.registerHandler('echo', handler.handle)
assert(eb.handlers['echo'] != None)
eb.unregisterHandler('echo',handler.handle)
assert eb.handlers is not None
assert len(eb.handlers) == 0
# close
eb.close()
[docs] def testHandler(self):
""" test handler """
handler=Handler(self.debug)
body = {'msg': 'test Handler' }
message={}
message["body"]=body
handler.handle(None,message)
assert handler.result==body
[docs] def testRegisterHandler(self):
"""
test a successful handler registration
"""
eb=Eventbus(port=7001)
handler=Handler(self.debug)
eb.registerHandler("echo", handler.handle)
assert(eb.handlers['echo'] != None)
eb.close()
[docs] def testWait(self):
"""
test waiting for the eventbus to open and close
"""
eb=Eventbus(port=7001,debug=True)
eb.wait()
eb.close()
eb.wait(State.CLOSED)
eb=Eventbus(port=7001,debug=True,connect=False)
timedOut=False
try:
eb.wait(State.OPEN,timeOut=0.1)
except Exception as e:
print(e)
timedOut=e is not None
assert timedOut
[docs] def test_publish(self):
""" test publishing a message to the echo server """
eb = Eventbus(port=7001,debug=True)
handler=Handler(self.debug)
eb.registerHandler("echo", handler.handle)
#jsonObject -body
body1 = {'msg': 'testpublish 1'}
eb.wait(State.OPEN)
# publish without headers
eb.publish('echo', body1)
# wait for the message to arrive
time.sleep(RECEIVE_WAIT)
eb.close()
assert handler.result == body1
[docs] def test_publishWithMultipleHandlers(self):
""" test publishing a message to be handle by multiple handlers"""
eb = Eventbus(port=7001,debug=self.debug)
handler1=Handler(self.debug)
handler2=Handler(self.debug)
address="echoMe"
eb.registerHandler(address, handler1.handle)
eb.registerHandler(address, handler2.handle)
eb.wait(State.OPEN)
eb.send('echo',EchoCommand("reset","send",address))
cmd=EchoCommand("counter","publish",address)
eb.send('echo',cmd)
time.sleep(RECEIVE_WAIT)
eb.close()
assert 'counter' in handler1.result
assert handler1.result['counter']==1
assert 'counter' in handler2.result
assert handler1.result['counter']==1
[docs] def test_sendInvalidAddress(self):
""" test trying to send to an invalid address"""
eb = Eventbus(port=7001,debug=self.debug)
handler=Handler(self.debug)
errorHandler=Handler(self.debug)
address="unpermitted_address"
eb.registerHandler(address, handler.handle)
eb.onError=errorHandler.handle
cmd=EchoCommand("time","send",address)
eb.send('echo',cmd)
time.sleep(RECEIVE_WAIT)
# 40 message bytes with payload {'type': 'err', 'message': 'access_denied'} not handled yet ... expected
eb.close()
assert errorHandler.err is not None
assert errorHandler.result is None
assert handler.err is None
assert handler.result is None
[docs] def test_reply(self):
""" test sending a message with a reply handler """
eb = Eventbus(port=7001,debug=self.debug)
handler=Handler(self.debug)
body = {'msg': 'test reply' }
eb.wait(State.OPEN)
eb.send('echo',body,callback=handler.handle)
# wait for the message to arrive
time.sleep(RECEIVE_WAIT)
eb.close()
assert handler.result==body
[docs] def test_send(self):
""" test sending a message"""
eb = Eventbus(port=7001,debug=self.debug)
handler=Handler(self.debug)
address="echoMe"
eb.registerHandler(address, handler.handle)
cmd=EchoCommand("time","send",address)
eb.wait(State.OPEN)
eb.send('echo',cmd)
# wait for the message to arrive
time.sleep(RECEIVE_WAIT)
eb.close()
assert 'received_nanotime' in handler.result
assert 'iso_time' in handler.result
[docs] def test_ping(self):
""" test sending a ping"""
eb = Eventbus(port=7001,options={"vertxbus_ping_interval":500},debug=self.debug)
eb.wait(State.OPEN)
time.sleep(1.2)
eb.close()
assert eb.pongCount==2
[docs] def testSocketDirect(self):
""" test direct socket communication with echo server"""
with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as s:
s.connect(('localhost',7001))
s.close()
[docs] def testSocketOfEventBus(self):
""" send a message using the private function of the event bus"""
eb = Eventbus(port=7001,debug=True,connect=False)
eb._connect()
eb.state=State.OPEN
body = {'msg': 'test socketOfEventBus' }
eb._send('publish','echo', body)
eb.sock.close()
[docs] @classmethod
def setUpClass(cls):
cls.starter=TcpEventBusBridgeStarter(7001,debug=True)
cls.starter.start()
cls.starter.wait()
[docs] @classmethod
def tearDownClass(cls):
cls.starter.stop()
[docs] def testTcpEventBusBridgeStarter(self):
""" test the TcpEventBusBridgeStarter"""
starter=TcpEventBusBridgeStarter(7002,debug=True)
starter.start()
starter.wait()
time.sleep(2.0)
portAvailable=starter.checkPort()
# kill the process (in any case)
starter.stop()
assert portAvailable
time.sleep(2.0)
assert not starter.checkPort()
if __name__ == "__main__":
unittest.main()