Transmitting CAN frame

In this tutorial we will learn how to create and transmit a CAN/CANFD frame using simulation or neoVI hardware.

1. Initialize Vehicle Spy X

The first step is to initialize a new instance of vspyx.Core.Application.

import vspyx

# Create and initialize the application only when no 'app' object exists yet,
# so the snippet can be re-run in the same interpreter session.
if 'app' not in globals():
        app = vspyx.Core.Application.New()
        app.Initialize(loadAllModules=True)

2. Connecting Channel to Controller:

Next we need to connect a channel to a controller.

a) Hardware Setup

To use hardware, we first need to activate it, create a controller, and connect the desired hardware channel to the controller. See vspyx.VehicleSpy.Module.AddSource()

# "hardware sn" is a placeholder for the serial number of the device to activate.
device = app.VehicleSpy.AddSource("hardware sn") # Activates HW
# "Channel Name" is a placeholder for the hardware channel to use
# (the complete example below uses 'HSCAN').
channel = app.Resolver[f'{device.Source.Identifier} {"Channel Name"} Discovery Channel'] # Assign a channel from the HW

b) Creating CAN Channel (for simulation mode)

In simulation mode (no hardware), we need to create a new CAN cluster and CAN channel, then connect the channel to the controller. We must specify the CAN and CANFD baud rates (int) and give unique names to the new cluster, channel and controller. See vspyx.Communication.CANCluster() and vspyx.Communication.CANChannel()

# The configuration message classes used below come from this module
# (the same import appears in the complete example).
from intrepidcs.vspyx.rpc.Communication import Cluster_pb2, Channel_pb2

# Quoted values are placeholders: replace the baud rates with integers and the
# names with unique strings. `canfd_protocol` is a bool you define yourself.
can_channel_config = Channel_pb2.CANChannel()
channel = vspyx.Communication.CANChannel.New(can_channel_config)

can_cluster_config = Cluster_pb2.CANCluster()
can_cluster_config.ClusterBase.BaudRate = "Enter CAN channel baud rate"

if canfd_protocol: # only needed when creating a CAN FD channel
    can_cluster_config.FDBaudRate = "Enter CAN FD channel baud rate"
    can_cluster_config.UseFDBaudRate = True

can_cluster = vspyx.Communication.CANCluster.New(can_cluster_config)

channel.Initialize(app, "channel name")

can_cluster.Initialize(app, "cluster name")
can_cluster.AddChannel(channel)

# Both the channel and the cluster are added to Topology and to UserTopology.
app.Communication.Topology.Channels.Add(channel)
app.Communication.UserTopology.Channels.Add(channel)
app.Communication.Topology.Clusters.Add(can_cluster)
app.Communication.UserTopology.Clusters.Add(can_cluster)

Now we need to create a controller and connect it to a channel: vspyx.Communication.Channel.NewAttachedController()

# NewAttachedController returns two values; only `controller` is used in the following steps.
[controller, connector] = channel.NewAttachedController('controller name', listenOnly=False) # creates a controller and connects it to the channel (hardware or simulated)

3. Creating CAN Frame

After creating a channel, let’s create a CAN/CANFD frame using vspyx.Frames.CANFrameBuilder(). In this example, we are going to specify the arbid (in hex), the data bytes (byte array), the frame length (int), whether the ID is extended (bool) and whether the frame is CANFD (bool)

# Quoted values are placeholders: replace each one with a real value of the
# described type before running.
frame = vspyx.Frames.CANFrameBuilder()
frame.ArbIDSet("arbid in hex")
frame.DataSet("data bytes as byte array")
# CAN_DLToDLC is given the frame length and the CAN FD flag; its result is passed to DLCSet.
frame.DLCSet(vspyx.Frames.CANFrameBuilder.CAN_DLToDLC("length","bool canfd protocol"))
frame.ExtendedSet("bool extended id")
frame.CANFDSet("bool canfd protocol")

4. Creating a Report

Next we need to create a function that prints the CAN frame using vspyx.Communication.DataLinkPDUPoint

def report(point: vspyx.Runtime.Point):
        """Print one line describing a CAN frame point.

        Points that are not vspyx.Communication.DataLinkPDUPoint instances are
        ignored. For the others, the ShortName, ArbID, Length, Payload,
        ChannelName, WireType, DLC and Timestamp attributes are printed on a
        single line, separated by single spaces.
        """
        if not isinstance(point, vspyx.Communication.DataLinkPDUPoint):
                return

        # Join the fields explicitly: continuing a string literal with a trailing
        # backslash would embed the next source line's indentation in the output.
        attribute_names = (
                'ShortName', 'ArbID', 'Length', 'Payload',
                'ChannelName', 'WireType', 'DLC', 'Timestamp',
        )
        print(' '.join(f"{point.GetAttribute(name)}" for name in attribute_names))

5. Creating TX Event

Using the controller and the CAN frame made in steps 2 and 3, we’re going to create a TX Event by submitting the CAN Frame into the Controller:

def tx_event(scheduler):
        """Submit the frame to the controller; the scheduler argument is not used."""
        return controller.SubmitEvent(frame)

6. Transmitting the Frame

The last step is to go online with the hardware or start the simulation. We’re going to use the report function and the tx_event made in steps 4 and 5. We must specify the periodic TX rate and the duration using vspyx.VehicleSpy.Module.Scheduler().

# Requires `import datetime`. "tx rate" and "tx duration" are placeholders for
# millisecond values (the complete example below uses 10 and 10000).
observer = app.VehicleSpy.PrepareForStart(analysisMode=False)
observer.OnPoint.Add(report) # attach the report function from step 4
app.VehicleSpy.Start()
# Add tx_event (step 5) to the timing callback obtained for the TX rate.
app.VehicleSpy.Scheduler.GetTimingCallback(datetime.timedelta(milliseconds="tx rate")).Add(tx_event)
# Wait for the requested duration before stopping.
app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds="tx duration"))
app.VehicleSpy.Stop()

