前言

rocketmq是阿里巴巴开源的分布式消息中间件,本文将介绍如何在laravel中使用rocketmq。

环境准备

安装rocketmq

直接使用composer安装

1
composer require aliyunmq/mq-http-sdk

添加composer.json

1
2
3
4
5
{
"require": {
"aliyunmq/mq-http-sdk": ">=1.0.4"
}
}

执行composer install

1
2
# -vvv 可以查看详细的安装过程
composer install -vvv

RocketMQ的使用

RocketMqHandler

用来生成rocketmq的生产者和消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<?php

namespace App\Handlers;

use MQ\MQClient;

/**
* @property RocketMqHandler $_instance
* @property MQClient $client
* @property \MQ\MQProducer $producer
* @property \MQ\MQConsumer $consumer
* @property \MQ\MQTransProducer $transactionProducer
*/
class RocketMqHandler
{
private static RocketMqHandler $_instance;

/**
* 构造函数
*/
public function __construct()
{
$this->client = new MQClient(
config('aliyun.mq.endpoint'),
config('aliyun.mq.access_id'),
config('aliyun.mq.access_key')
);
}

/**
* 获取实例对象
*
* @return RocketMqHandler
*/
public static function getInstance(): RocketMqHandler
{
if (!isset(self::$_instance)) {
self::$_instance = new self();
}
return self::$_instance;
}

/**
* 生成生产者对象
*
* @param string $instanceId 实例ID
* @param string $topicName 主题名称
* @return \MQ\MQProducer 生产者对象
*/
public static function producer($instanceId, $topicName): \MQ\MQProducer
{
$handler = self::getInstance();
return $handler->client->getProducer($instanceId, $topicName);
}

/**
* 消费者方法
* @param int $instanceId 实例ID
* @param string $topicName 主题名称
* @param string $groupId 分组ID
* @return \MQ\MQConsumer
*/
public static function consumer($instanceId, $topicName, $groupId): \MQ\MQConsumer
{
$handler = self::getInstance();
return $handler->client->getConsumer($instanceId, $topicName,$groupId);
}

/**
* 事务生产者方法
* @param int $instanceId 实例ID
* @param string $topicName 主题名称
* @param string $groupId 分组ID
* @return \MQ\MQTransProducer
*/
public static function transactionProducer($instanceId, $topicName, $groupId): \MQ\MQTransProducer
{
$handler = self::getInstance();
return $handler->client->getTransProducer($instanceId, $topicName,$groupId);
}

}

RocketMqProducer

发布

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php
// 创建 TopicMessage
$publishMessage = new TopicMessage($data);
// 设置属性
$publishMessage->putProperty("a", $i);
// 设置消息KEY 默认 request_id
$publishMessage->setMessageKey("MessageKey");
// 设置消息的定时投递,单位毫秒,默认立即投递
$publishMessage->setStartDeliverTime(Carbon::now()->addMinute()->getTimestampMs());
// 发送
$result = RocketMqHandler::producer(config('aliyun.mq.instance_id'), config('aliyun.mq.topic'))->publishMessage($publishMessage);
// 打印消息ID和消息体MD5值
print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";

RocketMqConsumer

订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 创建消费实例 所属的 Topic 控制台创建的 Consumer ID(Group ID) Topic所属实例ID,默认实例为空NULL
$consumer = RocketMqHandler::consumer(config('aliyun.mq.instance_id'), config('aliyun.mq.topic'), config('aliyun.mq.group_id'));
while (true) {
try {
// 长轮询消费消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
$messages = $consumer->consumeMessage(
3, // 一次最多消费3条(最多可设置为16条)
3 // 长轮询时间3秒(最多可设置为30秒)
);
} catch (\Exception $exception) {
// 没有消息
if ($exception instanceof MessageNotExistException) {
$this->line(Carbon::now()->toDateTimeString() . ' 没有消息');
continue;
}
print_r($e->getMessage() . "\n");
// 休息3s
sleep(3);
continue;
}
// 消费消息
// 处理业务逻辑
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// $message->getNextConsumeTime()前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
print_r($receiptHandles);
try {
$consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof AckMessageException) {
// 某些消息的句柄可能超时了会导致确认不成功
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
}