Python UDS Server with DoIP

import time, datetime, enum
import vspyx

from intrepidcs.vspyx.rpc.TCPIP import Network_pb2

# Constants
TEST_INTERFACE_TYPE_DOIP = "DoIP"
TEST_INTERFACE_TYPE_ISO15765_CAN = "ISO-15765+CAN"
TEST_INTERFACE_TYPE_HSFZ = "HSFZ"

# Test interface selction
TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_DOIP
#TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_ISO15765_CAN
#TEST_INTERFACE_TYPE = TEST_INTERFACE_TYPE_HSFZ

TEST_SERVER_AUTO_SHUTDOWN_TIME = 30000  # ms

if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
        ECU_ADDRESS = 0xDEAD
        ECU_AE = None
        ECU_GATEWAY_ADDRESS = 0x1000

        TESTER_ADDRESS_TYPE = 0

        # Ethernet interface config
        IF_AUTO_DETECT = True                       # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
        IF_NAME = "Bottom Intel I218-V"
        IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:02"
        IF_IP_ADDRESS = "192.168.7.2"
        IF_IP_MASK = "255.255.255.0"

        # DoIP Config
        BROADCAST_ADDRESS = "192.168.7.255"
        MAX_TCP_CONNECTIONS = 10
        _acceptableTesterAddress = 0x3584   # Expected Tester Address
        _powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.Ready

        ROUTING_TYPE = vspyx.Diagnostics.ISO13400_2.RoutingActivationTypes.Default  # Select the routing type
        ROUTING_EXTRA_ISO_EXPECTED = None   # Specify an expected value of the ISO reserved field in the routing activation request
        ROUTING_EXTRA_OEM_EXPECTED = None   # Specify an expected value of the OEM reserved field in the routing activation request
        ROUTING_EXTRA_ISO_RESPONSE = None   # Specify a 32-bit value to override the ISO reserved field in the routing activation response (None will use default)
        ROUTING_EXTRA_OEM_RESPONSE = None   # Specify a 32-bit value to include the OEM reserved field in the routing activation response (None will use default)

        # List of all entities supported by this host
        _entityInfo = [
                        vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS, vspyx.Core.BytesView(bytes("1Z0123456789ABCDE", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD7])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired, vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete)
                        #,vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS + 1, vspyx.Core.BytesView(bytes("1Z0123456789ABCD0", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD8])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired, vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Synchronized)
                        #,vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS + 2, vspyx.Core.BytesView(bytes("1Z0123456789ABCD1", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD9])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired)
        ]

elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
        class ISO15765_AddressTypes(enum.Enum):
                Normal = 1
                NormalFixed = 2
                Extended = 3

        #IF_DEVICE = "88432"     # ValueCAN 3 - 88432
        IF_DEVICE = "CY7347"    # neoVI FIRE 2 - CY7347

        IF_NAME = "%s %s"%(IF_LIB, IF_DEVICE)
        IF_CHANNEL = "HSCAN Discovery Channel"

        NETWORK_ADDRESS_TYPE = ISO15765_AddressTypes.Normal

        TESTER_ADDRESS = 0x45           # Specify the client external tester address
        TESTER_CAN_ADDRESS = 0x18ff456
        TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN29Bit
        #TESTER_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD29Bit

        ECU_ADDRESS = 0x23              # Specify the ECU server address
        ECU_CAN_ADDRESS = 0x123
        ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalClassicalCAN11Bit
        #ECU_ADDRESS_TYPE = vspyx.Communication.ISO15765_2.NetworkAddressType.PhysicalCANFD11Bit

        TESTER_DATA_LENGTH = 8          # Specify the client external tester DLC (in octets)
        TESTER_STMIN_MIN = 0
        ECU_AE = None                   # Specify the ECU server address (N_AE)
        ECU_STMIN = 0
        ECU_BS = 0

elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
        ECU_ADDRESS = 1
        ECU_AE = None
        ECU_GATEWAY_ADDRESS = 0x10

        TESTER_ADDRESS_TYPE = 0

        # Ethernet interface config
        IF_AUTO_DETECT = True                       # Automatically detect network interface by name; otherwise, the device's MAC address must be provided
        IF_NAME = "Bottom Intel I218-V"
        IF_SOFT_MAC_ADDRESS = "00:FC:70:00:00:02"
        IF_IP_ADDRESS = "192.168.7.2"
        IF_IP_MASK = "255.255.255.0"

        # DoIP Config
        BROADCAST_ADDRESS = "192.168.7.255"
        MAX_TCP_CONNECTIONS = 10
        _acceptableTesterAddress = 0x84     # Expected Tester Address
        _powerMode = vspyx.Diagnostics.ISO13400_2.DiagnosticPowerModes.Ready

        ROUTING_TYPE = None                 # Not supported in HSFZ
        ROUTING_EXTRA_ISO_EXPECTED = None   # Not supported in HSFZ
        ROUTING_EXTRA_OEM_EXPECTED = None   # Not supported in HSFZ
        ROUTING_EXTRA_ISO_RESPONSE = None   # Not supported in HSFZ
        ROUTING_EXTRA_OEM_RESPONSE = None   # Not supported in HSFZ

        # List of all entities supported by this host
        _entityInfo = [
                        vspyx.Diagnostics.ISO13400_2.EntityIdentificationInfo(ECU_ADDRESS, vspyx.Core.BytesView(bytes("1Z0123456789ABCDE", "ascii")), vspyx.Core.BytesView(bytes([ 0xFA, 0x0A, 0x43, 0x99, 0xF7, 0xD7])), vspyx.Core.BytesView(bytes([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF])), vspyx.Diagnostics.ISO13400_2.EntityIdActivationRequirements.NoFurtherActionRequired)
        ]

# Test Application Config
_exit = vspyx.Core.Event()

# DIDs
class Signal:
        def __init__(self, id, value, size = None):
                self.Id = id
                if (value is None):
                        self.Value = bytearray()
                        if (size is None):
                                self.Size = 0
                        else:
                                self.Size = size
                else:
                        self.Value = value
                        self.Size = len(value)

# List of arbitrary DID definitions - feel free to add your own Signal(DID [int], value [byte array], size [int, number of octets])
# This list will be used by the example DID read service handler
_signals = [ Signal(0xF101, bytearray([0xDE, 0xAD])), Signal(0xF107, bytearray([0xFE, 0xED, 0xBA, 0x50])), Signal(0x2001, bytearray([0x01])) ]

# DTCs
class DtcRecord:
        def __init__(self, code, severityMask, functionalGroup, status = 0):
                self.Code = code
                self.SeverityMask = severityMask
                self.FunctionalGroup = functionalGroup
                self.Status = status

_dtcFormat = vspyx.Diagnostics.ISO14229_Services.DTCFormatIdentifier.SAE_J2012_DA_00
_dtcStatusAvailabilityMask = 0xFF
_dtcs = [
        DtcRecord(0xef56e4, 0x05, 0x00, 0x81),
        DtcRecord(0xef553e, 0x16, 0xFE, 0x40),
        DtcRecord(0xef5595, 0x27, 0x33, 0x40),
        DtcRecord(0xef54ff, 0x38, 0x00, 0x0B),
        DtcRecord(0xef54f0, 0x49, 0x00, 0x10),
        DtcRecord(0xef56b2, 0x5A, 0x00, 0x0C),
        DtcRecord(0xef56b8, 0x6B, 0xFE, 0x40),
        DtcRecord(0xef56af, 0x7C, 0x33, 0x10),
        DtcRecord(0xef5673, 0x8D, 0x00, 0x01),
        DtcRecord(0xef4700, 0x9E, 0x00, 0x40),
        DtcRecord(0xef56d6, 0xAF, 0xFE, 0x40),
        DtcRecord(0xef5652, 0xB0, 0x33, 0x10),
        DtcRecord(0xef5643, 0xC1, 0x00, 0x0B),
        DtcRecord(0x7e00b5, 0xD2, 0xFE, 0x30),
        DtcRecord(0x7e03ad, 0xE3, 0x33, 0x8C),
        DtcRecord(0x029971, 0xF4, 0x00, 0x00),
        DtcRecord(0x029970, 0x02, 0x00, 0x0C),
        DtcRecord(0x029908, 0x1F, 0x33, 0x01),
        DtcRecord(0x029985, 0x2B, 0xFE, 0x30),
        DtcRecord(0x029980, 0x3C, 0x00, 0x0B),
]

