django使用celery执行异步任务时采用信号实现每个

知识点

通过本文可以获取的知识点有:

1、celery信号中的 ??logging signal??

??after_setup_logger 参考地址??

2、Django中如何配置和使用celery

3、Django中如何加载celery 信号

主要是Django中应用入口的 ??ready(self)?? 函数认识和使用

4、Python logging??自定义 Handler??

??Python logging 模块介绍??

需求分析

1、每个任务的日志独立存放,那么肯定是要能获取到任务id,然后按照??任务id设定日志文件路径??

2、Django程序中执行task,那么程序中日志的写入,肯定不能使用print打印输出到启动程序Django主日志中去, 那肯定是采用??logging模块??配置不同的logger来实现

3、Django怎么把自定义的logger和celery关联起来呢, celery有自己自带的logger(??from celery.utils.log import get_task_logger??),每个task 独立日志肯定不能放到这个自带的logger

需求实现一、我们先创建Django工程和测试用的应用demoapp,然后在应用中利用celery跑一个任务task

先给出工程和应用的结构

├── demoapp│ ├── __init__.py│ ├── admin.py│ ├── apps.py│ ├── celery│ │ └── __init__.py│ ├── migrations│ │ └── __init__.py│ ├── models.py│ ├── tasks.py│ ├── tests.py│ ├── urls.py│ └── views.py├── django_celery_singal│ ├── __init__.py│ ├── __pycache__│ │ ├── __init__.cpython-36.pyc│ │ └── settings.cpython-36.pyc│ ├── asgi.py│ ├── settings.py│ ├── urls.py│ └── wsgi.py└── manage.py

注意这里有个知识点

一般情况,Django继承celery的时候,大家默认是在工程文件夹(比如这里的 django_celery_singal)下创建celery,然后在工程的??__init__.py?? 文件中进行import加载。

其实这不是唯一选择。可以放到任何地方,关键在于启动celery的时候??-A?? 参数后面根的值

1.1、配置celery

# demoapp/celery/__init__.pyimport osfrom celery import Celeryos.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘django_celery_singal.settings’)app = Celery(__name__)# 从Django的settings.py加载 celery的配置app.config_from_object(‘django.conf:settings’, namespace=’CELERY’)# 自动发现应用中的tasks(应用中的tasks.py文件中定义的任务)app.autodiscover_tasks()# demo_celery_signal/settings.py# settings for celeryCELERY_TIMEZONE = ‘Asia/Shanghai’CELERY_ENABLE_UTC = FalseCELERY_TASK_TRACK_STARTED = TrueCELERY_TASK_TIME_LIMIT = 30 * 60CELERY_BROKER_URL = “redis://127.0.0.1:6379/11″CELERY_RESULT_BACKEND = “redis://127.0.0.1:6379/11″CELERY_RESULT_SERIALIZER = ‘json’

重点????: 这里切记两个点:

1、在 demoapp/\init.py 中引入celery ,内容如 ??from .celery import app as celery_app??

2、切记把demoapp加入到 ??INSTALLED_APPS?? 中(建议:创建完应用第一时间就加入到该配置项中去)

1.2、在应用中定义task

# demoapp/tasks.pyfrom celery import shared_task@shared_taskdef task_hello(): print(‘Hello Task under demoapp’)

1.3、task绑定到APP的view中并配置URL

# demoapp/views.pyfrom django.http import HttpResponsefrom demoapp.tasks import task_hellodef demo(request): task_hello.delay() return HttpResponse(“Task Executed”)

配置APP的URL

# demoapp/urls.pyfrom django.urls import pathfrom demoapp import views urlpatterns = [ path(‘demo/’, views.demo, name=’demo-task’),]

把APP的URL加入到工程URL入口中去

# demo_celery_singal/urls.pyfrom django.urls import includeurlpatterns = [ … …, path(‘demoapp/’, include(‘demoapp.urls’)),]

1.4、启动Django和celery

启动celery,注意这里启动的方式 ??celery -A demoapp worker -l info??? 是 ??demoapp??? 应用,而不是工程名称??django_celery_signal?? ,因为celery使用的位置在 demoapp 中

启动Django服务 ??python manage.py runserver 127.0.0.1:8088??

1.5、访问测试celery任务

访问 ??http://127.0.0.1:8088/demoapp/demo/??

前端页面显示 ??Task Executed??? (view视图的返回),在后台celery的日志中显示如下,知道??Django+celery运行异步任务??,搭建完成

[2022-11-05 09:19:50,973: WARNING/ForkPoolWorker-8] Hello Task under demoapp[2022-11-05 09:19:50,974: WARNING/ForkPoolWorker-8][2022-11-05 09:19:50,978: INFO/ForkPoolWorker-8] Task demoapp.tasks.task_hello[ac93e84d-9903-4b7c-a444-64cf6be5a3da] succeeded in 0.022122333000879735s: ‘ok’

二、自定义logging的Handler

因为要把每个task的日志放到独立的文件中的,这个日志的??处理 handler?? 就需要自定义了

因为是放到日志文件中,看了??logging??? 模块的介绍我们知道, ??FileHandler??? 是继承自??StreamHandler??? 或者我们这里也继承??StreamHandler??

