1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
//-----------------------------------------------------------------------
// <copyright file="CoordinatingChannel.cs" company="Andrew Arnott">
// Copyright (c) Andrew Arnott. All rights reserved.
// </copyright>
//-----------------------------------------------------------------------
namespace DotNetOpenAuth.Test.Mocks {
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.Threading;
using DotNetOpenAuth.Messaging;
using DotNetOpenAuth.Messaging.Reflection;
using DotNetOpenAuth.Test.OpenId;
using NUnit.Framework;
/// <summary>
/// A mock channel that wraps a product <see cref="Channel"/> and hands messages
/// directly to a paired remote <see cref="CoordinatingChannel"/>, coordinating
/// the two parties' threads with wait handles.
/// </summary>
internal class CoordinatingChannel : Channel {
    /// <summary>
    /// A lock to use when checking and setting the <see cref="waitingForMessage"/>
    /// or the <see cref="simulationCompleted"/> fields.
    /// </summary>
    /// <remarks>
    /// This is a static member so that all coordinating channels share a lock
    /// since they peek at each other's fields.
    /// </remarks>
    private static readonly object waitingForMessageCoordinationLock = new object();

    /// <summary>
    /// The original product channel whose behavior is being modified to work
    /// better in automated testing.
    /// </summary>
    private readonly Channel wrappedChannel;

    /// <summary>
    /// A thread-coordinating signal that is set when another thread has a
    /// message ready for this channel to receive.
    /// </summary>
    private readonly EventWaitHandle incomingMessageSignal = new AutoResetEvent(false);

    /// <summary>
    /// A thread-coordinating signal that is set briefly by this channel whenever
    /// it picks up a message from its own inbox.
    /// </summary>
    private readonly EventWaitHandle messageReceivedSignal = new AutoResetEvent(false);

    /// <summary>
    /// A delegate that gets a chance to peek at and fiddle with all
    /// incoming messages. May be null.
    /// </summary>
    private readonly Action<IProtocolMessage> incomingMessageFilter;

    /// <summary>
    /// A delegate that gets a chance to peek at and fiddle with all
    /// outgoing messages. May be null.
    /// </summary>
    private readonly Action<IProtocolMessage> outgoingMessageFilter;

    /// <summary>
    /// A flag set to true when this party in a two-party test has completed
    /// its part of the testing.
    /// </summary>
    private bool simulationCompleted;

    /// <summary>
    /// A flag used to indicate when this channel is waiting for a message
    /// to arrive.
    /// </summary>
    private bool waitingForMessage;

    /// <summary>
    /// An incoming message that has been posted by a remote channel and
    /// is waiting for receipt by this channel.
    /// </summary>
    private IDictionary<string, string> incomingMessage;

    /// <summary>
    /// The recipient endpoint that accompanies <see cref="incomingMessage"/>,
    /// or null when the posted message was not a directed message.
    /// </summary>
    private MessageReceivingEndpoint incomingMessageRecipient;

    /// <summary>
    /// Initializes a new instance of the <see cref="CoordinatingChannel"/> class.
    /// </summary>
    /// <param name="wrappedChannel">The wrapped channel. Must not be null.</param>
    /// <param name="incomingMessageFilter">The incoming message filter. May be null.</param>
    /// <param name="outgoingMessageFilter">The outgoing message filter. May be null.</param>
    internal CoordinatingChannel(Channel wrappedChannel, Action<IProtocolMessage> incomingMessageFilter, Action<IProtocolMessage> outgoingMessageFilter)
        : base(GetMessageFactory(wrappedChannel), wrappedChannel.BindingElements.ToArray()) {
        Contract.Requires<ArgumentNullException>(wrappedChannel != null);

        this.wrappedChannel = wrappedChannel;
        this.incomingMessageFilter = incomingMessageFilter;
        this.outgoingMessageFilter = outgoingMessageFilter;

        // Preserve any customized binding element ordering.
        this.CustomizeBindingElementOrder(this.wrappedChannel.OutgoingBindingElements, this.wrappedChannel.IncomingBindingElements);
    }