# Routines
class Routine:
        _isStarted = False
        _hasCompleted = False

        def __init__(self, rid):
                self.Id = rid

        def IsSupportedInActiveSession(self):
                return True

        def HasSecurityAccess(self):
                return True

        def IsSubfunctionSupported(self, function):
                return (function == 1) or (function == 2) or (function == 3)

        def IsOptionLengthValid(self, function, optionData):
                return True

        def AreConditionsCorrect_General(self):
                return True

        def IsOptionDataValid(self, function, optionData):
                return True

        def AreConditionsCorrect_Routine(self, function, optionData):
                return True

        def IsStarted(self):
                return self._isStarted

        def HasResults(self):
                return self._hasCompleted

        def Start(self, optionData):
                return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

        def Stop(self, optionData):
                return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

        def GetResults(self, optionData):
                return [vspyx.Diagnostics.ISO14229_1.Nrc.SFNS, None, None]

class ExampleRoutine_1(Routine):

        def IsOptionLengthValid(self, function, optionData):
                expectedLength = 0
                if (function == 1):
                        expectedLength = 2
                return (len(optionData) == expectedLength)

        def IsOptionDataValid(self, function, optionData):
                return True

        def Start(self, optionData):
                app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=100))
                self._isStarted = True
                self._hasCompleted = False
                return [None, None, optionData]

        def Stop(self, optionData):
                self._isStarted = False
                self._hasCompleted = True
                return [None, None, None]

        def GetResults(self, optionData):
                return [None, None, vspyx.Core.BytesView(bytes([0xDE, 0xAD, 0xBE, 0xEF]))]

class ExampleRoutine_2(ExampleRoutine_1):
        pass

_routines = [
        ExampleRoutine_1(0xF001)
        , ExampleRoutine_2(0xF004)
]

# Application Setup
app = vspyx.Core.Application.New()
app.Initialize(loadAllModules=True)

def AutoDetectDevice(name, interfaceDetectionAttempts, targetType, childMatch = None):
        source = None
        while (interfaceDetectionAttempts > 0):
                try:
                        interfaceDetectionAttempts -= 1
                        source = app.Resolver[IF_NAME]
                        break
                except:
                        print("Waiting for interface %s..."%(IF_NAME))
                        time.sleep(1.0)
        if (source is None):
                print("Unable to resolve device %s"%(IF_NAME))
                exit()

        channel = None
        source = app.VehicleSpy.AddSource(source.Description)
        for child in source.Discovery.Children:
                if (isinstance(child, targetType)):
                        if (childMatch is None or ((childMatch) in child.ID)):
                                channel = child
                                break

        return channel

def ToHexString(octets, delimiter = ":"):
        value = ""
        for octet in octets:
                value += "{:02X}".format(octet) + delimiter
        value = value.strip(delimiter)
        return value

def HexOrNone(number, width= 2):
        return ("{:0" + str(width) + "X}").format(number) if (number is not None) else None