import osfrom logging import StreamHandlerfrom celery import current_taskfrom celery.signals import task_prerun, task_postrunclass CeleryTaskLoggerHandler(StreamHandler): terminator = ‘\r\n’ def __init__(self, *args, **kwargs): self.task_id_fd_mapper = {} super().__init__(*args, **kwargs) # 使用 celery的task信号,设置任务开始和结束时的执行的东西 # 主要是获取task_id 然后创建对应的独立任务日志文件 task_prerun.connect(self.on_task_start) task_postrun.connect(self.on_start_end) @staticmethod def get_current_task_id(): # celery 内置提供方法获取task_id if not current_task: return task_id = current_task.request.root_id return task_id def on_task_start(self, sender, task_id, **kwargs): # 这里是根据task_id 定义每个任务的日志文件存放 log_path = os.path.join(‘logs/’, f”{task_id}.log”) f = open(log_path, ‘a’) self.task_id_fd_mapper[task_id] = f def on_start_end(self, sender, task_id, **kwargs): f = self.task_id_fd_mapper.pop(task_id, None) if f and not f.closed: f.close() self.task_id_fd_mapper.pop(task_id, None) def emit(self, record): # 自定义Handler必须要重写的一个方法 task_id = self.get_current_task_id() if not task_id: return try: f = self.task_id_fd_mapper.get(task_id) self.write_task_log(f, record) self.flush() except Exception: self.handleError(record) def write_task_log(self, f, record): # 日志的实际写入 if not f: raise ValueError(‘Not found thread task file’) msg = self.format(record) f.write(msg) f.write(self.terminator) f.flush() def flush(self): for f in self.task_id_fd_mapper.values(): f.flush()

三、Django配置调用celery的logging signal3.1、先创建信号处理函数

先定义信号回调处理函数add_celery_logger_handler, 然后进行信号的绑定,绑定一般是采用??装饰器??的方式

当然也可以不采用这种方式,然后在需要使用信号的地方,进行单独绑定配置(??after_setup_logger.connect(add_celery_logger_handler)??)

import loggingfrom celery.signals import after_setup_logger@after_setup_logger.connectdef add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs): if not logger: return logger = logging.getLogger(‘celery_signal’) handler = logging.FileHandler(‘celery_signal.log’) formatter = logging.Formatter(logging.BASIC_FORMAT) handler.setFormatter(formatter) logger.addHandler(handler) logger.info(“Here call the celery logging signal – after_setup_logger”)

这个时候重新启动celery,是不会产生??celery_signal.log?? 文件,那就更不会调用对应的信号回调了

3.2、Django绑定celery的信号

上面只是定了信号的??回调函数?? 然后和信号进行了绑定,但是Django怎么调用celery的信号处理呢?

答案是利用Django应用的入口??ready() 函数??

# demoapp/apps.pyfrom django.apps import AppConfigclass DemoappConfig(AppConfig): default_auto_field = ‘django.db.models.BigAutoField’ name = ‘demoapp’ verbose_name = “Celery Signal App” def ready(self): # 可以添加如下语句测试 Django启动的时候会不会执行到这里 print(‘我被执行了!’) # 导入上面定义的 信号处理回调 from demoapp.celery import signal_handler super().ready()

然后重新启动celery, 查看 celery_signal.log 日志文件

cat celery_signal.logINFO:celery_signal:Here call the celery logging signal – after_setup_logger

可以知道,Django 绑定了 celery的信号

四、使用celery的信号after_setup_logger绑定自定义的Handler

修改上面定义的 信号回调函数,绑定自定义的日志处理Handler

# demoapp/celery/signal_handler.pyfrom celery.signals import after_setup_loggerfrom .logger import CeleryTaskLoggerHandler@after_setup_logger.connectdef add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs): if not logger: return task_handler = CeleryTaskLoggerHandler() task_handler.setLevel(loglevel) formatter = logging.Formatter(format) task_handler.setFormatter(formatter) logger.addHandler(task_handler)

这里需要先手动在工程目录下创建一个 logs 文件夹,因为handler中没有对应logs不存在做判断处理

然后我们访问Django的view视图 ??http://127.0.0.1:8088/demoapp/demo/ ?? 查看logs 目录发现有个UUID为文件名的log文件

[ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] ls -l logstotal 8-rw-r–r– 1 colinspace wheel 138 11 5 18:17 8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.log[ 22-11-05 18:17 ] [ colinspace ] [/tmp/django_celery_singal] cat logs/8ba8d8d6-9d28-4b64-a396-b91f11ae0df8.logHello Task under demoappTask demoapp.tasks.task_hello[8ba8d8d6-9d28-4b64-a396-b91f11ae0df8] succeeded in 0.03539445897331461s: ‘ok’

这里的??Hello Task under demoapp??? 是task输出的日志,??’ok’?? 是task的返回值,没有的话是None

至此,完全实现了刚开始的需求。


完美实现~ 项目源码详见??https://gitee.com/colin5063/django-learnning-examples/tree/master/django_celery_singal??

如果觉得文章对你有用,请不吝点赞和关注公众号搜索 ??全栈运维??? 或者 ??DailyJobOps??

个人博客??http://blog.colinspace.com/??

知乎平台??https://www.zhihu.com/people/colin-31-49??

简书平台??https://www.jianshu.com/u/6d793fbacc88??

世界上那些最容易的事情中,拖延时间最不费力。

django使用celery执行异步任务时采用信号实现每个

相关文章:

你感兴趣的文章:

标签云: