-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlab_equipment.py
555 lines (458 loc) · 17.7 KB
/
lab_equipment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
##############################################################################################
## This module contains classes for working with Lab equipment ##
## In order to use this module, you need to install pyvisa library ##
## Author: Zvi Karp ##
## Date: 15/11/16 ##
## Edited by: Bar Kristal, 18/12/16 ##
##############################################################################################
#Errors:
class Error_connecting_device(Exception):
    """Error logged (and raisable) when connecting to a lab device fails.

    If the instance exposes a ``type`` attribute (for example one defined on
    a subclass), it is appended to the logged message; otherwise the generic
    message is used.
    """

    def __init__(self):
        # The original formatted ``self.type`` into a string that had no
        # placeholder, and ``type`` is never set in this class, so the
        # constructor always raised instead of logging.
        device_type = getattr(self, "type", None)
        if device_type is None:
            message = "Error: failed connecting to device"
        else:
            message = "Error: failed connecting to device %s" % (device_type,)
        Exception.__init__(self, message)
        write_to_log(message)
#Lab equipment classes:
class Agillent34401A():
    """VISA wrapper for an Agilent 34401A multimeter."""

    # Maps the ``meas`` argument of meas() to (SCPI query, log label).
    _MEASUREMENTS = {
        'DCV': ('MEAS:VOLT:DC?', "DC Measurement"),
        'ACV': ('MEAS:VOLT:AC?', "AC Measurement"),
        'frequency': ('MEAS:FREQ?', "Frequency Measurement"),
        # Was 'MEAS:REAS?', which is not a valid resistance query.
        'resistance': ('MEAS:RES?', "Resistance Measurement"),
        'DCI': ('MEAS:CURR:DC?', "DC Current Measurement"),
        'ACI': ('MEAS:CURR:AC?', "AC Current Measurement"),
    }

    def __init__(self, address):
        """Open the VISA resource at ``address`` and cache its ``*IDN?`` reply."""
        resource_manager = visa.ResourceManager()
        self.dev = resource_manager.open_resource(address)
        # Drop the trailing terminator character from the identity string.
        self.name = self.dev.query('*IDN?')[:-1]
        self.dev.timeout = 5000

    def close(self):
        """Close the VISA connection."""
        self.dev.close()

    def send_command(self, command):
        """Write a raw VISA command to the device.

        Together with read_response() this allows sending commands that are
        not covered by this module.
        """
        self.dev.write(command)

    def read_response(self, command=None):
        """Read one reply from the device, without its trailing terminator.

        ``command`` is not used; it is kept (now optional) so existing
        callers keep working. The original returned ``[-1]``, i.e. only the
        terminator character, instead of the reply.
        """
        return self.dev.read()[:-1]

    def meas(self, meas):
        """Take one measurement and return the reading as a string.

        ``meas`` is one of 'DCV', 'ACV', 'frequency', 'resistance', 'DCI' or
        'ACI'. Any other value is logged as an error and "" is returned
        (the original failed on an unbound local in that case).
        """
        try:
            query, label = self._MEASUREMENTS[meas]
        except KeyError:
            write_to_log("Error: unknown measurement type: %s" % (meas,))
            return ""
        self.dev.write(query)
        result = self.dev.read()[:-1]
        write_to_log("%s: %s" % (label, result))
        return result
