Transmitting CAN frame
In this tutorial we will learn how to create and transmit CAN/CANFD frame using simulation or neoVI hardware.
1. Initialize Vehicle Spy X
First step we need to initialize a new instance of the vspyx.Core.Application
.
import vspyx
if 'app' not in globals():
global app
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
2. Connecting Channel to Controller:
Next we need to connect a channel to a controller.
a) Hardware Setup
To use hardware, first we to activate it, create a controller and connect the desired hardware channel to the controller. vspyx.VehicleSpy.Module.AddSource()
device = app.VehicleSpy.AddSource("hardware sn") # Activates HW
channel = app.Resolver[f'{device.Source.Identifier} {"Channel Name"} Discovery Channel'] # Assign a channel from the HW
b) Creating CAN Channel (for simulation mode)
On simulation (no hardware), we need to create a new CAN cluster and CAN channel then connect the channel to the controller.
We must specify CAN and CANFD baud rate (int) and give a unique name to the new cluster, channel and controller.
vspyx.Communication.CANCluster()
and vspyx.Communication.CANChannel()
can_channel_config = Channel_pb2.CANChannel()
channel = vspyx.Communication.CANChannel.New(can_channel_config)
can_cluster_config = Cluster_pb2.CANCluster()
can_cluster_config.ClusterBase.BaudRate = "Enter CAN channel baud rate"
if canfd_protocol: # in case if we want to create CAN FD channel
can_cluster_config.FDBaudRate = "Enter CAN channel baud rate"
can_cluster_config.UseFDBaudRate = True
can_cluster = vspyx.Communication.CANCluster.New(can_cluster_config)
channel.Initialize(app, "channel name")
can_cluster.Initialize(app, "cluster name")
can_cluster.AddChannel(channel)
app.Communication.Topology.Channels.Add(channel)
app.Communication.UserTopology.Channels.Add(channel)
app.Communication.Topology.Clusters.Add(can_cluster)
app.Communication.UserTopology.Clusters.Add(can_cluster)
Now we need to create a controller and connect it to a channel: vspyx.Communication.Channel.NewAttachedController()
[controller, connector] = channel.NewAttachedController('controller name', listenOnly=False) # creates controller and connects it to HW channel
3. Creating CAN Frame
After we created a channel, next let’s create a CAN/CANFD frame using vspyx.Frames.CANFrameBuilder()
.
In this example, we are going to specify the arbid (in hex), databytes (byte array), frame length (int), extended id (bool) and ifCANFD (bool)
frame = vspyx.Frames.CANFrameBuilder()
frame.ArbIDSet("arbid in hex")
frame.DataSet("data bytes as byte array)
frame.DLCSet(vspyx.Frames.CANFrameBuilder.CAN_DLToDLC("length","bool canfd protocol"))
frame.ExtendedSet("bool extended id")
frame.CANFDSet("bool canfd protocol")
4. Creating a Report
Next we need create a function that prints the CAN frame using vspyx.Communication.DataLinkPDUPoint
def report(point: vspyx.Runtime.Point):
if not isinstance(point, vspyx.Communication.DataLinkPDUPoint):
return
print(f"{point.GetAttribute('ShortName')} {point.GetAttribute('ArbID')}\
{point.GetAttribute('Length')} {point.GetAttribute('Payload')}\
{point.GetAttribute('ChannelName')} {point.GetAttribute('WireType')}\
{point.GetAttribute('DLC')} {point.GetAttribute('Timestamp')}")
5. Creating TX Event
Using the controller
and the CAN frame
made in step2 and step3, we’re going to create a TX Event by submitting the CAN Frame into the Controller:
tx_event = lambda scheduler: controller.SubmitEvent(frame)
6. Transmitting the Frame
Last step is to go online with the hardware or start the simulation. We’re going to use report
function and tx_event
made in step4 and 5.
We must specify the periodic TX rate and the duration using vspyx.VehicleSpy.Module.Scheduler()
.
observer = app.VehicleSpy.PrepareForStart(analysisMode=False)
observer.OnPoint.Add(report)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.GetTimingCallback(datetime.timedelta(milliseconds="tx rate")).Add(tx_event)
app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds="tx duration"))
app.VehicleSpy.Stop()
Complete Example:
import vspyx
import datetime
from intrepidcs.vspyx.rpc.Communication import Cluster_pb2, Channel_pb2
hardware_sn = None
channel_name = 'HSCAN'
canfd_baudrate = 2000000
baudrate = 500000
canfd_protocol = False
extended_id = True
arbid = 0x123
length = 3
data_bytes = ([0x01, 0xff, 0x03])
periodic_rate = 10 # in milliseconds
duration = 10000 # in milliseconds
def setup_hw():
device = app.VehicleSpy.AddSource(hardware_sn) #activate hw
channel = app.Resolver[f'{device.Source.Identifier} {channel_name} Discovery Channel']
return channel
def create_channel():
can_channel_config = Channel_pb2.CANChannel()
channel = vspyx.Communication.CANChannel.New(can_channel_config)
can_cluster_config = Cluster_pb2.CANCluster()
can_cluster_config.ClusterBase.BaudRate = baudrate
if canfd_protocol:
can_cluster_config.FDBaudRate = canfd_baudrate
can_cluster_config.UseFDBaudRate = True
can_cluster = vspyx.Communication.CANCluster.New(can_cluster_config)
channel.Initialize(app, channel_name)
can_cluster.Initialize(app, f'cluster_{channel_name}')
can_cluster.AddChannel(channel)
app.Communication.Topology.Channels.Add(channel)
app.Communication.UserTopology.Channels.Add(channel)
app.Communication.Topology.Clusters.Add(can_cluster)
app.Communication.UserTopology.Clusters.Add(can_cluster)
return channel
def report(point: vspyx.Runtime.Point):
if not isinstance(point, vspyx.Communication.DataLinkPDUPoint):
return
print(f"{point.GetAttribute('ShortName')} {point.GetAttribute('ArbID')}\
{point.GetAttribute('Length')} {point.GetAttribute('Payload')}\
{point.GetAttribute('ChannelName')} {point.GetAttribute('WireType')}\
{point.GetAttribute('DLC')} {point.GetAttribute('Timestamp')}")
def can_frame():
frame = vspyx.Frames.CANFrameBuilder()
frame.ArbIDSet(arbid)
frame.DataSet(data_bytes)
frame.DLCSet(vspyx.Frames.CANFrameBuilder.CAN_DLToDLC(length,canfd_protocol))
frame.ExtendedSet(extended_id)
frame.CANFDSet(canfd_protocol)
return frame
def connect_to_controller(channel):
[controller, connector] = channel.NewAttachedController('my_setup', listenOnly=False)
return controller
def start_stop(tx):
observer = app.VehicleSpy.PrepareForStart(analysisMode=False)
observer.OnPoint.Add(report)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.GetTimingCallback(datetime.timedelta(milliseconds=periodic_rate)).Add(tx)
app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=duration))
app.VehicleSpy.Stop()
def submit_tx_event(controller):
tx_event = lambda scheduler: controller.SubmitEvent(can_frame())
return tx_event
def main():
if 'app' not in globals():
global app
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
print(f'Version: {app.Version}')
if hardware_sn is not None:
channel = setup_hw()
else:
channel = create_channel()
controller = connect_to_controller(channel)
tx_event = submit_tx_event(controller)
start_stop(tx_event)
if __name__ == '__main__':
main()