123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- # -*- coding: utf-8 -*-
- #
- # PROFIBUS - Layer 2 - Fieldbus Data Link (FDL)
- #
- # Copyright (c) 2013-2020 Michael Buesch <m@bues.ch>
- #
- # Licensed under the terms of the GNU General Public License version 2,
- # or (at your option) any later version.
- #
- from __future__ import division, absolute_import, print_function, unicode_literals
- from pyprofibus.compat import *
- from pyprofibus.phy import *
- from pyprofibus.util import *
# Public names of this module (what "import *" exports):
# the FDL error type, the FCB context, the transceiver and all telegram classes.
__all__ = [
    "FdlError",
    "FdlFCB",
    "FdlTransceiver",
    "FdlTelegram",
    "FdlTelegram_var",
    "FdlTelegram_stat8",
    "FdlTelegram_stat0",
    "FdlTelegram_token",
    "FdlTelegram_ack",
    "FdlTelegram_FdlStat_Req",
    "FdlTelegram_FdlStat_Con",
    "FdlTelegram_Ident_Req",
    "FdlTelegram_Lsap_Req",
]
class FdlError(ProfibusError):
    """Error type raised by the FDL layer code in this module.

    Adds no state of its own on top of ProfibusError.
    """

    __slots__ = ()
class FdlFCB():
    """Frame Count Bit (FCB) context, kept once per slave.

    Tracks the current FCB value, the FCV (valid) flag, whether a reply
    is still outstanding, and whether FCB handling is enabled at all.
    """

    __slots__ = (
        "__fcb",
        "__fcv",
        "__fcbWaitingReply",
        "__fcbEnabled",
    )

    def __init__(self, enable = False):
        """Start from the reset state; `enable` switches FCB handling on."""
        self.resetFCB()
        self.enableFCB(enable)

    def resetFCB(self):
        """Return to the initial state: FCB=1, FCV=0, no reply pending."""
        self.__fcb, self.__fcv = 1, 0
        self.__fcbWaitingReply = False

    def enableFCB(self, enabled = True):
        """Enable or disable FCB handling (the argument is coerced to bool)."""
        self.__fcbEnabled = bool(enabled)

    def FCBnext(self):
        """Toggle the FCB, mark it valid and clear the pending-reply flag."""
        self.__fcbWaitingReply = False
        self.__fcv = 1
        self.__fcb = self.__fcb ^ 1

    def enabled(self):
        """Return True if FCB handling is enabled."""
        return self.__fcbEnabled

    def bitIsOn(self):
        """Return True if the FCB is currently set."""
        return bool(self.__fcb)

    def bitIsValid(self):
        """Return True if the FCV flag is currently set."""
        return bool(self.__fcv)

    def setWaitingReply(self):
        """Remember that a reply is outstanding for the current FCB."""
        self.__fcbWaitingReply = True

    def handleReply(self):
        """Advance the FCB, but only if a reply was being waited for."""
        if not self.__fcbWaitingReply:
            return
        self.FCBnext()

    def __repr__(self):
        state = (
            self.__fcbEnabled,
            self.__fcb,
            self.__fcv,
            self.__fcbWaitingReply,
        )
        return "FdlFCB(en=%s, fcb=%d, fcv=%d, wait=%s)" % state
