.. _transmitting-can-frame:

Transmitting CAN frame
=======================

In this tutorial we will learn how to create and transmit a CAN or CAN FD frame, using either simulation or neoVI hardware. In the snippets below, quoted placeholders such as ``"hardware sn"`` or ``"tx rate"`` stand for your own values; the complete example at the end shows concrete ones.

1. Initialize |project|
-----------------------

First, we need to create and initialize a new instance of :py:class:`vspyx.Core.Application`.

.. code:: python

    import vspyx

    if 'app' not in globals():
        global app
        app = vspyx.Core.Application.New()
        app.Initialize(loadAllModules=True)

2. Connecting Channel to Controller:
------------------------------------

Next, we need to obtain a channel (from hardware or from simulation) and connect it to a controller.

a) Hardware Setup
``````````````````

To use hardware, we first activate it with :py:class:`vspyx.VehicleSpy.Module.AddSource()` and then look up the desired hardware channel. The controller is created and attached to this channel at the end of this step.

.. code:: python

    device = app.VehicleSpy.AddSource("hardware sn")  # Activates the hardware
    # Look up one of the hardware's channels by name
    channel = app.Resolver[f'{device.Source.Identifier} {"Channel Name"} Discovery Channel']

b) Creating CAN Channel (for simulation mode)
`````````````````````````````````````````````

In simulation (no hardware), we need to create a new CAN cluster and a new CAN channel, and add the channel to the cluster. We must specify the CAN baud rate and, for CAN FD, the CAN FD baud rate (both ``int``), and give a unique name to the new cluster, channel and controller. See :py:class:`vspyx.Communication.CANCluster()` and :py:class:`vspyx.Communication.CANChannel()`.

.. code:: python

    from intrepidcs.vspyx.rpc.Communication import Cluster_pb2, Channel_pb2

    can_channel_config = Channel_pb2.CANChannel()
    channel = vspyx.Communication.CANChannel.New(can_channel_config)

    can_cluster_config = Cluster_pb2.CANCluster()
    can_cluster_config.ClusterBase.BaudRate = "Enter CAN channel baud rate"
    if canfd_protocol:  # only when creating a CAN FD channel
        can_cluster_config.FDBaudRate = "Enter CAN FD baud rate"
        can_cluster_config.UseFDBaudRate = True
    can_cluster = vspyx.Communication.CANCluster.New(can_cluster_config)

    channel.Initialize(app, "channel name")
    can_cluster.Initialize(app, "cluster name")
    can_cluster.AddChannel(channel)

    # Register the new channel and cluster with the application topology
    app.Communication.Topology.Channels.Add(channel)
    app.Communication.UserTopology.Channels.Add(channel)
    app.Communication.Topology.Clusters.Add(can_cluster)
    app.Communication.UserTopology.Clusters.Add(can_cluster)

In both cases, we now create a controller and connect it to the channel with :py:class:`vspyx.Communication.Channel.NewAttachedController()`:

.. code:: python

    # Creates a controller and connects it to the channel
    [controller, connector] = channel.NewAttachedController('controller name', listenOnly=False)

3. Creating CAN Frame
----------------------

Now that we have a channel, let's create a CAN or CAN FD frame using :py:class:`vspyx.Frames.CANFrameBuilder()`. In this example, we specify the arbitration ID (in hex), the data bytes (byte array), the frame length (``int``), whether the ID is extended (``bool``) and whether the frame is CAN FD (``bool``).

.. code:: python

    frame = vspyx.Frames.CANFrameBuilder()
    frame.ArbIDSet("arbid in hex")
    frame.DataSet("data bytes as byte array")
    frame.DLCSet(vspyx.Frames.CANFrameBuilder.CAN_DLToDLC("length", "bool canfd protocol"))
    frame.ExtendedSet("bool extended id")
    frame.CANFDSet("bool canfd protocol")

4. Creating a Report
---------------------

Next, we need to create a function that prints each CAN frame. It receives every point produced at runtime and ignores anything that is not a :py:class:`vspyx.Communication.DataLinkPDUPoint`.

.. code:: python

    def report(point: vspyx.Runtime.Point):
        # Only data link PDU points carry the frame attributes printed below
        if not isinstance(point, vspyx.Communication.DataLinkPDUPoint):
            return
        print(f"{point.GetAttribute('ShortName')} {point.GetAttribute('ArbID')} "
              f"{point.GetAttribute('Length')} {point.GetAttribute('Payload')} "
              f"{point.GetAttribute('ChannelName')} {point.GetAttribute('WireType')} "
              f"{point.GetAttribute('DLC')} {point.GetAttribute('Timestamp')}")

5. Creating TX Event
--------------------

Using the ``controller`` and the CAN ``frame`` made in steps 2 and 3, we create a TX event: a callback that submits the CAN frame to the controller each time it is called.

.. code:: python

    tx_event = lambda scheduler: controller.SubmitEvent(frame)

6. Transmitting the Frame
-------------------------

The last step is to go online with the hardware or start the simulation. We use the ``report`` function and the ``tx_event`` callback made in steps 4 and 5. We must specify the periodic TX rate and the total duration, both in milliseconds, using :py:class:`vspyx.VehicleSpy.Module.Scheduler()`.

.. code:: python

    import datetime

    observer = app.VehicleSpy.PrepareForStart(analysisMode=False)
    observer.OnPoint.Add(report)  # print every frame seen on the bus
    app.VehicleSpy.Start()
    # Call tx_event once per TX period, then wait for the whole duration
    app.VehicleSpy.Scheduler.GetTimingCallback(datetime.timedelta(milliseconds="tx rate")).Add(tx_event)
    app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds="tx duration"))
    app.VehicleSpy.Stop()

Complete Example:
-----------------

.. code:: python

    import vspyx
    import datetime
    from intrepidcs.vspyx.rpc.Communication import Cluster_pb2, Channel_pb2

    hardware_sn = None  # set to the hardware serial number to use hardware
    channel_name = 'HSCAN'
    canfd_baudrate = 2000000
    baudrate = 500000
    canfd_protocol = False
    extended_id = True
    arbid = 0x123
    length = 3
    data_bytes = [0x01, 0xff, 0x03]
    periodic_rate = 10  # in milliseconds
    duration = 10000  # in milliseconds

    def setup_hw():
        device = app.VehicleSpy.AddSource(hardware_sn)  # activate hw
        channel = app.Resolver[f'{device.Source.Identifier} {channel_name} Discovery Channel']
        return channel

    def create_channel():
        can_channel_config = Channel_pb2.CANChannel()
        channel = vspyx.Communication.CANChannel.New(can_channel_config)
        can_cluster_config = Cluster_pb2.CANCluster()
        can_cluster_config.ClusterBase.BaudRate = baudrate
        if canfd_protocol:
            can_cluster_config.FDBaudRate = canfd_baudrate
            can_cluster_config.UseFDBaudRate = True
        can_cluster = vspyx.Communication.CANCluster.New(can_cluster_config)
        channel.Initialize(app, channel_name)
        can_cluster.Initialize(app, f'cluster_{channel_name}')
        can_cluster.AddChannel(channel)
        app.Communication.Topology.Channels.Add(channel)
        app.Communication.UserTopology.Channels.Add(channel)
        app.Communication.Topology.Clusters.Add(can_cluster)
        app.Communication.UserTopology.Clusters.Add(can_cluster)
        return channel

    def report(point: vspyx.Runtime.Point):
        if not isinstance(point, vspyx.Communication.DataLinkPDUPoint):
            return
        print(f"{point.GetAttribute('ShortName')} {point.GetAttribute('ArbID')} "
              f"{point.GetAttribute('Length')} {point.GetAttribute('Payload')} "
              f"{point.GetAttribute('ChannelName')} {point.GetAttribute('WireType')} "
              f"{point.GetAttribute('DLC')} {point.GetAttribute('Timestamp')}")

    def can_frame():
        frame = vspyx.Frames.CANFrameBuilder()
        frame.ArbIDSet(arbid)
        frame.DataSet(data_bytes)
        frame.DLCSet(vspyx.Frames.CANFrameBuilder.CAN_DLToDLC(length, canfd_protocol))
        frame.ExtendedSet(extended_id)
        frame.CANFDSet(canfd_protocol)
        return frame

    def connect_to_controller(channel):
        [controller, connector] = channel.NewAttachedController('my_setup', listenOnly=False)
        return controller

    def start_stop(tx):
        observer = app.VehicleSpy.PrepareForStart(analysisMode=False)
        observer.OnPoint.Add(report)
        app.VehicleSpy.Start()
        app.VehicleSpy.Scheduler.GetTimingCallback(datetime.timedelta(milliseconds=periodic_rate)).Add(tx)
        app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=duration))
        app.VehicleSpy.Stop()

    def submit_tx_event(controller):
        # Build a fresh frame on every scheduler tick and submit it
        tx_event = lambda scheduler: controller.SubmitEvent(can_frame())
        return tx_event

    def main():
        if 'app' not in globals():
            global app
            app = vspyx.Core.Application.New()
            app.Initialize(loadAllModules=True)
        print(f'Version: {app.Version}')
        # Use hardware when a serial number is given, otherwise simulate
        if hardware_sn is not None:
            channel = setup_hw()
        else:
            channel = create_channel()
        controller = connect_to_controller(channel)
        tx_event = submit_tx_event(controller)
        start_stop(tx_event)

    if __name__ == '__main__':
        main()
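
Step 3 converts the payload length into a DLC with ``CAN_DLToDLC`` before calling ``DLCSet``. The conversion matters for CAN FD, where DLC values above 8 no longer equal the byte count. The sketch below illustrates the standard CAN FD mapping in plain Python. The helper ``length_to_dlc`` and the table ``FD_DLC_TO_LENGTH`` are hypothetical names introduced only for illustration. They are not part of the API used in this tutorial, and the real ``CAN_DLToDLC`` may validate or round differently.

```python
# Standard CAN FD payload sizes (in bytes) for DLC values 9..15.
# For DLC 0..8 the DLC equals the number of data bytes.
FD_DLC_TO_LENGTH = {9: 12, 10: 16, 11: 20, 12: 24, 13: 32, 14: 48, 15: 64}


def length_to_dlc(length: int, fd: bool) -> int:
    """Hypothetical helper: smallest DLC whose payload size holds `length` bytes."""
    if length < 0:
        raise ValueError("length must be non-negative")
    if length <= 8:
        return length  # classic CAN and CAN FD agree for 0..8 bytes
    if not fd:
        raise ValueError("classic CAN frames carry at most 8 data bytes")
    for dlc, size in sorted(FD_DLC_TO_LENGTH.items()):
        if length <= size:
            return dlc
    raise ValueError("CAN FD frames carry at most 64 data bytes")


print(length_to_dlc(3, fd=False))   # the 3-byte frame from the complete example
print(length_to_dlc(12, fd=True))   # 12 bytes fit exactly in DLC 9
print(length_to_dlc(33, fd=True))   # 33 bytes need the 48-byte size, DLC 14
```

A consequence of this mapping is that a CAN FD payload whose length is not one of the listed sizes occupies the next larger size on the bus, so the data passed to ``DataSet`` may need padding to match the DLC.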