station
Classes:
|
Independent networking class used for messaging between computers. |
|
|
|
- class Station(id: Optional[str] = None, push_ip: Optional[str] = None, push_port: Optional[int] = None, push_id: Optional[str] = None, pusher: bool = False, listen_port: Optional[int] = None, listens: Optional[Dict[str, Callable]] = None)[source]
Bases:
multiprocessing.context.Process
Independent networking class used for messaging between computers.
These objects send and handle
networking.Message
s by using a dictionary oflistens
, or methods that are called to respond to different types of messages.Each sent message is given an ID, and a thread is spawned to periodically resend it (up until some time-to-live, typically 5 times) until confirmation is received.
By default, the only listen these objects have is
l_confirm()
, which responds to message confirmations. Accordingly, listens should be added by usingdict.update()
rather than reassigning the attribute.Station objects can be made with or without a
pusher
, azmq.DEALER
socket that connects to thezmq.ROUTER
socket of an upstream Station object.This class can be instantiated on its own if all of the required arguments are supplied, but the intended pattern of use is to subclass it with any custom
listen
methods for handling message types and other logic that would be specific for an agent type that uses it.Note
This object will likely be deprecated in v0.5.0, as the gains of a separate messaging process are not as great as the complications caused by having two different kinds of networking object in the system. In the future we will move to having a single type of networking object that can either be spawned as a separate process or as a thread.
Args are similar to the documented Attributes, and so only those that differ from attributes are documented here
- Parameters
pusher (bool) – If
True
, create azmq.DEALER
socket connected topush_ip
,push_port
, andpush_id
. (Default:False
).- Variables
context (
zmq.Context
) – zeromq contextloop (
tornado.ioloop.IOLoop
) – a tornado iolooppusher (
zmq.Socket
) – pusher socket - a dealer socket that connects to other routerspush_ip (str) – If we have a dealer, IP to push messages to
push_port (str) – If we have a dealer, port to push messages to
push_id (str) –
identity
of the Router we push tolistener (
zmq.Socket
) – The main router socket to send/recv messageslisten_port (str) – Port our router listens on
logger (
logging.Logger
) – Used to log messages and network events.id (str) – What are we known as? What do we set our
identity
as?ip (str) – Device IP
listens (dict) – Dictionary of functions to call for different types of messages. keys match the
Message.key
.senders (dict) – Identities of other sockets (keys, ie. directly connected) and their state (values) if they keep one
push_outbox (dict) – Messages that have been sent but have not been confirmed to our
Station.pusher
send_outbox (dict) – Messages that have been sent but have not been confirmed to our
Station.listener
timers (dict) – dict of
threading.Timer
s that will check in on outbox messagesmsg_counter (
itertools.count
) – counter to index our sent messagesfile_block (
threading.Event
) – Event to signal when a file is being received.
Attributes:
Methods:
run
()A
zmq.Context
andtornado.IOLoop
are spawned, the listener and optionally the pusher are instantiated and connected tohandle_listen()
usingon_recv()
.prepare_message
(to, key, value[, repeat, flags])If a message originates with us, a
Message
class is instantiated, given an ID and the rest of its attributes.send
([to, key, value, msg, repeat, flags])Send a message via our
listener
, ROUTER socket.push
([to, key, value, msg, repeat, flags])Send a message via our
pusher
, DEALER socket.repeat
()Periodically (according to
repeat_interval
) resend messages that haven't been confirmedl_confirm
(msg)Confirm that a message was received.
l_stream
(msg)Reconstitute the original stream of messages and call their handling methods
handle_listen
(msg)Upon receiving a message, call the appropriate listen method in a new thread.
get_ip
()Find our IP address
release
()periodic callback called by the IOLoop to check if the closing flag has been set, and closing process if so
- repeat_interval = 5.0
- pusher: Union[bool, zmq.sugar.socket.Socket]
- run()[source]
A
zmq.Context
andtornado.IOLoop
are spawned, the listener and optionally the pusher are instantiated and connected tohandle_listen()
usingon_recv()
.The process is kept open by the
tornado.IOLoop
.
- prepare_message(to, key, value, repeat=True, flags=None)[source]
If a message originates with us, a
Message
class is instantiated, given an ID and the rest of its attributes.- Parameters
flags
repeat
to (str) – The identity of the socket this message is to
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
- send(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]
Send a message via our
listener
, ROUTER socket.Either an already created
Message
should be passed as msg, or at least to and key must be provided for a new message created byprepare_message()
.A
threading.Timer
is created to resend the message usingrepeat()
unless repeat is False.- Parameters
flags
to (str) – The identity of the socket this message is to
key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
msg (.Message) – An already created message.
repeat (bool) – Should this message be resent if confirmation is not received?
- push(to=None, key=None, value=None, msg=None, repeat=True, flags=None)[source]
Send a message via our
pusher
, DEALER socket.Unlike
send()
, to is not required. Every message is always sent topush_id
. to can be included to send a message further up the network tree to a networking object we’re not directly connected to.Either an already created
Message
should be passed as msg, or at least key must be provided for a new message created byprepare_message()
.A
threading.Timer
is created to resend the message usingrepeat()
unless repeat is False.- Parameters
flags
to (str) – The identity of the socket this message is to. If not included, sent to
push_id()
.key (str) – The type of message - used to select which method the receiver uses to process this message.
value – Any information this message should contain. Can be any type, but must be JSON serializable.
msg (.Message) – An already created message.
repeat (bool) – Should this message be resent if confirmation is not received?
- repeat()[source]
Periodically (according to
repeat_interval
) resend messages that haven’t been confirmedTTL is decremented, and messages are resent until their TTL is 0.
- l_confirm(msg)[source]
Confirm that a message was received.
- Parameters
msg (
Message
) – A confirmation message - note that this message has its own unique ID, so the value of this message contains the ID of the message that is being confirmed
- l_stream(msg)[source]
Reconstitute the original stream of messages and call their handling methods
The
msg
should contain aninner_key
that indicates the key, and thus the handling method.- Parameters
msg (dict) – Compressed stream sent by
Net_Node._stream()
- handle_listen(msg: List[bytes])[source]
Upon receiving a message, call the appropriate listen method in a new thread.
If the message is
to
us, send confirmation.If the message is not
to
us, attempt to forward it.- Parameters
msg (str) – JSON
Message.serialize()
d message.
- class Terminal_Station(pilots)[source]
Bases:
autopilot.networking.station.Station
Station
object used byTerminal
objects.Spawned without a
pusher
.Listens
Key
Method
Description
‘PING’
We are asked to confirm that we are alive
‘INIT’
Ask all pilots to confirm that they are alive
‘CHANGE’
Change a parameter on the Pi
‘STOPALL’
Stop all pilots and plots
‘KILL’
Terminal wants us to die :(
‘DATA’
Stash incoming data from a Pilot
‘STATE’
A Pilot has changed state
‘HANDSHAKE’
A Pi is telling us it’s alive and its IP
‘FILE’
The pi needs some file from us
- Parameters
pilots (dict) – The
Terminal.pilots
dictionary.
Attributes:
Methods:
Start a timer that controls how often streamed video frames are sent to
gui.Video
plots.l_ping
(msg)We are asked to confirm that we are alive
l_init
(msg)Ask all pilots to confirm that they are alive
l_change
(msg)Change a parameter on the Pi
l_stopall
(msg)Stop all pilots and plots
l_kill
(msg)Terminal wants us to die :(
l_data
(msg)Stash incoming data from a Pilot
l_continuous
(msg)Handle the storage of continuous data
l_state
(msg)A Pilot has changed state.
l_handshake
(msg)A Pi is telling us it's alive and its IP.
l_file
(msg)A Pilot needs some file from us.
- plot_timer = None
- sent_plot = {}
- pusher: Union[bool, zmq.sugar.socket.Socket]
- start_plot_timer()[source]
Start a timer that controls how often streamed video frames are sent to
gui.Video
plots.
- l_ping(msg: autopilot.networking.message.Message)[source]
We are asked to confirm that we are alive
Respond with a blank ‘STATE’ message.
- Parameters
msg (
Message
)
- l_init(msg: autopilot.networking.message.Message)[source]
Ask all pilots to confirm that they are alive
Sends a “PING” to everyone in the pilots dictionary.
- Parameters
msg (
Message
)
- l_change(msg: autopilot.networking.message.Message)[source]
Change a parameter on the Pi
Warning
Not Implemented
- Parameters
msg (
Message
)
- l_stopall(msg: autopilot.networking.message.Message)[source]
Stop all pilots and plots
- Parameters
msg (
Message
)
- l_kill(msg: autopilot.networking.message.Message)[source]
Terminal wants us to die :(
Stop the
Station.loop
- Parameters
msg (
Message
)
- l_data(msg: autopilot.networking.message.Message)[source]
Stash incoming data from a Pilot
Just forward this along to the internal terminal object (‘_T’) and a copy to the relevant plot.
- Parameters
msg (
Message
)
- l_continuous(msg: autopilot.networking.message.Message)[source]
Handle the storage of continuous data
Forwards all data on to the Terminal’s internal
Net_Node
, send toPlot
according to update rate inprefs.get('DRAWFPS')
- Parameters
msg (
Message
) – A continuous data message
- l_state(msg: autopilot.networking.message.Message)[source]
A Pilot has changed state.
Stash in ‘state’ field of pilot dict and send along to _T
- Parameters
msg (
Message
)
- l_handshake(msg: autopilot.networking.message.Message)[source]
A Pi is telling us it’s alive and its IP.
Send along to _T
- Parameters
msg (
Message
)
- l_file(msg: autopilot.networking.message.Message)[source]
A Pilot needs some file from us.
Send it back after
base64.b64encode()
ing it.Todo
Split large files into multiple messages…
- Parameters
msg (
Message
) – The value field of the message should contain some relative path to a file contained within prefs.get(‘SOUNDDIR’) . eg. ‘/songs/sadone.wav’ would return ‘os.path.join(prefs.get(‘SOUNDDIR’)/songs.sadone.wav’
- class Pilot_Station[source]
Bases:
autopilot.networking.station.Station
Station
object used byPilot
objects.Spawned with a
pusher
connected back to theTerminal
.Listens
Key
Method
Description
‘STATE’ ‘COHERE’ ‘PING’ ‘START’ ‘STOP’ ‘PARAM’ ‘FILE’
l_state()
l_cohere()
l_ping()
l_start()
l_stop()
l_change()
l_file()
Pilot has changed state Make sure our data and the Terminal’s match. The Terminal wants to know if we’re listening We are being sent a task to start We are being told to stop the current task The Terminal is changing some task parameter We are receiving a file
Attributes:
Methods:
_pinger
()Periodically ping the terminal with our status
l_noop
(msg)l_state
(msg)Pilot has changed state
l_cohere
(msg)Send our local version of the data table so the terminal can double check
l_ping
([msg])The Terminal wants to know our status
l_start
(msg)We are being sent a task to start
l_stop
(msg)Tell the pi to stop the task
l_change
(msg)The terminal is changing a parameter
l_file
(msg)We are receiving a file.
l_continuous
(msg)Forwards continuous data sent by children back to terminal.
l_child
(msg)Tell one or more children to start running a task.
l_forward
(msg)Just forward the message to the pi.
- pusher: Union[bool, zmq.sugar.socket.Socket]
- _pinger()[source]
Periodically ping the terminal with our status
Calls its own timer to replace it
Returns:
- l_state(msg: autopilot.networking.message.Message)[source]
Pilot has changed state
Stash it and alert the Terminal
- Parameters
msg (
Message
)
- l_cohere(msg: autopilot.networking.message.Message)[source]
Send our local version of the data table so the terminal can double check
Warning
Not Implemented
- Parameters
msg (
Message
)
- l_ping(msg: Optional[autopilot.networking.message.Message] = None)[source]
The Terminal wants to know our status
Push back our current state.
- Parameters
msg (
Message
)
- l_start(msg: autopilot.networking.message.Message)[source]
We are being sent a task to start
If we need any files, request them.
Then send along to the pilot.
- Parameters
msg (
Message
) – value will contain a dictionary containing a task description.
- l_stop(msg: autopilot.networking.message.Message)[source]
Tell the pi to stop the task
- Parameters
msg (
Message
)
- l_change(msg: autopilot.networking.message.Message)[source]
The terminal is changing a parameter
Warning
Not implemented
- Parameters
msg (
Message
)
- l_file(msg: autopilot.networking.message.Message)[source]
We are receiving a file.
Decode from b64 and save. Set the file_block.
- Parameters
msg (
Message
) – value will have ‘path’ and ‘file’, where the path determines where in prefs.get(‘SOUNDDIR’) the b64 encoded ‘file’ will be saved.
- l_continuous(msg: autopilot.networking.message.Message)[source]
Forwards continuous data sent by children back to terminal.
Continuous data sources from this pilot should be streamed directly to the terminal.
- Parameters
msg (
Message
) – Continuous data message
- l_child(msg: autopilot.networking.message.Message)[source]
Tell one or more children to start running a task.
By default, the key argument passed to self.send is ‘START’. However, this can be overriden by providing the desired string as msg.value[‘KEY’].
This checks the pref CHILDID to get the names of one or more children. If that pref is a string, sends the message to just that child. If that pref is a list, sends the message to each child in the list.
- Parameters
msg () – A message to send to the child or children.
- Returns
nothing
- l_forward(msg: autopilot.networking.message.Message)[source]
Just forward the message to the pi.