message.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package networkdb
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "github.com/hashicorp/go-msgpack/codec"
  7. )
  8. type messageType uint8
  9. const (
  10. // For network join/leave event message
  11. networkEventMsg messageType = 1 + iota
  12. // For pushing/pulling network/node association state
  13. networkPushPullMsg
  14. // For table entry CRUD event message
  15. tableEventMsg
  16. // For building a compound message which packs many different
  17. // message types together
  18. compoundMsg
  19. // For syncing table entries in bulk b/w nodes.
  20. bulkSyncMsg
  21. )
  22. const (
  23. // Max udp message size chosen to avoid network packet
  24. // fragmentation.
  25. udpSendBuf = 1400
  26. // Compound message header overhead 1 byte(message type) + 4
  27. // bytes (num messages)
  28. compoundHeaderOverhead = 5
  29. // Overhead for each embedded message in a compound message 2
  30. // bytes (len of embedded message)
  31. compoundOverhead = 2
  32. )
  33. func decodeMessage(buf []byte, out interface{}) error {
  34. var handle codec.MsgpackHandle
  35. return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
  36. }
  37. func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
  38. buf := bytes.NewBuffer(nil)
  39. buf.WriteByte(uint8(t))
  40. handle := codec.MsgpackHandle{}
  41. encoder := codec.NewEncoder(buf, &handle)
  42. err := encoder.Encode(msg)
  43. return buf.Bytes(), err
  44. }
  45. // makeCompoundMessage takes a list of messages and generates
  46. // a single compound message containing all of them
  47. func makeCompoundMessage(msgs [][]byte) *bytes.Buffer {
  48. // Create a local buffer
  49. buf := bytes.NewBuffer(nil)
  50. // Write out the type
  51. buf.WriteByte(uint8(compoundMsg))
  52. // Write out the number of message
  53. binary.Write(buf, binary.BigEndian, uint32(len(msgs)))
  54. // Add the message lengths
  55. for _, m := range msgs {
  56. binary.Write(buf, binary.BigEndian, uint16(len(m)))
  57. }
  58. // Append the messages
  59. for _, m := range msgs {
  60. buf.Write(m)
  61. }
  62. return buf
  63. }
  64. // decodeCompoundMessage splits a compound message and returns
  65. // the slices of individual messages. Also returns the number
  66. // of truncated messages and any potential error
  67. func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
  68. if len(buf) < 1 {
  69. err = fmt.Errorf("missing compound length byte")
  70. return
  71. }
  72. numParts := binary.BigEndian.Uint32(buf[0:4])
  73. buf = buf[4:]
  74. // Check we have enough bytes
  75. if len(buf) < int(numParts*2) {
  76. err = fmt.Errorf("truncated len slice")
  77. return
  78. }
  79. // Decode the lengths
  80. lengths := make([]uint16, numParts)
  81. for i := 0; i < int(numParts); i++ {
  82. lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
  83. }
  84. buf = buf[numParts*2:]
  85. // Split each message
  86. for idx, msgLen := range lengths {
  87. if len(buf) < int(msgLen) {
  88. trunc = int(numParts) - idx
  89. return
  90. }
  91. // Extract the slice, seek past on the buffer
  92. slice := buf[:msgLen]
  93. buf = buf[msgLen:]
  94. parts = append(parts, slice)
  95. }
  96. return
  97. }