rabbitmq-优先队列-优先消费新消息-然后才消费历史消息

使用 优先队列

这个需要使用到优先队列, 这是支持设置优先级的队列, 官网文档 提到, 在声明队列时, 设置 ("x-max-priority", 10)

历史消息与新消息区分

在消息属性中设置 时间戳, 并且设置一个新消息阈值, 超过这个时间就是历史消息, 这样, 当获取到历史消息时, 可以标记为 ack 后, 重新入队, 并把优先级调到最低, 这样就成功区分新旧消息了

生产者实现

import pika
import time


class Producer(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost'))
        self._channel = None

    @property
    def channel(self):
        if self.connection.is_closed:
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters('localhost'))
            return self.connection.channel()
        if self._channel is None:
            self._channel = self.connection.channel()
            return self._channel
        if not self._channel.is_open:
            return self.connection.channel()
        return self._channel

    def declare_priority_queue(self, queue_name="priority-queue"):
        self.channel.queue_declare(queue=queue_name,
                                   arguments={'x-max-priority': 10})

    def publish_message(self, message, priority, timestamp=None):
        timestamp = time.time() if not timestamp else timestamp
        properties = pika.BasicProperties(priority=priority,
                                          headers={'timestamp': str(timestamp)})
        self.channel.basic_publish(exchange='',
                                   routing_key='priority-queue',
                                   body=message,
                                   properties=properties)


if __name__ == "__main__":
    p = Producer()
    with p.channel as channel:
        p.declare_priority_queue()
        p.publish_message("你好", priority=5)

消费端实现

import pika
import time
from python.rabbitmq.优先队列.producer import Producer


class Consumer(Producer):
    NEW_MESSAGE_THRESHOLD = 3

    def callback(self, ch, method, properties, body):
        timestamp = float(properties.headers.get('timestamp'))
        current_time = int(time.time())

        if properties.priority > 0 and current_time - timestamp > self.NEW_MESSAGE_THRESHOLD:
            # 超时, 认为是历史消息, 重新设置优先为 0
            self.channel.basic_ack(delivery_tag=method.delivery_tag)
            self.publish_message(message=body, priority=0, timestamp=timestamp)
            print(f"历史消息, 重新入队: {body.decode()}")
            return

        # 正常消费
        if properties.priority > 0:
            print(f"消费新消息: {body.decode()}, timestamp: {timestamp}")
        else:
            print(f"消费历史消息: {body.decode()}, timestamp: {timestamp}")
        time.sleep(1)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def consuming(self):
        self.channel.basic_consume(queue='priority-queue',
                                   on_message_callback=self.callback,
                                   auto_ack=False)
        self.channel.start_consuming()


if __name__ == "__main__":
    c = Consumer()
    c.consuming()