|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- #
- # AWL simulator - Commandline testing interface
- #
- # Copyright 2012-2018 Michael Buesch <m@bues.ch>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, write to the Free Software Foundation, Inc.,
- # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- #
- from __future__ import division, absolute_import, print_function, unicode_literals
- import sys
- import os
- import getopt
- import traceback
- import signal
- from awlsim_loader.common import *
- from awlsim_loader.core import *
- from awlsim_loader.coreclient import *
- from awlsim_loader.awlcompiler import *
- from awlsim_loader.fupcompiler import *
- import awlsim_loader.cython_helper as cython_helper
- class TestAwlSimClient(AwlSimClient):
- def handle_CPUDUMP(self, dumpText):
- emitCpuDump(dumpText)
- class ConsoleSSHTunnel(SSHTunnel):
- def sshMessage(self, message, isDebug):
- if opt_loglevel > Logging.LOG_INFO:
- isDebug = False
- super(ConsoleSSHTunnel, self).sshMessage(message, isDebug)
- def usage():
- print("awlsim version %s" % VERSION_STRING)
- print("")
- print("Usage: awlsim-test [OPTIONS] <AWL-source or awlsim-project file>")
- print("")
- print("Options:")
- print(" -Y|--cycle-limit SEC Cycle time limit, in seconds (default 1.0)")
- print(" -M|--max-runtime SEC CPU will be stopped after SEC seconds (default: off)")
- print(" -2|--twoaccu Force 2-accu mode")
- print(" -4|--fouraccu Force 4-accu mode")
- print(" -D|--no-cpu-dump Do not show CPU status while running")
- print(" -x|--extended-insns Enable extended instructions")
- print(" -t|--obtemp 1/0 Enable/disable writing of OB-temp variables (Default: off)")
- print(" -T|--clock-mem ADDR Force clock memory address (Default: off)")
- print(" -m|--mnemonics auto Force mnemonics type: en, de, auto")
- print(" -O|--optimizers OPT Sets the optimization mode.")
- print(" OPT may be one of:")
- print(" default: Keep project settings (default)")
- print(" all: Enable all optimizers")
- print(" off: Disable all optimizers")
- print(" -L|--loglevel LVL Set the log level:")
- print(" 0: Log nothing")
- print(" 1: Log errors")
- print(" 2: Log errors and warnings")
- print(" 3: Log errors, warnings and info messages (default)")
- print(" 4: Verbose logging")
- print(" 5: Extremely verbose logging")
- print("")
- print("Server backend related options:")
- print(" -c|--connect Connect to server backend")
- print(" -C|--connect-to IP:PORT Connect to server backend")
- print(" -b|--spawn-backend Spawn a new backend server and connect to it")
- if not isWinStandalone:
- print(" -i|--interpreter EXE Set the backend interpreter executable")
- print("")
- print("Loading hardware modules:")
- print(" -H|--hardware NAME:PARAM=VAL:PARAM=VAL...")
- print("Print module information:")
- print(" -I|--hardware-info NAME")
- print("")
- print(" Where NAME is the name of the hardware module.")
- print(" PARAM=VAL are optional hardware specific parameters.")
- print("")
- print("Other options:")
- print(" --list-sfc Print a list of all supported SFCs")
- print(" --list-sfc-verbose Verbose SFC list")
- print(" --list-sfb Print a list of all supported SFBs")
- print(" --list-sfb-verbose Verbose SFB list")
- print("")
- print("Environment variables:")
- print(" AWLSIM_PROFILE =0 Disable profiling (default)")
- print(" =1 Enable core cycle profiling")
- print(" =2 Enable full core profiling (including startup)")
- print("")
- print(" AWLSIM_CYTHON =0 Do not attempt to use Cython core (default)")
- print(" =1 Attempt to use Cython core, but fall back to Python")
- print(" =2 Enforce Cython core")
- print("")
- print(" AWLSIM_AFFINITY =0,2,... Comma separated list of host CPU cores")
- print(" to run on. Default: all cores.")
- def printSysblockInfo(blockTable, prefix, withExtended, withInterface):
- for block in sorted(dictValues(blockTable),
- key = lambda b: b.name[0]):
- if block.broken:
- continue
- number, name, desc = block.name
- if number < 0 and not withExtended:
- continue
- if desc:
- desc = " (%s)" % desc
- else:
- desc = ""
- print(" %s %d \"%s\"%s" % (prefix, number, name, desc))
- if withInterface:
- for ftype in (BlockInterfaceField.FTYPE_IN,
- BlockInterfaceField.FTYPE_OUT,
- BlockInterfaceField.FTYPE_INOUT):
- try:
- fields = block.interfaceFields[ftype]
- except KeyError:
- continue
- for field in fields:
- field.fieldType = ftype
- print(" %s" % str(field))
- def writeStdout(message):
- if Logging.loglevel >= Logging.LOG_INFO:
- sys.stdout.write(message)
- sys.stdout.flush()
- nextScreenUpdate = 0.0
- lastDump = ""
- lastDumpNrLines = 0
- emptyLine = " " * 79
- def clearConsole():
- # Make cursor visible, clear console and
- # move cursor to homeposition.
- if osIsPosix:
- writeStdout("\x1B[?25h\x1B[2J\x1B[H")
- elif osIsWindows:
- os.system("cls")
- def emitCpuDump(dump):
- global lastDump
- global lastDumpNrLines
- # Pad lines
- dumpLines = list(line + (78 - len(line)) * ' ' + '|'
- for line in dump.splitlines())
- dumpNrLines = len(dumpLines)
- # Clear lines from previous dump.
- if dumpNrLines < lastDumpNrLines:
- dumpLines.extend([ emptyLine, ] * (lastDumpNrLines - dumpNrLines))
- dump = "\n".join(dumpLines)
- lastDumpNrLines = dumpNrLines
- lastDump = dump
- if osIsPosix:
- # Clear console, move home and print dump.
- writeStdout("\x1B[2J\x1B[H" + dump)
- else:
- # Clear console, move home and print dump.
- clearConsole()
- writeStdout(dump)
- def emitSpeedDump(cpu):
- dump = "S7 CPU speed: %s stmt/s" % cpu.insnPerSecondHR
- dump += " " * (79 - len(dump))
- writeStdout("\x1B[2J\x1B[H" + dump + "\n")
- def cpuDumpCallback(cpu):
- global nextScreenUpdate
- if cpu.now >= nextScreenUpdate:
- nextScreenUpdate = cpu.now + 0.3
- emitCpuDump(str(cpu))
- def cpuStatsCallback(cpu):
- global nextScreenUpdate
- if cpu.now >= nextScreenUpdate:
- nextScreenUpdate = cpu.now + 1.0
- emitSpeedDump(cpu)
- def assignCpuSpecs(cpuSpecs, projectCpuSpecs):
- cpuSpecs.assignFrom(projectCpuSpecs)
- if opt_nrAccus is not None:
- cpuSpecs.setNrAccus(opt_nrAccus)
- def assignCpuConf(cpuConf, projectCpuConf):
- cpuConf.assignFrom(projectCpuConf)
- if opt_mnemonics is not None:
- cpuConf.setConfiguredMnemonics(opt_mnemonics)
- if opt_clockMem is not None:
- cpuConf.setClockMemByte(opt_clockMem)
- if opt_cycletime is not None:
- cpuConf.setCycleTimeLimitUs(int(round(opt_cycletime * 1000000.0)))
- if opt_maxRuntime is not None:
- cpuConf.setRunTimeLimitUs(int(round(opt_maxRuntime * 1000000.0)))
- if opt_obtemp is not None:
- cpuConf.setOBStartinfoEn(opt_obtemp)
- if opt_extInsns is not None:
- cpuConf.setExtInsnsEn(opt_extInsns)
- def run(inputFile):
- s = None
- try:
- if cython_helper.shouldUseCython():
- printInfo("*** Using accelerated CYTHON core "
- "(AWLSIM_CYTHON environment variable is set)")
- project = Project.fromProjectOrRawAwlFile(inputFile)
- printInfo("Parsing code...")
- generatedAwlSrcs = []
- # Get mnemonics type
- mnemonics = project.getCpuConf().getConfiguredMnemonics()
- if opt_mnemonics is not None:
- mnemonics = opt_mnemonics
- # Parse FUP sources
- optSettCont = None
- if opt_optimizers == "off":
- optSettCont = AwlOptimizerSettingsContainer(globalEnable=False)
- elif opt_optimizers == "all":
- optSettCont = AwlOptimizerSettingsContainer(globalEnable=True,
- allEnable=True)
- for fupSrc in project.getFupSources():
- if not fupSrc.enabled:
- continue
- generatedAwlSrcs.append(FupCompiler().compile(
- fupSource=fupSrc,
- symTabSources=project.getSymTabSources(),
- mnemonics=mnemonics,
- optimizerSettingsContainer=optSettCont))
- # Parse KOP sources
- for kopSrc in project.getKopSources():
- if not kopSrc.enabled:
- continue
- pass#TODO
- # Parse AWL sources
- parseTrees = []
- for awlSrc in itertools.chain(project.getAwlSources(),
- generatedAwlSrcs):
- if not awlSrc.enabled:
- continue
- p = AwlParser()
- p.parseSource(awlSrc)
- parseTrees.append(p.getParseTree())
- # Parse symbol tables
- symTables = []
- for symTabSrc in project.getSymTabSources():
- if not symTabSrc.enabled:
- continue
- tab = SymTabParser.parseSource(symTabSrc,
- autodetectFormat = True,
- mnemonics = mnemonics)
- symTables.append(tab)
- printInfo("Initializing core...")
- s = AwlSim()
- s.reset()
- # Load hardware modules
- def loadMod(name, parameters):
- printInfo("Loading hardware module '%s'..." % name)
- hwClass = s.loadHardwareModule(name)
- s.registerHardwareClass(hwClass = hwClass,
- parameters = parameters)
- for modDesc in project.getHwmodSettings().getLoadedModules():
- loadMod(modDesc.getModuleName(),
- modDesc.getParameters())
- for name, parameters in opt_hwmods:
- loadMod(name, parameters)
- # Configure the CPU
- cpu = s.getCPU()
- assignCpuSpecs(cpu.getSpecs(), project.getCpuSpecs())
- assignCpuConf(cpu.getConf(), project.getCpuConf())
- if not opt_noCpuDump:
- if opt_speedStats:
- cpu.setBlockExitCallback(cpuStatsCallback, cpu)
- elif opt_loglevel >= Logging.LOG_INFO:
- cpu.setBlockExitCallback(cpuDumpCallback, cpu)
- # Download the program
- printInfo("Initializing CPU...")
- for symTable in symTables:
- s.loadSymbolTable(symTable)
- for libSel in project.getLibSelections():
- s.loadLibraryBlock(libSel)
- for parseTree in parseTrees:
- s.load(parseTree)
- # Run the program
- s.startup()
- printInfo("[Initialization finished - CPU is executing user code]")
- try:
- if not opt_noCpuDump:
- clearConsole()
- while 1:
- s.runCycle()
- finally:
- if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO:
- clearConsole()
- writeStdout(lastDump + '\n')
- except (AwlParserError, AwlSimError) as e:
- printError(e.getReport())
- return ExitCodes.EXIT_ERR_SIM
- except KeyboardInterrupt as e:
- pass
- except MaintenanceRequest as e:
- if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN,
- MaintenanceRequest.TYPE_STOP,
- MaintenanceRequest.TYPE_RTTIMEOUT):
- printInfo("Shutting down, as requested (%s)..." % str(e))
- else:
- printError("Received unknown maintenance request "
- "(%d: %s)..." % (e.requestType, str(e)))
- finally:
- if s:
- s.shutdown()
- return ExitCodes.EXIT_OK
- def runWithServerBackend(inputFile):
- client = None
- tunnel = None
- try:
- project = Project.fromProjectOrRawAwlFile(inputFile)
- linkSettings = project.getCoreLinkSettings()
- if opt_spawnBackend:
- host = AwlSimServer.DEFAULT_HOST
- port = range(AwlSimServer.DEFAULT_PORT,
- AwlSimServer.DEFAULT_PORT + 4096)
- else:
- host = linkSettings.getConnectHost()
- port = linkSettings.getConnectPort()
- if opt_connectTo:
- host, port = opt_connectTo
- # Establish SSH tunnel, if requested.
- if linkSettings.getTunnel() == linkSettings.TUNNEL_SSH and\
- not opt_spawnBackend:
- printInfo("Establishing SSH tunnel...")
- localPort = linkSettings.getTunnelLocalPort()
- if localPort == linkSettings.TUNNEL_LOCPORT_AUTO:
- localPort = None
- tunnel = ConsoleSSHTunnel(
- remoteHost = host,
- remotePort = port,
- localPort = localPort,
- sshUser = linkSettings.getSSHUser(),
- sshPort = linkSettings.getSSHPort(),
- sshExecutable = linkSettings.getSSHExecutable(),
- )
- host, port = tunnel.connect()
- # Connect to the server
- client = TestAwlSimClient()
- if opt_spawnBackend:
- client.spawnServer(interpreter = opt_interpreter,
- listenHost = host,
- listenPort = port)
- port = client.serverProcessPort
- printInfo("Connecting to core server...")
- client.connectToServer(host=host, port=port, timeout=20.0)
- printInfo("Initializing core...")
- client.setLoglevel(opt_loglevel)
- client.setRunState(False)
- client.reset()
- # Load hardware modules
- client.loadHardwareModules(project.getHwmodSettings().getLoadedModules())
- for name, parameters in opt_hwmods:
- client.loadHardwareModule(HwmodDescriptor(name, parameters))
- # Configure the core
- if opt_noCpuDump:
- client.setPeriodicDumpInterval(0)
- else:
- client.setPeriodicDumpInterval(300)
- specs = client.getCpuSpecs()
- assignCpuSpecs(specs, project.getCpuSpecs())
- client.setCpuSpecs(specs)
- conf = client.getCpuConf()
- assignCpuConf(conf, project.getCpuConf())
- client.setCpuConf(conf)
- #TODO configure optimizers
- # Fire up the core
- printInfo("Initializing CPU...")
- client.loadProject(project, loadCpuSpecs=False,
- loadCpuConf=False,
- loadHwMods=False)
- client.setRunState(True)
- # Run the client-side event loop
- printInfo("[Initialization finished - Remote-CPU is executing user code]")
- try:
- if not opt_noCpuDump:
- clearConsole()
- while True:
- client.processMessages(None)
- finally:
- if not opt_noCpuDump and opt_loglevel >= Logging.LOG_INFO:
- clearConsole()
- writeStdout(lastDump + '\n')
- except AwlSimError as e:
- printError(e.getReport())
- return ExitCodes.EXIT_ERR_SIM
- except MaintenanceRequest as e:
- if e.requestType in (MaintenanceRequest.TYPE_SHUTDOWN,
- MaintenanceRequest.TYPE_STOP,
- MaintenanceRequest.TYPE_RTTIMEOUT):
- printInfo("Shutting down, as requested (%s)..." % str(e))
- else:
- printError("Received unknown maintenance request "
- "(%d: %s)..." % (e.requestType, str(e)))
- except KeyboardInterrupt as e:
- pass
- finally:
- if tunnel:
- tunnel.shutdown()
- if client:
- client.shutdown()
- return ExitCodes.EXIT_OK
- def __signalHandler(sig, frame):
- printInfo("Received signal %d" % sig)
- if sig == signal.SIGTERM:
- # Raise SIGINT. It will shut down everything.
- os.kill(os.getpid(), signal.SIGINT)
- def main():
- global opt_cycletime
- global opt_maxRuntime
- global opt_noCpuDump
- global opt_speedStats
- global opt_nrAccus
- global opt_extInsns
- global opt_obtemp
- global opt_clockMem
- global opt_mnemonics
- global opt_optimizers
- global opt_hwmods
- global opt_hwinfos
- global opt_loglevel
- global opt_connect
- global opt_connectTo
- global opt_spawnBackend
- global opt_interpreter
- opt_cycletime = None
- opt_maxRuntime = None
- opt_noCpuDump = False
- opt_speedStats = False
- opt_nrAccus = None
- opt_extInsns = None
- opt_obtemp = None
- opt_clockMem = None
- opt_mnemonics = None
- opt_optimizers = "default"
- opt_hwmods = []
- opt_hwinfos = []
- opt_loglevel = Logging.LOG_INFO
- opt_connect = None
- opt_connectTo = False
- opt_spawnBackend = False
- opt_interpreter = None
- try:
- (opts, args) = getopt.getopt(sys.argv[1:],
- "hY:M:24qDSxt:T:m:O:H:I:P:L:cC:bi:",
- [ "help", "cycle-limit=", "max-runtime=", "twoaccu", "fouraccu",
- "quiet", "no-cpu-dump", "speed-stats", "extended-insns",
- "obtemp=", "clock-mem=", "mnemonics=", "optimizers=",
- "hardware=", "hardware-info=", "profile=",
- "loglevel=",
- "connect", "connect-to=", "spawn-backend", "interpreter=",
- "list-sfc", "list-sfc-verbose",
- "list-sfb", "list-sfb-verbose", ])
- except getopt.GetoptError as e:
- printError(str(e))
- usage()
- return ExitCodes.EXIT_ERR_CMDLINE
- for (o, v) in opts:
- if o in ("-h", "--help"):
- usage()
- return ExitCodes.EXIT_OK
- if o in ("-Y", "--cycle-limit"):
- try:
- opt_cycletime = float(v)
- except ValueError:
- printError("-Y|--cycle-limit: Invalid time format")
- sys.exit(1)
- if o in ("-M", "--max-runtime"):
- try:
- opt_maxRuntime = float(v)
- except ValueError:
- printError("-M|--max-runtime: Invalid time format")
- sys.exit(1)
- if o in ("-2", "--twoaccu"):
- opt_nrAccus = 2
- if o in ("-4", "--fouraccu"):
- opt_nrAccus = 4
- if o in ("-D", "--no-cpu-dump"):
- opt_noCpuDump = True
- if o in ("-S", "--speed-stats"):
- opt_speedStats = True
- if o in ("-x", "--extended-insns"):
- opt_extInsns = True
- if o in ("-t", "--obtemp"):
- opt_obtemp = str2bool(v)
- if o in ("-T", "--clock-mem"):
- try:
- opt_clockMem = int(v)
- if opt_clockMem < -1 or opt_clockMem > 0xFFFF:
- raise ValueError
- except ValueError:
- printError("-T|--clock-mem: Invalid byte address")
- sys.exit(1)
- if o in ("-m", "--mnemonics"):
- opt_mnemonics = v.lower()
- if opt_mnemonics not in ("en", "de", "auto"):
- printError("-m|--mnemonics: Invalid mnemonics type")
- sys.exit(1)
- if o in ("-O", "--optimizers"):
- try:
- modes = v.split(",")
- for mode in modes:
- mode = mode.lower()
- if mode in ("off", "all", "default"):
- opt_optimizers = mode
- else:
- printError("-O|--optimizers: Unknown optimizer: %s" % mode)
- sys.exit(1)
- except (ValueError, IndexError) as e:
- printError("-O|--optimizers: Invalid optimization mode")
- sys.exit(1)
- if o in ("-H", "--hardware"):
- try:
- v = v.split(':')
- if not v:
- raise ValueError
- name = v[0]
- params = {}
- for pstr in v[1:]:
- if not pstr:
- continue
- i = pstr.find('=')
- if i < 0:
- raise ValueError
- pname = pstr[:i]
- pval = pstr[i+1:]
- if not pname or not pval:
- raise ValueError
- params[pname] = pval
- opt_hwmods.append( (name, params) )
- except (ValueError, IndexError) as e:
- printError("-H|--hardware: Invalid module name or parameters")
- sys.exit(1)
- if o in ("-I", "--hardware-info"):
- opt_hwinfos.append(v.split(':')[0])
- if o in ("-L", "--loglevel"):
- try:
- opt_loglevel = int(v)
- except ValueError:
- printError("-L|--loglevel: Invalid log level")
- sys.exit(1)
- if o in ("-c", "--connect"):
- opt_connect = True
- if o in ("-C", "--connect-to"):
- try:
- idx = v.rfind(":")
- if idx <= 0:
- raise ValueError
- opt_connectTo = (v[:idx], int(v[idx+1:]))
- except ValueError:
- printError("-c|--connect: Invalid host/port")
- sys.exit(1)
- if o in ("-b", "--spawn-backend"):
- opt_spawnBackend = True
- if o in ("-i", "--interpreter"):
- if isWinStandalone:
- printError("-i|--interpreter not supported on win-standalone")
- sys.exit(1)
- opt_interpreter = v
- if o in ("--list-sfc", "--list-sfc-verbose"):
- print("The supported system functions (SFCs) are:")
- from awlsim.core.systemblocks.tables import SFC_table
- printSysblockInfo(SFC_table, "SFC", bool(opt_extInsns),
- o.endswith("verbose"))
- return ExitCodes.EXIT_OK
- if o in ("--list-sfb", "--list-sfb-verbose"):
- print("The supported system function blocks (SFBs) are:")
- from awlsim.core.systemblocks.tables import SFB_table
- printSysblockInfo(SFB_table, "SFB", bool(opt_extInsns),
- o.endswith("verbose"))
- return ExitCodes.EXIT_OK
- if len(args) != 1 and not opt_hwinfos:
- usage()
- return ExitCodes.EXIT_ERR_CMDLINE
- if args:
- inputFile = args[0]
- Logging.setLoglevel(opt_loglevel)
- opt_mnemonics = {
- None : None,
- "en" : S7CPUConfig.MNEMONICS_EN,
- "de" : S7CPUConfig.MNEMONICS_DE,
- "auto" : S7CPUConfig.MNEMONICS_AUTO,
- }[opt_mnemonics]
- try:
- if opt_hwinfos:
- # Just print the hardware-infos and exit.
- for name in opt_hwinfos:
- cls = AwlSim.loadHardwareModule(name)
- print(cls.getModuleInfo())
- return ExitCodes.EXIT_OK
- except (AwlParserError, AwlSimError) as e:
- printError(e.getReport())
- return ExitCodes.EXIT_ERR_SIM
- signal.signal(signal.SIGTERM, __signalHandler)
- if opt_interpreter and not opt_spawnBackend:
- printError("Selected an --interpreter, but no "
- "--spawn-backend was requested.")
- return ExitCodes.EXIT_ERR_CMDLINE
- if opt_spawnBackend or opt_connect or opt_connectTo:
- return runWithServerBackend(inputFile)
- return run(inputFile)
- if __name__ == "__main__":
- sys.exit(main())
|