.. _soad-gateway:

Socket Adaptor (SoAD) Gateway
==============================

In this tutorial we will learn how to gateway CAN frames to UDP using simulation.

1. Initialize |project|
-----------------------

First, we need to initialize a new instance of :py:class:`vspyx.Core.Application`.

.. code:: python

    import vspyx

    # Create the application only once, so the snippet can be re-run safely.
    if 'app' not in globals():
        app = vspyx.Core.Application.New()
        app.Initialize(loadAllModules=True)

2. Create Ethernet Channel:
---------------------------

In simulation (no hardware), we need to create a new Ethernet cluster and an Ethernet channel,
then connect the channel to a controller. We must give a unique name to the new cluster, channel,
and controller. See :py:class:`vspyx.Communication.EthernetChannel` and
:py:class:`vspyx.Communication.EthernetCluster`.

.. code:: python

    def ether_channel():
        """Create an Ethernet channel inside a new cluster and register both in the topology."""
        channel = vspyx.Communication.EthernetChannel.New()
        cluster = vspyx.Communication.EthernetCluster.New()
        # Same names as the complete example, whose log_traffic() filters on
        # the channel name 'UDP'.
        channel.Initialize(app, 'UDP')
        cluster.Initialize(app, 'UDP Cluster')
        cluster.AddChannel(channel)
        app.Communication.Topology.Channels.Add(channel)
        app.Communication.UserTopology.Channels.Add(channel)
        app.Communication.Topology.Clusters.Add(cluster)
        app.Communication.UserTopology.Clusters.Add(cluster)
        return channel

Now we need to create a controller and connect it to the channel with
:py:meth:`vspyx.Communication.Channel.NewAttachedController`:

.. code:: python

    def connect_to_controller(ether_channel):
        """Attach a new, non-listen-only controller to the channel and return it."""
        ether_controller, _connector = ether_channel.NewAttachedController(
            'Gateway Output', listenOnly=False)
        return ether_controller

3. Create UDP Frame
--------------------

After we have created a channel, let's create a UDP frame using
:py:class:`vspyx.Frames.EthernetFrameBuilder.FrameBuilder`.
In this example, we are going to specify the Destination/Source MAC Address (string),
Destination/Source IP Address (string), Destination/Source Port (int), and Length (int).

.. code:: python

    def ether_frame(data: bytearray):
        """Build a broadcast UDP frame carrying ``data``; return None when there is nothing to send."""
        if not data:
            return None
        raw = vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
        raw.SourceMACAddress('00:FC:70:FF:AA:BB')
        raw.DestinationMACAddress('FF:FF:FF:FF:FF:FF')
        ip = raw.IPv4()
        ip.SourceIPAddress('10.0.1.2')
        ip.DestinationIPAddress('255.255.255.255')
        # 8 = UDP header, 20 = IPv4 header (no options)
        ip.TotalLength(len(data) + 8 + 20)
        udp_frame = ip.UDP()
        udp_frame.SourcePort(51280)
        udp_frame.DestinationPort(5555)
        udp_frame.Length(len(data) + 8)
        # 14 = Ethernet header; the declared lengths above deliberately
        # exclude the padding added below.
        tot_len = len(data) + 8 + 20 + 14
        if tot_len < 60:
            # Pad short frames
            data.extend([0] * (60 - tot_len))
        udp_frame.AppendPayload(vspyx.Core.BytesView(data))
        return udp_frame

4. Creating TX Event
--------------------

Using the ``ether_controller`` from step 2 and the ``udp_frame`` built in step 3, we're going to
create a TX event by submitting the UDP frame to the controller.

.. code:: python

    # Bytes collected from CAN traffic since the last transmission.
    data = []

    def tx_event(ether_controller):
        """Return a scheduler callback that sends the buffered bytes as one UDP frame."""
        def tx(Scheduler: vspyx.Runtime.Scheduler):
            global data
            # ether_frame() pads its argument in place, so hand it a copy.
            frame = ether_frame(bytearray(data))
            if frame is not None:
                ether_controller.SubmitEvent(frame)
            data = []
        return tx

