phy_dummy.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- coding: utf-8 -*-
  2. #
  3. # PROFIBUS DP - Communication Processor PHY access library
  4. #
  5. # Copyright (c) 2016-2021 Michael Buesch <m@bues.ch>
  6. #
  7. # Licensed under the terms of the GNU General Public License version 2,
  8. # or (at your option) any later version.
  9. #
  10. from __future__ import division, absolute_import, print_function, unicode_literals
  11. from pyprofibus.compat import *
  12. from pyprofibus.phy import *
  13. from pyprofibus.fdl import *
  14. from pyprofibus.dp import *
  15. from pyprofibus.util import *
  16. __all__ = [
  17. "CpPhyDummySlave",
  18. ]
  19. class CpPhyDummySlave(CpPhy):
  20. """Dummy slave PROFIBUS CP PHYsical layer
  21. """
  22. __slots__ = (
  23. "__echoDX",
  24. "__echoDXSize",
  25. "__pollQueue",
  26. )
  27. def __init__(self, *args, **kwargs):
  28. super(CpPhyDummySlave, self).__init__(*args, **kwargs)
  29. self.__echoDX = kwargs.get("echoDX", True)
  30. self.__echoDXSize = kwargs.get("echoDXSize", None)
  31. self.__pollQueue = []
  32. def __msg(self, message):
  33. if self.debug:
  34. print("CpPhyDummySlave: %s" % message)
  35. def close(self):
  36. """Close the PHY device.
  37. """
  38. self.__pollQueue = []
  39. super(CpPhyDummySlave, self).close()
  40. def sendData(self, telegramData, srd):
  41. """Send data to the physical line.
  42. """
  43. telegramData = bytearray(telegramData)
  44. self.__msg("Sending %s %s" % ("SRD" if srd else "SDN",
  45. bytesToHex(telegramData)))
  46. self.__mockSend(telegramData, srd = srd)
  47. def pollData(self, timeout=0.0):
  48. """Poll received data from the physical line.
  49. timeout => timeout in seconds.
  50. 0.0 = no timeout, return immediately.
  51. negative = unlimited.
  52. """
  53. try:
  54. telegramData = self.__pollQueue.pop(0)
  55. except IndexError as e:
  56. return None
  57. self.__msg("Receiving %s" % bytesToHex(telegramData))
  58. return telegramData
  59. def setConfig(self, baudrate=CpPhy.BAUD_9600, *args, **kwargs):
  60. self.__msg("Baudrate = %d" % baudrate)
  61. self.__pollQueue = []
  62. super(CpPhyDummySlave, self).setConfig(baudrate=baudrate, *args, **kwargs)
  63. def __mockSend(self, telegramData, srd):
  64. if not srd:
  65. return
  66. try:
  67. fdl = FdlTelegram.fromRawData(telegramData)
  68. if (fdl.fc & FdlTelegram.FC_REQFUNC_MASK) == FdlTelegram.FC_FDL_STAT:
  69. telegram = FdlTelegram_FdlStat_Con(da = fdl.sa,
  70. sa = fdl.da)
  71. self.__pollQueue.append(telegram.getRawData())
  72. return
  73. dp = DpTelegram.fromFdlTelegram(fdl, thisIsMaster = False)
  74. if DpTelegram_SlaveDiag_Req.checkType(dp):
  75. telegram = DpTelegram_SlaveDiag_Con(da = fdl.sa,
  76. sa = fdl.da)
  77. telegram.b1 |= DpTelegram_SlaveDiag_Con.B1_ONE
  78. self.__pollQueue.append(telegram.toFdlTelegram().getRawData())
  79. return
  80. if DpTelegram_SetPrm_Req.checkType(dp):
  81. telegram = FdlTelegram_ack()
  82. self.__pollQueue.append(telegram.getRawData())
  83. return
  84. if DpTelegram_ChkCfg_Req.checkType(dp):
  85. telegram = FdlTelegram_ack()
  86. self.__pollQueue.append(telegram.getRawData())
  87. return
  88. if DpTelegram_DataExchange_Req.checkType(dp):
  89. if self.__echoDX:
  90. du = bytearray([ d ^ 0xFF for d in dp.du ])
  91. if self.__echoDXSize is not None:
  92. if len(du) > self.__echoDXSize:
  93. du = du[ : self.__echoDXSize]
  94. if len(du) < self.__echoDXSize:
  95. du += bytearray(self.__echoDXSize - len(du))
  96. telegram = DpTelegram_DataExchange_Con(da = fdl.sa,
  97. sa = fdl.da,
  98. du = du)
  99. self.__pollQueue.append(telegram.toFdlTelegram().getRawData())
  100. else:
  101. telegram = FdlTelegram_ack()
  102. self.__pollQueue.append(telegram.getRawData())
  103. return
  104. self.__msg("Dropping SRD telegram: %s" % str(fdl))
  105. except ProfibusError as e:
  106. text = "SRD mock-send error: %s" % str(e)
  107. self.__msg(text)
  108. raise PhyError(text)