    /// <summary>
    /// Gets or sets the coordinating channel used by the other party.
    /// </summary>
    internal CoordinatingChannel RemoteChannel { get; set; }

    /// <summary>
    /// Indicates that the simulation that uses this channel has completed work.
    /// </summary>
    /// <remarks>
    /// Calling this method is not strictly necessary, but it gives the channel
    /// coordination a chance to recognize when another channel is left dangling
    /// waiting for a message from another channel that may never come.
    /// </remarks>
    internal void Close() {
        lock (waitingForMessageCoordinationLock) {
            this.simulationCompleted = true;

            // The remote channel may never have been assigned; in that case there
            // is nobody to unblock and we simply dispose.
            CoordinatingChannel remote = this.RemoteChannel;
            if (remote != null && remote.waitingForMessage && remote.incomingMessage == null) {
                TestUtilities.TestLogger.Debug("CoordinatingChannel is closing while remote channel is waiting for an incoming message. Signaling channel to unblock it to receive a null message.");
                remote.incomingMessageSignal.Set();
            }

            this.Dispose();
        }
    }

    /// <summary>
    /// Replays the specified message as if it were received again.
    /// </summary>
    /// <param name="message">The message to replay.</param>
    internal void Replay(IProtocolMessage message) {
        this.ProcessIncomingMessage(CloneSerializedParts(message));
    }

    /// <summary>
    /// Called from a remote party's thread to post a message to this channel for processing.
    /// </summary>
    /// <param name="message">The message that this channel should receive. This message will be cloned.</param>
    /// <remarks>
    /// If this channel's inbox is still occupied, the caller waits up to 500 ms for
    /// this channel to pick up the previous message before the internal verification fails.
    /// </remarks>
    internal void PostMessage(IProtocolMessage message) {
        if (this.incomingMessage != null) {
            // The remote party hasn't picked up the last message we sent them.
            // Wait for a short period for them to pick it up before failing.
            TestBase.TestLogger.Warn("We're blocked waiting to send a message to the remote party and they haven't processed the last message we sent them.");

            // The inbox being emptied is *this* channel's, and AwaitIncomingMessage
            // pulses this channel's own messageReceivedSignal when it empties it,
            // so that is the signal to wait on.
            this.messageReceivedSignal.WaitOne(500);
        }

        ErrorUtilities.VerifyInternal(this.incomingMessage == null, "Oops, a message is already waiting for the remote party!");

        this.incomingMessage = this.MessageDescriptions.GetAccessor(message).Serialize();
        var directedMessage = message as IDirectedProtocolMessage;
        this.incomingMessageRecipient = directedMessage != null ? new MessageReceivingEndpoint(directedMessage.Recipient, directedMessage.HttpMethods) : null;
        this.incomingMessageSignal.Set();
    }

    /// <summary>
    /// Blocks until a message is posted to this channel and wraps it in a request object.
    /// </summary>
    /// <returns>
    /// A <see cref="CoordinatingHttpRequestInfo"/> built from the received message data,
    /// or one built from the recipient alone when no message data was received.
    /// </returns>
    protected internal override HttpRequestInfo GetRequestFromContext() {
        MessageReceivingEndpoint recipient;
        var messageData = this.AwaitIncomingMessage(out recipient);
        if (messageData != null) {
            return new CoordinatingHttpRequestInfo(this, this.MessageFactory, messageData, recipient);
        } else {
            return new CoordinatingHttpRequestInfo(recipient);
        }
    }

