# RabbitMq 常用函数
rabbitmq 这里使用以下这个包:
"github.com/rabbitmq/amqp091-go"
# 连接 rabbitmq
func Dial(url string) (*Connection, error)
func InitRabbitMq() (*amqp.Connection, error) { | |
//connect to rabbit rabbitmq | |
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") | |
if err != nil { | |
log.Printf("Failed to connect to RabbitMQ") | |
return nil, err | |
} | |
return conn, nil | |
} |
调用完后关闭连接
defer conn.Close() |
# 创建管道
func (c *Connection) Channel() (Channel, error)
用于从连接(Connection)中创建一个通道(Channel)对象
ch, err := conn.Channel() | |
if err != nil { | |
log.Printf("Failed to open a channel") | |
return | |
} | |
defer ch.Close() |
在使用 RabbitMQ 时,我们通常会先创建一个连接对象,然后通过该连接对象创建一个或多个通道对象,使用通道对象来进行交互。通过通道对象,我们可以声明队列和交换机、绑定队列和交换机、发送和接收消息等操作。通常情况下,我们在使用完通道对象后需要手动将其关闭,以释放资源和避免连接泄漏。
# 声明队列
func (ch *Channel) QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args Table) (Queue, error)
func QueueDeclare(channel *amqp091.Channel) (amqp091.Queue, error) { | |
q, err := channel.QueueDeclare( | |
"name", // name | |
true, // durable | |
false, // delete when unused | |
false, // exclusive | |
false, // no-wait | |
nil, // arguments | |
) | |
return q, err | |
} |
name string
: 表示要声明的队列名称,即消息将要发送到哪个队列。在 RabbitMQ 中,队列名称是一个字符串,可以由字母、数字、下划线和短横线组成,长度不超过 255 个字符。durable bool
: 表示队列是否持久化。如果设置为 true,则表示该队列会在服务器重启后仍然存在;如果设置为 false,则表示该队列仅在当前连接有效,服务器重启后会被删除。autoDelete bool
: 表示队列是否自动删除。如果设置为 true,则表示该队列在所有消费者断开连接后会自动删除;如果设置为 false,则表示该队列会一直存在,直到被显式删除或服务器重启。exclusive bool
: 表示队列是否独占。如果设置为 true,则表示只有当前连接可以使用该队列,其他连接无法使用;如果设置为 false,则表示该队列可以被多个连接使用。noWait bool
: 表示是否需要等待服务器响应。如果设置为 true,则表示不需要等待服务器响应;如果设置为 false,则表示需要等待服务器响应。args Table
: 表示一些额外的参数。通常情况下,我们可以将该参数设置为 nil,表示不需要额外的参数。
# Quality of Service
Quality of Service
用于设置通道的 QoS(Quality of Service)参数。
func (ch *Channel) Qos(prefetchCount int, prefetchSize int, global bool) error
下面是每个参数的含义解释:
prefetchCount int
表示每个消费者最多只能获取 n 条未确认的消息。这里的 "未确认" 是指消费者还没有发送 ack 回执确认收到这条消息。prefetchSize int
0 表示每条消息的大小不做限制。如果需要限制每条消息的大小,可以设置一个正整数,表示最大的消息大小。global bool
: 表示这个 QoS 设置只对当前通道有效,而不会影响到全局范围内的所有连接。如果设置为 true,则表示这个 QoS 设置对所有连接都有效。
总之,该函数的作用是设置通道的 QoS 参数,以便控制消费者接收消息的速度和数量。通过设置 QoS 参数,我们可以控制每个消费者最多获取多少条未确认的消息,以及每条消息的大小限制等。这样可以避免因为一次性获取过多的消息导致消费者处理不过来的问题,从而保证消息系统的稳定性和可靠性。
# 消费者
func (ch *Channel) Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args Table) (<-chan Delivery, error)
msgs, err := ch.Consume( | |
"queue", // queue | |
"consumer", // consumer | |
true, // auto-ack | |
false, // exclusive | |
false, // no-local | |
false, // no-wait | |
nil, // args | |
) | |
if err != nil { | |
log.Printf("register consume err:%v\n", err) | |
return | |
} |
queue string
: 表示要消费的队列名称,即从哪个队列中接收消息。consumer string
: 表示消费者的名称,即标识当前消费者身份的字符串。如果有多个消费者同时消费同一个队列,那么每个消费者的名称应该是唯一的。autoAck bool
: 表示是否自动确认消息。如果设置为 true,则表示消费者接收到消息后会自动发送确认消息给服务器;如果设置为 false,则表示消费者需要手动确认消息。手动确认消息可以确保消息只被消费一次,避免了因为消费者异常退出而导致消息重复消费的问题,但是也会增加消息处理的复杂度。exclusive bool
: 表示是否独占队列。如果设置为 true,则表示只有当前连接可以使用该队列,其他连接无法使用;如果设置为 false,则表示该队列可以被多个连接使用。noLocal bool
: 表示是否禁止消费者收到自己发送的消息。如果设置为 true,则表示消费者不会收到自己发送的消息;如果设置为 false,则表示消费者可以收到自己发送的消息。noWait bool
: 表示是否需要等待服务器响应。如果设置为 true,则表示不需要等待服务器响应;如果设置为 false,则表示需要等待服务器响应。args Table
: 表示一些额外的参数。通常情况下,我们可以将该参数设置为 nil,表示不需要额外的参数。
总之,该函数的作用是从名为 "queue" 的队列中消费消息,并根据参数的不同来控制消费者的身份、消息确认方式、队列独占等属性。 消费者可以通过通道对象(Channel)来接收消息,每次接收到一条消息后,将会生成一个消息对象,我们可以从该对象中获取消息的内容和属性。消息的处理通常需要放在一个无限循环中,消费者会一直等待新消息的到来。
# 消息确认
func (d Delivery) Ack(multiple bool) error
用于向 RabbitMQ 服务器确认消费者已经成功处理并接收到了某条消息。
# 自动确认
消费者在接收到消息后,如果在自动确认的情况下,由于网络问题、程序错误等原因没有正确地处理该消息,那么消息就会丢失。因为 RabbitMQ 服务器会认为该消息已经被正确消费,将其标记为已经确认,从而将该消息从队列中删除,如果消费者没有正确地处理该消息,那么该消息就会永久丢失。
在高并发的情况下,消费者在接收到消息后,如果在自动确认的情况下,由于消费者的处理能力不足,不能够及时地处理完所有的消息,那么就会出现消息堆积的情况。当消息堆积到一定程度时,就会导致消费者宕机或者出现性能问题,从而导致消息消费的延迟和丢失。
# 手动确认
consume
函数中设置autoAck bool
为false
Ack
表示确认消费者已经成功处理并接收到了某条消息。multiple bool
: 表示是否批量确认。如果设置为 true,则表示将当前 Delivery 之前所有未确认的消息一起确认,否则只确认当前 Delivery 消息。
# 发送消息
func (ch *Channel) PublishWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg Publishing) error
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
defer cancel() | |
body, err := json.Marshal(&user) | |
if err != nil { | |
log.Printf("encoding error:%v\n", err) | |
return | |
} | |
err = chan.PublishWithContext( | |
ctx, | |
"", // exchange | |
q.Name, // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "application/json", | |
Body: body, | |
}) | |
if err != nil { | |
log.Printf(" err:%v\n", err) | |
return | |
} |
context.Context
: 上下文对象,用于控制消息发送的超时时间和取消机制。在该代码中,我们使用了一个带有超时时间的上下文对象 ctx 来控制消息发送的超时时间为 5 秒,如果在 5 秒内没有成功发送消息,则会自动取消消息发送。exchange string
: 表示不需要指定交换机,即直接将消息发送到队列中。key string
: 表示消息的路由键,即消息要发送到哪个队列中。mandatory bool
: 表示是否强制要求消息必须发送到队列中。如果设置为 true,则表示如果没有找到匹配的队列,则会返回一个错误;如果设置为 false,则表示如果没有找到匹配的队列,则会将消息丢弃。immediate bool
: 表示是否立即发送消息。如果设置为 true,则表示如果没有找到匹配的消费者,则会将消息立即返回;如果设置为 false,则表示如果没有找到匹配的消费者,则会将消息保存在队列中,等待消费者连接后再发送。msg Publishing
: 表示要发送的消息对象,该对象包含了消息的各种属性和内容。在该代码中,我们设置了消息的类型(ContentType)和消息内容(Body)等属性,以便消费者在接收到消息后能够正确地处理它。
总之,该函数的作用是向名为 chan
的通道中发送一条消息,并根据参数的不同来控制消息发送的目标队列、消息内容和属性等。发送者可以通过该函数将消息发送到队列中,然后由消费者来接收和处理。其中, ctx
参数用于控制发送消息的超时时间和取消机制,可以避免因为网络等原因导致的长时间等待和阻塞。
# msg Publishing
msg Publishing
需要传入 amqp.Publishing{}
结构体类型,用于表示 AMQP 协议中的消息。在使用 AMQP 客户端发送消息时,我们可以通过 amqp.Publishing{}
来设置消息的各种属性和内容,例如:
ContentType
: 表示消息的内容类型,例如 "text/plain" 、 "application/json" 等。Body
: 表示消息的内容,通常是一个字节数组。Headers
: 表示消息的头部信息,可以是一个键值对的映射。DeliveryMode
: 表示消息的传输模式,例如 Persistent 模式表示持久化消息, Transient 模式表示非持久化消息。Priority
: 表示消息的优先级,用于控制消息的发送顺序。CorrelationId
: 表示消息的关联 ID,用于关联一组相关的消息。ReplyTo
: 表示消息的回复队列,用于指定回复消息的目标队列。Expiration
: 表示消息的有效期,用于控制消息的过期时间。MessageId
: 表示消息的唯一 ID,用于识别一条特定的消息。Timestamp
: 表示消息的时间戳,用于记录消息的创建时间。Type
: 表示消息的类型,用于指定不同类型的消息。UserId
: 表示消息的用户 ID,用于指定发送消息的用户身份。AppId
: 表示消息的应用程序 ID,用于指定发送消息的应用程序身份。
总之, amqp.Publishing{}
可以用于设置消息的各种属性和内容,以便消费者在接收到消息后能够正确地处理它。一般来说,我们至少需要设置 ContentType
和 Body
属性,以确保消息的内容得到正确传递。其他的属性则可以根据具体的需求来设置。