import time, datetime, enum
import vspyx
from intrepidcs.vspyx.rpc.TCPIP import Network_pb2
# Constants: names of the transport variants this example server can be run on
TEST_INTERFACE_TYPE_DOIP = "DoIP"
TEST_INTERFACE_TYPE_ISO15765_CAN = "ISO-15765+CAN"
TEST_INTERFACE_TYPE_HSFZ = "HSFZ"
# Test interface selection: leave exactly one assignment active; it decides which
# configuration branch and which transport setup further down are executed
TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_DOIP
#TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_ISO15765_CAN
#TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_HSFZ
# Timeout of the idle wait at the end of the script; when it elapses without a
# pending power-down request the server stops
TEST_SERVER_AUTO_SHUTDOWN_TIME = 30000 # ms
# Per-interface configuration: exactly one branch runs, chosen by TEST_INTERFACE_TYPE.
if TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP:
    # Diagnostic addressing
    ECU_ADDRESS = 0xDEAD
    ECU_AE = None
    ECU_GATEWAY_ADDRESS = 0x1000
    TESTER_ADDRESS_TYPE = 0

    # Ethernet interface config
    IF_AUTO_DETECT = True  # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
    IF_NAME = "Bottom Intel I218-V"
    IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:02"
    IF_IP_ADDRESS = "192.168.7.2"
    IF_IP_MASK = "255.255.255.0"

    # DoIP Config
    BROADCAST_ADDRESS = "192.168.7.255"
    MAX_TCP_CONNECTIONS = 10
    _acceptableTesterAddress = 0x3584  # Expected Tester Address
    _powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.Ready
    ROUTING_TYPE = vspyx.Diagnostics.ISO13400_2.RoutingActivationTypes.Default  # Select the routing type
    ROUTING_EXTRA_ISO_EXPECTED = None  # Specify an expected value of the ISO reserved field in the routing activation request
    ROUTING_EXTRA_OEM_EXPECTED = None  # Specify an expected value of the OEM reserved field in the routing activation request
    ROUTING_EXTRA_ISO_RESPONSE = None  # Specify a 32-bit value to override the ISO reserved field in the routing activation response (None will use default)
    ROUTING_EXTRA_OEM_RESPONSE = None  # Specify a 32-bit value to include the OEM reserved field in the routing activation response (None will use default)

    # List of all entities supported by this host
    _entityInfo = [
        vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(
            ECU_ADDRESS,
            vspyx.Core.BytesView(bytes("1Z0123456789ABCDE", "ascii")),
            vspyx.Core.BytesView(bytes([0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD7])),
            vspyx.Core.BytesView(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])),
            vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired,
            vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete
        )
        #,vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS + 1, vspyx.Core.BytesView(bytes("1Z0123456789ABCD0", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired, vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Synchronized)
        #,vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS + 2, vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD9])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired)
    ]
elif TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN:
    class ISO15765_AddressTypes(enum.Enum):
        """Addressing formats the CAN transport setup below can register."""
        Normal = 1
        NormalFixed = 2
        Extended = 3

    #IF_DEVICE = "88432" # ValueCAN 3 - 88432
    IF_DEVICE = "CY7347"  # neoVI FIRE 2 - CY7347
    # NOTE(review): IF_LIB is not assigned anywhere in the visible part of this
    # file, so this line raises NameError when the CAN interface is selected
    # unless it is defined elsewhere - confirm the intended value.
    IF_NAME = "%s %s" % (IF_LIB, IF_DEVICE)
    IF_CHANNEL = "HSCAN Discovery Channel"
    NETWORK_ADDRESS_TYPE = ISO15765_AddressTypes.Normal

    # External tester (client) side
    TESTER_ADDRESS = 0x45  # Specify the client external tester address
    TESTER_CAN_ADDRESS = 0x18ff456
    TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN29Bit
    #TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD29Bit

    # ECU (server) side
    ECU_ADDRESS = 0x23  # Specify the ECU server address
    ECU_CAN_ADDRESS = 0x123
    ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN11Bit
    #ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD11Bit

    TESTER_DATA_LENGTH = 8  # Specify the client external tester DLC (in octets)
    TESTER_STMIN_MIN = 0
    ECU_AE = None  # Specify the ECU server address (N_AE)
    ECU_STMIN = 0
    ECU_BS = 0
elif TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ:
    # Diagnostic addressing
    ECU_ADDRESS = 1
    ECU_AE = None
    ECU_GATEWAY_ADDRESS = 0x10
    TESTER_ADDRESS_TYPE = 0

    # Ethernet interface config
    IF_AUTO_DETECT = True  # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
    IF_NAME = "Bottom Intel I218-V"
    IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:02"
    IF_IP_ADDRESS = "192.168.7.2"
    IF_IP_MASK = "255.255.255.0"

    # DoIP Config
    BROADCAST_ADDRESS = "192.168.7.255"
    MAX_TCP_CONNECTIONS = 10
    _acceptableTesterAddress = 0x84  # Expected Tester Address
    _powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.Ready
    ROUTING_TYPE = None  # Not supported in HSFZ
    ROUTING_EXTRA_ISO_EXPECTED = None  # Not supported in HSFZ
    ROUTING_EXTRA_OEM_EXPECTED = None  # Not supported in HSFZ
    ROUTING_EXTRA_ISO_RESPONSE = None  # Not supported in HSFZ
    ROUTING_EXTRA_OEM_RESPONSE = None  # Not supported in HSFZ

    # List of all entities supported by this host
    _entityInfo = [
        vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(
            ECU_ADDRESS,
            vspyx.Core.BytesView(bytes("1Z0123456789ABCDE", "ascii")),
            vspyx.Core.BytesView(bytes([0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD7])),
            vspyx.Core.BytesView(bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])),
            vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired
        )
    ]
