1 package org.codehaus.xfire.client;
2
3 import java.util.ArrayList;
4 import java.util.List;
5
6 import javax.xml.stream.XMLStreamException;
7 import javax.xml.stream.XMLStreamReader;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11 import org.codehaus.xfire.MessageContext;
12 import org.codehaus.xfire.XFire;
13 import org.codehaus.xfire.XFireException;
14 import org.codehaus.xfire.XFireRuntimeException;
15 import org.codehaus.xfire.exchange.InMessage;
16 import org.codehaus.xfire.exchange.OutMessage;
17 import org.codehaus.xfire.exchange.RobustInOutExchange;
18 import org.codehaus.xfire.fault.XFireFault;
19 import org.codehaus.xfire.handler.AbstractHandlerSupport;
20 import org.codehaus.xfire.handler.HandlerPipeline;
21 import org.codehaus.xfire.handler.OutMessageSender;
22 import org.codehaus.xfire.handler.ParseMessageHandler;
23 import org.codehaus.xfire.handler.Phase;
24 import org.codehaus.xfire.service.OperationInfo;
25 import org.codehaus.xfire.service.Service;
26 import org.codehaus.xfire.service.binding.AbstractBinding;
27 import org.codehaus.xfire.service.binding.ObjectBinding;
28 import org.codehaus.xfire.transport.Channel;
29 import org.codehaus.xfire.transport.ChannelEndpoint;
30 import org.codehaus.xfire.transport.Transport;
31
32 public class Client
33 extends AbstractHandlerSupport
34 implements ChannelEndpoint
35 {
36 private static final Log log = LogFactory.getLog(Client.class);
37
38 private Object[] response;
39 private Transport transport;
40 private Service service;
41 private ObjectBinding clientBinding;
42 private String url;
43 private int timeout = 10*1000;
44 private MessageContext context;
45 private XFireFault fault;
46 private String endpointUri;
47 private List inPhases;
48 private List outPhases;
49
50 /*** The XFire instance. This is only needed when invoking local services. */
51 private XFire xfire;
52
53 public Client(Transport transport, Service service, String url)
54 {
55 this(transport, service, url, null);
56 }
57
58 public Client(Transport transport, Service service, String url, String endpointUri)
59 {
60 this.transport = transport;
61 this.url = url;
62 this.endpointUri = endpointUri;
63
64 clientBinding = (ObjectBinding) ((AbstractBinding) service.getBinding()).clone();
65 clientBinding.setClientModeOn(true);
66
67
68 this.service = new Service(service.getServiceInfo());
69 this.service.setBinding(clientBinding);
70 this.service.setFaultSerializer(service.getFaultSerializer());
71 this.service.setSoapVersion(service.getSoapVersion());
72
73 inPhases = new ArrayList();
74 outPhases = new ArrayList();
75 createPhases();
76
77 addOutHandler(new OutMessageSender());
78 addInHandler(new ParseMessageHandler());
79 }
80
81 public Object[] invoke(OperationInfo op, Object[] params) throws XFireException, XFireFault
82 {
83 try
84 {
85 OutMessage msg = new OutMessage("targeturl");
86 msg.setBody(params);
87 msg.setUri(url);
88 msg.setSerializer(service.getBinding());
89 msg.setAction(op.getAction());
90 msg.setChannel(getOutChannel());
91
92 context = new MessageContext();
93 context.setService(service);
94 context.setXFire(xfire);
95
96 RobustInOutExchange exchange = new RobustInOutExchange(context);
97 exchange.setOperation(op);
98 exchange.setOutMessage(msg);
99 context.setExchange(exchange);
100
101 HandlerPipeline pipeline = new HandlerPipeline(outPhases);
102 pipeline.addHandlers(getOutHandlers());
103 pipeline.addHandlers(transport.getOutHandlers());
104
105 pipeline.invoke(context);
106 }
107 catch (Exception e1)
108 {
109 throw XFireFault.createFault(e1);
110 }
111
112 /***
113 * If this is an asynchronous channel, we'll need to sleep() and wait
114 * for a response. Channels such as HTTP will have the response set
115 * by the time we get to this point.
116 */
117 if (response == null && fault == null)
118 {
119 int count = 0;
120 while (response == null && fault == null && count < timeout)
121 {
122 try
123 {
124 Thread.sleep(50);
125 count += 50;
126 }
127 catch (InterruptedException e)
128 {
129 break;
130 }
131 }
132 }
133
134 if (fault != null)
135 {
136 XFireFault localFault = fault;
137 fault = null;
138 throw localFault;
139 }
140
141 Object[] localResponse = response;
142 response = null;
143 return localResponse;
144 }
145
146 public void onReceive(MessageContext recvContext, InMessage msg)
147 {
148 if (log.isDebugEnabled()) log.debug("Received message to " + msg.getUri());
149
150 if (context.getExchange() == null)
151 {
152 context.setExchange(new RobustInOutExchange(context));
153 }
154
155 RobustInOutExchange exchange = (RobustInOutExchange) context.getExchange();
156 exchange.setInMessage(msg);
157
158 HandlerPipeline pipeline = new HandlerPipeline(inPhases);
159 pipeline.addHandlers(getInHandlers());
160 pipeline.addHandlers(transport.getInHandlers());
161
162 try
163 {
164 pipeline.invoke(context);
165
166 finishReadingMessage(msg, context);
167
168 response = ((List) msg.getBody()).toArray();
169 }
170 catch (Exception e1)
171 {
172 XFireFault fault = XFireFault.createFault(e1);
173 pipeline.handleFault(fault, context);
174
175 this.fault = fault;
176 }
177 }
178
179 public void finishReadingMessage(InMessage message, MessageContext context)
180 throws XFireFault
181 {
182 XMLStreamReader reader = message.getXMLStreamReader();
183
184 try
185 {
186 while (reader.hasNext()) reader.next();
187 }
188 catch (XMLStreamException e)
189 {
190 throw new XFireFault("Couldn't parse message.", e, XFireFault.SENDER);
191 }
192 }
193
194 protected void createPhases()
195 {
196 inPhases = new ArrayList();
197 inPhases.add(new Phase(Phase.TRANSPORT, 1000));
198 inPhases.add(new Phase(Phase.PARSE, 2000));
199 inPhases.add(new Phase(Phase.PRE_DISPATCH, 3000));
200 inPhases.add(new Phase(Phase.DISPATCH, 4000));
201 inPhases.add(new Phase(Phase.POLICY, 5000));
202 inPhases.add(new Phase(Phase.USER, 6000));
203 inPhases.add(new Phase(Phase.PRE_INVOKE, 7000));
204 inPhases.add(new Phase(Phase.SERVICE, 8000));
205
206 outPhases = new ArrayList();
207 outPhases.add(new Phase(Phase.POST_INVOKE, 1000));
208 outPhases.add(new Phase(Phase.POLICY, 2000));
209 outPhases.add(new Phase(Phase.USER, 3000));
210 outPhases.add(new Phase(Phase.TRANSPORT, 4000));
211 outPhases.add(new Phase(Phase.SEND, 5000));
212 }
213
214 public Channel getOutChannel()
215 {
216 Channel channel = null;
217 try
218 {
219 String uri = getEndpointUri();
220 if (uri == null)
221 channel = getTransport().createChannel();
222 else
223 channel = getTransport().createChannel(uri);
224 }
225 catch (Exception e)
226 {
227 throw new XFireRuntimeException("Couldn't open channel.", e);
228 }
229
230 channel.setEndpoint(this);
231
232 return channel;
233 }
234
235 public Transport getTransport()
236 {
237 return transport;
238 }
239
240 public void receive(Object response)
241 {
242 this.response = ((List) response).toArray();
243 }
244
245 public Service getService()
246 {
247 return service;
248 }
249
250 public String getUrl()
251 {
252 return url;
253 }
254
255 public void setUrl(String url)
256 {
257 this.url = url;
258 }
259
260 public String getEndpointUri()
261 {
262 return endpointUri;
263 }
264
265 public void setEndpointUri(String endpointUri)
266 {
267 this.endpointUri = endpointUri;
268 }
269
270 public int getTimeout()
271 {
272 return timeout;
273 }
274
275 public void setTimeout(int timeout)
276 {
277 this.timeout = timeout;
278 }
279
280 public void receiveFault(XFireFault fault)
281 {
282 this.fault = fault;
283 }
284
285 public XFire getXFire()
286 {
287 return xfire;
288 }
289
290 public void setXFire(XFire xfire)
291 {
292 this.xfire = xfire;
293 }
294 }