import time, datetime, enum
import vspyx
from intrepidcs.vspyx.rpc.TCPIP import Network_pb2
# Constants
# Identifiers for the supported diagnostic transports; one of them is assigned to TEST_INTERFACE_TYPE below
TEST_INTERFACE_TYPE_DOIP = "DoIP"
TEST_INTERFACE_TYPE_ISO15765_CAN = "ISO-15765+CAN"
TEST_INTERFACE_TYPE_HSFZ = "HSFZ"
# Test interface selection (leave exactly one assignment uncommented)
TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_DOIP
#TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_ISO15765_CAN
#TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_HSFZ
# Each branch below defines the settings read by the rest of the script for the selected transport
if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
    # Ethernet interface config
    IF_AUTO_DETECT = True # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
    IF_NAME = "Local Area Connection 3" # Client network interface device name
    IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:64" # Client network virtual interface MAC address
    IF_IP_ADDRESS = "192.168.7.100" # Client network interface IP address
    IF_IP_MASK = "255.255.255.0" # Client network interface IP netmask
    # DoIP Config
    BROADCAST_ADDRESS = "192.168.7.255" # Specify the appropriate DoIP discovery broadcast address
    TESTER_ADDRESS = 0x3584 # Specify the client external tester address
    ECU_ADDRESS_TYPE = 0
    ECU_AE = None # Specify the ECU server address (N_AE)
    ROUTING_TYPE = vspyx.Diagnostics.ISO13400_2.RoutingActivationTypes.Default # Select the routing type
    ROUTING_EXTRA_ISO = None # Specify a 32-bit value to override the ISO reserved field in the routing activation request
    ROUTING_EXTRA_OEM = None # Specify a 32-bit value to include the OEM reserved field in the routing activation request
    ROUTING_EXTRA_ISO_EXPECTED = None # Specify an expected value of the ISO reserved field in the routing activation response
    ROUTING_EXTRA_OEM_EXPECTED = None # Specify an expected value of the OEM reserved field in the routing activation response
    # Test Application Config
    TEST_MANUAL_ARP_ENTRY_ADD = False # Specify True to manually add the ARP entry for the target ECU
    TEST_ECU_MAC_ADDRESS = "00:FC:70:00:00:02" # Manual MAC address
    TEST_MANUAL_ENTITY_ADD = False # Specify True to manually configure for use with the entity identified using IP Address [TEST_MANUAL_ECU_IP] and Logical Address [TEST_MANUAL_ECU_LA]
    TEST_MANUAL_ECU_IP = "192.168.7.2" # Manual entity IP address
    TEST_MANUAL_ECU_LA = 0xDEAD # Manual entity logical address
    TEST_AUTO_PROBE_WAIT = True # Wait for client to send a probe
    TEST_CONFIRM_POWER_STATUS = True # Specify True to confirm the ECU's power mode via DoIP function GetPowerMode()
    TEST_CONFIRM_SYNC_STATUS = True # Specify True to confirm the ECU's VIN/GID synchronization status via DoIP function GetVehicleIdentification()
    TEST_CONFIRM_ENTITY_STATUS = True # Specify True to confirm the ECU's entity info via DoIP function GetEntityStatusInfo()
    TEST_VEHICLE_QUERY_EID = vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])) # Specify a value to test query by EID
    TEST_VEHICLE_QUERY_VIN = vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")) # Specify a value to test query by VIN
    DEBUG_IF_TRACE = False # Specify True to trace raw interface frames (for debugging only)
elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
    # Addressing formats selectable through NETWORK_ADDRESS_TYPE; the transport setup registers
    # normal, fixed or extended Rx/Tx addresses depending on the chosen member
    class ISO15765_AddressTypes(enum.Enum):
        Normal = 1
        NormalFixed = 2
        Extended = 3
    # CAN interface config
    IF_DEVICE = "88432" # ValueCAN 3 - 88432
    #IF_DEVICE = "CY7347" # neoVI FIRE 2 - CY7347
    # NOTE(review): IF_LIB is not defined anywhere in the visible source, so the next line raises
    # NameError when this interface type is selected - define the intended device prefix before use
    IF_NAME = "%s %s"%(IF_LIB, IF_DEVICE)
    IF_CHANNEL = "HSCAN"
    # ISO 15765-2 addressing config
    NETWORK_ADDRESS_TYPE = ISO15765_AddressTypes.Normal
    TESTER_ADDRESS = 0x45 # Specify the client external tester address
    TESTER_CAN_ADDRESS = 0x18ff456
    TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN29Bit
    #TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD29Bit
    ECU_ADDRESS = 0x23 # Specify the ECU server address
    ECU_CAN_ADDRESS = 0x123
    ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN11Bit
    #ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD11Bit
    TESTER_DATA_LENGTH = 8 # Specify the client external tester DLC (in octets)
    TESTER_STMIN = 0
    TESTER_BS = 0
    ECU_AE = None # Specify the ECU server address (N_AE)
    ECU_STMIN_MIN = 0
elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
    # Ethernet interface config
    IF_AUTO_DETECT = True # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
    #IF_NAME = "Local Area Connection 3" # Client network interface device name
    IF_NAME = "Bottom Intel I218-V" # Client network interface device name
    IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:64" # Client network virtual interface MAC address
    IF_IP_ADDRESS = "192.168.7.100" # Client network interface IP address
    IF_IP_MASK = "255.255.255.0" # Client network interface IP netmask
    # DoIP Config (the same setting names are reused for HSFZ)
    BROADCAST_ADDRESS = "192.168.7.255" # Specify the appropriate DoIP discovery broadcast address
    TESTER_ADDRESS = 0x84 # Specify the client external tester address
    ECU_ADDRESS_TYPE = 0
    ECU_AE = None # Specify the ECU server address (N_AE)
    ROUTING_TYPE = None # Not supported in HSFZ
    ROUTING_EXTRA_ISO = None # Not supported in HSFZ
    ROUTING_EXTRA_OEM = None # Not supported in HSFZ
    ROUTING_EXTRA_ISO_EXPECTED = None # Not supported in HSFZ
    ROUTING_EXTRA_OEM_EXPECTED = None # Not supported in HSFZ
    # Test Application Config
    TEST_MANUAL_ARP_ENTRY_ADD = False # Specify True to manually add the ARP entry for the target ECU
    TEST_ECU_MAC_ADDRESS = "00:FC:70:00:00:02" # Manual MAC address
    TEST_MANUAL_ENTITY_ADD = False # Specify True to manually configure for use with the entity identified using IP Address [TEST_MANUAL_ECU_IP] and Logical Address [TEST_MANUAL_ECU_LA]
    TEST_MANUAL_ECU_IP = "192.168.7.2" # Manual entity IP address
    TEST_MANUAL_ECU_LA = 1 # Manual entity logical address
    TEST_AUTO_PROBE_WAIT = True # Wait for client to send a probe
    TEST_CONFIRM_POWER_STATUS = True # Specify True to confirm the ECU's power mode via DoIP function GetPowerMode()
    TEST_CONFIRM_SYNC_STATUS = False # Specify True to confirm the ECU's VIN/GID synchronization status via DoIP function GetVehicleIdentification()
    TEST_CONFIRM_ENTITY_STATUS = True # Specify True to confirm the ECU's entity info via DoIP function GetEntityStatusInfo()
    TEST_VEHICLE_QUERY_EID = vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])) # Specify a value to test query by EID
    TEST_VEHICLE_QUERY_VIN = vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")) # Specify a value to test query by VIN
    DEBUG_IF_TRACE = False # Specify True to trace raw interface frames (for debugging only)
class Signal:
    """A DID definition: identifier, raw value, and payload size in octets."""

    def __init__(self, id, value, size = None):
        """Store the DID and derive the size.

        id: the data identifier (DID).
        value: the raw payload, or None when no value is known yet.
        size: expected payload length in octets; only consulted when value is None.
        """
        self.Id = id
        if (value is not None):
            # A concrete value dictates the size; an explicit size argument is ignored
            self.Value = value
            self.Size = len(value)
            return
        # No value yet: start with an empty buffer and remember the expected size (0 when unspecified)
        self.Value = bytearray()
        self.Size = 0 if (size is None) else size
# Table of arbitrary DID definitions - feel free to add your own Signal(DID [int], value [byte array], size [int, number of octets])
# The example DID read response handler looks DIDs up in this list to know how many octets to parse
_signals = [
    Signal(0xF101, None, size=2),
    Signal(0xF107, None, size=4),
    Signal(0x2001, None, size=1),
]
# Application Setup
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
def AutoDetectDevice(name, interfaceDetectionAttempts, targetType, childMatch = None):
    """Resolve a device by name, add it as a source, and return a matching channel.

    name: resolver key of the device to look up.
    interfaceDetectionAttempts: maximum number of resolver lookups; a failed
        lookup is followed by a one-second pause before the next attempt.
    targetType: class the discovered child must be an instance of.
    childMatch: optional value that must be contained in the child's ID;
        None accepts the first child of the right type.

    Returns the first matching child of the added source's discovery children,
    or None when no child matches. Prints a message and terminates the script
    via exit() when the device cannot be resolved.
    """
    source = None
    while (interfaceDetectionAttempts > 0):
        interfaceDetectionAttempts -= 1
        try:
            # Look up the requested device (the caller-supplied name, not a global)
            source = app.Resolver[name]
            break
        except Exception:
            # Lookup failures are retried; Ctrl-C / SystemExit are deliberately not swallowed
            print("Waiting for interface %s..."%(name))
            time.sleep(1.0)
    if (source is None):
        print("Unable to resolve device %s"%(name))
        exit()
    source = app.VehicleSpy.AddSource(source.Description)
    for child in source.Discovery.Children:
        if (isinstance(child, targetType) and ((childMatch is None) or (childMatch in child.ID))):
            return child
    return None