class FdlTransceiver(object):
    """Sends and receives FdlTelegram objects through a PHY object.

    The PHY object must provide `poll(timeout)` and `send(telegram, srd)`.
    Received telegrams are checked against a destination address filter.
    """

    __slots__ = (
        "phy",
        "__rxFilter",
    )

    def __init__(self, phy):
        """Bind to `phy` and start with an accept-all RX filter."""
        self.phy = phy
        self.setRXFilter(None)

    def setRXFilter(self, newFilter):
        """Set the accepted destination addresses.

        `newFilter` is an iterable of addresses. None selects every
        address from 0 up to and including FdlTelegram.ADDRESS_MASK.
        """
        if newFilter is None:
            newFilter = range(0, FdlTelegram.ADDRESS_MASK + 1)
        self.__rxFilter = set(newFilter)

    def __checkRXFilter(self, telegram):
        """Return True if `telegram` passes the RX filter."""
        if telegram.da is None:
            # Accept telegrams without DA field.
            return True
        # Accept the packet, if it's in the RX filter.
        return (telegram.da & FdlTelegram.ADDRESS_MASK) in self.__rxFilter

    def poll(self, timeout=0.0):
        """Poll the PHY for one telegram.

        Returns a tuple `(ok, telegram)`:
        - `(False, None)` if the PHY returned no data.
        - `(False, telegram)` if a telegram was parsed but rejected
          by the RX filter.
        - `(True, telegram)` if a telegram was parsed and accepted.

        Exceptions raised by FdlTelegram.fromRawData() are not caught here.
        """
        ok, telegram = False, None
        reply = self.phy.poll(timeout)
        if reply is not None:
            telegram = FdlTelegram.fromRawData(reply)
            if self.__checkRXFilter(telegram):
                ok = True
        return (ok, telegram)

    def send(self, fcb, telegram):
        """Send an FdlTelegram through the PHY.

        For request telegrams (FC_REQ set) the FCB/FCV bits in
        `telegram.fc` are rewritten from the state of `fcb`, if `fcb`
        is enabled. If the request function code is one of the codes
        listed below, `fcb` is only marked as waiting for the reply.
        Otherwise the FCB is advanced right away.

        Telegrams without an FC field (`telegram.fc is None`, e.g. token
        or short acknowledge telegrams) are passed to the PHY unchanged
        and do not touch the FCB state.
        """
        srd = False
        fc = telegram.fc
        # Guard against telegrams that carry no FC field at all.
        # Evaluating "None & FC_REQ" would raise a TypeError.
        if fc is not None and (fc & FdlTelegram.FC_REQ):
            func = fc & FdlTelegram.FC_REQFUNC_MASK
            srd = func in (FdlTelegram.FC_SRD_LO,
                           FdlTelegram.FC_SRD_HI,
                           FdlTelegram.FC_SDA_LO,
                           FdlTelegram.FC_SDA_HI,
                           FdlTelegram.FC_DDB,
                           FdlTelegram.FC_FDL_STAT,
                           FdlTelegram.FC_IDENT,
                           FdlTelegram.FC_LSAP)
            # Always start from cleared FCB/FCV bits.
            telegram.fc &= ~(FdlTelegram.FC_FCB | FdlTelegram.FC_FCV)
            if fcb.enabled():
                if fcb.bitIsOn():
                    telegram.fc |= FdlTelegram.FC_FCB
                if fcb.bitIsValid():
                    telegram.fc |= FdlTelegram.FC_FCV
                if srd:
                    fcb.setWaitingReply()
                else:
                    fcb.FCBnext()
        self.phy.send(telegram, srd)