if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
        if (IF_AUTO_DETECT):
                channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.EthernetChannel)
        else:
                source = app.VehicleSpy.AddSource("pcap " + IF_SOFT_MAC_ADDRESS)
                source.SearchFor()
                print(IF_NAME + " :", source.SourceState)
                if (source.SourceState != source.SourceState.Ready):
                        print("Source is not ready!")
                        exit()
                channel = app.Resolver[IF_NAME + " Discovery Channel"]

        if (channel is None):
                print("Unable to initialize device: %s"%(IF_NAME))
                exit()

        interfaceConfig = Network_pb2.Interface()
        interfaceConfig.MACAddress = IF_SOFT_MAC_ADDRESS
        interfaceConfig.IPv4.Address = IF_IP_ADDRESS
        interfaceConfig.IPv4.Netmask = IF_IP_MASK

        interface = vspyx.TCPIP.Interface.New(interfaceConfig)
        interface.Initialize(app, "Interface")
        interface.Attach(channel)

        network = vspyx.TCPIP.Network.New()
        network.Initialize(app, "Network")

        network.AddInterface(interface)

        # Setup DoIP Transport layer
        def _OnDoIPNack(source, nackCode):
                print("Received nack code: ", nackCode)

        _entityAnnounceCount = 0
        def _OnGetEntityInfo(source, eid, vin):
                global _entityAnnounceCount

                entities = []
                for entity in _entityInfo:
                        if (_entityAnnounceCount >= 2):
                                if(entity.SyncStatus == vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Incomplete):
                                        entity.SyncStatus = vspyx.Diagnostics.ISO13400_2.EntityIdSyncStatuses.Synchronized
                        if (((eid is None) and (vin is None)) or ((eid is not None) and (entity.EID == eid)) or ((vin is not None) and (entity.VIN == vin))):
                                entities.append(entity)

                _entityAnnounceCount += 1

                return entities

        _clientInfo = []
        def _OnValidateRoutingActivationRequest(args):
                global _clientInfo

                routeInfo = None

                if (args.Address == _acceptableTesterAddress):
                        routeInfo = vspyx.Diagnostics.ISO13400_2.EntityRouteStatus()
                        if (len(_entityInfo) == 1):
                                routeInfo.Address = _entityInfo[0].Address
                        else:
                                routeInfo.Address = ECU_GATEWAY_ADDRESS
                                hostedAddresses = []
                                for entity in _entityInfo:
                                        hostedAddresses.append(entity.Address)
                                routeInfo.HostedAddresses = hostedAddresses

                        if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
                                routeInfo.IsActivationTypeValid = True
                                activationType = None
                        else:
                                routeInfo.IsActivationTypeValid = ((args.ActivationType is not None) and (args.ActivationType == ROUTING_TYPE))
                                activationType = args.ActivationType.name if (args.ActivationType is not None) else None

                        if ((ROUTING_EXTRA_ISO_EXPECTED is not None) and (ROUTING_EXTRA_ISO_EXPECTED != args.ISOReservedData_Req)):
                                print("Route activation: Expected ISO reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_ISO_EXPECTED, 8), HexOrNone(args.ISOReservedData_Req, 8)))

                        if (ROUTING_EXTRA_OEM_EXPECTED != args.OEMReservedData_Req):
                                print("Route activation: Expected OEM reserved value %s, received %s"%(HexOrNone(ROUTING_EXTRA_OEM_EXPECTED, 8), HexOrNone(args.OEMReservedData_Req, 8)))

                        args.ISOReservedData_Rsp = ROUTING_EXTRA_ISO_RESPONSE
                        args.OEMReservedData_Rsp = ROUTING_EXTRA_OEM_RESPONSE

                        _clientInfo.append((args.IPAddress, args.Address))
                        print("Route activation: Added route; type %s for %s with SA %s (total routes: %i) Req[ISO: %s; OEM= %s], Rsp[ISO: %s; OEM= %s]"%(activationType, args.IPAddress, hex(args.Address), len(_clientInfo), HexOrNone(args.ISOReservedData_Req, 8), HexOrNone(args.OEMReservedData_Req, 8), HexOrNone(args.ISOReservedData_Rsp, 8), HexOrNone(args.OEMReservedData_Rsp, 8)))

                return routeInfo

        def _OnRouteClose(args):
                global _clientInfo

                _clientInfo.remove((args.IPAddress, args.Address))
                print("Removed route for %s with SA %s (total routes: %i)"%(args.IPAddress, hex(args.Address), len(_clientInfo)))

                return None

        def _OnDiagnosticPowerModeRequest(source):
                return _powerMode

        if (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP):
                transportLayer = vspyx.Diagnostics.ISO13400_2.New(True)
                transportLayer.Initialize(app, "DoIP")
        elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ):
                transportLayer = vspyx.Diagnostics.ISO13400_2.New(True, vspyx.Diagnostics.ISO13400_2.ProtocolVersions.Legacy_HSFZ)
                transportLayer.Initialize(app, "HSFZ")

        transportLayer.Attach(network)

        transportLayer.OnDoIPNack = _OnDoIPNack
        transportLayer.OnGetEntityInfo = _OnGetEntityInfo
        transportLayer.OnValidateRoutingActivationRequest = _OnValidateRoutingActivationRequest
        transportLayer.OnRouteClose = _OnRouteClose
        transportLayer.OnDiagnosticPowerModeRequest = _OnDiagnosticPowerModeRequest

        transportLayer.SetBroadcastAddress(BROADCAST_ADDRESS)
        if (len(_entityInfo) > 1):
                transportLayer.SetNodeType(vspyx.Diagnostics.ISO13400_2.EntityNodeTypes.Gateway)
        transportLayer.SetMaxDataChannelCount(MAX_TCP_CONNECTIONS)

