1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activeio.net;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.URI;
23
24 import org.activeio.Channel;
25 import org.activeio.FilterSynchChannel;
26 import org.activeio.FilterSynchChannelServer;
27 import org.activeio.Packet;
28 import org.activeio.SynchChannel;
29 import org.activeio.SynchChannelFactory;
30 import org.activeio.SynchChannelServer;
31
32 /***
33 * Makes all the channels produced by another [@see org.activeio.SynchChannelFactory}
34 * have write operations that have built in delays for testing.
35 *
36 * @version $Revision$
37 */
38 public class SlowWriteSynchChannelFactory implements SynchChannelFactory {
39
40 final SynchChannelFactory next;
41 private final int maxPacketSize;
42 private final long packetDelay;
43
44 public SlowWriteSynchChannelFactory(final SynchChannelFactory next, int maxPacketSize, long packetDelay) {
45 this.next = next;
46 this.maxPacketSize = maxPacketSize;
47 this.packetDelay = packetDelay;
48 }
49
50 class SlowWriteSynchChannel extends FilterSynchChannel {
51 public SlowWriteSynchChannel(SynchChannel next) {
52 super(next);
53 }
54 public void write(Packet packet) throws IOException {
55 packet = packet.slice();
56 while(packet.hasRemaining()) {
57 int size = Math.max(maxPacketSize, packet.remaining());
58 packet.position(size);
59 Packet remaining = packet.slice();
60 packet.flip();
61 Packet data = packet.slice();
62 super.write(data);
63 packet = remaining;
64 try {
65 Thread.sleep(packetDelay);
66 } catch (InterruptedException e) {
67 throw new InterruptedIOException();
68 }
69 }
70 }
71 }
72
73 class SlowWriteSynchChannelServer extends FilterSynchChannelServer {
74 public SlowWriteSynchChannelServer(SynchChannelServer next) {
75 super(next);
76 }
77 public Channel accept(long timeout) throws IOException {
78 Channel channel = super.accept(timeout);
79 if( channel != null ) {
80 channel = new SlowWriteSynchChannel((SynchChannel) channel);
81 }
82 return channel;
83 }
84 }
85
86 public SynchChannelServer bindSynchChannel(URI location) throws IOException {
87 return next.bindSynchChannel(location);
88 }
89
90 public SynchChannel openSynchChannel(URI location) throws IOException {
91 return new SlowWriteSynchChannel(next.openSynchChannel(location));
92 }
93
94 }