class FdlTelegram(object):
    """Generic PROFIBUS FDL telegram.

    Holds the telegram fields, serializes them with getRawData() and
    parses raw bytes into the matching telegram subclass with
    fromRawData().
    """

    # Start delimiter
    SD1 = 0x10  # No DU
    SD2 = 0x68  # Variable DU
    SD3 = 0xA2  # 8 octet fixed DU
    SD4 = 0xDC  # Token telegram
    SC = 0xE5  # Short ACK

    # End delimiter
    ED = 0x16

    # Addresses
    ADDRESS_MASK = 0x7F  # Address value mask
    ADDRESS_EXT = 0x80  # DAE/SAE present
    ADDRESS_MCAST = 127  # Multicast/broadcast address

    # DAE/SAE (Address extension)
    AE_EXT = 0x80  # Further extensions present
    AE_SEGMENT = 0x40  # Segment address
    AE_ADDRESS = 0x3F  # Address extension number

    # Frame Control
    FC_REQ = 0x40  # Request

    # Request Frame Control function codes (FC_REQ set)
    FC_REQFUNC_MASK = 0x0F
    FC_TIME_EV = 0x00  # Time event
    FC_SDA_LO = 0x03  # SDA low prio
    FC_SDN_LO = 0x04  # SDN low prio
    FC_SDA_HI = 0x05  # SDA high prio
    FC_SDN_HI = 0x06  # SDN high prio
    FC_DDB = 0x07  # Req. diagnosis data
    FC_FDL_STAT = 0x09  # Req. FDL status
    FC_TE = 0x0A  # Actual time event
    FC_CE = 0x0B  # Actual counter event
    FC_SRD_LO = 0x0C  # SRD low prio
    FC_SRD_HI = 0x0D  # SRD high prio
    FC_IDENT = 0x0E  # Req. ident
    FC_LSAP = 0x0F  # Req. LSAP status

    # Frame Control Frame Count Bit (FC_REQ set)
    FC_FCV = 0x10  # Frame Count Bit valid
    FC_FCB = 0x20  # Frame Count Bit

    # Response Frame Control function codes (FC_REQ clear)
    FC_RESFUNC_MASK = 0x0F
    FC_OK = 0x00  # Positive ACK
    FC_UE = 0x01  # User error
    FC_RR = 0x02  # Resource error
    FC_RS = 0x03  # No service activated
    FC_DL = 0x08  # Res. data low
    FC_NR = 0x09  # ACK negative
    FC_DH = 0x0A  # Res. data high
    FC_RDL = 0x0C  # Res. data low, resource error
    FC_RDH = 0x0D  # Res. data high, resource error

    # Response Frame Control Station Type (FC_REQ clear)
    FC_STYPE_MASK = 0x30
    FC_SLAVE = 0x00  # Slave station
    FC_MNRDY = 0x10  # Master, not ready to enter token ring
    FC_MRDY = 0x20  # Master, ready to enter token ring
    FC_MTR = 0x30  # Master, in token ring

    # Start delimiter to total telegram size, for the fixed-size telegrams.
    delim2size = {
        SD1 : 6,
        SD3 : 14,
        SD4 : 3,
        SC : 1,
    }

    __slots__ = (
        "sd",
        "haveLE",
        "da",
        "sa",
        "fc",
        "dae",
        "sae",
        "du",
        "haveFCS",
        "ed",
    )

    @classmethod
    def getSizeFromRaw(cls, data):
        """Return the total telegram size derived from the first raw bytes.

        Returns -1 if `data` is too short to decide, the start delimiter
        is unknown, or the SD2 length fields are inconsistent or out of
        range.
        """
        available = len(data)
        if available < 1:
            return -1  # Telegram too short.
        startDelim = data[0]
        fixedSize = cls.delim2size.get(startDelim)
        if fixedSize is not None:
            return fixedSize
        if startDelim != cls.SD2:
            return -1  # Unknown start delimiter.
        if available < 3:
            return -1  # Telegram too short.
        le = data[1]
        if data[2] != le:
            return -1  # Repeated length field mismatch.
        if not (3 <= le <= 249):
            return -1  # Invalid length field.
        # LE plus SD, LE, LEr, SD, FCS and ED.
        return le + 6

    def __init__(self, sd, haveLE=False, da=None, sa=None,
                 fc=None, dae=b"", sae=b"", du=None,
                 haveFCS=False, ed=None):
        """Store the telegram fields.

        `da` and `sa` are reduced to their address bits (ADDRESS_MASK).
        None means the field is absent. A telegram with LE fields
        must have a DU.
        """
        addrMask = FdlTelegram.ADDRESS_MASK
        self.sd = sd
        self.haveLE = haveLE
        self.da = None if da is None else (da & addrMask)
        self.sa = None if sa is None else (sa & addrMask)
        self.fc = fc
        self.dae = dae
        self.sae = sae
        self.du = du
        self.haveFCS = haveFCS
        self.ed = ed
        if self.haveLE:
            assert(self.du is not None)

    def __repr__(self):
        sdNames = {
            FdlTelegram.SD1 : "SD1",
            FdlTelegram.SD2 : "SD2",
            FdlTelegram.SD3 : "SD3",
            FdlTelegram.SD4 : "SD4",
            FdlTelegram.SC : "SC",
        }
        # Known delimiters are shown by name, everything else as hex.
        sdText = sdNames.get(self.sd)
        if sdText is None:
            sdText = intToHex(self.sd)
        fields = (
            sdText,
            boolToStr(self.haveLE),
            intToHex(self.da),
            intToHex(self.sa),
            intToHex(self.fc),
            bytesToHex(self.dae),
            bytesToHex(self.sae),
            bytesToHex(self.du),
            boolToStr(self.haveFCS),
            intToHex(self.ed),
        )
        return ("FdlTelegram(sd=%s, haveLE=%s, da=%s, sa=%s, "
                "fc=%s, dae=%s, sae=%s, du=%s, haveFCS=%s, ed=%s)" % fields)

    def getRealDuLen(self):
        """Return the length of DU plus both address extensions."""
        return sum(len(part) for part in (self.du, self.dae, self.sae))

    @staticmethod
    def calcFCS(data):
        """Return the frame check sequence: the byte sum modulo 256."""
        return 0xFF & sum(data)

    def getRawData(self):
        """Serialize the telegram into a bytearray.

        Fields that are None are left out. The FCS covers everything
        after the start delimiter(s) and length fields.
        """
        raw = bytearray()
        if self.haveLE:
            # LE counts DA, SA and FC (3 bytes) plus DAE, SAE and DU.
            le = 3 + self.getRealDuLen()
            raw.extend((self.sd, le, le))
        raw.append(self.sd)
        # The extension flag is set on an address if its extension
        # bytes are non-empty.
        for address, extension in ((self.da, self.dae),
                                   (self.sa, self.sae)):
            if address is not None:
                raw.append((address | FdlTelegram.ADDRESS_EXT)
                           if extension else address)
        if self.fc is not None:
            raw.append(self.fc)
        assert isinstance(self.dae, (bytes, bytearray))
        raw.extend(self.dae)
        assert isinstance(self.sae, (bytes, bytearray))
        raw.extend(self.sae)
        if self.du is not None:
            assert isinstance(self.du, (bytes, bytearray))
            raw.extend(self.du)
        if self.haveFCS:
            # Skip "SD LE LEr SD" (4 bytes) or just "SD" (1 byte).
            headerLen = 4 if self.haveLE else 1
            raw.append(self.calcFCS(raw[headerLen:]))
        if self.ed is not None:
            raw.append(self.ed)
        return raw

    @staticmethod
    def __duExtractAe(du):
        """Split leading address extension bytes off `du`.

        Consumes bytes up to and including the first one without the
        AE_EXT flag. Returns `(remaining_du, extension_bytes)`.
        Raises FdlError if `du` ends before such a byte is found.
        """
        ae = bytearray()
        pos = 0
        while True:
            if pos >= len(du):
                raise FdlError("Address extension error: Data too short")
            aeByte = du[pos]
            pos += 1
            ae.append(aeByte)
            if not (aeByte & FdlTelegram.AE_EXT):
                break
        return (du[pos:], ae)

    @staticmethod
    def __splitAddrExt(da, sa, du):
        """Extract DAE and SAE from `du`, as flagged in `da` and `sa`.

        The destination extension comes first, then the source extension.
        Returns `(remaining_du, dae, sae)`.
        """
        dae = sae = b""
        if da & FdlTelegram.ADDRESS_EXT:
            du, dae = FdlTelegram.__duExtractAe(du)
        if sa & FdlTelegram.ADDRESS_EXT:
            du, sae = FdlTelegram.__duExtractAe(du)
        return (du, dae, sae)

    @staticmethod
    def fromRawData(data):
        """Parse raw telegram bytes into a telegram object.

        Returns an instance of the FdlTelegram_* subclass selected by
        the start delimiter. Raises FdlError on any format error.
        An IndexError caused by truncated data is reported as
        FdlError("Invalid FDL packet format").
        """
        calcFCS = FdlTelegram.calcFCS
        try:
            sd = data[0]

            if sd == FdlTelegram.SD1:
                # No DU
                if len(data) != 6:
                    raise FdlError("Invalid FDL packet length")
                if data[5] != FdlTelegram.ED:
                    raise FdlError("Invalid end delimiter")
                if data[4] != calcFCS(data[1:4]):
                    raise FdlError("Checksum mismatch")
                return FdlTelegram_stat0(
                    da=data[1], sa=data[2], fc=data[3])

            if sd == FdlTelegram.SD2:
                # Variable DU
                le = data[1]
                if data[2] != le:
                    raise FdlError("Repeated length field mismatch")
                if not (3 <= le <= 249):
                    raise FdlError("Invalid LE field")
                if data[3] != sd:
                    raise FdlError("Repeated SD mismatch")
                fcsPos = 4 + le
                edPos = fcsPos + 1
                if data[edPos] != FdlTelegram.ED:
                    raise FdlError("Invalid end delimiter")
                if data[fcsPos] != calcFCS(data[4:fcsPos]):
                    raise FdlError("Checksum mismatch")
                # Everything between FC and FCS.
                du = data[7:fcsPos]
                if len(du) != le - 3:
                    raise FdlError("FDL packet shorter than FE")
                da, sa = data[4], data[5]
                du, dae, sae = FdlTelegram.__splitAddrExt(da, sa, du)
                return FdlTelegram_var(
                    da=da, sa=sa, fc=data[6], dae=dae, sae=sae, du=du)

            if sd == FdlTelegram.SD3:
                # Static 8 byte DU
                if len(data) != 14:
                    raise FdlError("Invalid FDL packet length")
                if data[13] != FdlTelegram.ED:
                    raise FdlError("Invalid end delimiter")
                if data[12] != calcFCS(data[1:12]):
                    raise FdlError("Checksum mismatch")
                da, sa = data[1], data[2]
                du, dae, sae = FdlTelegram.__splitAddrExt(
                    da, sa, data[4:12])
                return FdlTelegram_stat8(
                    da=da, sa=sa, fc=data[3], dae=dae, sae=sae, du=du)

            if sd == FdlTelegram.SD4:
                # Token telegram
                if len(data) != 3:
                    raise FdlError("Invalid FDL packet length")
                return FdlTelegram_token(
                    da=data[1], sa=data[2])

            if sd == FdlTelegram.SC:
                # ACK
                if len(data) != 1:
                    raise FdlError("Invalid FDL packet length")
                return FdlTelegram_ack()

            raise FdlError("Invalid start delimiter")
        except IndexError:
            # Truncated data. Reported below, outside of the handler.
            pass
        raise FdlError("Invalid FDL packet format")

    @classmethod
    def checkType(cls, telegram):
        """Return True if `telegram` is an instance of this class."""
        return isinstance(telegram, cls)