elif (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_ISO15765_CAN):
        channel = AutoDetectDevice(IF_NAME, 10, vspyx.Communication.CANChannel, IF_CHANNEL)

        if (channel is None):
                print("Unable to initialize device: %s"%(IF_NAME))
                exit()
        print("Using channel: %s"%(channel.ID))

        interface = channel.NewISO11898_1Interface()

        transportLayer = vspyx.Communication.ISO15765_2.New(TESTER_DATA_LENGTH, 0)
        transportLayer.Initialize(app, "ISO 15765-2")
        transportLayer.Attach(interface)

        # Note: Extended + RemoteDiagnostics is not possible
        mType = vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics if (ECU_AE is None)  else vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.RemoteDiagnostics

        if (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Normal):
                transportLayer.AddRxNormalAddress(mType, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, ECU_STMIN, ECU_BS)
                transportLayer.AddTxNormalAddress(mType, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, TESTER_STMIN_MIN)
        elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.NormalFixed):
                transportLayer.AddRxFixedAddress(mType, TESTER_ADDRESS, ECU_ADDRESS, ECU_ADDRESS_TYPE, ECU_AE, False, ECU_STMIN, ECU_BS)
                transportLayer.AddTxFixedAddress(mType, ECU_ADDRESS, TESTER_ADDRESS, TESTER_ADDRESS_TYPE, ECU_AE, False, TESTER_STMIN_MIN)
        elif (NETWORK_ADDRESS_TYPE == ISO15765_AddressTypes.Extended):
                transportLayer.AddRxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, None, ECU_STMIN, ECU_BS)
                transportLayer.AddTxExtendedAddress(vspyx.Communication.ISOStandardizedServicePrimitiveInterface.MessageType.Diagnostics, ECU_ADDRESS, ECU_CAN_ADDRESS, ECU_ADDRESS_TYPE, TESTER_ADDRESS, TESTER_CAN_ADDRESS, TESTER_ADDRESS_TYPE, None, TESTER_STMIN_MIN)

# Setup UDS Session layer
sessionParameters = vspyx.Diagnostics.ISO14229_2.Parameters()
# sessionParameters.DeltaP2 = datetime.timedelta(milliseconds= 200)
# sessionParameters.DeltaP6 = datetime.timedelta(milliseconds= 200)
# sessionParameters.P2_server_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P2star_server_max = datetime.timedelta(milliseconds= 5000)
# sessionParameters.P2_client_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P6_client_max = datetime.timedelta(milliseconds= 250)
# sessionParameters.P2star_client_max = datetime.timedelta(milliseconds= 5000)
# sessionParameters.P6star_client_max = datetime.timedelta(milliseconds= 5000)
session = vspyx.Diagnostics.ISO14229_2.NewServer(sessionParameters)

session.Initialize(app, "Session")
session.Attach(transportLayer)

# Setup UDS Application layer
def _GetHostAddressInfo():
        return ECU_ADDRESS, int(TESTER_ADDRESS_TYPE), ECU_AE

udsInterface = vspyx.Diagnostics.ISO14229_1ServerApplicationLayerProtocol.New()
udsInterface.Initialize(app, "UDS Host")
udsInterface.Attach(session)

udsInterface.GetHostAddressInfo = _GetHostAddressInfo

# Setup UDS Application Services layer
def _OnServiceStartRequest(message):
        return services.HandleService(message)

services = vspyx.Diagnostics.ISO14229_ServiceServer.New()
services.Initialize(app, "Service Host")
services.Attach(udsInterface)

services.OnRequest = _OnServiceStartRequest

