package geniusweb.runserver; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; import java.util.logging.Level; import javax.websocket.ClientEndpoint; import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; import javax.websocket.DeploymentException; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.WebSocketContainer; import geniusweb.actions.Action; import geniusweb.actions.PartyId; import geniusweb.inform.Inform; import geniusweb.protocol.partyconnection.ProtocolToPartyConn; import geniusweb.references.Reference; import tudelft.utilities.listener.DefaultListenable; import tudelft.utilities.logging.ReportToLogger; import tudelft.utilities.logging.Reporter; /** * A websocket based {@link ProtocolToPartyConn}. it contains a websocket * connection to a party (that will be started using the partyserver * start-new-party protocol). Must be public so that it can be accessed as * ClientEndpoint by apache tomcat. This is internally used. * *

* The @ClientEndpoint annotation is important, the runserver is creating these * sockets programatically, from a received ws: URL. */ @ClientEndpoint public class WebSocketProtToPartyConn extends DefaultListenable implements ProtocolToPartyConn { // 20MB seems reasonable limit? private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024; private final Reporter log; private final Reference reference; private WebSocketContainer container; private Session session = null; private URI partyuri; private Throwable error = null; /** * Buffers incoming text. */ private StringBuffer buffer = new StringBuffer(); /** * Constructor that uses default {@link Reporter} * * @param reference the address to to a http GET to get a websocket to a new * running party. */ public WebSocketProtToPartyConn(Reference reference) { this(reference, new ReportToLogger("runserver")); } /** * * @param reference the address to to a http GET to get a websocket to a new * running party. * @param reporter the {@link Reporter} used for logging important * events/issues * @throws IOException if the party does not start properly. */ public WebSocketProtToPartyConn(Reference reference, Reporter reporter) { this.reference = reference; this.log = reporter; } public WebSocketProtToPartyConn init() throws IOException { log.log(Level.INFO, "Trying to start up party " + reference); URI partyuri = startParty(reference); container = ContainerProvider.getWebSocketContainer(); // #50 disable: might cause large buffer. And why is this a good limit // anyway // container.setDefaultMaxTextMessageBufferSize(200000); log.log(Level.INFO, "Trying to make websocket connection to running party " + partyuri); try { container.connectToServer(this, partyuri); } catch (DeploymentException e) { throw new IOException( "Failed to connect with running party at " + partyuri, e); } log.log(Level.INFO, "Created websocket connection at " + partyuri); return this; } /** * * @param reference the address to to a http GET to get a websocket to a new * running party. * @return URI of websocket behind which is a running party * @throws IOException if party did not start correctly */ private URI startParty(Reference reference) throws IOException { HttpURLConnection conn = (HttpURLConnection) reference.getURI().toURL() .openConnection(); // if this throws, the IOException seems to contain the "retry later" // message already conn.setRequestMethod("GET"); conn.setReadTimeout(20000); conn.connect(); BufferedReader rd; try { rd = new BufferedReader( new InputStreamReader(conn.getInputStream())); } catch (IOException e) { if (conn.getErrorStream() == null) { throw new IOException("Failed to connect " + reference + ", no server error message", e); } // read the detail message from the error stream rd = new BufferedReader( new InputStreamReader(conn.getErrorStream())); throw new IOException(rd.readLine(), e); } String partysocketaddress = rd.readLine(); log.log(Level.INFO, "startparty returned " + partysocketaddress); if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { throw new IOException( "Something went wrong connecting to the party:" + conn.getResponseCode()); } try { this.partyuri = new URI(partysocketaddress); } catch (URISyntaxException e1) { throw new IOException("Failed to start party at " + reference, e1); } return partyuri; } /************* implements {@link ConnectionWithParty} ***************/ @Override public void send(Inform info) throws IOException { if (session == null) return; session.getBasicRemote() .sendText(Jackson.instance().writeValueAsString(info)); } @Override public Reference getReference() { return this.reference; } @Override public PartyId getParty() { String name = partyuri.getPath(); return new PartyId(name.substring(name.lastIndexOf('/') + 1)); } @Override public void close() { log.log(Level.INFO, "Party close called"); // a close event is not necessarily a bug? // setError(new SocketException("socket is closed")); try { session.close(); this.session = null; } catch (IOException e) { log.log(Level.SEVERE, "failed to close websocket", e); } } /************* implements ClientEndpoint ***************/ @OnOpen public void onOpen(Session session) { log.log(Level.INFO, "Party is being connected: " + session); // #50 we now buffer partial messages // session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE); // session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE); this.session = session; } @OnClose public void onClose(Session userSession, CloseReason reason) { log.log(Level.INFO, "closing websocket " + reference + ":" + reason); if (reason.getReasonPhrase() != null && !reason.getReasonPhrase().isEmpty()) { setError(new RemoteException("Session " + reference + " closed abnormally (see party server for more details) " + reason.getReasonPhrase())); } // seems NOT an error. Unless the party deliberately and silently // closes its connection. // Setting error will cause a print of stacktrace that is disturbing. // It seems we can not check at this point if session was finished. // setError(new SocketException("socket has been closed")); } @OnMessage public void processMessage(String message, boolean islast) { // #50 avoid fixed (large) buffers buffer.append(message); if (islast) { String msg = buffer.toString(); buffer = new StringBuffer(); try { Action act = Jackson.instance().readValue(msg, Action.class); notifyListeners(act); } catch (IOException e) { log.log(Level.WARNING, "received bad message from client " + reference, e); setError(e); close(); } } } @OnError public void processError(Throwable t) { log.log(Level.SEVERE, "Something went wrong internally!", t); setError(t); } @Override public URI getRemoteURI() { return partyuri; } @Override public Throwable getError() { return error; } /** * Sets the connection to final error state and report null to listeners. * Only the first error is reported, the rest is ignored. * * @param err the error that occurred. */ private void setError(Throwable err) { boolean isSet = false; synchronized (this) { if (this.error == null) { this.error = err; isSet = true; } } if (isSet) { // special event that tells protocol that we're probably in error // state. notifyListeners(null); } } } /** * Indicates an exceeption occured on a remote machine. The remote machine did * not pass a stacktrace (because it does not fit into a websocket close call). * */ class RemoteException extends Exception { public RemoteException(String message) { super(message, null, false, false); } }