def ToHexString(octets, delimiter = ":"):
    """Format an iterable of byte values as upper-case, two-digit hex joined by delimiter.

    octets: iterable of integers (e.g. bytes, bytearray, list of ints).
    delimiter: text placed between consecutive octets (default ":").

    Returns the joined string; an empty iterable yields "".
    """
    # join() places the delimiter only between items. Appending it after every
    # item and strip()-ing afterwards would also eat leading/trailing hex digits
    # whenever the delimiter contains characters such as "0" or "A".
    return delimiter.join("{:02X}".format(octet) for octet in octets)
def HexOrNone(number, width= 2):
    """Return number as zero-padded upper-case hex of at least width digits, or None for None."""
    if (number is None):
        return None
    # The nested replacement field supplies the minimum field width
    return "{:0{}X}".format(number, width)
# Transport layer setup: builds `transportLayer` for the selected interface type.
# Ethernet transports (DoIP / HSFZ) get an Ethernet channel, a TCP/IP interface and network,
# and an ISO 13400-2 transport; the CAN transport gets an ISO 11898-1 interface and ISO 15765-2.
if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
    if (IF_AUTO_DETECT):
        channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.EthernetChannel)
    else:
        # Manual path: add a pcap source keyed by the MAC address and require it to be ready
        source = app.VehicleSpy.AddSource("pcap " + IF_SOFT_MAC_ADDRESS)
        source.SearchFor()
        print(IF_NAME + " :", source.SourceState)
        if (source.SourceState != source.SourceState.Ready):
            print("Source is not ready!")
            exit()
        channel = app.Resolver[IF_NAME + " Discovery Channel"]
    if (channel is None):
        print("Unable to initialize device: %s"%(IF_NAME))
        exit()
    # TCP/IP interface bound to the Ethernet channel
    interfaceConfig = Network_pb2.Interface()
    interfaceConfig.MACAddress = IF_SOFT_MAC_ADDRESS
    interfaceConfig.IPv4.Address = IF_IP_ADDRESS
    interfaceConfig.IPv4.Netmask = IF_IP_MASK
    interface = vspyx.TCPIP.Interface.New(interfaceConfig)
    interface.Initialize(app, "Interface")
    interface.Attach(channel)
    if (DEBUG_IF_TRACE):
        def _Interface_OnEgress(frame):
            """Debug hook: print each egress frame's data as a hex string."""
            print(frame.Data.bytes().hex())
        interface.OnEgress.Add(_Interface_OnEgress)
    network = vspyx.TCPIP.Network.New()
    network.Initialize(app, "Network")
    network.AddInterface(interface)
    class EntityInfo:
        """Holder for the entity selected as diagnostic target."""
        IpAddress = None # IP address taken from the recorded vehicle announcement
        Info = None # Entity object taken from the recorded vehicle announcement
    # Module-level target selection, written by _OnVehicleAnnouncement or by the manual entity path
    targetIpAddress = None
    targetAddress = None
    target = EntityInfo()
    waitHandle = vspyx.Core.Event() # Signalled by _OnVehicleAnnouncement
    # Setup DoIP Transport layer
    def _OnDoIPNack(source, nackCode):
        """Print the nack code reported by the transport layer."""
        print("Received nack code: ", nackCode)
    def _OnVehicleAnnouncement(ipAddress, entity):
        """Print an announced entity and record the first one as the diagnostic target."""
        global targetIpAddress
        global targetAddress
        print("ECU detected at IP address %s with logical address %s, VIN= %s, EID= %s, GID= %s, ActivationRequirement= %s, SyncStatus= %s"%(ipAddress, hex(entity.Address), bytes(entity.VIN).decode("ascii"), ToHexString(entity.EID), ToHexString(entity.GID), entity.ActivationRequirement.name, entity.SyncStatus.name if (entity.SyncStatus is not None) else None))
        # NOTE(review): only the first announcement is recorded; later announcements (e.g. one
        # carrying an updated SyncStatus) do not refresh target.Info - confirm this is intended
        if (targetIpAddress is None):
            target.IpAddress = ipAddress
            target.Info = entity
            targetIpAddress = target.IpAddress
            targetAddress = entity.Address
        # NOTE(review): nesting reconstructed from flattened source - the event is signalled for every announcement here
        waitHandle.Set()
    def _OnRoutingActivationRequested(args):
        """Fill the routing activation request from the ROUTING_* settings and print it."""
        args.ActivationType = ROUTING_TYPE
        args.ISOReservedData_Req = ROUTING_EXTRA_ISO
        args.OEMReservedData_Req = ROUTING_EXTRA_OEM
        activationType = args.ActivationType.name if (args.ActivationType is not None) else None
        print("Route activation: Requesting; activation type %s for %s with SA %s Req[ISO: %s; OEM= %s]"%(activationType, args.IPAddress, hex(args.Address), HexOrNone(args.ISOReservedData_Req, 8), HexOrNone(args.OEMReservedData_Req, 8)))
        return None
    def _OnRoutingActivationResponse(args):
        """Print the routing activation response and report mismatches against configured expectations."""
        # An expectation of None means "nothing configured", so it is never reported as a mismatch
        if ((ROUTING_EXTRA_ISO_EXPECTED is not None) and (ROUTING_EXTRA_ISO_EXPECTED != args.ISOReservedData_Rsp)):
            print("Route activation: Expected ISO reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_ISO_EXPECTED, 8), HexOrNone(args.ISOReservedData_Rsp, 8)))
        if ((ROUTING_EXTRA_OEM_EXPECTED is not None) and (ROUTING_EXTRA_OEM_EXPECTED != args.OEMReservedData_Rsp)):
            print("Route activation: Expected OEM reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_OEM_EXPECTED, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))
        result = args.Result.name if (args.Result is not None) else None
        activationType = args.ActivationType.name if (args.ActivationType is not None) else None
        print("Route activation: Returned %s; activation type %s for %s with SA %s Rsp[ISO: %s; OEM= %s]"%(result, activationType, args.IPAddress, hex(args.Address), HexOrNone(args.ISOReservedData_Rsp, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))
        return None
    # Both Ethernet transports use the ISO 13400-2 implementation; HSFZ selects the legacy protocol version
    if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
        transportLayer = vspyx.Diagnostics.ISO13400_2.New()
        transportLayer.Initialize(app, "DoIP")
    elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
        transportLayer = vspyx.Diagnostics.ISO13400_2.New(False, vspyx.Diagnostics.ISO13400_2.ProtocolVersions.Legacy_HSFZ)
        transportLayer.Initialize(app, "HSFZ")
    transportLayer.Attach(network)
    transportLayer.OnDoIPNack = _OnDoIPNack
    transportLayer.OnVehicleAnnouncement = _OnVehicleAnnouncement
    transportLayer.OnRoutingActivationRequested = _OnRoutingActivationRequested
    transportLayer.OnRoutingActivationResponse = _OnRoutingActivationResponse
    transportLayer.SetBroadcastAddress(BROADCAST_ADDRESS)
elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
    channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.CANChannel, IF_CHANNEL)
    if (channel is None):
        print("Unable to initialize device: %s"%(IF_NAME))
        exit()
    interface = channel.NewISO11898_1Interface()
    transportLayer = vspyx.Communication.ISO15765_2.New(TESTER_DATA_LENGTH, 0)
    transportLayer.Initialize(app, "ISO 15765-2")
    transportLayer.Attach(interface)
    # Note: Extended + RemoteDiagnostics is not possible
    mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (ECU_AE is None) else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics
    # Register one Rx (ECU -> tester) and one Tx (tester -> ECU) address pair for the selected addressing format
    if (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Normal):
        transportLayer.AddRxNormalAddress(mType, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, TESTER_STMIN, TESTER_BS)
        transportLayer.AddTxNormalAddress(mType, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, ECU_STMIN_MIN)
    elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.NormalFixed):
        transportLayer.AddRxFixedAddress(mType, ECU_ADDRESS, TESTER_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, False, TESTER_STMIN, TESTER_BS)
        transportLayer.AddTxFixedAddress(mType, TESTER_ADDRESS, ECU_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, False, ECU_STMIN_MIN)
    elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Extended):
        # Extended addressing always uses the Diagnostics message type and no N_AE (see note above)
        transportLayer.AddRxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, None, TESTER_STMIN, TESTER_BS)
        transportLayer.AddTxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, None, ECU_STMIN_MIN)
