//-----------------------------------------------------------------------
//
//     Copyright (c) Andrew Arnott. All rights reserved.
//
//-----------------------------------------------------------------------

namespace DotNetOpenAuth.Test.Mocks {
    using System;
    using System.Collections.Generic;
    using System.Diagnostics.Contracts;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using DotNetOpenAuth.Messaging;
    using DotNetOpenAuth.Messaging.Reflection;
    using DotNetOpenAuth.Test.OpenId;
    using NUnit.Framework;

    /// <summary>
    /// A channel used in tests that wraps a product channel and exchanges
    /// serialized messages directly with a partner <see cref="CoordinatingChannel"/>
    /// through an in-memory "inbox" guarded by thread signals.
    /// </summary>
    internal class CoordinatingChannel : Channel {
        /// <summary>
        /// How long (in milliseconds) <see cref="PostMessage"/> waits for this
        /// channel to empty its inbox before failing.
        /// </summary>
        private const int MessagePickupTimeoutMilliseconds = 500;

        /// <summary>
        /// A lock to use when checking and setting the <see cref="waitingForMessage"/>
        /// or the <see cref="simulationCompleted"/> fields.
        /// </summary>
        /// <remarks>
        /// This is a static member so that all coordinating channels share a lock
        /// since they peek at each others fields.
        /// </remarks>
        private static readonly object waitingForMessageCoordinationLock = new object();

        /// <summary>
        /// The original product channel whose behavior is being modified to work
        /// better in automated testing.
        /// </summary>
        private readonly Channel wrappedChannel;

        /// <summary>
        /// A thread-coordinating signal that is set when another thread has a
        /// message ready for this channel to receive.
        /// </summary>
        private readonly EventWaitHandle incomingMessageSignal = new AutoResetEvent(false);

        /// <summary>
        /// A thread-coordinating signal that is set briefly by this thread whenever
        /// a message is picked up.
        /// </summary>
        private readonly EventWaitHandle messageReceivedSignal = new AutoResetEvent(false);

        /// <summary>
        /// A delegate that gets a chance to peek at and fiddle with all
        /// incoming messages.
        /// </summary>
        private readonly Action<IProtocolMessage> incomingMessageFilter;

        /// <summary>
        /// A delegate that gets a chance to peek at and fiddle with all
        /// outgoing messages.
        /// </summary>
        private readonly Action<IProtocolMessage> outgoingMessageFilter;

        /// <summary>
        /// A flag set to true when this party in a two-party test has completed
        /// its part of the testing.
        /// </summary>
        private bool simulationCompleted;

        /// <summary>
        /// A flag used to indicate when this channel is waiting for a message
        /// to arrive.
        /// </summary>
        private bool waitingForMessage;

        /// <summary>
        /// An incoming message that has been posted by a remote channel and
        /// is waiting for receipt by this channel.
        /// </summary>
        private IDictionary<string, string> incomingMessage;

        /// <summary>
        /// The recipient endpoint recorded alongside <see cref="incomingMessage"/>;
        /// null when the posted message was not a directed message.
        /// </summary>
        private MessageReceivingEndpoint incomingMessageRecipient;

        /// <summary>
        /// Initializes a new instance of the <see cref="CoordinatingChannel"/> class.
        /// </summary>
        /// <param name="wrappedChannel">The wrapped channel.  Must not be null.</param>
        /// <param name="incomingMessageFilter">The incoming message filter.  May be null.</param>
        /// <param name="outgoingMessageFilter">The outgoing message filter.  May be null.</param>
        internal CoordinatingChannel(Channel wrappedChannel, Action<IProtocolMessage> incomingMessageFilter, Action<IProtocolMessage> outgoingMessageFilter)
            : base(GetMessageFactory(wrappedChannel), wrappedChannel.BindingElements.ToArray()) {
            Contract.Requires(wrappedChannel != null);

            this.wrappedChannel = wrappedChannel;
            this.incomingMessageFilter = incomingMessageFilter;
            this.outgoingMessageFilter = outgoingMessageFilter;

            // Preserve any customized binding element ordering.
            this.CustomizeBindingElementOrder(this.wrappedChannel.OutgoingBindingElements, this.wrappedChannel.IncomingBindingElements);
        }

