object.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package rpc
  2. import (
  3. "io"
  4. "bytes"
  5. "errors"
  6. "compress/gzip"
  7. "encoding/binary"
  8. "kumachan/standalone/rpc/kmd"
  9. )
  10. type StreamKmdApi interface {
  11. SerializeToStream(v kmd.Object, t *kmd.Type, stream io.Writer) error
  12. DeserializeFromStream(t *kmd.Type, stream io.Reader) (kmd.Object, error)
  13. }
  14. func receiveObject(t *kmd.Type, conn io.Reader, limit uint, api StreamKmdApi) (kmd.Object, error) {
  15. var length uint64
  16. err := binary.Read(conn, binary.BigEndian, &length)
  17. if err != nil { return nil, err }
  18. if limit != 0 && length > uint64(limit) {
  19. return nil, errors.New("object size limit exceeded")
  20. }
  21. var buf = make([] byte, length)
  22. _, err = io.ReadFull(conn, buf)
  23. if err != nil { return nil, err }
  24. var decompressed, gz_err = gzip.NewReader(bytes.NewReader(buf))
  25. if gz_err != nil { panic(gz_err) }
  26. return api.DeserializeFromStream(t, decompressed)
  27. }
  28. func sendObject(obj kmd.Object, t *kmd.Type, conn io.Writer, api StreamKmdApi) error {
  29. var buf bytes.Buffer
  30. var compressed = gzip.NewWriter(&buf)
  31. err := api.SerializeToStream(obj, t, compressed)
  32. if err != nil { return err }
  33. err = compressed.Close()
  34. if err != nil { return err }
  35. var bin = buf.Bytes()
  36. err = binary.Write(conn, binary.BigEndian, uint64(len(bin)))
  37. if err != nil { return err }
  38. _, err = conn.Write(bin)
  39. return err
  40. }