1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.activeio.filter;
18
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21
22 import org.activeio.AsynchChannel;
23 import org.activeio.FilterAsynchChannel;
24 import org.activeio.Packet;
25
26 import EDU.oswego.cs.dl.util.concurrent.Mutex;
27 import EDU.oswego.cs.dl.util.concurrent.Sync;
28
29 /***
30 * Used to synchronize concurrent access to an ASynchChannel.
31 *
32 * Uses a {@see EDU.oswego.cs.dl.util.concurrent.Sync} object
33 * for write operations. All other operations such as {@see #stop(long)}
34 * and {@see #stop} just do a normal java synchronization against the SynchornizedSynchChannel
35 * object instance. It is assumed that the Async message delivery is not
36 * concurrent and therefore does not require synchronization.
37 *
38 */
39 public class SynchornizedAsynchChannel extends FilterAsynchChannel {
40
41 private final Sync writeLock;
42
43 public SynchornizedAsynchChannel(AsynchChannel next) {
44 this(next, new Mutex());
45 }
46
47 public SynchornizedAsynchChannel(AsynchChannel next, Sync writeLock) {
48 super(next);
49 this.writeLock = writeLock;
50 }
51
52 public void write(Packet packet) throws IOException {
53 try {
54 writeLock.acquire();
55 } catch (InterruptedException e) {
56 throw new InterruptedIOException(e.getMessage());
57 }
58 try {
59 getNext().write(packet);
60 } finally {
61 writeLock.release();
62 }
63 }
64
65 public void flush() throws IOException {
66 try {
67 writeLock.acquire();
68 } catch (InterruptedException e) {
69 throw new InterruptedIOException(e.getMessage());
70 }
71 try {
72 getNext().flush();
73 } finally {
74 writeLock.release();
75 }
76 }
77
78 synchronized public void dispose() {
79 super.dispose();
80 }
81
82 synchronized public Object narrow(Class target) {
83 return super.narrow(target);
84 }
85
86 synchronized public void start() throws IOException {
87 super.start();
88 }
89
90 synchronized public void stop(long timeout) throws IOException {
91 super.stop(timeout);
92 }
93
94 public Sync getWriteLock() {
95 return writeLock;
96 }
97 }