    /// <summary>
    /// Sends a direct request to the remote channel and blocks until its response arrives.
    /// </summary>
    /// <param name="request">The request to post to the remote channel.</param>
    /// <returns>
    /// The deserialized response message, or null if the message factory
    /// did not produce a response message instance.
    /// </returns>
    protected override IProtocolMessage RequestCore(IDirectedProtocolMessage request) {
        this.ProcessMessageFilter(request, true);

        // Drop the outgoing message in the other channel's in-slot and let them know it's there.
        this.RemoteChannel.PostMessage(request);

        // Now wait for a response...
        // NOTE(review): responseData is null if the remote channel's Close() woke us
        // without posting a message; it is passed to the message factory as-is.
        MessageReceivingEndpoint recipient;
        IDictionary<string, string> responseData = this.AwaitIncomingMessage(out recipient);
        ErrorUtilities.VerifyInternal(recipient == null, "The recipient is expected to be null for direct responses.");

        // And deserialize it.
        IDirectResponseProtocolMessage responseMessage = this.MessageFactory.GetNewResponseMessage(request, responseData);
        if (responseMessage == null) {
            return null;
        }

        var responseAccessor = this.MessageDescriptions.GetAccessor(responseMessage);
        responseAccessor.Deserialize(responseData);

        this.ProcessMessageFilter(responseMessage, false);
        return responseMessage;
    }

    /// <summary>
    /// Runs the outgoing filter on a response and wraps it for delivery to the remote channel.
    /// </summary>
    /// <param name="response">The response message to send.</param>
    /// <returns>A <see cref="CoordinatingOutgoingWebResponse"/> targeting the remote channel.</returns>
    protected override OutgoingWebResponse PrepareDirectResponse(IProtocolMessage response) {
        this.ProcessMessageFilter(response, true);
        return new CoordinatingOutgoingWebResponse(response, this.RemoteChannel);
    }

    /// <summary>
    /// Prepares an indirect message exactly as a direct response would be prepared.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <returns>The result of <see cref="PrepareDirectResponse"/> for the message.</returns>
    protected override OutgoingWebResponse PrepareIndirectResponse(IDirectedProtocolMessage message) {
        // NOTE(review): the outgoing filter runs here and again inside
        // PrepareDirectResponse, so it sees indirect messages twice.
        this.ProcessMessageFilter(message, true);

        // In this mock transport, direct and indirect messages are the same.
        return this.PrepareDirectResponse(message);
    }

    /// <summary>
    /// Returns the message carried by the request, after running the incoming filter on it.
    /// </summary>
    /// <param name="request">The request to read the message from.</param>
    /// <returns>The request's message, which may be null.</returns>
    protected override IDirectedProtocolMessage ReadFromRequestCore(HttpRequestInfo request) {
        if (request.Message != null) {
            this.ProcessMessageFilter(request.Message, false);
        }

        return request.Message;
    }

    /// <summary>
    /// Delegates response reading to the wrapped channel's test hook.
    /// </summary>
    /// <param name="response">The response to read.</param>
    /// <returns>Whatever the wrapped channel's test hook returns.</returns>
    protected override IDictionary<string, string> ReadFromResponseCore(IncomingWebResponse response) {
        return this.wrappedChannel.ReadFromResponseCoreTestHook(response);
    }

    /// <summary>
    /// Delegates incoming message processing to the wrapped channel's test hook.
    /// </summary>
    /// <param name="message">The message to process.</param>
    protected override void ProcessIncomingMessage(IProtocolMessage message) {
        this.wrappedChannel.ProcessIncomingMessageTestHook(message);
    }