# Configure use of predefined services
SESSION_ID_DEFAULT = 1
SESSION_ID_PROGRAMMING = 2
SESSION_ID_EXTENDED = 3
def ChangeSession(sessionId):
        # TODO: Handle internal session change here

        print("SessionControl: [current] %s -> [new] %s"%(udsInterface.SessionId, sessionId))

        # The below function confirms with the session layer that the current session has been updated
        # ConfirmServerSessionChange(sessionId, isDefaultSession, p2server_max, p2starServer_max)
        session.ConfirmServerSessionChange(sessionId, (sessionId == SESSION_ID_DEFAULT), sessionParameters.P2_server_max, sessionParameters.P2star_server_max)

sessionControl_areConditionsCorrect = True
def _doSessionControl(message):
        if (sessionControl_areConditionsCorrect):

                ChangeSession(message.SessionType)

                response = vspyx.Diagnostics.ISO14229_Services.SessionControlResponse(message.SessionType, int(sessionParameters.P2_server_max.total_seconds() * 1000), int(sessionParameters.P2star_server_max.total_seconds() * 1000))
        else:
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.CNC)
        return response

def _doTesterPresent(message):
        return vspyx.Diagnostics.ISO14229_Services.TesterPresentResponse()

def _doReadDataById(message):
        dids = message.Ids
        response = None
        for did in dids:
                for signal in _signals:
                        if (did == signal.Id):
                                if (response is None):
                                        response = vspyx.Diagnostics.ISO14229_Services.ReadDataByIdResponse(bytearray())
                                response.WriteId(did)
                                response.WriteData(signal.Value)
                                break

        if (response is None):
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
        return response

ecuReset_areConditionsCorrect = True
ecuReset_delay = 10 #ms
ecuReset_PowerDownTime = None
def _doEcuReset(message):
        global ecuReset_PowerDownTime

        ecuReset_PowerDownTime = None
        areConditionsCorrect = ecuReset_areConditionsCorrect

        if (message.ResetType == 4):
                areConditionsCorrect = areConditionsCorrect and not _exit.IsSet()
                if (areConditionsCorrect):
                        ecuReset_PowerDownTime = 6
                        _exit.Set()
        elif (message.ResetType == 5):
                areConditionsCorrect = areConditionsCorrect and _exit.IsSet()
                if (areConditionsCorrect):
                        _exit.Reset()

        if (areConditionsCorrect):
                app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=ecuReset_delay))
                response = vspyx.Diagnostics.ISO14229_Services.EcuResetResponse(message.ResetType, ecuReset_PowerDownTime)
        else:
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.CNC)
        return response

def _doRoutineControl(message):

        rid = message.RoutineId
        subfunction = message.Subfunction
        optionData = message.OptionData

        routineMatch = None
        for routine in _routines:
                if (routine.Id == rid):
                        routineMatch = routine
                        break

        nrc = None
        if (routineMatch is not None):
                if (routineMatch.IsSupportedInActiveSession()):
                        if (routineMatch.HasSecurityAccess()):
                                if (routineMatch.IsSubfunctionSupported(subfunction)):
                                        if (routineMatch.IsOptionLengthValid(subfunction, optionData)):
                                                if (routineMatch.AreConditionsCorrect_General()):
                                                        if (routineMatch.IsOptionLengthValid(subfunction, optionData)):
                                                                if (routineMatch.AreConditionsCorrect_Routine(subfunction, optionData)):

                                                                        optionDataString = None
                                                                        if ((optionData is not None) and (len(optionData) > 0)):
                                                                                optionDataString = ToHexString(optionData.bytes())
                                                                        print("Routine Control: rid= %s subfunction= %s, options= %s"%(hex(rid), subfunction, optionDataString))

                                                                        if ((subfunction == 1) and (not routineMatch.IsStarted())):
                                                                                [nrc, info, status] = routineMatch.Start(optionData)
                                                                        elif ((subfunction == 2) and (routineMatch.IsStarted())):
                                                                                [nrc, info, status] = routineMatch.Stop(optionData)
                                                                        elif ((subfunction == 3) and (routineMatch.HasResults())):
                                                                                [nrc, info, status] = routineMatch.GetResults(optionData)
                                                                        else:
                                                                                nrc = vspyx.Diagnostics.ISO14229_1.Nrc.RSE
                                                                else:
                                                                        nrc = vspyx.Diagnostics.ISO14229_1.Nrc.CNC
                                                        else:
                                                                nrc = vspyx.Diagnostics.ISO14229_1.Nrc.ROOR
                                                else:
                                                        nrc = vspyx.Diagnostics.ISO14229_1.Nrc.CNC
                                        else:
                                                nrc = vspyx.Diagnostics.ISO14229_1.Nrc.IMLOIF
                                else:
                                        nrc = vspyx.Diagnostics.ISO14229_1.Nrc.SFNS
                        else:
                                nrc = vspyx.Diagnostics.ISO14229_1.Nrc.SAD
                else:
                        nrc = vspyx.Diagnostics.ISO14229_1.Nrc.ROOR
        else:
                nrc = vspyx.Diagnostics.ISO14229_1.Nrc.ROOR

        if (nrc is None):
                response = vspyx.Diagnostics.ISO14229_Services.RoutineControlResponse(subfunction, rid, info, status)
        else:
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, nrc)

        return response

