Python线程实际应用

多任务生产者消费者"""生产者 –> 消费者生产者 1 生产者从中间件(kafka、mq、redis)中获取数据推送到queue中2 事件通知并通知消费者线程开始消费3 每次sleep 0.0001(防止 数据一直为空时占用大量资源)消费者 1 事件消息接收到生产者消息开始消费2 消费者在等待 timeout(s)后接受不到消息 wait 等待 生产者通知消费"""from threading import Thread, Eventimport loggingimport _queuefrom queue import Queueimport timefrom confluent_kafka import Consumerimport djangoimport osos.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")django.setup()from gateway.models import ( Gateway, Sensor)# logger = logging.getLogger()class Product(Thread): """生产者""" def __init__(self, servers: str, group_id: str, topic: str, queue, event ): super(Product, self).__init__() self.queue = queue self.event = event self.c = Consumer( { ‘bootstrap.servers’: servers, ‘group.id’: group_id, ‘auto.offset.reset’: ‘earliest’ } ) self.c.subscribe([topic]) def run(self) -> None: while True: time.sleep(0.0001) data = self.c.poll(1.0) if data is None: continue if data.error(): print(data.error()) continue data = data.value().decode(‘utf-8’) self.queue.put(data) if self.event.isSet() is False: self.event.clear() self.event.set()class ConsumerKafka(Thread): """消费者""" def __init__(self, queue, event, func, timeout: int = 10 ): super(ConsumerKafka, self).__init__() self.queue = queue self.event = event self.timeout = timeout self.func = func def run(self) -> None: while True: try: data = self.queue.get(timeout=self.timeout) print(‘consumer’, data) # logger.error(data.error()) except _queue.Empty: print(‘consumer is wait’) self.event.clear() self.event.wait() else: # 执行更新操作 self.func(data)def data_update(msg): """数据更新""" # 此处可以考虑处理一些异常。否则服务在抛出异常后停在这里。从业务实际角度考虑if __name__ == ‘__main__’: conf = { ‘servers’: ‘ip:port’, ‘group_id’: ‘group’, ‘topic’: ‘topic’ } q = Queue() e = Event() product = Product(queue=q, event=e, **conf) consumer = ConsumerKafka(q, e, data_update) product.start() consumer.start() product.join()

线程的问题还是一个线程在跑。经过queue拆分物尽其用,在对的时间做对的事情。也可以考虑拆成两个进程下协程去实现。协程有线程更高的性能和更少的资源占用

【本文来源:韩国服务器 http://www.yidunidc.com欢迎留下您的宝贵建议】穿别人的鞋,走自己的路,让别追去吧

Python线程实际应用

相关文章:

你感兴趣的文章:

标签云: