Add support for streaming async events via HTTP serverside events.

- `GET /api/events?type=error` opens a long-lived HTTP server side
  event connection that streams error messages.
- async (typically SMTP) errors are now streamed to the frontend and
  disaplyed as an error toast on the admin UI.
This commit is contained in:
Kailash Nadh 2023-05-26 22:07:58 +05:30
parent d359ad27aa
commit 0b2da4c664
10 changed files with 193 additions and 12 deletions

View file

@ -89,7 +89,7 @@ func handleReloadApp(c echo.Context) error {
app := c.Get("app").(*App)
go func() {
<-time.After(time.Millisecond * 500)
app.sigChan <- syscall.SIGHUP
app.chReload <- syscall.SIGHUP
}()
return c.JSON(http.StatusOK, okResp{true})
}

54
cmd/events.go Normal file
View file

@ -0,0 +1,54 @@
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/labstack/echo/v4"
)
// handleEventStream serves an endpoint that never closes and pushes a
// live event stream (text/event-stream) such as a error messages.
func handleEventStream(c echo.Context) error {
var (
app = c.Get("app").(*App)
)
h := c.Response().Header()
h.Set(echo.HeaderContentType, "text/event-stream")
h.Set(echo.HeaderCacheControl, "no-store")
h.Set(echo.HeaderConnection, "keep-alive")
// Subscribe to the event stream with a random ID.
id := fmt.Sprintf("api:%v", time.Now().UnixNano())
sub, err := app.events.Subscribe(id)
if err != nil {
log.Fatalf("error subscribing to events: %v", err)
}
ctx := c.Request().Context()
for {
select {
case e := <-sub:
b, err := json.Marshal(e)
if err != nil {
app.log.Printf("error marshalling event: %v", err)
continue
}
fmt.Printf("data: %s\n\n", b)
c.Response().Write([]byte(fmt.Sprintf("retry: 3000\ndata: %s\n\n", b)))
c.Response().Flush()
case <-ctx.Done():
// On HTTP connection close, unsubscribe.
app.events.Unsubscribe(id)
return nil
}
}
return nil
}

View file