def _doSecurityAccess(message):
        accessType = message.SecurityAccessType
        isRequestSeed = ((accessType % 2) == 1)
        if isRequestSeed:
                seed = vspyx.Core.BytesView(bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))
        else:
                seed = None
        return vspyx.Diagnostics.ISO14229_Services.SecurityAccessResponse(accessType, seed)

def _doReadDTCs(message):
        global _dtcFormat

        reportType = message.Subfunction
        isPositiveResponseSuppressed = message.IsPositiveResponseSuppressedSpecified
        if (reportType == 0x01):
                statusMask = message.StatusMask
                dtcCount = 0
                print("ReadDtcs: report= %s, statusMask= %s"%(reportType, hex(statusMask)))
                for dtc in _dtcs:
                        if ((dtc.Status & statusMask) != 0):
                                dtcCount += 1
                response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), _dtcFormat, dtcCount, isPositiveResponseSuppressed)
        elif (reportType == 0x02):
                statusMask = message.StatusMask
                response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), isPositiveResponseSuppressed)
                print("ReadDtcs: report= %s, statusMask= %s"%(reportType, hex(statusMask)))
                for dtc in _dtcs:
                        if ((dtc.Status & statusMask) != 0):
                                response.AddDtcRecord(True, dtc.Code, dtc.Status, None, None)
        elif (reportType == 0x07):
                statusMask = message.StatusMask
                severityMask = message.SeverityMask
                dtcCount = 0
                for dtc in _dtcs:
                        if (((dtc.Status & statusMask) != 0) and ((dtc.SeverityMask & severityMask) != 0)):
                                dtcCount += 1
                response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, (statusMask & _dtcStatusAvailabilityMask), _dtcFormat, dtcCount, isPositiveResponseSuppressed)
        elif (reportType == 0x0A):
                response = vspyx.Diagnostics.ISO14229_Services.ReadDtcsResponse(reportType, _dtcStatusAvailabilityMask, isPositiveResponseSuppressed)
                for dtc in _dtcs:
                        response.AddDtcRecord(True, dtc.Code, dtc.SeverityMask, None, None)
        else:
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.SFNS)
        return response

def _doClearDTCs(message):
        groupInfo = message.GroupInfo
        memorySelection = message.MemorySelection
        clearedDtcCount = 0

        print("ClearDTCs: groupInfo= 0x%s"%(hex(groupInfo)), end= "")
        if (memorySelection is not None):
                print(", memorySelection= %s"%(hex(memorySelection)), end= "")

        if (groupInfo > 0x0000FF):
                for dtc in _dtcs:
                        clearIt = False
                        if (groupInfo > 0xFFFF00):
                                group = (groupInfo & 0xFF)
                                if ((group == 0xFF) or (dtc.FunctionalGroup == group)):
                                        clearIt = True
                        elif (groupInfo == dtc.Code):
                                clearIt = True

                        if (clearIt):
                                dtc.Status = 0x00
                                clearedDtcCount += 1
                print(" -> cleared %s DTCs"%(clearedDtcCount))
                response = vspyx.Diagnostics.ISO14229_Services.ClearDtcsResponse()
        else:
                print(" -> invalid group: %s"%(hex(groupInfo)))
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
        return response

