.. _diagnostics-client:

Python UDS Client with DoIP
===========================

.. code:: python

   import time, datetime, enum
   import vspyx
   from intrepidcs.vspyx.rpc.TCPIP import Network_pb2

   # Constants
   TEST_INTERFACE_TYPE_DOIP = "DoIP"
   TEST_INTERFACE_TYPE_ISO15765_CAN = "ISO-15765+CAN"
   TEST_INTERFACE_TYPE_HSFZ = "HSFZ"

   # Test interface selection
   TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_DOIP
   #TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_ISO15765_CAN
   #TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_HSFZ

   if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
       # Ethernet interface config
       IF_AUTO_DETECT = True # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
       IF_NAME = "Local Area Connection 3" # Client network interface device name
       IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:64" # Client network virtual interface MAC address
       IF_IP_ADDRESS = "192.168.7.100" # Client network interface IP address
       IF_IP_MASK = "255.255.255.0" # Client network interface IP netmask

       # DoIP Config
       BROADCAST_ADDRESS = "192.168.7.255" # Specify the appropriate DoIP discovery broadcast address
       TESTER_ADDRESS = 0x3584 # Specify the client external tester address
       ECU_ADDRESS_TYPE = 0
       ECU_AE = None # Specify the ECU server address (N_AE)
       ROUTING_TYPE = vspyx.Diagnostics.ISO13400_2.RoutingActivationTypes.Default # Select the routing type
       ROUTING_EXTRA_ISO = None # Specify a 32-bit value to override the ISO reserved field in the routing activation request
       ROUTING_EXTRA_OEM = None # Specify a 32-bit value to include the OEM reserved field in the routing activation request
       ROUTING_EXTRA_ISO_EXPECTED = None # Specify an expected value of the ISO reserved field in the routing activation response
       ROUTING_EXTRA_OEM_EXPECTED = None # Specify an expected value of the OEM reserved field in the routing activation response

       # Test Application Config
       TEST_MANUAL_ARP_ENTRY_ADD = False # Specify True to manually add the ARP entry for the target ECU
       TEST_ECU_MAC_ADDRESS = "00:FC:70:00:00:02" # Manual MAC address
       TEST_MANUAL_ENTITY_ADD = False # Specify True to manually configure for use with the entity identified using IP Address [TEST_MANUAL_ECU_IP] and Logical Address [TEST_MANUAL_ECU_LA]
       TEST_MANUAL_ECU_IP = "192.168.7.2" # Manual entity IP address
       TEST_MANUAL_ECU_LA = 0xDEAD # Manual entity logical address
       TEST_AUTO_PROBE_WAIT = True # Wait for client to send a probe
       TEST_CONFIRM_POWER_STATUS = True # Specify True to confirm the ECU's power mode via DoIP function GetPowerMode()
       TEST_CONFIRM_SYNC_STATUS = True # Specify True to confirm the ECU's VIN/GID synchronization status via DoIP function GetVehicleIdentification()
       TEST_CONFIRM_ENTITY_STATUS = True # Specify True to confirm the ECU's entity info via DoIP function GetEntityStatusInfo()
       TEST_VEHICLE_QUERY_EID = vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])) # Specify a value to test query by EID
       TEST_VEHICLE_QUERY_VIN = vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")) # Specify a value to test query by VIN
       DEBUG_IF_TRACE = False # Specify True to trace raw interface frames (for debugging only)
   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
       class ISO15765_AddressTypes(enum.Enum):
           """Addressing formats selectable through NETWORK_ADDRESS_TYPE."""
           Normal = 1
           NormalFixed = 2
           Extended = 3

       IF_DEVICE = "88432" # ValueCAN 3 - 88432
       #IF_DEVICE = "CY7347" # neoVI FIRE 2 - CY7347
       # NOTE(review): IF_LIB is not defined anywhere in this example, so selecting the
       # ISO-15765+CAN interface raises NameError on the next line until IF_LIB is set to
       # the prefix under which the device is registered with app.Resolver - TODO confirm value
       IF_NAME = "%s %s"%(IF_LIB, IF_DEVICE)
       IF_CHANNEL = "HSCAN"
       NETWORK_ADDRESS_TYPE = ISO15765_AddressTypes.Normal
       TESTER_ADDRESS = 0x45 # Specify the client external tester address
       TESTER_CAN_ADDRESS = 0x18ff456
       TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN29Bit
       #TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD29Bit
       ECU_ADDRESS = 0x23 # Specify the ECU server address
       ECU_CAN_ADDRESS = 0x123
       ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN11Bit
       #ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD11Bit
       TESTER_DATA_LENGTH = 8 # Specify the client external tester DLC (in octets)
       TESTER_STMIN = 0
       TESTER_BS = 0
       ECU_AE = None # Specify the ECU server address (N_AE)
       ECU_STMIN_MIN = 0
   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
       # Ethernet interface config
       IF_AUTO_DETECT = True # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
       #IF_NAME = "Local Area Connection 3" # Client network interface device name
       IF_NAME = "Bottom Intel I218-V" # Client network interface device name
       IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:64" # Client network virtual interface MAC address
       IF_IP_ADDRESS = "192.168.7.100" # Client network interface IP address
       IF_IP_MASK = "255.255.255.0" # Client network interface IP netmask

       # DoIP Config
       BROADCAST_ADDRESS = "192.168.7.255" # Specify the appropriate DoIP discovery broadcast address
       TESTER_ADDRESS = 0x84 # Specify the client external tester address
       ECU_ADDRESS_TYPE = 0
       ECU_AE = None # Specify the ECU server address (N_AE)
       ROUTING_TYPE = None # Not supported in HSFZ
       ROUTING_EXTRA_ISO = None # Not supported in HSFZ
       ROUTING_EXTRA_OEM = None # Not supported in HSFZ
       ROUTING_EXTRA_ISO_EXPECTED = None # Not supported in HSFZ
       ROUTING_EXTRA_OEM_EXPECTED = None # Not supported in HSFZ

       # Test Application Config
       TEST_MANUAL_ARP_ENTRY_ADD = False # Specify True to manually add the ARP entry for the target ECU
       TEST_ECU_MAC_ADDRESS = "00:FC:70:00:00:02" # Manual MAC address
       TEST_MANUAL_ENTITY_ADD = False # Specify True to manually configure for use with the entity identified using IP Address [TEST_MANUAL_ECU_IP] and Logical Address [TEST_MANUAL_ECU_LA]
       TEST_MANUAL_ECU_IP = "192.168.7.2" # Manual entity IP address
       TEST_MANUAL_ECU_LA = 1 # Manual entity logical address
       TEST_AUTO_PROBE_WAIT = True # Wait for client to send a probe
       TEST_CONFIRM_POWER_STATUS = True # Specify True to confirm the ECU's power mode via DoIP function GetPowerMode()
       TEST_CONFIRM_SYNC_STATUS = False # Specify True to confirm the ECU's VIN/GID synchronization status via DoIP function GetVehicleIdentification()
       TEST_CONFIRM_ENTITY_STATUS = True # Specify True to confirm the ECU's entity info via DoIP function GetEntityStatusInfo()
       TEST_VEHICLE_QUERY_EID = vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])) # Specify a value to test query by EID
       TEST_VEHICLE_QUERY_VIN = vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")) # Specify a value to test query by VIN
       DEBUG_IF_TRACE = False # Specify True to trace raw interface frames (for debugging only)

   class Signal:
       """A DID definition: identifier, most recently stored value, and parameter size in octets."""

       def __init__(self, id, value, size = None):
           self.Id = id
           if (value is None):
               # No initial value: start empty and take the size from the caller (0 if not given)
               self.Value = bytearray()
               if (size is None):
                   self.Size = 0
               else:
                   self.Size = size
           else:
               # An initial value determines the size; the size argument is ignored
               self.Value = value
               self.Size = len(value)

   # List of arbitrary DID definitions - feel free to add your own Signal(DID [int], value [byte array], size [int, number of octets])
   # This list will be used by the example DID read response handler for parsing DID parameter data
   _signals = [
       Signal(0xF101, None, 2),
       Signal(0xF107, None, 4),
       Signal(0x2001, None, 1)
   ]

   # Application Setup
   app = vspyx.Core.Application.New()
   app.Initialize(loadAllModules=True)

   def AutoDetectDevice(name, interfaceDetectionAttempts, targetType, childMatch = None):
       """Resolve the device called `name`, add it as a source and return a matching channel.

       The resolver lookup is retried once per second, at most
       `interfaceDetectionAttempts` times; the script exits when the device
       cannot be resolved. Returns the first discovery child that is an
       instance of `targetType` and, when `childMatch` is given, whose ID
       contains `childMatch`; returns None when no child qualifies.
       """
       source = None
       while (interfaceDetectionAttempts > 0):
           try:
               interfaceDetectionAttempts -= 1
               # Look up the requested device (previously the global IF_NAME was used
               # here, silently ignoring the `name` argument)
               source = app.Resolver[name]
               break
           except Exception:
               # Not a bare except, so Ctrl-C can still interrupt the wait
               print("Waiting for interface %s..."%(name))
               time.sleep(1.0)
       if (source is None):
           print("Unable to resolve device %s"%(name))
           exit()
       channel = None
       source = app.VehicleSpy.AddSource(source.Description)
       for child in source.Discovery.Children:
           if (isinstance(child, targetType)):
               if (childMatch is None or (childMatch in child.ID)):
                   channel = child
                   break
       return channel

   def ToHexString(octets, delimiter = ":"):
       """Return `octets` as two-digit uppercase hex values separated by `delimiter`."""
       # join() places the delimiter only between items; the previous append-then-strip
       # approach treated the delimiter as a character set and could strip hex digits
       return delimiter.join("{:02X}".format(octet) for octet in octets)

   def HexOrNone(number, width= 2):
       """Return `number` as zero-padded uppercase hex of `width` digits, or None if `number` is None."""
       return ("{:0" + str(width) + "X}").format(number) if (number is not None) else None

   if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
       if (IF_AUTO_DETECT):
           channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.EthernetChannel)
       else:
           source = app.VehicleSpy.AddSource("pcap " + IF_SOFT_MAC_ADDRESS)
           source.SearchFor()
           print(IF_NAME + " :", source.SourceState)
           if (source.SourceState != source.SourceState.Ready):
               print("Source is not ready!")
               exit()
           channel = app.Resolver[IF_NAME + " Discovery Channel"]
       if (channel is None):
           print("Unable to initialize device: %s"%(IF_NAME))
           exit()

       interfaceConfig = Network_pb2.Interface()
       interfaceConfig.MACAddress = IF_SOFT_MAC_ADDRESS
       interfaceConfig.IPv4.Address = IF_IP_ADDRESS
       interfaceConfig.IPv4.Netmask = IF_IP_MASK
       interface = vspyx.TCPIP.Interface.New(interfaceConfig)
       interface.Initialize(app, "Interface")
       interface.Attach(channel)

       if (DEBUG_IF_TRACE):
           def _Interface_OnEgress(frame):
               """Print each egress frame's data as hex."""
               print(frame.Data.bytes().hex())
           interface.OnEgress.Add(_Interface_OnEgress)

       network = vspyx.TCPIP.Network.New()
       network.Initialize(app, "Network")
       network.AddInterface(interface)

       class EntityInfo:
           """Holder for the first announced entity (IP address and announcement record)."""
           IpAddress = None
           Info = None

       targetIpAddress = None
       targetAddress = None
       target = EntityInfo()
       waitHandle = vspyx.Core.Event()

       # Setup DoIP Transport layer
       def _OnDoIPNack(source, nackCode):
           """Report a received nack code."""
           print("Received nack code: ", nackCode)

       def _OnVehicleAnnouncement(ipAddress, entity):
           """Print the announced entity and latch the first one seen as the diagnostic target."""
           global targetIpAddress
           global targetAddress
           print("ECU detected at IP address %s with logical address %s, VIN= %s, EID= %s, GID= %s, ActivationRequirement= %s, SyncStatus= %s"%(ipAddress, hex(entity.Address), bytes(entity.VIN).decode("ascii"), ToHexString(entity.EID), ToHexString(entity.GID), entity.ActivationRequirement.name, entity.SyncStatus.name if (entity.SyncStatus is not None) else None))
           if (targetIpAddress is None):
               # Only the first announcement selects the target; later ones are just printed
               target.IpAddress = ipAddress
               target.Info = entity
               targetIpAddress = target.IpAddress
               targetAddress = entity.Address
               waitHandle.Set()

       def _OnRoutingActivationRequested(args):
           """Apply the configured routing activation settings to the outgoing request and print them."""
           args.ActivationType = ROUTING_TYPE
           args.ISOReservedData_Req = ROUTING_EXTRA_ISO
           args.OEMReservedData_Req = ROUTING_EXTRA_OEM
           activationType = args.ActivationType.name if (args.ActivationType is not None) else None
           print("Route activation: Requesting; activation type %s for %s with SA %s Req[ISO: %s; OEM= %s]"%(activationType, args.IPAddress, hex(args.Address), HexOrNone(args.ISOReservedData_Req, 8), HexOrNone(args.OEMReservedData_Req, 8)))
           return None

       def _OnRoutingActivationResponse(args):
           """Compare the response's reserved fields with the expected values and print the outcome."""
           if ((ROUTING_EXTRA_ISO_EXPECTED is not None) and (ROUTING_EXTRA_ISO_EXPECTED != args.ISOReservedData_Rsp)):
               print("Route activation: Expected ISO reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_ISO_EXPECTED, 8), HexOrNone(args.ISOReservedData_Rsp, 8)))
           # NOTE(review): unlike the ISO check above, this comparison also runs when the
           # expected value is None - presumably to flag an unexpected OEM field; confirm
           if (ROUTING_EXTRA_OEM_EXPECTED != args.OEMReservedData_Rsp):
               print("Route activation: Expected OEM reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_OEM_EXPECTED, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))
           result = args.Result.name if (args.Result is not None) else None
           activationType = args.ActivationType.name if (args.ActivationType is not None) else None
           print("Route activation: Returned %s; activation type %s for %s with SA %s Rsp[ISO: %s; OEM= %s]"%(result, activationType, args.IPAddress, hex(args.Address), HexOrNone(args.ISOReservedData_Rsp, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))
           return None

       if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
           transportLayer = vspyx.Diagnostics.ISO13400_2.New()
           transportLayer.Initialize(app, "DoIP")
       elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
           transportLayer = vspyx.Diagnostics.ISO13400_2.New(False, vspyx.Diagnostics.ISO13400_2.ProtocolVersions.Legacy_HSFZ)
           transportLayer.Initialize(app, "HSFZ")
       transportLayer.Attach(network)
       transportLayer.OnDoIPNack = _OnDoIPNack
       transportLayer.OnVehicleAnnouncement = _OnVehicleAnnouncement
       transportLayer.OnRoutingActivationRequested = _OnRoutingActivationRequested
       transportLayer.OnRoutingActivationResponse = _OnRoutingActivationResponse
       transportLayer.SetBroadcastAddress(BROADCAST_ADDRESS)
   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
       channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.CANChannel, IF_CHANNEL)
       if (channel is None):
           print("Unable to initialize device: %s"%(IF_NAME))
           exit()

       interface = channel.NewISO11898_1Interface()
       transportLayer = vspyx.Communication.ISO15765_2.New(TESTER_DATA_LENGTH, 0)
       transportLayer.Initialize(app, "ISO 15765-2")
       transportLayer.Attach(interface)

       # Note: Extended + RemoteDiagnostics is not possible
       mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (ECU_AE is None) else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics
       if (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Normal):
           transportLayer.AddRxNormalAddress(mType, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, TESTER_STMIN, TESTER_BS)
           transportLayer.AddTxNormalAddress(mType, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, ECU_STMIN_MIN)
       elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.NormalFixed):
           transportLayer.AddRxFixedAddress(mType, ECU_ADDRESS, TESTER_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, False, TESTER_STMIN, TESTER_BS)
           transportLayer.AddTxFixedAddress(mType, TESTER_ADDRESS, ECU_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, False, ECU_STMIN_MIN)
       elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Extended):
           transportLayer.AddRxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, None, TESTER_STMIN, TESTER_BS)
           transportLayer.AddTxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, None, ECU_STMIN_MIN)

   # Setup UDS Session layer
   sessionParameters = vspyx.Diagnostics.ISO14229_2.Parameters()
   # sessionParameters.DeltaP2 = datetime.timedelta(milliseconds= 200)
   # sessionParameters.DeltaP6 = datetime.timedelta(milliseconds= 200)
   # sessionParameters.P2_server_max = datetime.timedelta(milliseconds= 250)
   # sessionParameters.P2star_server_max = datetime.timedelta(milliseconds= 5000)
   # sessionParameters.P2_client_max = datetime.timedelta(milliseconds= 250)
   # sessionParameters.P6_client_max = datetime.timedelta(milliseconds= 250)
   # sessionParameters.P2star_client_max = datetime.timedelta(milliseconds= 5000)
   # sessionParameters.P6star_client_max = datetime.timedelta(milliseconds= 5000)
   session = vspyx.Diagnostics.ISO14229_2.NewClient(sessionParameters)
   session.Initialize(app, "Session")
   session.Attach(transportLayer)

   # Setup UDS Application layer
   udsInterface = vspyx.Diagnostics.ISO14229_1ClientApplicationLayerProtocol.New()
   udsInterface.Initialize(app, "UDS Client")
   udsInterface.Attach(session)

   def _OnUnsolicitedResponse(message):
       """Print an unsolicited response, dispatching on its SID (0x62 DID data, 0x59 DTC info, else raw)."""
       sid = int(message.SID)
       print("Unsolicited Response: SID= %s\t>>>\t"%(hex(sid)), end= "")
       if(sid == 0x62):
           HandleDIDsResponse(message)
       elif (sid == 0x59):
           HandleDTCReadResponse(message)
       else:
           rawMessage = message.ToRaw()
           print("Raw= %s"%([hex(octet) for octet in rawMessage.bytes()]))
       return None

   # Setup UDS Application Services layer
   services = vspyx.Diagnostics.ISO14229_ServiceClient.New()
   services.Initialize(app, "Service Client")
   services.Attach(udsInterface)

   # Add handler for unsolicited responses
   services.OnUnsolicitedResponse = _OnUnsolicitedResponse

   # Add custom services
   # ServiceConfig.AddService(sid, name, supported_sessions, datetime.timedelta(milliseconds= p4server_max), supported_subfunctions, security_requirements)
   def _CreateMessageWithSubfunction(pdu, message):
       """Decoder used for both requests and responses of the custom "Echo" service."""
       return vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(pdu, message)

   service = services.ServiceConfig.AddService(0xBA, "Echo", [ 1 ], datetime.timedelta(milliseconds= 5000), [ 1 ], 0xFFFFFFFF)
   service.RequestDecoder = _CreateMessageWithSubfunction
   service.ResponseDecoder = _CreateMessageWithSubfunction

   # Test Support Functions
   def CheckForNRC(result):
       """Return True when `result` holds a positive first response.

       Prints a diagnostic and returns False when there is no result, no
       response, or the first response is a negative response.
       """
       if (result is None or len(result.Responses) == 0):
           print("No response received!")
           return False
       message = result.Responses[0]
       if (message.IsNegativeResponse):
           responseCode = int(message.NRC)
           sid = int(message.FailedSID)
           print("Received NRC %s (%s) for service %s"%(message.NRC.name, hex(responseCode), hex(sid)))
           return False
       return True

   def HandleDIDsResponse(message):
       """Walk a DID read response, storing and printing each DID found in `_signals`.

       An unrecognised DID has no known size, so everything left in the
       message is printed as its data and parsing stops.
       """
       handle = message.DataStart
       while (handle.IsValid()):
           match = None
           did = message.ReadId(handle)
           for signal in _signals:
               if (signal.Id == did):
                   match = signal
                   signal.Value = message.ReadParameterData(handle, signal.Size).bytes()
                   break
           if (match is not None):
               print("DID", hex(match.Id), "contains:", ToHexString(match.Value))
           else:
               data = message.ReadParameterData(handle, (handle.Size - handle.CurrentOffset))
               print("Unknown DID", hex(did), "data:", ToHexString(data.bytes()))
               # The remainder of the payload has been consumed; nothing further to parse
               return

   _dtcFormat = vspyx.Diagnostics.ISO14229_Services.DTCFormatIdentifier.SAE_J2012_DA_00

   def HandleDTCReadResponse(message):
       """Print a ReadDTCInformation response according to its report type.

       Count-style reports (0x01, 0x07) also update the module-level
       `_dtcFormat`, which later status-style reports are decoded with.
       """
       global _dtcFormat
       reportType = message.Subfunction
       print("ReadDTCs Response - ReportType= %s"%(hex(reportType)), end= "")
       if ((reportType == 0x02) or (reportType == 0x0A) or (reportType == 0x0B) or (reportType == 0x0C) or (reportType == 0x0D) or (reportType == 0x0E) or (reportType == 0x15)):
           dtcStatusInfo = message.GetDtcStatusInfo(_dtcFormat)
           print(", DTCStatusAvailabilityMask= %s, Count= %s"%(hex(dtcStatusInfo.StatusAvailabilityMask), len(dtcStatusInfo.Dtcs)))
           for dtcInfo in dtcStatusInfo.Dtcs:
               print("\t[DTC= %s (%s), Status= %s]"%(dtcInfo, hex(dtcInfo.Code), hex(dtcInfo.Status)))
       elif ((reportType == 0x01) or (reportType == 0x07)):
           dtcCountInfo = message.GetDtcCountInfo()
           _dtcFormat = dtcCountInfo.FormatIdentifier
           print(", DTCStatusAvailabilityMask= %s, Format= %s, Count= %s"%(hex(dtcCountInfo.StatusAvailabilityMask), _dtcFormat.name, dtcCountInfo.Count))
       else:
           data = message.ToRaw()
           print("[Raw Data]= %s"%(ToHexString(data.bytes())))

   # Start application
   runtime = app.VehicleSpy.PrepareForStart()
   runtime.AddComponent(services)
   runtime.AddComponent(udsInterface)
   runtime.AddComponent(session)
   if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
       runtime.AddComponent(network)
   runtime.AddComponent(transportLayer)
   app.VehicleSpy.Start()

   if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
       # Manual ARP entry
       if (TEST_MANUAL_ARP_ENTRY_ADD):
           interface.AddARPEntry(TEST_MANUAL_ECU_IP, TEST_ECU_MAC_ADDRESS)

       # Example of manually adding a DoIP entity
       if (TEST_MANUAL_ENTITY_ADD):
           targetIpAddress = vspyx.Core.IPAddress(TEST_MANUAL_ECU_IP)
           targetAddress = TEST_MANUAL_ECU_LA
           print("Manually identifying entity: %s at %s"%(hex(targetAddress), targetIpAddress))
           transportLayer.AddDoIPEntity(TEST_MANUAL_ECU_IP, TEST_MANUAL_ECU_LA)
           isReadyForDiagnostics = True
       else:
           # Otherwise, automatically detect entities
           if (TEST_AUTO_PROBE_WAIT):
               # Wait for announcement in response to client probe
               app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=3500))
               isReadyForDiagnostics = waitHandle.IsSet()
           else:
               isReadyForDiagnostics = False

           if (isReadyForDiagnostics):
               print("Entity Identified: Passive (or client auto probe)")
           else:
               # Manual, immediate discovery attempt
               entities = transportLayer.GetVehicleIdentification(None)
               isReadyForDiagnostics = (len(entities) > 0)
               if(isReadyForDiagnostics):
                   print("Entity Identified: Active (returned %s records)"%(len(entities)))
               else:
                   # If manual discovery didn't detect anything, wait for auto or passive discovery
                   isReadyForDiagnostics = app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(seconds=120), waitHandle)
                   if(isReadyForDiagnostics):
                       print("Entity Identified: Passive (ECU power-on)")

       # Example of vehicle identification query by MAC (EID)
       try:
           if (TEST_VEHICLE_QUERY_EID is not None):
               entities = transportLayer.GetVehicleIdentificationByEID(TEST_VEHICLE_QUERY_EID, None)
               print("Vehicle identification query by EID: returned %s records %s"%(len(entities), ToHexString(TEST_VEHICLE_QUERY_EID.bytes())))
       except NameError:
           # The query value is optional configuration; skip the example when it is not defined
           pass

       # Example of vehicle identification query by VIN
       try:
           if (TEST_VEHICLE_QUERY_VIN is not None):
               entities = transportLayer.GetVehicleIdentificationByVIN(TEST_VEHICLE_QUERY_VIN, None)
               print("Vehicle identification query by VIN: returned %s records %s"%(len(entities), TEST_VEHICLE_QUERY_VIN.bytes().decode("ascii")))
       except NameError:
           # The query value is optional configuration; skip the example when it is not defined
           pass

       # Example of confirming the sync status
       # target.Info is only populated by a vehicle announcement; it is still None when the
       # entity was added manually, so it must be checked before reading SyncStatus
       if (TEST_CONFIRM_SYNC_STATUS and isReadyForDiagnostics and (target.Info is not None) and (target.Info.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete)):
           app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(seconds=2))
           if (target.Info.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
               transportLayer.GetVehicleIdentification(target.IpAddress)
               if (target.Info.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
                   print("VIN/GID Sync not complete within discovery time period")
                   isReadyForDiagnostics = False

       # Example of checking the ECU's power mode prior to diagnostics
       if (TEST_CONFIRM_POWER_STATUS and isReadyForDiagnostics):
           powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.NotReady
           # Poll once per second for as long as the ECU reports NotReady
           while (powerMode == vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.NotReady):
               powerMode = transportLayer.GetPowerMode(targetIpAddress)
               if (powerMode is None):
                   print("Unable to determine power mode")
                   isReadyForDiagnostics = False
               if (powerMode == vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.NotReady):
                   app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(seconds=1))

       # Example of querying the entity status prior to diagnostics (i.e. before opening a TCP connection)
       if (TEST_CONFIRM_ENTITY_STATUS and isReadyForDiagnostics):
           entityStatus = transportLayer.GetEntityStatusInfo(targetIpAddress)
           if (entityStatus is not None):
               print("NodeType= %s, TCP_DATA active= %i, TCP_DATA max=%i, DataSizeMax= %s"%(entityStatus.NodeType.name, entityStatus.TcpSocketsCount, entityStatus.TcpSocketsMax, entityStatus.DataSizeMax))
               if (entityStatus.TcpSocketsCount == entityStatus.TcpSocketsMax):
                   print("Warning: TCP_DATA max limit reached, route activation request may result in response code HostUnavailable")
           else:
               print("Unable to determine entity status")
               isReadyForDiagnostics = False

       taType = ECU_ADDRESS_TYPE
       n_ae = ECU_AE
   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
       isReadyForDiagnostics = True
       targetAddress = ECU_ADDRESS
       taType = int(ECU_ADDRESS_TYPE)
       n_ae = ECU_AE

   if (isReadyForDiagnostics):
       mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (n_ae is None) else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics
       target_ai = (mType, TESTER_ADDRESS, targetAddress, taType, n_ae)

       # Set the default target address info
       services.TargetAddress = target_ai

       # Example: SessionControl
       result = services.SessionControl(1, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("SessionControl Response - Subfunction=", hex(message.Subfunction), "P2ServerMax=", message.P2ServerMax, "P2StarServerMax=", message.P2StarServerMax)

       if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
           # Example of querying the entity status during diagnostics (i.e. after opening a TCP connection)
           if (TEST_CONFIRM_ENTITY_STATUS):
               entityStatus = transportLayer.GetEntityStatusInfo(targetIpAddress)
               if (entityStatus is not None):
                   print("TCP_DATA active=", entityStatus.TcpSocketsCount, "TCP_DATA max=", entityStatus.TcpSocketsMax)
               else:
                   print("Error querying entity status")

       # Example: TesterPresent
       result = services.TesterPresent(target_ai)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("TesterPresent Response - Subfunction=", hex(message.Subfunction))

       # Example: ReadDTCInfo(NumberOfDTCByStatusMask)
       result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x01, 0x85, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDTCReadResponse(message)

       # Example: ReadDTCInfo(DTCByStatusMask)
       result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x02, 0x84, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDTCReadResponse(message)

       # Example: ReadDTCInfo(SupportedDTCs)
       result = services.ReadDtcs(0x0A, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDTCReadResponse(message)

       # Example: ClearDTCs
       result = services.ClearDtcs(0x0000F0, 0xAF, None)
       if (CheckForNRC(result)):
           print("ClearDTCs Response")

       # Example: ClearDTCs
       result = services.ClearDtcs(0xFFFFFE, None, None)
       if (CheckForNRC(result)):
           print("ClearDTCs Response")

       # Example: ReadDTCInfo(DTCByStatusMask)
       result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x02, 0x84, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDTCReadResponse(message)

       # Example: ClearDTCs
       result = services.ClearDtcs(0xFFFFFF, None, None)
       if (CheckForNRC(result)):
           print("ClearDTCs Response")

       # Example: ReadDTCInfo(DTCByStatusMask)
       result = services.ReadDtcs_ByStatusMaskOrRecordNr(0x02, 0x84, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDTCReadResponse(message)

       # Example: Custom service
       result = services.GenericService(vspyx.Core.BytesView(bytes([0xBA, 0x01, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0xFE, 0xDC, 0xBA, 0x98, 0x76, 0x54, 0x32, 0x10])), None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("Custom Echo Response - Subfunction=", hex(message.Subfunction), "payload=", ToHexString(message.ToRaw().bytes()[2:]))

       # Example: TesterPresent (response suppressed)
       result = services.TesterPresent(target_ai, True)
       # Guard against a missing result the same way CheckForNRC does before reading Responses
       if (result is not None and len(result.Responses) == 0):
           print("TesterPresent [Response Suppresed]")
       elif (CheckForNRC(result)):
           message = result.Responses[0]
           print("Unexpected TesterPresent Response - Subfunction=", hex(message.Subfunction))

       # Example: Custom service
       result = services.GenericService(vspyx.Core.BytesView(bytes([0xBA, 0x01, 0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF, 0xFE, 0xDC, 0xBA, 0x98, 0x76, 0x54, 0x32, 0x10])), None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("Custom Echo Response - Subfunction=", hex(message.Subfunction), "payload=", ToHexString(message.ToRaw().bytes()[2:]))

       # Example: TesterPresent
       result = services.TesterPresent(target_ai)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("TesterPresent Response - Subfunction=", hex(message.Subfunction))

       # Example: Read Data by Id (multiple DIDs)
       result = services.ReadDataById([ 0xF101, 0xF107 ], None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDIDsResponse(message)

       # Example: Read Data by Id
       result = services.ReadDataById([ 0x2001 ], None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           HandleDIDsResponse(message)

       # Example: Custom service (invalid)
       result = services.GenericService(vspyx.Core.BytesView(bytes([0xBB, 0x01, 0x23, 0x45])), None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("Custom Service 0xBB - ", ToHexString(message.ToRaw().bytes()))

       # Example: Routine Control (Invalid RID)
       result = services.RoutineControl(1, 0x0000, None, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           statusData = message.StatusData
           print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))

       # Example: Routine Control (Start)
       result = services.RoutineControl(1, 0xF001, vspyx.Core.BytesView(bytes([0x05, 0xFF])), None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           statusData = message.StatusData
           print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))

       app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=5000))

       # Example: Routine Control (Invalid Sequence)
       result = services.RoutineControl(3, 0xF001, None, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           statusData = message.StatusData
           print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))

       # Example: Routine Control (Stop)
       result = services.RoutineControl(2, 0xF001, None, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           statusData = message.StatusData
           print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))

       # Example: Routine Control (Get Results)
       result = services.RoutineControl(3, 0xF001, None, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           statusData = message.StatusData
           print("RoutineControl Response - RID= %s, StatusData= %s"%(hex(message.RoutineId), ToHexString(statusData.bytes())))

       # Example: ECU Reset (hard reset)
       result = services.EcuReset(1, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("EcuReset Response - ResetType=", hex(message.ResetType))
           powerDownTime = message.PowerDownTime
           if ((message.ResetType != 4) and (powerDownTime > 0)):
               print("Invalid response! Unexpected PowerDownTime included ", powerDownTime)

       # Example: ECU Reset (rapid power shutdown)
       result = services.EcuReset(4, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("EcuReset Response - ResetType=", hex(message.ResetType), "PowerDownTime=", message.PowerDownTime, "seconds")

       app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=3500))

       # Example: ECU Reset (cancel rapid power shutdown)
       result = services.EcuReset(5, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("EcuReset Response - ResetType=", hex(message.ResetType))
           powerDownTime = message.PowerDownTime
           if ((message.ResetType != 4) and (powerDownTime > 0)):
               print("Invalid response! Unexpected PowerDownTime included ", powerDownTime)

       # Example: ECU Reset (rapid power shutdown)
       result = services.EcuReset(4, None)
       if (CheckForNRC(result)):
           message = result.Responses[0]
           print("EcuReset Response - ResetType=", hex(message.ResetType), "PowerDownTime=", message.PowerDownTime, "seconds")
   else:
       print("Unable to initiate diagnostics")

   runtime.ShutdownEnvironment()
   app.VehicleSpy.Stop()