前言
RocketMQ 是阿里巴巴开源的分布式消息中间件。本文将介绍如何在 Laravel 中通过阿里云提供的 HTTP SDK 使用 RocketMQ。
环境准备
安装 RocketMQ 的 PHP SDK
直接使用composer安装
1
| composer require aliyunmq/mq-http-sdk
|
也可以手动在 composer.json 中添加依赖
1 2 3 4 5
| { "require": { "aliyunmq/mq-http-sdk": ">=1.0.4" } }
|
执行composer install
1 2
| # -vvv 可以查看详细的安装过程
| composer install -vvv
|
RocketMQ的使用
RocketMqHandler
用来生成rocketmq的生产者和消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| <?php
namespace App\Handlers;
use MQ\MQClient;
/**
 * Process-wide holder for a single MQClient, exposing static factories for
 * producers, consumers and transactional producers.
 *
 * The client is built from the `aliyun.mq.endpoint`, `aliyun.mq.access_id`
 * and `aliyun.mq.access_key` configuration values the first time any of the
 * static factories is called.
 */
class RocketMqHandler
{
    /**
     * Shared handler, created on first call to getInstance().
     * Nullable with an explicit default so it is never read uninitialized.
     */
    private static ?RocketMqHandler $_instance = null;

    /**
     * Underlying MQ client.
     *
     * Declared explicitly: assigning an undeclared property in the
     * constructor creates a dynamic property, which is deprecated as of
     * PHP 8.2. Kept public because the previous dynamic property was
     * publicly reachable.
     */
    public MQClient $client;

    /**
     * Builds the MQ client from the application's aliyun.mq.* configuration.
     */
    public function __construct()
    {
        $this->client = new MQClient(
            config('aliyun.mq.endpoint'),
            config('aliyun.mq.access_id'),
            config('aliyun.mq.access_key')
        );
    }

    /**
     * Returns the shared handler, creating it on first use.
     */
    public static function getInstance(): RocketMqHandler
    {
        return self::$_instance ??= new self();
    }

    /**
     * Returns a producer for the given instance and topic.
     *
     * @param mixed $instanceId Passed through to MQClient::getProducer().
     * @param mixed $topicName  Passed through to MQClient::getProducer().
     */
    public static function producer($instanceId, $topicName): \MQ\MQProducer
    {
        return self::getInstance()->client->getProducer($instanceId, $topicName);
    }

    /**
     * Returns a consumer for the given instance, topic and consumer group.
     *
     * @param mixed $instanceId Passed through to MQClient::getConsumer().
     * @param mixed $topicName  Passed through to MQClient::getConsumer().
     * @param mixed $groupId    Passed through to MQClient::getConsumer().
     */
    public static function consumer($instanceId, $topicName, $groupId): \MQ\MQConsumer
    {
        return self::getInstance()->client->getConsumer($instanceId, $topicName, $groupId);
    }

    /**
     * Returns a transactional producer for the given instance, topic and group.
     *
     * @param mixed $instanceId Passed through to MQClient::getTransProducer().
     * @param mixed $topicName  Passed through to MQClient::getTransProducer().
     * @param mixed $groupId    Passed through to MQClient::getTransProducer().
     */
    public static function transactionProducer($instanceId, $topicName, $groupId): \MQ\MQTransProducer
    {
        return self::getInstance()->client->getTransProducer($instanceId, $topicName, $groupId);
    }
}
|
RocketMqProducer
发布
1 2 3 4 5 6 7 8 9 10 11 12 13
| <?php
// Build the message to publish. $data (the body) and $i (the value of the
// custom property "a") are expected to be defined by the surrounding code.
$publishMessage = new TopicMessage($data);
$publishMessage->putProperty("a", $i);
$publishMessage->setMessageKey("MessageKey");

// Schedule delivery one minute from now, expressed as a millisecond timestamp.
$deliverAtMs = Carbon::now()->addMinute()->getTimestampMs();
$publishMessage->setStartDeliverTime($deliverAtMs);

// Obtain a producer for the configured instance/topic and send the message.
$producer = RocketMqHandler::producer(
    config('aliyun.mq.instance_id'),
    config('aliyun.mq.topic')
);
$result = $producer->publishMessage($publishMessage);

echo "Send mq message success. msgId is:" . $result->getMessageId()
    . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
|
RocketMqConsumer
订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
// Endless consume loop: fetch a batch of messages, print them, then
// acknowledge the whole batch by receipt handle.
// Uses $this->line(), so it is expected to run inside an object that
// provides that method.
$consumer = RocketMqHandler::consumer(
    config('aliyun.mq.instance_id'),
    config('aliyun.mq.topic'),
    config('aliyun.mq.group_id')
);

while (true) {
    try {
        $messages = $consumer->consumeMessage(
            3, // consume at most 3 messages per call (can be set up to 16)
            3  // NOTE(review): presumably the long-polling wait in seconds — confirm against the SDK
        );
    } catch (\Exception $exception) {
        if ($exception instanceof MessageNotExistException) {
            // Nothing to consume right now; poll again immediately.
            $this->line(Carbon::now()->toDateTimeString() . ' 没有消息');
            continue;
        }

        // Report the caught exception (previously referenced an undefined
        // $e here), back off briefly, then retry.
        print_r($exception->getMessage() . "\n");
        sleep(3);
        continue;
    }

    // Collect receipt handles while dumping each message for inspection.
    $receiptHandles = [];
    foreach ($messages as $message) {
        $receiptHandles[] = $message->getReceiptHandle();
        printf(
            "MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
            $message->getMessageId(),
            $message->getMessageTag(),
            $message->getMessageBody(),
            $message->getPublishTime(),
            $message->getFirstConsumeTime(),
            $message->getConsumedTimes(),
            $message->getNextConsumeTime(),
            $message->getMessageKey()
        );
        print_r($message->getProperties());
    }
    print_r($receiptHandles);

    try {
        $consumer->ackMessage($receiptHandles);
    } catch (\Exception $e) {
        if (!($e instanceof AckMessageException)) {
            // Surface unexpected ack failures instead of dropping them silently.
            print_r($e->getMessage() . "\n");
            continue;
        }

        printf("Ack Error, RequestId:%s\n", $e->getRequestId());
        foreach ($e->getAckMessageErrorItems() as $errorItem) {
            // ErrorMsg now prints the error message; it previously repeated the error code.
            printf(
                "\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                $errorItem->getReceiptHandle(),
                $errorItem->getErrorCode(),
                $errorItem->getErrorMessage()
            );
        }
    }
}
|