Distributed Go/No-Go
Note
This example was written for a very early version of Autopilot, v0.1.0, and much has changed since then!
This example is being preserved as an example of some basic approaches to networked behavior problems, as much of that is still useful, even if the precise syntax of how to use Autopilot has changed.
To demonstrate the use of Child agents, we’ll build the distributed Go/No-Go task described in section 4.3 of the Autopilot whitepaper.
In short, a subject runs on a circular running wheel whose velocity is measured by a laser computer mouse. When the subject ‘fixates’ by slowing below a threshold velocity, a drifting Gabor grating is presented. If the grating changes angle, the subject is rewarded if they lick in an IR beambreak sensor. If the grating doesn’t change angle, the subject is rewarded if they refrain from licking until the stimulus has ended.
Additional Prefs
To use a Child with this task, we will need a second Raspberry Pi set up with the same routine as a Pilot, except that it needs the following values in its prefs.json file:
Child Prefs
{
    "NAME" : "wheel_child",
    "LINEAGE" : "CHILD",
    "PARENTID" : "parent_pilot",
    "PARENTIP" : "ip.of.parent.pilot",
    "PARENTPORT": "<MSGPORT of parent>"
}
And the parent pilot needs to have the following values:
Parent Prefs
{
    "NAME": "parent_pilot",
    "CHILDID": "wheel_child",
    "LINEAGE": "PARENT"
}
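Because the two files have to agree with each other, it can help to check them as a pair before starting a session. The following is a minimal sketch, not part of Autopilot: `check_lineage` is a hypothetical helper, and the prefs are written inline (with the same placeholder values as above) rather than read from the actual prefs.json files.

```python
import json

# The same keys as the two prefs files above; the IP and port are
# left as the placeholders used in this example.
CHILD_PREFS = json.loads("""
{
    "NAME": "wheel_child",
    "LINEAGE": "CHILD",
    "PARENTID": "parent_pilot",
    "PARENTIP": "ip.of.parent.pilot",
    "PARENTPORT": "<MSGPORT of parent>"
}
""")

PARENT_PREFS = json.loads("""
{
    "NAME": "parent_pilot",
    "CHILDID": "wheel_child",
    "LINEAGE": "PARENT"
}
""")


def check_lineage(parent, child):
    """Hypothetical helper: list the ways a parent/child pair disagrees."""
    problems = []
    if parent.get("LINEAGE") != "PARENT":
        problems.append("parent LINEAGE should be 'PARENT'")
    if child.get("LINEAGE") != "CHILD":
        problems.append("child LINEAGE should be 'CHILD'")
    if parent.get("CHILDID") != child.get("NAME"):
        problems.append("parent CHILDID does not match child NAME")
    if child.get("PARENTID") != parent.get("NAME"):
        problems.append("child PARENTID does not match parent NAME")
    for key in ("PARENTIP", "PARENTPORT"):
        if not child.get(key):
            problems.append("child is missing {}".format(key))
    return problems


print(check_lineage(PARENT_PREFS, CHILD_PREFS))
```

An empty list means the names and lineage values line up; it says nothing about whether the IP and port are actually reachable.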
Go/No-Go Parameterization
The parameterization for this task is similar to that of the Nafc task above with a few extensions…
from collections import OrderedDict as odict
import tables
from autopilot.tasks import Task

class GoNoGo(Task):
    # Task parameterization
    PARAMS = odict()
    PARAMS['reward'] = {'tag': 'Reward Duration (ms)',
                        'type': 'int'}
    PARAMS['timeout'] = {'tag':'Delay Timeout (ms)',
                         'type':'int'}
    PARAMS['stim'] = {'tag': 'Visuals',
                      'type': 'visuals'}

    # Plot parameterization
    PLOT = {
        'data': {
            'x': 'shaded',
            'target': 'point',
            'response': 'segment'
        },
        # our plot will use time as its x-axis rather than the trial number
        'continuous': True
    }

    # TrialData descriptor
    class TrialData(tables.IsDescription):
        trial_num = tables.Int32Col()
        target = tables.BoolCol()
        response = tables.StringCol(1)
        correct = tables.Int32Col()
        RQ_timestamp = tables.StringCol(26)
        DC_timestamp = tables.StringCol(26)
        shift = tables.Float32Col()
        angle = tables.Float32Col()
        delay = tables.Float32Col()
We add one additional data descriptor that describes the continuous data that will be sent from the Wheel object:
    class ContinuousData(tables.IsDescription):
        x = tables.Float64Col()
        y = tables.Float64Col()
        t = tables.Float64Col()
The hardware specification is also similar, with one additional Flag object, which behaves identically to the Beambreak object with reversed logic (triggered by 0->1 rather than 1->0).
    HARDWARE = {
        'POKES': {
            'C': hardware.Beambreak,
        },
        'LEDS': {
            'C': hardware.LED_RGB,
        },
        'PORTS': {
            'C': hardware.Solenoid,
        },
        'FLAGS': {
            'F': hardware.Flag
        }
    }
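The difference in trigger logic between the Beambreak and the Flag can be illustrated without any hardware. This is only a sketch on a list of sampled logic levels; `falling_edges` and `rising_edges` are hypothetical names, and the real objects react to GPIO callbacks rather than scanning a list.

```python
def falling_edges(samples):
    """Indices where the level goes 1 -> 0 (the Beambreak-style trigger)."""
    return [i for i in range(1, len(samples))
            if samples[i - 1] == 1 and samples[i] == 0]


def rising_edges(samples):
    """Indices where the level goes 0 -> 1 (the Flag-style trigger)."""
    return [i for i in range(1, len(samples))
            if samples[i - 1] == 0 and samples[i] == 1]


# a made-up sequence of logic levels read from one pin
signal = [1, 1, 0, 0, 1, 1, 0]

print(falling_edges(signal))
print(rising_edges(signal))
```

The same signal produces triggers at different moments depending on which transition is treated as the event, which is all that "reversed logic" means here.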
Finally, we add an additional CHILDREN dictionary to specify the type of Child that we need to run the task, as well as any additional parameters needed to configure it.
The task_type must refer to some key in the autopilot.tasks.CHILDREN_LIST.
Note
The Child agent is a subconfiguration of the Pilot agent; the two will be delineated more explicitly as the agent framework is solidified.
    CHILDREN = {
        'WHEEL': {
            'task_type': "Wheel Child",
        }
    }
Initialization
When initializing this task, we need to make our own Net_Node object as well as initialize our child. Assuming that the child is connected to the parent and appropriately configured (see Additional Prefs above), then things should go smoothly.
Warning
Some of the parameters – most egregiously the Grating stimulus – are hardcoded in the initialization routine. This is bad practice but an unfortunately necessary evil because the visual stimulus infrastructure is not well developed yet.
import itertools
from autopilot.stim.visual.visuals import Grating

    def __init__(self, stim=None, reward = 50., timeout = 1000., stage_block = None,
                 punish_dur = 500., **kwargs):
        super(GoNoGo, self).__init__()

        # we receive a stage_block from the pilot as usual, we won't use it
        # for task operation though.
        self.stage_block = stage_block
        self.trial_counter = itertools.count()

        # save parameters passed to us as arguments
        self.punish_dur = punish_dur
        self.reward = reward
        self.timeout = timeout
        self.subject = kwargs['subject']

        # init hardware and set reward as before
        self.init_hardware()
        self.set_reward(self.reward)

        # hardcoding stimulus while visual stim still immature
        self.stim = Grating(angle=0, freq=(4,0), rate=1, size=(1,1), debug=True)

        self.stages = itertools.cycle([self.request, self.discrim, self.reinforce])
Initializing the Net Node.
The Net_Node gets the following arguments:
id: The name that is used to identify the task’s networking object so other networking objects can send it messages. We prefix the pilot’s prefs.NAME with T_ because it is a task, though this is not required.
upstream: The name of the network node that is directly upstream from us. We will be sending our messages to the Pilot that is running us, and thus address it by its name.
port: The port of our upstream node, most commonly the prefs.MSGPORT.
listens: A dictionary that maps messages with different KEYs to specific handling methods. Since we don’t need to receive any data for this task, this is blank.
instance: Optional, denotes whether this node shouldn’t be the only node that exists within the Agent, i.e. it uses the same instance of the tornado IOLoop as other nodes.
        self.node = Net_Node(id="T_{}".format(prefs.NAME),
                             upstream=prefs.NAME,
                             port=prefs.MSGPORT,
                             listens={},
                             instance=True)
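To make the role of the listens dictionary concrete, here is a minimal sketch of key-based dispatch. It is not Net_Node's implementation: the `dispatch` function and the `{'key': ..., 'value': ...}` message shape are assumptions made for illustration only.

```python
received = []


def l_measure(value):
    # hypothetical handler for messages with key 'MEASURE'
    received.append(('MEASURE', value))


def l_stop(value):
    # hypothetical handler for messages with key 'STOP'
    received.append(('STOP', value))


def dispatch(listens, message):
    """Call the handler registered for the message's key.

    Returns True if a handler was found, False otherwise.
    """
    handler = listens.get(message['key'])
    if handler is None:
        return False
    handler(message['value'])
    return True


listens = {'MEASURE': l_measure, 'STOP': l_stop}

dispatch(listens, {'key': 'MEASURE', 'value': {'thresh': 100}})
# with an empty dictionary, as in the GoNoGo task, nothing is handled
dispatch({}, {'key': 'MEASURE', 'value': {'thresh': 100}})
```

Under this reading, passing listens={} simply means the task's node can send messages but has no handler for anything sent to it.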
And then to initialize our Child we construct a message to send along to it.
Note that we send the message to prefs.NAME: we don’t want to have to know the IP address, etc. for our child because it connects to us, so the Station object handles sending it along with its Pilot_Station.l_child() listen.
        # construct a message to send to the child
        value = {
            'child': {'parent': prefs.NAME, 'subject': self.subject},
            'task_type': self.CHILDREN['WHEEL']['task_type'],
            'subject': self.subject
        }

        # send to the station object with a 'CHILD' key
        self.node.send(to=prefs.NAME, key='CHILD', value=value)
The Child Task
The Wheel_Child task is a very thin wrapper around a Wheel object, which does most of the work.
It creates a stages iterator with a function that returns nothing to fit in with the general task structure.
from collections import OrderedDict as odict
from itertools import cycle

class Wheel_Child(object):
    STAGE_NAMES = ['collect']

    PARAMS = odict()
    PARAMS['fs'] = {'tag': 'Velocity Reporting Rate (Hz)',
                    'type': 'int'}
    PARAMS['thresh'] = {'tag': 'Distance Threshold',
                        'type': 'int'}

    HARDWARE = {
        "OUTPUT": Digital_Out,
        "WHEEL": Wheel
    }

    def __init__(self, stage_block=None, fs=10, thresh=100, **kwargs):
        self.fs = fs
        self.thresh = thresh

        self.hardware = {}
        self.hardware['OUTPUT'] = Digital_Out(prefs.PINS['OUTPUT'])
        self.hardware['WHEEL'] = Wheel(digi_out = self.hardware['OUTPUT'],
                                       fs = self.fs,
                                       thresh = self.thresh,
                                       mode = "steady")
        self.stages = cycle([self.noop])
        self.stage_block = stage_block

    def noop(self):
        # just fitting in with the task structure.
        self.stage_block.clear()
        return {}

    def end(self):
        self.hardware['WHEEL'].release()
        self.stage_block.set()
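To see why a stages iterator and a stage_block are enough to fit into the general task structure, consider this minimal sketch of a loop that drives them. `ToyTask` and `run_trial` are hypothetical stand-ins, and the assumption that the runner waits on stage_block after each stage is mine rather than something shown in this example.

```python
import itertools
import threading


class ToyTask(object):
    """Hypothetical two-stage task used only to exercise the loop below."""

    def __init__(self, stage_block):
        self.stage_block = stage_block
        self.stages = itertools.cycle([self.request, self.reinforce])

    def request(self):
        # clear the event so the runner waits before the next stage
        self.stage_block.clear()
        # stand-in for a hardware trigger that fires a little later
        threading.Timer(0.01, self.stage_block.set).start()
        return {'target': True}

    def reinforce(self):
        return {'correct': 1, 'TRIAL_END': True}


def run_trial(task):
    """Call stages in order, merging their data, until one ends the trial."""
    trial = {}
    while 'TRIAL_END' not in trial:
        stage = next(task.stages)
        trial.update(stage())
        # block until something sets the event (with a safety timeout)
        task.stage_block.wait(timeout=1.0)
    return trial


task = ToyTask(threading.Event())
print(run_trial(task))
```

Seen this way, a stage like noop that clears the event and never sets it would leave such a runner waiting until end() sets the event, which seems to be the intent: the Wheel does its work in its own threads in the meantime.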
A Very Smart Wheel
Most of the Child’s contribution to the task is performed by the Wheel object.
The Wheel accesses a USB mouse connected to the Pilot, continuously collects its movements, and reports them back to the Terminal at a specified frequency (fs) using an internal Net_Node.
An abbreviated version…
from inputs import devices

class Wheel(Hardware):
    def __init__(self, mouse_idx=0, fs=10, thresh=100, thresh_type='dist', start=True,
                 digi_out = False, mode='vel_total', integrate_dur=5):
        self.mouse = devices.mice[mouse_idx]
        self.fs = fs
        self.thresh = thresh
        # time between updates
        self.update_dur = 1./float(self.fs)
The Wheel has three message types:
'MEASURE' - the main task is telling us to monitor for a threshold crossing, i.e. the previous trial is over and it’s ready for another one.
'CLEAR' - stop measuring for a threshold crossing event!
'STOP' - the task is over, clear resources and shut down.
        # initialize networking
        self.listens = {'MEASURE': self.l_measure,
                        'CLEAR'  : self.l_clear,
                        'STOP'   : self.l_stop}
        self.node = Net_Node('wheel_{}'.format(mouse_idx),
                             upstream=prefs.NAME,
                             port=prefs.MSGPORT,
                             listens=self.listens,
                             )

        # if we are being used in a child object,
        # we send our trigger via a GPIO pin
        self.digi_out = digi_out

        self.thread = None
        if start:
            self.start()

    def start(self):
        self.thread = threading.Thread(target=self._record)
        self.thread.daemon = True
        self.thread.start()
The wheel starts two threads, one that captures mouse movement events and puts them in a queue, and another that processes movements, transmits them to the Terminal, and handles the threshold triggers when the subject falls below a certain velocity.
    def _mouse(self):
        # read mouse movements and put them in a queue
        # until the quit event is set
        while not self.quit_evt.is_set():
            events = self.mouse.read()
            self.q.put(events)

    def _record(self):
        threading.Thread(target=self._mouse).start()

        # a threading.Event is used to terminate the wheel's operation
        while not self.quit_evt.is_set():

            # ... mouse movements are collected into a 2d numpy array ...

            # if the main task has told us to measure for a velocity threshold
            # we check if our recent movements (move) trigger the threshold
            if self.measure_evt.is_set():
                do_trigger = self.check_thresh(move)
                if do_trigger:
                    self.thresh_trig()
                    self.measure_evt.clear()

            # and we report recent movements back to the Terminal
            # the recent velocities and timestamp have been calculated as
            # x_vel, y_vel, and nowtime
            self.node.send(key='CONTINUOUS',
                           value={
                               'x':x_vel,
                               'y':y_vel,
                               't':nowtime
                           })
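The two-thread arrangement is a standard producer/consumer pattern. The sketch below reproduces its shape with the standard library only: a list of made-up (dx, dy) movements stands in for the blocking mouse reads, and the function names are hypothetical rather than taken from the Wheel object.

```python
import queue
import threading


def read_events(q, fake_events):
    # producer: stands in for the loop that reads from the mouse
    for dx, dy in fake_events:
        q.put((dx, dy))
    q.put(None)  # sentinel: no more events


def process_events(q, quit_evt, totals):
    # consumer: drains the queue until told to quit
    while not quit_evt.is_set():
        try:
            item = q.get(timeout=0.5)
        except queue.Empty:
            continue
        if item is None:
            quit_evt.set()
        else:
            totals[0] += item[0]
            totals[1] += item[1]


def run(fake_events):
    q = queue.Queue()
    quit_evt = threading.Event()
    totals = [0, 0]
    reader = threading.Thread(target=read_events, args=(q, fake_events))
    worker = threading.Thread(target=process_events, args=(q, quit_evt, totals))
    reader.start()
    worker.start()
    reader.join()
    worker.join()
    return totals


print(run([(1, 0), (2, -1), (0, 3)]))
```

Separating the threads this way means a blocking read never delays processing or reporting; the queue absorbs bursts of movement events in between.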
If the threshold is triggered, a method (thresh_trig) is called that sends a voltage pulse through the Digital_Out given to it by the Child task.
    def thresh_trig(self):
        if self.digi_out:
            self.digi_out.pulse()
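The check_thresh method itself is not shown in this abbreviated version. As a rough illustration only, here is one plausible reading of a 'steady' threshold, in which the trigger fires when every recent speed is below the threshold. `check_steady`, `maybe_trigger` and `FakeOutput` are hypothetical and are not the Wheel's actual logic.

```python
def check_steady(velocities, thresh):
    """Assumed meaning of 'steady': all recent speeds are below thresh."""
    return len(velocities) > 0 and all(abs(v) < thresh for v in velocities)


class FakeOutput(object):
    """Stand-in for a digital output that just counts its pulses."""

    def __init__(self):
        self.pulses = 0

    def pulse(self):
        self.pulses += 1


def maybe_trigger(velocities, thresh, measuring, digi_out):
    """Pulse once on a threshold crossing; return the new measuring state."""
    if measuring and check_steady(velocities, thresh):
        digi_out.pulse()
        return False  # stop measuring until asked again
    return measuring


out = FakeOutput()
still_measuring = maybe_trigger([5, -3, 8], 100, True, out)
print(still_measuring, out.pulses)
```

Returning the measuring state mirrors the way the recording loop clears its measure event after a trigger, so a single fixation produces a single pulse.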
Go/No-Go Stage Methods
After the child is initialized, the Parent pilot begins to call the three stage functions for the task in a cycle.
Very similar to the Nafc task above…
request - Tell the Child to begin measuring for a velocity threshold crossing, prepare the stimulus for delivery
discrim - Present the stimulus
reinforce - Reward the subject if they were correct
The code here has been abbreviated for the purpose of the example:
    def request(self):
        # Set the event lock
        self.stage_block.clear()
        # wait on any ongoing punishment stimulus
        self.punish_block.wait()

        # set triggers
        self.triggers['F'] = [
            lambda: self.stim.play('shift', self.shift )
        ]

        # tell our wheel to start measuring
        self.node.send(to=[prefs.NAME, prefs.CHILDID, 'wheel_0'],
                       key="MEASURE",
                       value={'mode':'steady',
                              'thresh':100})

        # return data from current stage
        # (the builtin next() works on both Python 2 and 3)
        self.current_trial = next(self.trial_counter)
        data = {
            'target': self.target, # whether to 'go' or 'not go'
            'shift': self.shift,   # how much to shift the
                                   # angle of the stimulus
            'trial_num': self.current_trial
        }

        return data
    def discrim(self):
        # if the subject licks on a good trial, reward.
        # set a trigger to respond false if delay time elapses
        if self.target:
            self.triggers['C'] = [lambda: self.respond(True), self.pins['PORTS']['C'].open]
            self.triggers['T'] = [lambda: self.respond(False), self.punish]
        # otherwise punish
        else:
            self.triggers['C'] = [lambda: self.respond(True), self.punish]
            self.triggers['T'] = [lambda: self.respond(False), self.pins['PORTS']['C'].open]

        # the stimulus has just started playing, wait a bit and then shift it
        # (if we're going to). choose a random delay
        delay = 0.0
        if self.shift != 0:
            delay = (random()*3000.0)+1000.0
            # a delay timer is set that shifts the stimulus after
            # <delay> milliseconds
            self.delayed_set(delay, 'shift', self.shift)

        # trigger the timeout in 5 seconds
        # keep a reference to the Timer itself (start() returns None)
        # so that reinforce can cancel it
        self.timer = threading.Timer(5.0, self.handle_trigger, args=('T', True, None))
        self.timer.start()

        # return data to the pilot
        data = {
            'delay': delay,
            'RQ_timestamp': datetime.datetime.now().isoformat(),
            'trial_num': self.current_trial
        }

        return data
    def reinforce(self):
        # stop timer if it's still going
        try:
            self.timer.cancel()
        except AttributeError:
            pass
        self.timer = None

        data = {
            'DC_timestamp': datetime.datetime.now().isoformat(),
            'response': self.response,
            'correct': self.correct,
            'trial_num': self.current_trial,
            'TRIAL_END': True
        }
        return data
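One detail worth keeping in mind when arming a timeout in one stage and cancelling it in the next: threading.Timer.start() returns None, so the Timer object has to be stored before it is started if it is to be cancelled later. A minimal sketch with a hypothetical `TimeoutDemo` class:

```python
import threading


class TimeoutDemo(object):
    """Hypothetical holder for a timeout that can be armed and disarmed."""

    def __init__(self):
        self.fired = threading.Event()
        self.timer = None

    def arm(self, seconds):
        # store the Timer first, then start it
        self.timer = threading.Timer(seconds, self.fired.set)
        self.timer.start()

    def disarm(self):
        # cancel a pending timeout, if there is one
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None


cancelled = TimeoutDemo()
cancelled.arm(5.0)
cancelled.disarm()

elapsed = TimeoutDemo()
elapsed.arm(0.01)
print(elapsed.fired.wait(1.0))
```

In the first case the callback never runs because the timer was cancelled while still waiting; in the second the timeout elapses and the event is set.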
Voilà.