55 lines
1.1 KiB
Go
55 lines
1.1 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"time"
|
||
|
|
||
|
"github.com/labstack/echo/v4"
|
||
|
)
|
||
|
|
||
|
// handleEventStream serves an endpoint that never closes and pushes a
|
||
|
// live event stream (text/event-stream) such as a error messages.
|
||
|
func handleEventStream(c echo.Context) error {
|
||
|
var (
|
||
|
app = c.Get("app").(*App)
|
||
|
)
|
||
|
|
||
|
h := c.Response().Header()
|
||
|
h.Set(echo.HeaderContentType, "text/event-stream")
|
||
|
h.Set(echo.HeaderCacheControl, "no-store")
|
||
|
h.Set(echo.HeaderConnection, "keep-alive")
|
||
|
|
||
|
// Subscribe to the event stream with a random ID.
|
||
|
id := fmt.Sprintf("api:%v", time.Now().UnixNano())
|
||
|
sub, err := app.events.Subscribe(id)
|
||
|
if err != nil {
|
||
|
log.Fatalf("error subscribing to events: %v", err)
|
||
|
}
|
||
|
|
||
|
ctx := c.Request().Context()
|
||
|
for {
|
||
|
select {
|
||
|
case e := <-sub:
|
||
|
b, err := json.Marshal(e)
|
||
|
if err != nil {
|
||
|
app.log.Printf("error marshalling event: %v", err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fmt.Printf("data: %s\n\n", b)
|
||
|
|
||
|
c.Response().Write([]byte(fmt.Sprintf("retry: 3000\ndata: %s\n\n", b)))
|
||
|
c.Response().Flush()
|
||
|
|
||
|
case <-ctx.Done():
|
||
|
// On HTTP connection close, unsubscribe.
|
||
|
app.events.Unsubscribe(id)
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|