消息队列之RabbitMQ(二) Laravel中使用rabbitmq
简介
博主在之前的文章大致介绍了下 rabbitmq
的原理、应用场景。接下来讲下如何在 laravel5.5
中去使用。
安装rabbitmq
在本地进行测试的时候,使用 docker
去快速安装并检验预期结果是非常有效且合理的。因此本次将使用 docker
去构建 rabbitmq
容器。
docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq:3.8.3-management
大致介绍下上面参数
--hostname
设置该节点默认主机名-- name
设置容器名-p
指定端口映射。5672:5672
意思是访问宿主机的5672端口的请求会被转发至容器的5672端口。在rabbitmq
中,5672
端口是程序用于访问rabbitmq
的接口、15672
则是rabbitmq
自带的可视化UI管理界面的访问端口,可以通过在浏览器访问localhost:15672
访问管理控制台、25672
端口则被用于rabbitmq
集群之间通讯使用-v
则是挂载目录或者文件,可以将宿主机的指定目录或者文件映射到容器里,这样宿主机和容器对该目录的操作会同步,防止了数据会因为容器的原因导致丢失。
准备工作
php安装amqp拓展
上文说到,rabbitmq
是基于amqp协议的,因此在laravel
中也是通过amqp协议去和rabbitmq
程序进行通讯的,因此php需要安装amqp拓展。
sudo apt-get update
sudo apt-get install php-amqp
Laravel中配置rabbitmq
博主在laravel
中是使用 laravel-queue-rabbitmq
包来实现和rabbitmq
程序进行通讯的。
安装与配置
因为博主使用的是laravel5.5版本,所以laravel-queue-rabbitmq
的版本是6.0
- 安装
composer require vladimir-yuldashev/laravel-queue-rabbitmq=6.0
- 在
config/app.php
文件中的providers
中加入
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
- 在
app/config/queue.php
中的connections
数组加入下面配置信息
'rabbitmq' => [
'driver' => 'rabbitmq',
'dsn' => env('RABBITMQ_DSN', null),
/*
* Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:
* - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
* - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
* - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
*/
'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE', 'default'),
'options' => [
'exchange' => [
'name' => env('RABBITMQ_EXCHANGE_NAME'),
/*
* Determine if exchange should be created if it does not exist.
*/
'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
/*
* Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
*/
'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
],
'queue' => [
/*
* Determine if queue should be created if it does not exist.
*/
'declare' => env('RABBITMQ_QUEUE_DECLARE', true),
/*
* Determine if queue should be binded to the exchange created.
*/
'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
/*
* Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
*/
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
],
],
/*
* Determine the number of seconds to sleep if there's an error communicating with rabbitmq
* If set to false, it'll throw an exception rather than doing the sleep for X seconds.
*/
'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', false),
/*
* Optional SSL params if an SSL connection is used
* Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
*/
'ssl_params' => [
'ssl_on' => env('RABBITMQ_SSL', false),
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
],
],
- 在项目根目录的
.env
文件配置如下环境变量
RABBITMQ_HOST=172.26.0.8 # 我是因为没有将rabbitmq容器加入lnmp的networks中,所以使用的是ip
RABBITMQ_PORT=5672 # 和rabbitmq通讯的端口
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest # 账号
RABBITMQ_PASSWORD=guest # 密码
RABBITMQ_QUEUE=test_queue # 队列名称。如果你没有它会默认创建 Exchanges和Queue
- 创建任务类
Job
php artisan make:job Queue
执行之后在app/Jobs/
目录下生成Queue.php
文件,并写入对应逻辑
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class Queue implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $uid;
protected $username;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($uid, $username)
{
$this->uid = $uid;
$this->username = $username;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
echo '用户id ==='.$this->uid.'。用户名 ==='.$this->username;
}
}
- 在控制器里派发任务,生产消息至队列中
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Routing\Controller as BaseController;
use Illuminate\Foundation\Validation\ValidatesRequests;
use Illuminate\Foundation\Auth\Access\AuthorizesRequests;
use App\Jobs\Queue;
class Controller extends BaseController
{
use AuthorizesRequests, DispatchesJobs, ValidatesRequests;
public function index(Request $request)
{
$users = [
['uid' => 1, 'username' => '小明'],
['uid' => 2, 'username' => '小红'],
['uid' => 3, 'username' => '悟空'],
['uid' => 4, 'username' => '贝吉塔'],
['uid' => 5, 'username' => '盖伦'],
['uid' => 6, 'username' => '蛮王'],
['uid' => 7, 'username' => '赵信'],
];
foreach ($users as $index => $user){
$queue = new Queue($user['uid'],$user['username']);
$this->dispatch($queue->onQueue('test_queue'));
}
return response("OK!",200);
}
}
我们是可以在管理控制台看到Queue里被写入了7条message
- 开启队列并消费它
php artisan queue:work rabbitmq --queue=test_queue --tries=3
可以看到之前生产的7条数据已经被消费了
其他的信息我们可以去管理控制台里查看。