Несколько месяцев назад я начал разрабатывать специальный проект, и мне была нужна возможность добавить несколько служб WCF и сделать их доступными через одну конечную точку. В поисках решения я наткнулся на службы маршрутизации WCF. Этот вид службы позволяет объединять несколько конечных точек WCF из нескольких служб WCF и предоставлять их через одну конечную точку (агрегация служб). Но это не единственное ваше преимущество.
Перехватывая сообщения, поступающие через службу маршрутизации WCF, вы также можете проверить, есть ли действительные токены пользователя, реализующие аутентификацию и многое другое. Со всей необходимой информацией на месте, я написал этот маленький POC ( P крышу O F Cодин раз), чтобы продемонстрировать, как это можно сделать. ? Вот официальное MSDN-определение службы маршрутизации WCF:
Служба
маршрутизации является универсальным
посредником SOAP, который действует как
маршрутизатор сообщений . Основная функциональность службы маршрутизации — это возможность маршрутизировать сообщения на
основе содержимого сообщения , что позволяет пересылать сообщение на конечную точку клиента на основе значения в самом сообщении либо в заголовке, либо в теле сообщения.
Другое большое преимущество служб WCF-Routing заключается в том, что маршруты могут быть настроены динамически во время выполнения с использованием
RoutingExtensions на
основе интерфейса IExtension . Это позволяет добавлять конечные точки других служб WCF с помощью фильтров сообщений. Вот официальное определение MSDN:
Фильтры сообщений, используемые службой маршрутизации, предоставляют общие функции выбора сообщений, такие как оценка имени конечной точки, на которую было отправлено сообщение, действия SOAP или адреса или префикса адреса, на который было отправлено сообщение. Фильтры также могут быть объединены с
условием AND , так что сообщения будут направляться только в конечную точку, если сообщение соответствует обоим фильтрам. Вы также можете создавать собственные фильтры, создавая собственную реализацию
MessageFilter .
Вы можете прочитать больше о различных типах фильтров сообщений здесь:
Фильтры сообщений в MSDN В этой реализации я использую
EndPointAddressMessageFilter для предоставления адреса конечной точки службы WCF в службе маршрутизации. Как упоминалось ранее, службы маршрутизации WCF можно динамически настраивать для добавления конечных точек службы WCF во время выполнения. Это то, что я хотел запустить полностью автоматизированным и масштабируемым в Windows Azure. Я не буду вдаваться в подробности того, как работают службы WCF-маршрутизации. Вы можете прочитать все о службах маршрутизации WCF в MSDN:
Маршрутизация
Основная Архитектура
Базовая архитектура решения не очень сложна. Основная часть — сама служба маршрутизации WCF. Если определенная WCF-служба хочет быть добавленной к маршрутизатору, она помещает определенное сообщение в служебную шину, и WCF-Routing-Service получает эти сообщения. Уже добавленные сервисы будут обрабатываться из SQL-таблицы, которая будет добавлена после перезапуска. Это означает, что нам не нужно перезапускать каждую WCF-службу, которая уже была добавлена в таблицу Service-Routing.
Конфигурация рабочей роли (служба маршрутизации WCF)
Вкладка «Конечные точки»
Нам нужна публичная конечная точка для нашей службы маршрутизации. На странице свойств для нашей рабочей роли мы можем установить общедоступную конечную точку ввода. Эта конечная точка будет принимать простые TCP-сообщения на предопределенном порту 10100.
Вкладка «Настройки»
Чтобы иметь возможность обращаться к услуге позже через доменное имя, а не только с использованием IP-адреса хоста, добавляется дополнительная запись «Домен». Поскольку служба работает только на локальном компьютере, для нее задано значение «localhost». Вы можете изменить это позже, если хотите запустить этот POC в Windows Azure.
Файл «ServcieDefinition.csdef»
Чтобы позволить рабочей роли запускаться с повышенными привилегиями и открывать пользовательские порты для своих конечных точек, в тег «WorkerRole» необходимо добавить элемент «Runtime»:
<Runtime executionContext="elevated" />
Реализация POC
Реализация базовой службы маршрутизации
Нам нужно зарегистрировать узел службы WCF типа
RoutingService в нашей рабочей роли, чтобы запустить новую службу WCF-Routing-Service. Лично я предпочитаю настраивать каждую WCF-службу с помощью кода, а не с помощью файла конфигурации XML. Вот как выглядит рабочая роль с базовой реализацией WCF-Router (подробнее об этом позже):
using System; using System.Diagnostics; using System.Linq; using System.Net; using System.Threading; using Microsoft.WindowsAzure.ServiceRuntime; using System.ServiceModel; using System.ServiceModel.Routing; using SelfUpdatingServiceRouter.Behaviours; using System.ServiceModel.Description; namespace SelfUpdatingServiceRouter { public class WorkerRole : RoleEntryPoint { private string endPointAddress; private string listenAddress; private RoleInstanceEndpoint endpointAzure; /// <summary> /// Called by Windows Azure after the role instance has been initialized. This method /// serves as the /// main thread of execution for your role. /// </summary> /// <remarks> /// <para> /// Override the Run method to implement your own code to manage the role's execution. /// The Run method should implement /// a long-running thread that carries out operations for the role. The default implementation /// sleeps for an infinite /// period, blocking return indefinitely. /// </para> /// <para> /// The role recycles when the Run method returns. /// </para> /// <para> /// Any exception that occurs within the Run method is an unhandled exception. /// </para> /// </remarks> public override void Run() { //Create a new WCF host of type RoutingService using (ServiceHost host = new ServiceHost(typeof(RoutingService))) { //Configure the host this.ConfigureServiceHost(host); while (true) { Thread.Sleep(10000); Trace.TraceInformation("Routing Service Working...", "Information"); Trace.TraceInformation(listenAddress); } } } /// <summary> /// Configures the service host. /// </summary> /// <param name="host">The host.</param> private void ConfigureServiceHost(ServiceHost host) { try { endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["RoutingServiceMain"]; this.endPointAddress = string.Format("http://{0}/ServiceRouter", endpointAzure.IPEndpoint); this.listenAddress = string.Format("http://{0}:{1}/ServiceRouter/", RoleEnvironment.GetConfigurationSettingValue("Domain"), endpointAzure.IPEndpoint.Port); var httpBinding = new BasicHttpBinding(); httpBinding.SendTimeout = TimeSpan.FromMinutes(1); httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(1); var routerEndpoint = host.AddServiceEndpoint(typeof(IRequestReplyRouter), httpBinding, this.endPointAddress, new Uri(this.listenAddress)); routerEndpoint.Name = "RouterMain"; host.Description.Behaviors.Add(new RoutingBehavior(new RoutingConfiguration())); host.Description.Behaviors.Add(new RoutingUpdateBehaviour()); ServiceMetadataBehavior smb = new ServiceMetadataBehavior(); smb.HttpGetEnabled = true; smb.HttpGetUrl = new Uri(this.endPointAddress); host.Description.Behaviors.Add(smb); ServiceDebugBehavior debug = host.Description.Behaviors.Find<ServiceDebugBehavior>(); // if not found - add behavior with setting turned on if (debug == null) { host.Description.Behaviors.Add( new ServiceDebugBehavior() { IncludeExceptionDetailInFaults = true }); } else { // make sure setting is turned ON if (!debug.IncludeExceptionDetailInFaults) { debug.IncludeExceptionDetailInFaults = true; } } host.Open(); } catch (Exception ex) { Trace.TraceError(ex.Message); } } /// <summary> /// Called by Windows Azure to initialize the role instance. /// </summary> /// <remarks> /// <para> /// Override the OnStart method to run initialization code for your role. /// </para> /// <para> /// Before the OnStart method returns, the instance's status is set to Busy and the /// instance is not available /// for requests via the load balancer. /// </para> /// <para> /// If the OnStart method returns false, the instance is immediately stopped. If /// the method /// returns true, then Windows Azure starts the role by calling the <see cref="M:Microsoft.WindowsAzure.ServiceRuntime.RoleEntryPoint.Run" /> /// method. /// </para> /// <para> /// A web role can include initialization code in the ASP.NET Application_Start method /// instead of the OnStart method. /// Application_Start is called after the OnStart method. /// </para> /// <para> /// Any exception that occurs within the OnStart method is an unhandled exception. /// </para> /// </remarks> /// <returns> /// True if initialization succeeds, False if it fails. The default implementation /// returns True. /// </returns> public override bool OnStart() { // Set the maximum number of concurrent connections ServicePointManager.DefaultConnectionLimit = 12; // For information on handling configuration changes // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. return base.OnStart(); } } }
Автобусная часть — фон
В первой версии этого сервиса я использовал чистые сборки Service Bus .NET. Несколько дней назад я обнаружил
библиотеку CloudFx (Cloud Application Framework & Extensions) в NuGet. Эта библиотека была изначально написана людьми Microsoft для ускорения разработки облачных проектов. Он основан на библиотеке «Reactive Extensions» (Rx). Библиотека CloudFx позволяет использовать шаблон публикации / подписки для потоков данных. Потоки данных могут быть событиями, лентами Twitter, запросами веб-служб и т. Д. С помощью LINQ. Вот небольшой пример, который использует событие MouseClick с шаблоном публикации / подписки (Источник: MSDN):
///Declare an observable public ISubject<MouseEventArgs> MouseMove; ///Publish data MouseMove.OnNext(args); ///Subscribe to an observable MouseMove.Subscribe(args => Display(args));
Как вы можете видеть, больше нет обычного шаблона Event / Delegate. Создается
объект ISubject типа
MouseEventArgs, и в следующий раз, когда происходит событие мыши, EventArgs публикуются для всех подписчиков. Несколько строк кода для создания удивительности — очень нравится! Вот некоторые из замечательных возможностей:
- Отправлять и получать сообщения асинхронно, используя раздел служебной шины
- Отправка и получение одноадресных и многоадресных сообщений с помощью служебной шины
- Простое использование методов «Положить» и «Получить» (отправить и получить) (Очень удобно для загрузки и скачивания больших двоичных объектов)
- Обработка ошибок
- Межрелигиозное общение
В целом, это значительно сокращает код, необходимый для реализации этих сценариев. Пожалуйста, посмотрите отличные
образцы CloudFx на MSDN .
Базовая реализация DAL
Класс RouteMeModel
using System; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations.Schema; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Models { /// <summary> /// The model class holding /// the WCF properties to be /// sent to the routing service. /// </summary> [Table("RouteModel")] public class RouteMeModel { /// <summary> /// Gets or sets the serice id. /// </summary> /// <value>The serice id.</value> [Key] [Column("ServiceUID")] public string SericeUID { get; set; } /// <summary> /// Gets or sets the end point address. /// </summary> /// <value>The end point address.</value> [Column("EndPointAddress")] public string EndPointAddress { get; set; } /// <summary> /// Gets or sets the name of the service. /// </summary> /// <value>The name of the service.</value> [Column("ServiceName")] public string ServiceName { get; set; } /// <summary> /// Gets or sets the name of the contract. /// </summary> /// <value>The name of the contract.</value> [Column("ContractName")] public string ContractName { get; set; } /// <summary> /// Gets or sets the full name of the assembly. /// </summary> /// <value>The full name of the assembly.</value> [Column("FullAssemblyName")] public string FullAssemblyName { get; set; } } }
Класс
RoutMeModel будет передавать все необходимые данные по служебной шине в службу маршрутизации, которая включает в себя:
- Адрес конечной точки
- Наименование услуги
- Полное название контракта (включая пространство имен), реализованное сервисом
- Имя файла исполняемой сборки, в которой размещена реализация сервиса
Класс RouteModel также будет использоваться в качестве EF-объекта для управления данными конечной точки WCF.
Предпосылки
Для нашей локальной разработки мы будем использовать SQL Server Express 2012. Если на вашем локальном компьютере не установлен SQL Server Express 2012, вы можете получить его здесь:
Microsoft SQL Server 2012 Express
Добавление реализации DbContext и настройка ее с помощью Fluent-Configuration.
Поскольку у нас очень простая модель, конфигурация нашего
DbContext :
using System; using System.Collections.Generic; using System.Configuration; using System.Data.Entity; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Models { public class RouteContext:DbContext { /// <summary> /// Gets or sets the services. /// </summary> /// <value>The services.</value> public DbSet<RouteMeModel> Services { get; set; } //TODO:INSERT YOUR CONNECTION STRING HERE public RouteContext() : base(@"[YOUR SQL CONNECTION STRING HERE") { } /// <summary> /// This method is called when the model for a derived context has been initialized, /// but /// before the model has been locked down and used to initialize the context. The /// default /// implementation of this method does nothing, but it can be overridden in a derived /// class /// such that the model can be further configured before it is locked down. /// </summary> /// <param name="modelBuilder">The builder that defines the model for the context /// being created.</param> /// <remarks> /// Typically, this method is called only once when the first instance of a derived /// context /// is created. The model for that context is then cached and is for all further /// instances of /// the context in the app domain. This caching can be disabled by setting the ModelCaching /// property on the given ModelBuidler, but note that this can seriously degrade /// performance. /// More control over caching is provided through use of the DbModelBuilder and DbContextFactory /// classes directly. /// </remarks> protected override void OnModelCreating(DbModelBuilder modelBuilder) { //Autodetect changes this.Configuration.AutoDetectChangesEnabled = true; //Define the primary key modelBuilder.Entity<RouteMeModel>().HasKey<string>(r => r.SericeUID); //set the connstraints for the entity fields modelBuilder.Entity<RouteMeModel>().Property(r => r.ServiceName).HasMaxLength(120); //Max lenght for a identifier modelBuilder.Entity<RouteMeModel>().Property(r => r.ContractName).HasMaxLength(511); //Max lenght for a URL in a browser. Should fit modelBuilder.Entity<RouteMeModel>().Property(r => r.EndPointAddress).HasMaxLength(2083); //Max length for a assemlby name. Including 256 chars for MAX_PATH + assembly full name //assuming it has to fit using the standard GAC path modelBuilder.Entity<RouteMeModel>().Property(r => r.FullAssemblyName).HasMaxLength(356); //let it do the needed stuff //base.OnModelCreating(modelBuilder); } } }
Следующее, что нужно сделать, это сгенерировать таблицу данных с помощью «Консоли диспетчера пакетов» и EF. Сначала нужно включить миграцию, затем добавить миграцию, и последнее, что нужно сделать, это обновить базу данных. Вот три команды для ввода в «Консоль диспетчера пакетов»:
- Enable-миграции
- add-миграция Initial (начальная — имя для миграции)
- обновление базы данных
Это создаст таблицу данных с использованием подхода «сначала код».
Динамическая настройка WCF-маршрутизатора — базовый сценарий
Должна быть возможность отправки данных конечной точки WCF-службы подписчику раздела служебной шины. Подписчик (в нашем случае служба маршрутизации WCF) управляет полученными данными конечной точки и преобразует их в конечные точки, которые необходимо динамически добавить в таблицу маршрутизации службы маршрутизации WCF. Это префектный вариант использования для
одноадресного сценария. Сообщения адресованы только
одному конкретному получателю, который является службой маршрутизации WCF.
Установка CloudFX с использованием NuGet
Нам нужно установить последнюю предварительную версию, чтобы она работала с последней версией Azure SDK (2.1 на момент написания этой статьи).
Настройка CloudFx
CloudFx настраивается с использованием стандартных файлов app.config. Чтобы упростить процесс настройки, я взял файл конфигурации из примера решения CloudFX и адаптировал его в соответствии со своими потребностями. Наиболее важная настройка — это та часть, где происходят настройки служебной шины:
<ServiceBusConfiguration defaultEndpoint="wcfrouter" defaultNamespace="[YOUR NAMESPACE HERE]" defaultIssuerName="owner" defaultIssuerSecret="[YOUR KEY HERE] <add name="wcfrouter" endpointType="Topic" topicPath="WcfRouterSample" /> </ServiceBusConfiguration>
Короче говоря, значения:
- defaultEndPoint — это имя конечной точки CloudFx.
- defaultNamespace — ваше имя пространства имен служебной шины
- defaultIssuer — это источник вашего токена ACS в Azure (стандартный владелец)
- defaultIssuerSecret — это ключ по умолчанию (токен ACS), выданный «владельцем»
- endPointType — это тип конечной точки, такой как тема, очередь, ретранслятор и т. д.
- topicPath Здесь вы задаете название темы, которая будет создана, если она еще не существует.
Для имени подписки будет установлено свойство «Кому» контекста сообщения маршрутизации. В данном примере «ServiceRouter». Подробнее о контексте сообщения маршрутизации позже.
Реализация на стороне маршрутизатора WCF
На стороне WCF-маршрутизатора необходимо выполнить следующие действия:
- Проверьте, есть ли записи в таблице RouteModel , если да, проверьте, были ли добавлены службы, если нет, добавьте новый маршрут в таблицу.
- Прослушивание входящих запросов, сигнализирующих о том, что WCF-служба хочет быть добавленной. Прежде чем это произойдет, проверьте, есть ли сервис в базе данных, если это так, проверьте, изменилось ли что-то, и обновите, а не просто добавьте или перезагрузите его из базы данных.
- Маршрутизация происходит на основе фильтрации адресов конечных точек
- Таблица фильтров и фильтры будут добавляться динамически по запросу
- Контракты будут добавляться динамически, используя описания контрактов.
- Отдельная сборка, содержащая договор на обслуживание, будет загружена из хранилища BLOB-объектов и использована для описания договора.
- Функциональность динамического обновления будет реализована с использованием пользовательского поведения и реализации пользовательского расширения.
Все требования упакованы в пользовательскую реализацию IExtension <T>, которую можно найти в пространстве имен System.ServiceModel (System.ServiceModel.dll). Интерфейс IExtension <T> позволяет расширить
- System.ServiceModel.IExtensibleObject <Т>
- System.ServcieModel.IContextChannel
- System.ServiceModel.ServiceHost (вот что здесь происходит)
- System.ServiceModel.InstanceContext
- System.ServiceModel.OperationContext
используя шаблон расширяемого объекта. Подробнее об этом можно прочитать здесь:
Интерфейс IExtension <T> в MSDN Подробнее о расширяемых объектах и шаблоне «ExtensibleObject <T>»
здесь . Чтобы сделать расширение доступным для нашей службы WCF-маршрутизации, мы делаем его доступным через поведение службы, которое мы можем добавить к нашей службе WCF-маршрутизации во время выполнения. Поведения — это в основном элементы конфигурации WCF, которые позволяют расширять определенные объекты WCF и добавлять дополнительные функциональные возможности.
Лично я рассматриваю поведение сервиса как своего рода «загрузчик», который позволяет вам добавлять «плагины» (расширения) к WCF-сервису во время выполнения (для этого я их и использую). Поведение службы раскрывает узел службы, если вы реализуете интерфейс IServiceBehaviour, так что вы можете добавлять расширения для этого конкретного узла. «Хук», который мы можем использовать для добавления расширения к текущему хосту службы, заключается в реализации интерфейса IServiceBehavior и наследовании от абстрактного класса BehaviourExtensionElement (представляет элемент конфигурации WCF). Затем мы можем использовать метод ApplyDispatchBehaviour (происходит от IServiceBehavior) и добавить наше расширение к сервис-хосту.
using System; using System.Collections.Generic; using System.Linq; using System.ServiceModel.Configuration; using System.ServiceModel.Description; using System.Text; using System.Threading.Tasks; using SelfUpdatingServiceRouter.Extensions; namespace SelfUpdatingServiceRouter.Behaviours { public class RoutingUpdateBehaviour : BehaviorExtensionElement, IServiceBehavior { /// <summary> /// Gets the type of behavior. /// </summary> /// <returns>The type of behavior.</returns> /// <value></value> public override Type BehaviorType { get { return typeof(RoutingUpdateBehaviour); } } /// <summary> /// Creates a behavior extension based on the current configuration settings. /// </summary> /// <returns>The behavior extension.</returns> protected override object CreateBehavior() { return new RoutingUpdateBehaviour(); } /// <summary> /// Adds the binding parameters. /// </summary> /// <param name="serviceDescription">The service description.</param> /// <param name="serviceHostBase">The service host base.</param> /// <param name="endpoints">The endpoints.</param> /// <param name="bindingParameters">The binding parameters.</param> public void AddBindingParameters(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase, System.Collections.ObjectModel.Collection<ServiceEndpoint> endpoints, System.ServiceModel.Channels.BindingParameterCollection bindingParameters) { //Not implemented } /// <summary> /// Provides the ability to change run-time property values or insert custom /// extension objects such as error handlers, message or parameter interceptors, /// security extensions, and other custom extension objects. /// </summary> /// <param name="serviceDescription">The service description.</param> /// <param name="serviceHostBase">The host that is currently being built.</param> public void ApplyDispatchBehavior(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase) { RouterUpdateExtension updateExtension = new RouterUpdateExtension(); serviceHostBase.Extensions.Add(updateExtension); } /// <summary> /// Provides the ability to inspect the service host and the service description /// to confirm that the service can run successfully. /// </summary> /// <param name="serviceDescription">The service description.</param> /// <param name="serviceHostBase">The service host that is currently being constructed.</param> public void Validate(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase) { //Not implemented } } }
Расширение для нашего хоста-службы (в данном случае это служба маршрутизации WCF) реализуется путем реализации интерфейса IExtension <ServiceHostBase> и интерфейса IDisposable. Это самый мощный фрагмент кода во всем решении. Возможно, это наиболее полный пример того, как динамически настроить службу WCF-маршрутизации с использованием только кода.
using Microsoft.Experience.CloudFx.Framework.Configuration; using Microsoft.Experience.CloudFx.Framework.Messaging; using Microsoft.Experience.CloudFx.Framework.Storage; using Microsoft.WindowsAzure; using Microsoft.WindowsAzure.ServiceRuntime; using Models; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reactive; using System.Reflection; using System.ServiceModel; using System.ServiceModel.Description; using System.ServiceModel.Dispatcher; using System.ServiceModel.Routing; using System.Text; using System.Threading.Tasks; namespace SelfUpdatingServiceRouter.Extensions { class RouterUpdateExtension : IExtension<ServiceHostBase>, IDisposable { private RoleInstanceEndpoint endpointAzure; ServiceHostBase owner; IObserver<RouteMeModel> modelObserver; ServiceBusPublishSubscribeChannel pubSubChannel; List<ServiceEndpoint> serviceEndPoints; //The routing configuration to use RoutingConfiguration rc; /// <summary> /// Attaches the specified owner. /// </summary> /// <param name="owner">The owner.</param> public void Attach(ServiceHostBase owner) { this.owner = owner; //This could be done using the owner, you can read it near the bottom of this code-file endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["RoutingServiceMain"]; //start the fun this.Init(); } /// <summary> /// Detaches the specified owner. /// </summary> /// <param name="owner">The owner.</param> public void Detach(ServiceHostBase owner) { this.Dispose(); } public void Dispose() { //Dispose all the stuff } /// <summary> /// Inits the Service Bus stuff /// </summary> private void Init() { this.serviceEndPoints = new List<ServiceEndpoint>(); this.rc = new RoutingConfiguration(); this.AddRoutingEntries(); this.SetupServiceBus(); } /// <summary> /// Adds the routing entries. /// </summary> private void AddRoutingEntries() { using (var ctx = new RouteContext()) { if (ctx.Services.Count() > 0) { foreach (var entry in ctx.Services) { //Add a route for the service AddServiceBusEntry(entry); } } } } /// <summary> /// Setups the service bus. /// </summary> private void SetupServiceBus() { //Setup the service subscription channel var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName); pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint)); CreateSubscriptionForService(pubSubChannel, "RoutingService"); } /// <summary> /// Creates the subscription for service. /// </summary> /// <param name="pubSubChannel">The pub sub channel.</param> /// <param name="p">The p.</param> private void CreateSubscriptionForService(ServiceBusPublishSubscribeChannel pubSubChannel, string serviceName) { //Define a filter to receive only messages that have a specific "To" property var filter = FilterExpressions.GroupOr( FilterExpressions.MatchTo(serviceName), FilterExpressions.MatchTo("ServiceRouter")); //Now we need to create an observer, that will check for new incoming messages modelObserver = Observer.Create<RouteMeModel>(msg => { var exists = CheckIfRoutingEntryExists(msg); if (!exists) { AddNewServiceEntry(msg); } }); pubSubChannel.Subscribe(serviceName,modelObserver,filter); } /// <summary> /// Adds the new service entry. /// </summary> /// <param name="msg">The MSG.</param> private void AddNewServiceEntry(RouteMeModel msg) { using (var ctx = new RouteContext()) { ctx.ChangeTracker.DetectChanges(); msg.SericeUID = Guid.NewGuid().ToString(); ctx.Services.Add(msg); ctx.SaveChanges(); //Add a route for the service AddServiceBusEntry(msg); } } /// <summary> /// Checks if routing entry exists. /// </summary> /// <param name="msg">The MSG.</param> /// <returns></returns> private bool CheckIfRoutingEntryExists(RouteMeModel msg) { using (var ctx = new RouteContext()) { var entry = (from service in ctx.Services where service.ServiceName.Equals(msg.ServiceName) && msg.ContractName.Equals(msg.ContractName) select service).FirstOrDefault(); if(entry == null) { return false; } else { return true; } } } /// <summary> /// Adds the service bus entry. /// </summary> /// <param name="message">The message.</param> private void AddServiceBusEntry(RouteMeModel message) { //Load the contract assembly from blob storage //Get the current type of contract to add to the client endpoint var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString"); var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection)); var contractAssemblyContainer = CloudConfigurationManager.GetSetting("AssemblyContainerName"); var contractAssemblyName = CloudConfigurationManager.GetSetting("ContractAssemblyName"); byte[] data = null; using (MemoryStream mstream = new MemoryStream()) { var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream); if (gotIt) { //Get the byte content. This methods does not care about the position data = mstream.ToArray(); } } //Now we load the contract assembly var assembly = Assembly.Load(data); Type contractType = assembly.GetType(message.ContractName); //The contract description we use var conDesc = ContractDescription.GetContract(contractType); var HTTPbinding = new BasicHttpBinding(); var currentServiceEndPoint = new ServiceEndpoint( conDesc, HTTPbinding, new EndpointAddress(message.EndPointAddress)); currentServiceEndPoint.Name = message.ServiceName; var routerMainEndpoint = owner.Description.Endpoints.Where(ep => ep.Name == "RouterMain").FirstOrDefault(); var conDescRouter = ContractDescription.GetContract(typeof(IRequestReplyRouter)); var rEndPoint = new ServiceEndpoint(conDescRouter,new BasicHttpBinding(), new EndpointAddress( routerMainEndpoint.Address.Uri.OriginalString +"/" + message.ServiceName)); rEndPoint.Name = message.ServiceName; this.owner.AddServiceEndpoint(rEndPoint); var addressFilter = new EndpointAddressMessageFilter(new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName)); //We don't want to route on headers only rc.RouteOnHeadersOnly = false; //Add the filter table rc.FilterTable.Add(addressFilter, new List<ServiceEndpoint>() { currentServiceEndPoint }); //Apply the dynamic configuration this.owner.Extensions.Find<RoutingExtension>().ApplyConfiguration(rc); } /// <summary> /// Checks if end point was added. /// </summary> /// <param name="host">The host.</param> /// <param name="endpointAddress">The endpoint address.</param> /// <returns></returns> private static bool CheckIfEndPointWasAdded(ServiceHostBase host, string endpointAddress) { bool isPresent = false; foreach (var endpoint in host.Description.Endpoints.ToList()) { if (endpoint.ListenUri.AbsoluteUri.Equals(endpointAddress)) { isPresent = true; break; } } return isPresent; } } }
Основными игроками в этой реализации являются следующие методы:
- AddRoutingEntries и AddServiceBusEntry, оба отвечают за добавление записей конечных точек в базу данных
- SetupServiceBus и CreateSubscriptionForService, оба отвечают за настройку подписки на тему служебной шины, управляемой CloudFx
- AddServiceBusEntry (должен быть переименован), отвечающий за динамическую конфигурацию WCF-маршрутизатора
Давайте рассмотрим некоторые методы и сосредоточимся на их реализации. Сначала методы, которые отвечают за настройку CloudFx.
SetupServiceBus-метод
Этот метод загружает конфигурацию служебной шины CloudFx с помощью специального класса CloudApplicationConfiguration CloudFx. После загрузки конфигурации он устанавливает ServicePublishSubscribeChannel, используя конечную точку конфигурации по умолчанию для загрузки конфигурации служебной шины из файла app.config. ServicePublishSubscribeChannel можно использовать для публикации сообщений по определенной теме служебной шины или для подписки на сообщения по определенной теме служебной шины. В этом случае мы делаем подписку и ждем поступления тематических сообщений.
/// <summary> /// Setups the service bus. /// </summary> private void SetupServiceBus() { //Setup the service subscription channel var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName); pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint)); CreateSubscriptionForService(pubSubChannel, "RoutingService"); }
CreateSubscriptionForService-Method
Вот где происходит настоящее волшебство CloudFx. Сначала мы устанавливаем фильтр-выражение для входящих сообщений. Мы получаем все сообщения, которые содержат либо имя службы, либо строковую константу «ServiceRouter». Все остальные сообщения не представляют интереса. Затем мы создаем Observer <RouteModel>. Наблюдатель проверит тему служебной шины на наличие соответствующих сообщений, используя критерии фильтра, которые мы определили ранее. Последнее, что мы делаем, это подписываемся на наш паб / подканал, используя имя службы (название канала) нашего наблюдателя и фильтр. С этого момента работает цикл обработки сообщений, готовый принимать сообщения асинхронно. Это круто!
/// <summary> /// Creates the subscription for service. /// </summary> /// <param name="pubSubChannel">The pub sub channel.</param> /// <param name="p">The p.</param> private void CreateSubscriptionForService(ServiceBusPublishSubscribeChannel pubSubChannel, string serviceName) { //Define a filter to receive only messages that have a specific "To" property var filter = FilterExpressions.GroupOr( FilterExpressions.MatchTo(serviceName), FilterExpressions.MatchTo("ServiceRouter")); //Now we need to create an observer, that will check for new incoming messages modelObserver = Observer.Create<RouteMeModel>(msg => { var exists = CheckIfRoutingEntryExists(msg); if (!exists) { AddNewServiceEntry(msg); } }); pubSubChannel.Subscribe(serviceName,modelObserver,filter); }
Метод AddServiceBusEntry
Этот метод реализует некоторые действительно интересные вещи:
- Загрузка контрактной сборки для сервисов, которые будут добавлены в WCF-маршрутизатор из хранилища BLOB-объектов Azure с использованием CloudFx!
- Он добавляет адреса конечных точек служб WCF для маршрутизации, а также фильтры к службе маршрутизации WCF и переконфигурирует службу маршрутизации WCF во время выполнения!
Чтобы загрузить контрактную сборку из хранилища BLOB-объектов Azure, нам потребуется всего три строки кода: следующая строка загружает параметры конфигурации (строку подключения служебной шины) из нашего файла конфигурации
var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");
Эта строка создает новый экземпляр ReliableCloudBlobStorage:
var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));
и эта строка загружает блок контрактной сборки, используя экземпляр ReliableCloudBlobStorage:
var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
Возвращает true, если загрузка прошла успешно, иначе false. Вот и весь код, необходимый для загрузки блоба. Для загрузки блоба вы используете Put-метод ?
/// <summary> /// Adds the service bus entry. /// </summary> /// <param name="message">The message.</param> private void AddServiceBusEntry(RouteMeModel message) { //Load the contract assembly from blob storage //Get the current type of contract to add to the client endpoint var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString"); var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection)); var contractAssemblyContainer = CloudConfigurationManager.GetSetting("AssemblyContainerName"); var contractAssemblyName = CloudConfigurationManager.GetSetting("ContractAssemblyName"); byte[] data = null; using (MemoryStream mstream = new MemoryStream()) { var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream); if (gotIt) { //Get the byte content. This methods does not care about the position data = mstream.ToArray(); } } //Now we load the contract assembly var assembly = Assembly.Load(data); Type contractType = assembly.GetType(message.ContractName); //The contract description we use var conDesc = ContractDescription.GetContract(contractType); var HTTPbinding = new BasicHttpBinding(); var currentServiceEndPoint = new ServiceEndpoint( conDesc, HTTPbinding, new EndpointAddress(message.EndPointAddress)); currentServiceEndPoint.Name = message.ServiceName; var routerMainEndpoint = owner.Description.Endpoints.Where(ep => ep.Name == "RouterMain").FirstOrDefault(); var conDescRouter = ContractDescription.GetContract(typeof(IRequestReplyRouter)); var rEndPoint = new ServiceEndpoint(conDescRouter,new BasicHttpBinding(), new EndpointAddress( routerMainEndpoint.Address.Uri.OriginalString +"/" + message.ServiceName)); rEndPoint.Name = message.ServiceName; this.owner.AddServiceEndpoint(rEndPoint); var addressFilter = new EndpointAddressMessageFilter(new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName)); //We don't want to route on headers only rc.RouteOnHeadersOnly = false; //Add the filter table rc.FilterTable.Add(addressFilter, new List<ServiceEndpoint>() { currentServiceEndPoint }); //Apply the dynamic configuration this.owner.Extensions.Find<RoutingExtension>().ApplyConfiguration(rc); }
Издательская часть
Издательская часть реализована в отдельной сборке под названием «ServiceMessenger». Он содержит только один класс, класс Messenger. На эту сборку должны ссылаться любые службы WCF, которые хотят добавить свои конечные точки в маршрутизатор WCF.
using Microsoft.Experience.CloudFx.Framework.Configuration; using Microsoft.Experience.CloudFx.Framework.Messaging; using Models; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ServiceMessenger { public class Messenger { private RouteMeModel model; /// <summary> /// Initializes a new instance of the <see cref="Messenger" /> class. /// </summary> /// <param name="servcieModel">The servcie model.</param> public Messenger(string endPointAddress, string contracName, string assemblyName, string serviceName) { this.model = new RouteMeModel() { ContractName = contracName, EndPointAddress = endPointAddress, FullAssemblyName = assemblyName, ServiceName = serviceName }; } /// <summary> /// Setups the service bus. /// </summary> public void SendMessageToRouter() { //Setup the service subscription channel var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName); var routerServiceCtx = new RoutingMessageContext { To = "ServiceRouter" }; using (var pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint))) { pubSubChannel.Settings.MessageTimeToLive = TimeSpan.FromSeconds(120); //publish the message pubSubChannel.Publish(this.model, routerServiceCtx); } } } }
Создание нового экземпляра позволяет передавать все важные параметры для создания новой записи маршрутизации и отправлять ее через служебную шину в службу WCF-Routing, где расширение RouterUpdate получит данные и добавит новую конечную точку службы, если она еще не существует Процесс инициализации для создания нового канала ServcieBusPublishSubscriber идентичен тому, который мы использовали для подписки на тему служебной шины.
/// <summary> /// Setups the service bus. /// </summary> public void SendMessageToRouter() { //Setup the service subscription channel var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName); var routerServiceCtx = new RoutingMessageContext { To = "ServiceRouter" }; using (var pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint))) { pubSubChannel.Settings.MessageTimeToLive = TimeSpan.FromSeconds(120); //publish the message pubSubChannel.Publish(this.model, routerServiceCtx); } }
Есть два отличия:
- Создается экземпляр RoutingMessageContext, а получатель устанавливается через свойство «Кому»
- Метод Publish ServiceBusPublishSubscriberChannel используется для отправки экземпляра класса RouteModel по проводной сети, который содержит данные конечной точки службы, которую требуется добавить в службу маршрутизации WCF.
Это все, что нужно сделать, чтобы опубликовать сообщение для определенной темы служебной шины с помощью CloudFx!
Примеры WCF-сервисов и WCF-тестовый клиент
Существуют две службы WCF, которые публикуют свои конечные точки с помощью сборки ServiceMessenger на маршрутизаторе WCF:
- Привет мир
- и HelloWorldExtended
Каждый из них реализует простой сервис-метод, который возвращает строку. Обе службы имеют одну конечную точку и одно поведение публикации метаданных. Оба сервисных интерфейса определены в ContractAssembly, который был загружен в локальное хранилище для тестирования (это то, что вы должны сделать также, если вы хотите протестировать решение). Службы размещены в отдельных рабочих ролях, которые практически идентичны. За исключением конечных точек обслуживания:
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Threading; using Microsoft.WindowsAzure; using Microsoft.WindowsAzure.Diagnostics; using Microsoft.WindowsAzure.ServiceRuntime; using Microsoft.WindowsAzure.Storage; using System.ServiceModel; using ContractAssembly; using ServiceMessenger; using System.ServiceModel.Description; namespace HelloWorld { public class WorkerRole : RoleEntryPoint { private string endPointAddress; private string listenAddress; private RoleInstanceEndpoint endpointAzure; public override void Run() { using (ServiceHost host = new ServiceHost(typeof(HelloWorld))) { Thread.Sleep(60000); // This is a sample worker implementation. Replace with your logic. Trace.TraceInformation("HelloWorldExtended entry point called", "Information"); this.ConfigureServiceHost(host); while (true) { Thread.Sleep(1000); Trace.TraceInformation(endPointAddress); } } } /// <summary> /// Configures the service host. /// </summary> /// <param name="host">The host.</param> private void ConfigureServiceHost(ServiceHost host) { try { endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HelloWorldMain"]; this.endPointAddress = string.Format("http://{0}/HelloWorld", endpointAzure.IPEndpoint); this.listenAddress = string.Format("http://{0}:{1}/HelloWorld/", RoleEnvironment.GetConfigurationSettingValue("Domain"), endpointAzure.IPEndpoint.Port); var httpBinding = new BasicHttpBinding(); httpBinding.SendTimeout = TimeSpan.FromMinutes(1); httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(1); var helloWorldExtendedEndPoint = host.AddServiceEndpoint(typeof(IHelloWorldService), httpBinding, this.endPointAddress, new Uri(this.listenAddress)); helloWorldExtendedEndPoint.Name = "HelloWorld"; var messenger = new Messenger(this.endPointAddress, "ContractAssembly.IHelloWorldService", "ContractAssembly.dll", "HelloWorld"); messenger.SendMessageToRouter(); ServiceMetadataBehavior smb = new ServiceMetadataBehavior(); smb.HttpGetEnabled = true; smb.HttpGetUrl = new Uri(this.endPointAddress); host.Description.Behaviors.Add(smb); host.Open(); } catch (Exception ex) { Trace.TraceError(ex.Message); } } public override bool OnStart() { // Set the maximum number of concurrent connections ServicePointManager.DefaultConnectionLimit = 12; // For information on handling configuration changes // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. return base.OnStart(); } } }
Чтобы опубликовать одну конечную точку каждого сервиса, мы используем две строки кода:
var messenger = new Messenger(this.endPointAddress, "ContractAssembly.IHelloWorldService", "ContractAssembly.dll", "HelloWorld"); messenger.SendMessageToRouter();
Мы создаем новый экземпляр Messenger и публикуем новую конечную точку службы, используя метод SendMessageToRouter. Милая! Для быстрого обзора решения, пожалуйста, посмотрите эту кодовую карту:
Ну, вот и все! Надеюсь, вам понравилось читать эту статью!
Код на GitHub