5. Add Data Source to Simulate CAN Traffic
-------------------------------------------

Next we need to add a data source (a VSB file) to simulate CAN traffic, using
:py:meth:`vspyx.VehicleSpy.Module.AddSource`.

.. code:: python

    def traffic(traffic_path, database_path):
        """Register the database and the recorded traffic; return the source handle."""
        app.VehicleSpy.AddDatabase(database_path)
        source_handle = app.VehicleSpy.AddSource(traffic_path)
        return source_handle

6. Creating a Report
---------------------

Next we need to create an ``OnPoint`` callback that reads each CAN frame and copies its data into
the global list ``data``. Every CAN frame is appended as an 8-byte header (a 4-byte big-endian
arbitration ID followed by a 4-byte length) and then the payload. The ``log_traffic`` helper used
here is defined in the complete example below.

.. code:: python

    def report(point: vspyx.Runtime.Point):
        """Buffer CAN frames for the gateway and log CAN and UDP traffic."""
        if isinstance(point, vspyx.Communication.CANDataLinkPDUPoint):  # CAN traffic
            arb = point.GetAttribute('ArbID')
            payload = point.GetAttribute('Payload')
            # 4-byte big-endian ID; every byte is masked so that 29-bit
            # extended IDs cannot produce a value above 255.
            data.extend([(arb >> 24) & 0xFF, (arb >> 16) & 0xFF,
                         (arb >> 8) & 0xFF, arb & 0xFF,
                         0, 0, 0, len(payload)])
            data.extend(payload)
            log_traffic(point.GetAttribute)
        elif isinstance(point, vspyx.Communication.DataLinkPDUPoint):  # UDP traffic
            log_traffic(point.GetAttribute)

7. Start the Gateway
---------------------

The last step is to go online with the CAN simulation and gateway the traffic to UDP. We're going
to use the ``report`` function and the callback returned by ``tx_event``.

.. code:: python

    from datetime import timedelta

    def start_stop(tx):
        """Run the simulation until the traffic file has been played once."""
        source_handle = traffic('path to vsb', 'path to database')
        source_handle.Source.Loop = False
        app.VehicleSpy.PrepareForStart()
        # Flush the buffered CAN data to UDP every 2 ms.
        app.VehicleSpy.Scheduler.GetTimingCallback(timedelta(milliseconds=2)).Add(tx)
        app.VehicleSpy.Observer.OnPoint.Add(report)
        app.VehicleSpy.Start()
        app.VehicleSpy.Scheduler.Wait(source_handle.Source.BufferEndEvent)
        app.VehicleSpy.Stop()

Complete Example:
-----------------

