使用 Python 提升生产者-消费者范式

[ad_1]

上次我们讨论使用进程和线程的异步编程时,这里有一个链接。 强烈建议在阅读本文之前先阅读这篇文章,因为我们将使用那里提到的概念。 今天我们将简要讨论何时使用什么以及生产者-消费者范式的实现。

流程 线程
进程适用于 CPU 利用率较高的应用程序。 考虑处理大量数字或处理大量数据。 这些任务最好跨进程或 CPU 执行。 线程适用于我们有耗时的 I/O 操作的场景,例如:文件 I/O、http 调用或套接字连接。 进行此类 I/O 调用的线程等待响应,而其他线程可能会继续运行。

生产者-消费者模式

生产者-消费者设计模式今天广泛用于一系列应用程序中。 大多数集成接口都使用这种方法。 消息队列 (MQ) 广泛使用此范例并将其扩展到发布者/订阅者模式、P2P 模式或推/拉模式。

使用这种模式的基本思想是将一组任务分配给异步运行的多个线程。 我们还可以实现生产者和消费者的链接,并使用既充当生产者又充当消费者的中间节点。

以 android 通知为例,服务器将通知推送(生成)到 FCM,FCM 使用它们并进一步推送到各个设备。

另一个现实生活中的例子可能是邮政服务,人们将邮件发送给邮政服务,后者消费它们,然后将它们生产给充当消费者的收件人。

队列

在处理生产者和消费者时,我们需要一个队列。 生产者需要一个地方来推送消息,消费者需要一个地方来读取它们。 考虑到队列是这里广泛使用的数据结构,我们通常希望以 FIFO 方式处理消息。 尽管 List 和 Arrays 等其他数据结构应该可以正常工作。 对生产者和消费者来说非常重要 分享 队列的同一个对象.

生产者

生产者是异步运行并在队列中添加消息的对象。 然后推送到队列中的消息可供消费者使用。 如果消息被消费,生产者不会打扰,他们的工作是有限的,它是推送消息。 可能有多个生产者向队列生成消息。

消费者

消费者不断轮询队列,等待消息到达队列。 一旦消息可用,消费者就开始处理消息。 可能有多个消费者通过队列轮询。

毒药(可选)

帮助确定何时返回正在运行的线程。 当生产者/消费者收到毒药时,这是他们停止的信号。 这确保在处理结束时没有挂起的线程。

自己做:

有多个单一生产者和消费者的例子。 让我们提高一个档次。

假设:

有一些任务(即。 初始任务, final_task)可以比其他人相对更快地完成并且有任务(即。 中介任务) 这需要大量时间来处理。

设计:

将只有一个线程用于运行 初始任务 另一个为 final_task 但是,将有三个线程专用于 中介任务. 只要 初始任务 完成后,它需要被可用的线程拾取,然后执行 中介任务. 一旦完成,结果就会被生成到最终队列中,然后被消耗以执行 final_task.

import time 
from queue import Queue 
from threading import Thread 

Poison = -1 


class TimeConsumingNode(Thread): 
  def __init__(self, consumer_queue: Queue = None, 
               producer_queue: Queue = None, task_time: int = 0): 
      super(TimeConsumingNode, self).__init__() 
      self.consumer_queue = consumer_queue 
      self.producer_queue = producer_queue 
      self.task_time = task_time 

  def run(self) -> None: 
     global Poison 
     while True: 
        consumed_message = self.consumer_queue.get() 
        print(self, f"consumed {consumed_message}") 
        if consumed_message == Poison: 
           print(self, "got poison") 
           # Pass the poison to intermediary producers. 
           self.consumer_queue.put(consumed_message) 
           # Pass the poison to the final consumer. 
           self.producer_queue.put(consumed_message) 
           break 
        # Mock processing the consumed message. 
        time.sleep(self.task_time) 
        # Mock some heavy tasks. 
        time.sleep(self.task_time) 
        print(self, 
              f"took at least {self.task_time * 2} for consuming " 
              f"{consumed_message} and for producing {consumed_message}") 
        self.producer_queue.put(consumed_message) 


# Comparatively faster task node. 
class ProducerNode(Thread): 
  def __init__(self, consumed_queue: Queue = None, 
               produced_queue: Queue = None, 
               task_time: int = 0): 
      super(ProducerNode, self).__init__() 
      self.consumed_queue = consumed_queue 
      self.produced_queue = produced_queue 
      self.task_time = task_time 

  def run(self) -> None: 
      global Poison 
      while not self.consumed_queue.empty(): 
         some_number = self.consumed_queue.get() 
         time.sleep(self.task_time) 
         print(self, f"Producing {some_number}") 
         self.produced_queue.put(some_number) 

      # Add poison. 
      print(self, f"Adding poison") 
      self.produced_queue.put(Poison) 


# Last task again comparatively faster one. 
class ConsumerNode(Thread): 
  def __init__(self, consumed_queue: Queue = None, task_time: int = 0): 
     super(ConsumerNode, self).__init__() 
     self.consumed_queue = consumed_queue 
     self.task_time = task_time 
     self.poisons = 0 

  def run(self) -> None: 
     global Poison 
     while True: 
        consumed_message = self.consumed_queue.get() 
        if consumed_message == Poison: 
           self.poisons = self.poisons + 1 
           print(self, f"got {self.poisons} poisons") 
           if self.poisons == 3: 
              break 
        # Mock some time taking task 
        time.sleep(self.task_time / 2) 
        print(self, 
f"took {self.task_time / 2} to consume {consumed_message}") 


if __name__ == "__main__": 
   my_queue = Queue() 
   my_queue.put(2) 
   my_queue.put(3) 
   my_queue.put(4) 
   my_queue.put(5) 
   my_queue.put(6) 

   intermediary_queue = Queue() 
   final_queue = Queue() 
   first_prod_node = ProducerNode(consumed_queue=my_queue, 
produced_queue=intermediary_queue, 
task_time=3) 

   tc_node_1 = TimeConsumingNode(consumer_queue=intermediary_queue, 
producer_queue=final_queue, task_time=4) 
   tc_node_2 = TimeConsumingNode(consumer_queue=intermediary_queue, 
producer_queue=final_queue, task_time=5) 
   tc_node_3 = TimeConsumingNode(consumer_queue=intermediary_queue, 
producer_queue=final_queue, task_time=2) 

   final_node = ConsumerNode(consumed_queue=final_queue, task_time=1) 

   first_prod_node.start() 
   tc_node_1.start() 
   tc_node_2.start() 
   tc_node_3.start() 
   final_node.start() 

   first_prod_node.join() 
   tc_node_1.join() 
   tc_node_2.join() 
   tc_node_3.join() 
   final_node.join()

输出:

风险:

如果实施不当,可能会导致

  1. 挂线
  2. 死锁
  3. 读取队列时出现 IndexOutOfBound 异常。

结论

生产者-消费者模式是一种非常有用的设计,可以在不同程度上加以利用,以实现多个耗时任务的异步处理。 该概念已被广泛纳入现代消息队列,即。 卡夫卡,RabbitMQ, Cloud AWS、GCP等提供的MQ。 他们强大而危险! 明智地使用它们!

[ad_2]

Related Posts