# Test Application Config
# Event shared between the ECUReset handler (which sets/resets it) and the idle
# loop at the end of the script (which waits on it)
_exit = vspyx.Core.Event()
# DIDs
class Signal:
    """A data identifier together with the payload served for it.

    Attributes set by the constructor:
        Id: the ``id`` argument, unchanged.
        Value: the ``value`` argument, or an empty ``bytearray`` when it is None.
        Size: ``len(value)`` when a value is given; otherwise ``size``, or 0
            when ``size`` is None as well.
    """

    def __init__(self, id, value, size = None):
        self.Id = id
        if value is not None:
            # A concrete payload always dictates the size; ``size`` is ignored.
            self.Value = value
            self.Size = len(value)
            return
        self.Value = bytearray()
        self.Size = 0 if size is None else size
# List of arbitrary DID definitions - feel free to add your own Signal(DID [int], value [byte array], size [int, number of octets])
# This list will be used by the example DID read service handler, which matches
# each requested id against Signal.Id and answers with Signal.Value
_signals = [ Signal(0xF101, bytearray([0xDE, 0xAD])), Signal(0xF107, bytearray([0xFE, 0xED, 0xBA, 0x50])), Signal(0x2001, bytearray([0x01])) ]
# DTCs
class DtcRecord:
    """One entry of the simulated DTC table.

    The constructor stores its arguments unchanged as ``Code``,
    ``SeverityMask``, ``FunctionalGroup`` and ``Status`` (default 0).
    ``Status`` is later overwritten with 0 by the ClearDTCs handler.
    """

    def __init__(self, code, severityMask, functionalGroup, status = 0):
        self.Code, self.SeverityMask = code, severityMask
        self.FunctionalGroup, self.Status = functionalGroup, status
# Format identifier passed to the DTC-count responses (report types 0x01 and 0x07)
_dtcFormat = vspyx.Diagnostics.ISO14229_Services.DTCFormatIdentifier.SAE_J2012_DA_00
# Mask ANDed with the requested status mask before it is echoed in a response
_dtcStatusAvailabilityMask = 0xFF
# Simulated DTC table; each entry is DtcRecord(code, severityMask, functionalGroup, status)
_dtcs = [
DtcRecord(0xef56e4, 0x05, 0x00, 0x81),
DtcRecord(0xef553e, 0x16, 0xFE, 0x40),
DtcRecord(0xef5595, 0x27, 0x33, 0x40),
DtcRecord(0xef54ff, 0x38, 0x00, 0x0B),
DtcRecord(0xef54f0, 0x49, 0x00, 0x10),
DtcRecord(0xef56b2, 0x5A, 0x00, 0x0C),
DtcRecord(0xef56b8, 0x6B, 0xFE, 0x40),
DtcRecord(0xef56af, 0x7C, 0x33, 0x10),
DtcRecord(0xef5673, 0x8D, 0x00, 0x01),
DtcRecord(0xef4700, 0x9E, 0x00, 0x40),
DtcRecord(0xef56d6, 0xAF, 0xFE, 0x40),
DtcRecord(0xef5652, 0xB0, 0x33, 0x10),
DtcRecord(0xef5643, 0xC1, 0x00, 0x0B),
DtcRecord(0x7e00b5, 0xD2, 0xFE, 0x30),
DtcRecord(0x7e03ad, 0xE3, 0x33, 0x8C),
DtcRecord(0x029971, 0xF4, 0x00, 0x00),
DtcRecord(0x029970, 0x02, 0x00, 0x0C),
DtcRecord(0x029908, 0x1F, 0x33, 0x01),
DtcRecord(0x029985, 0x2B, 0xFE, 0x30),
DtcRecord(0x029980, 0x3C, 0x00, 0x0B),
]
# Routines
class Routine:
    """Base class for routines served by the RoutineControl handler.

    Every check answers True, only sub-functions 1, 2 and 3 are reported as
    supported, and Start/Stop/GetResults answer with the SFNS response code.
    Subclasses override the parts they implement.
    """

    # Class-level defaults; instances shadow them once they assign their own state.
    _isStarted = False
    _hasCompleted = False

    # Sub-function values accepted by IsSubfunctionSupported.
    _SUPPORTED_SUBFUNCTIONS = (1, 2, 3)

    def __init__(self, rid):
        self.Id = rid

    # --- request checks; every one passes in the base class ---

    def IsSupportedInActiveSession(self):
        return True

    def HasSecurityAccess(self):
        return True

    def IsSubfunctionSupported(self, function):
        return function in self._SUPPORTED_SUBFUNCTIONS

    def IsOptionLengthValid(self, function, optionData):
        return True

    def AreConditionsCorrect_General(self):
        return True

    def IsOptionDataValid(self, function, optionData):
        return True

    def AreConditionsCorrect_Routine(self, function, optionData):
        return True

    # --- state queries ---

    def IsStarted(self):
        return self._isStarted

    def HasResults(self):
        return self._hasCompleted

    # --- actions; each returns a three-element list the handler unpacks as
    # [nrc, info, status] ---

    def Start(self, optionData):
        return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

    def Stop(self, optionData):
        return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

    def GetResults(self, optionData):
        return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]
class ExampleRoutine_1(Routine):
    """Example routine that can be started, stopped and queried for results.

    Start (sub-function 1) requires exactly two option octets; every other
    sub-function requires none.
    """

    def IsOptionLengthValid(self, function, optionData):
        """Return True when ``optionData`` has the length required for ``function``.

        Missing option data (None) is treated as zero octets, so a request
        without options is rejected by length instead of raising TypeError.
        """
        expectedLength = 2 if function == 1 else 0
        actualLength = 0 if optionData is None else len(optionData)
        return actualLength == expectedLength

    def IsOptionDataValid(self, function, optionData):
        return True

    def Start(self, optionData):
        """Wait 100 ms on the scheduler, mark the routine started and echo the options.

        Returns:
            ``[None, None, optionData]``.
        """
        app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=100))
        self._isStarted = True
        self._hasCompleted = False
        return [None, None, optionData]

    def Stop(self, optionData):
        """Mark the routine stopped and completed; returns ``[None, None, None]``."""
        self._isStarted = False
        self._hasCompleted = True
        return [None, None, None]

    def GetResults(self, optionData):
        """Return ``[None, None, <fixed four-octet result DE AD BE EF>]``."""
        return [None, None, vspyx.Core.BytesView(bytes([0xDE, 0xAD, 0xBE, 0xEF]))]
class ExampleRoutine_2(ExampleRoutine_1):
    # Adds nothing to ExampleRoutine_1; it exists so a second routine id can be
    # registered with its own class.
    pass


# Routines available to the RoutineControl handler, which selects one by Id.
_routines = [
    ExampleRoutine_1(0xF001),
    ExampleRoutine_2(0xF004),
]
# Application Setup
# Single application object used by everything below (resolver, scheduler,
# sources and component initialization)
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)
def AutoDetectDevice(name, interfaceDetectionAttempts, targetType, childMatch = None):
    """Resolve a device by name, add it as a source and pick one of its channels.

    Args:
        name: identifier looked up in ``app.Resolver``.
        interfaceDetectionAttempts: maximum number of lookups; each failed
            lookup is followed by a one second pause.
        targetType: class the returned discovery child must be an instance of.
        childMatch: optional value that must be contained in the child's
            ``ID``; None accepts the first child of the right type.

    Returns:
        The first matching child of the added source's discovery object, or
        None when no child matches.

    Raises:
        SystemExit: when ``name`` cannot be resolved within the allowed attempts.
    """
    source = None
    while interfaceDetectionAttempts > 0:
        interfaceDetectionAttempts -= 1
        try:
            # Look up the requested name, not the module-level IF_NAME.
            source = app.Resolver[name]
            break
        except Exception:
            # Narrower than a bare except, so Ctrl+C still interrupts the wait.
            print("Waiting for interface %s..."%(name))
            time.sleep(1.0)
    if source is None:
        print("Unable to resolve device %s"%(name))
        # Do not rely on the site-provided exit(); report failure in the exit status.
        raise SystemExit(1)
    source = app.VehicleSpy.AddSource(source.Description)
    for child in source.Discovery.Children:
        if not isinstance(child, targetType):
            continue
        if (childMatch is None) or (childMatch in child.ID):
            return child
    return None
def ToHexString(octets, delimiter = ":"):
    """Format ``octets`` as two-digit upper-case hex values separated by ``delimiter``.

    Args:
        octets: iterable of integers.
        delimiter: text placed between consecutive values (never at the ends).

    Returns:
        The joined string; an empty string for an empty input.
    """
    # Joining avoids trimming afterwards, which would also eat leading or
    # trailing hex digits whenever they occur in the delimiter.
    return delimiter.join("{:02X}".format(octet) for octet in octets)
def HexOrNone(number, width= 2):
    """Return ``number`` as zero-padded upper-case hex of at least ``width`` digits.

    None is passed through unchanged so optional values can be logged as-is.
    """
    if number is None:
        return None
    return "{:0{}X}".format(number, width)
# Ethernet-based transports (DoIP and HSFZ): obtain an Ethernet channel, then
# build a TCP/IP interface and network on top of it
if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
if (IF_AUTO_DETECT):
# Look the adapter up by IF_NAME, allowing up to 10 attempts
channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.EthernetChannel)
else:
# Manual path: add a pcap source identified by the configured MAC address
source = app.VehicleSpy.AddSource("pcap " + IF_SOFT_MAC_ADDRESS)
source.SearchFor()
print(IF_NAME + " :", source.SourceState)
if (source.SourceState != source.SourceState.Ready):
print("Source is not ready!")
exit()
channel = app.Resolver[IF_NAME + " Discovery Channel"]
if (channel is None):
print("Unable to initialize device: %s"%(IF_NAME))
exit()
# Interface configured with the MAC address, IPv4 address and netmask from the config section
interfaceConfig = Network_pb2.Interface()
interfaceConfig.MACAddress = IF_SOFT_MAC_ADDRESS
interfaceConfig.IPv4.Address = IF_IP_ADDRESS
interfaceConfig.IPv4.Netmask = IF_IP_MASK
interface = vspyx.TCPIP.Interface.New(interfaceConfig)
interface.Initialize(app, "Interface")
interface.Attach(channel)
# Network that owns the interface; the transport layer attaches to it further down
network = vspyx.TCPIP.Network.New()
network.Initialize(app, "Network")
network.AddInterface(interface)
# Setup DoIP Transport layer
def _OnDoIPNack(source, nackCode):
print("Received nack code: ", nackCode)
# Counter advanced by _OnGetEntityInfo; once it has reached 2, entities that are
# still marked Incomplete are switched to Synchronized
_entityAnnounceCount = 0
# Callback assigned to transportLayer.OnGetEntityInfo.
# Returns the entries of _entityInfo that match the request: every entry when
# both eid and vin are None, otherwise those whose EID equals eid or whose VIN
# equals vin. The source argument is not used.
def _OnGetEntityInfo(source, eid, vin):
global _entityAnnounceCount
entities = []
for entity in _entityInfo:
if (_entityAnnounceCount >= 2):
# Promote the sync status in place, so the change persists in _entityInfo
if(entity.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
entity.SyncStatus = vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Synchronized
if (((eid is None) and (vin is None)) or ((eid is not None) and (entity.EID == eid)) or ((vin is not None) and (entity.VIN == vin))):
entities.append(entity)
_entityAnnounceCount += 1
return entities
# Active routes, stored as (IPAddress, Address) tuples; appended on activation
# below and removed again by _OnRouteClose
_clientInfo = []
# Callback assigned to transportLayer.OnValidateRoutingActivationRequest.
# Returns an EntityRouteStatus when args.Address equals _acceptableTesterAddress;
# routeInfo stays None for any other tester address.
def _OnValidateRoutingActivationRequest(args):
global _clientInfo
routeInfo = None
if (args.Address == _acceptableTesterAddress):
routeInfo = vspyx.Diagnostics.ISO13400_2.EntityRouteStatus()
# A single hosted entity answers with its own address, several with the gateway address
if (len(_entityInfo) == 1):
routeInfo.Address = _entityInfo[0].Address
else:
routeInfo.Address = ECU_GATEWAY_ADDRESS
hostedAddresses = []
for entity in _entityInfo:
hostedAddresses.append(entity.Address)
routeInfo.HostedAddresses = hostedAddresses
if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
# HSFZ: the activation type is always accepted and not reported in the log line
routeInfo.IsActivationTypeValid = True
activationType = None
else:
routeInfo.IsActivationTypeValid = ((args.ActivationType is not None) and (args.ActivationType == ROUTING_TYPE))
activationType = args.ActivationType.name if (args.ActivationType is not None) else None
# Mismatches of the reserved fields are only logged; they do not reject the request
if ((ROUTING_EXTRA_ISO_EXPECTED is not None) and (ROUTING_EXTRA_ISO_EXPECTED != args.ISOReservedData_Req)):
print("Route activation: Expected ISO reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_ISO_EXPECTED, 8), HexOrNone(args.ISOReservedData_Req, 8)))
# NOTE(review): unlike the ISO check, this one also runs when the expected value is None,
# so any received OEM value is reported in that case - confirm this is intended
if (ROUTING_EXTRA_OEM_EXPECTED != args.OEMReservedData_Req):
print("Route activation: Expected OEM reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_OEM_EXPECTED, 8), HexOrNone(args.OEMReservedData_Req, 8)))
# Reserved-field values for the response come straight from the configuration
args.ISOReservedData_Rsp = ROUTING_EXTRA_ISO_RESPONSE
args.OEMReservedData_Rsp = ROUTING_EXTRA_OEM_RESPONSE
_clientInfo.append((args.IPAddress, args.Address))
print("Route activation: Added route; type %s for %s with SA %s (total routes: %i) Req[ISO: %s; OEM= %s], Rsp[ISO: %s; OEM= %s]"%(activationType, args.IPAddress, hex(args.Address), len(_clientInfo), HexOrNone(args.ISOReservedData_Req, 8), HexOrNone(args.OEMReservedData_Req, 8), HexOrNone(args.ISOReservedData_Rsp, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))
return routeInfo
def _OnRouteClose(args):
    """Forget the route identified by ``args.IPAddress`` and ``args.Address``.

    A close for a route that was never recorded is logged and otherwise
    ignored, so the callback does not raise ValueError out of ``list.remove``.

    Returns:
        Always None.
    """
    route = (args.IPAddress, args.Address)
    try:
        _clientInfo.remove(route)
    except ValueError:
        print("Route close ignored: no route for %s with SA %s (total routes: %i)"%(args.IPAddress, hex(args.Address), len(_clientInfo)))
        return None
    print("Removed route for %s with SA %s (total routes: %i)"%(args.IPAddress, hex(args.Address), len(_clientInfo)))
    return None
def _OnDiagnosticPowerModeRequest(source):
    """Answer a power-mode query with the configured ``_powerMode``; ``source`` is unused."""
    currentMode = _powerMode
    return currentMode
# Create the transport layer in the protocol flavour that was selected
if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
transportLayer = vspyx.Diagnostics.ISO13400_2.New(True)
transportLayer.Initialize(app, "DoIP")
elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
transportLayer = vspyx.Diagnostics.ISO13400_2.New(True, vspyx.Diagnostics.ISO13400_2.ProtocolVersions.Legacy_HSFZ)
transportLayer.Initialize(app, "HSFZ")
# Bind it to the TCP/IP network and register the callbacks defined above
transportLayer.Attach(network)
transportLayer.OnDoIPNack = _OnDoIPNack
transportLayer.OnGetEntityInfo = _OnGetEntityInfo
transportLayer.OnValidateRoutingActivationRequest = _OnValidateRoutingActivationRequest
transportLayer.OnRouteClose = _OnRouteClose
transportLayer.OnDiagnosticPowerModeRequest = _OnDiagnosticPowerModeRequest
transportLayer.SetBroadcastAddress(BROADCAST_ADDRESS)
# More than one hosted entity: announce this host as a gateway
if (len(_entityInfo) > 1):
transportLayer.SetNodeType(vspyx.Diagnostics.ISO13400_2.EntityNodeTypes.Gateway)
transportLayer.SetMaxDataChannelCount(MAX_TCP_CONNECTIONS)
# CAN transport: counterpart of the Ethernet branch that starts at the interface auto-detection above
elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
# Pick the CAN channel whose ID contains IF_CHANNEL
channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.CANChannel, IF_CHANNEL)
if (channel is None):
print("Unable to initialize device: %s"%(IF_NAME))
exit()
print("Using channel: %s"%(channel.ID))
interface = channel.NewISO11898_1Interface()
transportLayer = vspyx.Communication.ISO15765_2.New(TESTER_DATA_LENGTH, 0)
transportLayer.Initialize(app, "ISO 15765-2")
transportLayer.Attach(interface)
# Note: Extended + RemoteDiagnostics is not possible
# A configured N_AE (ECU_AE) selects RemoteDiagnostics, otherwise plain Diagnostics
mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (ECU_AE is None) else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics
# Register one receive (tester -> ECU) and one transmit (ECU -> tester) address
# pair in the configured addressing format
if (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Normal):
transportLayer.AddRxNormalAddress(mType, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, ECU_STMIN, ECU_BS)
transportLayer.AddTxNormalAddress(mType, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, TESTER_STMIN_MIN)
elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.NormalFixed):
transportLayer.AddRxFixedAddress(mType, TESTER_ADDRESS, ECU_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, False, ECU_STMIN, ECU_BS)
transportLayer.AddTxFixedAddress(mType, ECU_ADDRESS, TESTER_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, False, TESTER_STMIN_MIN)
elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Extended):
# Extended addressing always uses the Diagnostics message type and no N_AE (see note above)
transportLayer.AddRxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, None, ECU_STMIN, ECU_BS)
transportLayer.AddTxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, None, TESTER_STMIN_MIN)
# Setup UDS Session layer
# Timing parameters start from whatever Parameters() provides; the assignments
# below are disabled examples of overriding individual values
sessionParameters = vspyx.Diagnostics.ISO14229_2.Parameters()
# sessionParameters.DeltaP2 = datetime.timedelta(milliseconds= 200)
# sessionParameters.DeltaP6 = datetime.timedelta(milliseconds= 200)
# sessionParameters.P2_server_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P2star_server_max = datetime.timedelta(milliseconds= 5000)
# sessionParameters.P2_client_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P6_client_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P2star_client_max = datetime.timedelta(milliseconds= 5000)
# sessionParameters.P6star_client_max = datetime.timedelta(milliseconds= 5000)
# Server-side session object, stacked on the transport layer created above
session = vspyx.Diagnostics.ISO14229_2.NewServer(sessionParameters)
session.Initialize(app, "Session")
session.Attach(transportLayer)
# Setup UDS Application layer
def _GetHostAddressInfo():
    """Return ``(ECU_ADDRESS, int(TESTER_ADDRESS_TYPE), ECU_AE)`` for the UDS interface."""
    addressType = int(TESTER_ADDRESS_TYPE)
    return (ECU_ADDRESS, addressType, ECU_AE)
# Server application-layer protocol object, stacked on the session layer
udsInterface = vspyx.Diagnostics.ISO14229_1ServerApplicationLayerProtocol.New()
udsInterface.Initialize(app, "UDS Host")
udsInterface.Attach(session)
# Callback that supplies the host's address, address type and address extension
udsInterface.GetHostAddressInfo = _GetHostAddressInfo
# Setup UDS Application Services layer
def _OnServiceStartRequest(message):
    """Pass an incoming request to the service server and return its result."""
    handled = services.HandleService(message)
    return handled
# Service server, stacked on the UDS interface; every request is routed through
# _OnServiceStartRequest
services = vspyx.Diagnostics.ISO14229_ServiceServer.New()
services.Initialize(app, "Service Host")
services.Attach(udsInterface)
services.OnRequest = _OnServiceStartRequest
# Configure use of predefined services
# Session identifiers; only SESSION_ID_DEFAULT is referenced by the code in this
# file (in ChangeSession)
SESSION_ID_DEFAULT = 1
SESSION_ID_PROGRAMMING = 2
SESSION_ID_EXTENDED = 3
def ChangeSession(sessionId):
    """Log the session switch and confirm it with the session layer.

    Args:
        sessionId: identifier of the session being entered.
    """
    # TODO: Handle internal session change here
    print("SessionControl: [current] %s -> [new] %s"%(udsInterface.SessionId, sessionId))
    # Signature of the confirmation call:
    # ConfirmServerSessionChange(sessionId, isDefaultSession, p2server_max, p2starServer_max)
    enteringDefault = (sessionId == SESSION_ID_DEFAULT)
    session.ConfirmServerSessionChange(
        sessionId,
        enteringDefault,
        sessionParameters.P2_server_max,
        sessionParameters.P2star_server_max,
    )
