Thursday, February 17, 2022

[SOLVED] Display live data from Battery Management System

Issue

I have a problem displaying the content of a message from a Battery management System (EMUS BMS) in Tkinter. I use a Raspberry-Pi 4 to collect the data via Serial, then parse the data through elif conditions and log the data on a usb stick.

import tkinter as tk
from tkinter import messagebox
import tkinter.font as font
import serial
import time
from USB_Status import usb_status
import threading, queue

class Application(tk.Frame):
    
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.create_widgets(master)

    def create_widgets(self, master):

        def button_emus():
            anweisungs_label.config(text="EMUS BMS ausgewählt!", fg="green")
            q = queue.Queue()
            thread_ = threading.Thread(
                            target=connection_emus, 
                            name="Emus-Thread",
                            args=[q],
                            daemon=True,
                            )
            thread_.start()
            thread_.join()
            text_box.insert("end-1c", q.get())

        emus_button.place(x = 420, y = 0, width=400, height=150)
        text_box.place(x = 0, y = 160, width=2048, height=800)    

def verarbeitung_emus(response, binary):
    telegram=[]

    #Battery Voltage Summary Sentence
    if response[0:3] == "BV1":
        number_of_cells = str(int(response[4:8], 16)) + " Zellen"
        binary.write((number_of_cells + "\n").encode())

        min_cell_voltage ="min. " + str((int(response[9:11], 16) + 200) * 0.01) + " V"
        binary.write((min_cell_voltage + "\n").encode())

        max_cell_voltage ="max. " + str((int(response[12:14], 16) + 200) * 0.01) + " V"
        binary.write((max_cell_voltage + "\n").encode())

        number = (int(response[15:17], 16) + 200) * 0.01
        average_cell_voltage = "avrg. " + str(format(number, '.2f')) + " V"
        binary.write((average_cell_voltage + "\n").encode())

        telegram.extend([number_of_cells, min_cell_voltage, max_cell_voltage, average_cell_voltage])

    # Battery Cell Module Temperature Summary Sentence
    elif response[0:3] == "BT1":

        number = int(response[9:11], 16) - 100
        if number > 32767:
            number = number - 65536
        min_cell_module_temp = "min. " + str(number) + " °C"
        binary.write((min_cell_module_temp + "\n").encode())

        number = int(response[12:14], 16) - 100
        if number > 32767:
            number = number - 65536
        max_cell_module_temp = "max. " + str(number) + " °C"
        binary.write((max_cell_module_temp + "\n").encode())

        number = int(response[15:17], 16) - 100
        if number > 32767:
            number = number - 65536
        average_cell_module_temp = "avrg. " + str(format(number, '.2f')) + " °C"
        binary.write((average_cell_module_temp + "\n").encode())

        telegram.extend([min_cell_module_temp, max_cell_module_temp, average_cell_module_temp])



    # Battery Balancing Rate Summary Sentence
    elif response[0:3] == "BB1":

        min_balancing_rate = "min. " + str(int(response[9:11], 16) * 100/255) + " %"
        binary.write((min_balancing_rate + "\n").encode())

        max_balancing_rate = "max. " + str(int(response[12:14], 16) * 100/255) + " %"
        binary.write((max_balancing_rate + "\n").encode())

        number = int(response[15:17], 16) * 100/255
        average_balancing_rate = "avrg. " + str(format(number, '.2f')) + " %"
        binary.write((average_balancing_rate + "\n").encode())

        balancing_voltage_threshold = str((int(response[19:20], 16) + 200) * 0.01) + " V"
        binary.write((balancing_voltage_threshold + "\n").encode())


        telegram.extend([min_balancing_rate, max_balancing_rate, average_balancing_rate, balancing_voltage_threshold])

    #  “Current and Voltage” Sentence
    elif response[0:3] == "CV1":
        total_voltage = "overall " +  str((int(response[4:12], 16) + 200) * 0.01) + " V"
        binary.write((total_voltage + "\n").encode())

        number = int(response[13:17], 16) * 0.1
        if number > 32767:
            number = number - 65536
        current = "overall " +  str(format(number, '.2f')) + " A"
        binary.write((current + "\n").encode())

        telegram.extend([total_voltage, current])
    return telegram

