<?php
namespace app\controller;

use app\BaseController;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\exception\HttpException;

class Rabbit extends BaseController
{
    protected $config = [];
    protected $channel, $connection;
    const EXCHANGE = 'human_exchange';
    const DELAYED_EXCHANGE = 'human_delayed_exchange';
    const QUEUE = 'human_queue';
    const ROUTING_KEY = 'human';
    const CONSUMER_TAG = 'client';

    public function initialize()
    {
        $config = require __DIR__ . '/../../config/queue.php';

        if (isset($config['RabbitMQ'])) {
            $this->config = $config['RabbitMQ'];
        }

        if (empty($this->config)) {
            throw new HttpException(500, 'Message queue config not exist: RabbitMQ');
        }

        parent::initialize();
    }

    /**
     * 创建RabbitMQ连接和通道
     * @return mixed
     */
    protected function createExchangeAndQueue()
    {
        // 创建连接,指定IP、端口、账号、密码、虚拟主机
        $connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['password'], $this->config['vhost']);
        $channel = $connection->channel();

        /**
         * 创建交换机(如果有在后台创建,则无需这一步,下同)
         * 参数一:指定交换机名称
         * 参数二:指定交换机种类(direct:精准推送,fanout:广播推送,topic:组播推送,路由规则可以通配,headers:跟direct一样,性能很差,故很少用了)
         * 参数三:是否检测同名交换机
         * 参数四:是否开启交换机持久化
         * 参数五:通道关闭后是否删除交换机
         */
        $channel->exchange_declare(self::EXCHANGE, AMQPExchangeType::DIRECT, false, true, false);
//        $channel->exchange_declare(self::EXCHANGE, 'x-delayed-message', false, true, false, false, false, ['x-delayed-type' => 'direct']);
        // 创建队列,参数除了类型,其他跟创建交换机一致
        list($queue, $messageCount, $consumerCount) = $channel->queue_declare(self::QUEUE, true, true, false, false);
        // 通道绑定交换机跟队列,最后指定路由规则
        $channel->queue_bind(self::QUEUE, self::EXCHANGE, self::ROUTING_KEY);

        $this->connection = $connection;
        $this->channel = $channel;

        // 返回队列中的未消费的消息数量
        return $messageCount;
    }

    /**
     * 创建RabbitMQ连接和通道(延时队列)
     */
    protected function createDelayedExchangeAndQueue()
    {
        // 创建连接,指定IP、端口、账号、密码、虚拟主机
        $connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['password'], $this->config['vhost']);
        $channel = $connection->channel();

        /**
         * 创建交换机(如果有在后台创建,则无需这一步,下同)
         * 参数一:指定交换机名称
         * 参数二:指定交换机种类(x-delayed-message:指定为延时交换机,需给RabbitMQ装rabbitmq_delayed_message_exchange插件来支持)
         * 参数三:是否检测同名交换机
         * 参数四:是否开启交换机持久化
         * 参数五:通道关闭后是否删除交换机
         * 最后参数中设置x-delayed-type交换机原型为direct
         */
        $channel->exchange_declare(self::DELAYED_EXCHANGE, 'x-delayed-message', false, true, false, false, false, ['x-delayed-type' => ['S', 'direct']]);
        // 创建队列,参数除了类型,其他跟创建交换机一致
        $channel->queue_declare(self::QUEUE, false, true, false, false);
        // 通道绑定交换机跟队列,最后指定路由规则
        $channel->queue_bind(self::QUEUE, self::DELAYED_EXCHANGE, self::ROUTING_KEY);

        $this->connection = $connection;
        $this->channel = $channel;
    }

    /**
     * 生产消息,并放入队列
     * @return string
     */
    public function pushMessage()
    {
        $this->createExchangeAndQueue();

        if (!isset($this->connection) || !isset($this->channel)) {
            throw new HttpException(500, 'RabbitMQ connection or channel not exist!');
        }

        $msgBody = json_encode(['name' => 'Neo', 'age' => 30, 'sex' => 'male']);
        // 创建AMQP消息并放入指定的数据,注意第二个参数的delivery_mode为设置该消息持久化
        $message = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        // 在通道中放入消息,并指定交换机跟路由规则
        $this->channel->basic_publish($message, self::EXCHANGE, self::ROUTING_KEY);

        $this->channel->close();
        $this->connection->close();
        return '消息已放入队列';
    }

    /**
     * 生产消息,并延时放入队列
     * @return string
     */
    public function pushDelayedMessage()
    {
        $this->createDelayedExchangeAndQueue();

        if (!isset($this->connection) || !isset($this->channel)) {
            throw new HttpException(500, 'RabbitMQ connection or channel not exist!');
        }

        $msgBody = json_encode(['name' => 'Delay', 'age' => 5, 'sex' => 'male']);
        // 创建AMQP消息并放入指定的数据,注意第二个参数的delivery_mode为设置该消息持久化,最后一个header则为设置5秒延时时间
        $message = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'application_headers' => new AMQPTable(['x-delay' => 5000])]);
        // 在通道中放入消息,并指定交换机跟路由规则
        $this->channel->basic_publish($message, self::DELAYED_EXCHANGE, self::ROUTING_KEY);

        $this->channel->close();
        $this->connection->close();
        return '消息已延时放入队列';
    }

    /**
     * 消费消息,并显示消息的结构
     */
    public function consumeMessage()
    {
        $messageCount = $this->createExchangeAndQueue();

        if (!isset($this->connection) || !isset($this->channel)) {
            throw new HttpException(500, 'RabbitMQ connection or channel not exist!');
        }

        /**
         * 默认情况下,队列会把消息公平的分配给各个消费者
         * 如果某个消费者脚本处理完成分配给他的消息任务后,会一直空闲
         * 另外一个消费者脚本处理的消息都非常耗时,这就容易导致消费者脚本得不到合理利用,
         * 加入此句话,是告诉队列,取消把消息公平分配到各个脚本,而是那个脚本空闲,就交给它一个消息任务
         * 这样,合理利用到每一个空闲的消费者脚本
         */
        $this->channel->basic_qos(null, 1, null);

        /**
         * 从队列中读取数据
         * 第四个参数 no_ack = false 时,表示进行ack应答,确保消息已经处理
         * 最后一个参数则是回调函数,传入消息
         */
        $this->channel->basic_consume(self::QUEUE, self::CONSUMER_TAG, false, false, false, false, function ($message) {
            file_put_contents('message.txt', $message->body, FILE_APPEND);

            // 处理完后通知队列可以删除消息了,如果no_ack = false时缺少这句,则队列不会删除已处理完的消息,当脚本挂掉时,会把分配给当前队列的所有消息再次重新分配给其他队列,会导致消息会重复处理,内存占用越来越高
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        });

        // 监听消息,一有消息,立马就处理
        /*while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }*/

        // 官方原代码中while监听会导致死循环,故处理之
        for ($i = 1; $i <= $messageCount; $i++) {
            if ($this->channel->is_consuming()) {
                $this->channel->wait();
            }
        }

        $this->channel->close();
        $this->connection->close();
    }
}