消息队列之RabbitMQ(二) Laravel中使用rabbitmq


消息队列之RabbitMQ(二) Laravel中使用rabbitmq

简介

博主在之前的文章大致介绍了下 rabbitmq 的原理、应用场景。接下来讲下如何在 laravel5.5 中去使用。

安装rabbitmq

在本地进行测试的时候,使用 docker 去快速安装并检验预期结果是非常有效且合理的。因此本次将使用 docker 去构建 rabbitmq容器。

docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq:3.8.3-management

大致介绍下上面参数

  • --hostname 设置该节点默认主机名
  • -- name 设置容器名
  • -p 指定端口映射。5672:5672意思是访问宿主机的5672端口的请求会被转发至容器的5672端口。在rabbitmq中,5672端口是程序用于访问rabbitmq的接口、15672则是rabbitmq自带的可视化UI管理界面的访问端口,可以通过在浏览器访问localhost:15672访问管理控制台、25672端口则被用于rabbitmq集群之间通讯使用
  • -v 则是挂载目录或者文件,可以将宿主机的指定目录或者文件映射到容器里,这样宿主机和容器对该目录的操作会同步,防止了数据会因为容器的原因导致丢失。

准备工作

php安装amqp拓展

上文说到,rabbitmq是基于amqp协议的,因此在laravel中也是通过amqp协议去和rabbitmq程序进行通讯的,因此php需要安装amqp拓展。

sudo apt-get update
sudo apt-get install php-amqp

Laravel中配置rabbitmq

博主在laravel中是使用 laravel-queue-rabbitmq包来实现和rabbitmq程序进行通讯的。

安装与配置

因为博主使用的是laravel5.5版本,所以laravel-queue-rabbitmq的版本是6.0

  1. 安装
composer require vladimir-yuldashev/laravel-queue-rabbitmq=6.0
  1. config/app.php文件中的providers中加入
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
  1. app/config/queue.php中的connections数组加入下面配置信息
'rabbitmq' => [

            'driver' => 'rabbitmq',

            'dsn' => env('RABBITMQ_DSN', null),

            /*
             * Could be one a class that implements \Interop\Amqp\AmqpConnectionFactory for example:
             *  - \EnqueueAmqpExt\AmqpConnectionFactory if you install enqueue/amqp-ext
             *  - \EnqueueAmqpLib\AmqpConnectionFactory if you install enqueue/amqp-lib
             *  - \EnqueueAmqpBunny\AmqpConnectionFactory if you install enqueue/amqp-bunny
             */

            'factory_class' => Enqueue\AmqpLib\AmqpConnectionFactory::class,

            'host' => env('RABBITMQ_HOST', '127.0.0.1'),
            'port' => env('RABBITMQ_PORT', 5672),

            'vhost' => env('RABBITMQ_VHOST', '/'),
            'login' => env('RABBITMQ_LOGIN', 'guest'),
            'password' => env('RABBITMQ_PASSWORD', 'guest'),

            'queue' => env('RABBITMQ_QUEUE', 'default'),

            'options' => [

                'exchange' => [

                    'name' => env('RABBITMQ_EXCHANGE_NAME'),

                    /*
                     * Determine if exchange should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
                    'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                    'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
                    'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
                ],

                'queue' => [

                    /*
                     * Determine if queue should be created if it does not exist.
                     */

                    'declare' => env('RABBITMQ_QUEUE_DECLARE', true),

                    /*
                     * Determine if queue should be binded to the exchange created.
                     */

                    'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),

                    /*
                     * Read more about possible values at https://www.rabbitmq.com/tutorials/amqp-concepts.html
                     */

                    'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
                    'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
                    'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                    'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
                    'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
                ],
            ],

            /*
             * Determine the number of seconds to sleep if there's an error communicating with rabbitmq
             * If set to false, it'll throw an exception rather than doing the sleep for X seconds.
             */

            'sleep_on_error' => env('RABBITMQ_ERROR_SLEEP', false),

            /*
             * Optional SSL params if an SSL connection is used
             * Using an SSL connection will also require to configure your RabbitMQ to enable SSL. More details can be founds here: https://www.rabbitmq.com/ssl.html
             */

            'ssl_params' => [
                'ssl_on' => env('RABBITMQ_SSL', false),
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],

        ],
  1. 在项目根目录的.env文件配置如下环境变量
RABBITMQ_HOST=172.26.0.8 # 我是因为没有将rabbitmq容器加入lnmp的networks中,所以使用的是ip
RABBITMQ_PORT=5672       # 和rabbitmq通讯的端口
RABBITMQ_VHOST=/
RABBITMQ_LOGIN=guest # 账号
RABBITMQ_PASSWORD=guest # 密码
RABBITMQ_QUEUE=test_queue # 队列名称。如果你没有它会默认创建 Exchanges和Queue
  1. 创建任务类Job
php artisan make:job Queue

执行之后在app/Jobs/目录下生成Queue.php文件,并写入对应逻辑

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class Queue implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $uid;
    protected $username;
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($uid, $username)
    {
        $this->uid = $uid;
        $this->username = $username;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        echo '用户id ==='.$this->uid.'。用户名 ==='.$this->username;

    }
}
  1. 在控制器里派发任务,生产消息至队列中
<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Routing\Controller as BaseController;
use Illuminate\Foundation\Validation\ValidatesRequests;
use Illuminate\Foundation\Auth\Access\AuthorizesRequests;
use App\Jobs\Queue;

class Controller extends BaseController
{
    use AuthorizesRequests, DispatchesJobs, ValidatesRequests;

    public function index(Request $request)
    {
        $users = [
            ['uid' => 1, 'username' => '小明'],
            ['uid' => 2, 'username' => '小红'],
            ['uid' => 3, 'username' => '悟空'],
            ['uid' => 4, 'username' => '贝吉塔'],
            ['uid' => 5, 'username' => '盖伦'],
            ['uid' => 6, 'username' => '蛮王'],
            ['uid' => 7, 'username' => '赵信'],
        ];

        foreach ($users as $index => $user){
            $queue = new Queue($user['uid'],$user['username']);
            $this->dispatch($queue->onQueue('test_queue'));
        }

        return response("OK!",200);
    }
}

我们是可以在管理控制台看到Queue里被写入了7条message

message

  1. 开启队列并消费它
php artisan queue:work rabbitmq --queue=test_queue --tries=3

可以看到之前生产的7条数据已经被消费了

message2

其他的信息我们可以去管理控制台里查看。


文章作者: 我若为侠
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 我若为侠 !
 上一篇
php8的新特性(一) 主要新特性 php8的新特性(一) 主要新特性
php8的主要新特性命名参数定义声明$中的参数名称不带符号将是参数名称,或函数形参的名字取$后变为实参的命名函数 格式:name:"nickname" 或 name:$name 示例function info($nickname, $a
下一篇 
MySQL中联合查询的一些优化感悟 MySQL中联合查询的一些优化感悟
MySQL中联合查询的一些优化感悟背景今天博主在left join的SQL语句优化中发现了一个奇怪的现象,随后在网上查阅了相关资料,现在复盘并记录下来。 描述现在有 data_users 以及 data_posts 两张表。 data_us
  目录