#************************************************************************** # # Copyright (c) 2021 - 2021 by esd electronics gmbh # # This software is copyrighted by and is the sole property of # esd gmbh. All rights, title, ownership, or other interests # in the software remain the property of esd gmbh. This # software may only be used in accordance with the corresponding # license agreement. Any unauthorized use, duplication, transmission, # distribution, or disclosure of this software is expressly forbidden. # # This Copyright notice may not be removed or modified without prior # written consent of esd gmbh. # # esd gmbh, reserves the right to modify this software without notice. # # esd electronics gmbh Tel. +49-511-37298-0 # Vahrenwalder Strasse 207 Fax. +49-511-37298-68 # 30165 Hannover http://www.esd.eu # Germany sales@esd.eu # #************************************************************************** # # Module Name: ntcanfd.py # Version: 1.10 # Date: 08-June-2021 # Author: Hu/esd # Language: python # #************************************************************************** # Description # NTCAN via ctypes # Note: This script works only on litte endian machines (Intel PC) ! # For usage examples see at the end of this module in the test area. 
#
# Edit Date/Ver   Edit Description
# ==============  ===================================================
# HU 2021-04-30   created
# HU              CIF/CMSG/T functions compatibility

import platform
import ctypes
import array as arr

# settings
# When True, the send/write/read/take helpers of the message classes raise
# (via _throwNtcanError) on any non-zero result instead of only returning it.
EXCEPTIONS = True

# constants
_SYSTEM_PLATFORM = platform.system()
MAX_ID_11 = 2047
MAX_ID_29 = 0x1FFFFFFF
NO_TIMEOUT = 0
NO_FLAGS = 0

if _SYSTEM_PLATFORM in ("Windows", "Java"):
    NTCAN_MAX_TX_QUEUESIZE = 16383
    NTCAN_MAX_RX_QUEUESIZE = 16383
else:
    NTCAN_MAX_TX_QUEUESIZE = 2047
    NTCAN_MAX_RX_QUEUESIZE = 2047

# Flags in CMSG / CMSG_T / CMSG_X member 'len'
NTCAN_FD = 0x80
NTCAN_20B_BASE = 0x20000000
NTCAN_NO_BRS = 0x10
NTCAN_RTR = 0x10
NTCAN_NO_DATA = 0x20
NTCAN_INTERACTION = 0x20

# Value for CMSG, CMSG_T, CMSG_X member 'esi'
NTCAN_ESI_FD_ERROR_PASSIVE = 0xFF           # Tx node is Error Passive

# Handle specific flags in canOpen()
NTCAN_MODE_NO_RTR = 0x00000010              # Ignore RTR frames
NTCAN_MODE_NO_DATA = 0x00000020             # Ignore data frames
NTCAN_MODE_NO_INTERACTION = 0x00000100      # No interaction
NTCAN_MODE_MARK_INTERACTION = 0x00000200    # Mark interaction
NTCAN_MODE_LOCAL_ECHO = 0x00000400          # Local echo
NTCAN_MODE_TIMESTAMPED_TX = 0x00020000      # Timestamped TX
NTCAN_MODE_FD = 0x00040000                  # Enable CAN-FD support
#NTCAN_MODE_OBJECT = 0x10000000             # Open in Rx object mode

# Defines for canSetBaudrate() / canGetBaudrate()
NTCAN_USER_BAUDRATE = 0x80000000            # Configure BTR register
NTCAN_LISTEN_ONLY_MODE = 0x40000000         # Configure listen only mode
NTCAN_USER_BAUDRATE_NUM = 0x20000000        # Numerical baudrate
NTCAN_SELF_TEST_MODE = 0x10000000           # Configure self test mode
NTCAN_AUTOBAUD = 0x00FFFFFE                 # Auto baudrate detection
NTCAN_BAUD_FD = 0x00FFFFFD                  # CAN FD baudrate
NTCAN_NO_BAUDRATE = 0x7FFFFFFF              # No baudrate configured

# Flags of NTCAN_BAUDRATE_X
NTCAN_BAUDRATE_FLAG_FD = 0x0001             # Enable CAN FD Mode
NTCAN_BAUDRATE_FLAG_LOM = 0x0002            # Enable Listen Only mode
NTCAN_BAUDRATE_FLAG_STM = 0x0004            # Enable Self test mode
NTCAN_BAUDRATE_FLAG_TRS = 0x0008            # Enable Triple Sampling

# Mode of NTCAN_BAUDRATE_X
NTCAN_BAUDRATE_MODE_DISABLE = 0             # Remove from bus
NTCAN_BAUDRATE_MODE_INDEX = 1               # ESD (CiA) bit rate idx
NTCAN_BAUDRATE_MODE_BTR_CTRL = 2            # BTR values (Controller)
NTCAN_BAUDRATE_MODE_BTR_CANONICAL = 3       # BTR values (Canonical)
NTCAN_BAUDRATE_MODE_NUM = 4                 # Numerical bit rate
NTCAN_BAUDRATE_MODE_AUTOBAUD = 5            # Autobaud

# Predefined CiA-recommended baudrates for canSetBaudrate() / canGetBaudrate()
# of the ESD baudrate table
NTCAN_BAUD_1000 = 0x00
NTCAN_BAUD_800 = 0x0E
NTCAN_BAUD_500 = 0x02
NTCAN_BAUD_250 = 0x04
NTCAN_BAUD_125 = 0x06
NTCAN_BAUD_100 = 0x07
NTCAN_BAUD_50 = 0x09
NTCAN_BAUD_20 = 0x0B
NTCAN_BAUD_10 = 0x0D

# Predefined CAN bit rates for the data phase of CAN FD communication
NTCAN_BAUD_10000 = 0x15
NTCAN_BAUD_8000 = 0x14
NTCAN_BAUD_5000 = 0x13
NTCAN_BAUD_4000 = 0x12
NTCAN_BAUD_2000 = 0x11

# Data len codes
DLC_LEN_0 = 0
DLC_LEN_1 = 1
DLC_LEN_2 = 2
DLC_LEN_3 = 3
DLC_LEN_4 = 4
DLC_LEN_5 = 5
DLC_LEN_6 = 6
DLC_LEN_7 = 7
DLC_LEN_8 = 8
# Data len codes CAN FD only
DLC_LEN_12 = 9
DLC_LEN_16 = 10
DLC_LEN_20 = 11
DLC_LEN_24 = 12
DLC_LEN_32 = 13
DLC_LEN_48 = 14
DLC_LEN_64 = 15

# CAN FD payload size in bytes, indexed by DLC (0..15).
DLC_TO_DATA_LEN = arr.array(
    'B', [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64])

# Smallest DLC whose payload holds n bytes, indexed by n (0..64).  Derived
# from DLC_TO_DATA_LEN so that the two tables cannot drift apart.
DATA_LEN_TO_DLC = arr.array(
    'B',
    [next(dlc for dlc, size in enumerate(DLC_TO_DATA_LEN) if size >= n)
     for n in range(65)])

# Interface/Driver feature flags
NTCAN_FEATURE_FULL_CAN = (1 << 0)           # Full CAN controller
NTCAN_FEATURE_CAN_20B = (1 << 1)            # CAN 2.0B support
NTCAN_FEATURE_DEVICE_NET = (1 << 2)         # Device net adapter
NTCAN_FEATURE_CYCLIC_TX = (1 << 3)          # Cyclic Tx support
NTCAN_FEATURE_TIMESTAMPED_TX = (1 << 3)     # SAME AS CYCLIC_TX, timestamped TX support
NTCAN_FEATURE_RX_OBJECT_MODE = (1 << 4)     # Rx object mode support
NTCAN_FEATURE_TIMESTAMP = (1 << 5)          # Timestamp support
NTCAN_FEATURE_LISTEN_ONLY_MODE = (1 << 6)   # Listen-only-mode support
NTCAN_FEATURE_SMART_DISCONNECT = (1 << 7)   # Go-from-bus-after-last-close
NTCAN_FEATURE_LOCAL_ECHO = (1 << 8)         # Interaction w. local echo
NTCAN_FEATURE_SMART_ID_FILTER = (1 << 9)    # Adaptive ID filter
NTCAN_FEATURE_SCHEDULING = (1 << 10)        # Scheduling feature
NTCAN_FEATURE_DIAGNOSTIC = (1 << 11)        # CAN bus diagnostic support
NTCAN_FEATURE_ERROR_INJECTION = (1 << 12)   # esdACC error injection support
NTCAN_FEATURE_IRIGB = (1 << 13)             # IRIG-B support
NTCAN_FEATURE_PXI = (1 << 14)               # Board supports backplane clock and start trigger for timestamp
NTCAN_FEATURE_CAN_FD = (1 << 15)            # CAN-FD support
NTCAN_FEATURE_SELF_TEST = (1 << 16)         # Self-test mode support
NTCAN_FEATURE_TRIPLE_SAMPLING = (1 << 17)   # Triple sampling support

# Bus states delivered by status-event (NTCAN_EV_CAN_ERROR) and
# canIoctl(NTCAN_IOCTL_GET_CTRL_STATUS)
NTCAN_BUSSTATE_OK = 0x00
NTCAN_BUSSTATE_WARN = 0x40
NTCAN_BUSSTATE_ERRPASSIVE = 0x80
NTCAN_BUSSTATE_BUSOFF = 0xC0