        /// <summary>
        /// Gets or sets the coordinating channel used by the other party.
        /// </summary>
        internal CoordinatingChannel RemoteChannel { get; set; }

        /// <summary>
        /// Indicates that the simulation that uses this channel has completed work.
        /// </summary>
        /// <remarks>
        /// Calling this method is not strictly necessary, but it gives the channel
        /// coordination a chance to recognize when another channel is left dangling
        /// waiting for a message from another channel that may never come.
        /// </remarks>
        internal void Close() {
            lock (waitingForMessageCoordinationLock) {
                this.simulationCompleted = true;

                // Tolerate a channel that was never paired with a remote party so
                // that Dispose below still runs.
                CoordinatingChannel remote = this.RemoteChannel;
                if (remote != null && remote.waitingForMessage && remote.incomingMessage == null) {
                    TestUtilities.TestLogger.Debug("CoordinatingChannel is closing while remote channel is waiting for an incoming message.  Signaling channel to unblock it to receive a null message.");
                    remote.incomingMessageSignal.Set();
                }

                this.Dispose();
            }
        }

        /// <summary>
        /// Replays the specified message as if it were received again.
        /// </summary>
        /// <param name="message">The message to replay.</param>
        internal void Replay(IProtocolMessage message) {
            this.ProcessIncomingMessage(this.CloneSerializedParts(message));
        }

        /// <summary>
        /// Called from a remote party's thread to post a message to this channel for processing.
        /// </summary>
        /// <param name="message">
        /// The message that this channel should receive.  Its serialized fields
        /// (not the instance itself) are stored in this channel's inbox.
        /// </param>
        internal void PostMessage(IProtocolMessage message) {
            if (this.incomingMessage != null) {
                // The remote party hasn't picked up the last message we sent them.
                // Wait for a short period for them to pick it up before failing.
                TestBase.TestLogger.Warn("We're blocked waiting to send a message to the remote party and they haven't processed the last message we sent them.");

                // "this" is the receiving channel here; it pulses its own
                // messageReceivedSignal in AwaitIncomingMessage when it empties
                // its inbox, so that is the signal the posting thread must watch.
                this.messageReceivedSignal.WaitOne(MessagePickupTimeoutMilliseconds);
            }

            ErrorUtilities.VerifyInternal(this.incomingMessage == null, "Oops, a message is already waiting for the remote party!");
            this.incomingMessage = this.MessageDescriptions.GetAccessor(message).Serialize();
            var directedMessage = message as IDirectedProtocolMessage;
            this.incomingMessageRecipient = directedMessage != null ? new MessageReceivingEndpoint(directedMessage.Recipient, directedMessage.HttpMethods) : null;
            this.incomingMessageSignal.Set();
        }

        /// <summary>
        /// Blocks until a message is posted to this channel and wraps it in a
        /// request object.
        /// </summary>
        /// <returns>
        /// A <see cref="CoordinatingHttpRequestInfo"/> carrying the message data and
        /// recipient, or one carrying only the recipient when no message data arrived.
        /// </returns>
        protected internal override HttpRequestInfo GetRequestFromContext() {
            MessageReceivingEndpoint recipient;
            var messageData = this.AwaitIncomingMessage(out recipient);
            if (messageData != null) {
                return new CoordinatingHttpRequestInfo(this, this.MessageFactory, messageData, recipient);
            } else {
                return new CoordinatingHttpRequestInfo(recipient);
            }
        }

