package geniusweb.runserver;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import geniusweb.actions.Action;
import geniusweb.actions.PartyId;
import geniusweb.inform.Inform;
import geniusweb.protocol.partyconnection.ProtocolToPartyConn;
import geniusweb.references.Reference;
import tudelft.utilities.listener.DefaultListenable;
import tudelft.utilities.logging.ReportToLogger;
import tudelft.utilities.logging.Reporter;
/**
 * A websocket based {@link ProtocolToPartyConn}. It contains a websocket
 * connection to a party (that will be started using the partyserver
 * start-new-party protocol). Must be public so that it can be accessed as
 * ClientEndpoint by apache tomcat. This is internally used.
 * <p>
 * The @ClientEndpoint annotation is important, the runserver is creating these
 * sockets programmatically, from a received ws: URL.
 * <p>
 * Usage: construct the object and then call {@link #init()}, which starts the
 * party and opens the websocket. Incoming {@link Action}s are passed to the
 * listeners; a <code>null</code> notification signals that an error occurred,
 * see {@link #getError()}.
 */
@ClientEndpoint
public class WebSocketProtToPartyConn extends DefaultListenable<Action>
        implements ProtocolToPartyConn {
    // 20MB seems reasonable limit?
    // Currently not applied anywhere: since #50 partial messages are buffered
    // in #buffer instead of raising the session buffer sizes to this value.
    private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024;

    private final Reporter log;
    private final Reference reference;
    private WebSocketContainer container;
    /**
     * The open session, or null if not (yet) opened or already closed by
     * {@link #close()}. Volatile because it is assigned in the
     * {@link #onOpen(Session)} callback and read by {@link #send(Inform)} and
     * {@link #close()}, which may run on other threads.
     */
    private volatile Session session = null;
    /** Websocket address of the running party; set by {@link #init()}. */
    private URI partyuri;
    /**
     * First error that occurred, or null. Written inside a synchronized block
     * in {@link #setError(Throwable)}; volatile so that the unsynchronized
     * read in {@link #getError()} sees the latest value.
     */
    private volatile Throwable error = null;

    /**
     * Buffers incoming text until the last part of a message has arrived.
     */
    private StringBuffer buffer = new StringBuffer();

    /**
     * Constructor that uses default {@link Reporter}. Does not connect yet,
     * call {@link #init()} for that.
     *
     * @param reference the address to do a http GET on, to get a websocket to
     *                  a new running party.
     */
    public WebSocketProtToPartyConn(Reference reference) {
        this(reference, new ReportToLogger("runserver"));
    }

    /**
     * Does not connect yet, call {@link #init()} for that.
     *
     * @param reference the address to do a http GET on, to get a websocket to
     *                  a new running party.
     * @param reporter  the {@link Reporter} used for logging important
     *                  events/issues
     */
    public WebSocketProtToPartyConn(Reference reference, Reporter reporter) {
        this.reference = reference;
        this.log = reporter;
    }

    /**
     * Starts the party through a http GET on the reference and then opens a
     * websocket connection to the address that the GET returned.
     *
     * @return this object, to allow call chaining.
     * @throws IOException if the party does not start properly or the
     *                     websocket connection can not be established.
     */
    public WebSocketProtToPartyConn init() throws IOException {
        log.log(Level.INFO, "Trying to start up party " + reference);
        URI uri = startParty(reference);
        container = ContainerProvider.getWebSocketContainer();
        // #50 disable: might cause large buffer. And why is this a good limit
        // anyway
        // container.setDefaultMaxTextMessageBufferSize(200000);
        log.log(Level.INFO,
                "Trying to make websocket connection to running party " + uri);
        try {
            container.connectToServer(this, uri);
        } catch (DeploymentException e) {
            throw new IOException(
                    "Failed to connect with running party at " + uri, e);
        }
        log.log(Level.INFO, "Created websocket connection at " + uri);
        return this;
    }

    /**
     * Does the http GET that starts a new party and stores the returned
     * websocket address in {@link #partyuri}.
     *
     * @param reference the address to do a http GET on, to get a websocket to
     *                  a new running party.
     * @return URI of websocket behind which is a running party
     * @throws IOException if party did not start correctly
     */
    private URI startParty(Reference reference) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) reference.getURI().toURL()
                .openConnection();
        // if this throws, the IOException seems to contain the "retry later"
        // message already
        conn.setRequestMethod("GET");
        conn.setReadTimeout(20000);
        conn.connect();

        InputStream response;
        try {
            response = conn.getInputStream();
        } catch (IOException e) {
            throw startFailure(conn, reference, e);
        }

        // only the first line of the response is used: the websocket address
        String partysocketaddress;
        try (BufferedReader rd = new BufferedReader(
                new InputStreamReader(response, StandardCharsets.UTF_8))) {
            partysocketaddress = rd.readLine();
        }
        log.log(Level.INFO, "startparty returned " + partysocketaddress);

        int code = conn.getResponseCode();
        if (code != HttpURLConnection.HTTP_OK) {
            throw new IOException(
                    "Something went wrong connecting to the party:" + code);
        }
        if (partysocketaddress == null) {
            // empty response body; new URI(null) would throw a
            // NullPointerException instead of the documented IOException
            throw new IOException("Failed to start party at " + reference
                    + ": server returned an empty response");
        }
        try {
            this.partyuri = new URI(partysocketaddress);
        } catch (URISyntaxException e1) {
            throw new IOException("Failed to start party at " + reference, e1);
        }
        return partyuri;
    }

    /**
     * Builds the exception for a failed start-party request, using the first
     * line of the server's error stream as message when there is one.
     *
     * @param conn      the connection whose input stream could not be opened
     * @param reference the address that was used for the request
     * @param cause     the exception thrown when opening the input stream
     * @return the {@link IOException} to throw, with cause attached
     */
    private static IOException startFailure(HttpURLConnection conn,
            Reference reference, IOException cause) {
        InputStream errstream = conn.getErrorStream();
        if (errstream == null) {
            return new IOException("Failed to connect " + reference
                    + ", no server error message", cause);
        }
        // read the detail message from the error stream
        try (BufferedReader rd = new BufferedReader(
                new InputStreamReader(errstream, StandardCharsets.UTF_8))) {
            return new IOException(rd.readLine(), cause);
        } catch (IOException readfail) {
            // keep the original failure as cause, do not hide it behind the
            // secondary problem of reading the error text
            IOException failure = new IOException("Failed to connect "
                    + reference + ", could not read server error message",
                    cause);
            failure.addSuppressed(readfail);
            return failure;
        }
    }

    /************* implements {@link ProtocolToPartyConn} ***************/
    /**
     * Sends info as json text to the party. Silently does nothing if there is
     * no open session.
     */
    @Override
    public void send(Inform info) throws IOException {
        // work on a local copy: #close may null the field in the meantime
        Session current = session;
        if (current == null) {
            return;
        }
        current.getBasicRemote()
                .sendText(Jackson.instance().writeValueAsString(info));
    }

    @Override
    public Reference getReference() {
        return this.reference;
    }

    /**
     * @return {@link PartyId} named after the part of the party's websocket
     *         path behind the last '/'. Requires that {@link #init()}
     *         succeeded.
     */
    @Override
    public PartyId getParty() {
        String name = partyuri.getPath();
        return new PartyId(name.substring(name.lastIndexOf('/') + 1));
    }

    /**
     * Closes the websocket session, if there is one. Safe to call when the
     * session was never opened and safe to call repeatedly. A failure to close
     * is logged, not thrown.
     */
    @Override
    public void close() {
        log.log(Level.INFO, "Party close called");
        // a close event is not necessarily a bug?
        // setError(new SocketException("socket is closed"));
        Session current = session;
        // forget the session even if closing it fails below
        this.session = null;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } catch (IOException e) {
            log.log(Level.SEVERE, "failed to close websocket", e);
        }
    }

    /************* implements ClientEndpoint ***************/
    /**
     * Websocket callback: stores the session that was opened.
     *
     * @param session the new session
     */
    @OnOpen
    public void onOpen(Session session) {
        log.log(Level.INFO, "Party is being connected: " + session);
        // #50 we now buffer partial messages
        // session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE);
        // session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE);
        this.session = session;
    }

    /**
     * Websocket callback: the session was closed. A close with a non-empty
     * reason phrase is treated as abnormal and reported as error.
     *
     * @param userSession the session that closed
     * @param reason      the reason for closing
     */
    @OnClose
    public void onClose(Session userSession, CloseReason reason) {
        log.log(Level.INFO, "closing websocket " + reference + ":" + reason);
        String phrase = reason.getReasonPhrase();
        if (phrase != null && !phrase.isEmpty()) {
            setError(new RemoteException("Session " + reference
                    + " closed abnormally (see party server for more details) "
                    + phrase));
        }
        // seems NOT an error. Unless the party deliberately and silently
        // closes its connection.
        // Setting error will cause a print of stacktrace that is disturbing.
        // It seems we can not check at this point if session was finished.
        // setError(new SocketException("socket has been closed"));
    }

    /**
     * Websocket callback: a (part of a) text message arrived. Parts are
     * collected until the last part is in; then the text is parsed as
     * {@link Action} and passed to the listeners. If parsing fails the error
     * is set and the connection is closed.
     *
     * @param message the (partial) text received
     * @param islast  true iff this is the last part of the message
     */
    @OnMessage
    public void processMessage(String message, boolean islast) {
        // #50 avoid fixed (large) buffers
        buffer.append(message);
        if (!islast) {
            return;
        }
        String msg = buffer.toString();
        buffer = new StringBuffer();
        try {
            Action act = Jackson.instance().readValue(msg, Action.class);
            notifyListeners(act);
        } catch (IOException e) {
            log.log(Level.WARNING,
                    "received bad message from client " + reference, e);
            setError(e);
            close();
        }
    }

    /**
     * Websocket callback: something went wrong; logs and stores the error.
     *
     * @param t the problem that occurred
     */
    @OnError
    public void processError(Throwable t) {
        log.log(Level.SEVERE, "Something went wrong internally!", t);
        setError(t);
    }

    /**
     * @return websocket address of the running party, or null if
     *         {@link #init()} did not yet get one.
     */
    @Override
    public URI getRemoteURI() {
        return partyuri;
    }

    /**
     * @return the first error that occurred on this connection, or null if
     *         none occurred.
     */
    @Override
    public Throwable getError() {
        return error;
    }

    /**
     * Sets the connection to final error state and report null to listeners.
     * Only the first error is reported, the rest is ignored.
     *
     * @param err the error that occurred.
     */
    private void setError(Throwable err) {
        boolean isSet = false;
        synchronized (this) {
            if (this.error == null) {
                this.error = err;
                isSet = true;
            }
        }
        // notify outside the lock, listeners are foreign code
        if (isSet) {
            // special event that tells protocol that we're probably in error
            // state.
            notifyListeners(null);
        }
    }
}
/**
 * Indicates an exception occurred on a remote machine. The remote machine did
 * not pass a stacktrace (because it does not fit into a websocket close call).
 * For that reason this exception carries only a message: it has no cause,
 * suppression is disabled and no local stack trace is recorded.
 */
class RemoteException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of what went wrong on the remote machine.
     */
    public RemoteException(String message) {
        // arguments: message, cause=null, enableSuppression=false,
        // writableStackTrace=false
        super(message, null, false, false);
    }
}