Add intercom

This commit is contained in:
Samuel Sloniker 2024-07-23 09:34:31 -07:00
parent 6e71059903
commit 61a9c2ca92
2 changed files with 55 additions and 36 deletions

34
navpoint/intercom.py Normal file
View File

@ -0,0 +1,34 @@
import queue
class Intercom:
def __init__(self, window, handler):
self.window = window
self.queue = queue.Queue()
self.handler = handler
self.window.bind("<<intercom>>", self._event)
def clear(self):
try:
while True:
self.queue.get(block=False)
except queue.Empty:
pass
def put(self, purpose, content):
self.queue.put(
(
purpose,
content,
)
)
self.window.event_generate("<<intercom>>", when="tail", state=1)
def _event(self, event):
try:
purpose, content = self.queue.get(block=False)
except queue.Empty:
return
self.handler(purpose, content)

View File

@ -9,19 +9,17 @@ import serial.tools.list_ports
import serial.serialutil import serial.serialutil
import navpoint.content import navpoint.content
import navpoint.fix_path import navpoint.fix_path
import navpoint.intercom
def main(port, window, updates): def main(port, intercom):
try: try:
with Serial(port, timeout=3) as stream: with Serial(port, timeout=3) as stream:
reader = NMEAReader(stream) reader = NMEAReader(stream)
updates.put( intercom.put(
(
"Connected", "Connected",
port, port,
) )
)
window.event_generate("<<newData>>", when="tail", state=1)
while True: while True:
_, parsed_data = reader.read() _, parsed_data = reader.read()
@ -31,8 +29,7 @@ def main(port, window, updates):
time = parsed_data.time time = parsed_data.time
date = parsed_data.date date = parsed_data.date
navpoint.content.content = f"""<?xml version='1.0' encoding='us-ascii'?><kml xmlns="http://www.opengis.net/kml/2.2" xmlns:gx="http://www.google.com/kml/ext/2.2"><Document><Style id="icon"><IconStyle><Icon><href>http://127.0.0.1:8888/marker.png</href></Icon></IconStyle></Style><name>Navpoint</name><open>1</open><Placemark><name>Position</name><styleUrl>#icon</styleUrl><Point><coordinates>{lon},{lat},0</coordinates></Point><description>{date} {time} UTC</description></Placemark></Document></kml>""" navpoint.content.content = f"""<?xml version='1.0' encoding='us-ascii'?><kml xmlns="http://www.opengis.net/kml/2.2" xmlns:gx="http://www.google.com/kml/ext/2.2"><Document><Style id="icon"><IconStyle><Icon><href>http://127.0.0.1:8888/marker.png</href></Icon></IconStyle></Style><name>Navpoint</name><open>1</open><Placemark><name>Position</name><styleUrl>#icon</styleUrl><Point><coordinates>{lon},{lat},0</coordinates></Point><description>{date} {time} UTC</description></Placemark></Document></kml>"""
updates.put( intercom.put(
(
"LocationUpdate", "LocationUpdate",
{ {
"lat": lat, "lat": lat,
@ -40,28 +37,22 @@ def main(port, window, updates):
"time": f"{date} {time} UTC", "time": f"{date} {time} UTC",
}, },
) )
)
window.event_generate("<<newData>>", when="tail", state=1)
except AttributeError: except AttributeError:
pass pass
except serial.serialutil.SerialException as e: except serial.serialutil.SerialException as e:
updates.put( intercom.put(
(
"Disconnected", "Disconnected",
port, port,
) )
)
window.event_generate("<<newData>>", when="tail", state=1) window.event_generate("<<newData>>", when="tail", state=1)
messagebox.showerror( messagebox.showerror(
"Navpoint Error", f"Error reading from port {port}: {str(e)}" "Navpoint Error", f"Error reading from port {port}: {str(e)}"
) )
def run(port, window, updates): def run(port, intercom):
threading.Thread( threading.Thread(target=main, args=(port, intercom), daemon=True).start()
target=main, args=(port, window, updates), daemon=True
).start()
def clear(queue_to_clear): def clear(queue_to_clear):
@ -84,15 +75,10 @@ def run_ui():
def selected(event): def selected(event):
selection = dropdown.get().split(": ", 1)[0] selection = dropdown.get().split(": ", 1)[0]
if not selection.startswith("["): if not selection.startswith("["):
clear(updates) intercom.clear()
run(selection, window, updates) run(selection, intercom)
def new_data(event):
try:
purpose, content = updates.get(block=False)
except queue.Empty:
return
def new_data(purpose, content):
if purpose == "LocationUpdate": if purpose == "LocationUpdate":
updated.config(text="Last updated: " + content["time"]) updated.config(text="Last updated: " + content["time"])
elif purpose == "Connected": elif purpose == "Connected":
@ -129,9 +115,8 @@ def run_ui():
refresh.pack() refresh.pack()
dropdown.bind("<<ComboboxSelected>>", selected) dropdown.bind("<<ComboboxSelected>>", selected)
window.bind("<<newData>>", new_data)
updates = queue.Queue() intercom = navpoint.intercom.Intercom(window, new_data)
try: try:
window.iconbitmap(navpoint.fix_path.fix_path("icon.ico")) window.iconbitmap(navpoint.fix_path.fix_path("icon.ico"))