# Setup UDS Session layer
# The session client is created from a Parameters object and attached on top of the transport layer
sessionParameters = vspyx.Diagnostics.ISO14229_2.Parameters()
# Optional timing overrides: uncomment a line to replace the value Parameters() was constructed with
# sessionParameters.DeltaP2 = datetime.timedelta(milliseconds= 200)
# sessionParameters.DeltaP6 = datetime.timedelta(milliseconds= 200)
# sessionParameters.P2_server_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P2star_server_max = datetime.timedelta(milliseconds= 5000)
# sessionParameters.P2_client_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P6_client_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P2star_client_max = datetime.timedelta(milliseconds= 5000)
# sessionParameters.P6star_client_max = datetime.timedelta(milliseconds= 5000)
session = vspyx.Diagnostics.ISO14229_2.NewClient(sessionParameters)
session.Initialize(app, "Session")
session.Attach(transportLayer)
# Setup UDS Application layer
# The application layer protocol client is attached on top of the session
udsInterface = vspyx.Diagnostics.ISO14229_1ClientApplicationLayerProtocol.New()
udsInterface.Initialize(app, "UDS Client")
udsInterface.Attach(session)
def _OnUnsolicitedResponse(message):
    """Print an unsolicited response, delegating known SIDs to their handlers.

    SID 0x62 is passed to HandleDIDsResponse and SID 0x59 to
    HandleDTCReadResponse; any other message is dumped as a list of hex
    octets. Always returns None.
    """
    serviceId = int(message.SID)
    # No newline here: the handler's output continues on the same line
    print("Unsolicited Response: SID= %s\t>>>\t"%(hex(serviceId)), end= "")
    if (serviceId == 0x62):
        HandleDIDsResponse(message)
        return None
    if (serviceId == 0x59):
        HandleDTCReadResponse(message)
        return None
    rawOctets = message.ToRaw().bytes()
    print("Raw= %s"%([hex(octet) for octet in rawOctets]))
    return None