# IO controls
NTCAN_IOCTL_FLUSH_RX_FIFO = 0x0001          # Flush Rx FIFO
NTCAN_IOCTL_GET_RX_MSG_COUNT = 0x0002       # Ret # CMSG in Rx FIFO
NTCAN_IOCTL_GET_RX_TIMEOUT = 0x0003         # Ret configured Rx tout
NTCAN_IOCTL_GET_TX_TIMEOUT = 0x0004         # Ret configured Tx tout
NTCAN_IOCTL_SET_20B_HND_FILTER = 0x0005     # Set 2.0B filter mask
NTCAN_IOCTL_GET_SERIAL = 0x0006             # Get HW serial number
NTCAN_IOCTL_GET_TIMESTAMP_FREQ = 0x0007     # Get timestamp frequency in Hz
NTCAN_IOCTL_GET_TIMESTAMP = 0x0008          # Get timestamp counter
NTCAN_IOCTL_ABORT_RX = 0x0009               # Abort pending Rx I/O
NTCAN_IOCTL_ABORT_TX = 0x000A               # Abort pending Tx I/O
NTCAN_IOCTL_SET_RX_TIMEOUT = 0x000B         # Change Rx timeout
NTCAN_IOCTL_SET_TX_TIMEOUT = 0x000C         # Change Tx timeout
NTCAN_IOCTL_SET_BUSLOAD_INTERVAL = 0x0016   # Set busload event interval (ms)
NTCAN_IOCTL_GET_BUSLOAD_INTERVAL = 0x0017   # Get busload event interval (ms)
NTCAN_IOCTL_GET_BUS_STATISTIC = 0x0018      # Get CAN bus statistic
NTCAN_IOCTL_GET_CTRL_STATUS = 0x0019        # Get Controller status
NTCAN_IOCTL_GET_BITRATE_DETAILS = 0x001A    # Get detailed baudrate info
NTCAN_IOCTL_GET_NATIVE_HANDLE = 0x001B      # Get native (OS) handle
NTCAN_IOCTL_SET_HND_FILTER = 0x001C         # Set hnd filter mask
NTCAN_IOCTL_GET_INFO = 0x001D               # Get NTCAN_INFO structure

# NTCAN event defines
NTCAN_EV_BASE = 0x40000000
NTCAN_EV_USER = 0x40000080
NTCAN_EV_LAST = 0x400000FF
NTCAN_EV_CAN_ERROR = NTCAN_EV_BASE
NTCAN_EV_BAUD_CHANGE = (NTCAN_EV_BASE + 0x1)
NTCAN_EV_CAN_ERROR_EXT = (NTCAN_EV_BASE + 0x2)
NTCAN_EV_BUSLOAD = (NTCAN_EV_BASE + 0x3)

# Error codes
SUCCESS = 0
NTCAN_ERRNO_BASE = 0xE0000000
NTCAN_RX_TIMEOUT = (NTCAN_ERRNO_BASE + 1)
NTCAN_TX_TIMEOUT = (NTCAN_ERRNO_BASE + 2)
NTCAN_TX_ERROR = (NTCAN_ERRNO_BASE + 4)
NTCAN_CONTR_OFF_BUS = (NTCAN_ERRNO_BASE + 5)
NTCAN_CONTR_BUSY = (NTCAN_ERRNO_BASE + 6)
NTCAN_CONTR_WARN = (NTCAN_ERRNO_BASE + 7)
NTCAN_OLDDATA = (NTCAN_ERRNO_BASE + 8)
NTCAN_NO_ID_ENABLED = (NTCAN_ERRNO_BASE + 9)
NTCAN_ID_ALREADY_ENABLED = (NTCAN_ERRNO_BASE + 10)
NTCAN_ID_NOT_ENABLED = (NTCAN_ERRNO_BASE + 11)
NTCAN_INVALID_FIRMWARE = (NTCAN_ERRNO_BASE + 13)
NTCAN_MESSAGE_LOST = (NTCAN_ERRNO_BASE + 14)
NTCAN_INVALID_HARDWARE = (NTCAN_ERRNO_BASE + 15)
NTCAN_PENDING_WRITE = (NTCAN_ERRNO_BASE + 16)
NTCAN_PENDING_READ = (NTCAN_ERRNO_BASE + 17)
NTCAN_INVALID_DRIVER = (NTCAN_ERRNO_BASE + 18)
NTCAN_WRONG_DEVICE_STATE = (NTCAN_ERRNO_BASE + 19)
NTCAN_HANDLE_FORCED_CLOSE = (NTCAN_ERRNO_BASE + 20)
NTCAN_NOT_SUPPORTED = (NTCAN_ERRNO_BASE + 21)
NTCAN_CONTR_ERR_PASSIVE = (NTCAN_ERRNO_BASE + 22)
NTCAN_ERROR_NO_BAUDRATE = (NTCAN_ERRNO_BASE + 23)
NTCAN_ERROR_LOM = (NTCAN_ERRNO_BASE + 24)
NTCAN_SOCK_CONN_TIMEOUT = (NTCAN_ERRNO_BASE + 0x80)
NTCAN_SOCK_CMD_TIMEOUT = (NTCAN_ERRNO_BASE + 0x81)
NTCAN_SOCK_HOST_NOT_FOUND = (NTCAN_ERRNO_BASE + 0x82)


def _str_cmsg_(msg):
    """Return a one-line description of a CAN message object.

    The text contains the class name, the kind of identifier and its value in
    hex, the DLC in hex and the data length in bytes.
    """
    # NOTE(review): 'event ID$:' looks like a transposed 'ID:$'; the string is
    # kept as it was because callers may already rely on the exact text.
    if msg.is_event:
        id_label = 'event ID$:'
    elif msg.is_id29:
        id_label = '29-bit ID:$'
    else:
        id_label = '11-bit ID:$'
    if msg.is_id29:
        id_text = format(msg.id29, '08x')
    else:
        id_text = format(msg.id_number, '03x')
    return (msg.__class__.__name__ + "(" + id_label + id_text
            + ", dlc:$" + format(msg.dlc, '02x')
            + ", data len:" + str(msg.data_len)
            + ")")


def ntcan_error_text(ntcanresult):
    """Return the text canFormatError() produces for an NTCAN result code.

    Returns "Error?" when canFormatError() itself reports a non-zero result.
    """
    NTCAN_ERROR_FORMAT_LONG = 0x0000
    buf = ctypes.create_string_buffer(80)
    # One byte of the buffer is held back from the size passed to the library.
    res = canFormatError(ntcanresult, NTCAN_ERROR_FORMAT_LONG,
                         ctypes.byref(buf), ctypes.sizeof(buf) - 1)
    if 0 == res:
        return buf.value.decode(encoding="ascii", errors="ignore")
    return "Error?"


def _throwNtcanError(res):
    """Raise an exception describing the NTCAN result code `res`.

    Raises:
        IOError: for the timeout / bus-state / pending-I/O style codes listed
            below.
        RuntimeError: for every other code.
    """
    message = ntcan_error_text(res) + " (NTCAN Error " + hex(res) + ")"
    # NOTE(review): NTCAN_OPERATION_ABORTED and NTCAN_NET_NOT_FOUND are not
    # defined in this part of the module; confirm they are defined elsewhere,
    # otherwise this chain raises NameError for most codes.
    if (res == NTCAN_RX_TIMEOUT
            or res == NTCAN_TX_TIMEOUT
            or res == NTCAN_OPERATION_ABORTED
            or res == NTCAN_WRONG_DEVICE_STATE
            or res == NTCAN_TX_ERROR
            or res == NTCAN_CONTR_OFF_BUS
            or res == NTCAN_CONTR_BUSY
            or res == NTCAN_CONTR_WARN
            or res == NTCAN_MESSAGE_LOST
            or res == NTCAN_PENDING_WRITE
            or res == NTCAN_PENDING_READ
            or res == NTCAN_NET_NOT_FOUND
            or res == NTCAN_HANDLE_FORCED_CLOSE
            or res == NTCAN_SOCK_CONN_TIMEOUT):
        raise IOError(message)
    raise RuntimeError(message)


def _late_property(getter=None, setter=None):
    """Build a property that looks its accessor methods up by name on access.

    Resolving the accessor on the instance lets a message class override e.g.
    ``get_is_fd`` without re-declaring every property that depends on it.
    """
    fget = None
    fset = None
    if getter is not None:
        def fget(self):
            return getattr(self, getter)()
    if setter is not None:
        def fset(self, value):
            getattr(self, setter)(value)
    return property(fget=fget, fset=fset)