.. code:: python

    import vspyx
    from datetime import timedelta

    # Bytes collected from CAN traffic since the last transmission.
    data = []


    def ether_channel():
        """Create an Ethernet channel inside a new cluster and register both in the topology."""
        channel = vspyx.Communication.EthernetChannel.New()
        cluster = vspyx.Communication.EthernetCluster.New()
        channel.Initialize(app, 'UDP')
        cluster.Initialize(app, 'UDP Cluster')
        cluster.AddChannel(channel)
        app.Communication.Topology.Channels.Add(channel)
        app.Communication.UserTopology.Channels.Add(channel)
        app.Communication.Topology.Clusters.Add(cluster)
        app.Communication.UserTopology.Clusters.Add(cluster)
        return channel


    def connect_to_controller(ether_channel):
        """Attach a new, non-listen-only controller to the channel and return it."""
        ether_controller, _connector = ether_channel.NewAttachedController(
            'Gateway Output', listenOnly=False)
        return ether_controller


    def ether_frame(data: bytearray):
        """Build a broadcast UDP frame carrying ``data``; return None when there is nothing to send."""
        if not data:
            return None
        raw = vspyx.Frames.EthernetFrameBuilder.FrameBuilder()
        raw.SourceMACAddress('00:FC:70:FF:AA:BB')
        raw.DestinationMACAddress('FF:FF:FF:FF:FF:FF')
        ip = raw.IPv4()
        ip.SourceIPAddress('10.0.1.2')
        ip.DestinationIPAddress('255.255.255.255')
        # 8 = UDP header, 20 = IPv4 header (no options)
        ip.TotalLength(len(data) + 8 + 20)
        udp_frame = ip.UDP()
        udp_frame.SourcePort(51280)
        udp_frame.DestinationPort(5555)
        udp_frame.Length(len(data) + 8)
        # 14 = Ethernet header; the declared lengths above deliberately
        # exclude the padding added below.
        tot_len = len(data) + 8 + 20 + 14
        if tot_len < 60:
            # Pad short frames
            data.extend([0] * (60 - tot_len))
        udp_frame.AppendPayload(vspyx.Core.BytesView(data))
        return udp_frame


    def tx_event(ether_controller):
        """Return a scheduler callback that sends the buffered bytes as one UDP frame."""
        def tx(Scheduler: vspyx.Runtime.Scheduler):
            global data
            # ether_frame() pads its argument in place, so hand it a copy.
            frame = ether_frame(bytearray(data))
            if frame is not None:
                ether_controller.SubmitEvent(frame)
            data = []
        return tx


    def traffic(traffic_path, database_path):
        """Register the database and the recorded traffic; return the source handle."""
        app.VehicleSpy.AddDatabase(database_path)
        source_handle = app.VehicleSpy.AddSource(traffic_path)
        return source_handle


    def report(point: vspyx.Runtime.Point):
        """Buffer CAN frames for the gateway and log CAN and UDP traffic."""
        if isinstance(point, vspyx.Communication.CANDataLinkPDUPoint):
            arb = point.GetAttribute('ArbID')
            payload = point.GetAttribute('Payload')
            # 4-byte big-endian ID; every byte is masked so that 29-bit
            # extended IDs cannot produce a value above 255.
            data.extend([(arb >> 24) & 0xFF, (arb >> 16) & 0xFF,
                         (arb >> 8) & 0xFF, arb & 0xFF,
                         0, 0, 0, len(payload)])
            data.extend(payload)
            log_traffic(point.GetAttribute)
        elif isinstance(point, vspyx.Communication.DataLinkPDUPoint):
            log_traffic(point.GetAttribute)


    def start_stop(tx):
        """Run the simulation until the traffic file has been played once."""
        source_handle = traffic('FSAE.vsb', 'FSAE.vsdb')
        source_handle.Source.Loop = False
        app.VehicleSpy.PrepareForStart()
        # Flush the buffered CAN data to UDP every 2 ms.
        app.VehicleSpy.Scheduler.GetTimingCallback(timedelta(milliseconds=2)).Add(tx)
        app.VehicleSpy.Observer.OnPoint.Add(report)
        app.VehicleSpy.Start()
        app.VehicleSpy.Scheduler.Wait(source_handle.Source.BufferEndEvent)
        app.VehicleSpy.Stop()


    def log_traffic(attribute):
        """Log one point, given its ``GetAttribute`` accessor, according to its channel name."""
        channel_name = attribute('ChannelName')
        if channel_name == 'HS CAN':
            vspyx.Core.Log("CAN Traffic").o(
                f"{attribute('ShortName')} {hex(attribute('ArbID'))} "
                f"{attribute('Length')} {attribute('Payload')} {channel_name} "
                f"{attribute('DLC')} {attribute('Timestamp')}")
        elif channel_name == 'UDP':
            vspyx.Core.Log("UDP Traffic").o(
                f"{hex(attribute('eth.source'))} {hex(attribute('eth.destination'))} "
                f"{attribute('ip.srcport')} {attribute('ip.dstport')} "
                f"{hex(attribute('eth.type'))} {hex(attribute('eth.payload'))}")


    def main():
        """Create the application if needed, build the Ethernet side, and run the gateway."""
        global app
        if 'app' not in globals():
            app = vspyx.Core.Application.New()
            app.Initialize(loadAllModules=False)
            # Request the two modules used by this example.
            app.ModuleManager.GetModule('Communication')
            app.ModuleManager.GetModule('VehicleSpy')
        channel = ether_channel()
        controller = connect_to_controller(channel)
        event = tx_event(controller)
        start_stop(event)


    if __name__ == '__main__':
        main()