# Switch that lets the example refuse every session change with the CNC response code.
sessionControl_areConditionsCorrect = True


def _doSessionControl(message):
    """Handle a SessionControl request.

    Returns:
        A SessionControlResponse carrying the server's P2 and P2* limits in
        milliseconds, or a negative response with CNC when session changes are
        disabled.
    """
    if not sessionControl_areConditionsCorrect:
        return vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.CNC)
    ChangeSession(message.SessionType)
    p2Ms = int(sessionParameters.P2_server_max.total_seconds() * 1000)
    p2StarMs = int(sessionParameters.P2star_server_max.total_seconds() * 1000)
    return vspyx.Diagnostics.ISO14229_Services.SessionControlResponse(message.SessionType, p2Ms, p2StarMs)
def _doTesterPresent(message):
    """Answer TesterPresent with a positive response; the request is not inspected."""
    responseTypes = vspyx.Diagnostics.ISO14229_Services
    return responseTypes.TesterPresentResponse()
def _doReadDataById(message):
    """Handle ReadDataByIdentifier for the ids listed in ``message.Ids``.

    Each requested id found in ``_signals`` (first match wins) contributes its
    id and value to one shared positive response; unknown ids are skipped.

    Returns:
        The positive response when at least one id matched, otherwise a
        negative response with ROOR.
    """
    response = None
    for did in message.Ids:
        matched = next((candidate for candidate in _signals if did == candidate.Id), None)
        if matched is None:
            continue
        if response is None:
            # Created lazily so an all-unknown request ends in the negative response.
            response = vspyx.Diagnostics.ISO14229_Services.ReadDataByIdResponse(bytearray())
        response.WriteId(did)
        response.WriteData(matched.Value)
    if response is not None:
        return response
    return vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
# Switch that lets the example refuse every reset with the CNC response code.
ecuReset_areConditionsCorrect = True
ecuReset_delay = 10 #ms
# Power-down time of the most recent accepted reset type 4; None otherwise.
# Read by the idle loop at the end of the script.
ecuReset_PowerDownTime = None


def _doEcuReset(message):
    """Handle an ECUReset request.

    Reset type 4 is accepted only while ``_exit`` is clear; it sets the
    power-down time to 6 and sets ``_exit``. Reset type 5 is accepted only
    while ``_exit`` is set; it resets ``_exit``. Other reset types depend only
    on ``ecuReset_areConditionsCorrect``.

    Returns:
        An EcuResetResponse (sent after waiting ``ecuReset_delay`` ms) or a
        negative response with CNC.
    """
    global ecuReset_PowerDownTime
    # Every request clears the previous power-down time first.
    ecuReset_PowerDownTime = None
    resetType = message.ResetType
    accepted = ecuReset_areConditionsCorrect
    if resetType == 4:
        accepted = accepted and not _exit.IsSet()
        if accepted:
            ecuReset_PowerDownTime = 6
            _exit.Set()
    elif resetType == 5:
        accepted = accepted and _exit.IsSet()
        if accepted:
            _exit.Reset()
    if not accepted:
        return vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.CNC)
    app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=ecuReset_delay))
    return vspyx.Diagnostics.ISO14229_Services.EcuResetResponse(resetType, ecuReset_PowerDownTime)
