package geniusweb.simplerunner;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

import geniusweb.connection.ConnectionEnd;
import geniusweb.references.Reference;
import tudelft.utilities.listener.Listener;

/**
 * A basic connection that implements connection with direct calls.
 * <p>
 * Outgoing data passed to {@link #send(Object)} is placed in a small bounded
 * queue and delivered to the handler given to {@link #init(Listener)} by a
 * dedicated thread that is started in {@link #init(Listener)}.
 *
 * @param <IN>  the type of the incoming data
 * @param <OUT> the type of outgoing data
 */
public class BasicConnection<IN, OUT> implements ConnectionEnd<IN, OUT> {
    /** Maximum number of queued, not yet delivered messages. */
    private static final int QUEUE_CAPACITY = 4;

    private final List<Listener<IN>> listeners = new CopyOnWriteArrayList<Listener<IN>>();
    /** Marker placed in the queue to tell the handler thread to stop. */
    private final Info<OUT> endOfStream = new EOS<OUT>();
    private final Reference reference;
    private final URI uri;
    // to be initialized. volatile: cleared by the handler thread, read by
    // the threads that call send(), close() and isOpen().
    private volatile Listener<OUT> handler = null;
    private final ArrayBlockingQueue<Info<OUT>> messages = new ArrayBlockingQueue<>(
            QUEUE_CAPACITY);
    // volatile: written in setError(), read without locking in getError().
    private volatile Throwable error;

    /**
     *
     * @param reference Reference that was used to create this connection.
     * @param uri       the URI of the remote endpoint that makes up the
     *                  connection. This is a URI that uniquely identifies the
     *                  remote object
     */
    public BasicConnection(Reference reference, URI uri) {
        this.reference = reference;
        this.uri = uri;
    }

    /**
     * To be called to hook up the other side that will handle a send action
     * from us. Must be called first. Starts the thread that takes messages
     * from the queue and passes them to the handler.
     *
     * @param newhandler a Listener&lt;OUT&gt; that can handle send actions.
     * @throws IllegalArgumentException if newhandler is null
     * @throws IllegalStateException    if a handler is already installed
     */
    public void init(Listener<OUT> newhandler) {
        if (newhandler == null) {
            // a null handler would start a consumer thread while leaving
            // the connection looking uninitialized.
            throw new IllegalArgumentException("newhandler must not be null");
        }
        if (handler != null) {
            throw new IllegalStateException("already initialized");
        }
        this.handler = newhandler;
        Thread handlerThread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while (true) {
                        Info<OUT> mess = messages.take();
                        if (mess instanceof EOS)
                            break;
                        handler.notifyChange(((Data<OUT>) mess).get());
                    }
                } catch (Exception e) {
                    // This thread is the only consumer of the queue and has
                    // already left the loop. Mark the connection closed
                    // BEFORE recording the error, so that setError() ->
                    // close() returns early instead of blocking forever on
                    // put(EOS) into a possibly full queue.
                    handler = null;
                    setError(e);
                }
                handler = null;
                System.out.println("BasicConnection closed");
            }
        });
        handlerThread.start();
    }

    /**
     * Error condition occurs. Record error and close connection. Only the
     * first reported error is recorded; later calls are ignored.
     *
     * @param e the problem that occurred
     */
    protected void setError(Throwable e) {
        synchronized (this) {
            if (error != null) {
                return;
            }
            error = e;
        }
        // Outside the lock: close() may block on the bounded queue and must
        // not keep other threads out of setError() while it waits.
        // maybe log instead?
        e.printStackTrace();
        close();
    }

    /**
     * Queues data for delivery to the handler. Blocks while the queue is
     * full.
     *
     * @param data the data to send
     * @throws IllegalStateException if the connection has not been
     *                               initialized or was closed
     */
    @Override
    public void send(OUT data) throws IOException {
        if (handler == null) {
            throw new IllegalStateException(
                    "BasicConnection has not been initialized or was closed");
        }
        try {
            messages.put(new Data<OUT>(data));
        } catch (InterruptedException e) {
            setError(e);
            // restore the interrupt status only after setError(), so that
            // the put(EOS) done by close() is not aborted right away.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public Reference getReference() {
        return reference;
    }

    @Override
    public URI getRemoteURI() {
        return uri;
    }

    /**
     * Requests termination by queueing the end-of-stream marker behind the
     * messages that are already waiting. Does nothing (apart from printing)
     * if there is no handler or the marker is already queued.
     */
    @Override
    public void close() {
        System.out.println("flushing and terminating " + this);
        if (handler == null || messages.contains(endOfStream))
            return;
        try {
            messages.put(endOfStream);
        } catch (InterruptedException e) {
            setError(e);
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public String toString() {
        return "BasicConnection[" + reference + "]";
    }

    /**
     * @return the first error that was recorded with
     *         {@link #setError(Throwable)}, or null if there was none.
     */
    @Override
    public Throwable getError() {
        return error;
    }

    /**
     *
     * @return true iff this connection is open. Returns false also when the
     *         handler is in the close-down process
     */
    public boolean isOpen() {
        return handler != null && !messages.contains(endOfStream);
    }

    /****************** implements listenable ****************/
    // override because notifyListeners should throw exceptions.
    @Override
    public void addListener(Listener<IN> l) {
        listeners.add(l);
    }

    @Override
    public void removeListener(Listener<IN> l) {
        listeners.remove(l);
    }

    /**
     * Passes data to all registered listeners, in registration order. An
     * exception thrown by a listener propagates to the caller.
     *
     * @param data the incoming data to hand to the listeners
     */
    public void notifyListeners(IN data) {
        for (Listener<IN> l : listeners) {
            l.notifyChange(data);
        }
    }
}

/**
 * Wrapper around data so that we can put null and EOS in a
 * {@link ArrayBlockingQueue}
 *
 * @param <S> the type of contained data.
 */
interface Info<S> {
}

/**
 * A queue entry that carries one (possibly null) data item.
 *
 * @param <S> the type of contained data.
 */
class Data<S> implements Info<S> {
    private final S data;

    public Data(S data) {
        this.data = data;
    }

    public S get() {
        return data;
    }
}

/**
 * End of stream.
 *
 * @author wouter
 *
 * @param <S> the type of data carried by the stream that this marker ends.
 */
class EOS<S> implements Info<S> {
}