/
events.go
73 lines (59 loc) · 1.67 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package controllers
import (
"io"
"fmt"
e "github.com/chances/party-server/errors"
"github.com/chances/party-server/events"
"github.com/chances/party-server/session"
"github.com/gin-gonic/gin"
)
// Events is the controller that serves server-sent event streams.
//
// It has no fields of its own; everything it uses (for example the Cache
// consulted by Stream) is reached through the embedded Controller.
type Events struct {
Controller
}
// NewEvents creates a new Events controller, runs Setup on it and returns it
// by value.
func NewEvents() Events {
	var ctrl Events
	ctrl.Setup()
	return ctrl
}
// Stream returns a gin handler that streams events to a client EventSource.
//
// The handler listens on the channel named by the current party's room code
// concatenated with ch and forwards every received message as a server-sent
// event named ch. A "heartbeat" message is written as an SSE comment instead
// of an event. The listener is released when the handler returns.
//
// If the session has no current party, the request is aborted with a
// BadRequest error and nothing is streamed. For guest sessions the stream
// ends once the guest record is unavailable, carries no string "Token", or
// the cache no longer reports that token as existing.
func (cr *Events) Stream(ch string) gin.HandlerFunc {
	return func(c *gin.Context) {
		currentParty, err := session.CurrentParty(c)
		if err != nil {
			c.Error(e.BadRequest.WithDetail("User has not joined a party"))
			c.Abort()
			// Abort only prevents pending handlers from running; it does not
			// stop this one. Return so currentParty is never used after a
			// failed lookup and no stream is opened for the request.
			return
		}

		channel := currentParty.RoomCode + ch
		listener := events.Listen(channel)
		defer events.StopListening(channel, listener)

		c.Stream(func(w io.Writer) bool {
			// Blocks until the next message arrives. A message that is not a
			// string is forwarded as the empty string.
			message, _ := (<-listener).(string)

			// End the stream if this is a guest session and it has expired
			// TODO: Abstract this out into a StreamConditionally method
			if session.IsGuest(c) {
				guest := session.CurrentGuest(c)
				if guest == nil {
					// No guest record to validate against: treat as expired
					// rather than dereferencing a nil pointer.
					return false
				}

				// A missing key and a non-string value both fail the checked
				// assertion, so neither can panic here.
				token, ok := (*guest)["Token"].(string)
				if !ok {
					return false
				}

				// The error from Exists is not inspected; only the reported
				// boolean decides whether the stream continues.
				if exists, _ := cr.Cache.Exists(token); !exists {
					return false
				}
			}

			if message == "heartbeat" {
				sseComment(c, message)
				return true
			}

			// TODO: Provide some way to parameterize the event type? (name param, here)
			c.SSEvent(ch, message)
			return true
		})
	}
}
// sseComment sets the response Content-Type to text/event-stream and writes
// comment as a server-sent events comment line: a colon, the comment text and
// a trailing newline. The result of the write is discarded.
func sseComment(c *gin.Context, comment string) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")

	line := fmt.Sprintf(":%s\n", comment)
	c.Writer.WriteString(line)
}