Часто нам нужно выполнять определенные задачи повторно. В этой статье мы поговорим о создании простого планировщика задач в Windows Azure. На самом деле мы разработаем простой планировщик задач, который будет работать в рабочей роли Windows Azure. Мы также обсудим некоторые другие альтернативы, доступные нам в Windows Azure.
Проэкт
В целях демонстрации мы попытаемся создать простой сервис, который пингует некоторые общедоступные веб-сайты (например, www.microsoft.com и т. Д.) И сохраняет результат в Windows Azure Table Storage. Это очень похоже на сервис, предлагаемый Pingdom. В качестве аргумента давайте назовем эту услугу « Пингдом бедного человека » .
Мы будем хранить сайты, которые нам нужно пропинговать, в таблице в хранилище таблиц Windows Azure, и каждую минуту мы будем извлекать этот список оттуда, проверять их и затем сохранять результат обратно в хранилище таблиц Windows Azure (конечно, в другой таблице). ). Мы запустим этот сервис в двух экземплярах рабочей роли X-Small, чтобы показать, как мы можем обрабатывать проблемы параллелизма, чтобы каждый экземпляр обрабатывал уникальный набор веб-сайтов. Предположим, что всего мы пропингуем 10 веб-сайтов, и каждый экземпляр рабочей роли будет пинговать 5 веб-сайтов каждую минуту, чтобы нагрузка была равномерно распределена между несколькими экземплярами.
Реализация планировщика
В основе планировщика заданий лежит механизм планирования. Есть так много вариантов, доступных для вас. Вы можете использовать встроенные в TimeNet объекты .Net Framework или использовать доступные библиотеки третьей части. На мой взгляд, не стоит пытаться строить это самостоятельно и использовать то, что доступно там. Для целей этого проекта мы будем использовать Quartz Scheduler Engine ( http://quartznet.sourceforge.net/ ). Он чрезвычайно надежен, используется очень многими людьми и, наконец, с открытым исходным кодом. По своему опыту я обнаружил, что он чрезвычайно гибкий и простой в использовании.
Особенности дизайна
В среде с несколькими экземплярами нам нужно учесть несколько вещей:
Только один экземпляр выбирает основные данные
Мы хотим убедиться, что только один экземпляр извлекает основные данные, т.е. данные, необходимые для обработки планировщиком. Для этого мы будем полагаться на функциональность лизинга BLOB-объектов . Аренда — это эксклюзивная блокировка большого двоичного объекта, предотвращающая изменение этого большого двоичного объекта. В нашем приложении каждый экземпляр будет пытаться получить в аренду блоб, и только один экземпляр будет успешным. Экземпляр, который сможет получить аренду для BLOB-объекта (назовем его « Мастер-экземпляр »), получит основные данные. Все остальные случаи (назовем их « Slave Instances»») Просто подождет, пока с этими данными будет создан экземпляр мастера времени. Обратите внимание, что главный экземпляр на самом деле еще не выполнит задачу, т.е. в нашем случае пингует сайты. Он просто прочитает данные из источника и отправит их в какое-то место, откуда и главный, и подчиненный экземпляры будут выбирать данные и обрабатывать эти данные (т.е. в нашем случае пинговать сайты).
Разделение труда
Важно, чтобы мы в полной мере использовали все экземпляры, в которых работает наше приложение (в нашем случае 2). Итак, что произойдет, это то, что главный экземпляр извлечет данные из источника и поместит их в очередь, которая опрашивается всеми экземплярами. Для простоты в сообщении будет просто URL, который нам нужно пропинговать. Поскольку мы знаем, что существует два экземпляра, и нам нужно обработать десять веб-сайтов, каждый экземпляр будет «ПОЛУЧАТЬ» 5 сообщений. Затем каждый экземпляр будет читать содержимое сообщения (которое является URL-адресом), а затем пинговать эти URL-адреса и записывать результат.
Спусковой механизм
В обычных реализациях рабочих ролей рабочая роль находится в бесконечном цикле, в основном спящем. Он просыпается ото сна, делает некоторую обработку и возвращается ко сну. Поскольку для планирования мы полагаемся на Quartz, мы будем полагаться только на Quartz для запуска задач вместо рабочей роли. Это дало бы нам возможность вводить больше видов запланированных задач, не беспокоясь о том, чтобы реализовать их в нашей рабочей роли. Для объяснения давайте предположим, что нам нужно обработать 2 запланированных задания — одно выполняется каждую минуту, а другое — каждый час. Если бы мы реализовали это в логике сна рабочей роли, это стало бы несколько сложным. Когда вы начинаете добавлять все больше и больше запланированных задач, уровень сложности значительно возрастает. С Кварцем это действительно просто.
Сохраняя вещи простыми
Для простоты поста в этом блоге мы не будем беспокоиться о том, как обрабатывать различные ошибки. Мы просто предположим, что все в порядке, и нам не придется беспокоиться о временных ошибках из хранилища. В реальном приложении необходимо учитывать и эти вещи.
Архитектура высокого уровня
С учетом этих конструктивных соображений архитектура приложения и процесс будут выглядеть следующим образом:
Таким образом, каждую минуту, Кварц будет запускать задачу. Как только задача будет запущена, вот что произойдет:
- Каждый экземпляр будет пытаться получить аренду определенного блоба.
- Как известно, удастся только один экземпляр. Предположим, что основному экземпляру потребуется около 15 секунд, чтобы прочитать данные из источника и поместить их в очередь. Подчиненные экземпляры будут ждать 15 секунд, пока главный экземпляр сделает этот бит.
- Главный экземпляр будет извлекать данные из источника основных данных (в нашем случае Windows Azure Table Storage). Ведомые экземпляры все еще ждут.
- Главный экземпляр будет помещать данные в очередь. Ведомые экземпляры все еще ждут.
- Все экземпляры теперь будут «получать» сообщения из очереди. Реализуя семантику «GET» (вместо «PEEK»), мы следим за тем, чтобы сообщение выбиралось только одним экземпляром. Как только сообщение будет получено, оно будет немедленно удалено.
- Каждый экземпляр рабочей роли получает URI для проверки связи с содержимым сообщения и запускает процесс проверки данных. Пинг будет выполняться путем создания веб-запроса «Получить» для этого URI и чтения ответа.
- Как только результат проверки будет возвращен, мы сохраним результаты в хранилище таблиц, а затем дождемся следующего запуска Quartz.
Код
Хватит болтать! Давайте посмотрим на некоторый код .
юридические лица
Поскольку мы храним основные данные, а также результаты в Windows Azure Table Storage, давайте создадим два класса, которые будут хранить эти данные. Оба они будут производными от класса TableEntity.
PingItem.cs
Эта сущность будет представлять элементы для проверки связи. Мы сделаем все просто и у нас будет только одно свойство, содержащее URL для пинга. Вот как выглядит код:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Table; namespace PoorMansPingdomWorkerRole { public class PingItem : TableEntity { public PingItem() { PartitionKey = "PingItem"; RowKey = Guid.NewGuid().ToString(); } /// <summary> /// Gets or sets the URL to be pinged. /// </summary> public string Url { get; set; } public override string ToString() { return this.RowKey + "|" + Url; } public static PingItem ParseFromString(string s) { string[] splitter = {"|"}; string[] rowKeyAndUrl = s.Split(splitter, StringSplitOptions.RemoveEmptyEntries); return new PingItem() { PartitionKey = "PingItem", RowKey = rowKeyAndUrl[0], Url = rowKeyAndUrl[1], }; } } }
PingResult.cs
Эта сущность будет хранить результат пинга.
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Table; namespace PoorMansPingdomWorkerRole { public class PingResult : TableEntity { /// <summary> /// Gets or sets the URL pinged. /// </summary> public string Url { get; set; } /// <summary> /// Gets or sets the HTTP Status code. /// </summary> public string StatusCode { get; set; } /// <summary> /// Gets or sets the time taken to process the ping in milliseconds. /// </summary> public double TimeTaken { get; set; } public long ContentLength { get; set; } } }
Код приложения
Инициализация рабочей роли — основные настройки
Поскольку наша реализация зависит от определенных предположений, мы обеспечим их выполнение, реализовав их на этапе инициализации рабочей роли. Вещи, которые мы сделали бы:
- Обеспечение того, что таблица, в которой будут храниться результаты, уже присутствует.
- Обеспечение того, что блоб, на который мы получим аренду, уже присутствует.
Для большей гибкости мы определим ряд настроек в файле конфигурации. Вот как будет выглядеть наш файл конфигурации для этих вещей:
<?xml version="1.0" encoding="utf-8"?> <ServiceConfiguration serviceName="PoorMansPingdom" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration" osFamily="3" osVersion="*" schemaVersion="2012-10.1.8"> <Role name="PoorMansPingdomWorkerRole"> <Instances count="2" /> <ConfigurationSettings> <Setting name="Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString" value="UseDevelopmentStorage=true" /> <!-- Storage account where our data will be stored. --> <Setting name="StorageAccount" value="UseDevelopmentStorage=true" /> <!-- Name of the table where master data will be stored. --> <Setting name="PingItemsTableName" value="PingItems"/> <!-- Name of the table where we'll store the results. --> <Setting name="PingResultsTableName" value="PingResults" /> <!-- Blob container name where we'll store the blob which will be leased. --> <Setting name="BlobContainer" value="lease-blob-container" /> <!-- Name of the blob on which each instance will try and acquire the lease. --> <Setting name="BlobToBeLeased" value="lease-blob.txt" /> <!-- Name of the queue from which messages will be read. --> <Setting name="ProcessQueueName" value="ping-items-queue"/> </ConfigurationSettings> </Role> </ServiceConfiguration>
Мы напишем функцию, которая будет вызываться в процессе инициализации для установки основных настроек:
private void Init() { // Get the cloud storage account. CloudStorageAccount storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageAccount")); // Get the name of the blob container string blobContainerName = RoleEnvironment.GetConfigurationSettingValue("BlobContainer"); CloudBlobContainer blobContainer = storageAccount.CreateCloudBlobClient().GetContainerReference(blobContainerName); // Create the blob container. blobContainer.CreateIfNotExists(); // Get the blob name string blobName = RoleEnvironment.GetConfigurationSettingValue("BlobToBeLeased"); CloudBlockBlob blob = blobContainer.GetBlockBlobReference(blobName); // Write some dummy data in the blob. string blobContent = "This is dummy data"; // Upload blob using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(blobContent))) { blob.UploadFromStream(ms); } // Get the table name for storing results. string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName"); // Create the table. CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName); table.CreateIfNotExists(); // Get the queue name where ping items will be stored. string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName"); // Create the queue. CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName); queue.CreateIfNotExists(); }
И вот как мы будем называть это:
public override void Run() { // This is a sample worker implementation. Replace with your logic. Trace.WriteLine("PoorMansPingdomWorkerRole entry point called", "Information"); // Call the initialization routine. Init(); while (true) { Thread.Sleep(10000); Trace.WriteLine("Working", "Information"); } }
Теперь, если мы запустим эту вещь, мы увидим следующее в нашей учетной записи хранения.
Создание запланированного задания и планирование задачи
Теперь давайте создадим работу и планируем ее. Начнем с того, что работа не будет делать никакой работы. Мы просто создадим класс с именем PingJob и предложим ему реализовать интерфейс IInterruptableJob в библиотеке Quartz.
using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using Microsoft.WindowsAzure.Diagnostics; using Microsoft.WindowsAzure.ServiceRuntime; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; using Microsoft.WindowsAzure.Storage.Table; using Microsoft.WindowsAzure.Storage.Queue; using Quartz; namespace PoorMansPingdomWorkerRole { public class PingJob : IInterruptableJob { public void Execute(IJobExecutionContext context) { Trace.WriteLine(string.Format("[{0}] - Executing ping job", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"))); } public void Interrupt() { throw new NotImplementedException(); } } }
Теперь давайте наметим эту работу. Для этого нам нужно будет определить график работы CRON, который мы сделаем в нашем файле конфигурации приложения, чтобы мы могли изменить его на лету, если это будет необходимо:
<!-- Ping Job Cron Schedule. Executes every minute --> <Setting name="PingJobCronSchedule" value="0 0/1 * * * ?"/>
И тогда в нашем WorkerRole.cs мы запланируем эту работу:
private void ScheduleJob() { DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.Now); // construct a scheduler factory ISchedulerFactory schedFact = new StdSchedulerFactory(); // get a scheduler IScheduler sched = schedFact.GetScheduler(); sched.Start(); JobDataMap jobDataMap = new JobDataMap(); IJobDetail websitePingJobDetail = JobBuilder.Create<PingJob>() .WithIdentity("WebsitePingJob", "group1") .WithDescription("Website Ping Job") .UsingJobData(jobDataMap) .Build(); ITrigger websitePingJobTrigger = TriggerBuilder.Create() .WithIdentity("WebsitePingJob", "group1") .StartAt(runTime) .WithCronSchedule(RoleEnvironment.GetConfigurationSettingValue("PingJobCronSchedule")) .StartNow() .Build(); sched.ScheduleJob(websitePingJobDetail, websitePingJobTrigger); }
Мы просто вызовем эту функцию в методе Run () нашей роли, как показано ниже, и наша работа теперь запланирована. Это будет срабатывать каждую минуту. Это так просто!
public override void Run() { // This is a sample worker implementation. Replace with your logic. Trace.WriteLine("PoorMansPingdomWorkerRole entry point called", "Information"); // Call the initialization routine. Init(); // Call the job scheduling routine. ScheduleJob(); while (true) { Thread.Sleep(10000); Trace.WriteLine("Working", "Information"); } }
Чтобы убедиться, что задание выполняется правильно, вот вывод в эмуляторе вычислений для обоих экземпляров роли:
Теперь нам осталось только реализовать функциональность работы. Итак, давайте сделаем это.
Приобретение аренды
Как упомянуто выше, первое, что мы хотим сделать, это попытаться приобрести блоб в аренду.
private bool AcquireLease() { try { blob.AcquireLease(TimeSpan.FromSeconds(15), null); return true; } catch (Exception exception) { return false; } }
Мы сделаем все просто, и если есть какое-то исключение, мы просто предположим, что другой экземпляр получил аренду для BLOB-объекта. В реальном сценарии вы должны принять во внимание надлежащие исключения.
// Try and acquire the lease. if (AcquireLease()) { Trace.WriteLine(string.Format("[{0}] - Lease acquired. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); // If successful then read the data. } else { Trace.WriteLine(string.Format("[{0}] - Failed to acquire lease. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); // Else just sleep for 15 seconds. Thread.Sleep(15 * 1000); }
Чтение основных данных
Следующим шагом будет чтение основных данных. Опять же, сохраняя простоту, мы не будем беспокоиться об исключениях. Мы просто убедимся, что данные есть в нашей таблице « PingItems ». Для этой записи в блоге я просто ввел данные в эту таблицу вручную.
private List<PingItem> ReadMasterData() { string pingItemTableName = RoleEnvironment.GetConfigurationSettingValue("PingItemsTableName"); CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(pingItemTableName); TableQuery<PingItem> query = new TableQuery<PingItem>(); var queryResult = table.ExecuteQuery<PingItem>(query); return queryResult.ToList(); }
Сохранение данных в очереди процессов
Теперь мы сохраним данные в очереди процесса.
private void SaveMessages(List<PingItem> pingItems) { string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName"); CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName); foreach (var pingItem in pingItems) { CloudQueueMessage msg = new CloudQueueMessage(pingItem.ToString()); queue.AddMessage(msg, TimeSpan.FromSeconds(45)); } }
Извлекать данные из очереди процесса
Далее мы извлечем данные из очереди процесса и обработаем эти записи. Каждый экземпляр будет получать 5 сообщений из очереди. Опять же, ради простоты, как только сообщение получено, мы немедленно удалим его. В реальном сценарии нужно будет удерживать это сообщение до тех пор, пока оно не будет обработано должным образом.
private List<PingItem> FetchMessages(int maximumMessagesToFetch) { string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName"); CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName); var messages = queue.GetMessages(maximumMessagesToFetch); List<PingItem> itemsToBeProcessed = new List<PingItem>(); foreach (var message in messages) { itemsToBeProcessed.Add(PingItem.ParseFromString(message.AsString)); queue.DeleteMessage(message); } return itemsToBeProcessed; }
Элементы процесса
Это последний этап нашей задачи. Мы напишем функцию, которая будет извлекать URL-адрес и возвращает объект PingResult, который мы сохраним в хранилище таблицы.
private PingResult FetchUrl(PingItem item) { DateTime startDateTime = DateTime.UtcNow; TimeSpan elapsedTime = TimeSpan.FromSeconds(0); string statusCode = ""; long contentLength = 0; try { HttpWebRequest req = (HttpWebRequest)WebRequest.Create(item.Url); req.Timeout = 30 * 1000;//Let's timeout the request in 30 seconds. req.Method = "GET"; using (HttpWebResponse resp = (HttpWebResponse)req.GetResponse()) { DateTime endDateTime = DateTime.UtcNow; elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks); statusCode = resp.StatusCode.ToString(); contentLength = resp.ContentLength; } } catch (WebException webEx) { DateTime endDateTime = DateTime.UtcNow; elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks); statusCode = webEx.Status.ToString(); } return new PingResult() { PartitionKey = DateTime.UtcNow.Ticks.ToString("d19"), RowKey = item.RowKey, Url = item.Url, StatusCode = statusCode, ContentLength = contentLength, TimeTaken = elapsedTime.TotalMilliseconds, }; }
private void SaveResult(PingResult result) { string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName"); CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName); TableOperation addOperation = TableOperation.Insert(result); table.Execute(addOperation); }
public void Execute(IJobExecutionContext context) { // Introduce a random delay between 100 and 200 ms to to avoid race condition. Thread.Sleep((new Random()).Next(100, 200)); Trace.WriteLine(string.Format("[{0}] - Executing ping job. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); Init(); // Try and acquire the lease. if (AcquireLease()) { Trace.WriteLine(string.Format("[{0}] - Lease acquired. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); // If successful then read the data. var itemsToBeProcessed = ReadMasterData(); //Now save this data as messages in process queue. SaveMessages(itemsToBeProcessed); } else { Trace.WriteLine(string.Format("[{0}] - Failed to acquire lease. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); // Else just sleep for 15 seconds. Thread.Sleep(15 * 1000); } // Now we'll fetch 5 messages from top of queue var itemsToBeProcessedByThisInstance = FetchMessages(5); if (itemsToBeProcessedByThisInstance.Count > 0) { int numTasks = itemsToBeProcessedByThisInstance.Count; List<Task> tasks = new List<Task>(); for (int i = 0; i < numTasks; i++) { var pingItem = itemsToBeProcessedByThisInstance[i]; var task = Task.Factory.StartNew(() => { var pingResult = FetchUrl(pingItem); SaveResult(pingResult); }); tasks.Add(task); } Task.WaitAll(tasks.ToArray()); } }
Довольно просто, да!
Готовый код
Вот полный код для проверки связи в одном месте .
Файл конфигурации
<?xml version="1.0" encoding="utf-8"?> <ServiceConfiguration serviceName="PoorMansPingdom" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration" osFamily="3" osVersion="*" schemaVersion="2012-10.1.8"> <Role name="PoorMansPingdomWorkerRole"> <Instances count="2" /> <ConfigurationSettings> <Setting name="Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString" value="UseDevelopmentStorage=true" /> <!-- Storage account where our data will be stored. --> <Setting name="StorageAccount" value="UseDevelopmentStorage=true" /> <!-- Name of the table where master data will be stored. --> <Setting name="PingItemsTableName" value="PingItems" /> <!-- Name of the table where we'll store the results. --> <Setting name="PingResultsTableName" value="PingResults" /> <!-- Blob container name where we'll store the blob which will be leased. --> <Setting name="BlobContainer" value="lease-blob-container" /> <!-- Name of the blob on which each instance will try and acquire the lease. --> <Setting name="BlobToBeLeased" value="lease-blob.txt" /> <!-- Name of the queue from which messages will be read. --> <Setting name="ProcessQueueName" value="ping-items-queue" /> <!-- Ping Job Cron Schedule --> <Setting name="PingJobCronSchedule" value="0 0/1 * * * ?" /> </ConfigurationSettings> </Role> </ServiceConfiguration>
WorkerRole.cs
using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using Microsoft.WindowsAzure.Diagnostics; using Microsoft.WindowsAzure.ServiceRuntime; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; using Microsoft.WindowsAzure.Storage.Table; using Microsoft.WindowsAzure.Storage.Queue; using Quartz; using Quartz.Impl; namespace PoorMansPingdomWorkerRole { public class WorkerRole : RoleEntryPoint { public override void Run() { // This is a sample worker implementation. Replace with your logic. Trace.WriteLine("PoorMansPingdomWorkerRole entry point called", "Information"); // Call the initialization routine. Init(); // Call the job scheduling routine. ScheduleJob(); while (true) { Thread.Sleep(10000); //Trace.WriteLine("Working", "Information"); } } public override bool OnStart() { // Set the maximum number of concurrent connections ServicePointManager.DefaultConnectionLimit = 12; // For information on handling configuration changes // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. return base.OnStart(); } private void Init() { // Get the cloud storage account. CloudStorageAccount storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageAccount")); // Get the name of the blob container string blobContainerName = RoleEnvironment.GetConfigurationSettingValue("BlobContainer"); CloudBlobContainer blobContainer = storageAccount.CreateCloudBlobClient().GetContainerReference(blobContainerName); // Create the blob container. blobContainer.CreateIfNotExists(); // Get the blob name string blobName = RoleEnvironment.GetConfigurationSettingValue("BlobToBeLeased"); CloudBlockBlob blob = blobContainer.GetBlockBlobReference(blobName); // Write some dummy data in the blob. string blobContent = "This is dummy data"; // Upload blob using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(blobContent))) { blob.UploadFromStream(ms); } // Get the table name for storing results. string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName"); // Create the table. CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName); table.CreateIfNotExists(); // Get the queue name where ping items will be stored. string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName"); // Create the queue. CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName); queue.CreateIfNotExists(); } private void ScheduleJob() { DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.Now); // construct a scheduler factory ISchedulerFactory schedFact = new StdSchedulerFactory(); // get a scheduler IScheduler sched = schedFact.GetScheduler(); sched.Start(); JobDataMap jobDataMap = new JobDataMap(); IJobDetail websitePingJobDetail = JobBuilder.Create<PingJob>() .WithIdentity("WebsitePingJob", "group1") .WithDescription("Website Ping Job") .UsingJobData(jobDataMap) .Build(); ITrigger websitePingJobTrigger = TriggerBuilder.Create() .WithIdentity("WebsitePingJob", "group1") .StartAt(runTime) .WithCronSchedule(RoleEnvironment.GetConfigurationSettingValue("PingJobCronSchedule")) .StartNow() .Build(); sched.ScheduleJob(websitePingJobDetail, websitePingJobTrigger); } } }
PingJob.cs
using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using Microsoft.WindowsAzure.Diagnostics; using Microsoft.WindowsAzure.ServiceRuntime; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; using Microsoft.WindowsAzure.Storage.Table; using Microsoft.WindowsAzure.Storage.Queue; using Quartz; using System.Threading.Tasks; namespace PoorMansPingdomWorkerRole { public class PingJob : IInterruptableJob { CloudStorageAccount storageAccount; CloudBlobContainer blobContainer; CloudBlockBlob blob; public void Execute(IJobExecutionContext context) { // Introduce a random delay between 100 and 200 ms to to avoid race condition. Thread.Sleep((new Random()).Next(100, 200)); Trace.WriteLine(string.Format("[{0}] - Executing ping job. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); Init(); // Try and acquire the lease. if (AcquireLease()) { Trace.WriteLine(string.Format("[{0}] - Lease acquired. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); // If successful then read the data. var itemsToBeProcessed = ReadMasterData(); //Now save this data as messages in process queue. SaveMessages(itemsToBeProcessed); } else { Trace.WriteLine(string.Format("[{0}] - Failed to acquire lease. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id)); // Else just sleep for 15 seconds. Thread.Sleep(15 * 1000); } // Now we'll fetch 5 messages from top of queue var itemsToBeProcessedByThisInstance = FetchMessages(5); if (itemsToBeProcessedByThisInstance.Count > 0) { int numTasks = itemsToBeProcessedByThisInstance.Count; List<Task> tasks = new List<Task>(); for (int i = 0; i < numTasks; i++) { var pingItem = itemsToBeProcessedByThisInstance[i]; var task = Task.Factory.StartNew(() => { var pingResult = FetchUrl(pingItem); SaveResult(pingResult); }); tasks.Add(task); } Task.WaitAll(tasks.ToArray()); } } public void Interrupt() { throw new NotImplementedException(); } private void Init() { storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageAccount")); string blobContainerName = RoleEnvironment.GetConfigurationSettingValue("BlobContainer"); blobContainer = storageAccount.CreateCloudBlobClient().GetContainerReference(blobContainerName); string blobName = RoleEnvironment.GetConfigurationSettingValue("BlobToBeLeased"); blob = blobContainer.GetBlockBlobReference(blobName); } private bool AcquireLease() { try { blob.AcquireLease(TimeSpan.FromSeconds(45), null); return true; } catch (Exception exception) { return false; } } private List<PingItem> ReadMasterData() { string pingItemTableName = RoleEnvironment.GetConfigurationSettingValue("PingItemsTableName"); CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(pingItemTableName); TableQuery<PingItem> query = new TableQuery<PingItem>(); var queryResult = table.ExecuteQuery<PingItem>(query); return queryResult.ToList(); } private void SaveMessages(List<PingItem> pingItems) { string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName"); CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName); foreach (var pingItem in pingItems) { CloudQueueMessage msg = new CloudQueueMessage(pingItem.ToString()); queue.AddMessage(msg, TimeSpan.FromSeconds(45)); } } private List<PingItem> FetchMessages(int maximumMessagesToFetch) { string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName"); CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName); var messages = queue.GetMessages(maximumMessagesToFetch); List<PingItem> itemsToBeProcessed = new List<PingItem>(); foreach (var message in messages) { itemsToBeProcessed.Add(PingItem.ParseFromString(message.AsString)); queue.DeleteMessage(message); } return itemsToBeProcessed; } private PingResult FetchUrl(PingItem item) { DateTime startDateTime = DateTime.UtcNow; TimeSpan elapsedTime = TimeSpan.FromSeconds(0); string statusCode = ""; long contentLength = 0; try { HttpWebRequest req = (HttpWebRequest)WebRequest.Create(item.Url); req.Timeout = 30 * 1000;//Let's timeout the request in 30 seconds. req.Method = "GET"; using (HttpWebResponse resp = (HttpWebResponse)req.GetResponse()) { DateTime endDateTime = DateTime.UtcNow; elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks); statusCode = resp.StatusCode.ToString(); contentLength = resp.ContentLength; } } catch (WebException webEx) { DateTime endDateTime = DateTime.UtcNow; elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks); statusCode = webEx.Status.ToString(); } return new PingResult() { PartitionKey = DateTime.UtcNow.Ticks.ToString("d19"), RowKey = item.RowKey, Url = item.Url, StatusCode = statusCode, ContentLength = contentLength, TimeTaken = elapsedTime.TotalMilliseconds, }; } private void SaveResult(PingResult result) { string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName"); CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName); TableOperation addOperation = TableOperation.Insert(result); table.Execute(addOperation); } } }
Другие альтернативы
Вам не нужно изо всех сил и строить эту вещь самостоятельно. К счастью, есть некоторые вещи, которые доступны вам даже сегодня, которые помогут вам достичь того же. Некоторые параметры находятся за пределами Windows Azure, а некоторые — в Windows Azure. Мы поговорим только о доступных сегодня опциях в Windows Azure:
Планировщик задач службы Windows Azure Mobile
Недавно Windows Azure объявила о доступности функции планировщика заданий в Windows Azure Mobile Service. Вы можете написать код, используя node.js для функциональности работы, а мобильный сервис заботится о выполнении работы. Для получения дополнительной информации, пожалуйста, посетите: http://www.windowsazure.com/en-us/develop/mobile/tutorials/schedule-backend-tasks/ .
Aditi Cloud Services
Aditi ( www.aditi.com ), крупный партнер Microsoft, недавно объявил о доступности службы «Планировщик», которая позволяет выполнять любое задание CRON в облаке. Эта служба также доступна через Windows Azure Marketplace и может быть добавлена в качестве дополнительной функции к вашей подписке. Для получения дополнительной информации, пожалуйста, посетите: http://www.aditicloud.com/ .
Резюме
Как показано выше, довольно просто создать планировщик задач в Windows Azure. Очевидно, я взял довольно простой пример и сделал определенные предположения. Когда вы создадите сервис, подобный этому, для производственного использования, вам потребуется решить ряд проблем, чтобы создать надежный сервис. Надеюсь, вы нашли этот пост полезным. Делитесь своими мыслями, предоставляя комментарии. Если вы обнаружите какие-либо проблемы, пожалуйста, дайте мне знать, и я исправлю их как можно скорее.