package geniusweb.partiesserver.websocket; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import javax.websocket.RemoteEndpoint.Basic; import tudelft.utilities.logging.Reporter; /** * A send buffer for @ServerEndpoint objects that makes websockets asynchronous. * The "asynchronous" mode for websockets can not be used because it throws * exceptions the moment you try to send a second datapacket into the socket * while the first is not yet finished. * *

* After use, {@link #stop()} must be called to stop the thread managing this * queue. Failure to do this may cause memory leaks. */ public class SendBuffer { private final Basic remote; private final LinkedBlockingQueue fifo = new LinkedBlockingQueue(); private final Thread thread; private final Reporter log; public SendBuffer(final Basic basicRemote, final Reporter logger) { this.remote = basicRemote; this.log = logger; thread = new Thread(new Runnable() { @Override public void run() { try { while (true) { remote.sendText(fifo.take()); } } catch (InterruptedException e) { // normal termination log.log(Level.FINE, "sendbuffer is interrupted, assuming the party died."); } catch (Exception e) { logger.log(Level.SEVERE, "Failed to write to stream", e); } } }); thread.start(); } public void send(String text) { try { fifo.put(text); } catch (InterruptedException e) { log.log(Level.WARNING, "sender queue has been interrupted, message may have not been sent", e); } } /** * Stop the que, disable, terminate threads */ public void stop() { if (!fifo.isEmpty()) { try { log.log(Level.FINE, "Allowing short time to send remaining " + fifo.size() + "messages"); thread.sleep(1000); } catch (InterruptedException e) { log.log(Level.FINE, "Waiting was interrupted, final messages have been deleted"); } } thread.interrupt(); try { thread.join(400); } catch (InterruptedException e) { log.log(Level.WARNING, "thread termination was interrupted, unsure about thread state"); } } }