def _doRoutineControl(message):
    """Handle a RoutineControl request.

    The routine is looked up in ``_routines`` by ``message.RoutineId`` and the
    request is validated step by step; the first failing step determines the
    negative response code:

        unknown routine ................ ROOR
        not supported in session ....... ROOR
        no security access ............. SAD
        sub-function not supported ..... SFNS
        option length invalid .......... IMLOIF
        general conditions not met ..... CNC
        option data invalid ............ ROOR
        routine conditions not met ..... CNC
        action not allowed in the routine's current state ... RSE

    Returns:
        A RoutineControlResponse built from the routine's ``info`` and
        ``status`` results, or a NegativeResponse carrying the code above (or
        the code returned by the routine itself).
    """
    rid = message.RoutineId
    subfunction = message.Subfunction
    optionData = message.OptionData
    Nrc = vspyx.Diagnostics.ISO14229_1.Nrc

    routine = next((candidate for candidate in _routines if candidate.Id == rid), None)
    info = None
    status = None

    if routine is None:
        nrc = Nrc.ROOR
    elif not routine.IsSupportedInActiveSession():
        nrc = Nrc.ROOR
    elif not routine.HasSecurityAccess():
        nrc = Nrc.SAD
    elif not routine.IsSubfunctionSupported(subfunction):
        nrc = Nrc.SFNS
    elif not routine.IsOptionLengthValid(subfunction, optionData):
        nrc = Nrc.IMLOIF
    elif not routine.AreConditionsCorrect_General():
        nrc = Nrc.CNC
    elif not routine.IsOptionDataValid(subfunction, optionData):
        # This step checks the option data itself; repeating the length check
        # here would leave IsOptionDataValid unused and ROOR unreachable.
        nrc = Nrc.ROOR
    elif not routine.AreConditionsCorrect_Routine(subfunction, optionData):
        nrc = Nrc.CNC
    else:
        optionDataString = None
        if (optionData is not None) and (len(optionData) > 0):
            optionDataString = ToHexString(optionData.bytes())
        print("Routine Control: rid= %s subfunction= %s, options= %s"%(hex(rid), subfunction, optionDataString))
        # Start needs a stopped routine, stop a started one, results a completed one.
        if (subfunction == 1) and (not routine.IsStarted()):
            [nrc, info, status] = routine.Start(optionData)
        elif (subfunction == 2) and routine.IsStarted():
            [nrc, info, status] = routine.Stop(optionData)
        elif (subfunction == 3) and routine.HasResults():
            [nrc, info, status] = routine.GetResults(optionData)
        else:
            nrc = Nrc.RSE

    if nrc is None:
        return vspyx.Diagnostics.ISO14229_Services.RoutineControlResponse(subfunction, rid, info, status)
    return vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, nrc)
def _doSecurityAccess(message):
    """Handle a SecurityAccess request.

    Odd access types receive an all-zero seed; even access types are answered
    without a seed. The response always echoes the access type.
    """
    accessType = message.SecurityAccessType
    seed = None
    if (accessType % 2) == 1:
        seed = vspyx.Core.BytesView(bytes([
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ]))
    return vspyx.Diagnostics.ISO14229_Services.SecurityAccessResponse(accessType, seed)
def _doReadDTCs(message):
    """Handle a ReadDTCInformation request for report types 0x01, 0x02, 0x07 and 0x0A.

    0x01: number of DTCs whose status shares a bit with the requested status mask.
    0x02: the DTCs (code and status) whose status shares a bit with the status mask.
    0x07: number of DTCs matching both the status mask and the severity mask.
    0x0A: every DTC in the table with its status.

    Returns:
        A ReadDtcsResponse for the report types above, otherwise a negative
        response with SFNS.
    """
    reportType = message.Subfunction
    isPositiveResponseSuppressed = message.IsPositiveResponseSuppressedSpecified
    ReadDtcsResponse = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse

    if reportType == 0x01:
        statusMask = message.StatusMask
        print("ReadDtcs: report= %s, statusMask= %s"%(reportType, hex(statusMask)))
        dtcCount = sum(1 for dtc in _dtcs if (dtc.Status & statusMask) != 0)
        response = ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), _dtcFormat, dtcCount, isPositiveResponseSuppressed)
    elif reportType == 0x02:
        statusMask = message.StatusMask
        response = ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), isPositiveResponseSuppressed)
        print("ReadDtcs: report= %s, statusMask= %s"%(reportType, hex(statusMask)))
        for dtc in _dtcs:
            if (dtc.Status & statusMask) != 0:
                response.AddDtcRecord(True, dtc.Code, dtc.Status, None, None)
    elif reportType == 0x07:
        statusMask = message.StatusMask
        severityMask = message.SeverityMask
        dtcCount = sum(
            1 for dtc in _dtcs
            if ((dtc.Status & statusMask) != 0) and ((dtc.SeverityMask & severityMask) != 0)
        )
        response = ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), _dtcFormat, dtcCount, isPositiveResponseSuppressed)
    elif reportType == 0x0A:
        response = ReadDtcsResponse(reportType, _dtcStatusAvailabilityMask, isPositiveResponseSuppressed)
        for dtc in _dtcs:
            # Same record layout as report 0x02: the third argument is the DTC
            # status, not its severity mask.
            response.AddDtcRecord(True, dtc.Code, dtc.Status, None, None)
    else:
        response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.SFNS)
    return response
def _doClearDTCs(message):
    """Handle a ClearDiagnosticInformation request.

    ``message.GroupInfo`` selects what is cleared:
        * above 0xFFFF00: the low byte is a functional group; 0xFF clears all
          DTCs, any other value clears the DTCs of that group;
        * above 0x0000FF otherwise: clears the DTC whose code equals the value;
        * anything else is rejected.
    Clearing sets the ``Status`` of the affected records to 0.

    Returns:
        A ClearDtcsResponse, or a negative response with ROOR for a rejected group.
    """
    groupInfo = message.GroupInfo
    memorySelection = message.MemorySelection
    # hex() already supplies the "0x" prefix, so the format string must not add another.
    print("ClearDTCs: groupInfo= %s"%(hex(groupInfo)), end= "")
    if memorySelection is not None:
        print(", memorySelection= %s"%(hex(memorySelection)), end= "")

    if groupInfo <= 0x0000FF:
        print(" -> invalid group: %s"%(hex(groupInfo)))
        return vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)

    clearedDtcCount = 0
    for dtc in _dtcs:
        if groupInfo > 0xFFFF00:
            group = (groupInfo & 0xFF)
            clearIt = (group == 0xFF) or (dtc.FunctionalGroup == group)
        else:
            clearIt = (groupInfo == dtc.Code)
        if clearIt:
            dtc.Status = 0x00
            clearedDtcCount += 1
    print(" -> cleared %s DTCs"%(clearedDtcCount))
    return vspyx.Diagnostics.ISO14229_Services.ClearDtcsResponse()