class _CanMsgBase(object):
    """Behaviour shared by the CanMsg, CanMsgT and CanMsgC structures.

    The methods only rely on the structure members ``id``, ``len`` and
    ``candata``.  The FD related queries answer for a classic CAN frame;
    CanMsg overrides them.
    """

    def __str__(self):
        return _str_cmsg_(self)

    # --- CAN identifier ----------------------------------------------------
    def set_id11(self, id):
        self.id = id

    def set_id29(self, id):
        self.id = id | NTCAN_20B_BASE

    def set_event_id(self, id):
        self.id = id | NTCAN_EV_BASE

    def get_id11(self):
        """Return the 11-bit ID, or -1 for a 29-bit or event message."""
        if (self.id & (NTCAN_20B_BASE | NTCAN_EV_BASE)) != 0:
            return -1
        return self.id & (NTCAN_20B_BASE - 1)

    def get_id29(self):
        """Return the 29-bit ID, or -1 if the 29-bit flag is not set."""
        if 0 == (self.id & NTCAN_20B_BASE):
            return -1
        return self.id & (NTCAN_20B_BASE - 1)

    def get_event_id(self):
        """Return the event ID, or -1 if the event flag is not set."""
        if 0 == (self.id & NTCAN_EV_BASE):
            return -1
        return self.id & (NTCAN_EV_BASE - 1)

    def get_id_number(self):
        return self.id & 0x1FFFFFFF

    def get_is_id11(self):
        return self.get_id11() != -1

    def get_is_id29(self):
        return self.get_id29() != -1

    def get_is_event(self):
        return self.get_event_id() != -1

    # --- length / frame type -----------------------------------------------
    # set can classic rtr
    def set_rtr(self, len=0):
        self.len = len | NTCAN_RTR

    # set can classic data length
    def set_data8_len(self, len):
        self.len = len

    def get_dlc(self):
        return self.len & 0xF

    def get_is_fd(self):
        return False

    def get_is_rtr(self):
        return (not self.get_is_fd()) and ((self.len & NTCAN_RTR) != 0)

    def get_is_fd_no_brs(self):
        return False

    def get_is_fd_passive(self):
        return False

    # get data length classic or fd
    def get_data_len(self):
        """Return the number of data bytes described by ``len``."""
        if self.get_is_fd():
            return DLC_TO_DATA_LEN[self.dlc & 0xF]
        if self.get_is_rtr():
            return 0
        # A classic CAN frame never carries more than 8 bytes; DLC values
        # 9..15 still denote 8 data bytes.
        return min(self.dlc & 0xF, 8)

    id11 = _late_property("get_id11", "set_id11")
    id29 = _late_property("get_id29", "set_id29")
    event_id = _late_property("get_event_id", "set_event_id")
    id_number = _late_property("get_id_number")
    can_id = id_number  # same
    is_id11 = _late_property("get_is_id11")
    is_id29 = _late_property("get_is_id29")
    is_event = _late_property("get_is_event")
    data8_len = _late_property("get_data_len", "set_data8_len")
    data_len = _late_property("get_data_len")
    is_fd = _late_property("get_is_fd")
    is_fd_no_brs = _late_property("get_is_fd_no_brs")
    is_fd_passive = _late_property("get_is_fd_passive")
    is_rtr = _late_property("get_is_rtr")
    dlc = _late_property("get_dlc")

    # --- payload -------------------------------------------------------------
    def _store_le(self, offset, val, width):
        """Write `val` as `width` little-endian bytes at candata[offset:]."""
        for i in range(width):
            self.candata[offset + i] = (val >> (8 * i)) & 0xFF

    def set_data8_byte(self, val1, val2=0, val3=0, val4=0, val5=0, val6=0, val7=0, val8=0):
        for index, val in enumerate((val1, val2, val3, val4, val5, val6, val7, val8)):
            self.candata[index] = val

    def set_data8_short(self, val1, val2=0, val3=0, val4=0):
        for index, val in enumerate((val1, val2, val3, val4)):
            self._store_le(2 * index, val, 2)

    def set_data8_long(self, val1, val2=0):
        self._store_le(0, val1, 4)
        self._store_le(4, val2, 4)

    def set_data8_long_long(self, val1):
        self._store_le(0, val1, 8)

    # --- I/O helpers (ntcanpy compatibility) -------------------------------
    def _canWriteSend(self, canFdHnd, id, dlc, send):
        """Pass this message to canFdHnd.can_send() or canFdHnd.can_write().

        `id` replaces the message ID unless it is -1.  `dlc` is accepted for
        signature compatibility only; the callers apply it before calling.
        Returns the result code; raises via _throwNtcanError when EXCEPTIONS
        is set and the result is non-zero.
        """
        if id != -1:
            self.id = id
        if send:
            res = canFdHnd.can_send(self)
        else:
            res = canFdHnd.can_write(self)
        if EXCEPTIONS and (res != 0):
            _throwNtcanError(res)
        return res

    def canSendByte(self, canFdHnd, id=-1, dlc=-1, d0=0, d1=0, d2=0, d3=0, d4=0, d5=0, d6=0, d7=0, send=True):
        # ntcanpy compatibility
        if dlc != -1:
            self.len = dlc
        if self.data_len != 0:
            self.set_data8_byte(d0, d1, d2, d3, d4, d5, d6, d7)
        return self._canWriteSend(canFdHnd, id, dlc, send)

    def canSendShort(self, canFdHnd, id=-1, dlc=-1, dh0=0, dh1=0, dh2=0, dh3=0, send=True):
        # ntcanpy compatibility
        if dlc != -1:
            self.len = dlc
        if self.data_len != 0:
            self.set_data8_short(dh0, dh1, dh2, dh3)
        return self._canWriteSend(canFdHnd, id, dlc, send)

    def canSendLong(self, canFdHnd, id=-1, dlc=-1, dl0=0, dl1=0, send=True):
        # ntcanpy compatibility
        if dlc != -1:
            self.len = dlc
        if self.data_len != 0:
            self.set_data8_long(dl0, dl1)
        return self._canWriteSend(canFdHnd, id, dlc, send)

    def canWriteByte(self, canFdHnd, id=-1, dlc=-1, d0=0, d1=0, d2=0, d3=0, d4=0, d5=0, d6=0, d7=0):
        # ntcanpy compatibility
        return self.canSendByte(canFdHnd, id, dlc, d0, d1, d2, d3, d4, d5, d6, d7, False)

    def canWriteShort(self, canFdHnd, id=-1, dlc=-1, dh0=0, dh1=0, dh2=0, dh3=0):
        # ntcanpy compatibility
        return self.canSendShort(canFdHnd, id, dlc, dh0, dh1, dh2, dh3, False)

    def canWriteLong(self, canFdHnd, id=-1, dlc=-1, dl0=0, dl1=0):
        # ntcanpy compatibility
        return self.canSendLong(canFdHnd, id, dlc, dl0, dl1, False)

    def canRead(self, canFdHnd):
        return self.canTake(canFdHnd, False)

    def canTake(self, canFdHnd, take=True):
        """Fill this message via canFdHnd.can_take() or canFdHnd.can_read().

        A can_take() result of -1 is turned into 0 and flagged by setting
        NTCAN_NO_DATA in ``len``.  Returns the result code; raises via
        _throwNtcanError when EXCEPTIONS is set and the result is non-zero.
        """
        if take:
            res = canFdHnd.can_take(self)
            if -1 == res:
                res = 0
                self.len |= NTCAN_NO_DATA  # ntcanpy compatibility
        else:
            res = canFdHnd.can_read(self)
        if EXCEPTIONS and (res != 0):
            _throwNtcanError(res)
        return res


class CanMsg(_CanMsgBase, ctypes.Structure):
    """CAN / CAN FD message with a 64 byte payload and a timestamp (CMSG_X)."""

    _s_data_init = False
    _can_write_func = None
    _can_send_func = None
    _can_read_func = None
    _can_take_func = None

    def __init__(self):
        # CanFdHandle is only looked up when the first instance is created.
        if not CanMsg._s_data_init:
            CanMsg._can_write_func = CanFdHandle.can_write_x
            CanMsg._can_send_func = CanFdHandle.can_send_x
            CanMsg._can_read_func = CanFdHandle.can_read_x
            CanMsg._can_take_func = CanFdHandle.can_take_x
            CanMsg._s_data_init = True
        self.data = DataIndexer(self)  # -> ntcanpy compatible indexer syntax: cmsg.data.c[5]

    # set can fd data length by dlc code
    def set_data_fd_dlc(self, dlc):
        self.len = dlc | NTCAN_FD

    # set can fd data length with no bitrate switch flag by dlc code
    def set_data_fd_dlc_no_brs(self, dlc):
        self.len = dlc | NTCAN_NO_BRS | NTCAN_FD

    # set can fd data length(0-8,12,16,20,24,32,48,64) from desired length
    def set_data_fd_len(self, len):
        # Lengths outside 0..64 are ignored and leave 'len' untouched.
        if (len >= 0) and (len <= 64):
            self.len = DATA_LEN_TO_DLC[len] | NTCAN_FD

    # set can fd data length with no bitrate switch from desired length
    def set_data_fd_len_no_brs(self, len):
        self.set_data_fd_len(len)
        self.len |= NTCAN_NO_BRS

    def get_is_fd(self):
        return (self.len & NTCAN_FD) != 0

    def get_is_fd_no_brs(self):
        return self.get_is_fd() and ((self.len & NTCAN_NO_BRS) != 0)

    def get_is_fd_passive(self):
        # 'esi' is a signed 8-bit member, so the 0xFF marker reads back as -1;
        # compare the raw byte instead of the signed value.
        return self.get_is_fd() and (NTCAN_ESI_FD_ERROR_PASSIVE == (self.esi & 0xFF))

    data_fd_dlc = _late_property(setter="set_data_fd_dlc")
    data_fd_dlc_no_brs = _late_property(setter="set_data_fd_dlc_no_brs")

    def set_data_byte(self, offset, val):
        self.candata[0 + offset] = val

    def set_data_short(self, offset, val):
        self._store_le(offset, val, 2)

    def set_data_long(self, offset, val):
        self._store_le(offset, val, 4)

    def set_data_long_long(self, offset, val):
        self._store_le(offset, val, 8)

    _fields_ = [
        ("id", ctypes.c_int32),
        ("len", ctypes.c_int8),
        ("msg_lost", ctypes.c_int8),
        ("reserved", ctypes.c_int8),
        ("esi", ctypes.c_int8),
        ("candata", ctypes.c_uint8 * 64),
        ("timestamp", ctypes.c_uint64),
    ]
#end CanMsg


class CanMsgT(_CanMsgBase, ctypes.Structure):
    """Classic CAN message with an 8 byte payload and a timestamp (CMSG_T)."""

    _s_data_init = False
    _can_write_func = None
    _can_send_func = None
    _can_read_func = None
    _can_take_func = None

    def __init__(self):
        # CanFdHandle is only looked up when the first instance is created.
        if not CanMsgT._s_data_init:
            CanMsgT._can_write_func = CanFdHandle.can_write_t
            CanMsgT._can_send_func = CanFdHandle.can_send_t
            CanMsgT._can_read_func = CanFdHandle.can_read_t
            CanMsgT._can_take_func = CanFdHandle.can_take_t
            CanMsgT._s_data_init = True
        self.data = DataIndexer(self)  # -> ntcanpy compatible indexer syntax: cmsg.data.c[5]

    _fields_ = [
        ("id", ctypes.c_int32),
        ("len", ctypes.c_int8),
        ("msg_lost", ctypes.c_int8),
        ("reserved", ctypes.c_int8),
        ("esi", ctypes.c_int8),
        ("candata", ctypes.c_uint8 * 8),
        ("timestamp", ctypes.c_uint64),
    ]
#end CMSG_T


