main.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. native "log"
  6. "net/http"
  7. "os"
  8. "time"
  9. ws "github.com/gorilla/websocket"
  10. "github.com/kelseyhightower/envconfig"
  11. )
  12. var (
  13. version string
  14. date string
  15. logger = native.New(os.Stdout, "", 0)
  16. errors = native.New(os.Stderr, "", 0)
  17. )
  18. var upgrader = ws.Upgrader{
  19. ReadBufferSize: 1024,
  20. WriteBufferSize: 1024,
  21. CheckOrigin: func(r *http.Request) bool {
  22. return true // disable CORS
  23. },
  24. }
  25. type config struct {
  26. Port int64 `default:"8080"`
  27. Prefix string `default:"/"`
  28. }
  29. func main() {
  30. var c config
  31. if err := envconfig.Process("ws", &c); err != nil {
  32. errors.Fatal(err)
  33. }
  34. mngr := manager{
  35. clients: make(map[*client]bool),
  36. add: make(chan *client),
  37. remove: make(chan *client),
  38. broadcast: make(chan []byte),
  39. }
  40. go func() {
  41. for {
  42. select {
  43. case client := <-mngr.add:
  44. mngr.clients[client] = true
  45. case client := <-mngr.remove:
  46. if _, ok := mngr.clients[client]; ok {
  47. delete(mngr.clients, client)
  48. close(client.send)
  49. }
  50. case message := <-mngr.broadcast:
  51. for client := range mngr.clients {
  52. select {
  53. case client.send <- message:
  54. default:
  55. close(client.send)
  56. delete(mngr.clients, client)
  57. }
  58. }
  59. }
  60. }
  61. }()
  62. http.HandleFunc(fmt.Sprintf("%sws", c.Prefix), func(w http.ResponseWriter, r *http.Request) {
  63. conn, err := upgrader.Upgrade(w, r, nil)
  64. if err != nil {
  65. logger.Println(err)
  66. return
  67. }
  68. client := &client{
  69. conn: conn,
  70. send: make(chan []byte, 256),
  71. mngr: mngr,
  72. }
  73. mngr.add <- client
  74. go client.write()
  75. go client.read()
  76. })
  77. http.HandleFunc(c.Prefix, index(c.Prefix))
  78. if c.Prefix != "/" {
  79. http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  80. w.WriteHeader(http.StatusForbidden)
  81. })
  82. }
  83. http.HandleFunc(fmt.Sprintf("%sversion", c.Prefix), func(w http.ResponseWriter, r *http.Request) {
  84. if len(version) > 0 && len(date) > 0 {
  85. fmt.Fprintf(w, "version: %s (built at %s)", version, date)
  86. } else {
  87. w.WriteHeader(http.StatusOK)
  88. }
  89. })
  90. http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
  91. w.WriteHeader(http.StatusOK)
  92. })
  93. logger.Printf("[service] listening on port %d", c.Port)
  94. if err := http.ListenAndServe(fmt.Sprintf(":%d", c.Port), nil); err != nil {
  95. errors.Fatal(err)
  96. }
  97. }
  98. type manager struct {
  99. clients map[*client]bool
  100. add chan *client
  101. remove chan *client
  102. broadcast chan []byte
  103. }
  104. type client struct {
  105. conn *ws.Conn
  106. send chan []byte
  107. mngr manager
  108. }
  109. const (
  110. maxMessageSize = 512
  111. readDeadline = 60 * time.Second
  112. writeDeadline = 10 * time.Second
  113. pingPeriod = (writeDeadline * 9) / 10
  114. )
  115. var (
  116. newline = []byte{'\n'}
  117. space = []byte{' '}
  118. )
  119. func (c *client) read() {
  120. defer func() {
  121. c.mngr.remove <- c
  122. c.conn.Close()
  123. }()
  124. c.conn.SetReadLimit(maxMessageSize)
  125. c.conn.SetReadDeadline(time.Now().Add(readDeadline))
  126. c.conn.SetPongHandler(func(string) error {
  127. c.conn.SetReadDeadline(time.Now().Add(readDeadline))
  128. return nil
  129. })
  130. for {
  131. _, message, err := c.conn.ReadMessage()
  132. if err != nil {
  133. if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
  134. logger.Printf("error: %v", err)
  135. }
  136. break
  137. }
  138. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  139. c.mngr.broadcast <- message
  140. }
  141. }
  142. func (c *client) write() {
  143. ticker := time.NewTicker(pingPeriod)
  144. defer func() {
  145. ticker.Stop()
  146. c.conn.Close()
  147. }()
  148. for {
  149. select {
  150. case message, ok := <-c.send:
  151. c.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
  152. if !ok {
  153. c.conn.WriteMessage(ws.CloseMessage, []byte{})
  154. return
  155. }
  156. w, err := c.conn.NextWriter(ws.TextMessage)
  157. if err != nil {
  158. return
  159. }
  160. w.Write(message)
  161. n := len(c.send)
  162. for i := 0; i < n; i++ {
  163. w.Write(newline)
  164. w.Write(<-c.send)
  165. }
  166. if err := w.Close(); err != nil {
  167. return
  168. }
  169. case <-ticker.C:
  170. c.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
  171. if err := c.conn.WriteMessage(ws.PingMessage, nil); err != nil {
  172. return
  173. }
  174. }
  175. }
  176. }
  177. var html = `<!DOCTYPE html>
  178. <html lang="en">
  179. <head>
  180. <meta charset="UTF-8">
  181. <title>Chat Example</title>
  182. <script type="text/javascript">
  183. window.onload = function () {
  184. var conn;
  185. var msg = document.getElementById("msg");
  186. var log = document.getElementById("log");
  187. function appendLog(item) {
  188. var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
  189. log.appendChild(item);
  190. if (doScroll) {
  191. log.scrollTop = log.scrollHeight - log.clientHeight;
  192. }
  193. }
  194. document.getElementById("form").onsubmit = function () {
  195. if (!conn) {
  196. return false;
  197. }
  198. if (!msg.value) {
  199. return false;
  200. }
  201. conn.send(msg.value);
  202. msg.value = "";
  203. return false;
  204. };
  205. if (window["WebSocket"]) {
  206. conn = new WebSocket("ws://" + document.location.host + "%sws");
  207. conn.onclose = function (evt) {
  208. var item = document.createElement("div");
  209. item.innerHTML = "<b>Connection closed.</b>";
  210. appendLog(item);
  211. };
  212. conn.onmessage = function (evt) {
  213. var messages = evt.data.split('\n');
  214. for (var i = 0; i < messages.length; i++) {
  215. var item = document.createElement("div");
  216. item.innerText = messages[i];
  217. appendLog(item);
  218. }
  219. };
  220. } else {
  221. var item = document.createElement("div");
  222. item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
  223. appendLog(item);
  224. }
  225. };
  226. </script>
  227. <style type="text/css">
  228. html {
  229. overflow: hidden;
  230. }
  231. body {
  232. overflow: hidden;
  233. padding: 0;
  234. margin: 0;
  235. width: 100%%;
  236. height: 100%%;
  237. background: blue;
  238. }
  239. #log {
  240. background: white;
  241. margin: 0;
  242. padding: 0.5em 0.5em 0.5em 0.5em;
  243. position: absolute;
  244. top: 0.5em;
  245. left: 0.5em;
  246. right: 0.5em;
  247. bottom: 3em;
  248. overflow: auto;
  249. }
  250. #form {
  251. padding: 0 0.5em 0 0.5em;
  252. margin: 0;
  253. position: absolute;
  254. bottom: 1em;
  255. left: 0px;
  256. width: 100%;
  257. overflow: hidden;
  258. }
  259. </style>
  260. <link rel="shortcut icon" href="data:image/x-icon;," type="image/x-icon">
  261. </head>
  262. <body>
  263. <div id="log"></div>
  264. <form id="form">
  265. <input type="submit" value="Send" />
  266. <input type="text" id="msg" size="64" />
  267. </form>
  268. </body>
  269. </html>`
  270. func index(prefix string) func(http.ResponseWriter, *http.Request) {
  271. return func(w http.ResponseWriter, r *http.Request) {
  272. fmt.Fprint(w, fmt.Sprintf(html, prefix))
  273. }
  274. }