Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import logging
import serial
import serial.tools.list_ports
from simple_pid import PID
import time
import PySide6.QtGui
from PySide6.QtWidgets import QWidget, QMainWindow, QFileDialog, QSpacerItem, QSizePolicy
from PySide6.QtCore import Qt, Slot, QTimer
from PySide6 import QtGui, QtWidgets
from app.ui.dist.ui_mainwindow import Ui_MainWindow
from app.code.module import Module
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
def find_serial_device(serial_number:str) -> str:
"""Check the `hwid` through the list ports for a `SER` key
that matches `serial_number`
"""
for port, desc, hwid in serial.tools.list_ports.comports():
# print(port)
infos = hwid.split(" ")
print(infos)
for info in infos:
if not info.startswith("SER"):
continue
serial_num = info.split("=")[-1]
print(serial_num)
if not serial_num == serial_number:
break
return port
return ""
class MainWindow(QMainWindow, Ui_MainWindow):
"""The MainWindow is the parent class of the application."""
def __init__(self):
"""Initialize the MainWindow and setup the UI."""
super().__init__()
self.setupUi(self)
self.setWindowTitle("Autowaveguide")
self.timer = QTimer()
self.timer.timeout.connect(self.the_loop)
self.timer.start(1) # Update every 200 ms
self.timer_gui = QTimer()
self.timer_gui.timeout.connect(self.gui_loop)
self.timer_gui.start(100)
self.timer_gui_true_pressure = QTimer()
self.timer_gui_true_pressure.timeout.connect(self.true_pressure_loop)
self.timer_gui_true_pressure.start(1000)
self.qlevel = 0.0
self.p_offset = 871
self.p_scale = 4.818666666666666e-03
self.mks_controller : MKS627H = MKS627H()
# burkert solenoid valve
SOLENOID_SERIAL_NUMBER : str = "3307142A37323835191D33324B572D44"
VALVE_PORT : str = find_serial_device(SOLENOID_SERIAL_NUMBER)
self.valve = serial.Serial()
self.valve.port = VALVE_PORT
self.valve.baudrate = 1_000_000
self.valve.open()
self.module : Module = Module()
self.central_layout.addWidget(self.module)
self.module.setpoint.editingFinished.connect(self.update_pid)
self.module.pid_kp.editingFinished.connect(self.update_pid)
self.module.pid_ki.editingFinished.connect(self.update_pid)
self.module.pid_kd.editingFinished.connect(self.update_pid)
self.pid = PID(self.module.pid_kp.value(),
self.module.pid_ki.value(),
self.module.pid_kd.value(),
self.module.setpoint.value(),
output_limits=(40, 70))
def update_valve(self, valve_percent:float):
self.valve.write(b'u')
data = str(valve_percent)
self.valve.write(data.encode() + b'\n')
try:
response = float(self.valve.readline().decode('utf-8').rstrip().split()[0])
except ValueError as error:
logging.error(error)
return -1
return response
def update_pid(self):
self.pid.setpoint = self.pressure_to_qlevel(self.module.setpoint.value())
self.pid.tunings = (self.module.pid_kp.value(), self.module.pid_ki.value(), self.module.pid_kd.value())
def qlevel_to_pressure(self, level:int) -> float:
"""Convert the quantization level of the 14 bit ADC of the arduino R4 minima to pressure in ubar."""
return float(self.p_scale*(level-self.p_offset))
def pressure_to_qlevel(self, pressure:float) -> int:
"""Convert pressure in ubar to the quantization level of the 14 bit ADC of the arduino R4 minima."""
return int(pressure/self.p_scale + self.p_offset)
def the_loop(self):
_ql = self.update_valve(self.pid(self.qlevel))
if _ql != -1:
self.qlevel = _ql
def gui_loop(self):
"""This is the main loop of the GUI."""
self.module.update_plot_data(self.qlevel_to_pressure(self.qlevel))
def true_pressure_loop(self):
"""A simple loop to display the absolute pressure read from the MKS gauge controller."""
self.module.true_pressure.setText(f"{self.mks_controller.read_pressure():.2f}")
def closeEvent(self, event):
try:
self.timer.timeout.disconnect(self.the_loop)
self.valve.write(b'r')
logging.info("Exited normally.")
except Exception as e:
logging.error(f"Abnormal exit with error: {e}")
event.accept()
class MKS627H():
def __init__(self):
PGAUGE_SERIAL_NUMBER : str = "FT6VYF8DA"
PGAUGE_PORT : str = find_serial_device(PGAUGE_SERIAL_NUMBER)
self.pgauge = serial.Serial()
self.pgauge.port = PGAUGE_PORT
self.pgauge.timeout = 1
self.pgauge.baudrate = 9600
self.pgauge.open()
def read_pressure(self) -> float:
"""Return the pressure read from the PDR 2000 MKS controller in ubar.
I added the `in_waiting` while loop because I think I was accumulating
data in the serial buffer. This helps to prevent this issue.
TODO: in the future we might implement a maximum number of tries to read the pressure
and if it fails we should throw an error to stop the experiment and display something
well.
"""
try:
self.pgauge.write(b'p\n')
# Check if there's data in the buffer
while self.pgauge.in_waiting > 0:
# Read all data available in the buffer
response = self.pgauge.readline().decode('utf-8').rstrip()
return float(response.split()[0])*1e3
except Exception as e:
logging.error(e)
return -1