class CanMsgC(_CanMsgBase, ctypes.Structure):
    """Classic CAN message with an 8 byte payload and no timestamp (CMSG)."""

    _s_data_init = False
    _can_write_func = None
    _can_send_func = None
    _can_read_func = None
    _can_take_func = None

    def __init__(self):
        # CanFdHandle is only looked up when the first instance is created.
        if not CanMsgC._s_data_init:
            CanMsgC._can_write_func = CanFdHandle.can_write_c
            CanMsgC._can_send_func = CanFdHandle.can_send_c
            CanMsgC._can_read_func = CanFdHandle.can_read_c
            CanMsgC._can_take_func = CanFdHandle.can_take_c
            CanMsgC._s_data_init = True
        self.data = DataIndexer(self)  # -> ntcanpy compatible indexer syntax: cmsg.data.c[5]

    _fields_ = [
        ("id", ctypes.c_int32),
        ("len", ctypes.c_int8),
        ("msg_lost", ctypes.c_int8),
        ("reserved", ctypes.c_int8),
        ("esi", ctypes.c_int8),
        ("candata", ctypes.c_uint8 * 8),
    ]
#end CMSG


class NTCAN_BAUDRATE_X_BTD(ctypes.Structure):
    """NTCAN_BAUDRATE_X with the arbitration/data words split into
    brp / tseg1 / tseg2 / sjw members of 16 bit each."""

    _fields_ = [
        ("mode", ctypes.c_uint16),
        ("flags", ctypes.c_uint16),
        ("reserved", ctypes.c_uint32),
        ("arb_brp", ctypes.c_uint16),
        ("arb_tseg1", ctypes.c_uint16),
        ("arb_tseg2", ctypes.c_uint16),
        ("arb_sjw", ctypes.c_uint16),
        ("data_brp", ctypes.c_uint16),
        ("data_tseg1", ctypes.c_uint16),
        ("data_tseg2", ctypes.c_uint16),
        ("data_sjw", ctypes.c_uint16),
    ]
#end NTCAN_BAUDRATE_X_BTD
class NTCAN_BAUDRATE_X(ctypes.Structure):
    """Extended baud rate description: mode, flags and two 32-bit words each
    for the arbitration and the data phase."""

    def btd(self):
        """Return a NTCAN_BAUDRATE_X_BTD holding the same values, with every
        32-bit word split into its low and high 16-bit half."""
        btd = NTCAN_BAUDRATE_X_BTD()
        btd.mode = self.mode
        btd.flags = self.flags
        btd.reserved = self.reserved
        # Low half first, then high half, for each 32-bit word.
        btd.arb_brp = self.arb & 0xFFFF
        btd.arb_tseg1 = self.arb >> 16
        btd.arb_tseg2 = self._btd_arb & 0xFFFF
        btd.arb_sjw = self._btd_arb >> 16
        btd.data_brp = self.data & 0xFFFF
        btd.data_tseg1 = self.data >> 16
        btd.data_tseg2 = self._btd_data & 0xFFFF
        btd.data_sjw = self._btd_data >> 16
        return btd

    _fields_ = [
        ("mode", ctypes.c_uint16),
        ("flags", ctypes.c_uint16),
        ("reserved", ctypes.c_uint32),
        ("arb", ctypes.c_uint32),
        ("_btd_arb", ctypes.c_uint32),
        ("data", ctypes.c_uint32),
        ("_btd_data", ctypes.c_uint32),
    ]
#end NTCAN_BAUDRATE_X


class CAN_IF_STATUS(ctypes.Structure):
    """Interface status: version words, board status, board id and features."""

    def get_boardid(self):
        """Return the board id as text, up to the first NUL byte."""
        # Read exactly the 14 bytes of the member; a board id that fills the
        # whole array has no terminator and must not run into 'features'.
        raw = ctypes.string_at(ctypes.addressof(self._boardid),
                               ctypes.sizeof(self._boardid))
        return raw.split(b"\0", 1)[0].decode(encoding="ascii", errors="ignore")

    boardid = property(fget=get_boardid)

    _fields_ = [
        ("hardware", ctypes.c_uint16),
        ("firmware", ctypes.c_uint16),
        ("driver", ctypes.c_uint16),
        ("dll", ctypes.c_uint16),
        ("boardstatus", ctypes.c_uint32),
        ("_boardid", ctypes.c_uint8 * 14),
        ("features", ctypes.c_uint16),
    ]
#end CAN_IF_STATUS


#NTCAN_IOCTL_GET_CTRL_STATUS
class NTCAN_CTRL_STATE(ctypes.Structure):
    """Data structure used with NTCAN_IOCTL_GET_CTRL_STATUS."""

    _fields_ = [
        ("rcv_err_counter", ctypes.c_uint8),
        ("xmit_err_counter", ctypes.c_uint8),
        ("status", ctypes.c_uint8),
        ("type", ctypes.c_uint8),
    ]
#end NTCAN_CTRL_STATE


#NTCAN_IOCTL_GET_BITRATE_DETAILS
class NTCAN_BITRATE(ctypes.Structure):
    """Data structure used with NTCAN_IOCTL_GET_BITRATE_DETAILS."""

    _fields_ = [
        ("baud", ctypes.c_uint32),
        ("valid", ctypes.c_uint32),
        ("rate", ctypes.c_uint32),
        ("clock", ctypes.c_uint32),
        ("ctrl_type", ctypes.c_uint8),
        ("tq_pre_sp", ctypes.c_uint8),
        ("tq_post_sp", ctypes.c_uint8),
        ("sjw", ctypes.c_uint8),
        ("error", ctypes.c_uint32),
        ("flags", ctypes.c_uint32),
        ("rate_d", ctypes.c_uint32),
        ("tq_pre_sp_d", ctypes.c_uint8),
        ("tq_post_sp_d", ctypes.c_uint8),
        ("sjw_d", ctypes.c_uint8),
        ("mode", ctypes.c_uint8),
        ("reserved", ctypes.c_uint32),
    ]
#end NTCAN_BITRATE
# Supported values for idArea in NTCAN_FILTER_MASK
NTCAN_IDS_REGION_20A = 0    # Filter for 11-bit CAN frames
NTCAN_IDS_REGION_20B = 1    # Filter for 29-bit CAN frames
NTCAN_IDS_REGION_EV = 2     # Filter for NTCAN events


#NTCAN_IOCTL_SET_HND_FILTER
class NTCAN_FILTER_MASK(ctypes.Structure):
    """Data structure used with NTCAN_IOCTL_SET_HND_FILTER."""

    _fields_ = [
        ("acr", ctypes.c_uint32),
        ("amr", ctypes.c_uint32),
        ("idArea", ctypes.c_uint32),
    ]
#end NTCAN_FILTER_MASK


def _decode_c_string(buf):
    """Return the text stored in a fixed-size ctypes byte array.

    Exactly sizeof(buf) bytes are read and the text ends at the first NUL
    byte, so a completely filled array cannot run into the members after it.
    """
    raw = ctypes.string_at(ctypes.addressof(buf), ctypes.sizeof(buf))
    return raw.split(b"\0", 1)[0].decode(encoding="ascii", errors="ignore")


#NTCAN_IOCTL_GET_INFO
class NTCAN_INFO(ctypes.Structure):
    """Data structure used with NTCAN_IOCTL_GET_INFO.

    The string members are exposed as text through the properties boardid,
    serial_string, drv_build_info and lib_build_info.
    """

    def get_boardid(self):
        return _decode_c_string(self._boardid)

    def get_serial_string(self):
        return _decode_c_string(self._serial_string)

    def get_drv_build_info(self):
        return _decode_c_string(self._drv_build_info)

    def get_lib_build_info(self):
        return _decode_c_string(self._lib_build_info)

    boardid = property(fget=get_boardid)
    serial_string = property(fget=get_serial_string)
    drv_build_info = property(fget=get_drv_build_info)
    lib_build_info = property(fget=get_lib_build_info)

    _fields_ = [
        ("hardware", ctypes.c_uint16),
        ("firmware", ctypes.c_uint16),
        ("driver", ctypes.c_uint16),
        ("dll", ctypes.c_uint16),
        ("features", ctypes.c_uint32),
        ("serial", ctypes.c_uint32),
        ("timestamp_freq", ctypes.c_uint64),
        ("ctrl_clock", ctypes.c_uint32),
        ("ctrl_type", ctypes.c_uint8),
        ("base_net", ctypes.c_uint8),
        ("ports", ctypes.c_uint8),
        ("transceiver", ctypes.c_uint8),
        ("boardstatus", ctypes.c_uint16),
        ("firmware2", ctypes.c_uint16),
        ("_boardid", ctypes.c_uint8 * 32),
        ("_serial_string", ctypes.c_uint8 * 16),
        ("_drv_build_info", ctypes.c_uint8 * 64),
        ("_lib_build_info", ctypes.c_uint8 * 64),
        ("open_handle", ctypes.c_uint16),
        ("reserved2", ctypes.c_uint8 * 42),
    ]
#end NTCAN_INFO


#NTCAN_IOCTL_GET_BUS_STATISTIC
class NTCAN_BUS_STATISTIC(ctypes.Structure):
    """Data structure used with NTCAN_IOCTL_GET_BUS_STATISTIC."""

    _fields_ = [
        ("timestamp", ctypes.c_uint64),
        ("rcv_std_data", ctypes.c_uint32),
        ("rcv_std_rtr", ctypes.c_uint32),
        ("rcv_ext_data", ctypes.c_uint32),
        ("rcv_ext_rtr", ctypes.c_uint32),
        ("xmit_std_data", ctypes.c_uint32),
        ("xmit_std_rtr", ctypes.c_uint32),
        ("xmit_ext_data", ctypes.c_uint32),
        ("xmit_ext_rtr", ctypes.c_uint32),
        ("ctrl_ovr", ctypes.c_uint32),
        ("fifo_ovr", ctypes.c_uint32),
        ("err_frames", ctypes.c_uint32),
        ("rcv_byte_count", ctypes.c_uint32),
        ("xmit_byte_count", ctypes.c_uint32),
        ("aborted_frames", ctypes.c_uint32),
        ("rcv_count_fd", ctypes.c_uint32),
        ("xmit_count_fd", ctypes.c_uint32),
        ("bit_count", ctypes.c_uint64),
    ]
#end NTCAN_BUS_STATISTIC


CMSG_X = CanMsg     # ntcanpy equivalent
CMSG_T = CanMsgT    # ntcanpy compatibility
CMSG = CanMsgC      # ntcanpy compatibility

