Monday, April 13, 2015

Python/Arduino Event Tracker, Pt 1: Python Side

Quick Note: please forgive the terrible code formatting. I'll work on a solution soon. Regardless, the full code is available on github, and updates may be available as I make changes not reflected here.

Event Tracking

For some time, I've been looking for a good way to track a continuous event with the following characteristics:
  • Subjectivity - It's not very easy to track automatically - there needs to be some human input
  • Hysteresis - Once it changes, it tends to stay put for a while
  • Unpredictability - at least so far as I can tell. I want to track it to see if there are actually any underlying trends
What I'm actually interested in tracking is nasal air flow, specifically, which side dominates flow at any given time. (Turns out there is some information out there about how the side of the nose that dominates air flow changes from time to time, but not much else. Strictly speaking, this is not a subjective issue, but measuring airflow comfortably is a hack beyond my current budget and abilities). However, the specifics aren't too important as long as I get something that gets this job done and may be useful elsewhere.

And I'll admit, this is sort of an excuse to do some learning with some fun toys...

Searching for a Tracker

Putting in this effort only makes sense if there isn't an easy solution already out there. If there is one, I don't know about it. I've tried tracking apps like Keep Track, timing/alarm apps, methods like pen and paper and spreadsheets, timing paradigms such as random timing and hourly tracking, and on and on. Nothing really suffices. Tracking and timing apps never seem to have quite the right features, and every input and timing method I've tried is just too cumbersome or invasive into daily routines.

A Python Solution

Having played and worked with Python a bit, I decided it wouldn't be too hard to put together a simple application to track an ongoing event with minimal invasiveness, at least while I'm working at my desk.

Event Tracker's Simple Tkinter Interface
A very simple interface

Here's the basic setup:
  • The GUI is simple and small, using Tkinter to track 4 states.
  • There's an audio tone that plays when it's time for another input.
  • It requests a new input every 30 seconds. The timing is locked to the clock, at :00 and :30 of every minute, to make things easier in analysis and synchronization if/when the application gets more complicated. This is somewhat tricky as Tkinter is not very robust in terms of timing and synchronization.
  • It provides visual feedback on the current state (if one has been input), both in the GUI and with Arduino outputs (more on the specifics of the Arduino setup in Pt. 2).
  • Input can be from 3 sources: buttons in the application, keyboard, or from inputs on an Arduino setup (more on that in Pt. 2).
  • The latest input is saved/appended to .csv when a new 30-second window starts or when the program is stopped.
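The clock locking described in the list above comes down to a little arithmetic on the system time. Here's a minimal sketch of the idea, not code from the tracker itself: epoch_index and seconds_until_next_epoch are hypothetical helper names, and the 30-second length is taken from the setup above.

```python
import time

EPOCH_LENGTH = 30  # seconds per epoch, as in the setup above


def epoch_index(now, length=EPOCH_LENGTH):
    """Number of whole epochs elapsed since the Unix epoch (hypothetical helper)."""
    return int(now // length)


def seconds_until_next_epoch(now, length=EPOCH_LENGTH):
    """Seconds left until the next :00 or :30 boundary (hypothetical helper)."""
    return length - (now % length)


now = time.time()
print(epoch_index(now), round(seconds_until_next_epoch(now), 1))
```

Because both values come straight from the clock, it doesn't matter if a timer fires a little late: the next check still lands in the right epoch.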

The Code

Okay. Let's dive into the actual source code one chunk at a time (and of course, the whole thing used here is on the defenestra branch on github).
import pygame.mixer as pyg
import serial
import time
try:
    import Tkinter as tk
except ImportError:
    import tkinter as tk
Here I'm using pygame's mixer module to play the audio tone when we're ready for a new input. The serial library (pySerial) will be used to track inputs from an Arduino. The time library is used to time the 30-second epochs between events being tracked. The Tkinter library is used for the basic GUI application interface. Note that the module is named Tkinter in Python 2 and tkinter in Python 3, which is why the import is wrapped in a try/except.

Now for the Display class, which handles drawing and updating the feedback display on the GUI screen. The logic here deals with making sure the display shows the actual current state and that everything gets updated at the right time in the right way.
class Display(tk.Canvas):
    DELAY = 200
    def __init__(self, parent):
        tk.Canvas.__init__(self, parent, width=300, height=20)
        self.parent = parent
        self.create_text(150, 10, anchor=tk.CENTER, text=self.parent.state, tag="state")
        self.after(Display.DELAY, self.onTimer)
    def drawStatus(self):
        status = self.find_withtag("state")
        self.delete(status[0])
        self.create_text(150, 10, anchor=tk.CENTER, text=self.parent.state, tag="state")
    def onTimer(self):
        self.parent.trackSerial()
        self.parent.synch()
        self.after(Display.DELAY, self.onTimer)
        self.drawStatus()
And, blow by blow, how it works:
    DELAY = 200
The window will refresh every 200ms, so I set up a constant to handle that timing in case I want to adjust it later.
    def __init__(self, parent):
        tk.Canvas.__init__(self, parent, width=300, height=20)
        self.parent = parent
To initialize this class, start by using the initialization of the class it inherits from, tk.Canvas. The window GUI's parent will be an instance of the Tracker class (below) which handles most of the application logic. You can adjust the simple window size by changing the width/height parameters.
        self.create_text(150, 10, anchor=tk.CENTER, text=self.parent.state, tag="state")
Make sure to show the initial state when the GUI is started. self.create_text draws text (the Tracker's state variable) on the canvas (here, the Display), centered at position (150, 10), which is the middle of the 300x20 canvas.
        self.after(Display.DELAY, self.onTimer)
After the DELAY (of 200ms), call the onTimer method (see below) to get the GUI running.
    def drawStatus(self):
        status = self.find_withtag("state")
        self.delete(status[0])
        self.create_text(150, 10, anchor=tk.CENTER, text=self.parent.state, tag="state")
    def onTimer(self):
        self.parent.trackSerial()
        self.parent.synch()
        self.after(Display.DELAY, self.onTimer)
        self.drawStatus()
The onTimer method is called after every 200ms. This method makes sure we have the latest status, synchronizes everything (such as starting a new epoch if it's time, see more on the Tracker.synch method below), updates the display, and calls the onTimer method again to do it all again in 200ms. The drawStatus method is simple: it gets the most current event state, deletes the displayed state, then draws the current state.

Most of the big work happens in the Tracker class, which inherits from the tk.Frame class in order to handle the rest of the GUI. But all the back end logic happens here, too: timing issues, tracking input from and providing output to the Arduino, and saving data to file.
class Tracker(tk.Frame):
    KEYS = {"Left":"Left", "Right":"Right", "Up":"Both", "Down":"Neither",
            "h":"Left", "j":"Neither", "k":"Both", "l":"Right", "0":"Left",
            "1":"Right", "2":"Both", "3":"Neither"}
    STATES = {"Left":0, "Right":1, "Both":2, "Neither":3, "NA":4}
    STATES2 = ["Left", "Right", "Up", "Down", "NA"]
    EPOCH_LENGTH = 30
    SOUND = "smb2_cherry.wav"
    OUTFN = "tracker.csv"
    def __init__(self, parent):
        tk.Frame.__init__(self, parent)
        self.parent = parent
        self.currentEpoch = int(time.time()/Tracker.EPOCH_LENGTH)
        self.savedEpoch = int(time.time()/Tracker.EPOCH_LENGTH)
        self.state = "NA"
        pyg.init()
        pyg.music.load(Tracker.SOUND)
        self.bind_all("<Key>", self.onKeyPressed)
        self.initUI()
        self.port = '/dev/ttyACM0'
        self.baudrate = 9600
        self.ser = serial.Serial(self.port, self.baudrate, timeout=1)    
        self.trackSerial()
    def initUI(self):
        self.parent.title("Tracker")
        self.columnconfigure(0, pad=3)
        self.columnconfigure(1, pad=3)
        self.columnconfigure(2, pad=3)
        self.columnconfigure(3, pad=3)
        self.rowconfigure(0, pad=3)
        self.rowconfigure(1, pad=3)
        self.rowconfigure(2, pad=3)
        self.display = Display(self)
        self.display.grid(row=0, columnspan=4, sticky=(tk.W+tk.E))
        leftButton = tk.Button(self, text="Left",
                               command=lambda: self.onKeyPressed("Left"))
        leftButton.grid(row=1, column=0)
        rightButton = tk.Button(self, text="Right",
                                command=lambda: self.onKeyPressed("Right"))
        rightButton.grid(row=1, column=1)
        neitherButton = tk.Button(self, text="Neither",
                                  command=lambda: self.onKeyPressed("Down"))
        neitherButton.grid(row=1, column=2)
        bothButton = tk.Button(self, text="Both",
                               command=lambda: self.onKeyPressed("Up"))
        bothButton.grid(row=1, column=3)
        exitButton = tk.Button(self, text="Exit",
                               command=lambda: self.onKeyPressed("q"))
        exitButton.grid(row=2, columnspan=4)
        self.pack()
    def onKeyPressed(self, e):
        try:
            key = e.keysym
        except AttributeError:
            key = e
        if key == "q":
            self.onExit()
        self.setState(key)
        self.ser.write("W"+str(Tracker.STATES[self.state]))
        self.display.drawStatus()
    def setState(self, key):
        if key in Tracker.KEYS:    
            self.state = Tracker.KEYS[key]
    def onExit(self):
        self.synch()
        self.quit()
    def synch(self):
        self.setCurrentEpoch()
        n = self.checkEpoch()
        if n > 0:
            if n > 1:
                for i in range(n-1):
                    self.updateFile(i, "NA, synch problem")
            self.updateFile(0, self.state + ",")
            self.savedEpoch = self.currentEpoch
            self.state = "NA"
            self.ser.write("W"+str(Tracker.STATES[self.state]))
            pyg.music.play()
    def trackSerial(self):
        if self.ser.inWaiting() > 0:
            thisByte = self.ser.read()
            self.ser.flushInput()
            if thisByte == "E":
                self.ser.write("W"+str(Tracker.STATES[self.state]))
            elif thisByte in ["0", "1", "2", "3", "4"]:
                self.setState(Tracker.STATES2[int(thisByte)])
            else:
                self.ser.write("RR")
    def updateFile(self, n, data):
        with open(Tracker.OUTFN, "a") as f:
            f.write(str(self.savedEpoch+n+1) + "," + data + "\n")
    def setCurrentEpoch(self):
        t = int(time.time()/Tracker.EPOCH_LENGTH)
        if self.currentEpoch != t:
            self.currentEpoch = t
    def checkEpoch(self):
        return self.currentEpoch-self.savedEpoch
There's a lot going on here, so let's take it one piece at a time:
class Tracker(tk.Frame):
    KEYS = {"Left":"Left", "Right":"Right", "Up":"Both", "Down":"Neither",
            "h":"Left", "j":"Neither", "k":"Both", "l":"Right", "0":"Left",
            "1":"Right", "2":"Both", "3":"Neither"}
    STATES = {"Left":0, "Right":1, "Both":2, "Neither":3, "NA":4}
    STATES2 = ["Left", "Right", "Up", "Down", "NA"]
    EPOCH_LENGTH = 30
    SOUND = "smb2_cherry.wav"
    OUTFN = "tracker.csv"
I've set up a few constants to handle inputs/outputs for the event tracking. There are 3 input sources (keyboard, GUI buttons, and Arduino) so the naming got a bit wonky. KEYS handles keyboard and GUI inputs, STATES/STATES2 handle Arduino input/output. There is definitely room for improvement here.
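One possible cleanup, sketched here as an assumption rather than what the tracker actually does, is to keep a single table and derive the lookups from it. STATE_TABLE and NAMES are hypothetical names; KEYS and STATES come out identical to the constants above. NAMES is not the same as STATES2, which stores key names ("Up", "Down") rather than state names.

```python
# One row per state: (state name, code sent to the Arduino, keys that select it).
# STATE_TABLE is a hypothetical name, not part of the original program.
STATE_TABLE = [
    ("Left",    0, ["Left", "h", "0"]),
    ("Right",   1, ["Right", "l", "1"]),
    ("Both",    2, ["Up", "k", "2"]),
    ("Neither", 3, ["Down", "j", "3"]),
    ("NA",      4, []),  # no key selects NA directly
]

# Key/button name -> state name (same content as KEYS above).
KEYS = {key: name for name, _, keys in STATE_TABLE for key in keys}

# State name -> Arduino code (same content as STATES above).
STATES = {name: code for name, code, _ in STATE_TABLE}

# Arduino code -> state name, by list index.
NAMES = [name for name, _, _ in STATE_TABLE]

print(KEYS["k"], STATES["Both"], NAMES[2])
```

Adding a fifth state would then mean adding one row instead of touching three constants.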

EPOCH_LENGTH sets the length of time each input tracks for the event. We're tracking continuous events, but discretely. It would be too burdensome to track the event every second or more, but we may not get enough resolution if we only track every 15 minutes or something. I've set it to track every 30s, which is a good balance for me (but maybe 1-minute would be better as it's a little less invasive).

SOUND is the wav file that plays when a new epoch starts. This particular sound is the cherry grabbing/player selection sound from Super Mario Bros. 2. It's short, sweet and doesn't get too annoying after hearing it every 30s.

OUTFN is just the file name for saving the data in comma-separated format. Because it's a relative path, the file is saved to whatever directory the script is run from (usually the same directory as the Python script), which is probably fine in most cases, and it nicely avoids issues with dealing with different OS's.

Next I initialize the class to get things running:
    def __init__(self, parent):
        tk.Frame.__init__(self, parent)
        self.parent = parent
        self.currentEpoch = int(time.time()/Tracker.EPOCH_LENGTH)
        self.savedEpoch = int(time.time()/Tracker.EPOCH_LENGTH)
        self.state = "NA"
        pyg.init()
        pyg.music.load(Tracker.SOUND)
        self.bind_all("<Key>", self.onKeyPressed)
        self.initUI()
        self.port = '/dev/ttyACM0'
        self.baudrate = 9600
        self.ser = serial.Serial(self.port, self.baudrate, timeout=1)    
        self.trackSerial()
In this case, the parent is the root tkinter object instance. Since I'm inheriting from the tk.Frame class, I need to make sure I point to the parent object instance.
        self.currentEpoch = int(time.time()/Tracker.EPOCH_LENGTH)
        self.savedEpoch = int(time.time()/Tracker.EPOCH_LENGTH)
I need to keep track of the current epoch and the last epoch that has been saved in order to know what's been saved and what the current status is. time.time() returns the system time as the number of seconds since January 1, 1970, 00:00:00 UTC. Dividing that by the epoch length and truncating with int() gives a unique and always increasing epoch number. Because the count is based on UTC, daylight saving changes don't affect it. The number only goes backwards if the system clock itself gets set back, so avoid adjusting the clock while the program is running.
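For analysis later, an epoch number can be turned back into a clock time by multiplying by the epoch length. A minimal sketch, assuming Python 3 (for datetime.timezone); epoch_start is a hypothetical helper, not part of the tracker:

```python
from datetime import datetime, timezone

EPOCH_LENGTH = 30  # seconds per epoch, as in the Tracker class


def epoch_start(epoch, length=EPOCH_LENGTH):
    """UTC datetime at which the given epoch number begins (hypothetical helper)."""
    return datetime.fromtimestamp(epoch * length, tz=timezone.utc)


# 47629440 * 30 seconds is midnight UTC on the day this post was written.
print(epoch_start(47629440).isoformat())
print(epoch_start(47629441).isoformat())
```

Consecutive epoch numbers land exactly 30 seconds apart, which is what keeps the saved data aligned to :00 and :30.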
        self.state = "NA"
Easy enough: the initial state of the event being tracked is unknown until something is input, so it's initialized to NA.
        pyg.init()
        pyg.music.load(Tracker.SOUND)
Next, the sound system is set up so that at the beginning of epochs, a little wav file can be played.
        self.bind_all("<Key>", self.onKeyPressed)
        self.initUI()
The tkinter bind_all method allows you to track almost any HID input throughout your application. Here, I'm just setting up the keyboard input. initUI (below) sets up the GUI button inputs and display output.
        self.port = '/dev/ttyACM0'
        self.baudrate = 9600
        self.ser = serial.Serial(self.port, self.baudrate, timeout=1)    
        self.trackSerial()
Here, I'm setting up the serial interface with my Arduino Uno, and allowing Python to track it. You'll need to set the port and baud rate according to your setup (we'll get into more detail around this in Pt. 2). If you don't want to use an Arduino for additional input/output, you can just comment any serial related stuff out of the code. The trackSerial method (below) handles the custom protocol for communicating with the Arduino.
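Instead of commenting code out, another option is a stand-in object that accepts the same calls and does nothing. This is only a sketch under assumptions: NullSerial and open_port are hypothetical names, and the stand-in covers just the four methods the Tracker calls (inWaiting, read, flushInput, write).

```python
class NullSerial(object):
    """Do-nothing stand-in for serial.Serial (hypothetical class)."""

    def inWaiting(self):
        return 0  # never any bytes waiting, so trackSerial does nothing

    def read(self, size=1):
        return b""

    def flushInput(self):
        pass

    def write(self, data):
        return len(data)  # pretend everything was sent


def open_port(port, baudrate):
    """Return a real serial port if one can be opened, else a NullSerial."""
    try:
        import serial  # pySerial; may not be installed
        return serial.Serial(port, baudrate, timeout=1)
    except Exception:
        # Broad on purpose: a missing library or a missing device both
        # fall back to the stand-in.
        return NullSerial()


ser = open_port("/dev/this-port-should-not-exist", 9600)
print(type(ser).__name__)
```

With something like this, the Tracker could assign self.ser = open_port(self.port, self.baudrate) and leave the rest of the serial code alone.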
    def initUI(self):
        self.parent.title("Tracker")
        self.columnconfigure(0, pad=3)
        self.columnconfigure(1, pad=3)
        self.columnconfigure(2, pad=3)
        self.columnconfigure(3, pad=3)
        self.rowconfigure(0, pad=3)
        self.rowconfigure(1, pad=3)
        self.rowconfigure(2, pad=3)
        self.display = Display(self)
        self.display.grid(row=0, columnspan=4, sticky=(tk.W+tk.E))
        leftButton = tk.Button(self, text="Left",
                               command=lambda: self.onKeyPressed("Left"))
        leftButton.grid(row=1, column=0)
        rightButton = tk.Button(self, text="Right",
                                command=lambda: self.onKeyPressed("Right"))
        rightButton.grid(row=1, column=1)
        neitherButton = tk.Button(self, text="Neither",
                                  command=lambda: self.onKeyPressed("Down"))
        neitherButton.grid(row=1, column=2)
        bothButton = tk.Button(self, text="Both",
                               command=lambda: self.onKeyPressed("Up"))
        bothButton.grid(row=1, column=3)
        exitButton = tk.Button(self, text="Exit",
                               command=lambda: self.onKeyPressed("q"))
        exitButton.grid(row=2, columnspan=4)
        self.pack()
I'm using tkinter's Frame class to set up a simple grid GUI with 3 rows and 4 columns. The first row holds the feedback display (controlled by the Display class above) spanning all 4 columns. The second row holds the input buttons. In my case, I'm tracking four states, so 4 columns/buttons. The third row just holds an exit button. When pressed, the buttons make calls to onKeyPressed (below) with an argument indicating which button was pressed.
    def onKeyPressed(self, e):
        try:
            key = e.keysym
        except AttributeError:
            key = e
        if key == "q":
            self.onExit()
        self.setState(key)
        self.ser.write("W"+str(Tracker.STATES[self.state]))
        self.display.drawStatus()
So... there's a little hacking going on here. When a key is pressed, we just set the value to the key pressed. If a button is pressed in the GUI, we fake a key press by having the button pass a value to onKeyPressed. onKeyPressed then sets the current state of the event to whatever was input and has the Display update immediately to show the current state.
        self.ser.write("W"+str(Tracker.STATES[self.state]))
Here, I'm sending a message to the Arduino, so that it can update its current state to whatever key was pressed. The "W" that's prepended to the message just lets the Arduino know that it is to take the state as an input, not as a request to send an update.
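One thing to watch: under Python 3, pySerial's write expects bytes rather than a str, so the message needs encoding before it's sent. A small sketch of building the message that way; state_message is a hypothetical helper, and STATES is copied from the Tracker class:

```python
STATES = {"Left": 0, "Right": 1, "Both": 2, "Neither": 3, "NA": 4}


def state_message(state):
    """Build the 'W' + state-code message as ASCII bytes (hypothetical helper)."""
    return ("W" + str(STATES[state])).encode("ascii")


print(state_message("Both"))
```

The call would then look like self.ser.write(state_message(self.state)).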
    def setState(self, key):
        if key in Tracker.KEYS:    
            self.state = Tracker.KEYS[key]
When an input comes in and is handled by onKeyPressed (above - from keyboard and GUI button inputs) or trackSerial (below - from the Arduino), there is a call to setState. This method checks to see that it was a valid input, and if so, actually sets the current state based on that input.
    def onExit(self):
        self.synch()
        self.quit()
Pressing "q" on the keyboard, or the "Exit" button on the GUI allows a user to safely exit the program in a way that makes sure the latest data is saved to file. This is done by synching data one last time (the synch method is next, below), and then telling tkinter to quit the application.
    def synch(self):
        self.setCurrentEpoch()
        n = self.checkEpoch()
        if n > 0:
            if n > 1:
                for i in range(n-1):
                    self.updateFile(i, "NA, synch problem")
            self.updateFile(0, self.state + ",")
            self.savedEpoch = self.currentEpoch
            self.state = "NA"
            self.ser.write("W"+str(Tracker.STATES[self.state]))
            pyg.music.play()
A large part of the heavy lifting for this program comes here, in the synch method. This method is called in two situations: every 200ms by the Display class (see the Display.onTimer method above), and when the application is safely quit, as in onExit (above). Tkinter is single-threaded, so when we want things to happen later, we can only guarantee that they don't happen early. That is, the after method will wait at least as long as you tell it, and it may wait longer. To keep timing tight, I lock it to the system time: each call works out the current epoch from the clock and compares it with the last epoch that was saved to file.

First, set the current epoch (see below), then check how many epochs have passed since one was last saved to file. If time has moved into a new epoch (at :00 or :30 of the minute), the epoch and state are saved to file (see updateFile below), the last saved epoch is updated, the current state is reset to NA, and the NA state is sent to the Arduino. Then a tone is played to indicate to the user that a new epoch has started. If more than one epoch has passed, the gaps are filled in with NAs first - there was clearly a timing issue, and we can't, at this point, presume to know how or why that happened, or what the event state was during the intervening lost time. Note that, as written, the state row is saved with an offset of 0, so it gets the same epoch number as the first NA row.

If synch is called, but the epoch hasn't changed, not much happens.
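The gap filling can be separated from the GUI and tested on its own. The sketch below is one possible layout, not the method above: it assumes the recorded state belongs to the first elapsed epoch and that any later epochs are the gaps, so every epoch number appears exactly once. rows_to_write is a hypothetical helper.

```python
def rows_to_write(saved_epoch, current_epoch, state):
    """Return (epoch, state, note) rows covering saved_epoch+1 .. current_epoch.

    Assumption: the state the user entered belongs to the first elapsed
    epoch; any further elapsed epochs are marked NA.
    """
    n = current_epoch - saved_epoch
    if n <= 0:
        return []  # still inside the epoch that was last saved
    rows = [(saved_epoch + 1, state, "")]
    for k in range(2, n + 1):
        rows.append((saved_epoch + k, "NA", "synch problem"))
    return rows


for row in rows_to_write(10, 13, "Right"):
    print(row)
```

Keeping this as a pure function means the timing logic can be checked without a Tkinter window or an Arduino attached.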
    def trackSerial(self):
        if self.ser.inWaiting() > 0:
            thisByte = self.ser.read()
            self.ser.flushInput()
            if thisByte == "E":
                self.ser.write("W"+str(Tracker.STATES[self.state]))
            elif thisByte in ["0", "1", "2", "3", "4"]:
                self.setState(Tracker.STATES2[int(thisByte)])
            else:
                self.ser.write("RR")
The trackSerial method is meant for handling serial communication with an Arduino. This occurs when first initializing the program interface, and every time the display updates. This is for handling input from the Arduino. Output to Arduino is handled in the onKeyPressed (above) and synch (above) methods.

trackSerial looks to see if any messages have been sent by the Arduino. If so, then the message is handled. If the message was an error ("E"), the current state is sent to the Arduino. This will happen if we ask the Arduino for the current state and it doesn't know for some reason - loss of connection, or power or some other error state. If the message is one of the expected states, we update the current state accordingly. Otherwise, the incoming message from the Arduino is unexpected, so a message is sent back asking for the Arduino to repeat - maybe there was a corruption of the signal or some other error condition, so just ask again.

Though not perfect, this sets up a fairly robust communication protocol with the Arduino that ensures that inputs from keyboard, GUI, and Arduino are fairly well synchronized, while handling some potential error conditions.
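The decision logic in trackSerial can also be written as a function with no serial port involved, which makes the protocol easier to see and test. A sketch under assumptions: handle_byte and CODE_TO_STATE are hypothetical names, bytes are represented as one-character strings, and a "4" (NA) from the Arduino leaves the state unchanged, which matches how setState ignores inputs that aren't in KEYS.

```python
STATES = {"Left": 0, "Right": 1, "Both": 2, "Neither": 3, "NA": 4}

# Arduino code character -> state name, for the four settable states.
CODE_TO_STATE = {str(code): name for name, code in STATES.items() if name != "NA"}


def handle_byte(this_byte, state):
    """Return (new_state, reply) for one character received from the Arduino.

    reply is the string to send back, or None if nothing should be sent.
    """
    if this_byte == "E":
        # The Arduino lost track of the state: send it the current one.
        return state, "W" + str(STATES[state])
    if this_byte in CODE_TO_STATE:
        return CODE_TO_STATE[this_byte], None
    if this_byte == "4":
        return state, None  # NA from the Arduino does not overwrite the state
    return state, "RR"  # unexpected byte: ask the Arduino to repeat


print(handle_byte("E", "Left"))
print(handle_byte("2", "NA"))
print(handle_byte("x", "Left"))
```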
    def updateFile(self, n, data):
        with open(Tracker.OUTFN, "a") as f:
            f.write(str(self.savedEpoch+n+1) + "," + data + "\n")
The updateFile method simply writes the epoch and data to file by appending it to the end of the indicated file. The "with open() as f:" syntax is efficient and ensures that the file gets closed after every use.
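If the notes column ever needs to hold commas or quotes, the standard csv module handles the escaping. A minimal sketch, assuming Python 3 (for the newline argument to open); append_row and read_rows are hypothetical helpers, and the demo writes to a temporary directory rather than tracker.csv.

```python
import csv
import os
import tempfile


def append_row(path, epoch, state, note=""):
    """Append one epoch record to the CSV file at path (hypothetical helper)."""
    # newline="" lets the csv module control line endings itself.
    with open(path, "a", newline="") as f:
        csv.writer(f).writerow([epoch, state, note])


def read_rows(path):
    """Read every row back as a list of strings (hypothetical helper)."""
    with open(path, newline="") as f:
        return list(csv.reader(f))


# Demo in a throwaway directory so no real data file is touched.
demo_path = os.path.join(tempfile.mkdtemp(), "tracker_demo.csv")
append_row(demo_path, 47629441, "Left")
append_row(demo_path, 47629442, "NA", "synch problem")
print(read_rows(demo_path))
```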
    def setCurrentEpoch(self):
        t = int(time.time()/Tracker.EPOCH_LENGTH)
        if self.currentEpoch != t:
            self.currentEpoch = t
    def checkEpoch(self):
        return self.currentEpoch-self.savedEpoch
These two simple methods keep track of epochs based on clock time from the system. setCurrentEpoch updates the current epoch, and checkEpoch returns how many epochs have passed since one was last written to file, so that when synch starts telling updateFile to save data, we fill in gaps anywhere data was missing for whatever reason. This is very important later, when it comes to analyzing data in a time series. If data are missing but the gaps aren't filled, it just makes life more difficult, even if there are good time stamps.

This last bit is outside the class definitions above and is a standard way of running a tkinter program.
def main():
    root = tk.Tk()
    app = Tracker(parent=root)
    app.mainloop()
if __name__ == '__main__':
    main()
Set up the tkinter program, and start the mainloop. The "if __name__ == '__main__':" syntax allows us to import the class definitions for use elsewhere with Python, without necessarily starting an instance of the tkinter program as an application GUI.

That's All for Now

Well, that was a lot - but it's only the Python side of the story. There's still the Arduino to consider. However, when I first put this application together, I didn't have an Arduino connected. It's actually pretty useful without it - you get an audio reminder every 30s to input a single keystroke/button press, you just need to find the right window to input into, then move on. The advantage of connecting an Arduino is that input can be handled outside the computer's system, minimizing any interruption to workflow on the computer.

Look for the Python/Arduino Event Tracker, Pt 2: Arduino Side coming someday soon. It'll be my next post. Hopefully someone can find this useful. If there are any questions, please feel free to leave a comment below or contact me directly.
