使用 优先队列
这个需要使用到优先队列, 这是支持设置优先级的队列, 官网文档 提到, 在声明队列时, 设置 ("x-max-priority", 10)
历史消息与新消息区分
在消息属性中设置 时间戳
, 并且设置一个新消息阈值, 超过这个时间就是历史消息, 这样, 当获取到历史消息时, 可以标记为 ack
后, 重新入队, 并把优先级调到最低, 这样就成功区分新旧消息了
生产者实现
import pika
import time
class Producer(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))
self._channel = None
@property
def channel(self):
if self.connection.is_closed:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))
return self.connection.channel()
if self._channel is None:
self._channel = self.connection.channel()
return self._channel
if not self._channel.is_open:
return self.connection.channel()
return self._channel
def declare_priority_queue(self, queue_name="priority-queue"):
self.channel.queue_declare(queue=queue_name,
arguments={'x-max-priority': 10})
def publish_message(self, message, priority, timestamp=None):
timestamp = time.time() if not timestamp else timestamp
properties = pika.BasicProperties(priority=priority,
headers={'timestamp': str(timestamp)})
self.channel.basic_publish(exchange='',
routing_key='priority-queue',
body=message,
properties=properties)
if __name__ == "__main__":
p = Producer()
with p.channel as channel:
p.declare_priority_queue()
p.publish_message("你好", priority=5)
消费端实现
import pika
import time
from python.rabbitmq.优先队列.producer import Producer
class Consumer(Producer):
NEW_MESSAGE_THRESHOLD = 3
def callback(self, ch, method, properties, body):
timestamp = float(properties.headers.get('timestamp'))
current_time = int(time.time())
if properties.priority > 0 and current_time - timestamp > self.NEW_MESSAGE_THRESHOLD:
# 超时, 认为是历史消息, 重新设置优先为 0
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.publish_message(message=body, priority=0, timestamp=timestamp)
print(f"历史消息, 重新入队: {body.decode()}")
return
# 正常消费
if properties.priority > 0:
print(f"消费新消息: {body.decode()}, timestamp: {timestamp}")
else:
print(f"消费历史消息: {body.decode()}, timestamp: {timestamp}")
time.sleep(1)
self.channel.basic_ack(delivery_tag=method.delivery_tag)
def consuming(self):
self.channel.basic_consume(queue='priority-queue',
on_message_callback=self.callback,
auto_ack=False)
self.channel.start_consuming()
if __name__ == "__main__":
c = Consumer()
c.consuming()