Complete Example:

import vspyx
import datetime
from intrepidcs.vspyx.rpc.Communication import Cluster_pb2, Channel_pb2

# --- Connection settings ---
# Device serial number; when left as None, main() creates a simulated channel instead.
hardware_sn = None
channel_name = 'HSCAN'
baudrate = 500000
canfd_baudrate = 2000000  # only applied when canfd_protocol is True

# --- Frame contents ---
canfd_protocol = False
extended_id = True
arbid = 0x123
length = 3
data_bytes = [0x01, 0xff, 0x03]

# --- Transmit timing ---
periodic_rate = 10  # in milliseconds
duration = 10000  # in milliseconds

def setup_hw():
        """Activate the hardware given by hardware_sn and return its channel.

        The channel is looked up in app.Resolver under the key
        '<source identifier> <channel_name> Discovery Channel'.
        """
        source = app.VehicleSpy.AddSource(hardware_sn)
        resolver_key = f'{source.Source.Identifier} {channel_name} Discovery Channel'
        return app.Resolver[resolver_key]

def create_channel():
        """Create a CAN channel and cluster for simulation and return the channel.

        The cluster uses `baudrate`, plus `canfd_baudrate` when `canfd_protocol`
        is set. The channel is named `channel_name` and the cluster
        'cluster_<channel_name>'; both are added to Topology and UserTopology.
        """
        channel = vspyx.Communication.CANChannel.New(Channel_pb2.CANChannel())

        cluster_config = Cluster_pb2.CANCluster()
        cluster_config.ClusterBase.BaudRate = baudrate
        if canfd_protocol:
                cluster_config.FDBaudRate = canfd_baudrate
                cluster_config.UseFDBaudRate = True
        cluster = vspyx.Communication.CANCluster.New(cluster_config)

        channel.Initialize(app, channel_name)
        cluster.Initialize(app, f'cluster_{channel_name}')
        cluster.AddChannel(channel)

        communication = app.Communication
        communication.Topology.Channels.Add(channel)
        communication.UserTopology.Channels.Add(channel)
        communication.Topology.Clusters.Add(cluster)
        communication.UserTopology.Clusters.Add(cluster)

        return channel

def report(point: vspyx.Runtime.Point):
        """Print one line describing a CAN frame point.

        Points that are not vspyx.Communication.DataLinkPDUPoint instances are
        ignored. For the others, the ShortName, ArbID, Length, Payload,
        ChannelName, WireType, DLC and Timestamp attributes are printed on a
        single line, separated by single spaces.
        """
        if not isinstance(point, vspyx.Communication.DataLinkPDUPoint):
                return

        # Join the fields explicitly: continuing a string literal with a trailing
        # backslash would embed the next source line's indentation in the output.
        attribute_names = (
                'ShortName', 'ArbID', 'Length', 'Payload',
                'ChannelName', 'WireType', 'DLC', 'Timestamp',
        )
        print(' '.join(f"{point.GetAttribute(name)}" for name in attribute_names))

def can_frame():
        """Build a frame from the module-level settings and return the builder.

        Uses arbid, data_bytes, length, extended_id and canfd_protocol.
        """
        builder = vspyx.Frames.CANFrameBuilder()
        builder.ArbIDSet(arbid)
        builder.DataSet(data_bytes)
        dlc = vspyx.Frames.CANFrameBuilder.CAN_DLToDLC(length, canfd_protocol)
        builder.DLCSet(dlc)
        builder.ExtendedSet(extended_id)
        builder.CANFDSet(canfd_protocol)
        return builder

def connect_to_controller(channel):
        """Attach a new controller named 'my_setup' to channel and return it.

        NewAttachedController yields a (controller, connector) pair; only the
        controller is returned.
        """
        attached = channel.NewAttachedController('my_setup', listenOnly=False)
        controller, _connector = attached
        return controller

def start_stop(tx):
        """Start Vehicle Spy, run tx periodically for the configured duration, then stop.

        tx is added to the timing callback for `periodic_rate` milliseconds, and
        the run lasts `duration` milliseconds. `report` is attached to the
        observer's OnPoint before starting.
        """
        period = datetime.timedelta(milliseconds=periodic_rate)
        run_time = datetime.timedelta(milliseconds=duration)

        # The observer stays bound to a local for the whole run, as in the original flow.
        point_observer = app.VehicleSpy.PrepareForStart(analysisMode=False)
        point_observer.OnPoint.Add(report)

        app.VehicleSpy.Start()
        app.VehicleSpy.Scheduler.GetTimingCallback(period).Add(tx)
        app.VehicleSpy.Scheduler.WaitFor(run_time)
        app.VehicleSpy.Stop()

def submit_tx_event(controller):
        """Return a callback that submits a newly built frame to controller.

        The callback accepts one argument (the scheduler), which it does not
        use, and returns whatever controller.SubmitEvent returns.
        """
        def tx_event(scheduler):
                return controller.SubmitEvent(can_frame())

        return tx_event

def main():
        """Run the example: set up the app and channel, then transmit the frame.

        A global `app` is created and initialized only if none exists yet. The
        channel comes from hardware when `hardware_sn` is set, otherwise from a
        newly created simulated channel.
        """
        global app
        if 'app' not in globals():
                app = vspyx.Core.Application.New()
                app.Initialize(loadAllModules=True)

        print(f'Version: {app.Version}')

        channel = create_channel() if hardware_sn is None else setup_hw()
        tx_event = submit_tx_event(connect_to_controller(channel))
        start_stop(tx_event)

# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
        main()