debug.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package eventstream
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "strconv"
  8. )
  9. type decodedMessage struct {
  10. rawMessage
  11. Headers decodedHeaders `json:"headers"`
  12. }
  13. type jsonMessage struct {
  14. Length json.Number `json:"total_length"`
  15. HeadersLen json.Number `json:"headers_length"`
  16. PreludeCRC json.Number `json:"prelude_crc"`
  17. Headers decodedHeaders `json:"headers"`
  18. Payload []byte `json:"payload"`
  19. CRC json.Number `json:"message_crc"`
  20. }
  21. func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) {
  22. var jsonMsg jsonMessage
  23. if err = json.Unmarshal(b, &jsonMsg); err != nil {
  24. return err
  25. }
  26. d.Length, err = numAsUint32(jsonMsg.Length)
  27. if err != nil {
  28. return err
  29. }
  30. d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen)
  31. if err != nil {
  32. return err
  33. }
  34. d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC)
  35. if err != nil {
  36. return err
  37. }
  38. d.Headers = jsonMsg.Headers
  39. d.Payload = jsonMsg.Payload
  40. d.CRC, err = numAsUint32(jsonMsg.CRC)
  41. if err != nil {
  42. return err
  43. }
  44. return nil
  45. }
  46. func (d *decodedMessage) MarshalJSON() ([]byte, error) {
  47. jsonMsg := jsonMessage{
  48. Length: json.Number(strconv.Itoa(int(d.Length))),
  49. HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))),
  50. PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))),
  51. Headers: d.Headers,
  52. Payload: d.Payload,
  53. CRC: json.Number(strconv.Itoa(int(d.CRC))),
  54. }
  55. return json.Marshal(jsonMsg)
  56. }
  57. func numAsUint32(n json.Number) (uint32, error) {
  58. v, err := n.Int64()
  59. if err != nil {
  60. return 0, fmt.Errorf("failed to get int64 json number, %v", err)
  61. }
  62. return uint32(v), nil
  63. }
  64. func (d decodedMessage) Message() Message {
  65. return Message{
  66. Headers: Headers(d.Headers),
  67. Payload: d.Payload,
  68. }
  69. }
  70. type decodedHeaders Headers
  71. func (hs *decodedHeaders) UnmarshalJSON(b []byte) error {
  72. var jsonHeaders []struct {
  73. Name string `json:"name"`
  74. Type valueType `json:"type"`
  75. Value interface{} `json:"value"`
  76. }
  77. decoder := json.NewDecoder(bytes.NewReader(b))
  78. decoder.UseNumber()
  79. if err := decoder.Decode(&jsonHeaders); err != nil {
  80. return err
  81. }
  82. var headers Headers
  83. for _, h := range jsonHeaders {
  84. value, err := valueFromType(h.Type, h.Value)
  85. if err != nil {
  86. return err
  87. }
  88. headers.Set(h.Name, value)
  89. }
  90. *hs = decodedHeaders(headers)
  91. return nil
  92. }
  93. func valueFromType(typ valueType, val interface{}) (Value, error) {
  94. switch typ {
  95. case trueValueType:
  96. return BoolValue(true), nil
  97. case falseValueType:
  98. return BoolValue(false), nil
  99. case int8ValueType:
  100. v, err := val.(json.Number).Int64()
  101. return Int8Value(int8(v)), err
  102. case int16ValueType:
  103. v, err := val.(json.Number).Int64()
  104. return Int16Value(int16(v)), err
  105. case int32ValueType:
  106. v, err := val.(json.Number).Int64()
  107. return Int32Value(int32(v)), err
  108. case int64ValueType:
  109. v, err := val.(json.Number).Int64()
  110. return Int64Value(v), err
  111. case bytesValueType:
  112. v, err := base64.StdEncoding.DecodeString(val.(string))
  113. return BytesValue(v), err
  114. case stringValueType:
  115. v, err := base64.StdEncoding.DecodeString(val.(string))
  116. return StringValue(string(v)), err
  117. case timestampValueType:
  118. v, err := val.(json.Number).Int64()
  119. return TimestampValue(timeFromEpochMilli(v)), err
  120. case uuidValueType:
  121. v, err := base64.StdEncoding.DecodeString(val.(string))
  122. var tv UUIDValue
  123. copy(tv[:], v)
  124. return tv, err
  125. default:
  126. panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val))
  127. }
  128. }