package geniusweb.partiesserver.websocket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import javax.websocket.RemoteEndpoint.Basic;
import tudelft.utilities.logging.Reporter;
/**
* A send buffer for @ServerEndpoint objects that makes websockets asynchronous.
* The "asynchronous" mode for websockets can not be used because it throws
* exceptions the moment you try to send a second datapacket into the socket
* while the first is not yet finished.
*
*
* After use, {@link #stop()} must be called to stop the thread managing this
* queue. Failure to do this may cause memory leaks.
*/
public class SendBuffer {
private final Basic remote;
/**
* Buffer contains jackson-serialized objects, so can be "null" or "" but
* not null. "" (empty string) indicates buffer should be closed and
* terminated.
*/
private final LinkedBlockingQueue fifo = new LinkedBlockingQueue();
private final Reporter log;
public SendBuffer(final Basic basicRemote, final Reporter logger) {
this.remote = basicRemote;
this.log = logger;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
String txt = "-";
try {
while (true) {
txt = fifo.take();
if (txt.isEmpty())
break;
remote.sendText(txt);
}
} catch (InterruptedException e) {
// normal termination
log.log(Level.FINE,
"sendbuffer is interrupted, assuming the party died.");
} catch (Exception e) {
logger.log(Level.SEVERE,
"Failed to write '" + txt + "' to stream", e);
}
log.log(Level.INFO, "SendBuffer is closed.");
}
}, "SendBuffer");
thread.start();
}
/**
*
* @param text (non-emptyy) text to be sent. Typically a jackson-serialized
* string.
*/
public void send(String text) {
if (fifo.contains(""))
return;
if (text == null || text.isEmpty())
throw new IllegalArgumentException(
"Text must not be null or empty");
try {
fifo.put(text);
} catch (InterruptedException e) {
log.log(Level.WARNING,
"sender queue has been interrupted, message may have not been sent",
e);
}
}
/**
* Stop the que, disable, terminate threads
*/
public synchronized void stop() {
if (!fifo.contains(""))
try {
fifo.put("");
} catch (InterruptedException e) {
log.log(Level.WARNING,
"Queue termination was interrupted, unsure about thread state");
}
}
}