# Setup UDS Application Services layer
# The service client is attached on top of the UDS application layer client
services = vspyx.Diagnostics.ISO14229_ServiceClient.New()
services.Initialize(app, "Service Client")
services.Attach(udsInterface)
# Add handler for unsolicited responses
services.OnUnsolicitedResponse = _OnUnsolicitedResponse
# Add custom services
# ServiceConfig.AddService(sid, name, supported_sessions, datetime.timedelta(milliseconds= p4server_max), supported_subfunctions, security_requirements)
def _CreateMessageWithSubfunction(pdu, message):
    """Decoder callback: wrap the given pdu/message pair in a MessageWithSubfunction."""
    return vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(pdu, message)
# Register the custom "Echo" service (SID 0xBA) and decode both its requests and responses with the callback above
service = services.ServiceConfig.AddService(0xBA, "Echo", [ 1 ], datetime.timedelta(milliseconds= 5000), [ 1 ], 0xFFFFFFFF)
service.RequestDecoder = _CreateMessageWithSubfunction
service.ResponseDecoder = _CreateMessageWithSubfunction
# Test Support Functions
def CheckForNRC(result):
    """Return True when result carries a positive first response, False otherwise.

    A missing result or an empty response list prints "No response received!".
    A negative first response prints its NRC name, NRC code and failed SID.
    Only the first entry of result.Responses is examined.
    """
    hasResponse = (result is not None) and (len(result.Responses) != 0)
    if (not hasResponse):
        print("No response received!")
        return False
    first = result.Responses[0]
    if (not first.IsNegativeResponse):
        return True
    nrcValue = int(first.NRC)
    failedSid = int(first.FailedSID)
    print("Received NRC %s (%s) for service %s"%(first.NRC.name, hex(nrcValue), hex(failedSid)))
    return False
def HandleDIDsResponse(message):
    """Walk the DID records of a response and print each one.

    For every DID found in _signals, signal.Size octets are read, stored in
    that signal's Value and printed. For an unknown DID, everything from the
    current offset to the end of the data is read and printed as its payload.
    """
    cursor = message.DataStart
    while (cursor.IsValid()):
        did = message.ReadId(cursor)
        # First table entry with this DID, or None when the DID is not in the table
        known = next((entry for entry in _signals if (entry.Id == did)), None)
        if (known is None):
            remaining = cursor.Size - cursor.CurrentOffset
            payload = message.ReadParameterData(cursor, remaining)
            print("Unknown DID", hex(did), "data:", ToHexString(payload.bytes()))
            continue
        known.Value = message.ReadParameterData(cursor, known.Size).bytes()
        print("DID", hex(known.Id), "contains:", ToHexString(known.Value))