# Load the NTCAN library; the library name depends on the platform
# (64/32-bit Windows and Java use "ntcan", Linux uses "libntcan.so.4").
if _SYSTEM_PLATFORM in ("Windows", "Java"):
    NTCAN_DLL = ctypes.cdll.ntcan
elif "Linux" == _SYSTEM_PLATFORM:
    NTCAN_DLL = ctypes.CDLL("libntcan.so.4")
else:
    raise RuntimeError("Unsupported platform:" + _SYSTEM_PLATFORM)

# canOpen: in the paramflags the leading 1 declares an input parameter and an
# optional third item is the default used when the argument is omitted.
canOpenPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,                      # return value (result code)
    ctypes.c_int,                       # net
    ctypes.c_uint32,                    # flags
    ctypes.c_int32,                     # txqueuesize
    ctypes.c_int32,                     # rxqueuesize
    ctypes.c_int32,                     # txtimeout
    ctypes.c_int32,                     # rxtimeout
    ctypes.POINTER(ctypes.c_void_p),    # handle
)
canOpenParamflags = (
    (1, "net"),
    (1, "flags", 0),
    (1, "txqueuesize", 0),
    (1, "rxqueuesize", 0),
    (1, "txtimeout", 0),
    (1, "rxtimeout", 0),
    (1, "handle"),
)
canOpen = canOpenPrototype(("canOpen", NTCAN_DLL), canOpenParamflags)

# canSendX/canReadX..
# Bindings for the CanMsg (CMSG_X) message variants.
# Each function takes the handle, a pointer to CanMsg and a pointer to an
# int32 message count; the blocking variants (write/read) take one more
# void pointer ("nullovl").
canSendXPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsg),
    ctypes.POINTER(ctypes.c_int32),
)
canSendXParamflags = (
    (1, "handle", 0),
    (1, "cmsgX"),
    (1, "len"),
)
canSendX = canSendXPrototype(("canSendX", NTCAN_DLL), canSendXParamflags)

canWriteXPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsg),
    ctypes.POINTER(ctypes.c_int32),
    ctypes.c_void_p,
)
canWriteXParamflags = (
    (1, "handle", 0),
    (1, "cmsgX"),
    (1, "len"),
    (1, "nullovl"),
)
canWriteX = canWriteXPrototype(("canWriteX", NTCAN_DLL), canWriteXParamflags)

canReadXPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsg),
    ctypes.POINTER(ctypes.c_int32),
    ctypes.c_void_p,
)
canReadXParamflags = (
    (1, "handle", 0),
    (1, "cmsgX"),
    (1, "len"),
    (1, "nullovl"),
)
canReadX = canReadXPrototype(("canReadX", NTCAN_DLL), canReadXParamflags)

canTakeXPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsg),
    ctypes.POINTER(ctypes.c_int32),
)
canTakeXParamflags = (
    (1, "handle", 0),
    (1, "cmsgX"),
    (1, "len"),
)
canTakeX = canTakeXPrototype(("canTakeX", NTCAN_DLL), canTakeXParamflags)

# canSend/canRead..
# Bindings for the CanMsgC (CMSG) message variants.
# Same calling pattern as the other message families: handle, message
# pointer, pointer to an int32 count, plus a trailing void pointer
# ("nullovl") for the write/read variants.
canSendPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgC),
    ctypes.POINTER(ctypes.c_int32),
)
canSendParamflags = (
    (1, "handle", 0),
    (1, "cmsg"),
    (1, "len"),
)
canSend = canSendPrototype(("canSend", NTCAN_DLL), canSendParamflags)

canWritePrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgC),
    ctypes.POINTER(ctypes.c_int32),
    ctypes.c_void_p,
)
canWriteParamflags = (
    (1, "handle", 0),
    (1, "cmsg"),
    (1, "len"),
    (1, "nullovl"),
)
canWrite = canWritePrototype(("canWrite", NTCAN_DLL), canWriteParamflags)

canReadPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgC),
    ctypes.POINTER(ctypes.c_int32),
    ctypes.c_void_p,
)
canReadParamflags = (
    (1, "handle", 0),
    (1, "cmsg"),
    (1, "len"),
    (1, "nullovl"),
)
canRead = canReadPrototype(("canRead", NTCAN_DLL), canReadParamflags)

canTakePrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgC),
    ctypes.POINTER(ctypes.c_int32),
)
canTakeParamflags = (
    (1, "handle", 0),
    (1, "cmsg"),
    (1, "len"),
)
canTake = canTakePrototype(("canTake", NTCAN_DLL), canTakeParamflags)

# canSendT/canReadT..
# Bindings for the CanMsgT (CMSG_T) message variants.
canSendTPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgT),
    ctypes.POINTER(ctypes.c_int32),
)
canSendTParamflags = (
    (1, "handle", 0),
    (1, "cmsgT"),
    (1, "len"),
)
canSendT = canSendTPrototype(("canSendT", NTCAN_DLL), canSendTParamflags)

canWriteTPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgT),
    ctypes.POINTER(ctypes.c_int32),
    ctypes.c_void_p,
)
canWriteTParamflags = (
    (1, "handle", 0),
    (1, "cmsgT"),
    (1, "len"),
    (1, "nullovl"),
)
canWriteT = canWriteTPrototype(("canWriteT", NTCAN_DLL), canWriteTParamflags)

canReadTPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgT),
    ctypes.POINTER(ctypes.c_int32),
    ctypes.c_void_p,
)
canReadTParamflags = (
    (1, "handle", 0),
    (1, "cmsgT"),
    (1, "len"),
    (1, "nullovl"),
)
canReadT = canReadTPrototype(("canReadT", NTCAN_DLL), canReadTParamflags)

canTakeTPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CanMsgT),
    ctypes.POINTER(ctypes.c_int32),
)
canTakeTParamflags = (
    (1, "handle", 0),
    (1, "cmsgT"),
    (1, "len"),
)
canTakeT = canTakeTPrototype(("canTakeT", NTCAN_DLL), canTakeTParamflags)

# -----
# Classical baudrate API: the rate is a single uint32.
canSetBaudratePrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
)
canSetBaudrateParamflags = (
    (1, "handle", 0),
    (1, "baud"),
)
canSetBaudrate = canSetBaudratePrototype(("canSetBaudrate", NTCAN_DLL), canSetBaudrateParamflags)

canGetBaudratePrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(ctypes.c_uint32),
)
canGetBaudrateParamflags = (
    (1, "handle", 0),
    (1, "baud"),
)
canGetBaudrate = canGetBaudratePrototype(("canGetBaudrate", NTCAN_DLL), canGetBaudrateParamflags)

# Extended baudrate API: the rate is passed as a NTCAN_BAUDRATE_X structure.
canSetBaudrateXPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(NTCAN_BAUDRATE_X),
)
canSetBaudrateXParamflags = (
    (1, "handle", 0),
    (1, "baudX"),
)
canSetBaudrateX = canSetBaudrateXPrototype(("canSetBaudrateX", NTCAN_DLL), canSetBaudrateXParamflags)
# The getter has the same parameters as the setter, so prototype and
# paramflags are shared.
canGetBaudrateX = canSetBaudrateXPrototype(("canGetBaudrateX", NTCAN_DLL), canSetBaudrateXParamflags)

# Second binding of the library symbol "canSetBaudrateX", typed to accept a
# NTCAN_BAUDRATE_X_BTD structure instead of NTCAN_BAUDRATE_X.
canSetBaudrateXbtdPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(NTCAN_BAUDRATE_X_BTD),
)
canSetBaudrateXbtdParamflags = (
    (1, "handle", 0),
    (1, "baudXbtd"),
)
canSetBaudrateXbtd = canSetBaudrateXbtdPrototype(("canSetBaudrateX", NTCAN_DLL), canSetBaudrateXbtdParamflags)

canStatusPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.POINTER(CAN_IF_STATUS),
)
canStatusParamflags = (
    (1, "handle", 0),
    (1, "cstat"),
)
canStatus = canStatusPrototype(("canStatus", NTCAN_DLL), canStatusParamflags)

# ID filter: add/delete a single identifier.
canIdAddPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
)
canIdAddParamflags = (
    (1, "handle", 0),
    (1, "id"),
)
canIdAdd = canIdAddPrototype(("canIdAdd", NTCAN_DLL), canIdAddParamflags)
# Same signature as canIdAdd.
canIdDelete = canIdAddPrototype(("canIdDelete", NTCAN_DLL), canIdAddParamflags)

# ID filter: add/delete a region given by a start id and a pointer to a count.
canIdRegionAddPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(ctypes.c_int32),
)
canIdRegionAddParamflags = (
    (1, "handle", 0),
    (1, "id"),
    (1, "count"),
)
canIdRegionAdd = canIdRegionAddPrototype(("canIdRegionAdd", NTCAN_DLL), canIdRegionAddParamflags)
canIdRegionDelete = canIdRegionAddPrototype(("canIdRegionDelete", NTCAN_DLL), canIdRegionAddParamflags)

# Timestamp queries are canIoctl calls whose argument is a uint64 pointer.
getTimestampPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(ctypes.c_uint64),
)
getTimestampParamflags = (
    (1, "handle", 0),
    (1, "control"),
    (1, "timestamp"),
)
getTimestamp = getTimestampPrototype(("canIoctl", NTCAN_DLL), getTimestampParamflags)

# No separate prototype for the frequency query: it reuses
# getTimestampPrototype and differs only in the parameter name.
getTimestampFreqParamflags = (
    (1, "handle", 0),
    (1, "control"),
    (1, "freq"),
)
getTimestampFreq = getTimestampPrototype(("canIoctl", NTCAN_DLL), getTimestampFreqParamflags)

# IoControl ..
# Typed bindings of the library symbol "canIoctl"; they differ only in the
# type of the third argument.
IoControlUint32Prototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(ctypes.c_uint32),
)
IoControlUint32Paramflags = (
    (1, "handle", 0),
    (1, "control"),
    (1, "value32_ptr"),
)
IoControlUint32 = IoControlUint32Prototype(("canIoctl", NTCAN_DLL), IoControlUint32Paramflags)

IoControlNullPtrPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.c_void_p,
)
IoControlNullPtrParamflags = (
    (1, "handle", 0),
    (1, "control"),
    (1, "null"),
)
IoControlNullPtr = IoControlNullPtrPrototype(("canIoctl", NTCAN_DLL), IoControlNullPtrParamflags)

# All structure-pointer variants share one paramflags tuple.
IoControlStructPtrParamflags = (
    (1, "handle", 0),
    (1, "control"),
    (1, "struct"),
)

IoControlBusStatisticStructPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(NTCAN_BUS_STATISTIC),
)
IoControlGetBusStatistic = IoControlBusStatisticStructPrototype(("canIoctl", NTCAN_DLL), IoControlStructPtrParamflags)

IoControlCtrlStateStructPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(NTCAN_CTRL_STATE),
)
IoControlGetCtrlState = IoControlCtrlStateStructPrototype(("canIoctl", NTCAN_DLL), IoControlStructPtrParamflags)

IoControlGetBitrateDetailsStructPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(NTCAN_BITRATE),
)
IoControlGetBitrateDetails = IoControlGetBitrateDetailsStructPrototype(("canIoctl", NTCAN_DLL), IoControlStructPtrParamflags)

IoControlHndFilterMaskStructPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(NTCAN_FILTER_MASK),
)
IoControlFilterMask = IoControlHndFilterMaskStructPrototype(("canIoctl", NTCAN_DLL), IoControlStructPtrParamflags)

IoControlGetInfoStructPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
    ctypes.c_uint32,
    ctypes.POINTER(NTCAN_INFO),
)
IoControlGetInfo = IoControlGetInfoStructPrototype(("canIoctl", NTCAN_DLL), IoControlStructPtrParamflags)

canClosePrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_void_p,
)
canCloseParamflags = (
    (1, "handle"),
)
canClose = canClosePrototype(("canClose", NTCAN_DLL), canCloseParamflags)

canFormatErrorPrototype = ctypes.CFUNCTYPE(
    ctypes.c_uint,
    ctypes.c_uint,
    ctypes.c_uint32,
    ctypes.c_void_p,
    ctypes.c_uint32,
)
canFormatErrorParamflags = (
    (1, "error"),
    (1, "type"),
    (1, "buf"),
    (1, "bufsize"),
)
canFormatError = canFormatErrorPrototype(("canFormatError", NTCAN_DLL), canFormatErrorParamflags)

# The prototype and paramflags objects are only needed while the functions
# are being bound; remove them from the module namespace.
del (
    canOpenPrototype,
    canSendXPrototype, canWriteXPrototype, canReadXPrototype, canTakeXPrototype,
    canSendTPrototype, canWriteTPrototype, canReadTPrototype, canTakeTPrototype,
    canSendPrototype, canWritePrototype, canReadPrototype, canTakePrototype,
    canSetBaudratePrototype,
    canGetBaudratePrototype, canSetBaudrateXPrototype, canSetBaudrateXbtdPrototype,
    canIdAddPrototype, canIdRegionAddPrototype, getTimestampPrototype,
    IoControlUint32Prototype, IoControlNullPtrPrototype,
    IoControlBusStatisticStructPrototype, IoControlCtrlStateStructPrototype,
    IoControlGetBitrateDetailsStructPrototype, IoControlHndFilterMaskStructPrototype,
    IoControlGetInfoStructPrototype,
    canStatusPrototype,
    canClosePrototype,
    canFormatErrorPrototype,
)

del (
    canOpenParamflags,
    canSendXParamflags, canWriteXParamflags, canReadXParamflags, canTakeXParamflags,
    canSendTParamflags, canWriteTParamflags, canReadTParamflags, canTakeTParamflags,
    canSendParamflags, canWriteParamflags, canReadParamflags, canTakeParamflags,
    canSetBaudrateParamflags,
    canGetBaudrateParamflags, canSetBaudrateXParamflags, canSetBaudrateXbtdParamflags,
    canIdAddParamflags, canIdRegionAddParamflags,
    getTimestampFreqParamflags, getTimestampParamflags,
    IoControlUint32Paramflags, IoControlNullPtrParamflags, IoControlStructPtrParamflags,
    canStatusParamflags,
    canCloseParamflags,
    canFormatErrorParamflags,
)


