package gosocketio import ( "encoding/json" "errors" "github.com/graarh/golang-socketio/protocol" "github.com/graarh/golang-socketio/transport" "sync" "time" ) const ( queueBufferSize = 500 ) var ( ErrorWrongHeader = errors.New("Wrong header") ) /** engine.io header to send or receive */ type Header struct { Sid string `json:"sid"` Upgrades []string `json:"upgrades"` PingInterval int `json:"pingInterval"` PingTimeout int `json:"pingTimeout"` } /** socket.io connection handler use IsAlive to check that handler is still working use Dial to connect to websocket use In and Out channels for message exchange Close message means channel is closed ping is automatic */ type Channel struct { conn transport.Connection out chan string header Header alive bool aliveLock sync.Mutex ack ackProcessor server *Server ip string } /** create channel, map, and set active */ func (c *Channel) initChannel() { //TODO: queueBufferSize from constant to server or client variable c.out = make(chan string, queueBufferSize) c.ack.resultWaiters = make(map[int](chan string)) c.alive = true } /** Get id of current socket connection */ func (c *Channel) Id() string { return c.header.Sid } /** Checks that Channel is still alive */ func (c *Channel) IsAlive() bool { return c.alive } /** Close channel */ func CloseChannel(c *Channel, m *methods, args ...interface{}) error { c.aliveLock.Lock() defer c.aliveLock.Unlock() if !c.alive { //already closed return nil } c.conn.Close() c.alive = false //clean outloop for len(c.out) > 0 { <-c.out } c.out <- protocol.CloseMessage m.callLoopEvent(c, OnDisconnection) return nil } //incoming messages loop, puts incoming messages to In channel func inLoop(c *Channel, m *methods) error { for { pkg, err := c.conn.GetMessage() if err != nil { return CloseChannel(c, m, err) } msg, err := protocol.Decode(pkg) if err != nil { CloseChannel(c, m, protocol.ErrorWrongPacket) } switch msg.Type { case protocol.MessageTypeOpen: if err := json.Unmarshal([]byte(msg.Source[1:]), &c.header); err != nil { CloseChannel(c, m, ErrorWrongHeader) } m.callLoopEvent(c, OnConnection) case protocol.MessageTypePing: c.out <- protocol.PongMessage case protocol.MessageTypePong: default: go m.processIncomingMessage(c, msg) } } return nil } var overflooded map[*Channel]struct{} = make(map[*Channel]struct{}) var overfloodedLock sync.Mutex func AmountOfOverflooded() int64 { overfloodedLock.Lock() defer overfloodedLock.Unlock() return int64(len(overflooded)) } /** outgoing messages loop, sends messages from channel to socket */ func outLoop(c *Channel, m *methods) error { for { outBufferLen := len(c.out) if outBufferLen == queueBufferSize { return CloseChannel(c, m, ErrorSocketOverflood) } else if outBufferLen > int(queueBufferSize/2) { overfloodedLock.Lock() overflooded[c] = struct{}{} overfloodedLock.Unlock() } else { overfloodedLock.Lock() delete(overflooded, c) overfloodedLock.Unlock() } msg := <-c.out if msg == protocol.CloseMessage { return nil } err := c.conn.WriteMessage(msg) if err != nil { return CloseChannel(c, m, err) } } return nil } /** Pinger sends ping messages for keeping connection alive */ func pinger(c *Channel) { for { interval, _ := c.conn.PingParams() time.Sleep(interval) if !c.IsAlive() { return } c.out <- protocol.PingMessage } }