Статьи

Управление сообщениями очереди с помощью клиента RabbitMQ

Часто мы сталкиваемся с проблемами с очередями ошибок или очередями мертвых писем, которые заполняются сообщениями. А поскольку они являются самым низким приоритетом для любого приложения, общая тенденция состоит в том, чтобы забыть об этих очередях. Однако эти очереди так же важны, как и любые другие очереди, и нам необходимо периодически очищать сообщения из этих очередей, иначе они будут занимать пространство, отведенное для RabbitMQ.

Теперь, когда мы думаем об очистке сообщений очереди ошибок, какие у нас есть варианты? Мы не можем просто слепо удалять все сообщения, так как некоторые из них могут быть важными. Например, в критическом сценарии, таком как банковское дело, сообщение, которое не было успешно обработано пакетным процессом, может привести к попаданию этого сообщения в очередь ошибок. Эта ошибка может быть вызвана временными сбоями (например, ошибка сети, взаимная блокировка базы данных, недоступность службы и т. Д.), Поэтому нам необходимо повторно запустить это сообщение из очереди ошибок. Мы должны иметь возможность просмотреть сообщение, прежде чем принять решение о его очистке или перемещении обратно в исходную очередь для обработки. Кроме того, должна быть функция выборочной очистки / перемещения сообщений или массовой очистки / перемещения сообщений. Итак, для управления любым сообщением очереди ошибок или любым сообщением очереди, мы должны искать эти функции:

  • Удалить / удалить все сообщения из очереди.
  • Выборочно удалять / удалять сообщения из очереди.
  • Переместить все сообщения из очереди в исходную очередь (или любую другую очередь).
  • Выборочное перемещение сообщений из очереди в исходную очередь (или любую другую очередь).

Реализация

Использование протокола AMQPS имеет некоторые ограничения в отношении информации, которую мы можем получить, и операций, которые мы можем выполнять с помощью клиента RabbitMQ. Тем не менее, RabbitMQ имеет HTTP-клиент, который позволяет вам выполнять все операции, но команда сетевой безопасности в большинстве организаций не будет открывать этот порт брандмауэра для вас из соображений безопасности.

Я буду использовать RabbitMQ Client для этой статьи. Клиент RabbitMQ .NET представляет собой реализацию клиентской библиотеки AMQP 0-9-1 для C # (и, неявно, для других языков .NET). Начиная с версии 4.0, он поддерживает .NET Core и .NET 4.5.1+.

Библиотека имеет открытый исходный код и имеет двойную лицензию в соответствии с Apache License v2 и Mozilla Public License v1.1. Текущая версия клиентской библиотеки RabbitMQ .NET / C # — 5.0.1. Рекомендуется использовать пакет NuGet. Примечания к выпуску можно найти на странице релизов GitHub.

Пользовательский интерфейс

Для параметров Move All и Purge All мы ищем этот интерфейс:

Мы должны создать файл JSON, содержащий имя очереди ошибок и исходное имя очереди вместе с приложением и группой приложений. JSON будет выглядеть примерно так:

{
        "ApplicationGroup": "Group1",
        "ApplicationName": "Application1",
        "OriginalQueueName": "OriginalQueue1",           
        "ErrorQueueName": "Queue1"          
}

Нажав на Имя очереди,  вы попадете на другой экран, который позволит нам выборочно удалять или перемещать сообщения после просмотра.

Фрагменты кода

Давайте пройдемся по фрагментам кода для методов, которые нам нужны для выполнения необходимых операций:

Просмотреть всю информацию об очереди

Этот метод будет получать всю информацию о количестве сообщений от RabbitMQ. Это вернет список объектов сообщения. Код прост:

[Serializable]
    public class Message
    {        
      private readonly string serializableBody;
      public Message ( string serializableBody )
      {
        this.serializableBody = serializableBody;
      }
      public MessageHeader Header
      {
        get;
        private set;
      }
      public string MessageBody
      {
        get 
        {
          return this.serializableBody;
        }           
      }
      public byte[] GetBody()
      {
        return (byte[])((object)Encoding.ASCII.GetBytes(this.MessageBody));              
      }
}

