From cc672e1d2484f97836c4b27d4821137b0f5d398e Mon Sep 17 00:00:00 2001 From: Samuel Sloniker Date: Mon, 22 Jul 2024 15:09:15 -0700 Subject: [PATCH] Local USB --- navpoint/gui.py | 41 +++++++++++++- navpoint/run.py | 12 +++- navpoint/usb.py | 140 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 + 4 files changed, 191 insertions(+), 4 deletions(-) create mode 100644 navpoint/usb.py diff --git a/navpoint/gui.py b/navpoint/gui.py index 4f3c428..7a1bd94 100755 --- a/navpoint/gui.py +++ b/navpoint/gui.py @@ -1,9 +1,48 @@ import tkinter as tk +from tkinter import ttk import pyqrcode import navpoint.fix_path +import navpoint.usb -def run(link): +def choose_mode(): + def _usb_clicked(): + window.destroy() + global mode + mode = "usb" + + def _mobile_clicked(): + window.destroy() + global mode + mode = "mobile" + + window = tk.Tk(className="navpoint") + window.title("Navpoint") + window.resizable(width=False, height=False) + + global mode + mode = "" + + ttk.Label(window, text="How should Navpoint obtain location data?").pack() + usb_button = ttk.Button( + window, text="USB GPS receiver (NMEA)", command=_usb_clicked + ) + usb_button.pack() + mobile_button = ttk.Button( + window, text="Phone running Navpoint Mobile", command=_mobile_clicked + ) + mobile_button.pack() + + try: + window.iconbitmap(navpoint.fix_path.fix_path("icon.ico")) + except tk.TclError: + pass + + window.mainloop() + return mode + + +def show_qr(link): window = tk.Tk(className="navpoint") window.title("Navpoint") window.resizable(width=False, height=False) diff --git a/navpoint/run.py b/navpoint/run.py index 3355443..f67672a 100755 --- a/navpoint/run.py +++ b/navpoint/run.py @@ -1,9 +1,15 @@ import navpoint.local_server import navpoint.phone_server +import navpoint.usb import navpoint.gui def run(): - navpoint.local_server.run() - link = navpoint.phone_server.run() - navpoint.gui.run(link) + mode = navpoint.gui.choose_mode() + if mode == "mobile": + navpoint.local_server.run() + link = navpoint.phone_server.run() + navpoint.gui.show_qr(link) + elif mode == "usb": + navpoint.local_server.run() + navpoint.usb.run_ui() diff --git a/navpoint/usb.py b/navpoint/usb.py new file mode 100644 index 0000000..8e0bb24 --- /dev/null +++ b/navpoint/usb.py @@ -0,0 +1,140 @@ +import threading +import queue +import tkinter as tk +from tkinter import ttk +from tkinter import messagebox +from serial import Serial +from pynmeagps import NMEAReader +import serial.tools.list_ports +import serial.serialutil +import navpoint.content +import navpoint.fix_path + + +def main(port, window, updates): + try: + with Serial(port, timeout=3) as stream: + reader = NMEAReader(stream) + updates.put( + ( + "Connected", + port, + ) + ) + window.event_generate("<>", when="tail", state=1) + + while True: + _, parsed_data = reader.read() + try: + lat = parsed_data.lat + lon = parsed_data.lon + time = parsed_data.time + date = parsed_data.date + navpoint.content.content = f"""Navpoint1Position#icon{lon},{lat},0{date} {time} UTC""" + updates.put( + ( + "LocationUpdate", + { + "lat": lat, + "lon": lon, + "time": f"{date} {time} UTC", + }, + ) + ) + window.event_generate("<>", when="tail", state=1) + + except AttributeError: + pass + except serial.serialutil.SerialException as e: + updates.put( + ( + "Disconnected", + port, + ) + ) + window.event_generate("<>", when="tail", state=1) + messagebox.showerror( + "Navpoint Error", f"Error reading from port {port}: {str(e)}" + ) + + +def run(port, window, updates): + threading.Thread( + target=main, args=(port, window, updates), daemon=True + ).start() + + +def clear(queue_to_clear): + try: + while True: + queue_to_clear.get(block=False) + except queue.Empty: + pass + + +def update_dropdown(dropdown): + dropdown["values"] = ["[Select port]"] + [ + f"{port.device} ({port.description})" + for port in serial.tools.list_ports.comports() + ] + dropdown.current(newindex=0) + + +def run_ui(): + def selected(event): + selection = dropdown.get().split()[0] + if not selection.startswith("["): + clear(updates) + run(selection, window, updates) + + def new_data(event): + try: + purpose, content = updates.get(block=False) + except queue.Empty: + return + + if purpose == "LocationUpdate": + updated.config(text="Last updated: " + content["time"]) + elif purpose == "Connected": + status.config(text="Connected to " + content) + dropdown.pack_forget() + refresh.pack_forget() + elif purpose == "Disconnected": + status.config(text="Not connected") + update_dropdown(dropdown) + dropdown.pack() + refresh.pack() + + def refresh_clicked(): + update_dropdown(dropdown) + + window = tk.Tk(className="navpoint") + window.title("Navpoint") + window.resizable(width=False, height=False) + + status = ttk.Label(window, text="Not connected") + status.pack() + + updated = ttk.Label(window, text="Waiting for location") + updated.pack() + + n = tk.StringVar() + dropdown = ttk.Combobox(window, state="readonly", textvariable=n, width=40) + update_dropdown(dropdown) + dropdown.pack() + + refresh = ttk.Button( + window, text="Refresh device list", command=refresh_clicked + ) + refresh.pack() + + dropdown.bind("<>", selected) + window.bind("<>", new_data) + + updates = queue.Queue() + try: + window.iconbitmap(navpoint.fix_path.fix_path("icon.ico")) + except tk.TclError: + pass + + window.mainloop() diff --git a/requirements.txt b/requirements.txt index adca132..33291f6 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ pyqrcode tornado +pyserial +pynmeagps