message.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package rpc
  2. import (
  3. "io"
  4. "fmt"
  5. "bytes"
  6. "errors"
  7. "strings"
  8. "strconv"
  9. "encoding/json"
  10. )
  11. const MsgKindWidth = 8
  12. const MsgCallIdWidth = 16
  13. const MsgPayloadLengthWidth = 8
  14. const MsgPayloadLengthMax = 99999999
  15. const MsgPlainErrorPrefix = '}'
  16. const MSG_SERVICE = "service"
  17. const MSG_CREATED = "created"
  18. const MSG_CALL = "call"
  19. const MSG_CALL_MULTI = "call*"
  20. const MSG_VALUE = "value"
  21. const MSG_ERROR = "error"
  22. const MSG_COMPLETE = "complete"
  23. type ErrorWithExtraData struct {
  24. Desc string `json:"desc"`
  25. Data map[string] string `json:"data"`
  26. }
  27. func (e *ErrorWithExtraData) Error() string {
  28. return e.Desc
  29. }
  30. func (e *ErrorWithExtraData) Serialize() ([] byte) {
  31. var content, err = json.Marshal(e)
  32. if err != nil { panic(err) }
  33. return content
  34. }
  35. func writeMessageHeaderField(content string, width int, w io.Writer) error {
  36. if len(content) > width {
  37. panic(fmt.Sprintf("field content width exceeded maximum (%d)", width))
  38. }
  39. var buf = make([] byte, width)
  40. for i := 0; i < width; i += 1 {
  41. if i < len(content) {
  42. buf[i] = content[i]
  43. } else {
  44. buf[i] = ' '
  45. }
  46. }
  47. _, err := w.Write(buf)
  48. return err
  49. }
  50. func readMessageHeaderField(width int, r io.Reader) (string, error) {
  51. var buf = make([] byte, width)
  52. _, err := io.ReadFull(r, buf)
  53. if err != nil { return "", err }
  54. var raw_str = string(buf)
  55. var str = strings.TrimRight(raw_str, " ")
  56. return str, nil
  57. }
  58. func sendMessage(kind string, id uint64, payload ([] byte), conn io.Writer) error {
  59. if len(payload) > MsgPayloadLengthMax {
  60. return fmt.Errorf("message payload length exceeded maximum (%d)", MsgPayloadLengthMax)
  61. }
  62. var buf bytes.Buffer
  63. err := writeMessageHeaderField(kind, MsgKindWidth, &buf)
  64. if err != nil { return err }
  65. id_string := strconv.FormatUint(id, 16)
  66. err = writeMessageHeaderField(id_string, MsgCallIdWidth, &buf)
  67. if err != nil { return err }
  68. length := strconv.Itoa(len(payload))
  69. err = writeMessageHeaderField(length, MsgPayloadLengthWidth, &buf)
  70. if err != nil { return err }
  71. _, err = buf.Write(payload)
  72. if err != nil { return err }
  73. _, err = conn.Write(buf.Bytes())
  74. if err != nil { return err }
  75. return nil
  76. }
  77. func receiveMessage(conn io.Reader) (string, uint64, ([] byte), error) {
  78. kind, err := readMessageHeaderField(MsgKindWidth, conn)
  79. if err != nil { return "", ^uint64(0), nil, err }
  80. id_string, err := readMessageHeaderField(MsgCallIdWidth, conn)
  81. if err != nil { return kind, ^uint64(0), nil, err }
  82. id, err := strconv.ParseUint(id_string, 16, 64)
  83. if err != nil { return kind, ^uint64(0), nil, fmt.Errorf("invalid call id: %w", err) }
  84. length_string, err := readMessageHeaderField(MsgPayloadLengthWidth, conn)
  85. if err != nil { return kind, id, nil, err }
  86. length, err := strconv.Atoi(length_string)
  87. if err != nil { return kind, id, nil, fmt.Errorf("invalid payload length: %w", err) }
  88. buf := make([] byte, length)
  89. _, err = io.ReadFull(conn, buf)
  90. if err != nil { return kind, id, nil, err }
  91. return kind, id, buf, nil
  92. }
  93. func sendError(e error, id uint64, conn io.Writer) error {
  94. var bin ([] byte)
  95. var e_with_extra, with_extra = e.(*ErrorWithExtraData)
  96. if with_extra {
  97. bin = e_with_extra.Serialize()
  98. }
  99. const max = MsgPayloadLengthMax
  100. var size_with_extra = len(bin)
  101. if size_with_extra == 0 || // e is NOT of type *ErrorWithExtraData
  102. size_with_extra > max { // or maximum payload size exceeded
  103. var desc = e.Error()
  104. var str = (string([] rune { MsgPlainErrorPrefix }) + desc)
  105. bin = ([] byte)(str)
  106. if len(bin) > max {
  107. bin = bin[:max]
  108. }
  109. }
  110. return sendMessage(MSG_ERROR, id, ([] byte)(bin), conn)
  111. }
  112. func deserializeError(payload ([] byte)) error {
  113. var e ErrorWithExtraData
  114. var unmarshal_err = json.Unmarshal(payload, &e)
  115. if unmarshal_err == nil {
  116. return &e
  117. } else {
  118. var str = string(payload)
  119. if strings.HasPrefix(str, string([] rune { MsgPlainErrorPrefix })) {
  120. return errors.New(str[1:])
  121. } else {
  122. return errors.New("unknown error (invalid payload format)")
  123. }
  124. }
  125. }