@ -161,6 +161,8 @@ func initHTTPHandlers(e *echo.Echo, app *App) {
g.POST("/api/tx", handleSendTxMessage)
g.GET("/api/events", handleEventStream)
if app.constants.BounceWebhooksEnabled {
// Private authenticated bounce endpoint.
g.POST("/webhooks/bounce", handleBounceWebhook)

View file

@ -19,6 +19,7 @@ import (
"github.com/knadh/listmonk/internal/buflog"
"github.com/knadh/listmonk/internal/captcha"
"github.com/knadh/listmonk/internal/core"
"github.com/knadh/listmonk/internal/events"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/internal/media"
@ -48,12 +49,13 @@ type App struct {
bounce *bounce.Manager
paginator *paginator.Paginator
captcha *captcha.Captcha
events *events.Events
notifTpls *notifTpls
log *log.Logger
bufLog *buflog.BufLog
// Channel for passing reload signals.
sigChan chan os.Signal
chReload chan os.Signal
// Global variable that stores the state indicating that a restart is required
// after a settings update.
@ -66,8 +68,9 @@ type App struct {
var (
// Buffered log writer for storing N lines of log entries for the UI.
evStream = events.New()
bufLog = buflog.New(5000)
lo = log.New(io.MultiWriter(os.Stdout, bufLog), "",
lo = log.New(io.MultiWriter(os.Stdout, bufLog, evStream.ErrWriter()), "",
log.Ldate|log.Ltime|log.Lshortfile)
ko = koanf.New(".")
@ -170,6 +173,7 @@ func main() {
log: lo,
bufLog: bufLog,
captcha: initCaptcha(),
events: evStream,
paginator: paginator.New(paginator.Opt{
DefaultPerPage: 20,
@ -240,11 +244,11 @@ func main() {
// Wait for the reload signal with a callback to gracefully shut down resources.
// The `wait` channel is passed to awaitReload to wait for the callback to finish
// within N seconds, or do a force reload.
app.sigChan = make(chan os.Signal)
signal.Notify(app.sigChan, syscall.SIGHUP)
app.chReload = make(chan os.Signal)
signal.Notify(app.chReload, syscall.SIGHUP)
closerWait := make(chan bool)
<-awaitReload(app.sigChan, closerWait, func() {
<-awaitReload(app.chReload, closerWait, func() {
// Stop the HTTP server.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

View file

@ -196,7 +196,7 @@ func handleUpdateSettings(c echo.Context) error {
// No running campaigns. Reload the app.
go func() {
<-time.After(time.Millisecond * 500)
app.sigChan <- syscall.SIGHUP
app.chReload <- syscall.SIGHUP
}()
return c.JSON(http.StatusOK, okResp{true})

View file

@ -133,6 +133,24 @@ export default Vue.extend({
};
http.send();
},
listenEvents() {
const reMatchLog = /(.+?)\.go:\d+:(.+?)$/im;
const evtSource = new EventSource(uris.errorEvents, { withCredentials: true });
let numEv = 0;
evtSource.onmessage = (e) => {
if (numEv > 50) {
return;
}
numEv += 1;
const d = JSON.parse(e.data);
if (d && d.type === 'error') {
const msg = reMatchLog.exec(d.message.trim());
this.$utils.toast(msg[2], 'is-danger', null, true);
}
};
},
},
computed: {
@ -155,6 +173,8 @@ export default Vue.extend({
window.addEventListener('resize', () => {
this.windowWidth = window.innerWidth;
});
this.listenEvents();
},
});
</script>

View file

@ -22,6 +22,7 @@ export const uris = Object.freeze({
previewTemplate: '/api/templates/:id/preview',
previewRawTemplate: '/api/templates/preview',
exportSubscribers: '/api/subscribers/export',
errorEvents: '/api/events?type=error',
base: `${baseURL}/static`,
root: rootURL,
static: `${baseURL}/static`,

View file

@ -160,11 +160,11 @@ export default class Utils {
});
};
toast = (msg, typ, duration) => {
toast = (msg, typ, duration, queue) => {
Toast.open({
message: this.escapeHTML(msg),
type: !typ ? 'is-success' : typ,
queue: false,
queue,
duration: duration || 3000,
position: 'is-top',
pauseOnHover: true,

100
internal/events/events.go Normal file
View file

@ -0,0 +1,100 @@
// Package events implements a simple event broadcasting mechanism
// for usage in broadcasting error messages, postbacks etc. various
// channels.
package events
import (
"bytes"
"fmt"
"io"
"sync"
)
const (
TypeError = "error"
)
// Event represents a single event in the system.
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Message string `json:"message"`
Data interface{} `json:"data"`
Channels []string `json:"-"`
}
type Events struct {
subs map[string]chan Event
sync.RWMutex
}
// New returns a new instance of Events.
func New() *Events {
return &Events{
subs: make(map[string]chan Event),
}
}
// Subscribe returns a channel to which the given event `types` are streamed.
// id is the unique identifier for the caller. A caller can only register
// for subscription once.
func (ev *Events) Subscribe(id string) (chan Event, error) {
ev.Lock()
defer ev.Unlock()
if ch, ok := ev.subs[id]; ok {
return ch, nil
}
ch := make(chan Event, 100)
ev.subs[id] = ch
return ch, nil
}
// Unsubscribe unsubscribes a subscriber (obviously).
func (ev *Events) Unsubscribe(id string) {
ev.Lock()
defer ev.Unlock()
delete(ev.subs, id)
}
// Publish publishes an event to all subscribers.
func (ev *Events) Publish(e Event) error {
ev.Lock()
defer ev.Unlock()
for _, ch := range ev.subs {
select {
case ch <- e:
default:
return fmt.Errorf("event queue full for type: %s", e.Type)
}
}
return nil
}
// This implements an io.Writer specifically for receiving error messages
// mirrored (io.MultiWriter) from error log writing.
type wri struct {
ev *Events
}
func (w *wri) Write(b []byte) (n int, err error) {
// Only broadcast error messages.
if !bytes.Contains(b, []byte("error")) {
return 0, nil
}
w.ev.Publish(Event{
Type: TypeError,
Message: string(b),
})
return len(b), nil
}
func (ev *Events) ErrWriter() io.Writer {
return &wri{ev: ev}
}

View file

@ -395,8 +395,8 @@ func (m *Manager) worker() {
out.Headers = h
if err := m.messengers[msg.Campaign.Messenger].Push(out); err != nil {
m.logger.Printf("error sending message in campaign %s: subscriber %s: %v",
msg.Campaign.Name, msg.Subscriber.UUID, err)
m.logger.Printf("error sending message in campaign %s: subscriber %d: %v",
msg.Campaign.Name, msg.Subscriber.ID, err)
select {
case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}: