1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.activeio.adapter;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21
22 import org.activeio.Channel;
23 import org.activeio.Packet;
24 import org.activeio.SynchChannel;
25 import org.activeio.packet.EOSPacket;
26
27 /***
28 * Provides an InputStream for a given SynchChannel.
29 *
30 * @version $Revision$
31 */
32 public class SynchChannelInputStream extends InputStream {
33
34 private final SynchChannel channel;
35 private Packet lastPacket;
36 private boolean closed;
37 private long timeout = Channel.WAIT_FOREVER_TIMEOUT;
38
39 /***
40 * @param channel
41 */
42 public SynchChannelInputStream(final SynchChannel channel) {
43 this.channel = channel;
44 }
45
46 /***
47 * @see java.io.InputStream#read()
48 */
49 public int read() throws IOException {
50 while( true ) {
51 if( lastPacket==null ) {
52 try {
53 lastPacket = channel.read(timeout);
54 } catch (IOException e) {
55 throw (IOException)new IOException("Channel failed: "+e.getMessage()).initCause(e);
56 }
57 }
58 if( lastPacket.hasRemaining() ) {
59 return lastPacket.read();
60 }
61 }
62 }
63
64 /***
65 * @see java.io.InputStream#read(byte[], int, int)
66 */
67 public int read(byte[] b, int off, int len) throws IOException {
68 while( true ) {
69 if( lastPacket==null || !lastPacket.hasRemaining() ) {
70 try {
71 lastPacket = channel.read(timeout);
72 } catch (IOException e) {
73 throw (IOException)new IOException("Channel failed: "+e.getMessage()).initCause(e);
74 }
75 }
76 if( lastPacket==EOSPacket.EOS_PACKET ) {
77 return -1;
78 }
79 if( lastPacket!=null && lastPacket.hasRemaining() ) {
80 return lastPacket.read(b, off, len);
81 }
82 }
83 }
84
85 /***
86 * @see java.io.InputStream#close()
87 */
88 public void close() throws IOException {
89 closed=true;
90 super.close();
91 }
92
93 public boolean isClosed() {
94 return closed;
95 }
96
97 /***
98 * @param timeout
99 */
100 public void setTimeout(long timeout) {
101 if( timeout <= 0 )
102 timeout = Channel.WAIT_FOREVER_TIMEOUT;
103 this.timeout = timeout;
104 }
105
106 /***
107 * @return
108 */
109 public long getTimeout() {
110 if( timeout == Channel.WAIT_FOREVER_TIMEOUT )
111 return 0;
112 return timeout;
113 }
114 }