Getting signal values
This tutorial will cover using vspyx.Runtime.Environment.OnPoint
and vspyx.Runtime.Trace.OnPoint
to get every signal value or all values for a specific signal in a buffer file.
For this tutorial, we will assume that you have a buffer file named buffer.vsb
and database named database.dbc
.
These file could be any of buffer or database formats supported by Vehicle Spy X.
Setup
The first step is to initialize a new instance of the vspyx.Core.Application
.
import vspyx
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
Next, we can load the database with vspyx.VehicleSpy.Module.AddDatabase()
and the buffer with vspyx.VehicleSpy.Module.AddSource()
.
db = app.VehicleSpy.AddDatabase('database.dbc')
buffer = app.VehicleSpy.AddSource('buffer.vsb')
Disable looping, as we only want to run through the buffer once
buffer.Source.Loop = False
Once the buffer and database are loaded we need to connect the vspyx.Communication.Controller
provided by the buffer to the vspyx.Communication.Channel
provided by the database.
This is done in the Vehicle Spy X UI by dragging a line between the two in the Topology view.
Here, we will use the vspyx.Communication.Module.ConnectControllerToChannel()
function.
Note
This will need to be modified for your specific buffer/database.
Here we will connect a controller named HSCAN
to the database
channel.
Since DBC files do not name their channels, the channel created by vspyx.Communication.DBC
is named after the file.
vspyx.Core.Resolver
is used to look up objects by their name.
controller = app.Resolver[f"{buffer.Source.Identifier} HSCAN Controller"]
channel = app.Resolver["database"]
app.Communication.ConnectControllerToChannel(controller, channel)
Now that the channel and connector are hooked up, we are ready to go online.
vspyx.VehicleSpy.Module.Observer
is the vspyx.Runtime.Environment
responsible for holding the “latest” value of all signals and calculating statistics for them.
vspyx.VehicleSpy.Module.PrepareForStart()
creates and returns a new vspyx.Runtime.Environment
, the vspyx.VehicleSpy.Module.Observer
for this run of the application.
With this, we can register callbacks or even add our own instances of vspyx.Runtime.Component
.
Getting every signal update
To begin, we will register a callback to vspyx.Runtime.Environment.OnPoint
.
Instances of vspyx.Runtime.Point
represent a sample of an object that Vehicle Spy X processed.
For example, most messages are defined as an vspyx.Communication.ISignalIPDU
.
When an incoming frame matches an vspyx.Communication.ISignalIPDU
, an instance of vspyx.Communication.ISignalIPDUPoint
is created.
The vspyx.Runtime.Environment.OnPoint
is fired for every new vspyx.Runtime.Point
that is created.
def on_point(point):
if not isinstance(point, vspyx.Communication.ISignalPoint):
return
print(f"[{point.Timestamp}] {point.GetAttribute("ShortName")}" +
f" = {point.PhysicalValue} {point.UnitString}{'' if point.Valid else ' -- Warning: Invalid'}")
observer = app.VehicleSpy.PrepareForStart()
observer.OnPoint.Add(on_point)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.Wait(buffer.Source.BufferEndEvent)
app.VehicleSpy.Stop()
If you run this, you should see all the signal values printed out to your console.
Warning
When a signal value exceeds the defined range of the signal, whether this be linear bounds or an undefined enum (TEXTTABLE) value, vspyx.Runtime.SignalPoint.Valid
will be False
.
It is important to check for invalid signals, as vspyx.Runtime.SignalPoint.PhysicalValue
will equal vspyx.Runtime.SignalPoint.InternalValue
in this case, which will be a vspyx.Core.Numeric
representing the invalid state as it was seen on the wire.
For enums, this makes sense. For numeric values, this means the value will have no scaling or offset applied!
Example output:
[2018/08/26 13:16:59:244134] X = -0.14 g
[2018/08/26 13:16:59:244134] Y = 0.017 g
[2018/08/26 13:16:59:244134] Z = 0.028 g
[2018/08/26 13:16:59:244324] FL Damper Height Zero = -5.165 mm
[2018/08/26 13:16:59:244324] FR Damper Height Zero = -5.031 mm
[2018/08/26 13:16:59:247420] FL Velocity = 0.0 m/s
[2018/08/26 13:16:59:247420] FR Velocity = 0.0 m/s
[2018/08/26 13:16:59:247420] FL Speed = 0.0 kph
[2018/08/26 13:16:59:247420] FR Speed = 0.0 kph
[2018/08/26 13:16:59:248516] RPM = 2935
[2018/08/26 13:16:59:248516] Seconds = 519 s
Getting every update of a specific signal
Depending on your database and buffer, this can be quite a lot of data!
What if we wanted only the points of a particular signal?
We could look for that signal in our on_point
handler, but this could potentially be expensive.
Instead, we can register a callback just for the points of the signal we’re interested in.
In Vehicle Spy X, a vspyx.Runtime.Traceable
is an abstract object that can produce a point.
Definitions of frames, signals, or PDUs from a database are all examples of vspyx.Runtime.Traceable
.
vspyx.Runtime.Environment
manages an instance of vspyx.Runtime.Trace
for every vspyx.Runtime.Traceable
.
vspyx.Runtime.Trace
contains information about the history of the vspyx.Runtime.Traceable
.
It also has an vspyx.Runtime.Trace.OnPoint
event which will be raised when a new instance of a point is created for the corresponding vspyx.Runtime.Traceable
, along with statistics about its history.
def on_trace_point(point: vspyx.Runtime.Point, statistics: vspyx.Runtime.Trace.Statistics):
print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
f" = {point.PhysicalValue} {point.UnitString}" +
(
#If the signal type is an Enum, show the previous value
"(Prev: {})".format(statistics.BeforeLastPoint.PhysicalValue)
if isinstance(statistics.BeforeLastPoint.PhysicalValue, str)
#If the signal type is Numeric, calculate the difference and sign
else " ({}{})".format
(
'+' if statistics.BeforeLastPoint.PhysicalValue < point.PhysicalValue else '',
point.PhysicalValue - statistics.BeforeLastPoint.PhysicalValue
)
)
if statistics.BeforeLastPoint is not None else "")
#observer.OnPoint.Add(on_point)
trace = observer.GetTrace(signal)
trace.OnPoint.Add(on_trace_point)
Example numeric output:
[2018/08/26 13:10:45:264806] RPM = 2858 (+2)
[2018/08/26 13:10:45:275039] RPM = 2873 (+15)
[2018/08/26 13:10:45:285278] RPM = 2874 (+1)
[2018/08/26 13:10:45:295645] RPM = 2865 (-9)
[2018/08/26 13:10:45:305753] RPM = 2844 (-21)
[2018/08/26 13:10:45:316124] RPM = 2828 (-16)
[2018/08/26 13:10:45:326287] RPM = 2841 (+13)
[2018/08/26 13:10:45:336342] RPM = 2871 (+30)
Example enum output:
[2018/08/26 13:09:12:531009] Fix Type = None (Prev: None)
[2018/08/26 13:09:13:031048] Fix Type = 3D (Prev: None)
[2018/08/26 13:09:13:531038] Fix Type = 3D (Prev: 3D)
Going faster than realtime
You may notice that our simulation is running in “realtime” – that is, advancing at the rate of one second per second (mod the effects of relativity, which is ignored for everyone’s sanity).
Since this is a simulation and not connected to real hardware, we can go faster than realtime to post-process the buffer quickly.
This can be done by setting the optional analysisMode
parameter of vspyx.VehicleSpy.Module.PrepareForStart()
.
Note
When in analysis mode, timestamp comparisons will reflect the simulated time, not wall time.
vspyx.Runtime.Scheduler.Now
will advance faster than realtime, as will the the values for vspyx.Runtime.Point.Timestamp
.
If your script makes any timing comparisons, be sure to use these values and not Python’s builtin time
module so that your script will run correctly both in analysis mode and realtime mode.
#app.VehicleSpy.PrepareForStart()
app.VehicleSpy.PrepareForStart(analysisMode=True)
Completed example
import vspyx, argparse
from typing import List, Tuple, Optional
def on_point(point: vspyx.Runtime.Point):
if not isinstance(point, vspyx.Communication.ISignalPoint):
return
print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
f" = {point.PhysicalValue} {point.UnitString}{'' if point.Valid else ' -- Warning: Invalid'}")
def on_trace_point(point: vspyx.Runtime.Point, statistics: vspyx.Runtime.Trace.Statistics):
print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" +
f" = {point.PhysicalValue} {point.UnitString}" +
(
#If the signal type is an Enum, show the previous value
"(Prev: {})".format(statistics.BeforeLastPoint.PhysicalValue)
if isinstance(statistics.BeforeLastPoint.PhysicalValue, str)
#If the signal type is Numeric, calculate the difference and sign
else " ({}{})".format
(
'+' if statistics.BeforeLastPoint.PhysicalValue < point.PhysicalValue else '',
point.PhysicalValue - statistics.BeforeLastPoint.PhysicalValue
)
)
if statistics.BeforeLastPoint is not None else "")
def decode(database_path: str, buffer_path: str,
controller_network_name_to_channel: List[Tuple[str, str]],
signal_ids: Optional[List[str]]=None):
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
app.VehicleSpy.AddDatabase(database_path)
buffer = app.VehicleSpy.AddSource(buffer_path)
buffer.Source.Loop = False
for controller_network_name, channel_id in controller_network_name_to_channel:
controller = \
app.Resolver[f"{buffer.Source.Identifier} {controller_network_name} Controller"]
channel = app.Resolver[channel_id]
app.Communication.ConnectControllerToChannel(controller, channel)
observer = app.VehicleSpy.PrepareForStart(analysisMode=True)
if signal_ids is not None:
for signal_id in signal_ids:
signal = app.Resolver[signal_id]
trace = observer.GetTrace(signal)
trace.OnPoint.Add(on_trace_point)
else:
observer.OnPoint.Add(on_point)
app.VehicleSpy.Start()
app.VehicleSpy.Scheduler.Wait(buffer.Source.BufferEndEvent)
app.VehicleSpy.Stop()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Decode signals using Vehicle Spy X')
parser.add_argument('database', type=str, help='Path to database')
parser.add_argument('buffer', type=str, help='Path to buffer')
parser.add_argument('--mapping', nargs=2, action='append',
metavar=('"Buffer Network Name"', '"Channel Name"'), help='Connector to channel mapping')
parser.add_argument('--signal', metavar=('"Signal Name"'), action='append', help='Signal to view')
args = parser.parse_args()
decode(args.database, args.buffer, args.mapping, args.signal)