Getting signal values

This tutorial will cover using vspyx.Runtime.Environment.OnPoint and vspyx.Runtime.Trace.OnPoint to get every signal value or all values for a specific signal in a buffer file.

For this tutorial, we will assume that you have a buffer file named buffer.vsb and a database named database.dbc. These files could be in any of the buffer or database formats supported by Vehicle Spy X.

Setup

The first step is to initialize a new instance of the vspyx.Core.Application.

import vspyx

app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)

Next, we can load the database with vspyx.VehicleSpy.Module.AddDatabase() and the buffer with vspyx.VehicleSpy.Module.AddSource().

db = app.VehicleSpy.AddDatabase('database.dbc')
buffer = app.VehicleSpy.AddSource('buffer.vsb')

Next, disable looping, as we only want to run through the buffer once.

buffer.Source.Loop = False

Once the buffer and database are loaded we need to connect the vspyx.Communication.Controller provided by the buffer to the vspyx.Communication.Channel provided by the database. This is done in the Vehicle Spy X UI by dragging a line between the two in the Topology view. Here, we will use the vspyx.Communication.Module.ConnectControllerToChannel() function.

Note

This will need to be modified for your specific buffer/database. Here we will connect a controller named HSCAN to the database channel. Since DBC files do not name their channels, the channel created by vspyx.Communication.DBC is named after the file.

vspyx.Core.Resolver is used to look up objects by their name.

controller = app.Resolver[f"{buffer.Source.Identifier} HSCAN Controller"]
channel = app.Resolver["database"]
app.Communication.ConnectControllerToChannel(controller, channel)
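If you connect several networks, building the lookup names by hand gets repetitive. The helpers below are a sketch and not part of vspyx: the names controller_key and channel_name_for are hypothetical, and they only encode the naming pattern used in this example (the "<source identifier> <network> Controller" string, and a DBC channel named after the file without its extension). The "buffer" identifier in the usage lines is a placeholder; check the names in your own setup before relying on them.

```python
from pathlib import Path


def controller_key(source_identifier: str, network_name: str) -> str:
    """Build the resolver name used above for a buffer's controller."""
    return f"{source_identifier} {network_name} Controller"


def channel_name_for(database_path: str) -> str:
    """Assumption: the DBC channel is named after the file, minus extension."""
    return Path(database_path).stem


# Placeholder identifier; in the tutorial this comes from buffer.Source.Identifier
print(controller_key("buffer", "HSCAN"))
print(channel_name_for("database.dbc"))
```

In the tutorial code, the results would be used as app.Resolver[controller_key(buffer.Source.Identifier, "HSCAN")] and app.Resolver[channel_name_for("database.dbc")].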

Now that the channel and controller are hooked up, we are ready to go online.

vspyx.VehicleSpy.Module.Observer is the vspyx.Runtime.Environment responsible for holding the “latest” value of all signals and calculating statistics for them.

vspyx.VehicleSpy.Module.PrepareForStart() creates and returns a new vspyx.Runtime.Environment, the vspyx.VehicleSpy.Module.Observer for this run of the application. With this, we can register callbacks or even add our own instances of vspyx.Runtime.Component.

Getting every signal update

To begin, we will register a callback to vspyx.Runtime.Environment.OnPoint. Instances of vspyx.Runtime.Point represent a sample of an object that Vehicle Spy X processed. For example, most messages are defined as a vspyx.Communication.ISignalIPDU. When an incoming frame matches a vspyx.Communication.ISignalIPDU, an instance of vspyx.Communication.ISignalIPDUPoint is created. The vspyx.Runtime.Environment.OnPoint event is fired for every new vspyx.Runtime.Point that is created, so the callback below skips any point that is not a vspyx.Communication.ISignalPoint.

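def on_point(point):
    # Ignore frames, PDUs and anything else that is not a signal sample
    if not isinstance(point, vspyx.Communication.ISignalPoint):
        return

    # Single quotes inside the f-string keep this valid on Python versions before 3.12
    print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
          f" = {point.PhysicalValue} {point.UnitString}{'' if point.Valid else ' -- Warning: Invalid'}")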

observer = app.VehicleSpy.PrepareForStart()
observer.OnPoint.Add(on_point)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.Wait(buffer.Source.BufferEndEvent)
app.VehicleSpy.Stop()

If you run this, you should see all the signal values printed out to your console.

Warning

When a signal value exceeds the defined range of the signal, whether this be linear bounds or an undefined enum (TEXTTABLE) value, vspyx.Runtime.SignalPoint.Valid will be False.

It is important to check for invalid signals, as vspyx.Runtime.SignalPoint.PhysicalValue will equal vspyx.Runtime.SignalPoint.InternalValue in this case, which will be a vspyx.Core.Numeric representing the invalid state as it was seen on the wire.

For enums, this makes sense. For numeric values, this means the value will have no scaling or offset applied!

Example output:

[2018/08/26 13:16:59:244134] X = -0.14 g
[2018/08/26 13:16:59:244134] Y = 0.017 g
[2018/08/26 13:16:59:244134] Z = 0.028 g
[2018/08/26 13:16:59:244324] FL Damper Height Zero = -5.165 mm
[2018/08/26 13:16:59:244324] FR Damper Height Zero = -5.031 mm
[2018/08/26 13:16:59:247420] FL Velocity = 0.0 m/s
[2018/08/26 13:16:59:247420] FR Velocity = 0.0 m/s
[2018/08/26 13:16:59:247420] FL Speed = 0.0 kph
[2018/08/26 13:16:59:247420] FR Speed = 0.0 kph
[2018/08/26 13:16:59:248516] RPM = 2935
[2018/08/26 13:16:59:248516] Seconds = 519 s
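One way to act on the warning above is to branch on Valid before interpreting the value. The sketch below is not part of the vspyx API: FakeSignalPoint is a hypothetical stand-in that carries only the attributes named in the warning (Valid, PhysicalValue, InternalValue) plus UnitString, so the logic can be run without a buffer file. The helper name describe_value and the sample values are also made up for illustration.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class FakeSignalPoint:
    """Hypothetical stand-in for a signal point; not a vspyx class."""
    PhysicalValue: Union[int, float, str]
    InternalValue: Union[int, float, str]
    UnitString: str
    Valid: bool


def describe_value(point) -> str:
    """Return a printable value, flagging raw (unscaled) values."""
    if point.Valid:
        return f"{point.PhysicalValue} {point.UnitString}".rstrip()
    # Invalid: PhysicalValue mirrors InternalValue, so the unit does not apply
    return f"raw {point.InternalValue} (invalid, unscaled)"


ok = FakeSignalPoint(PhysicalValue=-0.14, InternalValue=-140, UnitString="g", Valid=True)
bad = FakeSignalPoint(PhysicalValue=65535, InternalValue=65535, UnitString="g", Valid=False)
print(describe_value(ok))
print(describe_value(bad))
```

In a real callback you would pass the point received by on_point instead of the stand-in objects.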

Getting every update of a specific signal

Depending on your database and buffer, this can be quite a lot of data! What if we wanted only the points of a particular signal? We could look for that signal in our on_point handler, but this could potentially be expensive. Instead, we can register a callback just for the points of the signal we’re interested in.

In Vehicle Spy X, a vspyx.Runtime.Traceable is an abstract object that can produce a point. Definitions of frames, signals, or PDUs from a database are all examples of vspyx.Runtime.Traceable. vspyx.Runtime.Environment manages an instance of vspyx.Runtime.Trace for every vspyx.Runtime.Traceable. vspyx.Runtime.Trace contains information about the history of the vspyx.Runtime.Traceable. It also has a vspyx.Runtime.Trace.OnPoint event, which is raised whenever a new point is created for the corresponding vspyx.Runtime.Traceable. The callback receives both the point and statistics about the history of the trace.

def on_trace_point(point: vspyx.Runtime.Point, statistics: vspyx.Runtime.Trace.Statistics):
    print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
          f" = {point.PhysicalValue} {point.UnitString}" +
          (
            #If the signal type is an Enum, show the previous value
            "(Prev: {})".format(statistics.BeforeLastPoint.PhysicalValue)
            if isinstance(statistics.BeforeLastPoint.PhysicalValue, str)

            #If the signal type is Numeric, calculate the difference and sign
            else " ({}{})".format
            (
              '+' if statistics.BeforeLastPoint.PhysicalValue < point.PhysicalValue else '',
              point.PhysicalValue - statistics.BeforeLastPoint.PhysicalValue
            )
          )
          if statistics.BeforeLastPoint is not None else "")

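#observer.OnPoint.Add(on_point)
# Look up the signal by name; "RPM" is a placeholder, use the identifier of your own signal
signal = app.Resolver["RPM"]
trace = observer.GetTrace(signal)
trace.OnPoint.Add(on_trace_point)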

Example numeric output:

[2018/08/26 13:10:45:264806] RPM = 2858  (+2)
[2018/08/26 13:10:45:275039] RPM = 2873  (+15)
[2018/08/26 13:10:45:285278] RPM = 2874  (+1)
[2018/08/26 13:10:45:295645] RPM = 2865  (-9)
[2018/08/26 13:10:45:305753] RPM = 2844  (-21)
[2018/08/26 13:10:45:316124] RPM = 2828  (-16)
[2018/08/26 13:10:45:326287] RPM = 2841  (+13)
[2018/08/26 13:10:45:336342] RPM = 2871  (+30)

Example enum output:

[2018/08/26 13:09:12:531009] Fix Type = None (Prev: None)
[2018/08/26 13:09:13:031048] Fix Type = 3D (Prev: None)
[2018/08/26 13:09:13:531038] Fix Type = 3D (Prev: 3D)
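The nested conditional expression in on_trace_point can be hard to follow. The sketch below restates the same suffix logic as a standalone helper that works on plain values, so it can be tried without vspyx. The name format_change is hypothetical, and passing None for the previous value stands in for the case where statistics.BeforeLastPoint is None.

```python
from typing import Optional, Union

Value = Union[int, float, str]


def format_change(previous: Optional[Value], current: Value) -> str:
    """Mirror the suffix logic of on_trace_point for plain values."""
    if previous is None:
        # First point of the run: nothing to compare against
        return ""
    if isinstance(previous, str):
        # Enum signals: show the previous label
        return "(Prev: {})".format(previous)
    # Numeric signals: signed difference from the previous value
    sign = "+" if previous < current else ""
    return " ({}{})".format(sign, current - previous)


print(repr(format_change(None, 2856)))
print(repr(format_change(2856, 2858)))
print(repr(format_change(2874, 2865)))
print(repr(format_change("None", "3D")))
```

Keeping the formatting in a small function like this also makes it possible to test the output strings separately from the callback.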

Going faster than realtime

You may notice that our simulation is running in “realtime”, that is, advancing at the rate of one second per second (modulo the effects of relativity, which are ignored for everyone’s sanity). Since this is a simulation and not connected to real hardware, we can go faster than realtime to post-process the buffer quickly. This can be done by setting the optional analysisMode parameter of vspyx.VehicleSpy.Module.PrepareForStart() to True.

Note

When in analysis mode, timestamp comparisons will reflect the simulated time, not wall time. vspyx.Runtime.Scheduler.Now will advance faster than realtime, as will the values for vspyx.Runtime.Point.Timestamp. If your script makes any timing comparisons, be sure to use these values and not Python’s builtin time module so that your script will run correctly both in analysis mode and realtime mode.

#app.VehicleSpy.PrepareForStart()
app.VehicleSpy.PrepareForStart(analysisMode=True)
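To illustrate the note above, here is a sketch of a timing check that relies only on point timestamps and never on wall time. It rests on assumptions: GapDetector is a hypothetical helper, the points are stand-in objects, and datetime is used as a placeholder because this tutorial does not show the actual type of vspyx.Runtime.Point.Timestamp. The approach carries over only if the real timestamps support subtraction and comparison in a similar way.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace


class GapDetector:
    """Record pauses between points, measured in simulated time."""

    def __init__(self, max_gap: timedelta):
        self.max_gap = max_gap
        self.last_timestamp = None
        self.gaps = []

    def on_point(self, point) -> None:
        # Compare point timestamps only; never call time.time() here
        if self.last_timestamp is not None:
            delta = point.Timestamp - self.last_timestamp
            if delta > self.max_gap:
                self.gaps.append((self.last_timestamp, point.Timestamp))
        self.last_timestamp = point.Timestamp


# Stand-in points; datetime is a placeholder for the real timestamp type
start = datetime(2018, 8, 26, 13, 16, 59)
detector = GapDetector(max_gap=timedelta(milliseconds=100))
for offset_ms in (0, 10, 500):
    detector.on_point(SimpleNamespace(Timestamp=start + timedelta(milliseconds=offset_ms)))

print(len(detector.gaps))
```

Because the check depends only on the timestamps carried by the points, it gives the same result whether the buffer is replayed in realtime or in analysis mode.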

Completed example

import vspyx, argparse
from typing import List, Tuple, Optional

def on_point(point: vspyx.Runtime.Point):
    if not isinstance(point, vspyx.Communication.ISignalPoint):
        return

    print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
          f" = {point.PhysicalValue} {point.UnitString}{'' if point.Valid else ' -- Warning: Invalid'}")

def on_trace_point(point: vspyx.Runtime.Point, statistics: vspyx.Runtime.Trace.Statistics):
    print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
          f" = {point.PhysicalValue} {point.UnitString}" +
          (
            #If the signal type is an Enum, show the previous value
            "(Prev: {})".format(statistics.BeforeLastPoint.PhysicalValue)
            if isinstance(statistics.BeforeLastPoint.PhysicalValue, str)

            #If the signal type is Numeric, calculate the difference and sign
            else " ({}{})".format
            (
              '+' if statistics.BeforeLastPoint.PhysicalValue < point.PhysicalValue else '',
              point.PhysicalValue - statistics.BeforeLastPoint.PhysicalValue
            )
          )
          if statistics.BeforeLastPoint is not None else "")

def decode(database_path: str, buffer_path: str,
    controller_network_name_to_channel: List[Tuple[str, str]],
    signal_ids: Optional[List[str]]=None):

    app = vspyx.Core.Application.New()
    app.Initialize(loadAllModules=True)

    app.VehicleSpy.AddDatabase(database_path)
    buffer = app.VehicleSpy.AddSource(buffer_path)
    buffer.Source.Loop = False

    for controller_network_name, channel_id in controller_network_name_to_channel:

        controller = \
            app.Resolver[f"{buffer.Source.Identifier} {controller_network_name} Controller"]
        channel = app.Resolver[channel_id]

        app.Communication.ConnectControllerToChannel(controller, channel)

    observer = app.VehicleSpy.PrepareForStart(analysisMode=True)

    if signal_ids is not None:
        for signal_id in signal_ids:
            signal = app.Resolver[signal_id]
            trace = observer.GetTrace(signal)
            trace.OnPoint.Add(on_trace_point)
    else:
        observer.OnPoint.Add(on_point)

    app.VehicleSpy.Start()
    app.VehicleSpy.Scheduler.Wait(buffer.Source.BufferEndEvent)
    app.VehicleSpy.Stop()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Decode signals using Vehicle Spy X')
    parser.add_argument('database', type=str, help='Path to database')
    parser.add_argument('buffer', type=str, help='Path to buffer')
    parser.add_argument('--mapping', nargs=2, action='append',
        metavar=('"Buffer Network Name"', '"Channel Name"'), help='Controller to channel mapping')
    parser.add_argument('--signal', metavar=('"Signal Name"'), action='append', help='Signal to view')

    args = parser.parse_args()
    # --mapping is optional, so fall back to an empty list when it is omitted
    decode(args.database, args.buffer, args.mapping or [], args.signal)
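To see what the script hands to decode, the sketch below rebuilds an equivalent parser and parses a sample argument list instead of the real command line. The build_parser function and the simplified metavar strings are illustrative choices, and the file names, the HSCAN network and the RPM signal are placeholder values.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Decode signals using Vehicle Spy X")
    parser.add_argument("database", type=str, help="Path to database")
    parser.add_argument("buffer", type=str, help="Path to buffer")
    # nargs=2 with action="append" collects one [network, channel] pair per flag
    parser.add_argument("--mapping", nargs=2, action="append",
                        metavar=("NETWORK", "CHANNEL"),
                        help="Controller to channel mapping")
    parser.add_argument("--signal", metavar="SIGNAL", action="append",
                        help="Signal to view")
    return parser


args = build_parser().parse_args(
    ["database.dbc", "buffer.vsb", "--mapping", "HSCAN", "database", "--signal", "RPM"])
print(args.mapping)  # [['HSCAN', 'database']]
print(args.signal)   # ['RPM']

# Both options are optional and default to None when omitted
bare = build_parser().parse_args(["database.dbc", "buffer.vsb"])
print(bare.mapping, bare.signal)  # None None
```

Each --mapping flag adds one pair, so repeating the flag connects several controllers. When --signal is omitted, signal_ids is None and decode registers on_point for every signal.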