RabbitMQ 基本使用
介绍

RabbitMQ是一个消息代理,事实上,它接收生产者产生的消息,然后将消息传递给消费者。在这个过程中,它可以路由,可以缓冲,或者更具你设定的规则来将消息持久化。

RabbitMQ和消息传输过程中一般会用一些术语:

  • 生产者

意思无非是指发送消息的那一端,如果一个程序发送消息,那么它就将被称为生产者,这里用大写的P来表示。

  • 队列 (RabbitMQ)

相当于邮箱的名字,它活动在RabbitMQ服务器里边。虽然消息流会通过RabbitMQ和你的应用程序,但是只会被存储在队列中。队列是不受任何限制的,它可以尽可能多的去存储你需要存储的消息(本质上来说它是个无限缓冲)。可以多个生产者向同一个消息队列发送消息,也可以多个消费者同时从一个消息队列中来接收消息。

  • 消费者

即接收端,消费者主要是等待接收消息的程序。


Ubuntu 使用 RabbitMQ

# 安装
apt install rebbitmq-server

# 启动服务,端口 5672
service rabbitmq-server start

# 开启管理界面 端口 15672
rabbitmq-plugins enable rabbitmq_management

# 查看用户
rabbitmqctl list_users

# 新增用户
rabbitmqctl add_user username password

# 给予管理员权限
rabbitmqctl set_user_tags username administrator

# 设置用户权限
rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP

# stefanlei 具有 / 这个 virtual host 中所有资源的 配置 读 写 权限。
rabbitmqctl set_permissions -p / stefanlei '.*' '.*' '.*'

发布订阅系统

生产者
import pika

# 用户名密码
username = 'user'
pwd = 'pass'

user_pwd = pika.PlainCredentials(username, pwd)

# 创建连接
s_conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', credentials=user_pwd))
chan = s_conn.channel()

# 声明一个队列 生产者和消费者都要声明一个相同的队列
chan.queue_declare(queue='hello')

# 发布消息, exchange 是交换机,消息会发给它,由他决定发给哪个队列
# routing_key 消息发给哪个队列(由exchange发)
# body 消息体
chan.basic_publish(exchange='', routing_key='hello', body='hello world')

# 当生产者发送完消息后,可选择关闭连接
s_conn.close()
消费者
import pika

credentials = pika.PlainCredentials('user', 'pass')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', credentials))
channel = connection.channel()

# 声明消息队列,消息将在这个队列中进行传递。如果队列不存在,则创建
channel.queue_declare(queue='hello')

# 定义一个回调函数来处理,这边的回调函数就是将信息打印出来。
def callback(ch, method, properties, body):
   print(body)

# 告诉 RabbitMQ 使用callback来接收信息
# auto_ack=True 表示在回调函数中是否自动发送发送确认标识
# 这个函数的参数位置,有可能会变,请注意。
channel.basic_consume('hello', callback, auto_ack=False)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
# 这里会一直阻塞
channel.start_consuming()

 

最新评论
2019-05-09 21:26:36 高名扬
不错不错

登录后评论