encode.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package eventstream
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/aws/smithy-go/logging"
  9. "hash"
  10. "hash/crc32"
  11. "io"
  12. )
  13. // EncoderOptions is the configuration options for Encoder.
  14. type EncoderOptions struct {
  15. Logger logging.Logger
  16. LogMessages bool
  17. }
  18. // Encoder provides EventStream message encoding.
  19. type Encoder struct {
  20. options EncoderOptions
  21. headersBuf *bytes.Buffer
  22. messageBuf *bytes.Buffer
  23. }
  24. // NewEncoder initializes and returns an Encoder to encode Event Stream
  25. // messages.
  26. func NewEncoder(optFns ...func(*EncoderOptions)) *Encoder {
  27. o := EncoderOptions{}
  28. for _, fn := range optFns {
  29. fn(&o)
  30. }
  31. return &Encoder{
  32. options: o,
  33. headersBuf: bytes.NewBuffer(nil),
  34. messageBuf: bytes.NewBuffer(nil),
  35. }
  36. }
  37. // Encode encodes a single EventStream message to the io.Writer the Encoder
  38. // was created with. An error is returned if writing the message fails.
  39. func (e *Encoder) Encode(w io.Writer, msg Message) (err error) {
  40. e.headersBuf.Reset()
  41. e.messageBuf.Reset()
  42. var writer io.Writer = e.messageBuf
  43. if e.options.Logger != nil && e.options.LogMessages {
  44. encodeMsgBuf := bytes.NewBuffer(nil)
  45. writer = io.MultiWriter(writer, encodeMsgBuf)
  46. defer func() {
  47. logMessageEncode(e.options.Logger, encodeMsgBuf, msg, err)
  48. }()
  49. }
  50. if err = EncodeHeaders(e.headersBuf, msg.Headers); err != nil {
  51. return err
  52. }
  53. crc := crc32.New(crc32IEEETable)
  54. hashWriter := io.MultiWriter(writer, crc)
  55. headersLen := uint32(e.headersBuf.Len())
  56. payloadLen := uint32(len(msg.Payload))
  57. if err = encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil {
  58. return err
  59. }
  60. if headersLen > 0 {
  61. if _, err = io.Copy(hashWriter, e.headersBuf); err != nil {
  62. return err
  63. }
  64. }
  65. if payloadLen > 0 {
  66. if _, err = hashWriter.Write(msg.Payload); err != nil {
  67. return err
  68. }
  69. }
  70. msgCRC := crc.Sum32()
  71. if err := binary.Write(writer, binary.BigEndian, msgCRC); err != nil {
  72. return err
  73. }
  74. _, err = io.Copy(w, e.messageBuf)
  75. return err
  76. }
  77. func logMessageEncode(logger logging.Logger, msgBuf *bytes.Buffer, msg Message, encodeErr error) {
  78. w := bytes.NewBuffer(nil)
  79. defer func() { logger.Logf(logging.Debug, w.String()) }()
  80. fmt.Fprintf(w, "Message to encode:\n")
  81. encoder := json.NewEncoder(w)
  82. if err := encoder.Encode(msg); err != nil {
  83. fmt.Fprintf(w, "Failed to get encoded message, %v\n", err)
  84. }
  85. if encodeErr != nil {
  86. fmt.Fprintf(w, "Encode error: %v\n", encodeErr)
  87. return
  88. }
  89. fmt.Fprintf(w, "Raw message:\n%s\n", hex.Dump(msgBuf.Bytes()))
  90. }
  91. func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error {
  92. p := messagePrelude{
  93. Length: minMsgLen + headersLen + payloadLen,
  94. HeadersLen: headersLen,
  95. }
  96. if err := p.ValidateLens(); err != nil {
  97. return err
  98. }
  99. err := binaryWriteFields(w, binary.BigEndian,
  100. p.Length,
  101. p.HeadersLen,
  102. )
  103. if err != nil {
  104. return err
  105. }
  106. p.PreludeCRC = crc.Sum32()
  107. err = binary.Write(w, binary.BigEndian, p.PreludeCRC)
  108. if err != nil {
  109. return err
  110. }
  111. return nil
  112. }
  113. // EncodeHeaders writes the header values to the writer encoded in the event
  114. // stream format. Returns an error if a header fails to encode.
  115. func EncodeHeaders(w io.Writer, headers Headers) error {
  116. for _, h := range headers {
  117. hn := headerName{
  118. Len: uint8(len(h.Name)),
  119. }
  120. copy(hn.Name[:hn.Len], h.Name)
  121. if err := hn.encode(w); err != nil {
  122. return err
  123. }
  124. if err := h.Value.encode(w); err != nil {
  125. return err
  126. }
  127. }
  128. return nil
  129. }
  130. func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error {
  131. for _, v := range vs {
  132. if err := binary.Write(w, order, v); err != nil {
  133. return err
  134. }
  135. }
  136. return nil
  137. }