# DTC format used to decode status reports; replaced by the format identifier of the latest count report
_dtcFormat = vspyx.Diagnostics.ISO14229_Services.DTCFormatIdentifier.SAE_J2012_DA_00
def HandleDTCReadResponse(message):
    """Print a ReadDTCInformation response according to its report type.

    Status-list report types print the availability mask and every DTC;
    count report types (0x01, 0x07) print the count and update _dtcFormat;
    every other report type is dumped as raw hex.
    """
    global _dtcFormat
    statusListReports = (0x02, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x15)
    countReports = (0x01, 0x07)
    reportType = message.Subfunction
    # No newline here: the branch-specific details continue on the same line
    print("ReadDTCs Response - ReportType= %s"%(hex(reportType)), end= "")
    if (reportType in statusListReports):
        statusInfo = message.GetDtcStatusInfo(_dtcFormat)
        print(", DTCStatusAvailabilityMask= %s, Count= %s"%(hex(statusInfo.StatusAvailabilityMask), len(statusInfo.Dtcs)))
        for dtc in statusInfo.Dtcs:
            print("\t[DTC= %s (%s), Status= %s]"%(dtc, hex(dtc.Code), hex(dtc.Status)))
        return
    if (reportType in countReports):
        countInfo = message.GetDtcCountInfo()
        _dtcFormat = countInfo.FormatIdentifier
        print(", DTCStatusAvailabilityMask= %s, Format= %s, Count= %s"%(hex(countInfo.StatusAvailabilityMask), _dtcFormat.name, countInfo.Count))
        return
    rawMessage = message.ToRaw()
    print("[Raw Data]= %s"%(ToHexString(rawMessage.bytes())))
# Start application
runtime = app.VehicleSpy.PrepareForStart()
# Register the protocol stack with the runtime: service client, UDS client, then session
runtime.AddComponent(services)
runtime.AddComponent(udsInterface)
runtime.AddComponent(session)
# The TCP/IP network object only exists for the Ethernet-based transports
if (TEST_INTERFACE_TYPE in (TEST_INTERFACE_TYPE_DOIP, TEST_INTERFACE_TYPE_HSFZ)):
    runtime.AddComponent(network)
