.. _diagnostics-server:

Python UDS Server with DoIP
===========================

.. code:: python

   import time
   import datetime
   import enum

   import vspyx
   from intrepidcs.vspyx.rpc.TCPIP import Network_pb2

   # Constants
   TEST_INTERFACE_TYPE_DOIP = "DoIP"
   TEST_INTERFACE_TYPE_ISO15765_CAN = "ISO-15765+CAN"
   TEST_INTERFACE_TYPE_HSFZ = "HSFZ"

   # Test interface selection
   TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_DOIP
   #TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_ISO15765_CAN
   #TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_HSFZ

   TEST_SERVER_AUTO_SHUTDOWN_TIME = 30000 # ms

   if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
       ECU_ADDRESS = 0xDEAD
       ECU_AE = None
       ECU_GATEWAY_ADDRESS = 0x1000
       TESTER_ADDRESS_TYPE = 0

       # Ethernet interface config
       IF_AUTO_DETECT = True # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
       IF_NAME = "Bottom Intel I218-V"
       IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:02"
       IF_IP_ADDRESS = "192.168.7.2"
       IF_IP_MASK = "255.255.255.0"

       # DoIP Config
       BROADCAST_ADDRESS = "192.168.7.255"
       MAX_TCP_CONNECTIONS = 10
       _acceptableTesterAddress = 0x3584 # Expected Tester Address
       _powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.Ready
       ROUTING_TYPE = vspyx.Diagnostics.ISO13400_2.RoutingActivationTypes.Default # Select the routing type
       ROUTING_EXTRA_ISO_EXPECTED = None # Specify an expected value of the ISO reserved field in the routing activation request
       ROUTING_EXTRA_OEM_EXPECTED = None # Specify an expected value of the OEM reserved field in the routing activation request
       ROUTING_EXTRA_ISO_RESPONSE = None # Specify a 32-bit value to override the ISO reserved field in the routing activation response (None will use default)
       ROUTING_EXTRA_OEM_RESPONSE = None # Specify a 32-bit value to include the OEM reserved field in the routing activation response (None will use default)

       # List of all entities supported by this host
       _entityInfo = [
           vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(
               ECU_ADDRESS,
               vspyx.Core.BytesView(bytes("1Z0123456789ABCDE", "ascii")),
               vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD7])),
               vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])),
               vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired,
               vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete)
           #,vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS + 1, vspyx.Core.BytesView(bytes("1Z0123456789ABCD0", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired, vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Synchronized)
           #,vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS + 2, vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD9])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired)
       ]

   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
       class ISO15765_AddressTypes(enum.Enum):
           Normal = 1
           NormalFixed = 2
           Extended = 3

       #IF_DEVICE = "88432" # ValueCAN 3 - 88432
       IF_DEVICE = "CY7347" # neoVI FIRE 2 - CY7347
       # NOTE(review): IF_LIB is not defined anywhere in the visible file, so
       # selecting this interface raises NameError on the next line - define it
       # (the device/library name prefix) before enabling the CAN configuration.
       IF_NAME = "%s %s"%(IF_LIB, IF_DEVICE)
       IF_CHANNEL = "HSCAN Discovery Channel"
       NETWORK_ADDRESS_TYPE = ISO15765_AddressTypes.Normal
       TESTER_ADDRESS = 0x45 # Specify the client external tester address
       TESTER_CAN_ADDRESS = 0x18ff456
       TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN29Bit
       #TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD29Bit
       ECU_ADDRESS = 0x23 # Specify the ECU server address
       ECU_CAN_ADDRESS = 0x123
       ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN11Bit
       #ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD11Bit
       TESTER_DATA_LENGTH = 8 # Specify the client external tester DLC (in octets)
       TESTER_STMIN_MIN = 0
       ECU_AE = None # Specify the ECU server address (N_AE)
       ECU_STMIN = 0
       ECU_BS = 0

   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
       ECU_ADDRESS = 1
       ECU_AE = None
       ECU_GATEWAY_ADDRESS = 0x10
       TESTER_ADDRESS_TYPE = 0

       # Ethernet interface config
       IF_AUTO_DETECT = True # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
       IF_NAME = "Bottom Intel I218-V"
       IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:02"
       IF_IP_ADDRESS = "192.168.7.2"
       IF_IP_MASK = "255.255.255.0"

       # DoIP Config
       BROADCAST_ADDRESS = "192.168.7.255"
       MAX_TCP_CONNECTIONS = 10
       _acceptableTesterAddress = 0x84 # Expected Tester Address
       _powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.Ready
       ROUTING_TYPE = None # Not supported in HSFZ
       ROUTING_EXTRA_ISO_EXPECTED = None # Not supported in HSFZ
       ROUTING_EXTRA_OEM_EXPECTED = None # Not supported in HSFZ
       ROUTING_EXTRA_ISO_RESPONSE = None # Not supported in HSFZ
       ROUTING_EXTRA_OEM_RESPONSE = None # Not supported in HSFZ

       # List of all entities supported by this host
       _entityInfo = [
           vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(
               ECU_ADDRESS,
               vspyx.Core.BytesView(bytes("1Z0123456789ABCDE", "ascii")),
               vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD7])),
               vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])),
               vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired)
       ]

   # Test Application Config
   # Event set by ECUReset type 4 and cleared by type 5; the idle loop at the
   # bottom of the script waits on it.
   _exit = vspyx.Core.Event()

   # DIDs
   class Signal:
       """A data identifier (DID) together with the value served for it."""

       def __init__(self, id, value, size = None):
           """Store the DID and its value.

           When value is None an empty bytearray is stored and Size is taken
           from size (0 if size is also None); otherwise Size is len(value)
           and the size argument is ignored.
           """
           self.Id = id
           if (value is None):
               self.Value = bytearray()
               if (size is None):
                   self.Size = 0
               else:
                   self.Size = size
           else:
               self.Value = value
               self.Size = len(value)

   # List of arbitrary DID definitions - feel free to add your own Signal(DID [int], value [byte array], size [int, number of octets])
   # This list will be used by the example DID read service handler
   _signals = [
       Signal(0xF101, bytearray([0xDE, 0xAD])),
       Signal(0xF107, bytearray([0xFE, 0xED, 0xBA, 0x50])),
       Signal(0x2001, bytearray([0x01]))
   ]

   # DTCs
   class DtcRecord:
       """One stored DTC: code, severity mask, functional group and status byte."""

       def __init__(self, code, severityMask, functionalGroup, status = 0):
           self.Code = code
           self.SeverityMask = severityMask
           self.FunctionalGroup = functionalGroup
           self.Status = status

   _dtcFormat = vspyx.Diagnostics.ISO14229_Services.DTCFormatIdentifier.SAE_J2012_DA_00
   _dtcStatusAvailabilityMask = 0xFF
   _dtcs = [
       DtcRecord(0xef56e4, 0x05, 0x00, 0x81),
       DtcRecord(0xef553e, 0x16, 0xFE, 0x40),
       DtcRecord(0xef5595, 0x27, 0x33, 0x40),
       DtcRecord(0xef54ff, 0x38, 0x00, 0x0B),
       DtcRecord(0xef54f0, 0x49, 0x00, 0x10),
       DtcRecord(0xef56b2, 0x5A, 0x00, 0x0C),
       DtcRecord(0xef56b8, 0x6B, 0xFE, 0x40),
       DtcRecord(0xef56af, 0x7C, 0x33, 0x10),
       DtcRecord(0xef5673, 0x8D, 0x00, 0x01),
       DtcRecord(0xef4700, 0x9E, 0x00, 0x40),
       DtcRecord(0xef56d6, 0xAF, 0xFE, 0x40),
       DtcRecord(0xef5652, 0xB0, 0x33, 0x10),
       DtcRecord(0xef5643, 0xC1, 0x00, 0x0B),
       DtcRecord(0x7e00b5, 0xD2, 0xFE, 0x30),
       DtcRecord(0x7e03ad, 0xE3, 0x33, 0x8C),
       DtcRecord(0x029971, 0xF4, 0x00, 0x00),
       DtcRecord(0x029970, 0x02, 0x00, 0x0C),
       DtcRecord(0x029908, 0x1F, 0x33, 0x01),
       DtcRecord(0x029985, 0x2B, 0xFE, 0x30),
       DtcRecord(0x029980, 0x3C, 0x00, 0x0B),
   ]

   # Routines
   class Routine:
       """Base routine used by the RoutineControl handler.

       Every check accepts the request, while Start/Stop/GetResults answer
       with NRC SFNS; subclasses override what they actually implement.
       Start/Stop/GetResults return a three-element list that the handler
       unpacks as [nrc, info, status].
       """
       _isStarted = False
       _hasCompleted = False

       def __init__(self, rid):
           self.Id = rid

       def IsSupportedInActiveSession(self):
           return True

       def HasSecurityAccess(self):
           return True

       def IsSubfunctionSupported(self, function):
           # 1, 2 and 3 are dispatched to Start, Stop and GetResults by the handler
           return (function == 1) or (function == 2) or (function == 3)

       def IsOptionLengthValid(self, function, optionData):
           return True

       def AreConditionsCorrect_General(self):
           return True

       def IsOptionDataValid(self, function, optionData):
           return True

       def AreConditionsCorrect_Routine(self, function, optionData):
           return True

       def IsStarted(self):
           return self._isStarted

       def HasResults(self):
           return self._hasCompleted

       def Start(self, optionData):
           return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

       def Stop(self, optionData):
           return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

       def GetResults(self, optionData):
           return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

   class ExampleRoutine_1(Routine):
       """Example routine: Start needs 2 option octets, Stop/GetResults need none."""

       def IsOptionLengthValid(self, function, optionData):
           expectedLength = 0
           if (function == 1):
               expectedLength = 2
           return (len(optionData) == expectedLength)

       def IsOptionDataValid(self, function, optionData):
           return True

       def Start(self, optionData):
           # Simulated start-up delay before the routine reports as started
           app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=100))
           self._isStarted = True
           self._hasCompleted = False
           # The received option data is echoed back as the third element
           return [None, None, optionData]

       def Stop(self, optionData):
           self._isStarted = False
           self._hasCompleted = True
           return [None, None, None]

       def GetResults(self, optionData):
           return [None, None, vspyx.Core.BytesView(bytes([0xDE, 0xAD, 0xBE, 0xEF]))]

   class ExampleRoutine_2(ExampleRoutine_1):
       pass

   _routines = [
       ExampleRoutine_1(0xF001)
       , ExampleRoutine_2(0xF004)
   ]

   # Application Setup
   app = vspyx.Core.Application.New()
   app.Initialize(loadAllModules=True)

   def AutoDetectDevice(name, interfaceDetectionAttempts, targetType, childMatch = None):
       """Resolve the device called name and return one of its discovery channels.

       The resolver lookup is retried up to interfaceDetectionAttempts times,
       one second apart, whenever it raises. If no source was obtained the
       script exits. Otherwise the source is added to the application and the
       first discovery child that is an instance of targetType (and, when
       childMatch is given, whose ID contains childMatch) is returned; None is
       returned when no child qualifies.
       """
       source = None
       while (interfaceDetectionAttempts > 0):
           interfaceDetectionAttempts -= 1
           try:
               source = app.Resolver[name]
               break
           except Exception:
               # Not a bare except, so Ctrl-C still interrupts the retry loop
               print("Waiting for interface %s..."%(name))
               time.sleep(1.0)
       if (source is None):
           print("Unable to resolve device %s"%(name))
           exit()
       channel = None
       source = app.VehicleSpy.AddSource(source.Description)
       for child in source.Discovery.Children:
           if (isinstance(child, targetType)):
               if (childMatch is None or ((childMatch) in child.ID)):
                   channel = child
                   break
       return channel

   def ToHexString(octets, delimiter = ":"):
       """Format octets as two-digit upper-case hex values joined by delimiter."""
       # join() places the delimiter only between items; stripping it afterwards
       # could also remove payload characters that happen to match the delimiter.
       return delimiter.join("{:02X}".format(octet) for octet in octets)

   def HexOrNone(number, width= 2):
       """Return number as zero-padded upper-case hex of width digits, or None."""
       return ("{:0" + str(width) + "X}").format(number) if (number is not None) else None

   if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
       if (IF_AUTO_DETECT):
           channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.EthernetChannel)
       else:
           source = app.VehicleSpy.AddSource("pcap " + IF_SOFT_MAC_ADDRESS)
           source.SearchFor()
           print(IF_NAME + " :", source.SourceState)
           if (source.SourceState != source.SourceState.Ready):
               print("Source is not ready!")
               exit()
           channel = app.Resolver[IF_NAME + " Discovery Channel"]

       if (channel is None):
           print("Unable to initialize device: %s"%(IF_NAME))
           exit()

       interfaceConfig = Network_pb2.Interface()
       interfaceConfig.MACAddress = IF_SOFT_MAC_ADDRESS
       interfaceConfig.IPv4.Address = IF_IP_ADDRESS
       interfaceConfig.IPv4.Netmask = IF_IP_MASK

       interface = vspyx.TCPIP.Interface.New(interfaceConfig)
       interface.Initialize(app, "Interface")
       interface.Attach(channel)

       network = vspyx.TCPIP.Network.New()
       network.Initialize(app, "Network")
       network.AddInterface(interface)

       # Setup DoIP Transport layer
       def _OnDoIPNack(source, nackCode):
           print("Received nack code: ", nackCode)

       # Number of entity-info requests answered so far
       _entityAnnounceCount = 0

       def _OnGetEntityInfo(source, eid, vin):
           """Return the hosted entities matching eid / vin (all when both are None).

           From the third request on, entities still marked Incomplete are
           switched to Synchronized before being reported.
           """
           global _entityAnnounceCount
           entities = []
           for entity in _entityInfo:
               if (_entityAnnounceCount >= 2):
                   if(entity.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
                       entity.SyncStatus = vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Synchronized
               if (((eid is None) and (vin is None)) or ((eid is not None) and (entity.EID == eid)) or ((vin is not None) and (entity.VIN == vin))):
                   entities.append(entity)
           _entityAnnounceCount += 1
           return entities

       # (IP address, source address) of every tester with an active route
       _clientInfo = []

       def _OnValidateRoutingActivationRequest(args):
           """Build the route status for a routing activation request.

           Returns None when the tester address is not the accepted one;
           otherwise records the client and returns an EntityRouteStatus
           listing every hosted entity address.
           """
           global _clientInfo
           routeInfo = None
           if (args.Address == _acceptableTesterAddress):
               routeInfo = vspyx.Diagnostics.ISO13400_2.EntityRouteStatus()
               # A single entity answers with its own address, several answer as gateway
               if (len(_entityInfo) == 1):
                   routeInfo.Address = _entityInfo[0].Address
               else:
                   routeInfo.Address = ECU_GATEWAY_ADDRESS
               routeInfo.HostedAddresses = [entity.Address for entity in _entityInfo]

               if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
                   routeInfo.IsActivationTypeValid = True
                   activationType = None
               else:
                   routeInfo.IsActivationTypeValid = ((args.ActivationType is not None) and (args.ActivationType == ROUTING_TYPE))
                   activationType = args.ActivationType.name if (args.ActivationType is not None) else None

               if ((ROUTING_EXTRA_ISO_EXPECTED is not None) and (ROUTING_EXTRA_ISO_EXPECTED != args.ISOReservedData_Req)):
                   print("Route activation: Expected ISO reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_ISO_EXPECTED, 8), HexOrNone(args.ISOReservedData_Req, 8)))
               if (ROUTING_EXTRA_OEM_EXPECTED != args.OEMReservedData_Req):
                   print("Route activation: Expected OEM reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_OEM_EXPECTED, 8), HexOrNone(args.OEMReservedData_Req, 8)))
               args.ISOReservedData_Rsp = ROUTING_EXTRA_ISO_RESPONSE
               args.OEMReservedData_Rsp = ROUTING_EXTRA_OEM_RESPONSE

               _clientInfo.append((args.IPAddress, args.Address))
               print("Route activation: Added route; type %s for %s with SA %s (total routes: %i) Req[ISO: %s; OEM= %s], Rsp[ISO: %s; OEM= %s]"%(activationType, args.IPAddress, hex(args.Address), len(_clientInfo), HexOrNone(args.ISOReservedData_Req, 8), HexOrNone(args.OEMReservedData_Req, 8), HexOrNone(args.ISOReservedData_Rsp, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))
           return routeInfo

       def _OnRouteClose(args):
           """Forget the client whose route was closed."""
           global _clientInfo
           route = (args.IPAddress, args.Address)
           # list.remove() raises ValueError for a route that was never recorded
           # (e.g. a rejected tester), so only remove entries that exist.
           if (route in _clientInfo):
               _clientInfo.remove(route)
           print("Removed route for %s with SA %s (total routes: %i)"%(args.IPAddress, hex(args.Address), len(_clientInfo)))
           return None

       def _OnDiagnosticPowerModeRequest(source):
           return _powerMode

       if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
           transportLayer = vspyx.Diagnostics.ISO13400_2.New(True)
           transportLayer.Initialize(app, "DoIP")
       elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
           transportLayer = vspyx.Diagnostics.ISO13400_2.New(True, vspyx.Diagnostics.ISO13400_2.ProtocolVersions.Legacy_HSFZ)
           transportLayer.Initialize(app, "HSFZ")

       transportLayer.Attach(network)
       transportLayer.OnDoIPNack = _OnDoIPNack
       transportLayer.OnGetEntityInfo = _OnGetEntityInfo
       transportLayer.OnValidateRoutingActivationRequest = _OnValidateRoutingActivationRequest
       transportLayer.OnRouteClose = _OnRouteClose
       transportLayer.OnDiagnosticPowerModeRequest = _OnDiagnosticPowerModeRequest
       transportLayer.SetBroadcastAddress(BROADCAST_ADDRESS)
       if (len(_entityInfo) > 1):
           transportLayer.SetNodeType(vspyx.Diagnostics.ISO13400_2.EntityNodeTypes.Gateway)
       transportLayer.SetMaxDataChannelCount(MAX_TCP_CONNECTIONS)

   elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
       channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.CANChannel, IF_CHANNEL)
       if (channel is None):
           print("Unable to initialize device: %s"%(IF_NAME))
           exit()
       print("Using channel: %s"%(channel.ID))

       interface = channel.NewISO11898_1Interface()

       transportLayer = vspyx.Communication.ISO15765_2.New(TESTER_DATA_LENGTH, 0)
       transportLayer.Initialize(app, "ISO 15765-2")
       transportLayer.Attach(interface)

       # Note: Extended + RemoteDiagnostics is not possible
       mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (ECU_AE is None) else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics
       if (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Normal):
           transportLayer.AddRxNormalAddress(mType, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, ECU_STMIN, ECU_BS)
           transportLayer.AddTxNormalAddress(mType, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, TESTER_STMIN_MIN)
       elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.NormalFixed):
           transportLayer.AddRxFixedAddress(mType, TESTER_ADDRESS, ECU_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, False, ECU_STMIN, ECU_BS)
           transportLayer.AddTxFixedAddress(mType, ECU_ADDRESS, TESTER_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, False, TESTER_STMIN_MIN)
       elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Extended):
           transportLayer.AddRxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, None, ECU_STMIN, ECU_BS)
           transportLayer.AddTxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, None, TESTER_STMIN_MIN)

   # Setup UDS Session layer
   sessionParameters = vspyx.Diagnostics.ISO14229_2.Parameters()
   # sessionParameters.DeltaP2 = datetime.timedelta(milliseconds= 200)
   # sessionParameters.DeltaP6 = datetime.timedelta(milliseconds= 200)
   # sessionParameters.P2_server_max = datetime.timedelta(milliseconds= 250)
   # sessionParameters.P2star_server_max = datetime.timedelta(milliseconds= 5000)
   # sessionParameters.P2_client_max = datetime.timedelta(milliseconds= 250)
   # sessionParameters.P6_client_max = datetime.timedelta(milliseconds= 250)
   # sessionParameters.P2star_client_max = datetime.timedelta(milliseconds= 5000)
   # sessionParameters.P6star_client_max = datetime.timedelta(milliseconds= 5000)

   session = vspyx.Diagnostics.ISO14229_2.NewServer(sessionParameters)
   session.Initialize(app, "Session")
   session.Attach(transportLayer)

   # Setup UDS Application layer
   def _GetHostAddressInfo():
       return ECU_ADDRESS, int(TESTER_ADDRESS_TYPE), ECU_AE

   udsInterface = vspyx.Diagnostics.ISO14229_1ServerApplicationLayerProtocol.New()
   udsInterface.Initialize(app, "UDS Host")
   udsInterface.Attach(session)
   udsInterface.GetHostAddressInfo = _GetHostAddressInfo

   # Setup UDS Application Services layer
   def _OnServiceStartRequest(message):
       return services.HandleService(message)

   services = vspyx.Diagnostics.ISO14229_ServiceServer.New()
   services.Initialize(app, "Service Host")
   services.Attach(udsInterface)
   services.OnRequest = _OnServiceStartRequest

   # Configure use of predefined services
   SESSION_ID_DEFAULT = 1
   SESSION_ID_PROGRAMMING = 2
   SESSION_ID_EXTENDED = 3

   def ChangeSession(sessionId):
       """Log the session change and confirm it with the session layer."""
       # TODO: Handle internal session change here
       print("SessionControl: [current] %s -> [new] %s"%(udsInterface.SessionId, sessionId))

       # The below function confirms with the session layer that the current session has been updated
       # ConfirmServerSessionChange(sessionId, isDefaultSession, p2server_max, p2starServer_max)
       session.ConfirmServerSessionChange(sessionId, (sessionId == SESSION_ID_DEFAULT), sessionParameters.P2_server_max, sessionParameters.P2star_server_max)

   sessionControl_areConditionsCorrect = True

   def _doSessionControl(message):
       """Switch to the requested session, or answer CNC when conditions are not met."""
       if (sessionControl_areConditionsCorrect):
           ChangeSession(message.SessionType)
           # Timing values are passed to the response as whole milliseconds
           response = vspyx.Diagnostics.ISO14229_Services.SessionControlResponse(message.SessionType, int(sessionParameters.P2_server_max.total_seconds() * 1000), int(sessionParameters.P2star_server_max.total_seconds() * 1000))
       else:
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.CNC)
       return response

   def _doTesterPresent(message):
       return vspyx.Diagnostics.ISO14229_Services.TesterPresentResponse()

   def _doReadDataById(message):
       """Answer with every requested DID found in _signals; ROOR when none is known."""
       dids = message.Ids
       response = None
       for did in dids:
           for signal in _signals:
               if (did == signal.Id):
                   # The response is created lazily on the first known DID
                   if (response is None):
                       response = vspyx.Diagnostics.ISO14229_Services.ReadDataByIdResponse(bytearray())
                   response.WriteId(did)
                   response.WriteData(signal.Value)
                   break
       if (response is None):
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
       return response

   ecuReset_areConditionsCorrect = True
   ecuReset_delay = 10 #ms
   ecuReset_PowerDownTime = None

   def _doEcuReset(message):
       """Handle ECUReset.

       Reset type 4 is accepted only while _exit is not set; it sets _exit and
       a power-down time of 6. Reset type 5 is accepted only while _exit is
       set and clears it again. Rejected requests answer CNC.
       """
       global ecuReset_PowerDownTime
       ecuReset_PowerDownTime = None
       areConditionsCorrect = ecuReset_areConditionsCorrect
       if (message.ResetType == 4):
           areConditionsCorrect = areConditionsCorrect and not _exit.IsSet()
           if (areConditionsCorrect):
               ecuReset_PowerDownTime = 6
               _exit.Set()
       elif (message.ResetType == 5):
           areConditionsCorrect = areConditionsCorrect and _exit.IsSet()
           if (areConditionsCorrect):
               _exit.Reset()

       if (areConditionsCorrect):
           app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=ecuReset_delay))
           response = vspyx.Diagnostics.ISO14229_Services.EcuResetResponse(message.ResetType, ecuReset_PowerDownTime)
       else:
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.CNC)
       return response

   def _doRoutineControl(message):
       """Validate a RoutineControl request and run the matching routine.

       The checks run in a fixed order and the first failing one selects the
       negative response code; only a request passing all of them reaches
       Start / Stop / GetResults.
       """
       rid = message.RoutineId
       subfunction = message.Subfunction
       optionData = message.OptionData

       routineMatch = None
       for routine in _routines:
           if (routine.Id == rid):
               routineMatch = routine
               break

       nrc = None
       info = None
       status = None
       if (routineMatch is None):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.ROOR
       elif (not routineMatch.IsSupportedInActiveSession()):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.ROOR
       elif (not routineMatch.HasSecurityAccess()):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.SAD
       elif (not routineMatch.IsSubfunctionSupported(subfunction)):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.SFNS
       elif (not routineMatch.IsOptionLengthValid(subfunction, optionData)):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.IMLOIF
       elif (not routineMatch.AreConditionsCorrect_General()):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.CNC
       elif (not routineMatch.IsOptionDataValid(subfunction, optionData)):
           # This step previously repeated the length check, so the routine's
           # option-data validation was never consulted.
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.ROOR
       elif (not routineMatch.AreConditionsCorrect_Routine(subfunction, optionData)):
           nrc = vspyx.Diagnostics.ISO14229_1.Nrc.CNC
       else:
           optionDataString = None
           if ((optionData is not None) and (len(optionData) > 0)):
               optionDataString = ToHexString(optionData.bytes())
           print("Routine Control: rid= %s subfunction= %s, options= %s"%(hex(rid), subfunction, optionDataString))
           # Each sub-function is only valid in the matching routine state
           if ((subfunction == 1) and (not routineMatch.IsStarted())):
               [nrc, info, status] = routineMatch.Start(optionData)
           elif ((subfunction == 2) and (routineMatch.IsStarted())):
               [nrc, info, status] = routineMatch.Stop(optionData)
           elif ((subfunction == 3) and (routineMatch.HasResults())):
               [nrc, info, status] = routineMatch.GetResults(optionData)
           else:
               nrc = vspyx.Diagnostics.ISO14229_1.Nrc.RSE

       if (nrc is None):
           response = vspyx.Diagnostics.ISO14229_Services.RoutineControlResponse(subfunction, rid, info, status)
       else:
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, nrc)
       return response

   def _doSecurityAccess(message):
       """Answer odd access types with an all-zero seed and even ones with no seed."""
       accessType = message.SecurityAccessType
       isRequestSeed = ((accessType % 2) == 1)
       if isRequestSeed:
           seed = vspyx.Core.BytesView(bytes([
               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
               0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))
       else:
           seed = None
       return vspyx.Diagnostics.ISO14229_Services.SecurityAccessResponse(accessType, seed)

   def _doReadDTCs(message):
       """Handle ReadDTCInformation reports 0x01, 0x02, 0x07 and 0x0A; others get SFNS."""
       global _dtcFormat
       reportType = message.Subfunction
       isPositiveResponseSuppressed = message.IsPositiveResponseSuppressedSpecified
       if (reportType == 0x01):
           # Count of DTCs whose status matches the mask
           statusMask = message.StatusMask
           dtcCount = 0
           print("ReadDtcs: report= %s, statusMask= %s"%(reportType, hex(statusMask)))
           for dtc in _dtcs:
               if ((dtc.Status & statusMask) != 0):
                   dtcCount += 1
           response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), _dtcFormat, dtcCount, isPositiveResponseSuppressed)
       elif (reportType == 0x02):
           # List of DTCs whose status matches the mask
           statusMask = message.StatusMask
           response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), isPositiveResponseSuppressed)
           print("ReadDtcs: report= %s, statusMask= %s"%(reportType, hex(statusMask)))
           for dtc in _dtcs:
               if ((dtc.Status & statusMask) != 0):
                   response.AddDtcRecord(True, dtc.Code, dtc.Status, None, None)
       elif (reportType == 0x07):
           # Count of DTCs matching both the status and the severity mask
           statusMask = message.StatusMask
           severityMask = message.SeverityMask
           dtcCount = 0
           for dtc in _dtcs:
               if (((dtc.Status & statusMask) != 0) and ((dtc.SeverityMask & severityMask) != 0)):
                   dtcCount += 1
           response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), _dtcFormat, dtcCount, isPositiveResponseSuppressed)
       elif (reportType == 0x0A):
           # Every stored DTC, unfiltered
           response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, _dtcStatusAvailabilityMask, isPositiveResponseSuppressed)
           for dtc in _dtcs:
               # NOTE(review): report 0x02 passes dtc.Status in this position while
               # this one passes dtc.SeverityMask - confirm which value is intended.
               response.AddDtcRecord(True, dtc.Code, dtc.SeverityMask, None, None)
       else:
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.SFNS)
       return response

   def _doClearDTCs(message):
       """Clear the status of the DTCs selected by the request's group info.

       Values above 0xFFFF00 select by functional group (low byte; 0xFF means
       all groups), other values above 0x0000FF select the single DTC with
       that code, and anything else is rejected with ROOR.
       """
       groupInfo = message.GroupInfo
       memorySelection = message.MemorySelection
       clearedDtcCount = 0
       # hex() already adds the "0x" prefix
       print("ClearDTCs: groupInfo= %s"%(hex(groupInfo)), end= "")
       if (memorySelection is not None):
           print(", memorySelection= %s"%(hex(memorySelection)), end= "")
       if (groupInfo > 0x0000FF):
           for dtc in _dtcs:
               clearIt = False
               if (groupInfo > 0xFFFF00):
                   group = (groupInfo & 0xFF)
                   if ((group == 0xFF) or (dtc.FunctionalGroup == group)):
                       clearIt = True
               elif (groupInfo == dtc.Code):
                   clearIt = True
               if (clearIt):
                   dtc.Status = 0x00
                   clearedDtcCount += 1
           print(" -> cleared %s DTCs"%(clearedDtcCount))
           response = vspyx.Diagnostics.ISO14229_Services.ClearDtcsResponse()
       else:
           print(" -> invalid group: %s"%(hex(groupInfo)))
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
       return response

   # ServiceConfig.ConfigureService(name, supported_sessions, p4server_max, supported_subfunctions= None, security_requirements= None):
   service = services.ServiceConfig.ConfigureService("SessionControl", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10), [ 1, 2, 3 ], None)
   service.DoService = _doSessionControl
   service = services.ServiceConfig.ConfigureService("TesterPresent", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10), [ 0 ], None)
   service.DoService = _doTesterPresent
   service = services.ServiceConfig.ConfigureService("ReadDTCs", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 2, 7, 10 ], None)
   service.DoService = _doReadDTCs
   service = services.ServiceConfig.ConfigureService("ReadDataById", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), None, None)
   service.DoService = _doReadDataById
   service = services.ServiceConfig.ConfigureService("ECUReset", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 3, 4, 5 ], (1 | 2))
   service.DoService = _doEcuReset
   service = services.ServiceConfig.ConfigureService("RoutineControl", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 2, 3 ], None)
   service.DoService = _doRoutineControl
   service = services.ServiceConfig.ConfigureService("SecurityAccess", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10000), [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], None)
   service.DoService = _doSecurityAccess
   service = services.ServiceConfig.ConfigureService("ClearDTCs", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), None, None)
   service.DoService = _doClearDTCs

   # Add custom services
   customEcho_delay = 600 #ms

   def _doCustomEcho(message):
       """Echo the request payload (everything after the first two octets) back.

       Requests carrying no more than two octets are rejected with ROOR.
       """
       (mType, sa, ta, taType, ae, data) = message.PDU
       if (len(data) > 2):
           payload = bytearray(data.bytes()[2:])
           app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=customEcho_delay))
           response = vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(message.SID, True, message.Subfunction, message.IsPositiveResponseSuppressedSpecified, payload, 0, len(payload))
       else:
           response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
       return response

   # ServiceConfig.AddService(sid, name, supported_sessions, datetime.timedelta(milliseconds= p4server_max), supported_subfunctions, security_requirements)
   def _CreateMessageWithSubfunction(pdu, message):
       return vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(pdu, message)

   service = services.ServiceConfig.AddService(0xBA, "Echo", [ 1 ], datetime.timedelta(milliseconds= 5000), [ 1 ], 0xFFFFFFFF)
   service.RequestDecoder = _CreateMessageWithSubfunction
   service.ResponseDecoder = _CreateMessageWithSubfunction
   service.DoService = _doCustomEcho

   # Start application
   runtime = app.VehicleSpy.PrepareForStart()
   runtime.AddComponent(services)
   runtime.AddComponent(udsInterface)
   runtime.AddComponent(session)
   if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
       runtime.AddComponent(network)
   runtime.AddComponent(transportLayer)
   app.VehicleSpy.Start()

   # Idle and let the server handle requests
   isShuttingDown = False
   while (not isShuttingDown):
       # Wakes up when the auto-shutdown time elapses or _exit is set
       app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=TEST_SERVER_AUTO_SHUTDOWN_TIME), _exit)
       isShuttingDown = True
       if(ecuReset_PowerDownTime is not None):
           checkTimeInMs = 5
           shutdownTime = (ecuReset_PowerDownTime * 1000)
           print("Server shutting down in %i seconds!"%(ecuReset_PowerDownTime))
           while (isShuttingDown and (shutdownTime > 0)):
               app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=checkTimeInMs))
               shutdownTime -= checkTimeInMs
               # ECUReset type 4 sets _exit and type 5 resets it, so the shutdown
               # stays pending only while the event is still set. (Assumes the
               # event remains set until Reset() is called - TODO confirm.)
               isShuttingDown = _exit.IsSet()
           if (not isShuttingDown):
               print("Shutdown cancelled! (%i ms remaining)"%(shutdownTime))

   print("До свидания!")
   runtime.ShutdownEnvironment()
   app.VehicleSpy.Stop()