class QL355TPPwrSply():
    """VISA wrapper for a QL355TP power supply."""

    def __init__(self, address):
        """Open the VISA resource at ``address``, clear it and cache ``*IDN?``."""
        resource_manager = visa.ResourceManager()
        self.dev = resource_manager.open_resource(address)
        self.dev.clear()
        # Drop the trailing terminator character from the identity string.
        self.name = self.dev.query('*IDN?')[:-1]
        self.dev.timeout = 5000

    def close(self):
        """Close the VISA connection and log it."""
        self.dev.close()
        write_to_log('The connection with: %s is now closed' % (self.name))

    def set_timeout(self, time):
        """Set the VISA timeout of the device to ``int(time)``."""
        self.dev.timeout = int(time)

    def send_command(self, command):
        """Write a raw VISA command to the device.

        Together with read_response() this allows sending commands that are
        not covered by this module.
        """
        self.dev.write(command)

    def read_response(self, command=None):
        """Read one reply from the device, without its trailing terminator.

        ``command`` is not used; it is kept (now optional) so existing
        callers keep working. The original returned ``[-1]``, i.e. only the
        terminator character, instead of the reply.
        """
        return self.dev.read()[:-1]

    def set_volt(self, channel, volt):
        """Set the output voltage of ``channel`` and return the read-back value."""
        channel = str(channel)
        self.dev.write("V%s %s" % (channel, volt))
        time.sleep(0.5)
        return self.dev.query("V%s?" % (channel))[:-1]

    def set_current_lim(self, channel, current):
        """Set the current limit of ``channel`` and return the read-back value."""
        channel = str(channel)
        self.dev.write("I%s %s" % (channel, current))
        time.sleep(0.1)
        return self.dev.query("I%s?" % (channel))[:-1]

    def channel_on(self, channel):
        """Switch the output of ``channel`` on."""
        channel = str(channel)
        self.dev.write('OP%s 1' % (channel))
        write_to_log('Channel %s is on' % (channel))

    def channel_off(self, channel):
        """Switch the output of ``channel`` off."""
        channel = str(channel)
        self.dev.write('OP%s 0' % (channel))
        write_to_log('Channel %s is off' % (channel))

    def increment_voltage(self, channel, step_size):
        """Raise the output voltage of ``channel`` by ``step_size``.

        Returns the voltage setting read back from the device. The original
        returned the result of ``write('V<n>?')`` (not the reply) and left
        the reply unread in the device's output buffer.
        """
        channel = str(channel)
        self.dev.write('DELTAV%s %s' % (channel, step_size))
        self.dev.write('INCV%s' % (channel))
        return self.dev.query('V%s?' % (channel))[:-1]

    def all_off(self):
        """Switch all outputs off."""
        self.dev.write('OPALL 0')

    def all_on(self):
        """Switch all outputs on."""
        self.dev.write('OPALL 1')

    def read_current(self, channel):
        """Return the current presently flowing through ``channel`` as a string."""
        self.dev.write('I%sO?' % (str(channel)))
        return self.dev.read()[:-1]

    def sense(self, channel, mode):
        """Select the sense mode of ``channel``: mode=0 is local, mode=1 is remote."""
        self.dev.write('SENSE%s %s' % (channel, mode))
class HP53131aFreqCounter():
    """VISA wrapper for an HP 53131A frequency counter."""

    # Measurements that may be taken on any channel; %s receives the channel.
    _CHANNEL_QUERIES = {
        "frequency": 'MEAS%s:FREQ?',
        "volt_max_peak": 'MEAS%s:MAX?',
        "volt_min_peak": 'MEAS%s:MIN?',
    }
    # Measurements that meas() only allows on channel 1.
    _FIXED_QUERIES = {
        "rise_time": 'MEAS:RTIME?',
        "fall_time": 'MEAS:FTIME?',
        "period": 'MEAS:PER?',
        "pos_width": 'MEAS:PWID?',
        "neg_width": 'MEAS:NWID?',
        "duty_cycle": 'MEAS:DCYC?',
    }
    # Measurements logged as channel 1 relative to channel 2: (query, label).
    _PAIR_QUERIES = {
        "ratio": ('MEAS:FREQ:RAT?', "Ratio"),
        "phase": ('MEAS:PHAS?', "Phase"),
    }

    def __init__(self, address):
        """Open the VISA resource at ``address`` and cache its ``*IDN?`` reply."""
        resource_manager = visa.ResourceManager()
        self.dev = resource_manager.open_resource(address)
        # Drop the trailing terminator character from the identity string.
        self.name = self.dev.query('*IDN?')[:-1]
        self.dev.timeout = 5000

    def close(self):
        """Close the VISA connection and log it."""
        self.dev.close()
        write_to_log('The connection with: %s is now closed' % (self.name))

    def send_command(self, command):
        """Write a raw VISA command to the device.

        Together with read_response() this allows sending commands that are
        not covered by this module.
        """
        self.dev.write(command)

    def read_response(self, command=None):
        """Read one reply from the device, without its trailing terminator.

        ``command`` is not used; it is kept (now optional) so existing
        callers keep working. The original returned ``[-1]``, i.e. only the
        terminator character, instead of the reply.
        """
        return self.dev.read()[:-1]

    def meas(self, meas, channel="1"):
        """Take one measurement and return the reading as a string.

        ``meas`` is one of: frequency, volt_max_peak, volt_min_peak (any
        channel); rise_time, fall_time, period, pos_width, neg_width,
        duty_cycle, ratio, phase (channel 1 only). ``channel`` defaults to
        "1". On an unknown ``meas`` or a disallowed channel an error is
        logged and "" is returned (the original failed on an unbound local
        for an unknown ``meas`` on channel 1).
        """
        channel = str(channel)

        if meas in self._CHANNEL_QUERIES:
            query = self._CHANNEL_QUERIES[meas] % (channel)
            label = None
        elif meas in self._FIXED_QUERIES:
            query = self._FIXED_QUERIES[meas]
            label = None
        elif meas in self._PAIR_QUERIES:
            query, label = self._PAIR_QUERIES[meas]
        else:
            write_to_log("Error: unknown measurement type: %s" % (meas,))
            return ""

        if meas not in self._CHANNEL_QUERIES and channel != "1":
            write_to_log("Error: %s measurement can be taken only from channel 1" % (meas))
            return ""

        self.dev.write(query)
        result = self.dev.read()[:-1]
        if label is None:
            write_to_log("%s measurement on channel %s: %s" % (meas, channel, result))
        else:
            write_to_log("%s %s to %s: %s" % (label, "1", "2", result))
        return result