# ServiceConfig.ConfigureService(name, supported_sessions, p4server_max, supported_subfunctions= None, security_requirements= None):
# Each predefined service is configured and then given its handler through DoService;
# the variable "service" is reused for every registration
service = services.ServiceConfig.ConfigureService("SessionControl", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10), [ 1, 2, 3 ], None)
service.DoService = _doSessionControl
service = services.ServiceConfig.ConfigureService("TesterPresent", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10), [ 0 ], None)
service.DoService = _doTesterPresent
# Sub-functions 1, 2, 7 and 10 are the report types _doReadDTCs implements
service = services.ServiceConfig.ConfigureService("ReadDTCs", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 2, 7, 10 ], None)
service.DoService = _doReadDTCs
service = services.ServiceConfig.ConfigureService("ReadDataById", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), None, None)
service.DoService = _doReadDataById
# ECUReset is the only predefined service registered with a security requirement value
service = services.ServiceConfig.ConfigureService("ECUReset", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 3, 4, 5 ], (1 | 2))
service.DoService = _doEcuReset
service = services.ServiceConfig.ConfigureService("RoutineControl", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 2, 3 ], None)
service.DoService = _doRoutineControl
service = services.ServiceConfig.ConfigureService("SecurityAccess", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10000), [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], None)
service.DoService = _doSecurityAccess
service = services.ServiceConfig.ConfigureService("ClearDTCs", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), None, None)
service.DoService = _doClearDTCs
# Add custom services
# Delay applied before the echo response is built.
customEcho_delay = 600 #ms


def _doCustomEcho(message):
    """Handle the custom Echo service.

    Everything after the first two octets of the request data is sent back
    after waiting ``customEcho_delay`` ms.

    Returns:
        A MessageWithSubfunction carrying the echoed payload, or a negative
        response with ROOR when the request data is two octets or shorter.
    """
    # Only the data element of the six-element PDU is needed here.
    (_mType, _sa, _ta, _taType, _ae, data) = message.PDU
    if len(data) <= 2:
        return vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
    payload = bytearray(data.bytes()[2:])
    app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=customEcho_delay))
    return vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(
        message.SID,
        True,
        message.Subfunction,
        message.IsPositiveResponseSuppressedSpecified,
        payload,
        0,
        len(payload),
    )
# ServiceConfig.AddService(sid, name, supported_sessions, datetime.timedelta(milliseconds= p4server_max), supported_subfunctions, security_requirements)
def _CreateMessageWithSubfunction(pdu, message):
    """Build a MessageWithSubfunction from ``pdu`` and ``message``; used as request and response decoder."""
    decoded = vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(pdu, message)
    return decoded
# Register the custom Echo service under service id 0xBA for session 1 and
# sub-function 1, then wire its decoders and its handler
service = services.ServiceConfig.AddService(0xBA, "Echo", [ 1 ], datetime.timedelta(milliseconds= 5000), [ 1 ], 0xFFFFFFFF)
service.RequestDecoder = _CreateMessageWithSubfunction
service.ResponseDecoder = _CreateMessageWithSubfunction
service.DoService = _doCustomEcho
# Start application
# Components are added from the service layer down to the transport layer
runtime = app.VehicleSpy.PrepareForStart()
runtime.AddComponent(services)
runtime.AddComponent(udsInterface)
runtime.AddComponent(session)
# The TCP/IP network object only exists for the Ethernet-based transports
if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
runtime.AddComponent(network)
runtime.AddComponent(transportLayer)
app.VehicleSpy.Start()
# Idle and let the server handle requests
# Each pass waits for _exit (set by ECUReset type 4) or for the auto-shutdown
# timeout. Without a pending power-down time the server stops right away;
# otherwise the power-down time is counted down in small steps so that an
# ECUReset type 5, which resets _exit, can still cancel the shutdown.
isShuttingDown = False
while not isShuttingDown:
    app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=TEST_SERVER_AUTO_SHUTDOWN_TIME), _exit)
    isShuttingDown = True
    # Snapshot the value: a request handler may set it back to None at any time.
    powerDownTime = ecuReset_PowerDownTime
    if powerDownTime is not None:
        checkTimeInMs = 5
        shutdownTime = powerDownTime * 1000
        print("Server shutting down in %i seconds!"%(powerDownTime))
        while isShuttingDown and (shutdownTime > 0):
            app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=checkTimeInMs))
            shutdownTime -= checkTimeInMs
            # The shutdown stays pending only while _exit is still set.
            isShuttingDown = _exit.IsSet()
        if not isShuttingDown:
            print("Shutdown cancelled! (%i ms remaining)"%(shutdownTime))
print("До свидания!")
runtime.ShutdownEnvironment()
app.VehicleSpy.Stop()