Message Queue in Magento2

Updated 27 September 2021

Hello folks!!

In this article, we will learn about the message queue and how to use it in Magento 2.

Message queue: A message queue is a form of asynchronous service-to-service communication that temporarily stores messages until they are processed and deleted. Each message is processed only once, by a single consumer.
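To make the definition concrete, here is a minimal plain-PHP sketch of the idea, independent of Magento. It relies on PHP's built-in SplQueue; the function names publish and consume are hypothetical and are not part of Magento's API.

```php
<?php
// Illustrative only: an in-memory queue, not Magento's message queue framework.

/** Store a message until a consumer picks it up. */
function publish(SplQueue $queue, string $message): void
{
    $queue->enqueue($message);
}

/**
 * Hand every waiting message to the handler exactly once.
 * dequeue() removes the message, so it cannot be processed a second time.
 */
function consume(SplQueue $queue, callable $handler): int
{
    $processed = 0;
    while (!$queue->isEmpty()) {
        $handler($queue->dequeue());
        $processed++;
    }
    return $processed;
}

$queue = new SplQueue();
publish($queue, 'first message');
publish($queue, 'second message');

// The consumer runs later and independently of the publisher.
$seen = [];
$count = consume($queue, function (string $message) use (&$seen): void {
    $seen[] = $message;
});
```

The publisher returns as soon as the message is stored; the work itself happens whenever the consumer runs, which is the asynchronous part of the definition.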

Configuring the message queue: A message queue requires four XML files in the <vendor>/<module>/etc folder.

1. communication.xml – This file defines aspects of the message queue system that all communication types have in common.

We declare the topic as ‘notifycustomer.massmail’:

  • The topic definition must include its data type, ‘string’.
  • The handler class ‘Webkul\CustomWork\Model\Consumer’ with the ‘process’ method specifies where the logic for handling messages lives.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="notifycustomer.massmail" request="string">
        <handler name="notifycustomer.massmail" type="Webkul\CustomWork\Model\Consumer" method="process" />
    </topic>
</config>

2. queue.xml – This file defines the relationship between an existing queue and its consumer. Recent Magento 2 releases use queue_consumer.xml for this purpose; queue.xml is the older format.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
    <broker topic="notifycustomer.massmail" exchange="magento-db" type="db">
        <queue name="notifycustomer.massmail"
               consumer="notifycustomer.massmail"
               consumerInstance="Magento\Framework\MessageQueue\Consumer"
               handler="Webkul\CustomWork\Model\Consumer::process"/>
    </broker>
</config>

3. queue_publisher.xml – This file defines which connection and exchange to use to publish messages for a specific topic. 

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="notifycustomer.massmail">
        <connection name="db" exchange="magento-db" />
    </publisher>
</config>

4. queue_topology.xml – This file defines the message routing rules and declares queues and exchanges.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="updateBindingvendor" topic="notifycustomer.massmail" destinationType="queue" destination="notifycustomer.massmail"/>
    </exchange>
</config>
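The four files are easier to follow when read as one chain: the publisher maps a topic to an exchange, the topology binds that topic to a queue, and the queue definition names the handler. The sketch below models that chain with plain PHP arrays. It is a conceptual illustration only; the resolveHandler function and the array layout are assumptions made for this example and do not reflect how Magento stores or resolves its configuration internally.

```php
<?php
// Conceptual sketch only: plain arrays standing in for the XML files above.

// queue_publisher.xml: topic => exchange
$publishers = [
    'notifycustomer.massmail' => 'magento-db',
];

// queue_topology.xml: exchange => [topic => destination queue]
$bindings = [
    'magento-db' => ['notifycustomer.massmail' => 'notifycustomer.massmail'],
];

// queue.xml: queue => handler
$queueHandlers = [
    'notifycustomer.massmail' => 'Webkul\CustomWork\Model\Consumer::process',
];

/**
 * Follow a topic through its exchange and queue to the handler.
 * Returns null as soon as one link in the chain is missing.
 */
function resolveHandler(string $topic, array $publishers, array $bindings, array $queueHandlers): ?string
{
    $exchange = $publishers[$topic] ?? null;
    if ($exchange === null) {
        return null;
    }
    $queue = $bindings[$exchange][$topic] ?? null;
    if ($queue === null) {
        return null;
    }
    return $queueHandlers[$queue] ?? null;
}

$handler = resolveHandler('notifycustomer.massmail', $publishers, $bindings, $queueHandlers);
$missing = resolveHandler('unknown.topic', $publishers, $bindings, $queueHandlers);
```

The point of the sketch is that the topic, exchange, and queue names must agree across all four files; a mismatch in any one of them breaks the chain.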

Now we will define how to send a message from the publisher to a queue.

Let’s start with a use case: suppose the Magento admin wants to sell subscriptions to a book series. The admin adds Book B1 (the parent product) to the catalog, and customers place orders for it. Later, the admin adds Book B2 (the child product) to the same series. At that point, a mass email should be sent to the customers who ordered B1 so that they can also subscribe to B2.

For this use case, we will create an AfterProductSaveFomBackend observer that sends a message from the publisher to a queue. The observer reads the controller and the product from the event, so it must be registered in etc/adminhtml/events.xml for an event that supplies both, such as controller_action_catalog_product_save_entity_after.

Webkul\CustomWork\Observer\AfterProductSaveFomBackend.php

In this class we will call the publish function to send a message from the publisher to a queue.

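namespace Webkul\CustomWork\Observer;

use Magento\Framework\Event\ObserverInterface;
use Magento\Catalog\Api\ProductRepositoryInterface;

// The class name must match the file name, AfterProductSaveFomBackend.php
class AfterProductSaveFomBackend implements ObserverInterface
{
    /** @var \Magento\Framework\Message\ManagerInterface */
    private $_messageManager;

    /** @var ProductRepositoryInterface */
    private $productRepository;

    /** @var \Magento\Framework\MessageQueue\PublisherInterface */
    private $publisher;

    /** @var \Magento\Framework\Json\Helper\Data */
    private $jsonHelper;

    /** @var \Magento\Store\Model\StoreManagerInterface */
    private $storeManager;

    public function __construct(
        \Magento\Framework\Message\ManagerInterface $messageManager,
        ProductRepositoryInterface $productRepository,
        \Magento\Framework\MessageQueue\PublisherInterface $publisher,
        \Magento\Framework\Json\Helper\Data $jsonHelper,
        \Magento\Store\Model\StoreManagerInterface $storeManager
    ) {
        $this->_messageManager = $messageManager;
        $this->productRepository = $productRepository;
        $this->publisher = $publisher;
        $this->jsonHelper = $jsonHelper;
        $this->storeManager = $storeManager;
    }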

    /**
     * Publish a message to the queue after a child product is saved.
     *
     * @param \Magento\Framework\Event\Observer $observer
     * @return void
     */
    public function execute(\Magento\Framework\Event\Observer $observer)
    {
        try {
            $controller = $observer->getController();
            $childProduct = $observer->getProduct();
            $params = $controller->getRequest()->getParams();
            // 'wk_parent_product_sku' is a custom text product attribute holding the parent product's SKU
            $wkParentProductSku = $params['product']['wk_parent_product_sku'] ?? null;
            if ($wkParentProductSku) {
                $parentProduct = $this->productRepository->get($wkParentProductSku);
                $parentProductId = $parentProduct->getEntityId();
                $childProductId = $childProduct->getEntityId();

                // Use the first store with a non-zero ID so the URLs point to the storefront
                $stores = $this->storeManager->getStores();
                $storeId = 0;
                foreach ($stores as $store) {
                    $storeId = $store->getId();
                    if ($storeId > 0) {
                        break;
                    }
                }
                $parentProductUrl = $parentProduct->setStoreId($storeId)->getProductUrl();
                $parentProductName = $parentProduct->getName();
                $childProductUrl = $childProduct->setStoreId($storeId)->getProductUrl();
                $childProductName = $childProduct->getName();
                $details = [];
                $details[] = [
                    "parent_product_id" => $parentProductId,
                    "parent_product_url" => $parentProductUrl,
                    "parent_product_name" => $parentProductName,
                    "child_product_url" => $childProductUrl,
                    "child_product_name" => $childProductName,
                    "child_product_id" => $childProductId
                ];
                // The topic is declared with request="string", so the payload is JSON-encoded
                $this->publisher->publish(
                    'notifycustomer.massmail',
                    $this->jsonHelper->jsonEncode($details)
                );
                // A failed publish throws and is reported by the catch block below
                $this->_messageManager->addSuccessMessage(
                    __('Message is added to queue!!')
                );
            }
        } catch (\Exception $e) {
            $this->_messageManager->addErrorMessage($e->getMessage());
        }
    }
}
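Because the topic is declared with the ‘string’ data type, the observer serializes its array before publishing, and the consumer has to decode it again. The sketch below shows that round trip using PHP's native json_encode and json_decode instead of Magento's JSON helper, so it can be run on its own. All product values are made-up sample data.

```php
<?php
// Sketch of the message body round trip; sample values are invented for illustration.

$details = [];
$details[] = [
    'parent_product_id'   => 10,
    'parent_product_url'  => 'https://example.com/book-b1.html',
    'parent_product_name' => 'Book B1',
    'child_product_url'   => 'https://example.com/book-b2.html',
    'child_product_name'  => 'Book B2',
    'child_product_id'    => 11,
];

// Publisher side: the queue only accepts a string for this topic.
$messageBody = json_encode($details);

// Consumer side: decode back into an associative array and loop over the entries.
$decoded = json_decode($messageBody, true);
$childNames = [];
foreach ($decoded as $productData) {
    $childNames[] = $productData['child_product_name'];
}
```

Wrapping the entry in an outer list, as the observer does with $details[], is what lets the consumer iterate with foreach even though only one entry is sent per message.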

Processing messages from the queue – In this step we implement the handler class declared in the communication.xml file.

Webkul\CustomWork\Model\Consumer.php

In this class we will write our logic.

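namespace Webkul\CustomWork\Model;

use Magento\Framework\MessageQueue\ConsumerConfiguration;
use Magento\Framework\App\Config\ScopeConfigInterface;

/**
 * Handler for messages published to the notifycustomer.massmail topic.
 */
class Consumer extends ConsumerConfiguration
{
    const CONSUMER_NAME = "notifycustomer.massmail";

    const QUEUE_NAME = "notifycustomer.massmail";

    /** @var \Magento\Framework\Json\Helper\Data */
    private $jsonHelper;

    /** @var \Webkul\CustomWork\Helper\Email */
    private $_emailHelper;

    /** @var \Magento\Framework\Message\ManagerInterface */
    private $messageManager;

    /** @var \Magento\Sales\Api\OrderRepositoryInterface */
    private $orderRepository;

    /** @var \Magento\Catalog\Model\ResourceModel\Product\CollectionFactory */
    private $_productCollectionFactory;

    /** @var ScopeConfigInterface */
    private $scopeConfig;

    public function __construct(
        \Magento\Framework\Json\Helper\Data $jsonHelper,
        // Custom helper of this module that sends the email (not shown in this article)
        \Webkul\CustomWork\Helper\Email $emailHelper,
        \Magento\Framework\Message\ManagerInterface $messageManager,
        \Magento\Sales\Api\OrderRepositoryInterface $orderRepository,
        \Magento\Catalog\Model\ResourceModel\Product\CollectionFactory $productCollectionFactory,
        ScopeConfigInterface $scopeConfig
    ) {
        $this->jsonHelper = $jsonHelper;
        $this->_emailHelper = $emailHelper;
        $this->messageManager = $messageManager;
        $this->orderRepository = $orderRepository;
        $this->_productCollectionFactory = $productCollectionFactory;
        $this->scopeConfig = $scopeConfig;
    }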

    /**
     * Process one message from the queue.
     *
     * @param string $request JSON-encoded message body
     * @return void
     */
    public function process($request)
    {
        try {
            $data = $this->jsonHelper->jsonDecode($request);
            
            foreach ($data as $productdata) {
                $parentProductId = $productdata['parent_product_id'];
                $parentProductUrl = $productdata['parent_product_url'];
                $parentProductName = $productdata['parent_product_name'];
                $childProductUrl = $productdata['child_product_url'];
                $childProductName = $productdata['child_product_name'];
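                // Find records whose comma-separated 'product_ids' field contains the parent
                // product ID: as the only value, in the middle, at the start, or at the end.
                // This filter and getOrderId() below appear to assume a collection that stores
                // order IDs alongside 'product_ids'; adapt the collection factory to your own data.
                $collection = $this->_productCollectionFactory->create()
                    ->addFieldToFilter(
                        [
                            'product_ids',
                            'product_ids',
                            'product_ids',
                            'product_ids'
                        ],
                        [
                            ['like' => $parentProductId],
                            ['like' => '%,'.$parentProductId.',%'],
                            ['like' => $parentProductId.',%'],
                            ['like' => '%,'.$parentProductId]
                        ]
                    );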
                foreach ($collection as $preorderItem) {
                    $orderId = $preorderItem->getOrderId();
                    $order = $this->getOrderById($orderId);
                    $status = false;
                    $productForUrl = null;

                    foreach ($order->getAllVisibleItems() as $orderItem) {
                        $productForUrl = $orderItem->getProduct();
                        if ($orderItem->getChildrenItems()) {
                            foreach ($orderItem->getChildrenItems() as $childItem) {
                                if ($childItem->getProductId() == $parentProductId) {
                                    $status = true;
                                    break;
                                }
                            }

                            if ($status) {
                                break;
                            }
                        } else {
                            if ($orderItem->getProductId() == $parentProductId) {
                                $status = true;
                                break;
                            }
                        }
                    }                        
                    if ($status) {                            
                        $order = $orderItem->getOrder();
                        $orderId = $order->getId();
                        $emailId = $order->getCustomerEmail();
                        $customerName = $order->getCustomerName();
                        $customerId = $order->getCustomerId();                             
                        
                        $adminEmail = $this->scopeConfig->getValue('trans_email/ident_support/email');
                        if ($adminEmail == '') {
                            return;
                        }
                        $senderInfo = [];
                        $receiverInfo = [];
                        $senderInfo = [
                            'name' => 'Store Owner',
                            'email' => $adminEmail,
                        ];
                        $receiverInfo = [
                            'name' => $customerName,
                            'email' => $emailId,
                        ];
                        $templateVars = [
                            "parent_product_url" => $parentProductUrl,
                            "parent_product_name" => $parentProductName,
                            "child_product_url" => $childProductUrl,
                            "child_product_name" => $childProductName,
                            "customer_name" => $customerName
                        ];
                        // send data for email
                        $this->_emailHelper->sendEmailToCustomer(
                            $templateVars,
                            $senderInfo,
                            $receiverInfo
                        );
                        
                    }                        
                } 
            }
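        } catch (\Exception $e) {
            // Consumers usually run from cron or the CLI, where no admin session exists to show this message
            $this->messageManager->addErrorMessage($e->getMessage());
        }
    }
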
    /**
     * Load an order by its ID.
     *
     * @param int $orderId
     * @return \Magento\Sales\Api\Data\OrderInterface
     */
    public function getOrderById($orderId)
    {
        return $this->orderRepository->get($orderId);
    }
}
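The four ‘like’ conditions in the collection filter appear to be a way of asking whether a comma-separated list contains a given ID without also matching longer IDs that merely contain the same digits. The plain-PHP sketch below expresses the same check, assuming the field holds IDs separated by commas with no spaces. The function name containsId is hypothetical.

```php
<?php
/**
 * Return true when $id is one of the values in a comma-separated list.
 * Covers the same four cases as the LIKE patterns:
 * "id" (only value), "%,id,%" (middle), "id,%" (first), "%,id" (last).
 */
function containsId(string $commaSeparatedIds, int $id): bool
{
    if ($commaSeparatedIds === '') {
        return false;
    }
    return in_array((string) $id, explode(',', $commaSeparatedIds), true);
}

$results = [
    'only'    => containsId('12', 12),
    'middle'  => containsId('3,12,40', 12),
    'first'   => containsId('12,40', 12),
    'last'    => containsId('3,12', 12),
    // 112 contains the digits "12" but is a different ID, so it must not match
    'partial' => containsId('3,112,40', 12),
];
```

A single pattern such as '%12%' would wrongly match the last case, which is why the filter needs the comma-delimited variants.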

Executing the message queue in Magento 2 – Now run the commands below so that Magento recognizes our queue.

  • php bin/magento setup:upgrade
  • php bin/magento setup:di:compile

To check whether the consumer is registered, run: php bin/magento queue:consumers:list

Consumers are normally started by cron. They can also be started manually with the following command: php bin/magento queue:consumers:start notifycustomer.massmail &
