.. _signalvalues: Getting signal values ===================== This tutorial will cover using :py:attr:`vspyx.Runtime.Environment.OnPoint` and :py:attr:`vspyx.Runtime.Trace.OnPoint` to get every signal value or all values for a specific signal in a buffer file. For this tutorial, we will assume that you have a buffer file named ``buffer.vsb`` and database named ``database.dbc``. These file could be any of buffer or database formats supported by |project|. Setup ----- The first step is to initialize a new instance of the :py:class:`vspyx.Core.Application`. .. code:: python import vspyx app = vspyx.Core.Application.New() app.Initialize(loadAllModules=True) Next, we can load the database with :py:func:`vspyx.VehicleSpy.Module.AddDatabase` and the buffer with :py:func:`vspyx.VehicleSpy.Module.AddSource`. .. code:: python db = app.VehicleSpy.AddDatabase('database.dbc') buffer = app.VehicleSpy.AddSource('buffer.vsb') Disable looping, as we only want to run through the buffer once .. code:: python buffer.Source.Loop = False Once the buffer and database are loaded we need to connect the :py:class:`vspyx.Communication.Controller` provided by the buffer to the :py:class:`vspyx.Communication.Channel` provided by the database. This is done in the |project| UI by dragging a line between the two in the Topology view. Here, we will use the :py:func:`vspyx.Communication.Module.ConnectControllerToChannel` function. .. note:: This will need to be modified for your specific buffer/database. Here we will connect a controller named ``HSCAN`` to the ``database`` channel. Since DBC files do not name their channels, the channel created by :py:class:`vspyx.Communication.DBC` is named after the file. :py:class:`vspyx.Core.Resolver` is used to look up objects by their name. .. code:: python controller = app.Resolver[f"{buffer.Source.Identifier} HSCAN Controller"] channel = app.Resolver["database"] app.Communication.ConnectControllerToChannel(controller, channel) Now that the channel and connector are hooked up, we are ready to go online. :py:attr:`vspyx.VehicleSpy.Module.Observer` is the :py:class:`vspyx.Runtime.Environment` responsible for holding the "latest" value of all signals and calculating statistics for them. :py:func:`vspyx.VehicleSpy.Module.PrepareForStart` creates and returns a new :py:class:`vspyx.Runtime.Environment`, the :py:attr:`vspyx.VehicleSpy.Module.Observer` for this run of the application. With this, we can register callbacks or even add our own instances of :py:class:`vspyx.Runtime.Component`. Getting every signal update --------------------------- To begin, we will register a callback to :py:attr:`vspyx.Runtime.Environment.OnPoint`. Instances of :py:class:`vspyx.Runtime.Point` represent a sample of an object that |project| processed. For example, most messages are defined as an :py:class:`vspyx.Communication.ISignalIPDU`. When an incoming frame matches an :py:class:`vspyx.Communication.ISignalIPDU`, an instance of :py:class:`vspyx.Communication.ISignalIPDUPoint` is created. The :py:attr:`vspyx.Runtime.Environment.OnPoint` is fired for every new :py:class:`vspyx.Runtime.Point` that is created. .. code:: python def on_point(point): if not isinstance(point, vspyx.Communication.ISignalPoint): return print(f"[{point.Timestamp}] {point.GetAttribute("ShortName")}" + f" = {point.PhysicalValue} {point.UnitString}{'' if point.Valid else ' -- Warning: Invalid'}") observer = app.VehicleSpy.PrepareForStart() observer.OnPoint.Add(on_point) app.VehicleSpy.Start() app.VehicleSpy.Scheduler.Wait(buffer.Source.BufferEndEvent) app.VehicleSpy.Stop() If you run this, you should see all the signal values printed out to your console. .. warning:: When a signal value exceeds the defined range of the signal, whether this be linear bounds or an undefined enum (TEXTTABLE) value, :py:attr:`vspyx.Runtime.SignalPoint.Valid` will be ``False``. It is important to check for invalid signals, as :py:attr:`vspyx.Runtime.SignalPoint.PhysicalValue` will equal :py:attr:`vspyx.Runtime.SignalPoint.InternalValue` in this case, which will be a :py:class:`vspyx.Core.Numeric` representing the invalid state as it was seen on the wire. For enums, this makes sense. For numeric values, this means the value will have no scaling or offset applied! Example output:: [2018/08/26 13:16:59:244134] X = -0.14 g [2018/08/26 13:16:59:244134] Y = 0.017 g [2018/08/26 13:16:59:244134] Z = 0.028 g [2018/08/26 13:16:59:244324] FL Damper Height Zero = -5.165 mm [2018/08/26 13:16:59:244324] FR Damper Height Zero = -5.031 mm [2018/08/26 13:16:59:247420] FL Velocity = 0.0 m/s [2018/08/26 13:16:59:247420] FR Velocity = 0.0 m/s [2018/08/26 13:16:59:247420] FL Speed = 0.0 kph [2018/08/26 13:16:59:247420] FR Speed = 0.0 kph [2018/08/26 13:16:59:248516] RPM = 2935 [2018/08/26 13:16:59:248516] Seconds = 519 s Getting every update of a specific signal ----------------------------------------- Depending on your database and buffer, this can be quite a lot of data! What if we wanted only the points of a particular signal? We could look for that signal in our ``on_point`` handler, but this could potentially be expensive. Instead, we can register a callback just for the points of the signal we're interested in. In |project|, a :py:class:`vspyx.Runtime.Traceable` is an abstract object that can produce a point. Definitions of frames, signals, or PDUs from a database are all examples of :py:class:`vspyx.Runtime.Traceable`. :py:class:`vspyx.Runtime.Environment` manages an instance of :py:class:`vspyx.Runtime.Trace` for every :py:class:`vspyx.Runtime.Traceable`. :py:class:`vspyx.Runtime.Trace` contains information about the history of the :py:class:`vspyx.Runtime.Traceable`. It also has an :py:attr:`vspyx.Runtime.Trace.OnPoint` event which will be raised when a new instance of a point is created for the corresponding :py:class:`vspyx.Runtime.Traceable`, along with statistics about its history. .. code:: python def on_trace_point(point: vspyx.Runtime.Point, statistics: vspyx.Runtime.Trace.Statistics): print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" + f" = {point.PhysicalValue} {point.UnitString}" + ( #If the signal type is an Enum, show the previous value "(Prev: {})".format(statistics.BeforeLastPoint.PhysicalValue) if isinstance(statistics.BeforeLastPoint.PhysicalValue, str) #If the signal type is Numeric, calculate the difference and sign else " ({}{})".format ( '+' if statistics.BeforeLastPoint.PhysicalValue < point.PhysicalValue else '', point.PhysicalValue - statistics.BeforeLastPoint.PhysicalValue ) ) if statistics.BeforeLastPoint is not None else "") #observer.OnPoint.Add(on_point) trace = observer.GetTrace(signal) trace.OnPoint.Add(on_trace_point) Example numeric output:: [2018/08/26 13:10:45:264806] RPM = 2858 (+2) [2018/08/26 13:10:45:275039] RPM = 2873 (+15) [2018/08/26 13:10:45:285278] RPM = 2874 (+1) [2018/08/26 13:10:45:295645] RPM = 2865 (-9) [2018/08/26 13:10:45:305753] RPM = 2844 (-21) [2018/08/26 13:10:45:316124] RPM = 2828 (-16) [2018/08/26 13:10:45:326287] RPM = 2841 (+13) [2018/08/26 13:10:45:336342] RPM = 2871 (+30) Example enum output:: [2018/08/26 13:09:12:531009] Fix Type = None (Prev: None) [2018/08/26 13:09:13:031048] Fix Type = 3D (Prev: None) [2018/08/26 13:09:13:531038] Fix Type = 3D (Prev: 3D) Going faster than realtime -------------------------- You may notice that our simulation is running in "realtime" -- that is, advancing at the rate of one second per second (mod the effects of relativity, which is ignored for everyone's sanity). Since this is a simulation and not connected to real hardware, we can go faster than realtime to post-process the buffer quickly. This can be done by setting the optional ``analysisMode`` parameter of :py:func:`vspyx.VehicleSpy.Module.PrepareForStart`. .. note:: When in analysis mode, timestamp comparisons will reflect the simulated time, not wall time. :py:attr:`vspyx.Runtime.Scheduler.Now` will advance faster than realtime, as will the the values for :py:attr:`vspyx.Runtime.Point.Timestamp`. If your script makes any timing comparisons, be sure to use these values and not Python's builtin ``time`` module so that your script will run correctly both in analysis mode and realtime mode. .. code:: python #app.VehicleSpy.PrepareForStart() app.VehicleSpy.PrepareForStart(analysisMode=True) Completed example ----------------- .. code:: python import vspyx, argparse from typing import List, Tuple, Optional def on_point(point: vspyx.Runtime.Point): if not isinstance(point, vspyx.Communication.ISignalPoint): return print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" + f" = {point.PhysicalValue} {point.UnitString}{'' if point.Valid else ' -- Warning: Invalid'}") def on_trace_point(point: vspyx.Runtime.Point, statistics: vspyx.Runtime.Trace.Statistics): print(f"[{point.Timestamp}] {point.GetAttribute('ShortName')}" + f" = {point.PhysicalValue} {point.UnitString}" + ( #If the signal type is an Enum, show the previous value "(Prev: {})".format(statistics.BeforeLastPoint.PhysicalValue) if isinstance(statistics.BeforeLastPoint.PhysicalValue, str) #If the signal type is Numeric, calculate the difference and sign else " ({}{})".format ( '+' if statistics.BeforeLastPoint.PhysicalValue < point.PhysicalValue else '', point.PhysicalValue - statistics.BeforeLastPoint.PhysicalValue ) ) if statistics.BeforeLastPoint is not None else "") def decode(database_path: str, buffer_path: str, controller_network_name_to_channel: List[Tuple[str, str]], signal_ids: Optional[List[str]]=None): app = vspyx.Core.Application.New() app.Initialize(loadAllModules=True) app.VehicleSpy.AddDatabase(database_path) buffer = app.VehicleSpy.AddSource(buffer_path) buffer.Source.Loop = False for controller_network_name, channel_id in controller_network_name_to_channel: controller = \ app.Resolver[f"{buffer.Source.Identifier} {controller_network_name} Controller"] channel = app.Resolver[channel_id] app.Communication.ConnectControllerToChannel(controller, channel) observer = app.VehicleSpy.PrepareForStart(analysisMode=True) if signal_ids is not None: for signal_id in signal_ids: signal = app.Resolver[signal_id] trace = observer.GetTrace(signal) trace.OnPoint.Add(on_trace_point) else: observer.OnPoint.Add(on_point) app.VehicleSpy.Start() app.VehicleSpy.Scheduler.Wait(buffer.Source.BufferEndEvent) app.VehicleSpy.Stop() if __name__ == '__main__': parser = argparse.ArgumentParser(description='Decode signals using Vehicle Spy X') parser.add_argument('database', type=str, help='Path to database') parser.add_argument('buffer', type=str, help='Path to buffer') parser.add_argument('--mapping', nargs=2, action='append', metavar=('"Buffer Network Name"', '"Channel Name"'), help='Connector to channel mapping') parser.add_argument('--signal', metavar=('"Signal Name"'), action='append', help='Signal to view') args = parser.parse_args() decode(args.database, args.buffer, args.mapping, args.signal)