source: simplerunner/src/main/java/geniusweb/simplerunner/BasicConnection.java@ 52

Last change on this file since 52 was 52, checked in by ruud, 14 months ago

Fixed small issues in domaineditor.

File size: 4.1 KB
Line 
1package geniusweb.simplerunner;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.List;
6import java.util.concurrent.ArrayBlockingQueue;
7import java.util.concurrent.CopyOnWriteArrayList;
8
9import geniusweb.connection.ConnectionEnd;
10import geniusweb.references.Reference;
11import tudelft.utilities.listener.Listener;
12
13/**
14 * A basic connection that implements connection with direct calls
15 *
16 *
17 * @param <IN> the type of the incoming data
18 * @param <OUT> the type of outgoing data
19 */
20public class BasicConnection<IN, OUT> implements ConnectionEnd<IN, OUT> {
21 private List<Listener<IN>> listeners = new CopyOnWriteArrayList<Listener<IN>>();
22
23 private final Info<OUT> EOS = new EOS<OUT>();
24 private final Reference reference;
25 private final URI uri;
26 // to be initialized
27 private Listener<OUT> handler = null;
28 private ArrayBlockingQueue<Info<OUT>> messages = new ArrayBlockingQueue<>(
29 4);
30 private Throwable error;
31
32 /**
33 *
34 * @param reference Reference that was used to create this connection.
35 * @param uri the URI of the remote endpoint that makes up the
36 * connection. This is a URI that uniquely identifies the
37 * remote object
38 */
39 public BasicConnection(Reference reference, URI uri) {
40 this.reference = reference;
41 this.uri = uri;
42 }
43
44 /**
45 * To be called to hook up the other side that will handle a send action
46 * from us. Must be called first.
47 *
48 * @param newhandler a Listener&lt;OUT&gt; that can handle send actions.
49 *
50 */
51 public void init(Listener<OUT> newhandler) {
52 if (handler != null) {
53 throw new IllegalStateException("already initialized");
54 }
55 this.handler = newhandler;
56
57 Thread handlerThread = new Thread(new Runnable() {
58
59 @Override
60 public void run() {
61 try {
62 while (true) {
63 Info<OUT> mess = messages.take();
64 if (mess instanceof EOS)
65 break;
66 handler.notifyChange(((Data<OUT>) mess).get());
67 }
68 } catch (Exception e) {
69 setError(e);
70 }
71 handler = null;
72 System.out.println("BasicConnection closed");
73 }
74 });
75 handlerThread.start();
76 }
77
78 /**
79 * Error condition occurs. Record error and close connection
80 *
81 * @param e
82 */
83 protected synchronized void setError(Throwable e) {
84 if (error == null) {
85 // maybe log instead?
86 e.printStackTrace();
87 error = e;
88 close();
89 }
90 }
91
92 @Override
93 public void send(OUT data) throws IOException {
94 if (handler == null) {
95 throw new IllegalStateException(
96 "BasicConnection has not been initialized or was closed");
97 }
98 try {
99 messages.put(new Data<OUT>(data));
100 } catch (InterruptedException e) {
101 setError(e);
102 }
103 }
104
105 @Override
106 public Reference getReference() {
107 return reference;
108 }
109
110 @Override
111 public URI getRemoteURI() {
112 return uri;
113 }
114
115 @Override
116 public void close() {
117 System.out.println("flushing and terminating " + this);
118 if (handler == null || messages.contains(EOS))
119 return;
120 try {
121 messages.put(EOS);
122 } catch (InterruptedException e) {
123 setError(e);
124 }
125 }
126
127 @Override
128 public String toString() {
129 return "BasicConnection[" + reference + "]";
130 }
131
132 @Override
133 public Throwable getError() {
134 return error;
135 }
136
137 /**
138 *
139 * @return true iff this connection is open. Returns false also when then
140 * handler is in the close-down process
141 */
142 public boolean isOpen() {
143 return handler != null && !messages.contains(EOS);
144 }
145
146 /****************** implements listenable ****************/
147 // override because notifyListeners should throw exceptions.
148 @Override
149 public void addListener(Listener<IN> l) {
150 listeners.add(l);
151 }
152
153 @Override
154 public void removeListener(Listener<IN> l) {
155 listeners.remove(l);
156 }
157
158 public void notifyListeners(IN data) {
159 for (Listener<IN> l : listeners) {
160 l.notifyChange(data);
161 }
162 }
163
164}
165
/**
 * Wrapper around data so that we can put null and EOS in an
 * {@link java.util.concurrent.ArrayBlockingQueue}, which does not accept
 * null elements. This is a marker interface without methods.
 *
 * @param <S> the type of contained data.
 */
interface Info<S> {

}
176
177class Data<S> implements Info<S> {
178 private final S data;
179
180 public Data(S data) {
181 this.data = data;
182 }
183
184 public S get() {
185 return data;
186 }
187}
188
189/**
190 * End of stream.
191 *
192 * @author wouter
193 *
194 */
195class EOS<S> implements Info<S> {
196}
Note: See TracBrowser for help on using the repository browser.