    /// <summary>
    /// Clones a message, instantiating the new instance using <i>this</i> channel's
    /// message factory.
    /// </summary>
    /// <typeparam name="T">The type of message to clone.</typeparam>
    /// <param name="message">The message to clone.</param>
    /// <returns>The new instance of the message.</returns>
    /// <remarks>
    /// This Clone method should <i>not</i> be used to send message clones to the remote
    /// channel since their message factory is not used.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the message is neither a directed request nor a direct response.
    /// </exception>
    protected virtual T CloneSerializedParts<T>(T message) where T : class, IProtocolMessage {
        Contract.Requires<ArgumentNullException>(message != null);

        IProtocolMessage clonedMessage;
        var messageAccessor = this.MessageDescriptions.GetAccessor(message);
        var fields = messageAccessor.Serialize();

        MessageReceivingEndpoint recipient = null;
        var directedMessage = message as IDirectedProtocolMessage;
        var directResponse = message as IDirectResponseProtocolMessage;
        if (directedMessage != null && directedMessage.IsRequest()) {
            if (directedMessage.Recipient != null) {
                recipient = new MessageReceivingEndpoint(directedMessage.Recipient, directedMessage.HttpMethods);
            }

            clonedMessage = this.MessageFactory.GetNewRequestMessage(recipient, fields);
        } else if (directResponse != null && directResponse.IsDirectResponse()) {
            clonedMessage = this.MessageFactory.GetNewResponseMessage(directResponse.OriginatingRequest, fields);
        } else {
            throw new InvalidOperationException("Totally expected a message to implement one of the two derived interface types.");
        }

        ErrorUtilities.VerifyInternal(clonedMessage != null, "Message factory did not generate a message instance for " + message.GetType().Name);

        // Fill the cloned message with data.
        var clonedMessageAccessor = this.MessageDescriptions.GetAccessor(clonedMessage);
        clonedMessageAccessor.Deserialize(fields);

        return (T)clonedMessage;
    }

    /// <summary>
    /// Gets the message factory of the given channel via its test hook.
    /// </summary>
    /// <param name="channel">The channel whose message factory is wanted. Must not be null.</param>
    /// <returns>The channel's message factory.</returns>
    private static IMessageFactory GetMessageFactory(Channel channel) {
        Contract.Requires<ArgumentNullException>(channel != null);

        return channel.MessageFactoryTestHook;
    }

    /// <summary>
    /// Blocks until a message is posted to this channel (or the remote channel's
    /// <see cref="Close"/> wakes it), then empties and returns the inbox.
    /// </summary>
    /// <param name="recipient">Receives the recipient stored with the message; may be null.</param>
    /// <returns>The serialized message fields, or null if woken without a message.</returns>
    private IDictionary<string, string> AwaitIncomingMessage(out MessageReceivingEndpoint recipient) {
        // Special care should be taken so that we don't indefinitely
        // wait for a message that may never come due to a bug in the product
        // or the test.
        // There are two scenarios that we need to watch out for:
        //  1. Two channels are waiting to receive messages from each other.
        //  2. One channel is waiting for a message that will never come because
        //     the remote party has already finished executing.
        lock (waitingForMessageCoordinationLock) {
            // It's possible that a message was just barely transmitted either to this
            // or the remote channel.  So it's ok for the remote channel to be waiting
            // if either it or we are already about to receive a message.
            ErrorUtilities.VerifyInternal(!this.RemoteChannel.waitingForMessage || this.RemoteChannel.incomingMessage != null || this.incomingMessage != null, "This channel is expecting an incoming message from another channel that is also blocked waiting for an incoming message from us!");

            // It's permissible that the remote channel has already closed if it left a message
            // for us already.
            ErrorUtilities.VerifyInternal(!this.RemoteChannel.simulationCompleted || this.incomingMessage != null, "This channel is expecting an incoming message from another channel that has already been closed.");
            this.waitingForMessage = true;
        }

        this.incomingMessageSignal.WaitOne();

        lock (waitingForMessageCoordinationLock) {
            this.waitingForMessage = false;
            var response = this.incomingMessage;
            recipient = this.incomingMessageRecipient;
            this.incomingMessage = null;
            this.incomingMessageRecipient = null;

            // Briefly signal to another thread that might be waiting for our inbox to be empty
            this.messageReceivedSignal.Set();
            this.messageReceivedSignal.Reset();

            return response;
        }
    }

    /// <summary>
    /// Invokes the incoming or outgoing message filter, if one was supplied.
    /// </summary>
    /// <param name="message">The message to pass to the filter.</param>
    /// <param name="outgoing">True to use the outgoing filter; false to use the incoming filter.</param>
    private void ProcessMessageFilter(IProtocolMessage message, bool outgoing) {
        Action<IProtocolMessage> filter = outgoing ? this.outgoingMessageFilter : this.incomingMessageFilter;
        if (filter != null) {
            filter(message);
        }
    }
}
}
|