class HP33120aWaveGen():
    '''VISA wrapper for an HP 33120A waveform generator.

    Compatible also for Agilent33250A device.
    '''

    def __init__(self, address):
        '''Open the VISA resource at ``address`` and cache its ``*IDN?`` reply.'''
        resource_manager = visa.ResourceManager()
        self.dev = resource_manager.open_resource(address)
        # Drop the trailing terminator character from the identity string.
        self.name = self.dev.query('*IDN?')[:-1]
        self.dev.timeout = 5000

    def close(self):
        '''Close the VISA connection and log it.'''
        self.dev.close()
        write_to_log('The connection with: %s is now closed' % (self.name))

    def send_command(self, command):
        '''Write a raw VISA command to the device.

        Together with read_response() this allows sending commands that are
        not covered by this module.
        '''
        self.dev.write(command)

    def read_response(self, command=None):
        '''Read one reply from the device, without its trailing terminator.

        ``command`` is not used; it is kept (now optional) so existing
        callers keep working. The original returned ``[-1]``, i.e. only the
        terminator character, instead of the reply.
        '''
        return self.dev.read()[:-1]

    def generate(self, wave, frequency, amplitude, offset):
        '''Generate a waveform in a single command.

        Example: self.generate("SIN", 3000, 1.5, -1) generates a sine wave,
        3KHz, 1.5Vpp, -1V offset.
        '''
        self.dev.write("APPL:%s %s, %s, %s" % (wave, frequency, amplitude, offset))

    def set_shape(self, shape):
        '''Set the shape: SINusoid|SQUare|TRIangle|RAMP|NOISe|DC|USER.'''
        self.dev.write("FUNC:SHAP %s" % (shape))

    def set_frequency(self, frequency):
        '''Set the output frequency.'''
        # The header and its parameter are separated by a space; the original
        # "FREQ: %s" had a stray colon after the header.
        self.dev.write("FREQ %s" % (frequency))

    def set_amplitude(self, amplitude):
        '''Set the output amplitude (in the currently selected units).'''
        # Same stray-colon fix as set_frequency ("VOLT: %s" -> "VOLT %s").
        self.dev.write("VOLT %s" % (amplitude))

    def set_amplitude_units(self, units):
        '''Set the units of the amplitude: VPP|VRMS|DBM|DEFault.

        Does not affect the offset units, which stay V.
        '''
        # The original formatted the undefined name ``amplitude`` here.
        self.dev.write("VOLT:UNIT %s" % (units))

    def set_offset(self, offset):
        '''Set the output offset.'''
        self.dev.write("VOLT:OFFS %s" % (offset))

    def get_shape(self):
        '''Query, log and return the signal shape.'''
        result = self.dev.query("FUNC:SHAP?")[:-1]
        write_to_log("Signal's shape is: %s" % (result))
        return result

    def get_frequency(self):
        '''Query, log and return the frequency.'''
        result = self.dev.query("FREQ?")[:-1]
        write_to_log("Frequency is: %s" % (result))
        return result

    def get_amplitude(self):
        '''Query, log and return the amplitude.'''
        result = self.dev.query("VOLT?")[:-1]
        write_to_log("Amplitude is: %s" % (result))
        return result

    def get_amplitude_unit(self):
        '''Query, log and return the amplitude units.'''
        result = self.dev.query("VOLT:UNIT?")[:-1]
        write_to_log("Amplitude units are: %s" % (result))
        return result

    def get_offset(self):
        '''Query, log and return the offset.'''
        result = self.dev.query("VOLT:OFFS?")[:-1]
        write_to_log("Offset is: %s" % (result))
        return result

    def output_on(self):
        '''Turn on the output channel.

        Not necessary after generate(), which turns the output on
        automatically.
        '''
        self.dev.write("OUTPUT ON")

    def output_off(self):
        '''Turn off the output channel.'''
        self.dev.write("OUTPUT OFF")
