123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package postback
- import (
- "bytes"
- "encoding/base64"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "time"
- "github.com/knadh/listmonk/internal/messenger"
- "github.com/knadh/listmonk/models"
- )
- // postback is the payload that's posted as JSON to the HTTP Postback server.
- //easyjson:json
- type postback struct {
- Subject string `json:"subject"`
- ContentType string `json:"content_type"`
- Body string `json:"body"`
- Recipients []recipient `json:"recipients"`
- Campaign *campaign `json:"campaign"`
- }
// campaign holds the campaign fields that are included in a postback
// payload.
type campaign struct {
	UUID string   `db:"uuid" json:"uuid"`
	Name string   `db:"name" json:"name"`
	Tags []string `db:"tags" json:"tags"`
}
- type recipient struct {
- UUID string `db:"uuid" json:"uuid"`
- Email string `db:"email" json:"email"`
- Name string `db:"name" json:"name"`
- Attribs models.SubscriberAttribs `db:"attribs" json:"attribs"`
- Status string `db:"status" json:"status"`
- }
// Options holds the configuration of an HTTP Postback server.
type Options struct {
	// Name is the name the messenger reports for itself.
	Name string `json:"name"`
	// Username and Password, when both are non-empty, are sent as HTTP
	// Basic credentials with every request.
	Username string `json:"username"`
	Password string `json:"password"`
	// RootURL is the URL that messages are posted to.
	RootURL string `json:"root_url"`
	// MaxConns bounds the idle and total connections kept per host.
	MaxConns int `json:"max_conns"`
	// Retries is not read anywhere in this file.
	// NOTE(review): presumably consumed by the caller; confirm.
	Retries int `json:"retries"`
	// Timeout is applied to the whole request, to waiting for response
	// headers, and to idle connections.
	Timeout time.Duration `json:"timeout"`
}
- // Postback represents an HTTP Message server.
- type Postback struct {
- authStr string
- o Options
- c *http.Client
- }
- // New returns a new instance of the HTTP Postback messenger.
- func New(o Options) (*Postback, error) {
- authStr := ""
- if o.Username != "" && o.Password != "" {
- authStr = fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(
- []byte(o.Username+":"+o.Password)))
- }
- return &Postback{
- authStr: authStr,
- o: o,
- c: &http.Client{
- Timeout: o.Timeout,
- Transport: &http.Transport{
- MaxIdleConnsPerHost: o.MaxConns,
- MaxConnsPerHost: o.MaxConns,
- ResponseHeaderTimeout: o.Timeout,
- IdleConnTimeout: o.Timeout,
- },
- },
- }, nil
- }
- // Name returns the messenger's name.
- func (p *Postback) Name() string {
- return p.o.Name
- }
- // Push pushes a message to the server.
- func (p *Postback) Push(m messenger.Message) error {
- pb := postback{
- Subject: m.Subject,
- ContentType: m.ContentType,
- Body: string(m.Body),
- Recipients: []recipient{{
- UUID: m.Subscriber.UUID,
- Email: m.Subscriber.Email,
- Name: m.Subscriber.Name,
- Status: m.Subscriber.Status,
- Attribs: m.Subscriber.Attribs,
- }},
- }
- if m.Campaign != nil {
- pb.Campaign = &campaign{
- UUID: m.Campaign.UUID,
- Name: m.Campaign.Name,
- Tags: m.Campaign.Tags,
- }
- }
- b, err := pb.MarshalJSON()
- if err != nil {
- return err
- }
- return p.exec(http.MethodPost, p.o.RootURL, b, nil)
- }
- // Flush flushes the message queue to the server.
- func (p *Postback) Flush() error {
- return nil
- }
- // Close closes idle HTTP connections.
- func (p *Postback) Close() error {
- p.c.CloseIdleConnections()
- return nil
- }
- func (p *Postback) exec(method, rURL string, reqBody []byte, headers http.Header) error {
- var (
- err error
- postBody io.Reader
- )
- // Encode POST / PUT params.
- if method == http.MethodPost || method == http.MethodPut {
- postBody = bytes.NewReader(reqBody)
- }
- req, err := http.NewRequest(method, rURL, postBody)
- if err != nil {
- return err
- }
- if headers != nil {
- req.Header = headers
- } else {
- req.Header = http.Header{}
- }
- req.Header.Set("User-Agent", "listmonk")
- // Optional BasicAuth.
- if p.authStr != "" {
- req.Header.Set("Authorization", p.authStr)
- }
- // If a content-type isn't set, set the default one.
- if req.Header.Get("Content-Type") == "" {
- if method == http.MethodPost || method == http.MethodPut {
- req.Header.Add("Content-Type", "application/json")
- }
- }
- // If the request method is GET or DELETE, add the params as QueryString.
- if method == http.MethodGet || method == http.MethodDelete {
- req.URL.RawQuery = string(reqBody)
- }
- r, err := p.c.Do(req)
- if err != nil {
- return err
- }
- defer func() {
- // Drain and close the body to let the Transport reuse the connection
- io.Copy(ioutil.Discard, r.Body)
- r.Body.Close()
- }()
- if r.StatusCode != http.StatusOK {
- return fmt.Errorf("non-OK response from Postback server: %d", r.StatusCode)
- }
- return nil
- }
|