Skip to content
Extraits de code Groupes Projets
mainwindow.py 5,85 ko
Newer Older
  • Learn to ignore specific revisions
  • Simon Collignon's avatar
    Simon Collignon a validé
    import logging
    import serial
    import serial.tools.list_ports
    from simple_pid import PID
    import time
    
    import PySide6.QtGui
    from PySide6.QtWidgets import QWidget, QMainWindow, QFileDialog, QSpacerItem, QSizePolicy
    from PySide6.QtCore import Qt, Slot, QTimer
    from PySide6 import QtGui, QtWidgets
    
    from app.ui.dist.ui_mainwindow import Ui_MainWindow
    from app.code.module import Module
    
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
    
    def find_serial_device(serial_number:str) -> str:
        """Check the `hwid` through the list ports for a `SER` key
        that matches `serial_number`
        """
        for port, desc, hwid in serial.tools.list_ports.comports():
            # print(port)
            infos = hwid.split(" ") 
            print(infos)
            for info in infos: 
                if not info.startswith("SER"):
                    continue
                serial_num = info.split("=")[-1]
                print(serial_num)
                if not serial_num == serial_number:
                    break
                return port
        return "" 
    
    
    
    class MainWindow(QMainWindow, Ui_MainWindow):
        """The MainWindow is the parent class of the application."""
    
    
        def __init__(self):
            """Initialize the MainWindow and setup the UI."""
    
            super().__init__()
            self.setupUi(self)
            self.setWindowTitle("Autowaveguide")
            
            self.timer = QTimer()
            self.timer.timeout.connect(self.the_loop)
            self.timer.start(1)  # Update every 200 ms
    
            self.timer_gui = QTimer()
            self.timer_gui.timeout.connect(self.gui_loop)
            self.timer_gui.start(100)
    
            self.timer_gui_true_pressure = QTimer()
            self.timer_gui_true_pressure.timeout.connect(self.true_pressure_loop)
            self.timer_gui_true_pressure.start(1000)
    
            self.qlevel = 0.0
            self.p_offset = 871
            self.p_scale = 4.818666666666666e-03
    
            self.mks_controller : MKS627H = MKS627H()
    
            # burkert solenoid valve
            SOLENOID_SERIAL_NUMBER : str = "3307142A37323835191D33324B572D44"
            VALVE_PORT : str = find_serial_device(SOLENOID_SERIAL_NUMBER)
            self.valve = serial.Serial()
            self.valve.port = VALVE_PORT
            self.valve.baudrate = 1_000_000
            self.valve.open()
    
            self.module : Module = Module()
            self.central_layout.addWidget(self.module)
    
            self.module.setpoint.editingFinished.connect(self.update_pid) 
            self.module.pid_kp.editingFinished.connect(self.update_pid)
            self.module.pid_ki.editingFinished.connect(self.update_pid)
            self.module.pid_kd.editingFinished.connect(self.update_pid)
    
            self.pid = PID(self.module.pid_kp.value(), 
                           self.module.pid_ki.value(), 
                           self.module.pid_kd.value(), 
                           self.module.setpoint.value(), 
                           output_limits=(40, 70))
    
    
    
    
        def update_valve(self, valve_percent:float):
            self.valve.write(b'u')
            data = str(valve_percent)
            self.valve.write(data.encode() + b'\n')
            try:
                response = float(self.valve.readline().decode('utf-8').rstrip().split()[0])
            except ValueError as error:
                logging.error(error)
                return -1
            return response
        
    
        def update_pid(self):
            self.pid.setpoint = self.pressure_to_qlevel(self.module.setpoint.value())
            self.pid.tunings = (self.module.pid_kp.value(), self.module.pid_ki.value(), self.module.pid_kd.value())
    
    
        def qlevel_to_pressure(self, level:int) -> float:
            """Convert the quantization level of the 14 bit ADC of the arduino R4 minima to pressure in ubar."""
            return float(self.p_scale*(level-self.p_offset))
    
    
        def pressure_to_qlevel(self, pressure:float) -> int:
            """Convert pressure in ubar to the quantization level of the 14 bit ADC of the arduino R4 minima."""
            return int(pressure/self.p_scale + self.p_offset)
        
        
        def the_loop(self):
            _ql = self.update_valve(self.pid(self.qlevel))
            if _ql != -1:
                self.qlevel = _ql
        
    
        def gui_loop(self):
            """This is the main loop of the GUI."""
            self.module.update_plot_data(self.qlevel_to_pressure(self.qlevel))
    
    
        def true_pressure_loop(self):
            """A simple loop to display the absolute pressure read from the MKS gauge controller."""
            self.module.true_pressure.setText(f"{self.mks_controller.read_pressure():.2f}")
    
    
        def closeEvent(self, event):
            try:
                self.timer.timeout.disconnect(self.the_loop)
                self.valve.write(b'r')
                logging.info("Exited normally.")
            except Exception as e:
                logging.error(f"Abnormal exit with error: {e}")
            event.accept()
    
    
    class MKS627H():
        def __init__(self):
            PGAUGE_SERIAL_NUMBER : str = "FT6VYF8DA"
            PGAUGE_PORT : str = find_serial_device(PGAUGE_SERIAL_NUMBER)
            self.pgauge = serial.Serial()
            self.pgauge.port = PGAUGE_PORT
            self.pgauge.timeout = 1
            self.pgauge.baudrate = 9600
            self.pgauge.open()
    
    
        def read_pressure(self) -> float:
            """Return the pressure read from the PDR 2000 MKS controller in ubar.
            
            I added the `in_waiting` while loop because I think I was accumulating 
            data in the serial buffer. This helps to prevent this issue.
    
            TODO: in the future we might implement a maximum number of tries to read the pressure
            and if it fails we should throw an error to stop the experiment and display something
            well. 
            """
            try:
                self.pgauge.write(b'p\n')
                # Check if there's data in the buffer
                while self.pgauge.in_waiting > 0:
                    # Read all data available in the buffer
                    response = self.pgauge.readline().decode('utf-8').rstrip() 
                return float(response.split()[0])*1e3
            except Exception as e:
                logging.error(e)
                return -1