class CanFdHandle:
    """Object wrapper around one NTCAN handle obtained from canOpen().

    Unless stated otherwise, methods return the result code of the
    underlying library call unchanged; 0 is treated as success.
    """

    # Timestamp frequency cache used by get_timestamp_freq(); None means
    # "not queried yet".
    timestamp_cached = None

    def __init__(self, netno, flags, TxQueueSize=1, RxQueueSize=1, TxTout=0, RxTout=0):
        """Open a handle on net `netno`.

        Raises:
            Exception: if canOpen() returns a non-zero result.
        """
        self.ifstatus = None
        # Keep an own reference to canClose so __del__ does not depend on
        # the module global.
        self.closefunc = canClose
        self.hcan = ctypes.c_void_p(0)
        res = canOpen(netno, flags, TxQueueSize, RxQueueSize, TxTout, RxTout, self.hcan)
        if res != 0:
            raise Exception("CanFdInterface creation error")
        # info for __str__ function
        self.net = netno

    def __del__(self):
        return self.closefunc(self.hcan)

    @staticmethod
    def _version_str(word):
        """Format a version word as "<bits 15..12>.<bits 11..8>.<bits 7..0>"."""
        return str(word >> 12 & 0x0F) + "." + str(word >> 8 & 0x0F) + "." + str(word & 0xFF)

    def __str__(self):
        # CanFdHandle(Net:4, Id:CAN_VIRTUAL, Drv:4.0.3, Lib:5.0.0, Hw:1.0.0, Fw:0.0.0)
        ver = self._version_str
        return (
            self.__class__.__name__ + "(" + "Net:" + str(self.net) + ", Id:" + self.boardid
            + ", Drv:" + ver(self.driver)
            + ", Lib:" + ver(self.library)
            + ", Hw:" + ver(self.hardware)
            + ", Fw:" + ver(self.firmware)
            + ")"
        )

    # ---- bitrate, extended API (canSetBaudrateX / canGetBaudrateX) ----

    def set_bitrate_index(self, index_arb, index_data=NTCAN_NO_BAUDRATE, flagsX=0, modeX=NTCAN_BAUDRATE_MODE_INDEX):
        """Set the bitrate through canSetBaudrateX().

        NTCAN_BAUDRATE_FLAG_FD is added to `flagsX` whenever `index_data`
        differs from NTCAN_NO_BAUDRATE.
        """
        if index_data != NTCAN_NO_BAUDRATE:
            flagsX |= NTCAN_BAUDRATE_FLAG_FD
        baudX = NTCAN_BAUDRATE_X()
        baudX.mode = modeX
        baudX.flags = flagsX
        baudX.reserved = 0
        baudX.arb = index_arb
        baudX.data = index_data
        return canSetBaudrateX(self.hcan, baudX)

    def set_bitrate_numeric(self, value_arb, value_data=0, flagsX=0, modeX=NTCAN_BAUDRATE_MODE_NUM):
        """Set the bitrate with mode NTCAN_BAUDRATE_MODE_NUM (by default)."""
        # NOTE(review): the default value_data=0 differs from
        # NTCAN_NO_BAUDRATE, so set_bitrate_index() adds the FD flag even
        # when no data rate is given - confirm that this is intended.
        return self.set_bitrate_index(value_arb, value_data, flagsX, modeX)

    def set_bitrate_btr(self, btr_val, flagsX=0):
        """Set the bitrate from a controller bit timing register value."""
        return self.set_bitrate_index(btr_val, NTCAN_NO_BAUDRATE, flagsX, NTCAN_BAUDRATE_MODE_BTR_CTRL)

    # controller independent bitrate parameters (canonical)
    def set_bitrate_btd(self, arb_brp, arb_tseg1, arb_tseg2, arb_sjw,
                        data_brp=NTCAN_NO_BAUDRATE, data_tseg1=0, data_tseg2=0, data_sjw=0, flagsX=0):
        """Set the bitrate from brp/tseg1/tseg2/sjw values.

        The data phase fields are written and NTCAN_BAUDRATE_FLAG_FD is set
        only when `data_brp` differs from NTCAN_NO_BAUDRATE.
        """
        use_data_phase = data_brp != NTCAN_NO_BAUDRATE
        if use_data_phase:
            flagsX |= NTCAN_BAUDRATE_FLAG_FD
        baudXbtd = NTCAN_BAUDRATE_X_BTD()
        baudXbtd.mode = NTCAN_BAUDRATE_MODE_BTR_CANONICAL
        baudXbtd.flags = flagsX
        baudXbtd.reserved = 0
        baudXbtd.arb_brp = arb_brp
        baudXbtd.arb_tseg1 = arb_tseg1
        baudXbtd.arb_tseg2 = arb_tseg2
        baudXbtd.arb_sjw = arb_sjw
        if use_data_phase:
            baudXbtd.data_brp = data_brp
            baudXbtd.data_tseg1 = data_tseg1
            baudXbtd.data_tseg2 = data_tseg2
            baudXbtd.data_sjw = data_sjw
        return canSetBaudrateXbtd(self.hcan, baudXbtd)

    def get_bitrate(self):
        """Read the bitrate via canGetBaudrateX().

        Returns the NTCAN_BAUDRATE_X structure, or the object returned by
        its btd() method when the call succeeded and the mode is
        NTCAN_BAUDRATE_MODE_BTR_CANONICAL. The library result code is
        attached to the returned object as attribute `result`.
        """
        baudX = NTCAN_BAUDRATE_X()
        result = canGetBaudrateX(self.hcan, baudX)
        if (0 == result) and (NTCAN_BAUDRATE_MODE_BTR_CANONICAL == baudX.mode):
            return_struct = baudX.btd()  # replace
        else:
            return_struct = baudX
        return_struct.result = result  # add ntcan result
        return return_struct

    # ---- bitrate, classical API (canSetBaudrate / canGetBaudrate) ----

    def set_bitrate_index_c(self, value, flags=0):
        """Set the bitrate via the classical API; `flags` is OR-ed into `value`."""
        return canSetBaudrate(self.hcan, value | flags)

    def set_bitrate_numeric_c(self, value, flags=0):
        """Like set_bitrate_index_c() with NTCAN_USER_BAUDRATE_NUM OR-ed in."""
        return canSetBaudrate(self.hcan, value | NTCAN_USER_BAUDRATE_NUM | flags)

    def set_bitrate_btr_c(self, value, flags=0):
        """Like set_bitrate_index_c() with NTCAN_USER_BAUDRATE OR-ed in."""
        return canSetBaudrate(self.hcan, value | NTCAN_USER_BAUDRATE | flags)

    def get_bitrate_c(self):
        """Return the value reported by canGetBaudrate(), or NTCAN_NO_BAUDRATE on error."""
        baudrate = ctypes.c_uint32(0)
        result = canGetBaudrate(self.hcan, baudrate)
        if SUCCESS == result:
            return baudrate.value
        return NTCAN_NO_BAUDRATE

    baudrate = property(fset=set_bitrate_index_c, fget=get_bitrate_c)

    # ---- ID filter ----

    def id_add(self, id):
        return canIdAdd(self.hcan, id)

    def id_region_add(self, id, count):
        countadd = ctypes.c_int32(count)
        return canIdRegionAdd(self.hcan, id, countadd)

    def id29_add(self, id):
        return canIdAdd(self.hcan, id | NTCAN_20B_BASE)

    def id29_region_add(self, id, count):
        countadd = ctypes.c_int32(count)
        return canIdRegionAdd(self.hcan, id | NTCAN_20B_BASE, countadd)

    def event_id_add(self, id):
        return canIdAdd(self.hcan, id | NTCAN_EV_BASE)

    def event_id_region_add(self, id, count):
        countadd = ctypes.c_int32(count)
        return canIdRegionAdd(self.hcan, id | NTCAN_EV_BASE, countadd)

    def id_delete(self, id):
        return canIdDelete(self.hcan, id)

    def id_region_delete(self, id, count):
        countadd = ctypes.c_int32(count)
        return canIdRegionDelete(self.hcan, id, countadd)

    def id29_delete(self, id):
        return canIdDelete(self.hcan, id | NTCAN_20B_BASE)

    def id29_region_delete(self, id, count):
        countadd = ctypes.c_int32(count)
        return canIdRegionDelete(self.hcan, id | NTCAN_20B_BASE, countadd)

    def event_id_delete(self, id):
        return canIdDelete(self.hcan, id | NTCAN_EV_BASE)

    def event_id_region_delete(self, id, count):
        countadd = ctypes.c_int32(count)
        return canIdRegionDelete(self.hcan, id | NTCAN_EV_BASE, countadd)

    # ---- message transfer ----
    # The generic can_send/can_take/can_write/can_read dispatch through a
    # function stored on the message's class (_can_send_func, ...), which is
    # defined outside this part of the file. Every typed variant transfers
    # exactly one message (count = 1).

    def can_send(self, cmsgn):
        return cmsgn.__class__._can_send_func(self, cmsgn)

    def can_send_x(self, cmsgx):
        # (CanMsg=CMSG_X): with CAN FD and timestamp
        lenmsg = ctypes.c_int32(1)
        return canSendX(self.hcan, cmsgx, lenmsg)

    def can_send_t(self, cmsgt):
        # (CMSG_T): no CAN FD with timestamp
        lenmsg = ctypes.c_int32(1)
        return canSendT(self.hcan, cmsgt, lenmsg)

    def can_send_c(self, cmsg):
        # (CMSG): no CAN FD, no timestamp
        lenmsg = ctypes.c_int32(1)
        return canSend(self.hcan, cmsg, lenmsg)

    def can_take(self, cmsgn):
        return cmsgn.__class__._can_take_func(self, cmsgn)

    def can_take_x(self, cmsgx):
        """Take one CanMsg; returns -1 when the library reports a count of 0."""
        # (CanMsg=CMSG_X): with CAN FD and timestamp
        lenmsg = ctypes.c_int32(1)
        ret = canTakeX(self.hcan, cmsgx, lenmsg)
        if 0 == lenmsg.value:
            ret = -1
        return ret

    def can_take_t(self, cmsgt):
        """Take one CanMsgT; returns -1 when the library reports a count of 0."""
        # (CMSG_T): no CAN FD with timestamp
        lenmsg = ctypes.c_int32(1)
        ret = canTakeT(self.hcan, cmsgt, lenmsg)
        if 0 == lenmsg.value:
            ret = -1
        return ret

    def can_take_c(self, cmsg):
        """Take one CanMsgC; returns -1 when the library reports a count of 0."""
        # (CMSG): no CAN FD, no timestamp
        lenmsg = ctypes.c_int32(1)
        ret = canTake(self.hcan, cmsg, lenmsg)
        if 0 == lenmsg.value:
            ret = -1
        return ret

    def can_write(self, cmsgn):
        return cmsgn.__class__._can_write_func(self, cmsgn)

    def can_write_x(self, cmsgx):
        # (CanMsg=CMSG_X): with CAN FD and timestamp
        lenmsg = ctypes.c_int32(1)
        # c_void_p(None) is a NULL pointer for the last argument.
        return canWriteX(self.hcan, cmsgx, lenmsg, ctypes.c_void_p(None))

    def can_write_t(self, cmsgt):
        # (CMSG_T): no CAN FD with timestamp
        lenmsg = ctypes.c_int32(1)
        return canWriteT(self.hcan, cmsgt, lenmsg, ctypes.c_void_p(None))

    def can_write_c(self, cmsg):
        # (CMSG): no CAN FD, no timestamp
        lenmsg = ctypes.c_int32(1)
        return canWrite(self.hcan, cmsg, lenmsg, ctypes.c_void_p(None))

    def can_read(self, cmsgn):
        return cmsgn.__class__._can_read_func(self, cmsgn)

    def can_read_x(self, cmsgx):
        # (CanMsg=CMSG_X): with CAN FD and timestamp
        lenmsg = ctypes.c_int32(1)
        return canReadX(self.hcan, cmsgx, lenmsg, ctypes.c_void_p(None))

    def can_read_t(self, cmsgt):
        # (CMSG_T): no CAN FD w/ timestamp
        lenmsg = ctypes.c_int32(1)
        return canReadT(self.hcan, cmsgt, lenmsg, ctypes.c_void_p(None))

    def can_read_c(self, cmsg):
        # (CMSG): no CAN FD, no timestamp
        lenmsg = ctypes.c_int32(1)
        return canRead(self.hcan, cmsg, lenmsg, ctypes.c_void_p(None))

    # ---- timestamps ----

    def get_current_timestamp(self):
        """Return the value of NTCAN_IOCTL_GET_TIMESTAMP, or 0 if the call fails."""
        timestamp = ctypes.c_uint64(0)
        res = getTimestamp(self.hcan, NTCAN_IOCTL_GET_TIMESTAMP, timestamp)
        if res != 0:
            # Return a plain 0; the original rebound `timestamp` to an int
            # here and then failed on `.value`.
            return 0
        return timestamp.value

    def get_timestamp_freq(self):
        """Return the value of NTCAN_IOCTL_GET_TIMESTAMP_FREQ (0 on error).

        The first result, including an error result of 0, is stored in
        self.timestamp_cached and returned on every later call.
        """
        if self.timestamp_cached is None:
            timestamp = ctypes.c_uint64(0)
            res = getTimestampFreq(self.hcan, NTCAN_IOCTL_GET_TIMESTAMP_FREQ, timestamp)
            self.timestamp_cached = timestamp.value if 0 == res else 0
        return self.timestamp_cached

    current_timestamp = property(fget=get_current_timestamp)
    timestamp_freq = property(fget=get_timestamp_freq)

    # ---- generic uint32 ioctl access ----

    def io_control_get_uint32(self, control):
        """Return the uint32 read with ioctl `control`, or -1 on error."""
        ret_value = ctypes.c_uint32(0)
        res = IoControlUint32(self.hcan, control, ret_value)
        if SUCCESS == res:
            return ret_value.value
        return -1

    def io_control_set_uint32(self, control, value):
        """Pass `value` as uint32 to ioctl `control`; returns the result code."""
        set_value = ctypes.c_uint32(value)
        return IoControlUint32(self.hcan, control, set_value)

    def get_tx_timeout(self):
        return self.io_control_get_uint32(NTCAN_IOCTL_GET_TX_TIMEOUT)

    def get_rx_timeout(self):
        return self.io_control_get_uint32(NTCAN_IOCTL_GET_RX_TIMEOUT)

    def set_tx_timeout(self, value):
        return self.io_control_set_uint32(NTCAN_IOCTL_SET_TX_TIMEOUT, value)

    def set_rx_timeout(self, value):
        return self.io_control_set_uint32(NTCAN_IOCTL_SET_RX_TIMEOUT, value)

    def get_busload_interval(self):
        return self.io_control_get_uint32(NTCAN_IOCTL_GET_BUSLOAD_INTERVAL)

    def set_busload_interval(self, value):
        return self.io_control_set_uint32(NTCAN_IOCTL_SET_BUSLOAD_INTERVAL, value)

    busload_interval = property(fset=set_busload_interval, fget=get_busload_interval)
    rx_timeout = property(fset=set_rx_timeout, fget=get_rx_timeout)
    tx_timeout = property(fset=set_tx_timeout, fget=get_tx_timeout)

    def get_serial(self):
        return self.io_control_get_uint32(NTCAN_IOCTL_GET_SERIAL)

    serial = property(fget=get_serial)

    def get_msg_count(self):
        return self.io_control_get_uint32(NTCAN_IOCTL_GET_RX_MSG_COUNT)

    msg_count = property(fget=get_msg_count)

    def set_ext_filter(self, value):
        return self.io_control_set_uint32(NTCAN_IOCTL_SET_20B_HND_FILTER, value)

    # write-only property
    ext_filter = property(fset=set_ext_filter)

    # ---- interface status (cached) ----

    def can_status(self, ifstatus):
        """Fill `ifstatus` via canStatus() and return the result code."""
        ifstatus._boardid[0] = 0
        ires = canStatus(self.hcan, ifstatus)
        # Force a terminating zero byte at index 13 of the board id buffer.
        ifstatus._boardid[13] = 0
        return ires

    def _cache_ifstatus(self):
        """Query the interface status once; keep it only if the call succeeded."""
        if self.ifstatus is None:
            self.ifstatus = CAN_IF_STATUS()
            ires = self.can_status(self.ifstatus)
            if ires != SUCCESS:
                self.ifstatus = None

    # Each getter below returns None when the status could not be read.

    def get_boardid(self):
        self._cache_ifstatus()
        if self.ifstatus is not None:
            return self.ifstatus.boardid

    def get_hardware(self):
        self._cache_ifstatus()
        if self.ifstatus is not None:
            return self.ifstatus.hardware

    def get_firmware(self):
        self._cache_ifstatus()
        if self.ifstatus is not None:
            return self.ifstatus.firmware

    def get_driver(self):
        self._cache_ifstatus()
        if self.ifstatus is not None:
            return self.ifstatus.driver

    def get_library(self):
        self._cache_ifstatus()
        if self.ifstatus is not None:
            return self.ifstatus.dll

    def get_features(self):
        self._cache_ifstatus()
        if self.ifstatus is not None:
            return self.ifstatus.features

    boardid = property(fget=get_boardid)
    hardware = property(fget=get_hardware)
    firmware = property(fget=get_firmware)
    driver = property(fget=get_driver)
    library = property(fget=get_library)
    features = property(fget=get_features)

    # ---- structure based ioctls ----
    # The result code of the call is attached to the returned structure as
    # attribute `result`.

    def get_bus_statistic(self):
        bus_statistic = NTCAN_BUS_STATISTIC()
        bus_statistic.result = IoControlGetBusStatistic(self.hcan, NTCAN_IOCTL_GET_BUS_STATISTIC, bus_statistic)
        return bus_statistic

    def bus_statistic_clear(self):
        # Issues NTCAN_IOCTL_GET_BUS_STATISTIC with a NULL argument.
        # NOTE(review): presumably the library resets the statistic in that
        # case - confirm against the NTCAN API manual.
        return IoControlNullPtr(self.hcan, NTCAN_IOCTL_GET_BUS_STATISTIC, ctypes.c_void_p(None))

    def get_controller_state(self):
        ctrl_state = NTCAN_CTRL_STATE()
        ctrl_state.result = IoControlGetCtrlState(self.hcan, NTCAN_IOCTL_GET_CTRL_STATUS, ctrl_state)
        return ctrl_state

    def get_bitrate_details(self):
        bitrate_details = NTCAN_BITRATE()
        bitrate_details.result = IoControlGetBitrateDetails(self.hcan, NTCAN_IOCTL_GET_BITRATE_DETAILS, bitrate_details)
        return bitrate_details

    def get_info(self):
        """Return a NTCAN_INFO filled via NTCAN_IOCTL_GET_INFO."""
        # A new ctypes structure starts zero-filled, so only the bytes
        # written by the library need to be terminated afterwards.
        ninfo = NTCAN_INFO()
        ninfo.result = IoControlGetInfo(self.hcan, NTCAN_IOCTL_GET_INFO, ninfo)
        ninfo._boardid[31] = 0
        ninfo._serial_string[15] = 0
        ninfo._drv_build_info[63] = 0
        ninfo._lib_build_info[63] = 0
        return ninfo

    def set_hnd_filter(self, acr, amr, idArea):
        """Install a handle filter (NTCAN_IOCTL_SET_HND_FILTER).

        `idArea` is one of the NTCAN_IDS_REGION_* values.
        """
        mask = NTCAN_FILTER_MASK()
        mask.acr = acr
        mask.amr = amr
        mask.idArea = idArea
        return IoControlFilterMask(self.hcan, NTCAN_IOCTL_SET_HND_FILTER, mask)

    def unset_hnd_filter(self, idArea):
        """Set the handle filter of `idArea` to acr=0, amr=0xFFFFFFFF."""
        mask = NTCAN_FILTER_MASK()
        mask.acr = 0x00000000
        mask.amr = 0xFFFFFFFF
        mask.idArea = idArea
        return IoControlFilterMask(self.hcan, NTCAN_IOCTL_SET_HND_FILTER, mask)

    # ---- ntcanpy style method names ----
    # Inside these methods the names canIdAdd/canIdDelete resolve to the
    # module level library functions, not to the methods themselves.

    def canIdAdd(self, idFirst, idLast=-1):
        """Add the ids idFirst..idLast (inclusive), stopping at the first error.

        Returns the result code of the last library call, or 0 when the
        range is empty.
        """
        if -1 == idLast:
            idLast = idFirst
        res = 0
        for can_id in range(idFirst, idLast + 1):
            res = canIdAdd(self.hcan, can_id)
            if res != 0:
                break
        return res

    def canIdDelete(self, idFirst, idLast=-1):
        """Delete the ids idFirst..idLast (inclusive), stopping at the first error.

        Returns the result code of the last library call, or 0 when the
        range is empty.
        """
        if -1 == idLast:
            idLast = idFirst
        res = 0
        for can_id in range(idFirst, idLast + 1):
            res = canIdDelete(self.hcan, can_id)
            if res != 0:
                break
        return res

    def canPurge(self):
        return IoControlNullPtr(self.hcan, NTCAN_IOCTL_FLUSH_RX_FIFO, ctypes.c_void_p(None))

    def canAbortRx(self):
        return IoControlNullPtr(self.hcan, NTCAN_IOCTL_ABORT_RX, ctypes.c_void_p(None))

    def canAbortTx(self):
        return IoControlNullPtr(self.hcan, NTCAN_IOCTL_ABORT_TX, ctypes.c_void_p(None))