[Serializable]
    public class MessageHeader
    {
        public MessageHeader()
        {
            this.MessageId = Guid.NewGuid().ToString();
            this.Properties = new Dictionary<string, string>();
            this.IsPersistent = true;
        }
        public string AppId { get; set; }   
        public string MessageId { get; set; }
        public string MessageName { get; set; }
        public DateTime GeneratedAtUtc { get; set; }
        public string ExpirationInMilliseconds { get; set; }
        public bool IsPersistent { get; set; }
        public bool Delete { get; set; }
        public bool Move { get; set; }  
        public IDictionary<string, string> Properties { get; private set; }

    }

  public class QueueInfoModel
  {        
    public string Environment { get; set; }
    public string ApplicationGroup { get; set; }
    public string ApplicationName { get; set; }
    public string ErrorQueueName { get; set; 
    public string OriginalQueueName { get; set; }  
    public int MessageCount { get; set;}
  }

public class RabbitMQQueueManager
{
  private readonly string environment;
  private readonly log4net.ILog logger; 
  private const string DELETE_KEY = "delete";
  private const string MOVE_KEY = "move";
  private const string ORIGINAL_QUEUENAME_KEY = "originalQueueName";
  private const string ERROR_QUEUENAME_KEY = "errorQueueName";
  private const string EXCHANGE_KEY = "originalQueueExchange";
  private const ushort PREFETCH_SIZE = 50;
  public List<QueueInfoModel> GetAllQueueData ( string appgroup = "", string appname = "", string queuename = "" )
{
    List<QueueInfoModel> queueDetails;
    List<QueueInfoModel> queueInfoList = new List<QueueInfoModel> ();
    try
    {
      queueDetails = ReadAllQueueInfoFromJsonConfig ();
      if (!string.IsNullOrEmpty (appgroup))
      {
        queueDetails = queueDetails.Where (x => x.ApplicationGroup.Contains (appgroup, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      if (!string.IsNullOrEmpty (appname))
      {
        queueDetails = queueDetails.Where (x => x.ApplicationName.Contains (appname, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      if (!string.IsNullOrEmpty (queuename))
      {
        queueDetails = queueDetails.Where (x => x.ErrorQueueName.Contains (queuename, StringComparison.CurrentCultureIgnoreCase) || x.OriginalQueueName.Contains (queuename, StringComparison.CurrentCultureIgnoreCase)).ToList ();
      }
      using (var connection = GetRabbitMqConnection ())
      {
        using (var channel = connection.CreateModel ())
        {
          foreach (var queueDetail in queueDetails)
          {
            try
            {
              queueDetail.MessageCount = (int)channel.MessageCount (queueDetail.ErrorQueueName);
            }
            catch (Exception ex)
            {
              this.logger.Error (string.Format ("Error Occured while getting message count for the queue {0}, skipping this queue", queueDetail.ErrorQueueName), ex);
            }                            
            queueInfoList.Add (queueDetail);                           
          }
        }
      }
    }
    catch (Exception)
    {
      throw;
    }
    finally
    {
      queueDetails = null;
    }
    return queueInfoList;
  }
  private List<QueueInfoModel> ReadAllQueueInfoFromJsonConfig ()
  {
    var fullpath = AppDomain.CurrentDomain.BaseDirectory + @"\ErrorMessageQueues.Json";
    var queueDetails = JsonConvert.DeserializeObject<List<QueueInfoModel>> (File.ReadAllText (fullpath));

    return queueDetails;
  }

  private IConnection GetRabbitMqConnection ()
  {
    try
    {               
      ConnectionFactory connectionFactory = new ConnectionFactory ();
      connectionFactory.Uri = "Your URI";
      connectionFactory.Protocol = Protocols.DefaultProtocol;
      connectionFactory.AutomaticRecoveryEnabled = true;
      IConnection conn = connectionFactory.CreateConnection ();
      return conn;
    }
    catch (Exception)
    {               
      throw;
    }
}
}

Удалить / удалить все сообщения

Это еще одна простая операция. Клиент RabbitMQ предоставляет метод очистки.

using (var connection = GetRabbitMqConnection ())
{
  using (var channel = connection.CreateModel ())
  {
    channel.QueuePurge (queueName);
  }
}

Переместить все сообщения из очереди в исходную очередь (или любую другую очередь)

В этой операции мы будем читать каждое сообщение в очереди, а затем публиковать его в очереди назначения. Мы должны позаботиться о любых исключительных случаях, которые могут привести к потере сообщения. Клиент RabbitMQ не предоставляет готовых функциональных возможностей «из коробки», поэтому мы сначала прочитаем сообщение и опубликуем его в очереди назначения. Затем мы подтвердим (подтвердим) сообщение, которое будет удалено из очереди. В случае каких-либо исключений при публикации сообщения, нам нужно НАЧАТЬ (не подтверждать) сообщение. Это вернет сообщение в очередь ошибок.

public List<string> MoveAllMessagesToQueue ( string errorQueueName, string originalQueueName )
{
  QueueingBasicConsumer consumer = null;            
  List<string> messageList = new List<string> ();
  BasicDeliverEventArgs result = null;
  using (var rmqConnection = GetRabbitMqConnection ())
  {
    using (var channel = rmqConnection.CreateModel ())
    {
      try
      {
        var queueMessageCount = (ushort)channel.MessageCount (errorQueueName);
        //Check if =queue message count is less than prefetch count, 
        //if yes then set the prefetch count to queue message count.
        var pfCount = queueMessageCount >= PREFETCH_SIZE ? PREFETCH_SIZE : queueMessageCount;

        channel.BasicQos (0, pfCount, false);
        consumer = new QueueingBasicConsumer (channel);
        channel.BasicConsume (errorQueueName, false, consumer);
        for (int i = 0; i < queueMessageCount; i++)
        {
          if (!channel.IsOpen)
          {
            throw new ApplicationException ("Channel is closed");
          }
          result = consumer.Queue.Dequeue ();
          try
          {
            channel.BasicPublish (string.Empty, originalQueueName, true, result.BasicProperties, result.Body);
            channel.BasicAck (result.DeliveryTag, false);                                
            messageList.Add (result.BasicProperties.MessageId);
          }
          catch (Exception ex)
          {
            ////Nack the message in case of any exception while reading the message.
            channel.BasicNack (result.DeliveryTag, false, true);
            this.logger.Warn ("Error Occured while performing delete operation for message ID: " + result.BasicProperties.MessageId, ex);
          }
        }
      }
      catch (Exception)
      {
        ////Nack the message back to queue in case of exception
        if (result != null)
        {
          channel.BasicNack (result.DeliveryTag, false, true);
        }
        throw;
      }
    }
  }

  this.logger.Info (string.Format ("Successfully moved all messages from error queue {0} to destination queue {1}", errorQueueName, originalQueueName));

  return messageList;
}

Получать сообщения из очереди для просмотра

Этот метод будет вызван, как только мы нажмем на любую очередь, чтобы увидеть сообщения. Для этого мы удалим каждое сообщение и NACK, чтобы вернуть его в очередь. Это эквивалентно функциональности PEEK, но, поскольку RabbitMQ Client не предоставляет этого готового решения, мы должны сделать этот обходной путь. Нам нужно только просмотреть эти данные и затем решить, какую операцию нам нужно выполнить (очистить / переместить). Кроме того, мы передаем количество сообщений, которые нам нужно просмотреть — это будет полезно при поиске страниц.

public List<Message> GetMessagesFromQueueNoAck ( string queueName, int messageCount = -1 )
        {
            QueueingBasicConsumer consumer = null;          
            var responseMessages = new List<Message> ();
            BasicDeliverEventArgs result = null;
            using (var rmqConnection = GetRabbitMqConnection ())
            {
                using (var channel = rmqConnection.CreateModel ())
                {
                    try
                    {
                        var queueMessageCount = (int)channel.MessageCount (queueName);
                        var queueInfo = GetAllQueueData ("", "", queueName).FirstOrDefault ();
                        var count = messageCount > -1 ? messageCount <= queueMessageCount ? messageCount : queueMessageCount : queueMessageCount;
                        var pfCount = count >= PREFETCH_SIZE ? PREFETCH_SIZE : count;
                        channel.BasicQos (0, (ushort)pfCount, false);
                        consumer = new QueueingBasicConsumer (channel);
                        channel.BasicConsume (queueName, false, consumer);
                        for (int i = 0; i < pfCount; i++)
                        {
                            if (!channel.IsOpen)
                            {
                                throw new ApplicationException ("Channel is closed");
                            }
                            result = consumer.Queue.Dequeue ();
                            try
                            {
                                string messageData = System.Text.Encoding.UTF8.GetString (result.Body);
                                var rMessage = new Message (messageData);
                                RmqHeaderHandler.ReadRmqMessageProperties (result.BasicProperties, rMessage);
                                channel.BasicNack (result.DeliveryTag, false, true);
                                ////Set Message properties
                                Type t = queueInfo.GetType ();
                                foreach (PropertyInfo pi in t.GetProperties ())
                                {
                                    rMessage.Header.Properties.Add (pi.Name, pi.GetValue (queueInfo, null).ToString ());
                                }
                                responseMessages.Add (rMessage);
                            }
                            catch (Exception ex)
                            {
                                ////Nack the message in case of any exception while reading the message.
                                channel.BasicNack (result.DeliveryTag, false, true);
                                this.logger.Warn ("Error Occured while getting message for message ID" + result.BasicProperties.MessageId, ex);
                            }
                        }
                    }
                    catch (Exception)
                    {
                        ////Nack the message back to queue in case of exception
                        if (result != null)
                        {
                            channel.BasicNack (result.DeliveryTag, false, true);
                        }
                        throw;
                    }
                }
            }
            return responseMessages;
        }

public static class RmqHeaderHandler
    {
        private const byte NonPersistentDeliveryMode = 1;
        private const byte PersistentDeliveryMode = 2;
        private const string SecurityTokenKey = "SecurityToken";
        private const string Properties = "properties";
        private const string MessageNameKey = "MessageName";
        private const string SystemPropertiesKey = "SystemProperties";
        private const string ApplicationPropertiesKey = "ApplicationProperties";
        #region Public Methods
        public static void ReadDynamicMessageProperties(dynamic messageProperties, Message message)
        {
            try
            {                   
                message.Header.AppId = messageProperties.appId;
                message.Header.MessageId = messageProperties.messageId;  
                message.Header.GeneratedAtUtc = messageProperties.generatedAtUtc;
                message.Header.ExpirationInMilliseconds = messageProperties.expirationInMilliseconds;
                message.Header.IsPersistent = messageProperties.isPersistent;
                message.Header.Delete = Convert.ToBoolean(messageProperties.delete);
                message.Header.Move = Convert.ToBoolean(messageProperties.move);
                if (messageProperties.ContainsKey ("messageName"))
                {
                    message.Header.MessageName = messageProperties.messageName;
                }                
                if (messageProperties.ContainsKey("properties"))
                {  
                    var customProperties = Newtonsoft.Json.JsonConvert.DeserializeObject<Dictionary<string, string>>(Convert.ToString(messageProperties.properties));
                    foreach (var propPair in customProperties)
                    {
                        message.Header.Properties.Add(propPair.Key, propPair.Value);
                    }
                }
            }
            catch(Exception)
            {
                throw;
            }
        }

        public static void ReadRmqMessageProperties(IBasicProperties messageProperties, Message message)
        {
            message.Header.AppId = messageProperties.AppId;
            message.Header.MessageId = messageProperties.MessageId;
            message.Header.GeneratedAtUtc = new DateTime(messageProperties.Timestamp.UnixTime);
            message.Header.ExpirationInMilliseconds = messageProperties.Expiration;
            message.Header.IsPersistent = messageProperties.DeliveryMode == PersistentDeliveryMode;
            if (messageProperties.Headers.ContainsKey (SystemPropertiesKey))
            {
                var systemProperties = DeserializeMessageProperties ((byte[])messageProperties.Headers[SystemPropertiesKey]);
                if (systemProperties.ContainsKey (MessageNameKey))
                {
                    message.Header.MessageName = systemProperties[MessageNameKey];
                }
            }
            if (messageProperties.Headers.ContainsKey(ApplicationPropertiesKey))
            {
                var applicationProperties = DeserializeMessageProperties((byte[])messageProperties.Headers[ApplicationPropertiesKey]);
                foreach (var propPair in applicationProperties)
                {
                    message.Header.Properties.Add(propPair.Key, propPair.Value);
                }
            }
        }

        #endregion
        #region private methods        
        private static Dictionary<string, string> DeserializeMessageProperties(byte[] properties)
        {
            var serializer = new JsonMessageSerializer();
            var serializedText = serializer.Serialize(properties);
            return serializer.Deserialize<Dictionary<string, string>>(serializedText);
        }
        #endregion
}

Выполнить выбранное удаление / перемещение по очереди сообщений

Этот метод позаботится о трех сценариях:

  1. Move the selected message to the original queue: For this, we will look for the message marked with the «move» attribute. We will first deque a batch of messages and then match the message IDs with the provided messages. If there’s a match, we will pick only those messages and publish to the original queue. Once published, we will ACK the message to be removed from the error queue. In case of an exception, we will NACK the message to be put back to the error queue.

  2. Delete selected messages: For this, we will look for messages marked with the «delete» attribute. We will first deque a batch of messages and then match the message IDs with the provided messages. If there’s a match, we will pick only those messages and NACK them. This will remove them from error queue. 

  3. Push messages to the back of the queue: This scenario will happen when the user has not made a selection of «purge» or «move» for any queue message or messages. In this case, we will just publish these messages back to the same queue. This will push these messages to the back of the queue. We will be able to browse to the next set of messages. For this, we will match the batch of dequeued messages without any deletion of the move attribute. Then, we will publish to the same queue and ACK the message. This can be done in a transaction, as well, to prevent message loss.