class FdlTelegram_var(FdlTelegram):
    """Telegram with variable length DU (start delimiter SD2).

    Carries LE fields, an FCS and the end delimiter. Raises FdlError
    if DU plus both address extensions exceed 246 bytes.
    """

    __slots__ = ()

    def __init__(self, da, sa, fc, dae, sae, du):
        FdlTelegram.__init__(
            self,
            sd=FdlTelegram.SD2, haveLE=True,
            da=da, sa=sa, fc=fc,
            dae=dae, sae=sae, du=du,
            haveFCS=True, ed=FdlTelegram.ED)
        payloadLen = self.getRealDuLen()
        if payloadLen > 246:
            raise FdlError("Invalid data length (> 246)")
class FdlTelegram_stat8(FdlTelegram):
    """Telegram with fixed 8 byte DU (start delimiter SD3).

    Carries an FCS and the end delimiter, but no LE fields. Raises
    FdlError unless DU plus both address extensions are exactly 8 bytes.
    """

    __slots__ = ()

    def __init__(self, da, sa, fc, dae, sae, du):
        FdlTelegram.__init__(
            self,
            sd=FdlTelegram.SD3,
            da=da, sa=sa, fc=fc,
            dae=dae, sae=sae, du=du,
            haveFCS=True, ed=FdlTelegram.ED)
        payloadLen = self.getRealDuLen()
        if payloadLen != 8:
            raise FdlError("Invalid data length (!= 8)")
