一、概述
1.介绍
python-can库为Python提供了控制器局域网的支持,为不同的硬件设备提供了通用的抽象,并提供了一套实用程序,用于在CAN总线上发送和接收消息。
支持硬件接口:
Name |
Documentation |
---|---|
|
SocketCAN |
|
Kvaser’s CANLIB |
|
CAN over Serial |
|
CAN over Serial / SLCAN |
|
IXXAT Virtual CAN Interface |
|
PCAN Basic API |
|
USB2CAN Interface |
|
NI-CAN |
|
isCAN |
|
neoVI |
|
Vector |
|
Virtual |
|
CANalyst-II |
|
SYSTEC interface |
2.环境搭建
Python安装:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe
PCAN-USB驱动:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip
库:pip install python-can
3.参考文档
https://python-can.readthedocs.io/en/master/#
二、常用方法
1.接收报文
from can.interfaces.pcan.pcan import PcanBus def bus_recv(): \"\"\"轮询接收消息\"\"\" try: while True: msg = bus.recv(timeout=100) print(msg) except KeyboardInterrupt: pass if __name__ == \'__main__\': bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) bus_recv()
2.发送报文
from can.interfaces.pcan.pcan import PcanBus def bus_send(): \"\"\"can消息发送\"\"\" while True: time.sleep(0.02) try: bus.send(msg) print(\"消息发送 {}\".format(bus.channel_info)) except can.CanError: print(\"消息未发送\") if __name__ == \'__main__\': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 报文 bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) bus_send()
3.定期发送报文
def bus_send_periodic(): \"\"\"周期发送报文\"\"\" print(\"开始每200毫秒发送一条消息。持续时间10s\") task = bus.send_periodic(msg, 1.5) # 定期发送 if not isinstance(task, can.ModifiableCyclicTaskABC): # 断言task类型 print(\"此接口似乎不支持\") task.stop() return time.sleep(5) # 持续时间 print(\"发送完成\") print(\"更改运行任务的数据以99开头\") msg.data[0] = 0x99 task.modify_data(msg) # 修改data首字节 time.sleep(10) task.stop() print(\"停止循环发送\") print(\"将停止任务的数据更改为单个 ff 字节\") msg.data = bytearray([0xff]) # 重新定向data msg.dlc = 1 # 定义data长度 task.modify_data(msg) # 修改data time.sleep(10) print(\"重新开始\") task.start() # 重新启动已停止的周期性任务 time.sleep(10) task.stop() print(\"完毕\") if __name__ == \'__main__\': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 报文 bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) bus_send_periodic()
4.循环收发消息
from can.interfaces.pcan.pcan import PcanBus def send_cyclic(stop_event): \"\"\"循环发送消息\"\"\" print(\"开始每1秒发送1条消息\") start_time = time.time() while not stop_event.is_set(): msg.timestamp = time.time() - start_time bus.send(msg) print(f\"tx: {msg}\") time.sleep(1) print(\"停止发送消息\") def receive(stop_event): \"\"\"循环接收消息\"\"\" print(\"开始接收消息\") while not stop_event.is_set(): rx_msg = bus.recv(1) if rx_msg is not None: print(f\"rx: {rx_msg}\") print(\"停止接收消息\") def send_and_recv_msg(): \"\"\"发送一个消息并接收一个消息,需要双通道CAN\"\"\" stop_event = threading.Event() t_send_cyclic = threading.Thread(target=send_cyclic, args=(stop_event,)) t_receive = threading.Thread(target=receive, args=(stop_event,)) t_receive.start() t_send_cyclic.start() try: while True: time.sleep(0) # yield except KeyboardInterrupt: pass # 正常退出 stop_event.set() time.sleep(0.5) if __name__ == \'__main__\': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 报文 bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) send_and_recv_msg()
来源:https://www.cnblogs.com/airb/p/15963037.html
本站部分图文来源于网络,如有侵权请联系删除。