# end class


# global
def create_can_fd_handle(netno, TxQueueSize=1, RxQueueSize=1, TxTimeout=0, RxTimeout=0, flags=0):
    """Create a CanFdHandle with NTCAN_MODE_FD added to `flags`."""
    return CanFdHandle(netno, flags | NTCAN_MODE_FD, TxQueueSize, RxQueueSize, TxTimeout, RxTimeout)


def create_can_handle(netno, TxQueueSize=1, RxQueueSize=1, TxTimeout=0, RxTimeout=0, flags=0):
    """Create a CanFdHandle with `flags` passed on unchanged."""
    return CanFdHandle(netno, flags, TxQueueSize, RxQueueSize, TxTimeout, RxTimeout)


def CIF(netno, RxQueueSize=128, RxTimeout=2000, TxTimeout=2000, TxQueueSize=2, flags=0):
    """ntcanpy compatibility: create_can_handle() with a different parameter order and defaults."""
    return create_can_handle(netno, TxQueueSize, RxQueueSize, TxTimeout, RxTimeout, flags)

# indexer for CMSG..
class DataIndexerUChar(object):
    """Byte-wise access to `dataref.candata`.

    -> ntcanpy compatible indexer syntax: msg.data.c[5]
    """

    def __init__(self, dataref):
        self.data_ref = dataref

    def __getitem__(self, idx):
        payload = self.data_ref.candata
        return payload[idx]

    def __setitem__(self, idx, value):
        payload = self.data_ref.candata
        payload[idx] = value


class DataIndexerUShort(object):
    """16-bit little-endian access to `dataref.candata` (item idx = bytes 2*idx, 2*idx+1).

    -> ntcanpy compatible indexer syntax: msg.data.s[5]
    """

    def __init__(self, dataref):
        self.data_ref = dataref

    def __getitem__(self, idx):
        first = idx * 2
        payload = self.data_ref.candata
        word = 0
        # Lowest address holds the least significant byte.
        for pos in range(2):
            word |= payload[first + pos] << (8 * pos)
        return word

    def __setitem__(self, idx, value):
        first = idx * 2
        payload = self.data_ref.candata
        # Bits above the 16-bit range of `value` are discarded.
        for pos in range(2):
            payload[first + pos] = (value >> (8 * pos)) & 0xFF


class DataIndexerULong(object):
    """32-bit little-endian access to `dataref.candata` (item idx = bytes 4*idx .. 4*idx+3).

    -> ntcanpy compatible indexer syntax: msg.data.l[5]
    """

    # Class attribute kept from the original; nothing in this part of the
    # file reads it.
    s_data_ref = None

    def __init__(self, dataref):
        self.data_ref = dataref

    def __getitem__(self, idx):
        first = idx * 4
        payload = self.data_ref.candata
        word = 0
        for pos in range(4):
            word |= payload[first + pos] << (8 * pos)
        return word

    def __setitem__(self, idx, value):
        first = idx * 4
        payload = self.data_ref.candata
        # Bits above the 32-bit range of `value` are discarded.
        for pos in range(4):
            payload[first + pos] = (value >> (8 * pos)) & 0xFF


class DataIndexerULongLong(object):
    """64-bit little-endian access to `dataref.candata` (item idx = bytes 8*idx .. 8*idx+7).

    -> indexer syntax: msg.data.ll[5]
    """

    def __init__(self, dataref):
        self.data_ref = dataref

    def __getitem__(self, idx):
        first = idx * 8
        payload = self.data_ref.candata
        word = 0
        for pos in range(8):
            word |= payload[first + pos] << (8 * pos)
        return word

    def __setitem__(self, idx, value):
        first = idx * 8
        payload = self.data_ref.candata
        # Bits above the 64-bit range of `value` are discarded.
        for pos in range(8):
            payload[first + pos] = (value >> (8 * pos)) & 0xFF


class DataIndexer(object):
    """Bundle of the four indexers over the same data object.

    -> ntcanpy compatible indexer syntax: cmsg.data.c/s/l[5]
    """

    def __init__(self, dataref):
        self.c = DataIndexerUChar(dataref)
        self.s = DataIndexerUShort(dataref)
        self.l = DataIndexerULong(dataref)
        self.ll = DataIndexerULongLong(dataref)