# ServiceConfig.ConfigureService(name, supported_sessions, p4server_max, supported_subfunctions= None, security_requirements= None):
service = services.ServiceConfig.ConfigureService("SessionControl", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10), [ 1, 2, 3 ], None)
service.DoService = _doSessionControl
service = services.ServiceConfig.ConfigureService("TesterPresent", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10), [ 0 ], None)
service.DoService = _doTesterPresent
service = services.ServiceConfig.ConfigureService("ReadDTCs", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 2, 7, 10 ], None)
service.DoService = _doReadDTCs
service = services.ServiceConfig.ConfigureService("ReadDataById", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), None, None)
service.DoService = _doReadDataById
service = services.ServiceConfig.ConfigureService("ECUReset", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 3, 4, 5 ], (1 | 2))
service.DoService = _doEcuReset
service = services.ServiceConfig.ConfigureService("RoutineControl", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), [ 1, 2, 3 ], None)
service.DoService = _doRoutineControl
service = services.ServiceConfig.ConfigureService("SecurityAccess", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 10000), [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], None)
service.DoService = _doSecurityAccess
service = services.ServiceConfig.ConfigureService("ClearDTCs", [ 1, 2, 3 ], datetime.timedelta(milliseconds= 30000), None, None)
service.DoService = _doClearDTCs

# Add custom services
customEcho_delay = 600 #ms
def _doCustomEcho(message):
        (mType, sa, ta, taType, ae, data) = message.PDU
        if (len(data) > 2):
                payload = bytearray(data.bytes()[2:])
                app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=customEcho_delay))
                response = vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(message.SID, True, message.Subfunction, message.IsPositiveResponseSuppressedSpecified, payload, 0, len(payload))
        else:
                response = vspyx.Diagnostics.ISO14229_Services.NegativeResponse(message.SID, vspyx.Diagnostics.ISO14229_1.Nrc.ROOR)
        return response

# ServiceConfig.AddService(sid, name, supported_sessions, datetime.timedelta(milliseconds= p4server_max), supported_subfunctions, security_requirements)
def _CreateMessageWithSubfunction(pdu, message):
        return vspyx.Diagnostics.ISO14229_Services.MessageWithSubfunction(pdu, message)
service = services.ServiceConfig.AddService(0xBA, "Echo", [ 1 ], datetime.timedelta(milliseconds= 5000), [ 1 ], 0xFFFFFFFF)
service.RequestDecoder = _CreateMessageWithSubfunction
service.ResponseDecoder = _CreateMessageWithSubfunction
service.DoService = _doCustomEcho

# Start application
runtime = app.VehicleSpy.PrepareForStart()
runtime.AddComponent(services)
runtime.AddComponent(udsInterface)
runtime.AddComponent(session)
if ((TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_DOIP) or (TEST_INTERFACE_TYPE == TEST_INTERFACE_TYPE_HSFZ)):
        runtime.AddComponent(network)
runtime.AddComponent(transportLayer)

app.VehicleSpy.Start()

# Idle and let the server handle requests
isShuttingDown = False
while (not isShuttingDown):
        app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=TEST_SERVER_AUTO_SHUTDOWN_TIME), _exit)
        isShuttingDown = True

        if(ecuReset_PowerDownTime is not None):
                checkTimeInMs = 5
                shutdownTime = (ecuReset_PowerDownTime * 1000)
                print("Server shutting down in %i seconds!"%(ecuReset_PowerDownTime))
                while (isShuttingDown and (shutdownTime > 0)):
                        app.VehicleSpy.Scheduler.WaitFor(datetime.timedelta(milliseconds=checkTimeInMs))
                        shutdownTime -= checkTimeInMs
                        isShuttingDown = (not _exit.IsSet())
                if (not isShuttingDown):
                        print("Shutdown cancelled! (%i ms remaining)"%(shutdownTime))

print("До свидания!")
runtime.ShutdownEnvironment()
app.VehicleSpy.Stop()