def connection_emus(queue):
    DEVICE = '/dev/ttyUSB0'
    BAUD = 57600
    ser = serial.Serial(DEVICE, BAUD)
    binary = open('/media/data_logger/EMUS.bin', "ab")

    try:
        while True:
            response = ser.readline()[:-2]
            response = response.decode()
            temp = verarbeitung_emus(response, binary)
            if temp:
                queue.put(temp)
                while not queue.empty():
                    print(queue.get())

    except KeyboardInterrupt:
        usb_status()
        ser.close()
        binary.close()

root = tk.Tk()
app = Application(master=root)
app.mainloop()

I've tried out threading, to run the loop in the background and access it to read out the values, but I've never worked with threading and besides starting a thread I cannot access anything.

Edit: threading works now with the queue class, I can display the data from a daemon process. But now I want to pass the data to the textbox object in tkinter. Any suggestions how to achieve this ?


Solution

I solved the issue by breaking the while loop and avoid using the multiprocessing or threading libary.

import tkinter as tk
    from tkinter import messagebox
    import tkinter.font as font
    import serial
    import time
    from USB_Status import usb_status
    
    class Application(tk.Frame):
        
        def __init__(self, master=None):
            super().__init__(master)
            self.master = master
            self.pack()
            self.create_widgets(master)
    
        def create_widgets(self, master):
            
            def button_emus():
                anweisungs_label.config(text="EMUS BMS ausgewählt!", fg="green")
                antwort = self.connection_emus()
                [number_of_cells, min_cell_voltage, max_cell_voltage, average_cell_voltage] = antwort[0]
                [min_cell_module_temp, max_cell_module_temp, average_cell_module_temp] = antwort[1]
                [min_balancing_rate, max_balancing_rate, average_balancing_rate, balancing_voltage_threshold] = antwort[2]
                [total_voltage, current] = antwort[3]
    
                # = self.connection_emus()
                text_box.insert("end-1c", number_of_cells + "\n")
                text_box.insert("end-1c", min_cell_voltage + "\n")
                text_box.insert("end-1c", max_cell_voltage + "\n")
                text_box.insert("end-1c", average_cell_voltage + "\n")
                text_box.insert("end-1c", min_cell_module_temp + "\n")
                text_box.insert("end-1c", max_cell_module_temp + "\n")
                text_box.insert("end-1c", average_cell_module_temp + "\n")
                text_box.insert("end-1c", min_balancing_rate + "\n")
                text_box.insert("end-1c", max_balancing_rate + "\n")
                text_box.insert("end-1c", average_balancing_rate + "\n")
                text_box.insert("end-1c", balancing_voltage_threshold + "\n")
                text_box.insert("end-1c", total_voltage + "\n")
                text_box.insert("end-1c", current + "\n")
    
            emus_button = tk.Button(master, text="EMUS BMS", command=button_emus)
            exit_button = tk.Button(master, text="Beenden", command=master.quit)
            text_box = tk.Text(master)

            emus_button.place(x = 420, y = 0, width=400, height=150)  
            text_box.place(x = 0, y = 160, width=2048, height=600)    
            exit_button.place(x = 1500, y = 820, width=400, height=150)
    
        def verarbeitung_emus(self, response, binary):
            telegram=[]
    
            #Battery Voltage Summary Sentence
            if response[0:3] == "BV1":
                number_of_cells = format(int(response[4:8], 16), '.2f')
                #str(format(number, '.2f')) + " Zellen"
                binary.write(str(number_of_cells).encode())
    
                min_cell_voltage = format((int(response[9:11], 16) + 200) * 0.01, '.2f')
                #"min. " + str(format(number, '.2f')) + " V"
                binary.write(str(min_cell_voltage).encode())
    
                max_cell_voltage = format((int(response[12:14], 16) + 200) * 0.01, '.2f')
                #"max. " + str(format(number, '.2f')) + " V"
                binary.write(str(max_cell_voltage).encode())
    
                average_cell_voltage = format((int(response[15:17], 16) + 200) * 0.01, '.2f')
                #"avrg. " + str(format(number, '.2f')) + " V"
                binary.write(str(average_cell_voltage).encode())
    
                telegram.extend([number_of_cells, min_cell_voltage, max_cell_voltage, average_cell_voltage])
    
            # Battery Cell Module Temperature Summary Sentence
            elif response[0:3] == "BT1":
    
                number = int(response[9:11], 16) - 100
                if number > 32767:
                    number = number - 65536
                min_cell_module_temp = format(number, '.2f')
                #"min. " + str(format(number, '.2f')) + " °C"
                binary.write(str(min_cell_module_temp).encode())
    
                number = int(response[12:14], 16) - 100
                if number > 32767:
                    number = number - 65536
                max_cell_module_temp = format(number, '.2f')
                #"max. " + str(format(number, '.2f')) + " °C"
                binary.write(str(max_cell_module_temp).encode())
    
                number = int(response[15:17], 16) - 100
                if number > 32767:
                    number = number - 65536
                average_cell_module_temp = format(number, '.2f')
                #"avrg. " + str(format(number, '.2f')) + " °C"
                binary.write(str(average_cell_module_temp).encode())
    
                telegram.extend([min_cell_module_temp, max_cell_module_temp, average_cell_module_temp])
    
  
    
            # Battery Balancing Rate Summary Sentence
            elif response[0:3] == "BB1":
                
                min_balancing_rate = format(int(response[9:11], 16) * 100/255, '.2f')
                #"min. " + str(format(number, '.2f')) + " %"
                binary.write(str(min_balancing_rate).encode())
    
                max_balancing_rate = format(int(response[12:14], 16) * 100/255, '.2f')
                #"max. " + str(format(number, '.2f')) + " %"
                binary.write(str(max_balancing_rate).encode())
    
                average_balancing_rate = format(int(response[15:17], 16) * 100/255, '.2f')
                #"avrg. " + str(format(number, '.2f')) + " %"
                binary.write(str(max_balancing_rate).encode())
    
                balancing_voltage_threshold = format((int(response[19:20], 16) + 200) * 0.01, '.2f')
                #str(format(number, '.2f')) + " V"
                binary.write(str(balancing_voltage_threshold).encode())
                
                telegram.extend([min_balancing_rate, max_balancing_rate, average_balancing_rate, balancing_voltage_threshold])
    
            #  “Current and Voltage” Sentence
            elif response[0:3] == "CV1":
                total_voltage = format((int(response[4:12], 16) + 200) * 0.01, '.2f')
                #"overall " +  str(format(number, '.2f')) + " V"
                binary.write(str(total_voltage).encode())
    
                number = int(response[13:17], 16) * 0.1
                if number > 32767:
                    number = number - 65536
                current = format(number, '.2f')
                #"overall " +  str(format(number, '.2f')) + " A"
                binary.write(str(current).encode())
    
                telegram.extend([total_voltage, current])
    
            if telegram:
                print(telegram)
                return telegram
    
        def connection_emus(self):
            DEVICE = '/dev/ttyUSB0'
            BAUD = 57600
            ser = serial.Serial(DEVICE, BAUD)
            binary = open('/media/data_logger/EMUS.bin', "ab")
            temp = []
    
            while True:
                response = ser.readline()[:-2]
                response = response.decode()
                if response[0:3] == "CV1" or response[0:3] == "BB1" or response[0:3] == "BT1" or response[0:3] == "BV1":
                    temp.append(self.verarbeitung_emus(response, binary))
                    print(temp)
                if response[0:3] == "CV1":
                    usb_status()
                    ser.close()
                    binary.close()
                    break
            if response[0:3] == "CV1":
                return temp
    '''
            except KeyboardInterrupt:
                usb_status()
                ser.close()  
                binary.close()
                '''
    
    root = tk.Tk()
    app = Application(master=root)
    app.mainloop()


Answered By - c0nr3f
Answer Checked By - Mary Flores (WPSolving Volunteer)