package geniusweb.partiesserver.websocket; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import javax.websocket.RemoteEndpoint.Basic; import tudelft.utilities.logging.Reporter; /** * A send buffer for @ServerEndpoint objects that makes websockets asynchronous. * The "asynchronous" mode for websockets can not be used because it throws * exceptions the moment you try to send a second datapacket into the socket * while the first is not yet finished. * *

* After use, {@link #stop()} must be called to stop the thread managing this * queue. Failure to do this may cause memory leaks. */ public class SendBuffer { private final Basic remote; /** * Buffer contains jackson-serialized objects, so can be "null" or "" but * not null. "" (empty string) indicates buffer should be closed and * terminated. */ private final LinkedBlockingQueue fifo = new LinkedBlockingQueue(); private final Reporter log; public SendBuffer(final Basic basicRemote, final Reporter logger) { this.remote = basicRemote; this.log = logger; Thread thread = new Thread(new Runnable() { @Override public void run() { String txt = "-"; try { while (true) { txt = fifo.take(); if (txt.isEmpty()) break; remote.sendText(txt); } } catch (InterruptedException e) { // normal termination log.log(Level.FINE, "sendbuffer is interrupted, assuming the party died."); } catch (Exception e) { logger.log(Level.SEVERE, "Failed to write '" + txt + "' to stream", e); } log.log(Level.INFO, "SendBuffer is closed."); } }, "SendBuffer"); thread.start(); } /** * * @param text (non-emptyy) text to be sent. Typically a jackson-serialized * string. */ public void send(String text) { if (fifo.contains("")) return; if (text == null || text.isEmpty()) throw new IllegalArgumentException( "Text must not be null or empty"); try { fifo.put(text); } catch (InterruptedException e) { log.log(Level.WARNING, "sender queue has been interrupted, message may have not been sent", e); } } /** * Stop the que, disable, terminate threads */ public synchronized void stop() { if (!fifo.contains("")) try { fifo.put(""); } catch (InterruptedException e) { log.log(Level.WARNING, "Queue termination was interrupted, unsure about thread state"); } } }