runtime.AddComponent(transportLayer)
app.VehicleSpy.Start()
# Target selection: decides `isReadyForDiagnostics` and provides targetAddress / taType / n_ae
# for the diagnostic examples that follow.
if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
    # Manual ARP entry
    if (TEST_MANUAL_ARP_ENTRY_ADD):
        interface.AddARPEntry(TEST_MANUAL_ECU_IP, TEST_ECU_MAC_ADDRESS)
    # Example of manually adding a DoIP entity
    if (TEST_MANUAL_ENTITY_ADD):
        targetIpAddress = vspyx.Core.IPAddress(TEST_MANUAL_ECU_IP)
        targetAddress = TEST_MANUAL_ECU_LA
        print("Manually identifying entity: %s at %s"%(hex(targetAddress), targetIpAddress))
        transportLayer.AddDoIPEntity(TEST_MANUAL_ECU_IP, TEST_MANUAL_ECU_LA)
        isReadyForDiagnostics = True
    else: # Otherwise, automatically detect entities
        if (TEST_AUTO_PROBE_WAIT):
            # Wait for announcement in response to client probe
            app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=3500))
            isReadyForDiagnostics = waitHandle.IsSet()
        else:
            isReadyForDiagnostics = False
        if (isReadyForDiagnostics):
            print("Entity Identified: Passive (or client auto probe)")
        else:
            # Manual, immediate discovery attempt
            entities = transportLayer.GetVehicleIdentification(None)
            isReadyForDiagnostics = (len(entities) > 0)
            if(isReadyForDiagnostics):
                print("Entity Identified: Active (returned %s records)"%(len(entities)))
            else: # If manual discovery didn't detect anything, wait for auto or passive discovery
                # presumably WaitFor returns a truthy value when waitHandle was signalled in time - TODO confirm
                isReadyForDiagnostics = app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(seconds=120), waitHandle)
                if(isReadyForDiagnostics):
                    print("Entity Identified: Passive (ECU power-on)")
    # NOTE(review): nesting reconstructed from flattened source - the two query examples below run for manual and automatic entities alike
    # Example of vehicle identification query by MAC (EID)
    # NameError is tolerated so the TEST_VEHICLE_QUERY_* settings may be removed from the configuration
    try:
        if (TEST_VEHICLE_QUERY_EID is not None):
            entities = transportLayer.GetVehicleIdentificationByEID(TEST_VEHICLE_QUERY_EID, None)
            print("Vehicle identification query by EID: returned %s records %s"%(len(entities), ToHexString(TEST_VEHICLE_QUERY_EID.bytes())))
    except NameError:
        pass
    # Example of vehicle identification query by VIN
    try:
        if (TEST_VEHICLE_QUERY_VIN is not None):
            entities = transportLayer.GetVehicleIdentificationByVIN(TEST_VEHICLE_QUERY_VIN, None)
            print("Vehicle identification query by VIN: returned %s records %s"%(len(entities), TEST_VEHICLE_QUERY_VIN.bytes().decode("ascii")))
    except NameError:
        pass
    # Example of confirming the sync status
    # target.Info stays None until an announcement has been recorded (e.g. for a manually added entity),
    # so the check is skipped in that case instead of failing on None.SyncStatus
    if (TEST_CONFIRM_SYNC_STATUS and isReadyForDiagnostics and (target.Info is not None) and (target.Info.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete)):
        app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(seconds=2))
        if (target.Info.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
            transportLayer.GetVehicleIdentification(target.IpAddress)
            if (target.Info.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
                print("VIN/GID Sync not complete within discovery time period")
                isReadyForDiagnostics = False
    # Example of checking the ECU's power mode prior to diagnostics
    if (TEST_CONFIRM_POWER_STATUS and isReadyForDiagnostics):
        powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.NotReady
        # Polls once per second for as long as NotReady is reported; there is no upper bound on the number of polls
        while (powerMode == vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.NotReady):
            powerMode = transportLayer.GetPowerMode(targetIpAddress)
            if (powerMode is None):
                print("Unable to determine power mode")
                isReadyForDiagnostics = False
            if (powerMode == vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.NotReady):
                app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(seconds=1))
    # Example of querying the entity status prior to diagnostics (i.e. before opening a TCP connection)
    if (TEST_CONFIRM_ENTITY_STATUS and isReadyForDiagnostics):
        entityStatus = transportLayer.GetEntityStatusInfo(targetIpAddress)
        if (entityStatus is not None):
            print("NodeType= %s, TCP_DATA active= %i, TCP_DATA max=%i, DataSizeMax= %s"%(entityStatus.NodeType.name, entityStatus.TcpSocketsCount, entityStatus.TcpSocketsMax, entityStatus.DataSizeMax))
            if (entityStatus.TcpSocketsCount == entityStatus.TcpSocketsMax):
                print("Warning: TCP_DATA max limit reached, route activation request may result in response code HostUnavailable")
        else:
            print("Unable to determine entity status")
            isReadyForDiagnostics = False
    taType = ECU_ADDRESS_TYPE
    n_ae = ECU_AE
elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
    # CAN has no discovery phase: the target comes straight from the configuration
    isReadyForDiagnostics = True
    targetAddress = ECU_ADDRESS
    taType = int(ECU_ADDRESS_TYPE)
    n_ae = ECU_AE
# Diagnostic examples: runs a fixed sequence of UDS requests against the selected target.
# Each example sends one request, and on a positive response (CheckForNRC returns True) prints details of it.
# NOTE(review): nesting below was reconstructed from a source with flattened indentation; statements
# that follow an "if (CheckForNRC(result)):" body without depending on `message` were placed at example level - confirm
if (isReadyForDiagnostics):
    # Remote diagnostics is used only when an address extension (N_AE) is configured
    mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (n_ae is None) else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics
    target_ai = (mType, TESTER_ADDRESS, targetAddress, taType, n_ae)
    # Set the default target address info
    services.TargetAddress = target_ai
    # Example: SessionControl
    result = services.SessionControl(1, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("SessionControl Response - Subfunction=", hex(message.Subfunction), "P2ServerMax=", message.P2ServerMax, "P2StarServerMax=", message.P2StarServerMax)
    if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
        # Example of querying the entity status during diagnostics (i.e. after opening a TCP connection)
        if (TEST_CONFIRM_ENTITY_STATUS):
            entityStatus = transportLayer.GetEntityStatusInfo(targetIpAddress)
            if (entityStatus is not None):
                print("TCP_DATA active=", entityStatus.TcpSocketsCount, "TCP_DATA max=", entityStatus.TcpSocketsMax)
            else:
                print("Error querying entity status")
    # Example: TesterPresent
    result = services.TesterPresent(target_ai)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("TesterPresent Response - Subfunction=", hex(message.Subfunction))
    # Example: ReadDTCInfo(NumberOfDTCByStatusMask)
    result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x01, 0x85, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDTCReadResponse(message)
    # Example: ReadDTCInfo(DTCByStatusMask)
    result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x02, 0x84, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDTCReadResponse(message)
    # Example: ReadDTCInfo(SupportedDTCs)
    result = services.ReadDtcs(0x0A, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDTCReadResponse(message)
    # Example: ClearDTCs
    result = services.ClearDtcs(0x0000F0, 0xAF, None)
    if (CheckForNRC(result)):
        print("ClearDTCs Response")
    # Example: ClearDTCs
    result = services.ClearDtcs(0xFFFFFE, None, None)
    if (CheckForNRC(result)):
        print("ClearDTCs Response")
    # Example: ReadDTCInfo(DTCByStatusMask)
    result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x02, 0x84, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDTCReadResponse(message)
    # Example: ClearDTCs
    result = services.ClearDtcs(0xFFFFFF, None, None)
    if (CheckForNRC(result)):
        print("ClearDTCs Response")
    # Example: ReadDTCInfo(DTCByStatusMask)
    result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x02, 0x84, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDTCReadResponse(message)
    # Example: Custom service
    # The request starts with 0xBA, the SID the script registers as the custom "Echo" service
    result = services.GenericService(vspyx.Core.BytesView(bytes([0xBA, 0x01, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0xFE, 0xDC, 0xBA, 0x98, 0x76, 0x54, 0x32, 0x10])), None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("Custom Echo Response - Subfunction=", hex(message.Subfunction), "payload=", ToHexString(message.ToRaw().bytes()[2:]))
    # Example: TesterPresent (response suppressed)
    result = services.TesterPresent(target_ai, True)
    # NOTE(review): unlike CheckForNRC, this does not guard against result being None - confirm TesterPresent never returns None here
    if (len(result.Responses) == 0):
        print("TesterPresent [Response Suppresed]")
    elif (CheckForNRC(result)):
        message = result.Responses[0]
        print("Unexpected TesterPresent Response - Subfunction=", hex(message.Subfunction))
    # Example: Custom service
    result = services.GenericService(vspyx.Core.BytesView(bytes([0xBA, 0x01, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0xFE, 0xDC, 0xBA, 0x98, 0x76, 0x54, 0x32, 0x10])), None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("Custom Echo Response - Subfunction=", hex(message.Subfunction), "payload=", ToHexString(message.ToRaw().bytes()[2:]))
    # Example: TesterPresent
    result = services.TesterPresent(target_ai)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("TesterPresent Response - Subfunction=", hex(message.Subfunction))
    # Example: Read Data by Id (multiple DIDs)
    result = services.ReadDataById([ 0xF101, 0xF107 ], None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDIDsResponse(message)
    # Example: Read Data by Id
    result = services.ReadDataById([ 0x2001 ], None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        HandleDIDsResponse(message)
    # Example: Custom service (invalid)
    result = services.GenericService(vspyx.Core.BytesView(bytes([0xBB, 0x01, 0x23, 0x45])), None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("Custom Service 0xBB - ", ToHexString(message.ToRaw().bytes()))
    # Example: Routine Control (Invalid RID)
    result = services.RoutineControl(1, 0x0000, None, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        statusData = message.StatusData
        print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))
    # Example: Routine Control (Start)
    result = services.RoutineControl(1, 0xF001, vspyx.Core.BytesView(bytes([0x05, 0xFF])), None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        statusData = message.StatusData
        print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))
    # Pause 5 seconds before issuing the next routine request
    app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=5000))
    # Example: Routine Control (Invalid Sequence)
    result = services.RoutineControl(3, 0xF001, None, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        statusData = message.StatusData
        print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))
    # Example: Routine Control (Stop)
    result = services.RoutineControl(2, 0xF001, None, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        statusData = message.StatusData
        print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))
    # Example: Routine Control (Get Results)
    result = services.RoutineControl(3, 0xF001, None, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        statusData = message.StatusData
        print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))
    # Example: ECU Reset (hard reset)
    result = services.EcuReset(1, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("EcuReset Response - ResetType=", hex(message.ResetType))
        powerDownTime = message.PowerDownTime
        # A power-down time is only expected for reset type 4; anything else carrying one is reported
        if ((message.ResetType != 4) and (powerDownTime > 0)):
            print("Invalid response! Unexpected PowerDownTime included ", powerDownTime)
    # Example: ECU Reset (rapid power shutdown)
    result = services.EcuReset(4, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("EcuReset Response - ResetType=", hex(message.ResetType), "PowerDownTime=", message.PowerDownTime, "seconds")
    # Pause 3.5 seconds before cancelling the shutdown
    app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=3500))
    # Example: ECU Reset (cancel rapid power shutdown)
    result = services.EcuReset(5, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("EcuReset Response - ResetType=", hex(message.ResetType))
        powerDownTime = message.PowerDownTime
        if ((message.ResetType != 4) and (powerDownTime > 0)):
            print("Invalid response! Unexpected PowerDownTime included ", powerDownTime)
    # Example: ECU Reset (rapid power shutdown)
    result = services.EcuReset(4, None)
    if (CheckForNRC(result)):
        message = result.Responses[0]
        print("EcuReset Response - ResetType=", hex(message.ResetType), "PowerDownTime=", message.PowerDownTime, "seconds")
else:
    print("Unable to initiate diagnostics")
# Tear down the runtime and stop the application whether or not diagnostics ran
runtime.ShutdownEnvironment()
app.VehicleSpy.Stop()