package geniusweb.partiesserver.websocket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import javax.websocket.RemoteEndpoint.Basic;
import tudelft.utilities.logging.Reporter;
/**
* A send buffer for @ServerEndpoint objects that makes websockets asynchronous.
* The "asynchronous" mode for websockets can not be used because it throws
* exceptions the moment you try to send a second datapacket into the socket
* while the first is not yet finished.
*
*
* After use, {@link #stop()} must be called to stop the thread managing this
* queue. Failure to do this may cause memory leaks.
*/
public class SendBuffer {
private final Basic remote;
/**
* Buffer contains jackson-serialized objects, so can be "null" or "" but
* not null. "" (empty string) indicates buffer should be closed and
* terminated.
*/
private final LinkedBlockingQueue fifo = new LinkedBlockingQueue();
private Thread thread;
private final Reporter log;
public SendBuffer(final Basic basicRemote, final Reporter logger) {
this.remote = basicRemote;
this.log = logger;
thread = new Thread(new Runnable() {
@Override
public void run() {
String txt = "-";
try {
while (true) {
txt = fifo.take();
if (txt.isEmpty())
break;
remote.sendText(txt);
}
} catch (InterruptedException e) {
// normal termination
log.log(Level.FINE,
"sendbuffer is interrupted, assuming the party died.");
} catch (Exception e) {
logger.log(Level.SEVERE,
"Failed to write '" + txt + "' to stream", e);
}
log.log(Level.INFO, "SendBuffer is closed.");
}
}, "SendBuffer");
thread.start();
}
/**
*
* @param text text to be sent. Typically/never null as this already is a
* jackson-serialized string.
*/
public void send(String text) {
try {
fifo.put(text);
} catch (InterruptedException e) {
log.log(Level.WARNING,
"sender queue has been interrupted, message may have not been sent",
e);
}
}
/**
* Stop the que, disable, terminate threads
*/
public synchronized void stop() {
try {
stop1();
} catch (InterruptedException e) {
log.log(Level.WARNING,
"Queue termination was interrupted, unsure about thread state");
}
thread = null;
}
private void stop1() throws InterruptedException {
if (thread == null)
return;
fifo.put("");
Thread.sleep(100);
if (fifo.isEmpty())
return;
log.log(Level.FINE, "Allowing short time to send remaining "
+ fifo.size() + "messages");
Thread.sleep(1000);
if (fifo.isEmpty())
return;
log.log(Level.WARNING, "Fifo is still not fihished. Requesting kill");
thread.interrupt();
}
}