Local USB

This commit is contained in:
Samuel Sloniker 2024-07-22 15:09:15 -07:00
parent 0046f47d88
commit cc672e1d24
4 changed files with 191 additions and 4 deletions

View File

@ -1,9 +1,48 @@
import tkinter as tk import tkinter as tk
from tkinter import ttk
import pyqrcode import pyqrcode
import navpoint.fix_path import navpoint.fix_path
import navpoint.usb
def run(link): def choose_mode():
def _usb_clicked():
window.destroy()
global mode
mode = "usb"
def _mobile_clicked():
window.destroy()
global mode
mode = "mobile"
window = tk.Tk(className="navpoint")
window.title("Navpoint")
window.resizable(width=False, height=False)
global mode
mode = ""
ttk.Label(window, text="How should Navpoint obtain location data?").pack()
usb_button = ttk.Button(
window, text="USB GPS receiver (NMEA)", command=_usb_clicked
)
usb_button.pack()
mobile_button = ttk.Button(
window, text="Phone running Navpoint Mobile", command=_mobile_clicked
)
mobile_button.pack()
try:
window.iconbitmap(navpoint.fix_path.fix_path("icon.ico"))
except tk.TclError:
pass
window.mainloop()
return mode
def show_qr(link):
window = tk.Tk(className="navpoint") window = tk.Tk(className="navpoint")
window.title("Navpoint") window.title("Navpoint")
window.resizable(width=False, height=False) window.resizable(width=False, height=False)

View File

@ -1,9 +1,15 @@
import navpoint.local_server import navpoint.local_server
import navpoint.phone_server import navpoint.phone_server
import navpoint.usb
import navpoint.gui import navpoint.gui
def run(): def run():
mode = navpoint.gui.choose_mode()
if mode == "mobile":
navpoint.local_server.run() navpoint.local_server.run()
link = navpoint.phone_server.run() link = navpoint.phone_server.run()
navpoint.gui.run(link) navpoint.gui.show_qr(link)
elif mode == "usb":
navpoint.local_server.run()
navpoint.usb.run_ui()

140
navpoint/usb.py Normal file
View File

@ -0,0 +1,140 @@
import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox
from serial import Serial
from pynmeagps import NMEAReader
import serial.tools.list_ports
import serial.serialutil
import navpoint.content
import navpoint.fix_path
def main(port, window, updates):
try:
with Serial(port, timeout=3) as stream:
reader = NMEAReader(stream)
updates.put(
(
"Connected",
port,
)
)
window.event_generate("<<newData>>", when="tail", state=1)
while True:
_, parsed_data = reader.read()
try:
lat = parsed_data.lat
lon = parsed_data.lon
time = parsed_data.time
date = parsed_data.date
navpoint.content.content = f"""<?xml version='1.0' encoding='us-ascii'?><kml xmlns="http://www.opengis.net/kml/2.2" xmlns:gx="http://www.google.com/kml/ext/2.2"><Document><Style id="icon"><IconStyle><Icon><href>http://127.0.0.1:8888/marker.png</href></Icon></IconStyle></Style><name>Navpoint</name><open>1</open><Placemark><name>Position</name><styleUrl>#icon</styleUrl><Point><coordinates>{lon},{lat},0</coordinates></Point><description>{date} {time} UTC</description></Placemark></Document></kml>"""
updates.put(
(
"LocationUpdate",
{
"lat": lat,
"lon": lon,
"time": f"{date} {time} UTC",
},
)
)
window.event_generate("<<newData>>", when="tail", state=1)
except AttributeError:
pass
except serial.serialutil.SerialException as e:
updates.put(
(
"Disconnected",
port,
)
)
window.event_generate("<<newData>>", when="tail", state=1)
messagebox.showerror(
"Navpoint Error", f"Error reading from port {port}: {str(e)}"
)
def run(port, window, updates):
threading.Thread(
target=main, args=(port, window, updates), daemon=True
).start()
def clear(queue_to_clear):
try:
while True:
queue_to_clear.get(block=False)
except queue.Empty:
pass
def update_dropdown(dropdown):
dropdown["values"] = ["[Select port]"] + [
f"{port.device} ({port.description})"
for port in serial.tools.list_ports.comports()
]
dropdown.current(newindex=0)
def run_ui():
def selected(event):
selection = dropdown.get().split()[0]
if not selection.startswith("["):
clear(updates)
run(selection, window, updates)
def new_data(event):
try:
purpose, content = updates.get(block=False)
except queue.Empty:
return
if purpose == "LocationUpdate":
updated.config(text="Last updated: " + content["time"])
elif purpose == "Connected":
status.config(text="Connected to " + content)
dropdown.pack_forget()
refresh.pack_forget()
elif purpose == "Disconnected":
status.config(text="Not connected")
update_dropdown(dropdown)
dropdown.pack()
refresh.pack()
def refresh_clicked():
update_dropdown(dropdown)
window = tk.Tk(className="navpoint")
window.title("Navpoint")
window.resizable(width=False, height=False)
status = ttk.Label(window, text="Not connected")
status.pack()
updated = ttk.Label(window, text="Waiting for location")
updated.pack()
n = tk.StringVar()
dropdown = ttk.Combobox(window, state="readonly", textvariable=n, width=40)
update_dropdown(dropdown)
dropdown.pack()
refresh = ttk.Button(
window, text="Refresh device list", command=refresh_clicked
)
refresh.pack()
dropdown.bind("<<ComboboxSelected>>", selected)
window.bind("<<newData>>", new_data)
updates = queue.Queue()
try:
window.iconbitmap(navpoint.fix_path.fix_path("icon.ico"))
except tk.TclError:
pass
window.mainloop()

View File

@ -1,2 +1,4 @@
pyqrcode pyqrcode
tornado tornado
pyserial
pynmeagps