Socket Adaptor (SoAD) Gateway

In this tutorial we will learn how to gateway CAN frames to UDP using simulation

1. Initialize Vehicle Spy X

First step we need to initialize a new instance of the vspyx.Core.Application.

import vspyx

if 'app' not in globals():
        global app
        app = vspyx.Core.Application.New()
        app.Initialize(loadAllModules=True)

2. Create Ethernet Channel:

On simulation (no hardware), we need to create a new Ethernet cluster and Ethernet channel then connect the channel to a controller. We must give a unique name to the new cluster, channel and controller. vspyx.Communication.EthernetChannel() and vspyx.Communication.EthernetCluster()

def ether_channel():
        ether_channel = vspyx.Communication.EthernetChannel.New()
        cluster = vspyx.Communication.EthernetCluster.New()
        ether_channel.Initialize(app, 'udp_channel')
        cluster.Initialize(app, 'cluster')
        cluster.AddChannel(ether_channel)
        app.Communication.Topology.Channels.Add(ether_channel)
        app.Communication.UserTopology.Channels.Add(ether_channel)
        app.Communication.Topology.Clusters.Add(cluster)
        app.Communication.UserTopology.Clusters.Add(cluster)
        return ether_channel

Now we need to create a controller and connect it to the channel: vspyx.Communication.Channel.NewAttachedController()

def connect_to_controller(ether_channel):
        [ether_controller, ether_connector] = ether_channel.NewAttachedController('Gateway Output', listenOnly=False)
        return ether_controller

3. Create UDP Frame

After we created a channel, next let’s create a UDP frame using vspyx.Frames.EthernetFrameBuilder.FrameBuilder(). In this example, we are going to specify the Destination/Source MAC Address (string), Destination/Source IP Address (string) Destination/Source Port (int), and Length (int).

def ether_frame(data: bytearray):
        if len(data) == 0:
                return None
        raw = vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
        raw.SourceMACAddress('00:FC:70:FF:AA:BB')
        raw.DestinationMACAddress('FF:FF:FF:FF:FF:FF')
        ip = raw.IPv4()
        ip.SourceIPAddress('10.0.1.2')
        ip.DestinationIPAddress('255.255.255.255')
        ip.TotalLength(len(data) + 8 + 20)
        udp_frame = ip.UDP()
        udp_frame.SourcePort(51280)
        udp_frame.DestinationPort(5555)
        udp_frame.Length(len(data) + 8)
        tot_len = len(data) + 8 + 20 + 14
        if tot_len < 60:  # Pad short frames
                data.extend([0] * (60 - tot_len))
        udp_frame.AppendPayload(vspyx.Core.BytesView(data))
        return udp_frame

4. Creating TX Event

Using ether_controller and UDP udp_frame made in step2 and step3, we’re going to create a tx event by submitting the UDP frame into the controller.

data = []
def tx_event(ether_controller):
        def tx(Scheduler: vspyx.Runtime.Scheduler):
                global data
                frame = ether_frame(bytearray(data))
                if frame is not None:
                        ether_controller.SubmitEvent(frame)
                data = []
        return tx

5. Add Data Source to Simulate CAN Traffic

Next we need to add data source handler (vsb) to simulate CAN traffic vspyx.VehicleSpy.Module.AddSource()

def traffic(traffic_path, database_path):
        app.VehicleSpy.AddDatabase(database_path)
        source_handle = app.VehicleSpy.AddSource(traffic_path)
        return source_handle

6. Creating a Report

Next we need to create a function (OnPoint) to access CAN frame and copy the data into a global list global data.

def report(point: vspyx.Runtime.Point):
        if isinstance(point, vspyx.Communication.CANDataLinkPDUPoint): # if CAN traffic
                arb = point.GetAttribute('ArbID')
                payload = point.GetAttribute('Payload')
                data.extend([0, arb >> 16, arb >> 8 & 0xFF, arb & 0xFF, 0, 0, 0, len(payload)])
                data.extend(payload)
                log_traffic(point.GetAttribute) # print
        elif isinstance(point, vspyx.Communication.DataLinkPDUPoint): # UDP traffic
                log_traffic(point.GetAttribute)

7. Start the Gateway

Last step is to go online with CAN simulation and gateway the traffic to UDP. We’re going to use report function and tx_event.

def start_stop(tx_event):
        source_handle = traffic('path to vsb', 'path to database')
        source_handle.Source.Loop = False
        app.VehicleSpy.PrepareForStart()
        app.VehicleSpy.Scheduler.GetTimingCallback(timedelta(milliseconds=2)).Add(tx_event)
        app.VehicleSpy.Observer.OnPoint.Add(report)
        app.VehicleSpy.Start()
        app.VehicleSpy.Scheduler.Wait(source_handle.Source.BufferEndEvent)
        app.VehicleSpy.Stop()

