package geniusweb.partiesserver.websocket; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import javax.websocket.RemoteEndpoint.Basic; import tudelft.utilities.logging.Reporter; /** * A send buffer for @ServerEndpoint objects that makes websockets asynchronous. * The "asynchronous" mode for websockets can not be used because it throws * exceptions the moment you try to send a second datapacket into the socket * while the first is not yet finished. * *

* After use, {@link #stop()} must be called to stop the thread managing this * queue. Failure to do this may cause memory leaks. */ public class SendBuffer { private final Basic remote; /** * Buffer contains jackson-serialized objects, so can be "null" or "" but * not null. "" (empty string) indicates buffer should be closed and * terminated. */ private final LinkedBlockingQueue fifo = new LinkedBlockingQueue(); private Thread thread; private final Reporter log; public SendBuffer(final Basic basicRemote, final Reporter logger) { this.remote = basicRemote; this.log = logger; thread = new Thread(new Runnable() { @Override public void run() { String txt = "-"; try { while (true) { txt = fifo.take(); if (txt.isEmpty()) break; remote.sendText(txt); } } catch (InterruptedException e) { // normal termination log.log(Level.FINE, "sendbuffer is interrupted, assuming the party died."); } catch (Exception e) { logger.log(Level.SEVERE, "Failed to write '" + txt + "' to stream", e); } log.log(Level.INFO, "SendBuffer is closed."); } }, "SendBuffer"); thread.start(); } /** * * @param text text to be sent. Typically/never null as this already is a * jackson-serialized string. */ public void send(String text) { try { fifo.put(text); } catch (InterruptedException e) { log.log(Level.WARNING, "sender queue has been interrupted, message may have not been sent", e); } } /** * Stop the que, disable, terminate threads */ public synchronized void stop() { try { stop1(); } catch (InterruptedException e) { log.log(Level.WARNING, "Queue termination was interrupted, unsure about thread state"); } thread = null; } private void stop1() throws InterruptedException { if (thread == null) return; fifo.put(""); Thread.sleep(100); if (fifo.isEmpty()) return; log.log(Level.FINE, "Allowing short time to send remaining " + fifo.size() + "messages"); Thread.sleep(1000); if (fifo.isEmpty()) return; log.log(Level.WARNING, "Fifo is still not fihished. Requesting kill"); thread.interrupt(); } }