"""This file implements mcprotocol 3E type communication.
"""
import re
import time
import socket
import binascii
from . import mcprotocolerror
from . import mcprotocolconst as const
[docs]def isascii(text):
"""check text is all ascii character.
Python 3.6 does not support str.isascii()
"""
return all(ord(c) < 128 for c in text)
[docs]def twos_comp(val, mode="short"):
"""compute the 2's complement of int value val
"""
if mode =="byte":
bit = 8
elif mode =="short":
bit = 16
elif mode== "long":
bit = 32
else:
raise ValueError("cannnot calculate 2's complement")
if (val & (1 << (bit - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255
val = val - (1 << bit) # compute negative value
return val
[docs]def get_device_number(device):
"""Extract device number.
Ex: "D1000" → "1000"
"X0x1A" → "0x1A
"""
device_num = re.search(r"\d.*", device)
if device_num is None:
raise ValueError("Invalid device number, {}".format(device))
else:
device_num_str = device_num.group(0)
return device_num_str
[docs]class CommTypeError(Exception):
"""Communication type error. Communication type must be "binary" or "ascii"
"""
[docs] def __init__(self):
pass
def __str__(self):
return "communication type must be \"binary\" or \"ascii\""
[docs]class PLCTypeError(Exception):
"""PLC type error. PLC type must be"Q", "L", "QnA", "iQ-L", "iQ-R"
"""
[docs] def __init__(self):
pass
def __str__(self):
return "plctype must be \"Q\", \"L\", \"QnA\" \"iQ-L\" or \"iQ-R\""
[docs]class Type3E:
"""mcprotocol 3E communication class.
Attributes:
plctype(str): connect PLC type. "Q", "L", "QnA", "iQ-L", "iQ-R"
commtype(str): communication type. "binary" or "ascii". (Default: "binary")
subheader(int): Subheader for mc protocol
network(int): network No. of an access target. (0<= network <= 255)
pc(int): network module station No. of an access target. (0<= pc <= 255)
dest_moduleio(int): When accessing a multidrop connection station via network,
specify the start input/output number of a multidrop connection source module.
the CPU module of the multiple CPU system and redundant system.
dest_modulesta(int): accessing a multidrop connection station via network,
specify the station No. of aaccess target module
timer(int): time to raise Timeout error(/250msec). default=4(1sec)
If PLC elapsed this time, PLC returns Timeout answer.
Note: python socket timeout is always set timer+1sec. To recieve Timeout answer.
"""
plctype = const.Q_SERIES
commtype = const.COMMTYPE_BINARY
subheader = 0x5000
network = 0
pc = 0xFF
dest_moduleio = 0X3FF
dest_modulesta = 0X0
timer = 4 # MC protocol timeout. 250msec * 4 = 1 sec
soc_timeout = 2 # 2 sec
_is_connected = False
_SOCKBUFSIZE = 4096
_wordsize = 2 #how many byte is required to describe word value
#binary: 2, ascii:4.
_debug = False
[docs] def __init__(self, plctype ="Q"):
"""Constructor
"""
self._set_plctype(plctype)
def _set_debug(self, debug=False):
"""Turn on debug mode
"""
self._debug = debug
[docs] def connect(self, ip, port):
"""Connect to PLC
Args:
ip (str): ip address(IPV4) to connect PLC
port (int): port number of connect PLC
timeout (float): timeout second in communication
"""
self._ip = ip
self._port = port
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(self.soc_timeout)
self._sock.connect((ip, port))
self._is_connected = True
[docs] def close(self):
"""Close connection
"""
self._sock.close()
self._is_connected = False
def _send(self, send_data):
"""send mc protorocl data
Args:
send_data(bytes): mc protocol data
"""
if self._is_connected:
if self._debug:
print(binascii.hexlify(send_data))
self._sock.send(send_data)
else:
raise Exception("socket is not connected. Please use connect method")
def _recv(self):
"""recieve mc protocol data
Returns:
recv_data
"""
recv_data = self._sock.recv(self._SOCKBUFSIZE)
return recv_data
def _set_plctype(self, plctype):
"""Check PLC type. If plctype is vaild, set self.commtype.
Args:
plctype(str): PLC type. "Q", "L", "QnA", "iQ-L", "iQ-R",
"""
if plctype == "Q":
self.plctype = const.Q_SERIES
elif plctype == "L":
self.plctype = const.L_SERIES
elif plctype == "QnA":
self.plctype = const.QnA_SERIES
elif plctype == "iQ-L":
self.plctype = const.iQL_SERIES
elif plctype == "iQ-R":
self.plctype = const.iQR_SERIES
else:
raise PLCTypeError()
def _set_commtype(self, commtype):
"""Check communication type. If commtype is vaild, set self.commtype.
Args:
commtype(str): communication type. "binary" or "ascii". (Default: "binary")
"""
if commtype == "binary":
self.commtype = const.COMMTYPE_BINARY
self._wordsize = 2
elif commtype == "ascii":
self.commtype = const.COMMTYPE_ASCII
self._wordsize = 4
else:
raise CommTypeError()
def _get_answerdata_index(self):
"""Get answer data index from return data byte.
"""
if self.commtype == const.COMMTYPE_BINARY:
return 11
else:
return 22
def _get_answerstatus_index(self):
"""Get command status index from return data byte.
"""
if self.commtype == const.COMMTYPE_BINARY:
return 9
else:
return 18
[docs] def setaccessopt(self, commtype=None, network=None,
pc=None, dest_moduleio=None,
dest_modulesta=None, timer_sec=None):
"""Set mc protocol access option.
Args:
commtype(str): communication type. "binary" or "ascii". (Default: "binary")
network(int): network No. of an access target. (0<= network <= 255)
pc(int): network module station No. of an access target. (0<= pc <= 255)
dest_moduleio(int): When accessing a multidrop connection station via network,
specify the start input/output number of a multidrop connection source module.
the CPU module of the multiple CPU system and redundant system.
dest_modulesta(int): accessing a multidrop connection station via network,
specify the station No. of aaccess target module
timer_sec(int): Time out to return Timeout Error from PLC.
MC protocol time is per 250msec, but for ease, setaccessopt requires per sec.
Socket time out is set timer_sec + 1 sec.
"""
if commtype:
self._set_commtype(commtype)
if network:
try:
network.to_bytes(1, "little")
self.network = network
except:
raise ValueError("network must be 0 <= network <= 255")
if pc:
try:
pc.to_bytes(1, "little")
self.pc = pc
except:
raise ValueError("pc must be 0 <= pc <= 255")
if dest_moduleio:
try:
dest_moduleio.to_bytes(2, "little")
self.dest_moduleio = dest_moduleio
except:
raise ValueError("dest_moduleio must be 0 <= dest_moduleio <= 65535")
if dest_modulesta:
try:
dest_modulesta.to_bytes(1, "little")
self.dest_modulesta = dest_modulesta
except:
raise ValueError("dest_modulesta must be 0 <= dest_modulesta <= 255")
if timer_sec:
try:
timer_250msec = 4 * timer_sec
timer_250msec.to_bytes(2, "little")
self.timer = timer_250msec
self.soc_timeout = timer_sec + 1
if self._is_connected:
self._sock.settimeout(self.soc_timeout)
except:
raise ValueError("timer_sec must be 0 <= timer_sec <= 16383, / sec")
return None
def _make_senddata(self, requestdata):
"""Makes send mc protorocl data.
Args:
requestdata(bytes): mc protocol request data.
data must be converted according to self.commtype
Returns:
mc_data(bytes): send mc protorocl data
"""
mc_data = bytes()
# subheader is big endian
if self.commtype == const.COMMTYPE_BINARY:
mc_data += self.subheader.to_bytes(2, "big")
else:
mc_data += format(self.subheader, "x").ljust(4, "0").upper().encode()
mc_data += self._encode_value(self.network, "byte")
mc_data += self._encode_value(self.pc, "byte")
mc_data += self._encode_value(self.dest_moduleio, "short")
mc_data += self._encode_value(self.dest_modulesta, "byte")
#add self.timer size
mc_data += self._encode_value(self._wordsize + len(requestdata), "short")
mc_data += self._encode_value(self.timer, "short")
mc_data += requestdata
return mc_data
def _make_commanddata(self, command, subcommand):
"""make mc protocol command and subcommand data
Args:
command(int): command code
subcommand(int): subcommand code
Returns:
command_data(bytes):command data
"""
command_data = bytes()
command_data += self._encode_value(command, "short")
command_data += self._encode_value(subcommand, "short")
return command_data
def _make_devicedata(self, device):
"""make mc protocol device data. (device code and device number)
Args:
device(str): device. (ex: "D1000", "Y1")
Returns:
device_data(bytes): device data
"""
device_data = bytes()
devicetype = re.search(r"\D+", device)
if devicetype is None:
raise ValueError("Invalid device ")
else:
devicetype = devicetype.group(0)
if self.commtype == const.COMMTYPE_BINARY:
devicecode, devicebase = const.DeviceConstants.get_binary_devicecode(self.plctype, devicetype)
devicenum = int(get_device_number(device), devicebase)
if self.plctype is const.iQR_SERIES:
device_data += devicenum.to_bytes(4, "little")
device_data += devicecode.to_bytes(2, "little")
else:
device_data += devicenum.to_bytes(3, "little")
device_data += devicecode.to_bytes(1, "little")
else:
devicecode, devicebase = const.DeviceConstants.get_ascii_devicecode(self.plctype, devicetype)
devicenum = str(int(get_device_number(device), devicebase))
if self.plctype is const.iQR_SERIES:
device_data += devicecode.encode()
device_data += devicenum.rjust(8, "0").upper().encode()
else:
device_data += devicecode.encode()
device_data += devicenum.rjust(6, "0").upper().encode()
return device_data
def _encode_value(self, value, mode="short", isSigned=False):
"""encode mc protocol value data to byte.
Args:
value(int): readsize, write value, and so on.
mode(str): value type.
isSigned(bool): convert as sigend value
Returns:
value_byte(bytes): value data
"""
try:
if self.commtype == const.COMMTYPE_BINARY:
if mode == "byte":
value_byte = value.to_bytes(1, "little", signed=isSigned)
elif mode == "short":
value_byte = value.to_bytes(2, "little", signed=isSigned)
elif mode == "long":
value_byte = value.to_bytes(4, "little", signed=isSigned)
else:
raise ValueError("Please input value type")
else:
#check value range by to_bytes
#convert to unsigned value
if mode == "byte":
value.to_bytes(1, "little", signed=isSigned)
value = value & 0xff
value_byte = format(value, "x").rjust(2, "0").upper().encode()
elif mode == "short":
value.to_bytes(2, "little", signed=isSigned)
value = value & 0xffff
value_byte = format(value, "x").rjust(4, "0").upper().encode()
elif mode == "long":
value.to_bytes(4, "little", signed=isSigned)
value = value & 0xffffffff
value_byte = format(value, "x").rjust(8, "0").upper().encode()
else:
raise ValueError("Please input value type")
except:
raise ValueError("Exceeeded Device value range")
return value_byte
def _decode_value(self, byte, mode="short", isSigned=False):
"""decode byte to value
Args:
byte(bytes): readsize, write value, and so on.
mode(str): value type.
isSigned(bool): convert as sigend value
Returns:
value_data(int): value data
"""
try:
if self.commtype == const.COMMTYPE_BINARY:
value =int.from_bytes(byte, "little", signed = isSigned)
else:
value = int(byte.decode(), 16)
if isSigned:
value = twos_comp(value, mode)
except:
raise ValueError("Could not decode byte to value")
return value
def _check_cmdanswer(self, recv_data):
"""check command answer. If answer status is not 0, raise error according to answer
"""
answerstatus_index = self._get_answerstatus_index()
answerstatus = self._decode_value(recv_data[answerstatus_index:answerstatus_index+self._wordsize], "short")
mcprotocolerror.check_mcprotocol_error(answerstatus)
return None
[docs] def batchread_wordunits(self, headdevice, readsize):
"""batch read in word units.
Args:
headdevice(str): Read head device. (ex: "D1000")
readsize(int): Number of read device points
Returns:
wordunits_values(list[int]): word units value list
"""
command = 0x0401
if self.plctype == const.iQR_SERIES:
subcommand = 0x0002
else:
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._make_devicedata(headdevice)
request_data += self._encode_value(readsize)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
word_values = []
data_index = self._get_answerdata_index()
for _ in range(readsize):
wordvalue = self._decode_value(recv_data[data_index:data_index+self._wordsize], mode="short", isSigned=True)
word_values.append(wordvalue)
data_index += self._wordsize
return word_values
[docs] def batchread_bitunits(self, headdevice, readsize):
"""batch read in bit units.
Args:
headdevice(str): Read head device. (ex: "X1")
size(int): Number of read device points
Returns:
bitunits_values(list[int]): bit units value(0 or 1) list
"""
command = 0x0401
if self.plctype == const.iQR_SERIES:
subcommand = 0x0003
else:
subcommand = 0x0001
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._make_devicedata(headdevice)
request_data += self._encode_value(readsize)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
bit_values = []
if self.commtype == const.COMMTYPE_BINARY:
for i in range(readsize):
data_index = i//2 + self._get_answerdata_index()
value = int.from_bytes(recv_data[data_index:data_index+1], "little")
#if i//2==0, bit value is 4th bit
if(i%2==0):
bitvalue = 1 if value & (1<<4) else 0
else:
bitvalue = 1 if value & (1<<0) else 0
bit_values.append(bitvalue)
else:
data_index = self._get_answerdata_index()
byte_range = 1
for i in range(readsize):
bitvalue = int(recv_data[data_index:data_index+byte_range].decode())
bit_values.append(bitvalue)
data_index += byte_range
return bit_values
[docs] def batchwrite_wordunits(self, headdevice, values):
"""batch write in word units.
Args:
headdevice(str): Write head device. (ex: "D1000")
values(list[int]): Write values.
"""
write_size = len(values)
command = 0x1401
if self.plctype == const.iQR_SERIES:
subcommand = 0x0002
else:
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._make_devicedata(headdevice)
request_data += self._encode_value(write_size)
for value in values:
request_data += self._encode_value(value, isSigned=True)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def batchwrite_bitunits(self, headdevice, values):
"""batch read in bit units.
Args:
headdevice(str): Write head device. (ex: "X10")
values(list[int]): Write values. each value must be 0 or 1. 0 is OFF, 1 is ON.
"""
write_size = len(values)
#check values
for value in values:
if not (value == 0 or value == 1):
raise ValueError("Each value must be 0 or 1. 0 is OFF, 1 is ON.")
command = 0x1401
if self.plctype == const.iQR_SERIES:
subcommand = 0x0003
else:
subcommand = 0x0001
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._make_devicedata(headdevice)
request_data += self._encode_value(write_size)
if self.commtype == const.COMMTYPE_BINARY:
#evary value is 0 or 1.
#Even index's value turns on or off 4th bit, odd index's value turns on or off 0th bit.
#First, create send data list. Length must be ceil of len(values).
bit_data = [0 for _ in range((len(values) + 1)//2)]
for index, value in enumerate(values):
#calc which index data should be turns on.
value_index = index//2
#calc which bit should be turns on.
bit_index = 4 if index%2 == 0 else 0
#turns on or off value of 4th or 0th bit, depends on value
bit_value = value << bit_index
#Take or of send data
bit_data[value_index] |= bit_value
request_data += bytes(bit_data)
else:
for value in values:
request_data += str(value).encode()
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def randomread(self, word_devices, dword_devices):
"""read word units and dword units randomly.
Moniter condition does not support.
Args:
word_devices(list[str]): Read device word units. (ex: ["D1000", "D1010"])
dword_devices(list[str]): Read device dword units. (ex: ["D1000", "D1012"])
Returns:
word_values(list[int]): word units value list
dword_values(list[int]): dword units value list
"""
command = 0x0403
if self.plctype == const.iQR_SERIES:
subcommand = 0x0002
else:
subcommand = 0x0000
word_size = len(word_devices)
dword_size = len(dword_devices)
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(word_size, mode="byte")
request_data += self._encode_value(dword_size, mode="byte")
for word_device in word_devices:
request_data += self._make_devicedata(word_device)
for dword_device in dword_devices:
request_data += self._make_devicedata(dword_device)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
data_index = self._get_answerdata_index()
word_values= []
dword_values= []
for word_device in word_devices:
wordvalue = self._decode_value(recv_data[data_index:data_index+self._wordsize], mode="short", isSigned=True)
word_values.append(wordvalue)
data_index += self._wordsize
for dword_device in dword_devices:
dwordvalue = self._decode_value(recv_data[data_index:data_index+self._wordsize*2], mode="long", isSigned=True)
dword_values.append(dwordvalue)
data_index += self._wordsize*2
return word_values, dword_values
[docs] def randomwrite(self, word_devices, word_values,
dword_devices, dword_values):
"""write word units and dword units randomly.
Args:
word_devices(list[str]): Write word devices. (ex: ["D1000", "D1020"])
word_values(list[int]): Values for each word devices. (ex: [100, 200])
dword_devices(list[str]): Write dword devices. (ex: ["D1000", "D1020"])
dword_values(list[int]): Values for each dword devices. (ex: [100, 200])
"""
if len(word_devices) != len(word_values):
raise ValueError("word_devices and word_values must be same length")
if len(dword_devices) != len(dword_values):
raise ValueError("dword_devices and dword_values must be same length")
word_size = len(word_devices)
dword_size = len(dword_devices)
command = 0x1402
if self.plctype == const.iQR_SERIES:
subcommand = 0x0002
else:
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(word_size, mode="byte")
request_data += self._encode_value(dword_size, mode="byte")
for word_device, word_value in zip(word_devices, word_values):
request_data += self._make_devicedata(word_device)
request_data += self._encode_value(word_value, mode="short", isSigned=True)
for dword_device, dword_value in zip(dword_devices, dword_values):
request_data += self._make_devicedata(dword_device)
request_data += self._encode_value(dword_value, mode="long", isSigned=True)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def randomwrite_bitunits(self, bit_devices, values):
"""write bit units randomly.
Args:
bit_devices(list[str]): Write bit devices. (ex: ["X10", "X20"])
values(list[int]): Write values. each value must be 0 or 1. 0 is OFF, 1 is ON.
"""
if len(bit_devices) != len(values):
raise ValueError("bit_devices and values must be same length")
write_size = len(values)
#check values
for value in values:
if not (value == 0 or value == 1):
raise ValueError("Each value must be 0 or 1. 0 is OFF, 1 is ON.")
command = 0x1402
if self.plctype == const.iQR_SERIES:
subcommand = 0x0003
else:
subcommand = 0x0001
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(write_size, mode="byte")
for bit_device, value in zip(bit_devices, values):
request_data += self._make_devicedata(bit_device)
#byte value for iQ-R requires 2 byte data
if self.plctype == const.iQR_SERIES:
request_data += self._encode_value(value, mode="short", isSigned=True)
else:
request_data += self._encode_value(value, mode="byte", isSigned=True)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def remote_run(self, clear_mode, force_exec=False):
"""Run PLC
Args:
clear_mode(int): Clear mode. 0: does not clear. 1: clear except latch device. 2: clear all.
force_exec(bool): Force to execute if PLC is operated remotely by other device.
"""
if not (clear_mode == 0 or clear_mode == 1 or clear_mode == 2):
raise ValueError("clear_device must be 0, 1 or 2. 0: does not clear. 1: clear except latch device. 2: clear all.")
if not (force_exec is True or force_exec is False):
raise ValueError("force_exec must be True or False")
command = 0x1001
subcommand = 0x0000
if force_exec:
mode = 0x0003
else:
mode = 0x0001
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(mode, mode="short")
request_data += self._encode_value(clear_mode, mode="byte")
request_data += self._encode_value(0, mode="byte")
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def remote_stop(self):
""" Stop remotely.
"""
command = 0x1002
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(0x0001, mode="short") #fixed value
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def remote_pause(self, force_exec=False):
"""pause PLC remotely.
Args:
force_exec(bool): Force to execute if PLC is operated remotely by other device.
"""
if not (force_exec is True or force_exec is False):
raise ValueError("force_exec must be True or False")
command = 0x1003
subcommand = 0x0000
if force_exec:
mode = 0x0003
else:
mode = 0x0001
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(mode, mode="short")
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def remote_latchclear(self):
"""Clear latch remotely.
PLC must be stop when use this command.
"""
command = 0x1005
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(0x0001, mode="short") #fixed value
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def remote_reset(self):
"""Reset remotely.
PLC must be stop when use this command.
"""
command = 0x1006
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(0x0001, mode="short") #fixed value
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
#set time out 1 seconds. Because remote reset may not return data since clone socket
try:
self._sock.settimeout(1)
recv_data = self._recv()
self._check_cmdanswer(recv_data)
except:
self._is_connected = False
# after wait 1 sec
# try reconnect
time.sleep(1)
self.connect(self._ip, self._port)
return None
[docs] def read_cputype(self):
"""Read CPU type
Returns:
CPU type(str): CPU type
CPU code(str): CPU code (4 length number)
"""
command = 0x0101
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
data_index = self._get_answerdata_index()
cpu_name_length = 16
if self.commtype == const.COMMTYPE_BINARY:
cpu_type = recv_data[data_index:data_index+cpu_name_length].decode()
cpu_type = cpu_type.replace("\x20", "")
cpu_code = int.from_bytes(recv_data[data_index+cpu_name_length:], "little")
cpu_code = format(cpu_code, "x").rjust(4, "0")
else:
cpu_type = recv_data[data_index:data_index+cpu_name_length].decode()
cpu_type = cpu_type.replace("\x20", "")
cpu_code = recv_data[data_index+cpu_name_length:].decode()
return cpu_type, cpu_code
[docs] def remote_unlock(self, password="", request_input=False):
"""Unlock PLC by inputting password.
Args:
password(str): Remote password
request_input(bool): If true, require inputting password.
If false, use password.
"""
if request_input:
password = input("Please enter password\n")
if isascii(password) is False:
raise ValueError("password must be only ascii code")
if self.plctype is const.iQR_SERIES:
if not (6 <= len(password) <= 32):
raise ValueError("password length must be from 6 to 32")
else:
if not (4 == len(password)):
raise ValueError("password length must be 4")
command = 0x1630
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(len(password), mode="short")
request_data += password.encode()
send_data = self._make_senddata(request_data)
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def remote_lock(self, password="", request_input=False):
"""Lock PLC by inputting password.
Args:
password(str): Remote password
request_input(bool): If true, require inputting password.
If false, use password.
"""
if request_input:
password = input("Please enter password\n")
if isascii(password) is False:
raise ValueError("password must be only ascii code")
if self.plctype is const.iQR_SERIES:
if not (6 <= len(password) <= 32):
raise ValueError("password length must be from 6 to 32")
else:
if not (4 == len(password)):
raise ValueError("password length must be 4")
command = 0x1631
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(len(password), mode="short")
request_data += password.encode()
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
return None
[docs] def echo_test(self, echo_data):
"""Do echo test.
Send data and answer data should be same.
Args:
echo_data(str): send data to PLC
Returns:
answer_len(int): answer data length from PLC
answer_data(str): answer data from PLC
"""
if echo_data.isalnum() is False:
raise ValueError("echo_data must be only alphabet or digit code")
if not ( 1 <= len(echo_data) <= 960):
raise ValueError("echo_data length must be from 1 to 960")
command = 0x0619
subcommand = 0x0000
request_data = bytes()
request_data += self._make_commanddata(command, subcommand)
request_data += self._encode_value(len(echo_data), mode="short")
request_data += echo_data.encode()
send_data = self._make_senddata(request_data)
#send mc data
self._send(send_data)
#reciev mc data
recv_data = self._recv()
self._check_cmdanswer(recv_data)
data_index = self._get_answerdata_index()
answer_len = self._decode_value(recv_data[data_index:data_index+self._wordsize], mode="short")
answer = recv_data[data_index+self._wordsize:].decode()
return answer_len, answer