Complete Example:

import vspyx
from datetime import timedelta

global data
def ether_channel():
        ether_channel = vspyx.Communication.EthernetChannel.New()
        cluster = vspyx.Communication.EthernetCluster.New()
        ether_channel.Initialize(app, 'UDP')
        cluster.Initialize(app, 'UDP Cluster')
        cluster.AddChannel(ether_channel)
        app.Communication.Topology.Channels.Add(ether_channel)
        app.Communication.UserTopology.Channels.Add(ether_channel)
        app.Communication.Topology.Clusters.Add(cluster)
        app.Communication.UserTopology.Clusters.Add(cluster)
        return ether_channel

def connect_to_controller(ether_channel):
        [ether_controller, ether_connector] = ether_channel.NewAttachedController('Gateway Output', listenOnly=False)
        return ether_controller

def ether_frame(data: bytearray):
        if len(data) == 0:
                return None
        raw = vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
        raw.SourceMACAddress('00:FC:70:FF:AA:BB')
        raw.DestinationMACAddress('FF:FF:FF:FF:FF:FF')
        ip = raw.IPv4()
        ip.SourceIPAddress('10.0.1.2')
        ip.DestinationIPAddress('255.255.255.255')
        ip.TotalLength(len(data) + 8 + 20)
        udp_frame = ip.UDP()
        udp_frame.SourcePort(51280)
        udp_frame.DestinationPort(5555)
        udp_frame.Length(len(data) + 8)
        tot_len = len(data) + 8 + 20 + 14
        if tot_len < 60:  # Pad short frames
                data.extend([0] * (60 - tot_len))
        udp_frame.AppendPayload(vspyx.Core.BytesView(data))
        return udp_frame

data = []
def tx_event(ether_controller):
        def tx(Scheduler: vspyx.Runtime.Scheduler):
                global data
                frame = ether_frame(bytearray(data))
                if frame is not None:
                        ether_controller.SubmitEvent(frame)
                data = []
        return tx

def traffic(traffic_path, database_path):
        app.VehicleSpy.AddDatabase(database_path)
        traffic = app.VehicleSpy.AddSource(traffic_path)
        return traffic

def report(point: vspyx.Runtime.Point):
        if isinstance(point, vspyx.Communication.CANDataLinkPDUPoint):
                arb = point.GetAttribute('ArbID')
                payload = point.GetAttribute('Payload')
                data.extend([0, arb >> 16, arb >> 8 & 0xFF, arb & 0xFF, 0, 0, 0, len(payload)])
                data.extend(payload)
                log_traffic(point.GetAttribute)
        elif isinstance(point, vspyx.Communication.DataLinkPDUPoint):
                log_traffic(point.GetAttribute)

def start_stop(tx):
        source_handle = traffic('FSAE.vsb', 'FSAE.vsdb')
        source_handle.Source.Loop = False
        app.VehicleSpy.PrepareForStart()
        app.VehicleSpy.Scheduler.GetTimingCallback(timedelta(milliseconds=2)).Add(tx)
        app.VehicleSpy.Observer.OnPoint.Add(report)
        app.VehicleSpy.Start()
        app.VehicleSpy.Scheduler.Wait(source_handle.Source.BufferEndEvent)
        app.VehicleSpy.Stop()

def log_traffic(attribute):
                if attribute('ChannelName') == 'HS CAN':
                        vspyx.Core.Log("CAN Traffic").o(f"{attribute('ShortName')} {hex(attribute('ArbID'))}\
                        {attribute('Length')} {attribute('Payload')} {attribute('ChannelName')}\
                        {attribute('DLC')} {attribute('Timestamp')}")
                elif attribute('ChannelName') == 'UDP':
                        vspyx.Core.Log("UDP Traffic").o(f"{hex(attribute('eth.source'))} {hex(attribute('eth.destination'))}\
                        {attribute('ip.srcport')} {attribute('ip.dstport')} {hex(attribute('eth.type'))} {hex(attribute('eth.payload'))}")
def main():
        if 'app' not in globals():
                global app
                app = vspyx.Core.Application.New()
                app.Initialize(loadAllModules=False)

        app.ModuleManager.GetModule('Communication')
        app.ModuleManager.GetModule('VehicleSpy')

        channel = ether_channel()
        controller = connect_to_controller(channel)
        event = tx_event(controller)
        start_stop(event)

if __name__ == '__main__':
        main()