phy_fpga.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # -*- coding: utf-8 -*-
  2. #
  3. # PROFIBUS DP - Communication Processor PHY access library
  4. #
  5. # Copyright (c) 2019 Michael Buesch <m@bues.ch>
  6. #
  7. # Licensed under the terms of the GNU General Public License version 2,
  8. # or (at your option) any later version.
  9. #
  10. from __future__ import division, absolute_import, print_function, unicode_literals
  11. from pyprofibus.compat import *
  12. from pyprofibus.phy import *
  13. from pyprofibus.fdl import *
  14. from pyprofibus.dp import *
  15. from pyprofibus.phy_fpga_driver import *
  16. from collections import deque
  17. __all__ = [
  18. "CpPhyFPGA",
  19. ]
  20. class CpPhyFPGA(CpPhy):
  21. """FPGA based PROFIBUS CP PHYsical layer
  22. """
  23. PFX = "PHY-fpga: "
  24. def __init__(self, spiBus, spiCS, spiSpeedHz, *args, **kwargs):
  25. super(CpPhyFPGA, self).__init__(*args, **kwargs)
  26. self.__rxDeque = deque()
  27. self.__driver = None
  28. self.__spiBus = spiBus
  29. self.__spiCS = spiCS
  30. self.__spiSpeedHz = spiSpeedHz
  31. def close(self):
  32. """Close the PHY device.
  33. """
  34. if self.__driver is not None:
  35. try:
  36. self.__driver.shutdown()
  37. except FpgaPhyError as e:
  38. pass
  39. self.__rxDeque.clear()
  40. self.__driver = None
  41. super(CpPhyFPGA, self).close()
  42. def __tryRestartDriver(self, exception):
  43. try:
  44. self._debugMsg("Driver exception: %s" % str(exception))
  45. if self.__driver is not None:
  46. self.__driver.restart()
  47. except FpgaPhyError as e:
  48. self._debugMsg("Error recovery restart failed: %s" % (
  49. str(e)))
  50. def sendData(self, telegramData, srd):
  51. """Send data to the physical line.
  52. """
  53. if self.__driver is None:
  54. return
  55. if self.debug:
  56. self._debugMsg("TX %s" % bytesToHex(telegramData))
  57. try:
  58. self.__driver.telegramSend(telegramData)
  59. except FpgaPhyError as e:
  60. self.__tryRestartDriver(e)
  61. def pollData(self, timeout=0.0):
  62. """Poll received data from the physical line.
  63. timeout => timeout in seconds.
  64. 0 = no timeout, return immediately.
  65. negative = unlimited.
  66. """
  67. if self.__driver is None:
  68. return None
  69. telegramData = None
  70. try:
  71. if self.__rxDeque:
  72. telegramData = self.__rxDeque.popleft()
  73. else:
  74. timeoutStamp = monotonic_time() + timeout#TODO
  75. telegramDataList = self.__driver.telegramReceive()
  76. count = len(telegramDataList)
  77. if count >= 1:
  78. telegramData = telegramDataList[0]
  79. if count >= 2:
  80. self.__rxDeque.extend(telegramDataList[1:])
  81. except FpgaPhyError as e:
  82. self.__tryRestartDriver(e)
  83. telegramData = None
  84. if self.debug and telegramData:
  85. self._debugMsg("RX %s" % bytesToHex(telegramData))
  86. return telegramData
  87. def setConfig(self, baudrate=CpPhy.BAUD_9600, *args, **kwargs):
  88. super(CpPhyFPGA, self).setConfig(baudrate=baudrate, *args, **kwargs)
  89. self.close()
  90. try:
  91. self.__driver = FpgaPhyDriver(spiDev=self.__spiBus,
  92. spiChipSelect=self.__spiCS,
  93. spiSpeedHz=self.__spiSpeedHz)
  94. self.__driver.setBaudRate(baudrate)
  95. except FpgaPhyError as e:
  96. raise PhyError(self.PFX + ("Failed to setup driver:\n%s" % str(e)))