class KikusuiPLZ70UA():
    """VISA wrapper for a Kikusui PLZ-70UA load."""

    def __init__(self, address):
        """Open the VISA resource at ``address`` and cache its ``*IDN?`` reply."""
        resource_manager = visa.ResourceManager()
        self.dev = resource_manager.open_resource(address)
        # Drop the trailing terminator character from the identity string.
        self.name = self.dev.query('*IDN?')[:-1]
        self.dev.timeout = 5000

    def close(self):
        """Close the connection with the device and log it."""
        self.dev.close()
        write_to_log('The connection with: %s is now closed' % (self.name))

    def send_command(self, command):
        """Write a raw VISA command to the device.

        Together with read_response() this allows sending commands that are
        not covered by this module.
        """
        self.dev.write(command)

    def read_response(self, command=None):
        """Read one reply from the device, without its trailing terminator.

        ``command`` is not used; it is kept (now optional) so existing
        callers keep working. The original returned ``[-1]``, i.e. only the
        terminator character, instead of the reply.
        """
        return self.dev.read()[:-1]

    def reset(self):
        """Reset the device to factory default settings."""
        self.dev.write("*RST")
        # self.name already has its terminator removed in __init__; slicing
        # it again (as the original did) cut off a real character.
        write_to_log('%s configured to factory default settings' % (self.name))

    def load_on(self):
        """Switch the load input on and log it unless the device reports '0'."""
        self.dev.write("INP ON")
        # Strip the reply terminator before comparing; the unstripped reply
        # could never equal '0', so the original check was always true.
        if self.dev.query("INP?").strip() != '0':
            write_to_log("Load is on")

    def load_off(self):
        """Switch the load input off and log it unless the device reports '1'."""
        self.dev.write("INP OFF")
        if self.dev.query("INP?").strip() != '1':
            write_to_log("Load is off")

    def set_constant_mode(self, channel, mode):
        """Select ``channel`` and set its operating mode.

        mode=CC/CR/CV/CCCV/CRCV, where CC='constant current',
        CR='constant resistance', CV='constant voltage'.
        """
        self.dev.write("INST CH%s" % channel)
        self.dev.write("FUNC %s" % mode)

    def set_conductance(self, conductance):
        """Set the conductance (in units of [S]).

        No channel is selected first; per the original note this sets both
        channels together.
        """
        self.dev.write("COND %s" % conductance)

    def set_resistance(self, resistance):
        """Set the resistance (in [ohm]) by converting it to a conductance.

        No channel is selected first; per the original note this sets both
        channels together. A resistance of 0 has no finite conductance, so
        it is rejected with a logged error instead of being silently ignored.
        """
        resistance = float(resistance)
        if resistance == 0:
            write_to_log("Error: resistance must be non-zero")
            return
        self.dev.write("COND %s" % str(1 / resistance))

    def set_current(self, current):
        """Set the current.

        No channel is selected first; per the original note this sets both
        channels together.
        """
        self.dev.write("CURR %s" % current)

    def set_voltage(self, channel, voltage):
        """Select ``channel`` and set its voltage."""
        self.dev.write("INST CH%s" % channel)
        self.dev.write("VOLT %s" % voltage)

    def read_conductance(self, channel):
        """Measure, log and return the conductance on ``channel``."""
        self.dev.write("INST CH%s" % channel)
        res = self.dev.query("MEAS:COND?")[:-1]
        write_to_log("The conductance on channel %s is: %sS" % (channel, res))
        return res

    def read_current(self, channel):
        """Measure, log and return the current on ``channel``."""
        self.dev.write("INST CH%s" % channel)
        res = self.dev.query("MEAS:CURR?")[:-1]
        write_to_log("The current on channel %s is: %sA" % (channel, res))
        return res

    def read_voltage(self, channel):
        """Measure, log and return the voltage on ``channel``."""
        self.dev.write("INST CH%s" % channel)
        res = self.dev.query("MEAS:VOLT?")[:-1]
        write_to_log("The voltage on channel %s is: %sV" % (channel, res))
        return res
# The connection with this oven is a serial connection - rs232 - which is done using a usb-to-RS232 connector.
# This connector appears as a virtual port with a specified 'COM'- this is the COM we need to enter when initializing the connection.
class VotschVT4002():
    """Serial (RS-232) wrapper for a Votsch VT4002 temperature chamber."""

    def __init__(self, com):
        """Open a 9600-8-N-1 serial connection (1 s read timeout) on port ``com``."""
        self.name = "VotschVT4002"
        self.ser = serial.Serial()
        self.ser.close()
        self.ser.port = com
        self.ser.baudrate = 9600
        self.ser.bytesize = 8
        self.ser.parity = 'N'
        self.ser.stopbits = 1
        self.ser.timeout = 1
        self.ser.open()

    def close(self):
        """Close the serial connection with the device."""
        self.ser.close()

    def change_port(self, com):
        """Change the port of the serial connection and log it."""
        self.ser.port = com
        write_to_log("{} port changed to {}".format(self.name, com))

    def set_temp(self, temp):
        """Set the temperature and START the chamber.

        ``temp`` may be an int, a float or a numeric str, between -50 and
        100 (C). Returns True when the set-point was sent, False when it was
        rejected as out of range (an error is logged in that case).

        The set-point is sent as a 6-character field: a sign character ('0'
        for positive, '-' for negative) followed by the magnitude as
        ``ddd.d``. The original built this field by string concatenation,
        which produced a 7-character field for 100 and for values rounding
        up to 10.0, and took the sign from ``int(temp)`` so that values in
        (-1, 0) were sent as positive.
        """
        value = float(temp)
        if not -50 <= value <= 100:
            write_to_log("Error: temperature should be between -50C to 100C")
            return False
        rounded = round(value, 1)
        # Decide the sign after rounding so that e.g. -0.04 is sent as 0000.0.
        sign = '-' if rounded < 0 else '0'
        field = sign + "%05.1f" % abs(rounded)
        self.ser.write("$00E %s 0000.0 0000.0 0000.0 0000.0 010100000000111013" % field)
        self.ser.write("$00I\r\n")
        return True

    def read_temp(self):
        """Query the chamber and return the temperature as a string.

        The value is taken from the second space-separated field of the
        reply; a leading '1' in that field is treated as the negative marker
        and the first two characters of the field are dropped.
        NOTE(review): dropping two characters looks like it would lose the
        hundreds digit of a reading of 100 or more - confirm against the
        device's reply format.
        """
        self.ser.write("$00I\r\n")
        reply = self.ser.readline()
        field = reply.split(" ")[1]
        if field[0] == '1':  # negative
            return '-' + field[2:]
        return field[2:]

    def stop_champer(self):
        """Stop the function of the chamber."""
        self.ser.write("$00E 0000.0 0000.0 0000.0 0000.0 0000.0 000100000000111013")

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    stop_chamber = stop_champer

    def wait_for_temp(self, temp):
        """Set a temperature and block until the chamber is within 1C of it.

        Returns immediately if set_temp() rejects the set-point; the
        original would otherwise keep polling for a temperature that was
        never requested.
        """
        if not self.set_temp(temp):
            return
        time.sleep(0.5)
        current_temp = float(self.read_temp())
        write_to_log("wait for the temperature to reach %sC" % temp)
        target = float(temp)
        while current_temp <= target - 1 or current_temp >= target + 1:
            time.sleep(1)
            current_temp = float(self.read_temp())
        write_to_log("Temperature has reached the desirable value")

    def read_error(self):
        """Send the ``$00F`` query and return the raw reply line."""
        self.ser.write("$00F\r\n")
        time.sleep(1)
        return self.ser.readline()
