//-----------------------------------------------------------------------
//
// Copyright (c) Andrew Arnott. All rights reserved.
//
//-----------------------------------------------------------------------
namespace DotNetOpenAuth.Test.Mocks {
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.Threading;
using DotNetOpenAuth.Messaging;
using DotNetOpenAuth.Messaging.Reflection;
using DotNetOpenAuth.Test.OpenId;
using NUnit.Framework;
internal class CoordinatingChannel : Channel {
///
/// A lock to use when checking and setting the
/// or the fields.
///
///
/// This is a static member so that all coordinating channels share a lock
/// since they peak at each others fields.
///
private static readonly object waitingForMessageCoordinationLock = new object();
///
/// The original product channel whose behavior is being modified to work
/// better in automated testing.
///
private Channel wrappedChannel;
///
/// A flag set to true when this party in a two-party test has completed
/// its part of the testing.
///
private bool simulationCompleted;
///
/// A thread-coordinating signal that is set when another thread has a
/// message ready for this channel to receive.
///
private EventWaitHandle incomingMessageSignal = new AutoResetEvent(false);
///
/// A thread-coordinating signal that is set briefly by this thread whenever
/// a message is picked up.
///
private EventWaitHandle messageReceivedSignal = new AutoResetEvent(false);
///
/// A flag used to indicate when this channel is waiting for a message
/// to arrive.
///
private bool waitingForMessage;
///
/// An incoming message that has been posted by a remote channel and
/// is waiting for receipt by this channel.
///
private IDictionary incomingMessage;
private MessageReceivingEndpoint incomingMessageRecipient;
///
/// A delegate that gets a chance to peak at and fiddle with all
/// incoming messages.
///
private Action incomingMessageFilter;
///
/// A delegate that gets a chance to peak at and fiddle with all
/// outgoing messages.
///
private Action outgoingMessageFilter;
///
/// Initializes a new instance of the class.
///
/// The wrapped channel. Must not be null.
/// The incoming message filter. May be null.
/// The outgoing message filter. May be null.
internal CoordinatingChannel(Channel wrappedChannel, Action incomingMessageFilter, Action outgoingMessageFilter)
: base(GetMessageFactory(wrappedChannel), wrappedChannel.BindingElements.ToArray()) {
Contract.Requires(wrappedChannel != null);
this.wrappedChannel = wrappedChannel;
this.incomingMessageFilter = incomingMessageFilter;
this.outgoingMessageFilter = outgoingMessageFilter;
// Preserve any customized binding element ordering.
this.CustomizeBindingElementOrder(this.wrappedChannel.OutgoingBindingElements, this.wrappedChannel.IncomingBindingElements);
}
///
/// Gets or sets the coordinating channel used by the other party.
///
internal CoordinatingChannel RemoteChannel { get; set; }
///
/// Indicates that the simulation that uses this channel has completed work.
///
///
/// Calling this method is not strictly necessary, but it gives the channel
/// coordination a chance to recognize when another channel is left dangling
/// waiting for a message from another channel that may never come.
///
internal void Close() {
lock (waitingForMessageCoordinationLock) {
this.simulationCompleted = true;
if (this.RemoteChannel.waitingForMessage && this.RemoteChannel.incomingMessage == null) {
TestUtilities.TestLogger.Debug("CoordinatingChannel is closing while remote channel is waiting for an incoming message. Signaling channel to unblock it to receive a null message.");
this.RemoteChannel.incomingMessageSignal.Set();
}
this.Dispose();
}
}
///
/// Replays the specified message as if it were received again.
///
/// The message to replay.
internal void Replay(IProtocolMessage message) {
this.ProcessIncomingMessage(CloneSerializedParts(message));
}
///
/// Called from a remote party's thread to post a message to this channel for processing.
///
/// The message that this channel should receive. This message will be cloned.
internal void PostMessage(IProtocolMessage message) {
if (this.incomingMessage != null) {
// The remote party hasn't picked up the last message we sent them.
// Wait for a short period for them to pick it up before failing.
TestBase.TestLogger.Warn("We're blocked waiting to send a message to the remote party and they haven't processed the last message we sent them.");
this.RemoteChannel.messageReceivedSignal.WaitOne(500);
}
ErrorUtilities.VerifyInternal(this.incomingMessage == null, "Oops, a message is already waiting for the remote party!");
this.incomingMessage = this.MessageDescriptions.GetAccessor(message).Serialize();
var directedMessage = message as IDirectedProtocolMessage;
this.incomingMessageRecipient = directedMessage != null ? new MessageReceivingEndpoint(directedMessage.Recipient, directedMessage.HttpMethods) : null;
this.incomingMessageSignal.Set();
}
protected internal override HttpRequestInfo GetRequestFromContext() {
MessageReceivingEndpoint recipient;
var messageData = this.AwaitIncomingMessage(out recipient);
if (messageData != null) {
return new CoordinatingHttpRequestInfo(this, this.MessageFactory, messageData, recipient);
} else {
return new CoordinatingHttpRequestInfo(recipient);
}
}
protected override IProtocolMessage RequestCore(IDirectedProtocolMessage request) {
this.ProcessMessageFilter(request, true);
// Drop the outgoing message in the other channel's in-slot and let them know it's there.
this.RemoteChannel.PostMessage(request);
// Now wait for a response...
MessageReceivingEndpoint recipient;
IDictionary responseData = this.AwaitIncomingMessage(out recipient);
ErrorUtilities.VerifyInternal(recipient == null, "The recipient is expected to be null for direct responses.");
// And deserialize it.
IDirectResponseProtocolMessage responseMessage = this.MessageFactory.GetNewResponseMessage(request, responseData);
if (responseMessage == null) {
return null;
}
var responseAccessor = this.MessageDescriptions.GetAccessor(responseMessage);
responseAccessor.Deserialize(responseData);
this.ProcessMessageFilter(responseMessage, false);
return responseMessage;
}
protected override OutgoingWebResponse PrepareDirectResponse(IProtocolMessage response) {
this.ProcessMessageFilter(response, true);
return new CoordinatingOutgoingWebResponse(response, this.RemoteChannel);
}
protected override OutgoingWebResponse PrepareIndirectResponse(IDirectedProtocolMessage message) {
this.ProcessMessageFilter(message, true);
// In this mock transport, direct and indirect messages are the same.
return this.PrepareDirectResponse(message);
}
protected override IDirectedProtocolMessage ReadFromRequestCore(HttpRequestInfo request) {
if (request.Message != null) {
this.ProcessMessageFilter(request.Message, false);
}
return request.Message;
}
protected override IDictionary ReadFromResponseCore(IncomingWebResponse response) {
return this.wrappedChannel.ReadFromResponseCoreTestHook(response);
}
protected override void ProcessIncomingMessage(IProtocolMessage message) {
this.wrappedChannel.ProcessIncomingMessageTestHook(message);
}
///
/// Clones a message, instantiating the new instance using this channel's
/// message factory.
///
/// The type of message to clone.
/// The message to clone.
/// The new instance of the message.
///
/// This Clone method should not be used to send message clones to the remote
/// channel since their message factory is not used.
///
protected virtual T CloneSerializedParts(T message) where T : class, IProtocolMessage {
Contract.Requires(message != null);
IProtocolMessage clonedMessage;
var messageAccessor = this.MessageDescriptions.GetAccessor(message);
var fields = messageAccessor.Serialize();
MessageReceivingEndpoint recipient = null;
var directedMessage = message as IDirectedProtocolMessage;
var directResponse = message as IDirectResponseProtocolMessage;
if (directedMessage != null && directedMessage.IsRequest()) {
if (directedMessage.Recipient != null) {
recipient = new MessageReceivingEndpoint(directedMessage.Recipient, directedMessage.HttpMethods);
}
clonedMessage = this.MessageFactory.GetNewRequestMessage(recipient, fields);
} else if (directResponse != null && directResponse.IsDirectResponse()) {
clonedMessage = this.MessageFactory.GetNewResponseMessage(directResponse.OriginatingRequest, fields);
} else {
throw new InvalidOperationException("Totally expected a message to implement one of the two derived interface types.");
}
ErrorUtilities.VerifyInternal(clonedMessage != null, "Message factory did not generate a message instance for " + message.GetType().Name);
// Fill the cloned message with data.
var clonedMessageAccessor = this.MessageDescriptions.GetAccessor(clonedMessage);
clonedMessageAccessor.Deserialize(fields);
return (T)clonedMessage;
}
private static IMessageFactory GetMessageFactory(Channel channel) {
Contract.Requires(channel != null);
return channel.MessageFactoryTestHook;
}
private IDictionary AwaitIncomingMessage(out MessageReceivingEndpoint recipient) {
// Special care should be taken so that we don't indefinitely
// wait for a message that may never come due to a bug in the product
// or the test.
// There are two scenarios that we need to watch out for:
// 1. Two channels are waiting to receive messages from each other.
// 2. One channel is waiting for a message that will never come because
// the remote party has already finished executing.
lock (waitingForMessageCoordinationLock) {
// It's possible that a message was just barely transmitted either to this
// or the remote channel. So it's ok for the remote channel to be waiting
// if either it or we are already about to receive a message.
ErrorUtilities.VerifyInternal(!this.RemoteChannel.waitingForMessage || this.RemoteChannel.incomingMessage != null || this.incomingMessage != null, "This channel is expecting an incoming message from another channel that is also blocked waiting for an incoming message from us!");
// It's permissible that the remote channel has already closed if it left a message
// for us already.
ErrorUtilities.VerifyInternal(!this.RemoteChannel.simulationCompleted || this.incomingMessage != null, "This channel is expecting an incoming message from another channel that has already been closed.");
this.waitingForMessage = true;
}
this.incomingMessageSignal.WaitOne();
lock (waitingForMessageCoordinationLock) {
this.waitingForMessage = false;
var response = this.incomingMessage;
recipient = this.incomingMessageRecipient;
this.incomingMessage = null;
this.incomingMessageRecipient = null;
// Briefly signal to another thread that might be waiting for our inbox to be empty
this.messageReceivedSignal.Set();
this.messageReceivedSignal.Reset();
return response;
}
}
private void ProcessMessageFilter(IProtocolMessage message, bool outgoing) {
if (outgoing) {
if (this.outgoingMessageFilter != null) {
this.outgoingMessageFilter(message);
}
} else {
if (this.incomingMessageFilter != null) {
this.incomingMessageFilter(message);
}
}
}
}
}