        /// <summary>
        /// Sends a direct request to the remote channel and blocks for its response.
        /// </summary>
        /// <param name="request">The request to post to the remote channel.</param>
        /// <returns>
        /// The deserialized response, or null if the message factory does not
        /// produce a response message for the received data.
        /// </returns>
        protected override IProtocolMessage RequestCore(IDirectedProtocolMessage request) {
            this.ProcessMessageFilter(request, true);

            // Drop the outgoing message in the other channel's in-slot and let them know it's there.
            this.RemoteChannel.PostMessage(request);

            // Now wait for a response...
            MessageReceivingEndpoint recipient;
            IDictionary<string, string> responseData = this.AwaitIncomingMessage(out recipient);
            ErrorUtilities.VerifyInternal(recipient == null, "The recipient is expected to be null for direct responses.");

            // And deserialize it.
            IDirectResponseProtocolMessage responseMessage = this.MessageFactory.GetNewResponseMessage(request, responseData);
            if (responseMessage == null) {
                return null;
            }

            var responseAccessor = this.MessageDescriptions.GetAccessor(responseMessage);
            responseAccessor.Deserialize(responseData);

            this.ProcessMessageFilter(responseMessage, false);
            return responseMessage;
        }

        /// <summary>
        /// Applies the outgoing filter and wraps the response so that it targets
        /// the remote channel.
        /// </summary>
        /// <param name="response">The response message to send.</param>
        /// <returns>A <see cref="CoordinatingOutgoingWebResponse"/> bound to the remote channel.</returns>
        protected override OutgoingWebResponse PrepareDirectResponse(IProtocolMessage response) {
            this.ProcessMessageFilter(response, true);
            return new CoordinatingOutgoingWebResponse(response, this.RemoteChannel);
        }

        /// <summary>
        /// Prepares an indirect message for the remote channel.
        /// </summary>
        /// <param name="message">The indirect message to send.</param>
        /// <returns>The same kind of response produced by <see cref="PrepareDirectResponse"/>.</returns>
        /// <remarks>
        /// The outgoing filter is invoked here and then again inside
        /// <see cref="PrepareDirectResponse"/>, so it sees indirect messages twice.
        /// </remarks>
        protected override OutgoingWebResponse PrepareIndirectResponse(IDirectedProtocolMessage message) {
            this.ProcessMessageFilter(message, true);

            // In this mock transport, direct and indirect messages are the same.
            return this.PrepareDirectResponse(message);
        }

        /// <summary>
        /// Returns the message carried by the request, after giving the incoming
        /// filter a chance to inspect it.
        /// </summary>
        /// <param name="request">The request produced by <see cref="GetRequestFromContext"/>.</param>
        /// <returns>The message attached to the request; may be null.</returns>
        protected override IDirectedProtocolMessage ReadFromRequestCore(HttpRequestInfo request) {
            if (request.Message != null) {
                this.ProcessMessageFilter(request.Message, false);
            }

            return request.Message;
        }

        /// <summary>
        /// Delegates response parsing to the wrapped channel's test hook.
        /// </summary>
        /// <param name="response">The response to read.</param>
        /// <returns>Whatever the wrapped channel's test hook returns.</returns>
        protected override IDictionary<string, string> ReadFromResponseCore(IncomingWebResponse response) {
            return this.wrappedChannel.ReadFromResponseCoreTestHook(response);
        }

        /// <summary>
        /// Delegates incoming message processing to the wrapped channel's test hook.
        /// </summary>
        /// <param name="message">The message to process.</param>
        protected override void ProcessIncomingMessage(IProtocolMessage message) {
            this.wrappedChannel.ProcessIncomingMessageTestHook(message);
        }

