    How to configure and use RabbitMQ in Magento 2

    Updated 16 February 2024

    RabbitMQ is an important part of the Magento 2 architecture, and it provides a scalable and reliable messaging system that enables the system to handle complex tasks and high volumes of traffic.

    If you want to learn more about what RabbitMQ is, you can check it out here.

    Step 1# To configure RabbitMQ in Magento, add the RabbitMQ connection details to Magento's env.php file

    Location of the env.php file in Magento: magento-root-dir/app/etc/env.php

        'queue' =>  [
            'amqp' =>  [
                'host' => '34.222.345.76', // RabbitMQ host (placeholder; use your server's address)
                'port' => '5672', // Port RabbitMQ listens on; 5672 is the default
                'user' => 'admin', // RabbitMQ user name
                'password' => 'xxxxxxxxxxxxx', // RabbitMQ password
                'virtualhost' => '/', // Virtual host for the RabbitMQ connection; the default is /
                'ssl' => '', // Empty: SSL is not used
            ],
        ],
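
    If your broker requires TLS, the same block can carry SSL settings. The fragment below is a sketch based on the ssl and ssl_options keys described in the Magento configuration documentation. The host name and the certificate path are hypothetical placeholders, and port 5671 is only the conventional AMQP-over-TLS port, so adjust all of them for your server.

```php
'queue' => [
    'amqp' => [
        'host' => 'rabbitmq.example.com', // hypothetical host name
        'port' => '5671', // conventional AMQP-over-TLS port (assumption; check your broker)
        'user' => 'admin',
        'password' => 'xxxxxxxxxxxxx',
        'virtualhost' => '/',
        'ssl' => 'true', // enable TLS for the AMQP connection
        'ssl_options' => [
            'cafile' => '/etc/pki/tls/certs/ca.pem', // hypothetical path to the CA certificate
            'verify_peer' => 'true',
        ],
    ],
],
```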

    Now that RabbitMQ is configured, we will use it in our custom module

    For this, we need to create the following four files in our module's etc folder

    Location of the etc folder in your module: magento-root/app/code/NameSpace/ModuleName/etc

    Step 2# First, create communication.xml in the magento-root/app/code/NameSpace/ModuleName/etc folder

    This file contains a list of topics, which are intended to carry message queue information shared between implementations.

    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
        <!-- you can choose any topic name you like -->
        <topic name="yourtopibname.topic" request="string" />
    </config>

    Step 3# Create queue_topology.xml in the magento-root/app/code/NameSpace/ModuleName/etc folder

    This file defines the message routing rules and declares queues and exchanges.

    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_topology.xsd">
        <!-- name : A unique ID for the exchange  -->
        <!-- type : Specifies the type of exchange. Must be topic -->
        <!-- connection: For AMQP connections, a string that identifies the connection. For MySQL connections, the connection name must be db -->
        <exchange name="magento" type="topic" connection="amqp">
            <!-- id: A unique ID for this binding -->
            <!-- topic: The name of a topic -->
            <!-- destinationType: Must be queue -->
            <!-- destination: Identifies the name of a queue -->
            <binding id="uniqueIdBinding" topic="yourtopibname.topic" destinationType="queue" destination="yourQueue"/>
        </exchange>
    </config>

    Step 4# Create queue_consumer.xml in the magento-root/app/code/NameSpace/ModuleName/etc folder

    This file defines the relationship between an existing queue and its consumer.

    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_consumer.xsd">
        <!-- name: The name of the consumer -->
        <!-- queue: Defines the queue name to send the message to -->
        <!-- connection: For AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be db -->
        <!-- consumerInstance: The Magento class name that consumes the message -->
        <consumer name="consumerName" queue="yourQueue" connection="amqp" consumerInstance="NameSpace\ModuleName\Model\MassConsumer"/>
    </config>

    Step 5# Create queue_publisher.xml in the magento-root/app/code/NameSpace/ModuleName/etc folder

    This file defines which connection and exchange to use to publish messages for a specific topic.

    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_publisher.xsd">
        <!-- topic: Name of topic-->
        <publisher topic="yourtopibname.topic">
            <!-- name: For AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be db -->
            <!-- exchange: The name of the exchange to publish to. The default system exchange name is magento -->
            <connection name="amqp" exchange="magento" /> <!-- Advanced Message Queuing Protocol  -->
        </publisher>
    </config>

    Now I will log in to the RabbitMQ panel and check the queue list.

    [Screenshot: RabbitMq -1]

    There are no queues in the panel yet. Now I will install our module in Magento.

    php bin/magento setup:upgrade

    After installation, our queue appears in the RabbitMQ panel as follows:

    [Screenshot: RabbitMq -2]

    Our queue “yourQueue” has been added to RabbitMQ. Now we will write the code that publishes a message to this queue.

    Step 6# Here I will create a controller that adds a message to our queue

    Create the file: NameSpace\ModuleName\Controller\Adminhtml\PublishMessage\InRabbitMQ.php

    <?php
    
    namespace NameSpace\ModuleName\Controller\Adminhtml\PublishMessage;
    
    use Magento\Backend\App\Action\Context;
    use Magento\Framework\Controller\Result\JsonFactory;
    use Magento\Framework\Json\Helper\Data as JsonHelper;
    use NameSpace\ModuleName\Logger\Logger;
    
    class InRabbitMQ extends \Magento\Backend\App\Action
    {
        /**
         * @var \Magento\Framework\Controller\Result\JsonFactory
         */
        private $resultJsonFactory;
    
        /**
         * @var \Magento\Framework\Json\Helper\Data
         */
        private $jsonHelper;
    
        /**
         * @var \Magento\Framework\MessageQueue\PublisherInterface
         */
        private $publisher;
    
        /**
         * @var \NameSpace\ModuleName\Logger\Logger
         */
        private $logger;
    
        /**
         * @param Context $context
         * @param JsonFactory $resultJsonFactory
         * @param JsonHelper $jsonHelper
         * @param \Magento\Framework\MessageQueue\PublisherInterface $publisher
         * @param Logger $logger
         */
        public function __construct(
            Context $context,
            JsonFactory $resultJsonFactory,
            JsonHelper $jsonHelper,
            \Magento\Framework\MessageQueue\PublisherInterface $publisher, // used to publish messages to RabbitMQ
            Logger $logger
        ) {
            parent::__construct($context);
            $this->resultJsonFactory = $resultJsonFactory;
            $this->jsonHelper = $jsonHelper;
            $this->publisher = $publisher;
            $this->logger = $logger;
        }
    
        /**
         * Publish a sample message to the queue and return a JSON result.
         *
         * @return \Magento\Framework\Controller\Result\Json
         */
        public function execute()
        {
            // Create the result object before the try block so it is also available in catch
            $resultJson = $this->resultJsonFactory->create();
            try {
                /**
                 * Sample product id and product data used as the message payload.
                 * These are placeholder values; replace them with your real data.
                 * @var int $productId
                 * @var array $item
                 */
                $productId = 1;
                $item = ['sku' => 'sample-sku', 'name' => 'Sample Product'];
                $publishData = ['mage_pro_id' => $productId, 'item' => $item];
                // The topic name must match the one declared in communication.xml
                $this->publisher->publish('yourtopibname.topic', $this->jsonHelper->jsonEncode($publishData));
                $result = ['msg' => 'success'];
                return $resultJson->setData($result);
            } catch (\Exception $e) {
                $result = ['error' => $e->getMessage()];
                return $resultJson->setData($result);
            }
        }
    
        /**
         * Check product import permission.
         *
         * @return bool
         */
        protected function _isAllowed()
        {
            return $this->_authorization->isAllowed('NameSpace_ModuleName::product_import');
        }
    }

    Step 7# Create a Model class that processes the queue message (data) and returns the result to the consumer.

    Create the file: NameSpace\ModuleName\Model\ProcessQueueMsg.php

    <?php
    /**
     * @category   NameSpace
     * @package    NameSpace_ModuleName
     * @author     Webkul Software Private Limited
     * @copyright  Copyright (c) Webkul Software Private Limited (https://webkul.com)
     * @license    https://store.webkul.com/license.html
     */
    namespace NameSpace\ModuleName\Model;
    
    /**
     * ProcessQueueMsg Model
     */
    class ProcessQueueMsg
    {
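        /**
         * Process a message taken from the queue.
         *
         * @param string $message JSON-encoded message body published to the topic
         * @return mixed Return false to signal failure; the consumer then rejects the message
         */
        public function process($message)
        {
            // Process your data here and return the result
        }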
    }
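
    The process() method above is left empty. As a rough illustration of what it might do, the sketch below decodes the JSON string published by the controller and returns false when the message is unusable, which is the value the consumer checks for. It is written as a plain function (processQueueMessage is a hypothetical name) so that it runs without Magento; the required keys are assumed from the payload built in the controller.

```php
<?php

/**
 * Hypothetical stand-in for ProcessQueueMsg::process(), written as a plain
 * function so it can be run without Magento.
 *
 * @param string $message JSON string as published by the controller
 * @return array|false Decoded payload, or false when the message is unusable
 */
function processQueueMessage(string $message)
{
    $data = json_decode($message, true);

    // Malformed JSON, or JSON that does not decode to an array: signal failure.
    if (!is_array($data)) {
        return false;
    }

    // The controller publishes 'mage_pro_id' and 'item'; require both.
    if (!isset($data['mage_pro_id'], $data['item'])) {
        return false;
    }

    // Real processing (saving, importing, calling an API) would go here.
    return $data;
}

$ok = processQueueMessage('{"mage_pro_id":1,"item":{"sku":"demo-sku"}}');
$bad = processQueueMessage('not json');
var_dump($ok !== false, $bad === false);
```

    Returning false instead of throwing keeps the failure path explicit for the consumer, which can then decide whether to reject the message.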

    Step 8# Now we will create the consumer class that processes the queue messages

    Create the file: NameSpace\ModuleName\Model\MassConsumer.php

    <?php
    
    namespace NameSpace\ModuleName\Model;
    
    use Magento\Framework\App\ResourceConnection;
    use NameSpace\ModuleName\Logger\Logger;
    use Magento\Framework\MessageQueue\MessageLockException;
    use Magento\Framework\MessageQueue\ConnectionLostException;
    use Magento\Framework\Exception\NotFoundException;
    use Magento\Framework\MessageQueue\CallbackInvoker;
    use Magento\Framework\MessageQueue\ConsumerConfigurationInterface;
    use Magento\Framework\MessageQueue\EnvelopeInterface;
    use Magento\Framework\MessageQueue\QueueInterface;
    use Magento\Framework\MessageQueue\LockInterface;
    use Magento\Framework\MessageQueue\MessageController;
    use Magento\Framework\MessageQueue\ConsumerInterface;
    
    /**
     * Consumer that reads messages from the queue and passes them to ProcessQueueMsg.
     *
     * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
     */
    class MassConsumer implements ConsumerInterface
    {
        /**
         * @var \Magento\Framework\MessageQueue\CallbackInvoker
         */
        private $invoker;
    
        /**
         * @var \Magento\Framework\App\ResourceConnection
         */
        private $resource;
    
        /**
         * @var \Magento\Framework\MessageQueue\ConsumerConfigurationInterface
         */
        private $configuration;
    
        /**
         * @var \Magento\Framework\MessageQueue\MessageController
         */
        private $messageController;
    
        /**
         * @var Logger
         */
        private $logger;
    
        /**
         * @var \NameSpace\ModuleName\Model\ProcessQueueMsg
         */
        private $processQueueMsg;
    
        /**
         * Initialize dependencies.
         *
         * @param CallbackInvoker $invoker
         * @param ResourceConnection $resource
         * @param MessageController $messageController
         * @param ConsumerConfigurationInterface $configuration
         * @param \NameSpace\ModuleName\Model\ProcessQueueMsg $processQueueMsg
         * @param Logger $logger
         */
        public function __construct(
            CallbackInvoker $invoker,
            ResourceConnection $resource,
            MessageController $messageController,
            ConsumerConfigurationInterface $configuration,
            \NameSpace\ModuleName\Model\ProcessQueueMsg $processQueueMsg,
            Logger $logger
        ) {
            $this->invoker = $invoker;
            $this->resource = $resource;
            $this->messageController = $messageController;
            $this->configuration = $configuration;
            $this->processQueueMsg = $processQueueMsg;
            $this->logger = $logger;
        }
    
        /**
         * {@inheritdoc}
         */
        public function process($maxNumberOfMessages = null)
        {
            $queue = $this->configuration->getQueue();
            if (!isset($maxNumberOfMessages)) {
                $queue->subscribe($this->getTransactionCallback($queue));
            } else {
                $this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
            }
        }
    
        /**
         * Get transaction callback. This handles the case of async.
         *
         * @param QueueInterface $queue
         * @return \Closure
         */
        private function getTransactionCallback(QueueInterface $queue)
        {
            return function (EnvelopeInterface $message) use ($queue) {
                /** @var LockInterface $lock */
                $lock = null;
                try {
                    $lock = $this->messageController->lock($message, $this->configuration->getConsumerName());
                    // Keep the envelope in $message: acknowledge() and reject() expect it, not the body string
                    $body = $message->getBody();
                    /**
                     * $this->processQueueMsg->process() handles the message that was published to the queue
                     */
                    $data = $this->processQueueMsg->process($body);
                    if ($data === false) {
                        // Processing failed: reject without requeueing to avoid endless redelivery
                        $queue->reject($message, false);
                    } else {
                        $queue->acknowledge($message); // confirm successful processing to the queue
                    }
                } catch (MessageLockException $exception) {
                    $queue->acknowledge($message);
                } catch (ConnectionLostException $e) {
                    $queue->acknowledge($message);
                    if ($lock) {
                        $this->resource->getConnection()
                            ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
                    }
                } catch (NotFoundException $e) {
                    $queue->acknowledge($message);
                    $this->logger->warning($e->getMessage());
                } catch (\Exception $e) {
                    // Reject only; a rejected message must not be acknowledged as well
                    $queue->reject($message, false, $e->getMessage());
                    if ($lock) {
                        $this->resource->getConnection()
                            ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
                    }
                }
            };
        }
    }
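
    A consumer callback should settle each message exactly once, with either an acknowledge or a reject. The sketch below illustrates that decision without Magento: RecordingQueue and settle() are hypothetical names, and the queue is an in-memory stand-in that only records what was decided. It needs PHP 7.4 or later.

```php
<?php

/**
 * In-memory stand-in for a queue; it only records what the callback decided.
 * This is a hypothetical test double, not a Magento class.
 */
final class RecordingQueue
{
    /** @var string[] */
    public array $log = [];

    public function acknowledge(string $id): void
    {
        $this->log[] = "ack:$id";
    }

    public function reject(string $id): void
    {
        $this->log[] = "reject:$id";
    }
}

/**
 * Settle one message exactly once: acknowledge on success, reject when the
 * processor returns false or throws.
 */
function settle(RecordingQueue $queue, string $id, callable $processor, string $body): void
{
    try {
        if ($processor($body) === false) {
            $queue->reject($id);
            return;
        }
        $queue->acknowledge($id);
    } catch (\Exception $e) {
        $queue->reject($id);
    }
}

$queue = new RecordingQueue();
settle($queue, 'm1', fn (string $b) => true, 'payload');
settle($queue, 'm2', fn (string $b) => false, 'payload');
settle($queue, 'm3', function (string $b) {
    throw new \RuntimeException('boom');
}, 'payload');

print_r($queue->log);
```

    Each of the three messages ends up with a single entry in the log, which is the property to preserve when adapting the real callback.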

    Step 9# Now we will check our consumer from the terminal

    Run the following command from the Magento root directory

    php bin/magento queue:consumers:list

    After running the above command, you will get the following result in the terminal

    [Screenshot: RabbitMq -4]

    Step 10# To run the consumer, execute the following command from the terminal

    php bin/magento queue:consumers:start consumerName

    To run this process in the background, append “&” to the above command as follows

    php bin/magento queue:consumers:start consumerName &
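
    A consumer started with “&” does not come back on its own after a crash or a server restart. As an alternative, Magento can start consumers from cron through the cron_consumers_runner section of env.php. The fragment below is a sketch of that section: the max_messages value is an arbitrary example, and consumerName is the consumer declared in queue_consumer.xml.

```php
'cron_consumers_runner' => [
    'cron_run' => true, // let the Magento cron job start the consumers
    'max_messages' => 1000, // example value: messages handled before the consumer process exits
    'consumers' => [
        'consumerName', // consumer name from queue_consumer.xml
    ],
],
```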

    Thank you  🙂
