source: src/main/java/geniusweb/partiesserver/websocket/SendBuffer.java@ 16

Last change on this file since 16 was 1, checked in by bart, 5 years ago

Initial Release

File size: 2.1 KB
RevLine 
[1]1package geniusweb.partiesserver.websocket;
2
3import java.util.concurrent.LinkedBlockingQueue;
4import java.util.logging.Level;
5
6import javax.websocket.RemoteEndpoint.Basic;
7
8import tudelft.utilities.logging.Reporter;
9
10/**
11 * A send buffer for @ServerEndpoint objects that makes websockets asynchronous.
12 * The "asynchronous" mode for websockets can not be used because it throws
13 * exceptions the moment you try to send a second datapacket into the socket
14 * while the first is not yet finished.
15 *
16 * <p>
17 * After use, {@link #stop()} must be called to stop the thread managing this
18 * queue. Failure to do this may cause memory leaks.
19 */
20public class SendBuffer {
21
22 private final Basic remote;
23 private final LinkedBlockingQueue<String> fifo = new LinkedBlockingQueue<String>();
24 private final Thread thread;
25 private final Reporter log;
26
27 public SendBuffer(final Basic basicRemote, final Reporter logger) {
28 this.remote = basicRemote;
29 this.log = logger;
30 thread = new Thread(new Runnable() {
31
32 @Override
33 public void run() {
34 try {
35 while (true) {
36 remote.sendText(fifo.take());
37 }
38 } catch (InterruptedException e) {
39 // normal termination
40 log.log(Level.FINE,
41 "sendbuffer is interrupted, assuming the party died.");
42 } catch (Exception e) {
43 logger.log(Level.SEVERE, "Failed to write to stream", e);
44 }
45 }
46 });
47 thread.start();
48 }
49
50 public void send(String text) {
51 try {
52 fifo.put(text);
53 } catch (InterruptedException e) {
54 log.log(Level.WARNING,
55 "sender queue has been interrupted, message may have not been sent",
56 e);
57 }
58 }
59
60 /**
61 * Stop the que, disable, terminate threads
62 */
63 public void stop() {
64 if (!fifo.isEmpty()) {
65 try {
66 log.log(Level.FINE, "Allowing short time to send remaining "
67 + fifo.size() + "messages");
68 thread.sleep(1000);
69 } catch (InterruptedException e) {
70 log.log(Level.FINE,
71 "Waiting was interrupted, final messages have been deleted");
72 }
73 }
74 thread.interrupt();
75 try {
76 thread.join(400);
77 } catch (InterruptedException e) {
78 log.log(Level.WARNING,
79 "thread termination was interrupted, unsure about thread state");
80 }
81 }
82
83}
Note: See TracBrowser for help on using the repository browser.