1 package org.activeio.adapter;
2
3 import java.io.IOException;
4 import java.io.InterruptedIOException;
5
6 import org.activeio.AsynchChannel;
7 import org.activeio.ChannelFactory;
8 import org.activeio.FilterAsynchChannel;
9 import org.activeio.Packet;
10
11 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
12 import EDU.oswego.cs.dl.util.concurrent.Channel;
13 import EDU.oswego.cs.dl.util.concurrent.Executor;
14 import EDU.oswego.cs.dl.util.concurrent.Latch;
15 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
16 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
17
18 public class AsynchWriteAsynchChannelAdapter extends FilterAsynchChannel {
19
20 static public class ObjectDispatcherX implements Runnable {
21
22 private final Executor executor;
23 private final Channel queue;
24 private final SynchronizedInt size = new SynchronizedInt(0);
25 private final AsynchWriteAsynchChannelAdapter objectListener;
26 private long pollDelay=10;
27
28 public ObjectDispatcherX(AsynchWriteAsynchChannelAdapter objectListener) {
29 this(objectListener, 10);
30 }
31
32 public ObjectDispatcherX(AsynchWriteAsynchChannelAdapter objectListener, int queueSize) {
33 this(objectListener, ChannelFactory.DEFAULT_EXECUTOR, new BoundedBuffer(queueSize));
34 }
35
36 public ObjectDispatcherX(AsynchWriteAsynchChannelAdapter objectListener, Executor executor, Channel queue) {
37 this.objectListener = objectListener;
38 this.executor = executor;
39 this.queue=queue;
40 }
41
42 public void add(Object o) throws InterruptedException {
43 int t = size.increment();
44 queue.put(o);
45 if( t==1 ) {
46 executor.execute(this);
47 }
48 }
49
50 synchronized public void run() {
51 int t = size.get();
52 while( t > 0 ) {
53 int count=0;
54 try {
55 Object o;
56 while( (o=queue.poll(pollDelay))!=null ) {
57 count++;
58 objectListener.onObject(o);
59 }
60 } catch (InterruptedException e) {
61 Thread.currentThread().interrupt();
62 return;
63 } finally {
64 t = size.subtract(count);
65 }
66 }
67 }
68
69 }
70 static public class ObjectDispatcher {
71
72 private final PooledExecutor executor;
73 private final AsynchWriteAsynchChannelAdapter objectListener;
74
75 public ObjectDispatcher(AsynchWriteAsynchChannelAdapter objectListener) {
76 this(objectListener, 10);
77 }
78
79 public ObjectDispatcher(AsynchWriteAsynchChannelAdapter objectListener, int queueSize) {
80 this.objectListener = objectListener;
81 executor = new PooledExecutor(new BoundedBuffer(queueSize), 1);
82 executor.waitWhenBlocked();
83 }
84
85 public void add(final Object o) throws InterruptedException {
86 executor.execute(new Runnable(){
87 public void run() {
88 objectListener.onObject(o);
89 }
90 });
91 }
92 }
93
94 private final ObjectDispatcher dispatcher;
95 private static final Object FLUSH_COMMAND = new Object();
96
97 public AsynchWriteAsynchChannelAdapter(AsynchChannel next) {
98 this(next, 10);
99 }
100
101 public AsynchWriteAsynchChannelAdapter(AsynchChannel next, int queueSize) {
102 super(next);
103 this.dispatcher = new ObjectDispatcher(this, queueSize);
104 }
105
106 public void onObject(Object o) {
107 try {
108 if( o == FLUSH_COMMAND ) {
109 next.flush();
110 return;
111 }
112 if( o.getClass() == Latch.class ) {
113 next.flush();
114 ((Latch)o).release();
115 return;
116 }
117 next.write((Packet)o);
118 } catch (IOException e) {
119 channelListener.onPacketError(e);
120 }
121 }
122
123 public void write(Packet packet) throws IOException {
124 try {
125 dispatcher.add(packet);
126 } catch (InterruptedException e) {
127 throw new InterruptedIOException();
128 }
129 }
130
131 public void flush() throws IOException {
132 flush(NO_WAIT_TIMEOUT);
133 }
134
135 public void stop(long timeout) throws IOException {
136 flush(WAIT_FOREVER_TIMEOUT);
137 }
138
139
140 /***
141 * @param timeout
142 * @throws InterruptedIOException
143 */
144 private void flush(long timeout) throws InterruptedIOException {
145 try {
146 if( timeout == NO_WAIT_TIMEOUT ) {
147 dispatcher.add(FLUSH_COMMAND);
148 } else if( timeout == WAIT_FOREVER_TIMEOUT ) {
149 Latch l = new Latch();
150 dispatcher.add(l);
151 l.acquire();
152 } else {
153 Latch l = new Latch();
154 dispatcher.add(l);
155 l.attempt(timeout);
156 }
157 } catch (InterruptedException e) {
158 throw new InterruptedIOException();
159 }
160 }
161 }