class Termotron3800():
    """TCP socket wrapper for a Thermotron 3800 temperature chamber."""

    def __init__(self, address):
        """Connect to ``address`` (an ``(ip, port)`` pair) and read identity/version."""
        self.tcp_ip = address[0]
        self.port = address[1]
        self.buffer_size = 1024
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.tcp_ip, self.port))
        self.sock.send("IDEN?\r\n")  # ask for device identity
        self.name = self.read_line()
        self.sock.send("VRSN?\r\n")  # ask for software version
        self.version = self.read_line()

    def close(self):
        """Close the connection with the device."""
        self.sock.close()

    def read_line(self):
        """Read from the socket up to the first carriage return.

        Returns the text received before the first "\\r" (anything after it
        in the same chunk is discarded), or whatever was received so far if
        the peer closes the connection.
        """
        line = ""
        while True:
            chunk = self.sock.recv(64)
            if chunk == "":
                break
            if "\r" in chunk:
                line += chunk.split("\r")[0]
                break
            line += chunk
        return line

    def clear_buffer(self):
        """Discard any data already waiting on the socket.

        Intended to be called before reading the device's answer. The
        original looped on a blocking recv(), so it only returned once the
        peer closed the connection; here a short temporary timeout ends the
        drain as soon as no more data arrives, then the previous timeout is
        restored.
        """
        previous_timeout = self.sock.gettimeout()
        self.sock.settimeout(0.1)
        try:
            while self.sock.recv(1024):
                pass
        except socket.timeout:
            pass
        finally:
            self.sock.settimeout(previous_timeout)

    def stop_chamber(self):
        """Stop the chamber, retrying the STOP command up to three times.

        After each STOP the status code is queried with SCOD?; "5" means
        the chamber received the stop command. If it is still not
        acknowledged after three attempts an error is logged (the original
        tested ``k == 10``, which the 3-iteration loop could never reach).
        """
        err = "0"
        attempts = 0
        while err != "5" and attempts < 3:
            self.sock.send("STOP\r\n")   # send the STOP command
            self.sock.send("SCOD?\r\n")  # check if the command had been received
            err = self.read_line()
            attempts += 1
        if err != "5":
            write_to_log("Error while sending STOP command to %s" % self.name)

    def run_chamber(self):
        """Send the RUNM command to start the chamber."""
        self.sock.send("RUNM\r\n")

    def set_temp(self, temp):
        """Send ``temp`` as set-point 1 and start the chamber."""
        # The original test ``type(temp != str)`` was always true, so the
        # value was always converted; do that explicitly.
        self.sock.send("SETP1,%s\r\n" % str(temp))
        time.sleep(0.1)
        self.run_chamber()

    def read_temp(self):
        """Query ``PVAR1?`` and return the reply as a float.

        A reply of exactly '0' is re-queried up to three times.
        NOTE(review): presumably '0' is a spurious/stale reply rather than a
        real reading - confirm.
        """
        self.sock.send("PVAR1?\r\n")
        time.sleep(0.1)
        current_temp = self.read_line()
        count = 0
        while current_temp == '0' and count < 3:
            self.sock.send("PVAR1?\r\n")
            current_temp = self.read_line()
            count += 1
        return float(current_temp)

    def wait_for_temp(self, temp, min_tolerance=0.5):
        """Set a temperature and block until the chamber has reached it.

        The chamber is considered to have reached the target when the
        reading is within 2% of ``temp``, but never tighter than
        ``min_tolerance`` degrees. The original compared absolute values
        against 0.98/1.02 of the target, which could never be satisfied for
        a target of 0, ignored the sign of the reading, and failed on a str
        ``temp``.
        """
        target = float(temp)
        tolerance = max(abs(target) * 0.02, min_tolerance)
        self.set_temp(temp)
        time.sleep(0.1)
        while abs(self.read_temp() - target) >= tolerance:
            time.sleep(1)
        write_to_log("Temperature has reached the desirable value")