class FdlTelegram_stat0(FdlTelegram):
    """Telegram without DU (start delimiter SD1).

    Consists of DA, SA and FC, followed by an FCS and the end delimiter.
    """

    __slots__ = ()

    def __init__(self, da, sa, fc):
        FdlTelegram.__init__(
            self,
            sd=FdlTelegram.SD1,
            da=da, sa=sa, fc=fc,
            haveFCS=True, ed=FdlTelegram.ED)
class FdlTelegram_token(FdlTelegram):
    """Token telegram (start delimiter SD4).

    Consists of DA and SA only: no FC, FCS or end delimiter.
    """

    __slots__ = ()

    def __init__(self, da, sa):
        FdlTelegram.__init__(
            self,
            sd=FdlTelegram.SD4,
            da=da, sa=sa)
class FdlTelegram_ack(FdlTelegram):
    """Short acknowledge telegram: the single SC byte, no other fields."""

    __slots__ = ()

    def __init__(self):
        FdlTelegram.__init__(
            self,
            sd=FdlTelegram.SC)
class FdlTelegram_FdlStat_Req(FdlTelegram_stat0):
    """FDL status request: DU-less telegram with FC = FC_REQ | FC_FDL_STAT."""

    __slots__ = ()

    def __init__(self, da, sa):
        frameControl = FdlTelegram.FC_REQ | FdlTelegram.FC_FDL_STAT
        FdlTelegram_stat0.__init__(self, da=da, sa=sa, fc=frameControl)
class FdlTelegram_FdlStat_Con(FdlTelegram_stat0):
    """FDL status confirmation: DU-less telegram.

    The frame control defaults to FC_OK | FC_SLAVE and may be
    overridden by the caller.
    """

    __slots__ = ()

    def __init__(self, da, sa,
                 fc=FdlTelegram.FC_OK | FdlTelegram.FC_SLAVE):
        FdlTelegram_stat0.__init__(
            self,
            da=da,
            sa=sa,
            fc=fc)
class FdlTelegram_Ident_Req(FdlTelegram_stat0):
    """Ident request: DU-less telegram with FC = FC_REQ | FC_IDENT."""

    __slots__ = ()

    def __init__(self, da, sa):
        frameControl = FdlTelegram.FC_REQ | FdlTelegram.FC_IDENT
        FdlTelegram_stat0.__init__(self, da=da, sa=sa, fc=frameControl)
class FdlTelegram_Lsap_Req(FdlTelegram_stat0):
    """LSAP status request: DU-less telegram with FC = FC_REQ | FC_LSAP."""

    __slots__ = ()

    def __init__(self, da, sa):
        frameControl = FdlTelegram.FC_REQ | FdlTelegram.FC_LSAP
        FdlTelegram_stat0.__init__(self, da=da, sa=sa, fc=frameControl)
|