Hello folks!!
In this article, we will learn about the Message queue and how to use the Message queue in Magetno2.
Message queue – A message queue is a form of asynchronous service-to-service communication. which temporarily stores messages, until they are processed and deleted. Each message is processed only once, by a single consumer.
Configuring the message queue: A message queue requires 4 XML files in <vendor>/<module>/etc
folder.
- communication.xml – This file defines aspects of the message queue system that all communication types have in common.
We declare the topic as ‘notifycustomer.massmail’
- The topic definition must include its datatype ‘string‘.
- The handler class ‘Webkul\CustomWork\Model\Consumer’ with ‘process’ method specifies the class where the logic for handling messages exists.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd"> <topic name="notifycustomer.massmail" request="string"> <handler name="notifycustomer.massmail" type="Webkul\CustomWork\Model\Consumer" method="process" /> </topic> </config>
2. queue.xml – This file defines the relationship between an existing queue and its consumer.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd"> <broker topic="notifycustomer.massmail" exchange="magento-db" type="db"> <queue name="notifycustomer.massmail" consumer="notifycustomer.massmail" consumerInstance="Magento\Framework\MessageQueue\Consumer" handler="Webkul\CustomWork\Model\Consumer::process"/> </broker> </config>
3. queue_publisher.xml – This file defines which connection and exchange to use to publish messages for a specific topic.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd"> <publisher topic="notifycustomer.massmail"> <connection name="db" exchange="magento-db" /> </publisher> </config>
4. queue_topology.xml – This file defines the message routing rules and declares queues and exchanges.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd"> <exchange name="magento-db" type="topic" connection="db"> <binding id="updateBindingvendor" topic="notifycustomer.massmail" destinationType="queue" destination="notifycustomer.massmail"/> </exchange> </config>
Now we will define how to send a message from the publisher to a queue –
Let’s start with a query-> Suppose the magento admin wants to sell the book’s subscription, In such a case, Admin has added Book (B1 parent product) series to the catalog. Customers can place an order for Book (B1). After a certain duration, Admin also adds Book (B2 child product) to the book series, In such a case mass emails would also be sent to those customers who have subscribed (B1). So that (B1) customers can also subscribe to (B2) products.
For the above query, we will create an AfterProductSaveFomBackend observer to send messages from the publisher to a queue.
Webkul\CustomWork\Observer\AfterProductSaveFomBackend.php
In this class we will call a publish function to send message from publish to a queue
namespace Webkul\CustomWork\Observer; use Magento\Framework\Event\ObserverInterface; use Magento\Catalog\Api\ProductRepositoryInterface; class AfterProductSave implements ObserverInterface { public function __construct( \Magento\Framework\Message\ManagerInterface $messageManager, ProductRepositoryInterface $productRepository, \Magento\Framework\MessageQueue\PublisherInterface $publisher, \Magento\Framework\Json\Helper\Data $jsonHelper, \Magento\Store\Model\StoreManagerInterface $storeManager ) { $this->_messageManager = $messageManager; $this->productRepository = $productRepository; $this->publisher = $publisher; $this->jsonHelper = $jsonHelper; $this->storeManager = $storeManager; } /** * execute * @param Magento\Framework\Event\Observer $observer * @return void */ public function execute(\Magento\Framework\Event\Observer $observer) { try { $controller = $observer->getController(); $childProduct = $observer->getProduct(); $params = $controller->getRequest()->getParams(); $wkParentProductSku = $params['product']['wk_parent_product_sku']; // you can create a product attribute text field 'wk_parent_product_sku' for paraent product if ($wkParentProductSku) { $paretntProduct = $this->productRepository->get($wkParentProductSku); $parentProductId = $paretntProduct->getEntityId(); $childProductId = $childProduct->getEntityId(); $stores = $this->storeManager->getStores(); $storeId = 0; foreach ($stores as $store) { $storeId = $store->getId(); if ($storeId > 0) { break; } } $parentProductUrl = $paretntProduct->setStoreId($storeId)->getProductUrl(); $parentProductName = $paretntProduct->getName(); $childProductUrl = $childProduct->setStoreId($storeId)->getProductUrl(); $childProductName = $childProduct->getName(); $details[] = [ "parent_product_id" => $parentProductId, "parent_product_url" => $parentProductUrl, "parent_product_name" => $parentProductName, "child_product_url" => $childProductUrl, "child_product_name" => $childProductName, "child_product_id" => $childProductId ]; <strong>$this->publisher->publish( 'notifycustomer.massmail', $this->jsonHelper->jsonEncode($details) );</strong> if ($details) { $this->_messageManager->addSuccess( __('Message is added to queue!!') ); } else { $this->_messageManager->addSuccess( __('Something Went Wrong!!') ); } } } catch (\Exception $e) { $this->_messageManager->addError($e->getMessage()); } } }
Processing message from queue – In this step we’ll expand the handler class declared in communication.xml file.
Webkul\CustomWork\Model\Consumer.php
In this class we will write our logic.
namespace Webkul\CustomWork\Model; use Magento\Framework\MessageQueue\ConsumerConfiguration; use Magento\Framework\App\Config\ScopeConfigInterface; /** * Class Consumer used to process OperationInterface messages. */ class Consumer extends ConsumerConfiguration { const CONSUMER_NAME = "notifycustomer.massmail"; const QUEUE_NAME = "notifycustomer.massmail"; public function __construct( \Magento\Framework\Json\Helper\Data $jsonHelper, \Webkul\CustomWork\Helper\Email $emailHelper, \Magento\Framework\Message\ManagerInterface $messageManager, \Magento\Sales\Api\OrderRepositoryInterface $orderRepository, \Magento\Catalog\Model\ResourceModel\Product\CollectionFactory $productCollectionFactory, ScopeConfigInterface $scopeConfig ) { $this->jsonHelper = $jsonHelper; $this->_emailHelper = $emailHelper; $this->messageManager = $messageManager; $this->orderRepository = $orderRepository; $this->_productCollectionFactory = $productCollectionFactory; $this->scopeConfig = $scopeConfig; } /** * consumer process start * @param string $messagesBody * @return string */ public function process($request) { try { $data = $this->jsonHelper->jsonDecode($request, true); foreach ($data as $productdata) { $parentProductId = $productdata['parent_product_id']; $parentProductUrl = $productdata['parent_product_url']; $parentProductName = $productdata['parent_product_name']; $childProductUrl = $productdata['child_product_url']; $childProductName = $productdata['child_product_name']; // get parant product $collection = $this->_productCollectionFactory->create() ->addFieldToFilter( [ 'product_ids', 'product_ids', 'product_ids', 'product_ids' ], [ ['like' => $parentProductId], ['like' => '%,'.$parentProductId.',%'], ['like' => $parentProductId.',%'], ['like' => '%,'.$parentProductId] ] ); foreach ($collection as $preorderItem) { $orderId = $preorderItem->getOrderId(); $order = $this->getOrderById($orderId); $status = false; $productForUrl = null; foreach ($order->getAllVisibleItems() as $orderItem) { $productForUrl = $orderItem->getProduct(); if ($orderItem->getChildrenItems()) { foreach ($orderItem->getChildrenItems() as $childItem) { if ($childItem->getProductId() == $parentProductId) { $status = true; break; } } if ($status) { break; } } else { if ($orderItem->getProductId() == $parentProductId) { $status = true; break; } } } if ($status) { $order = $orderItem->getOrder(); $orderId = $order->getId(); $emailId = $order->getCustomerEmail(); $customerName = $order->getCustomerName(); $customerId = $order->getCustomerId(); $adminEmail = $this->scopeConfig->getValue('trans_email/ident_support/email'); if ($adminEmail == '') { return; } $senderInfo = []; $receiverInfo = []; $senderInfo = [ 'name' => 'Store Owner', 'email' => $adminEmail, ]; $receiverInfo = [ 'name' => $customerName, 'email' => $emailId, ]; $templateVars = [ "parent_product_url" => $parentProductUrl, "parent_product_name" => $parentProductName, "child_product_url" => $childProductUrl, "child_product_name" => $childProductName, "customer_name" => $customerName ]; // send data for email $this->_emailHelper->sendEmailToCustomer( $templateVars, $senderInfo, $receiverInfo ); } } } } catch (\Exception $e) { $this->messageManager->addError($e->getMessage()); } } /** * get order id * @param int $orderId * @return object */ public function getOrderById($orderId) { return $this->orderRepository->get($orderId); } }
Executing message queue in Magento 2 – Now run below commands for Magento to recognize our queue.
- php bin/magento setup:upgrade
- php bin/magento setup:di:compile
To check if the queue is registered or not – php bin/magento queue:consumers:list
Consumers are executed by cron, Aside from running via cron, consumers can be executed with the following command- php bin/magento queue:consumers:start notifycustomer.massmail &
Be the first to comment.