        /// <summary>
        /// Clones a message, instantiating the new instance using <i>this</i> channel's
        /// message factory.
        /// </summary>
        /// <typeparam name="T">The type of message to clone.</typeparam>
        /// <param name="message">The message to clone.</param>
        /// <returns>The new instance of the message.</returns>
        /// <remarks>
        /// This Clone method should <i>not</i> be used to send message clones to the remote
        /// channel since their message factory is not used.
        /// </remarks>
        protected virtual T CloneSerializedParts<T>(T message) where T : class, IProtocolMessage {
            Contract.Requires(message != null);

            IProtocolMessage clonedMessage;
            var messageAccessor = this.MessageDescriptions.GetAccessor(message);
            var fields = messageAccessor.Serialize();

            MessageReceivingEndpoint recipient = null;
            var directedMessage = message as IDirectedProtocolMessage;
            var directResponse = message as IDirectResponseProtocolMessage;
            if (directedMessage != null && directedMessage.IsRequest()) {
                if (directedMessage.Recipient != null) {
                    recipient = new MessageReceivingEndpoint(directedMessage.Recipient, directedMessage.HttpMethods);
                }

                clonedMessage = this.MessageFactory.GetNewRequestMessage(recipient, fields);
            } else if (directResponse != null && directResponse.IsDirectResponse()) {
                clonedMessage = this.MessageFactory.GetNewResponseMessage(directResponse.OriginatingRequest, fields);
            } else {
                throw new InvalidOperationException("Totally expected a message to implement one of the two derived interface types.");
            }

            ErrorUtilities.VerifyInternal(clonedMessage != null, "Message factory did not generate a message instance for " + message.GetType().Name);

            // Fill the cloned message with data.
            var clonedMessageAccessor = this.MessageDescriptions.GetAccessor(clonedMessage);
            clonedMessageAccessor.Deserialize(fields);

            return (T)clonedMessage;
        }

        /// <summary>
        /// Gets the message factory of a channel via its test hook, so that it can
        /// be passed to the base constructor.
        /// </summary>
        /// <param name="channel">The channel whose factory is wanted.  Must not be null.</param>
        /// <returns>The channel's message factory.</returns>
        private static IMessageFactory GetMessageFactory(Channel channel) {
            Contract.Requires(channel != null);

            return channel.MessageFactoryTestHook;
        }

        /// <summary>
        /// Blocks until a message is posted to this channel (or the remote channel
        /// closes and unblocks us), then empties the inbox.
        /// </summary>
        /// <param name="recipient">Receives the recipient recorded with the message; may be null.</param>
        /// <returns>
        /// The serialized message fields, or null if this channel was signaled
        /// without a message having been posted.
        /// </returns>
        private IDictionary<string, string> AwaitIncomingMessage(out MessageReceivingEndpoint recipient) {
            // Special care should be taken so that we don't indefinitely
            // wait for a message that may never come due to a bug in the product
            // or the test.
            // There are two scenarios that we need to watch out for:
            //  1. Two channels are waiting to receive messages from each other.
            //  2. One channel is waiting for a message that will never come because
            //     the remote party has already finished executing.
            lock (waitingForMessageCoordinationLock) {
                // It's possible that a message was just barely transmitted either to this
                // or the remote channel.  So it's ok for the remote channel to be waiting
                // if either it or we are already about to receive a message.
                ErrorUtilities.VerifyInternal(!this.RemoteChannel.waitingForMessage || this.RemoteChannel.incomingMessage != null || this.incomingMessage != null, "This channel is expecting an incoming message from another channel that is also blocked waiting for an incoming message from us!");

                // It's permissible that the remote channel has already closed if it left a message
                // for us already.
                ErrorUtilities.VerifyInternal(!this.RemoteChannel.simulationCompleted || this.incomingMessage != null, "This channel is expecting an incoming message from another channel that has already been closed.");
                this.waitingForMessage = true;
            }

            // Wait outside the lock so the posting thread can take it.
            this.incomingMessageSignal.WaitOne();

            lock (waitingForMessageCoordinationLock) {
                this.waitingForMessage = false;
                var response = this.incomingMessage;
                recipient = this.incomingMessageRecipient;
                this.incomingMessage = null;
                this.incomingMessageRecipient = null;

                // Briefly signal to another thread that might be waiting for our inbox to be empty
                this.messageReceivedSignal.Set();
                this.messageReceivedSignal.Reset();

                return response;
            }
        }

        /// <summary>
        /// Invokes the incoming or outgoing message filter, if one was supplied.
        /// </summary>
        /// <param name="message">The message to hand to the filter.</param>
        /// <param name="outgoing">
        /// <c>true</c> to use the outgoing filter; <c>false</c> to use the incoming filter.
        /// </param>
        private void ProcessMessageFilter(IProtocolMessage message, bool outgoing) {
            Action<IProtocolMessage> filter = outgoing ? this.outgoingMessageFilter : this.incomingMessageFilter;
            if (filter != null) {
                filter(message);
            }
        }
    }
}