postback.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package postback
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "time"
  10. "github.com/knadh/listmonk/internal/messenger"
  11. "github.com/knadh/listmonk/models"
  12. )
// postback is the payload that's posted as JSON to the HTTP Postback server.
// Push builds one value per outgoing message, serializes it with MarshalJSON
// (not defined in this part of the file; presumably generated by easyjson
// from the directive below) and sends the result to Options.RootURL.
//easyjson:json
type postback struct {
	// Subject is copied from the message's Subject.
	Subject string `json:"subject"`
	// ContentType is copied from the message's ContentType.
	ContentType string `json:"content_type"`
	// Body is the message body converted to a string.
	Body string `json:"body"`
	// Recipients lists the subscribers the message is for. Push always
	// fills it with exactly one entry.
	Recipients []recipient `json:"recipients"`
	// Campaign is left nil when the message has no campaign attached.
	Campaign *campaign `json:"campaign"`
}
// campaign is the subset of campaign fields included in a postback payload.
// Push copies these values from the message's Campaign when one is set.
type campaign struct {
	UUID string `db:"uuid" json:"uuid"`
	Name string `db:"name" json:"name"`
	Tags []string `db:"tags" json:"tags"`
}
// recipient is the subset of subscriber fields included in a postback
// payload. Push copies these values from the message's Subscriber.
type recipient struct {
	UUID string `db:"uuid" json:"uuid"`
	Email string `db:"email" json:"email"`
	Name string `db:"name" json:"name"`
	// Attribs carries the subscriber's attributes as-is.
	Attribs models.SubscriberAttribs `db:"attribs" json:"attribs"`
	Status string `db:"status" json:"status"`
}
// Options represents HTTP Postback server options.
type Options struct {
	// Name is the value reported by Postback.Name.
	Name string `json:"name"`
	// Username and Password enable HTTP Basic authentication. New only
	// builds the Authorization header when both are non-empty.
	Username string `json:"username"`
	Password string `json:"password"`
	// RootURL is the URL that Push sends its POST requests to.
	RootURL string `json:"root_url"`
	// MaxConns is applied by New as both the per-host connection limit and
	// the per-host idle connection limit of the HTTP transport.
	MaxConns int `json:"max_conns"`
	// Retries is not referenced by the code visible in this file.
	// NOTE(review): confirm whether it is consumed elsewhere.
	Retries int `json:"retries"`
	// Timeout is applied by New as the overall client timeout, the response
	// header timeout and the idle connection timeout.
	Timeout time.Duration `json:"timeout"`
}
// Postback is a messenger that delivers messages by POSTing them as JSON to
// an HTTP server. Instances are created with New.
type Postback struct {
	// authStr is the precomputed Authorization header value ("Basic ...").
	// It is empty when no credentials were configured, in which case no
	// Authorization header is sent.
	authStr string
	// o holds the options the instance was created with.
	o Options
	// c is the HTTP client used for every request.
	c *http.Client
}
  50. // New returns a new instance of the HTTP Postback messenger.
  51. func New(o Options) (*Postback, error) {
  52. authStr := ""
  53. if o.Username != "" && o.Password != "" {
  54. authStr = fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(
  55. []byte(o.Username+":"+o.Password)))
  56. }
  57. return &Postback{
  58. authStr: authStr,
  59. o: o,
  60. c: &http.Client{
  61. Timeout: o.Timeout,
  62. Transport: &http.Transport{
  63. MaxIdleConnsPerHost: o.MaxConns,
  64. MaxConnsPerHost: o.MaxConns,
  65. ResponseHeaderTimeout: o.Timeout,
  66. IdleConnTimeout: o.Timeout,
  67. },
  68. },
  69. }, nil
  70. }
// Name returns the messenger's name, i.e. the Name value from the Options
// the instance was created with.
func (p *Postback) Name() string {
	return p.o.Name
}
  75. // Push pushes a message to the server.
  76. func (p *Postback) Push(m messenger.Message) error {
  77. pb := postback{
  78. Subject: m.Subject,
  79. ContentType: m.ContentType,
  80. Body: string(m.Body),
  81. Recipients: []recipient{{
  82. UUID: m.Subscriber.UUID,
  83. Email: m.Subscriber.Email,
  84. Name: m.Subscriber.Name,
  85. Status: m.Subscriber.Status,
  86. Attribs: m.Subscriber.Attribs,
  87. }},
  88. }
  89. if m.Campaign != nil {
  90. pb.Campaign = &campaign{
  91. UUID: m.Campaign.UUID,
  92. Name: m.Campaign.Name,
  93. Tags: m.Campaign.Tags,
  94. }
  95. }
  96. b, err := pb.MarshalJSON()
  97. if err != nil {
  98. return err
  99. }
  100. return p.exec(http.MethodPost, p.o.RootURL, b, nil)
  101. }
// Flush flushes the message queue to the server. It is a no-op that always
// returns nil: Push sends every message immediately, so nothing is queued.
func (p *Postback) Flush() error {
	return nil
}
// Close closes idle HTTP connections held by the underlying client.
// It always returns nil.
func (p *Postback) Close() error {
	p.c.CloseIdleConnections()
	return nil
}
  111. func (p *Postback) exec(method, rURL string, reqBody []byte, headers http.Header) error {
  112. var (
  113. err error
  114. postBody io.Reader
  115. )
  116. // Encode POST / PUT params.
  117. if method == http.MethodPost || method == http.MethodPut {
  118. postBody = bytes.NewReader(reqBody)
  119. }
  120. req, err := http.NewRequest(method, rURL, postBody)
  121. if err != nil {
  122. return err
  123. }
  124. if headers != nil {
  125. req.Header = headers
  126. } else {
  127. req.Header = http.Header{}
  128. }
  129. req.Header.Set("User-Agent", "listmonk")
  130. // Optional BasicAuth.
  131. if p.authStr != "" {
  132. req.Header.Set("Authorization", p.authStr)
  133. }
  134. // If a content-type isn't set, set the default one.
  135. if req.Header.Get("Content-Type") == "" {
  136. if method == http.MethodPost || method == http.MethodPut {
  137. req.Header.Add("Content-Type", "application/json")
  138. }
  139. }
  140. // If the request method is GET or DELETE, add the params as QueryString.
  141. if method == http.MethodGet || method == http.MethodDelete {
  142. req.URL.RawQuery = string(reqBody)
  143. }
  144. r, err := p.c.Do(req)
  145. if err != nil {
  146. return err
  147. }
  148. defer func() {
  149. // Drain and close the body to let the Transport reuse the connection
  150. io.Copy(ioutil.Discard, r.Body)
  151. r.Body.Close()
  152. }()
  153. if r.StatusCode != http.StatusOK {
  154. return fmt.Errorf("non-OK response from Postback server: %d", r.StatusCode)
  155. }
  156. return nil
  157. }