vertx-eventbus-python

This is a Python client for the Vert.x TCP eventbus bridge (vertx-tcp-eventbus-bridge).
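
For orientation, the bridge protocol frames every message as a 4-byte big-endian length followed by a UTF-8 encoded JSON document. The following is a minimal sketch of that framing, independent of this client; the helper names `encode_frame` and `decode_frame` are hypothetical and not part of the library.

```python
import json
import struct


def encode_frame(message: dict) -> bytes:
    """Serialize a message dict as <4-byte big-endian length><UTF-8 JSON>."""
    payload = json.dumps(message).encode("utf-8")
    return struct.pack(">I", len(payload)) + payload


def decode_frame(frame: bytes) -> dict:
    """Parse one complete frame back into a message dict."""
    (length,) = struct.unpack(">I", frame[:4])
    payload = frame[4:4 + length]
    if len(payload) != length:
        raise ValueError("incomplete frame")
    return json.loads(payload.decode("utf-8"))


# A ping carries nothing but its type.
ping = encode_frame({"type": "ping"})
```

A real client would read from the socket until the announced number of bytes has arrived before decoding.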

API

Vertx.eventbus module

class Vertx.eventbus.Eventbus(host='localhost', port=7000, options=None, onError=None, timeOut=None, connect=True, debug=False)[source]

Bases: object

Vert.x TCP eventbus client for python

Variables:
  • headers – any headers to be sent as per the vertx-tcp-eventbus-bridge specification
  • state (State) – State.CONNECTING: the state of the eventbus
  • host (str) – ‘localhost’: the host the eventbus is connected to
  • port (int) – 7000: the port to be used for the socket connection
  • pingInterval (int) – 5000: the ping interval in millisecs
  • pongCount (int) – 0: the number of pongs received
  • timeOut (float) – DEFAULT_TIMEOUT: time in secs to be used as the socket timeout
  • debug (bool) – False: True if debugging should be enabled
  • onError (function) – onError: the function to handle error messages with no address
  • handlers (dict) – {}: the dict of handlers for incoming messages
  • replyHandler (dict) – {}: the dict of handlers for reply messages

__init__(host='localhost', port=7000, options=None, onError=None, timeOut=None, connect=True, debug=False)[source]

constructor

Parameters:
  • host (str) – the host to connect to - default: ‘localhost’
  • port (int) – the port to use - default: 7000
  • options (dict) – e.g. { vertxbus_ping_interval=5000 }
  • onError (function) – the handler to use for error messages with no address - default: None, which is replaced by the default onError handler
  • timeOut (float) – time in secs to be used as the socket timeout - default: 60 secs - a minimum timeOut of 10 msecs is enforced
  • connect (bool) – True if the eventbus should automatically be opened - default: True
  • debug (bool) – True if debugging should be enabled - default: False
Raise:
IOError:
  • the socket could not be opened
Exception:
  • some other issue e.g. with starting the listening thread
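
A hedged usage sketch built only from the methods documented in this section. The helper name `publish_and_collect`, its `bus` and `settle_secs` parameters, and the assumption that callbacks are invoked as `callback(err, message)` (the signature of the test `Handler.handle` method) are illustrative choices, not confirmed library behaviour. Running it against a real bus requires the package to be installed and a bridge listening on localhost:7000 that permits the address.

```python
import time


def publish_and_collect(address, body, bus=None, settle_secs=0.5):
    """Register a handler, publish one message, and return what arrived.

    `bus` is any object offering the Eventbus methods used below; when it is
    None, a real Eventbus is created.
    """
    if bus is None:
        from Vertx.eventbus import Eventbus  # imported lazily: optional dependency
        bus = Eventbus(host="localhost", port=7000)  # connect=True opens the socket

    received = []

    def on_message(err, message=None):
        # assumed callback signature, mirroring the test Handler.handle
        received.append((err, message))

    bus.wait()  # returns once State.OPEN is reached, raises on timeout
    bus.registerHandler(address, on_message)
    bus.publish(address, body)
    time.sleep(settle_secs)  # messages arrive on the listening thread
    bus.close()
    return received
```

Passing a stand-in object for `bus` keeps the sketch testable without a running bridge.
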
addHeader(header, value)[source]

add a header with the given header key and value

Parameters:
  • header (str) – the key of the header value to add
  • value (object) – the value of the header value to add
close()[source]

close the eventbus connection after staying in the CLOSING state for the given timeInterval

Parameters:timeInterval (float) – the number of seconds to sleep before actually closing the eventbus - default: 30 seconds
isOpen()[source]

Checks if the eventbus state is OPEN.

Returns:True if State is OPEN else False
Return type:bool
onErrorHandler(message)[source]

default onError Handler - only gives debug output

open()[source]

open the eventbus by connecting the eventbus socket and starting a listening thread; by default the connection is opened on construction of an Eventbus instance

Raise:
IOError:
  • the socket could not be opened
Exception:
  • some other issue e.g. with starting the listening thread
ping()[source]

send a ping

Raise:
Exception:
  • eventbus is not open
pongHandler()[source]

default pong Handler - counts the number of pongs received

publish(address, body=None, headers=None)[source]

publish a message

Parameters:
  • address (str) – the target address to send the message to
  • body (str) – the body of the message e.g. a JSON object
  • headers (dict) – headers to be added - default: None
Raise:
Exception:
  • eventbus is not open
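
On the wire, a publish is a JSON message with `type`, `address`, `headers` and `body` fields, as defined by the bridge protocol. A sketch of how such a message could be assembled follows; the helper names are hypothetical, and letting per-call headers override the instance-wide ones is an assumption rather than confirmed behaviour of this client.

```python
def merge_headers(defaults, headers=None):
    """Combine instance-wide headers with per-call headers into a new dict."""
    merged = dict(defaults or {})
    merged.update(headers or {})  # assumption: per-call values take precedence
    return merged


def build_publish(address, body=None, headers=None, defaults=None):
    """Return the message dict for a publish to `address`."""
    return {
        "type": "publish",
        "address": address,
        "headers": merge_headers(defaults, headers),
        "body": body,
    }


msg = build_publish("news", "hello", headers={"lang": "en"}, defaults={"token": "abc"})
```
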
registerHandler(address, callback, headers=None)[source]

register a handler

Parameters:
  • address (str) – the address to register a handler for
  • callback (function) – a callback for the address
  • headers (dict) – headers to be added - default: None
Raise:
Exception:
  • eventbus is not open
  • callback not callable
send(address, body=None, callback=None, headers=None)[source]

send a message

Parameters:
  • address (str) – the target address to send the message to
  • body (str) – the body of the message e.g. a JSON object - default: None
  • callback (function) – the handler for the reply message - default: None
  • headers (dict) – headers to be added - default: None
Raise:
Exception:
  • eventbus is not open
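
In the bridge protocol a send may carry a `replyAddress`, and the reply is delivered to that address. The sketch below shows one way a callback could be tied to a reply; the helper `build_send`, the use of a UUID as reply address, and the `reply_handlers` dict are assumptions for illustration, not the client's confirmed implementation.

```python
import uuid


def build_send(address, body=None, callback=None, headers=None, reply_handlers=None):
    """Return a send message; with a callback, attach a fresh replyAddress."""
    message = {
        "type": "send",
        "address": address,
        "headers": headers or {},
        "body": body,
    }
    if callback is not None:
        if not callable(callback):
            raise TypeError("callback must be callable")
        reply_address = str(uuid.uuid4())  # assumption: any unique string works
        message["replyAddress"] = reply_address
        if reply_handlers is not None:
            # looked up again when a message for reply_address arrives
            reply_handlers[reply_address] = callback
    return message
```
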
unregisterHandler(address, callback, headers=None)[source]

unregister a callback for a given address. If there is more than one callback for the address, the callback will be removed from the handler list. If there is only one callback left, an unregister message will be sent over the bus and the address is then fully removed.

Parameters:
  • address (str) – the address to unregister the handler for
  • callback (function) – the callback to unregister
  • headers (dict) – headers to be added - default: None
Raise:
Exception:
  • eventbus is not open
  • address not registered
  • callback not registered
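
The register and unregister rules described above can be sketched with a dict of callback lists. `HandlerRegistry` and its `outbox` list (a stand-in for the socket) are hypothetical; sending a register message only for the first callback of an address is an assumption made to mirror the documented unregister rule.

```python
class HandlerRegistry:
    """Tracks callbacks per address and records the messages that would be sent."""

    def __init__(self):
        self.handlers = {}  # address -> list of callbacks
        self.outbox = []    # stand-in for the socket

    def register(self, address, callback):
        if not callable(callback):
            raise Exception("callback not callable")
        if address not in self.handlers:
            self.handlers[address] = []
            # assumption: only the first callback for an address hits the wire
            self.outbox.append({"type": "register", "address": address})
        self.handlers[address].append(callback)

    def unregister(self, address, callback):
        if address not in self.handlers:
            raise Exception("address not registered")
        callbacks = self.handlers[address]
        if callback not in callbacks:
            raise Exception("callback not registered")
        if len(callbacks) > 1:
            callbacks.remove(callback)  # others remain, nothing is sent
        else:
            self.outbox.append({"type": "unregister", "address": address})
            del self.handlers[address]  # address is fully removed
```
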
wait(state=<State.OPEN: 1>, timeOut=5.0, timeStep=0.01)[source]

wait for the eventbus to reach the given state

Parameters:
  • state (State) – the state to wait for - default: State.OPEN
  • timeOut (float) – the timeOut in secs after which the wait fails with an Exception
  • timeStep (float) – the timeStep in secs in which the state should be regularly checked
Raise:
Exception:wait timed out
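
The waiting behaviour amounts to polling the state at a fixed interval until a deadline passes. A standalone sketch follows; `wait_for` and its `get_state` parameter are hypothetical, and the local `State` enum only copies the values documented in this section.

```python
import time
from enum import IntEnum


class State(IntEnum):
    CONNECTING = 0
    OPEN = 1
    CLOSING = 2
    CLOSED = 3


def wait_for(get_state, state=State.OPEN, timeOut=5.0, timeStep=0.01):
    """Poll get_state() every timeStep secs until it returns `state`."""
    deadline = time.monotonic() + timeOut
    while get_state() != state:
        if time.monotonic() >= deadline:
            raise Exception("wait timed out")
        time.sleep(timeStep)
```

A smaller timeStep reacts faster to a state change at the cost of more wake-ups.
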
class Vertx.eventbus.State[source]

Bases: enum.IntEnum

Eventbus state see https://github.com/vert-x3/vertx-bus-bower/blob/master/vertx-eventbus.js

CLOSED = 3
CLOSING = 2
CONNECTING = 0
OPEN = 1

Unit Tests

test_eventbus module

Created on 2020-02-01

@author: wf

class tests.test_eventbus.EchoCommand(cmd, msgType, address)[source]

Bases: dict

an Echo Command object

__init__(cmd, msgType, address)[source]

construct me

Parameters:
  • cmd (str) – a command either “time” or “counter”
  • msgType (str) – a message type either “send” or “publish”
  • address (str) – an address to be used for the echo
asJson()[source]

return me as a json String

Returns:the json representation of the EchoCommand
Return type:str
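
Because the class derives from dict, its JSON form can come straight from the dict contents. The following is an illustrative re-creation, not the test module's actual code; in particular the key names `cmd`, `msgType` and `address` are assumed to match the constructor parameters.

```python
import json


class EchoCommand(dict):
    """A command for the echo server, stored as plain dict entries."""

    def __init__(self, cmd, msgType, address):
        # assumption: keys are named after the constructor parameters
        super().__init__(cmd=cmd, msgType=msgType, address=address)

    def asJson(self):
        """Return the JSON string representation of this command."""
        return json.dumps(self)


command = EchoCommand("time", "send", "echoMe")
```
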
class tests.test_eventbus.Handler(debug=False)[source]

Bases: object

a Handler for messages

__init__(debug=False)[source]

construct me

Parameters:debug (bool) – if True show debug messages - default: False
handle(err, message=None)[source]

handle the given vert.x tcp-event bus message

Parameters:
  • err (dict) – potential error message
  • message (dict) – the message dict to handle; it may contain a body and headers
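
A callback with this signature receives either an error or a message. The sketch below records both so a test can inspect them afterwards; the attribute names `err` and `result` are hypothetical and not taken from the test module.

```python
class Handler:
    """Remembers the outcome of the most recent callback invocation."""

    def __init__(self, debug=False):
        self.debug = debug
        self.err = None
        self.result = None

    def handle(self, err, message=None):
        self.err = err
        if err is None and message is not None:
            # the body is optional, so use .get() rather than indexing
            self.result = message.get("body")
        if self.debug:
            print("handled", err, message)
```

A bound method such as `Handler().handle` is callable, so it can be passed wherever a callback function is expected.
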
class tests.test_eventbus.TestEventbus(*args, **kwargs)[source]

Bases: unittest.case.TestCase

test the Eventbus for the vert.x tcp eventbus bridge

__init__(*args, **kwargs)[source]

construct me

classmethod setUpClass()[source]

Hook method for setting up class fixture before running tests in the class.

classmethod tearDownClass()[source]

Hook method for deconstructing the class fixture after running all tests in the class.

testCmd()[source]

test json encoding of a Cmd

testCreateWithInvalidPort()[source]

test creating an event bus for an invalid port

testHandler()[source]

test handler

testMergeHeaders()[source]

test merging headers

testRegisterHandler()[source]

test a successful handler registration

testRegisterWithClosedBus()[source]

try registering an event bus for a closed port

testSocketDirect()[source]

test direct socket communication with echo server

testSocketOfEventBus()[source]

send a message using the private function of the event bus

testTcpEventBusBridgeStarter()[source]

test the TcpEventBusBridgeStarter

testWait()[source]

test waiting for the eventbus to open and close

test_ping()[source]

test sending a ping

test_publish()[source]

test publishing a message to the echo server

test_publishWithHeader()[source]

test publishing a message with headers

test_publishWithMultipleHandlers()[source]

test publishing a message to be handled by multiple handlers

test_registerHandler()[source]

test registering a handler

test_reply()[source]

test sending a message with a reply handler

test_send()[source]

test sending a message

test_sendInvalidAddress()[source]

test trying to send to an invalid address
