View Javadoc

1   package org.codehaus.xfire.transport.local;
2   
3   import java.io.IOException;
4   import java.io.OutputStream;
5   import java.io.PipedInputStream;
6   import java.io.PipedOutputStream;
7   
8   import javax.xml.stream.XMLStreamReader;
9   import javax.xml.stream.XMLStreamWriter;
10  
11  import org.codehaus.xfire.MessageContext;
12  import org.codehaus.xfire.XFire;
13  import org.codehaus.xfire.XFireException;
14  import org.codehaus.xfire.XFireRuntimeException;
15  import org.codehaus.xfire.exchange.InMessage;
16  import org.codehaus.xfire.exchange.OutMessage;
17  import org.codehaus.xfire.service.Service;
18  import org.codehaus.xfire.transport.AbstractChannel;
19  import org.codehaus.xfire.transport.Channel;
20  import org.codehaus.xfire.util.STAXUtils;
21  
22  public class LocalChannel
23      extends AbstractChannel
24  {
25      private String uri;
26      protected static final String SENDER_URI = "senderUri";
27      protected static final String OLD_CONTEXT = "urn:xfire:transport:local:oldContext";
28      
29      public LocalChannel(String uri, LocalTransport transport)
30      {
31          setUri(uri);
32          setTransport(transport);
33      }
34  
35      public void open()
36      {
37      }
38  
39      public void send(final MessageContext context, final OutMessage message) throws XFireException
40      {
41          if (message.getUri().equals(Channel.BACKCHANNEL_URI))
42          {
43              final OutputStream out = (OutputStream) context.getProperty(Channel.BACKCHANNEL_URI);
44              if (out != null)
45              {
46                  final XMLStreamWriter writer = STAXUtils.createXMLStreamWriter(out, message.getEncoding());
47  
48                  message.getSerializer().writeMessage(message, writer, context);
49              }
50              else
51              {
52                  MessageContext oldContext = (MessageContext) context.getProperty(OLD_CONTEXT);
53                  
54                  sendViaNewChannel(context, oldContext, message, (String) context.getProperty(SENDER_URI));
55              }
56          }
57          else
58          {
59              MessageContext receivingContext = new MessageContext();
60              receivingContext.setXFire(context.getXFire());
61              receivingContext.setService(getService(context.getXFire(), message.getUri()));
62              receivingContext.setProperty(OLD_CONTEXT, context);
63              receivingContext.setProperty(SENDER_URI, getUri());
64              
65              sendViaNewChannel(context, receivingContext, message, message.getUri());
66          }
67      }
68  
69      protected Service getService(XFire xfire, String uri)
70      {
71          int i = uri.indexOf("//");
72          
73          if (i == -1 || xfire == null) return null;
74          
75          return xfire.getServiceRegistry().getService(uri.substring(i+2));
76      }
77  
78      private void sendViaNewChannel(final MessageContext context,
79                                     final MessageContext receivingContext,
80                                     final OutMessage message,
81                                     final String uri) throws XFireException
82      {
83          try
84          {
85              final PipedInputStream stream = new PipedInputStream();
86              final PipedOutputStream outStream = new PipedOutputStream(stream);
87              
88              final Channel channel;
89              try
90              {
91                  channel = getTransport().createChannel(uri);
92              }
93              catch (Exception e)
94              {
95                  throw new XFireException("Couldn't create channel.", e);
96              }
97              
98              Thread writeThread = new Thread(new Runnable()
99              {
100                 public void run()
101                 {
102                     try
103                     {
104                         final XMLStreamWriter writer = 
105                             STAXUtils.createXMLStreamWriter(outStream, message.getEncoding());
106                         message.getSerializer().writeMessage(message, writer, context);
107 
108                         writer.close();
109                         outStream.close();
110                     }
111                     catch (Exception e)
112                     {
113                         throw new XFireRuntimeException("Couldn't write stream.", e);
114                     }
115                 };
116             });
117             
118             Thread readThread = new Thread(new Runnable() 
119             {
120                 public void run() 
121                 {
122                    try
123                     {
124                        final XMLStreamReader reader = STAXUtils.createXMLStreamReader(stream, message.getEncoding());
125                        final InMessage inMessage = new InMessage(reader, uri);
126                        inMessage.setEncoding(message.getEncoding());
127 
128                        channel.receive(receivingContext, inMessage);
129                        
130                        reader.close();
131                        stream.close();
132                     }
133                     catch (Exception e)
134                     {
135                         throw new XFireRuntimeException("Couldn't read stream.", e);
136                     }
137                 };
138             });
139 
140             
141             writeThread.start();
142             readThread.start();
143         }
144         catch (IOException e)
145         {
146             throw new XFireRuntimeException("Couldn't create stream.", e);
147         }
148     }
149 
150     public void close()
151     {
152     }
153 }