Socket Adaptor (SoAD) Gateway
In this tutorial we will learn how to gateway CAN frames to UDP using simulation
1. Initialize Vehicle Spy X
First step we need to initialize a new instance of the vspyx.Core.Application
.
import vspyx
if 'app' not in globals():
global app
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
2. Create Ethernet Channel:
On simulation (no hardware), we need to create a new Ethernet cluster and Ethernet channel then connect the channel to a controller.
We must give a unique name to the new cluster, channel and controller.
vspyx.Communication.EthernetChannel()
and vspyx.Communication.EthernetCluster()
def ether_channel():
ether_channel = vspyx.Communication.EthernetChannel.New()
cluster = vspyx.Communication.EthernetCluster.New()
ether_channel.Initialize(app, 'udp_channel')
cluster.Initialize(app, 'cluster')
cluster.AddChannel(ether_channel)
app.Communication.Topology.Channels.Add(ether_channel)
app.Communication.UserTopology.Channels.Add(ether_channel)
app.Communication.Topology.Clusters.Add(cluster)
app.Communication.UserTopology.Clusters.Add(cluster)
return ether_channel
Now we need to create a controller and connect it to the channel: vspyx.Communication.Channel.NewAttachedController()
def connect_to_controller(ether_channel):
[ether_controller, ether_connector] = ether_channel.NewAttachedController('Gateway Output', listenOnly=False)
return ether_controller
3. Create UDP Frame
After we created a channel, next let’s create a UDP frame using vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
.
In this example, we are going to specify the Destination/Source MAC Address (string), Destination/Source IP Address (string)
Destination/Source Port (int), and Length (int).
def ether_frame(data: bytearray):
if len(data) == 0:
return None
raw = vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
raw.SourceMACAddress('00:FC:70:FF:AA:BB')
raw.DestinationMACAddress('FF:FF:FF:FF:FF:FF')
ip = raw.IPv4()
ip.SourceIPAddress('10.0.1.2')
ip.DestinationIPAddress('255.255.255.255')
ip.TotalLength(len(data) + 8 + 20)
udp_frame = ip.UDP()
udp_frame.SourcePort(51280)
udp_frame.DestinationPort(5555)
udp_frame.Length(len(data) + 8)
tot_len = len(data) + 8 + 20 + 14
if tot_len < 60: # Pad short frames
data.extend([0] * (60 - tot_len))
udp_frame.AppendPayload(vspyx.Core.BytesView(data))
return udp_frame
4. Creating TX Event
Using ether_controller
and UDP udp_frame
made in step2 and step3, we’re going to create a tx event by submitting the UDP frame into the controller.
data = []
def tx_event(ether_controller):
def tx(Scheduler: vspyx.Runtime.Scheduler):
global data
frame = ether_frame(bytearray(data))
if frame is not None:
ether_controller.SubmitEvent(frame)
data = []
return tx
5. Add Data Source to Simulate CAN Traffic
Next we need to add data source handler (vsb) to simulate CAN traffic vspyx.VehicleSpy.Module.AddSource()
def traffic(traffic_path, database_path):
app.VehicleSpy.AddDatabase(database_path)
source_handle = app.VehicleSpy.AddSource(traffic_path)
return source_handle
6. Creating a Report
Next we need to create a function (OnPoint) to access CAN frame and copy the data into a global list global data
.
def report(point: vspyx.Runtime.Point):
if isinstance(point, vspyx.Communication.CANDataLinkPDUPoint): # if CAN traffic
arb = point.GetAttribute('ArbID')
payload = point.GetAttribute('Payload')
data.extend([0, arb >> 16, arb >> 8 & 0xFF, arb & 0xFF, 0, 0, 0, len(payload)])
data.extend(payload)
log_traffic(point.GetAttribute) # print
elif isinstance(point, vspyx.Communication.DataLinkPDUPoint): # UDP traffic
log_traffic(point.GetAttribute)
7. Start the Gateway
Last step is to go online with CAN simulation and gateway the traffic to UDP. We’re going to use report
function and tx_event
.
def start_stop(tx_event):
source_handle = traffic('path to vsb', 'path to database')
source_handle.Source.Loop = False
app.VehicleSpy.PrepareForStart()
app.VehicleSpy.Scheduler.GetTimingCallback(timedelta(milliseconds=2)).Add(tx_event)
app.VehicleSpy.Observer.OnPoint.Add(report)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.Wait(source_handle.Source.BufferEndEvent)
app.VehicleSpy.Stop()
Complete Example:
import vspyx
from datetime import timedelta
global data
def ether_channel():
ether_channel = vspyx.Communication.EthernetChannel.New()
cluster = vspyx.Communication.EthernetCluster.New()
ether_channel.Initialize(app, 'UDP')
cluster.Initialize(app, 'UDP Cluster')
cluster.AddChannel(ether_channel)
app.Communication.Topology.Channels.Add(ether_channel)
app.Communication.UserTopology.Channels.Add(ether_channel)
app.Communication.Topology.Clusters.Add(cluster)
app.Communication.UserTopology.Clusters.Add(cluster)
return ether_channel
def connect_to_controller(ether_channel):
[ether_controller, ether_connector] = ether_channel.NewAttachedController('Gateway Output', listenOnly=False)
return ether_controller
def ether_frame(data: bytearray):
if len(data) == 0:
return None
raw = vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
raw.SourceMACAddress('00:FC:70:FF:AA:BB')
raw.DestinationMACAddress('FF:FF:FF:FF:FF:FF')
ip = raw.IPv4()
ip.SourceIPAddress('10.0.1.2')
ip.DestinationIPAddress('255.255.255.255')
ip.TotalLength(len(data) + 8 + 20)
udp_frame = ip.UDP()
udp_frame.SourcePort(51280)
udp_frame.DestinationPort(5555)
udp_frame.Length(len(data) + 8)
tot_len = len(data) + 8 + 20 + 14
if tot_len < 60: # Pad short frames
data.extend([0] * (60 - tot_len))
udp_frame.AppendPayload(vspyx.Core.BytesView(data))
return udp_frame
data = []
def tx_event(ether_controller):
def tx(Scheduler: vspyx.Runtime.Scheduler):
global data
frame = ether_frame(bytearray(data))
if frame is not None:
ether_controller.SubmitEvent(frame)
data = []
return tx
def traffic(traffic_path, database_path):
app.VehicleSpy.AddDatabase(database_path)
traffic = app.VehicleSpy.AddSource(traffic_path)
return traffic
def report(point: vspyx.Runtime.Point):
if isinstance(point, vspyx.Communication.CANDataLinkPDUPoint):
arb = point.GetAttribute('ArbID')
payload = point.GetAttribute('Payload')
data.extend([0, arb >> 16, arb >> 8 & 0xFF, arb & 0xFF, 0, 0, 0, len(payload)])
data.extend(payload)
log_traffic(point.GetAttribute)
elif isinstance(point, vspyx.Communication.DataLinkPDUPoint):
log_traffic(point.GetAttribute)
def start_stop(tx):
source_handle = traffic('FSAE.vsb', 'FSAE.vsdb')
source_handle.Source.Loop = False
app.VehicleSpy.PrepareForStart()
app.VehicleSpy.Scheduler.GetTimingCallback(timedelta(milliseconds=2)).Add(tx)
app.VehicleSpy.Observer.OnPoint.Add(report)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.Wait(source_handle.Source.BufferEndEvent)
app.VehicleSpy.Stop()
def log_traffic(attribute):
if attribute('ChannelName') == 'HS CAN':
vspyx.Core.Log("CAN Traffic").o(f"{attribute('ShortName')} {hex(attribute('ArbID'))}\
{attribute('Length')} {attribute('Payload')} {attribute('ChannelName')}\
{attribute('DLC')} {attribute('Timestamp')}")
elif attribute('ChannelName') == 'UDP':
vspyx.Core.Log("UDP Traffic").o(f"{hex(attribute('eth.source'))} {hex(attribute('eth.destination'))}\
{attribute('ip.srcport')} {attribute('ip.dstport')} {hex(attribute('eth.type'))} {hex(attribute('eth.payload'))}")
def main():
if 'app' not in globals():
global app
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=False)
app.ModuleManager.GetModule('Communication')
app.ModuleManager.GetModule('VehicleSpy')
channel = ether_channel()
controller = connect_to_controller(channel)
event = tx_event(controller)
start_stop(event)
if __name__ == '__main__':
main()