work queue in Linux 2.3.36

新work queue工作机制

工作队列(workqueue)是Linux kernel中将工作推后执行的一种机制。这种机制和BH或Tasklets不同之处在于工作队列是把推后的工作交由一个内核线程去执行,因此工作队列的优势就在于它允许重新调度甚至睡眠。

Linux的work queue在2.6.0 到2.6.19以及到2.6.36工作队列发生了一些变化。本文主要对新版本做一些分析。

虽然自从2.6.0之后,Linux对work queue进行了优化,但是kernel用到create_workqueue的模块越来越多,而调用create_workqueue会在每个cpu上都创建一个work_thread, 每个cpu都分配一个cpu_workqueue_struct以及workqueue_struct,而如果没被queue_work的话根本没机会工作,这样仍然相当浪费内存资源,而且加重了cpu loading。另外,同一个work queue上的每个work都是按照串行执行的,假如其中一个work的调度程序睡眠了,那么后面的work也将无法工作。

自从2.6.36以后,work queue的机制发生了很大变化,所有的work queue都被合并成

一个work queue,work thread也不是和work queue一一关联,work何时工作紧紧按照工作的重要性以及时间紧迫性来划分。也就是说新机制是按照cpu数量来创建work thread,而不是work queue。

下面我们还是通过代码分析吧:

初始化workqueus及创建work threads:

系统启动时调用init_workqueus()@kernel/kernel/workqueue.c

static int __initinit_workqueues(void){       unsigned int cpu;       int i;        cpu_notifier(workqueue_cpu_callback,CPU_PRI_WORKQUEUE);        /* initialize gcwqs *//* 前面有说过,新机制将workqueues都排到gcwq上管理了,每个cpu各一个gcwq。*/       for_each_gcwq_cpu(cpu) {              struct global_cwq *gcwq =get_gcwq(cpu);              spin_lock_init(&gcwq->lock);              INIT_LIST_HEAD(&gcwq->worklist);              gcwq->cpu = cpu;              gcwq->flags |=GCWQ_DISASSOCIATED;              INIT_LIST_HEAD(&gcwq->idle_list);              for (i = 0; i <BUSY_WORKER_HASH_SIZE; i++)                     INIT_HLIST_HEAD(&gcwq->busy_hash[i]);              init_timer_deferrable(&gcwq->idle_timer);              gcwq->idle_timer.function =idle_worker_timeout;              gcwq->idle_timer.data =(unsigned long)gcwq;              setup_timer(&gcwq->mayday_timer,gcwq_mayday_timeout,                         (unsigned long)gcwq);              ida_init(&gcwq->worker_ida);              gcwq->trustee_state =TRUSTEE_DONE;              init_waitqueue_head(&gcwq->trustee_wait);       }        /* create the initial worker */       for_each_online_gcwq_cpu(cpu) {              struct global_cwq *gcwq =get_gcwq(cpu);              struct worker *worker;              if (cpu != WORK_CPU_UNBOUND)                     gcwq->flags &=~GCWQ_DISASSOCIATED;/* 开机启动初始化后创建workthread主要是这里实现*/              worker = create_worker(gcwq,true);              BUG_ON(!worker);              spin_lock_irq(&gcwq->lock);              start_worker(worker);              spin_unlock_irq(&gcwq->lock);       }/* 创建系统开机后默认的workqueue,平常我们调用的schedule_work()其实就是用的system_wq这个work queue,可参考schedule_work()实现。*/       system_wq =alloc_workqueue("events", 0, 0);       system_long_wq =alloc_workqueue("events_long", 0, 0);       system_nrt_wq =alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);       system_unbound_wq =alloc_workqueue("events_unbound", WQ_UNBOUND,                                       WQ_UNBOUND_MAX_ACTIVE);       system_freezable_wq =alloc_workqueue("events_freezable",                                         WQ_FREEZABLE, 0);       BUG_ON(!system_wq || !system_long_wq ||!system_nrt_wq ||             !system_unbound_wq || !system_freezable_wq);       return 0;}early_initcall(init_workqueues);

再看create_worker是如何创建work thread的:

static structworker *create_worker(struct global_cwq *gcwq, bool bind){       bool on_unbound_cpu = gcwq->cpu ==WORK_CPU_UNBOUND;       struct worker *worker = NULL;       int id = -1;        spin_lock_irq(&gcwq->lock);       while (ida_get_new(&gcwq->worker_ida,&id)) {              spin_unlock_irq(&gcwq->lock);              if(!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))                     goto fail;              spin_lock_irq(&gcwq->lock);       }       spin_unlock_irq(&gcwq->lock);/* 为work分配空间,初始化worker*/       worker = alloc_worker();       if (!worker)              goto fail;       worker->gcwq = gcwq;       worker->id = id;       /* kthread_create_on_node和 kthread_create都可以创建worker_thread,区别在于是否和cpu绑定,前者别是不依赖于CPU而工作,可以在任何CPU上工作,而后者表示分别在各个CPU上创建一个workthread来工作。从ps命令里就可以看到像kworker/0:0,kworker/1:0, kworker/u:0这样的进程就是这里创建的workthread了!*/       if (!on_unbound_cpu)              worker->task =kthread_create_on_node(worker_thread, worker,cpu_to_node(gcwq->cpu),"kworker/%u:%d", gcwq->cpu, id);       else              worker->task =kthread_create(worker_thread, worker, "kworker/u:%d", id);       if (IS_ERR(worker->task))              goto fail;        /*        *A rogue worker will become a regular one if CPU comes        *online later on.  Make sure every workerhas        *PF_THREAD_BOUND set.        */       if (bind && !on_unbound_cpu)              kthread_bind(worker->task,gcwq->cpu);       else {              worker->task->flags |=PF_THREAD_BOUND;              if (on_unbound_cpu)                     worker->flags |=WORKER_UNBOUND;       }        return worker;fail:       if (id >= 0) {              spin_lock_irq(&gcwq->lock);              ida_remove(&gcwq->worker_ida,id);              spin_unlock_irq(&gcwq->lock);       }       kfree(worker);       return NULL;}

处理works:

由kthread_create_on_node()或 kthread_create()创建了work thread之后,它就开始运行起来了:

static intworker_thread(void *__worker){       struct worker *worker = __worker;       struct global_cwq *gcwq =worker->gcwq;        /* tell the scheduler that this is aworkqueue worker */       worker->task->flags |=PF_WQ_WORKER;/* 最后的代码用gotowoke_up来表示work thread是一个无限循环。*/woke_up:       spin_lock_irq(&gcwq->lock);       /* DIE can be set only while we're idle,checking here is enough */       if (worker->flags & WORKER_DIE) {              spin_unlock_irq(&gcwq->lock);              worker->task->flags &=~PF_WQ_WORKER;              return 0;       }       worker_leave_idle(worker);recheck:       /* no more worker necessary? *//* 如果有高优先级的work需要处理,而且当前已经没有空闲的workthread可以来处理掉这个高优先级work,那下一步就要创建新的workthread来处理掉,读者可自行分析need_more_woker()的实现。这里就体现了新机制对于高优先级先处理的方法。*/       if (!need_more_worker(gcwq))              goto sleep;/*新建一个workthread,可以看出,新的机制已经不想老的那样不管如何情况只要creatework queue就创建work thread,浪费内存资源。在manage_workers() -> maybe_create_worker ()-> create_worker (), create_worker()前面分析过了,它会creatework thread!*/       /* do we need to manage? */       if (unlikely(!may_start_working(gcwq))&& manage_workers(worker))              goto recheck;       /*        *->scheduled list can only be filled while a worker is        *preparing to process a work or actually processing it.        *Make sure nobody diddled with it while I was sleeping.        */       BUG_ON(!list_empty(&worker->scheduled));        /*        *When control reaches this point, we're guaranteed to have        *at least one idle worker or that someone else has already        *assumed the manager role.        */       worker_clr_flags(worker, WORKER_PREP);        do {              struct work_struct *work =                     list_first_entry(&gcwq->worklist,                                    struct work_struct, entry);/* 在创建里新的work thead去处理高优先级的work之后,终于轮到处理自己的work了。核心在process_one_work().*/              if (likely(!(*work_data_bits(work)& WORK_STRUCT_LINKED))) {                     /* optimization path, notstrictly necessary */                     process_one_work(worker,work);                     if(unlikely(!list_empty(&worker->scheduled)))                            process_scheduled_works(worker);              } else {                     move_linked_works(work,&worker->scheduled, NULL);                     process_scheduled_works(worker);              }       } while (keep_working(gcwq));       worker_set_flags(worker, WORKER_PREP,false);sleep:       /*在休眠之前,再一次判断当前有没有新的work需要处理。所以即使本work睡眠了,其他work也可以继续工作,这样就不会存在老的机制那样一个worksleep会阻塞其他work执行。*/       if(unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))              goto recheck;        /*        *gcwq->lock is held and there's no work to process and no        *need to manage, sleep.  Workers are wokenup only while        *holding gcwq->lock or from local cpu, so setting the        *current state before releasing gcwq->lock is enough to        *prevent losing any event.        */       worker_enter_idle(worker);       __set_current_state(TASK_INTERRUPTIBLE);       spin_unlock_irq(&gcwq->lock);       schedule();       goto woke_up;}

再来看看系统如何将work给处理掉:

static voidprocess_one_work(struct worker *worker, struct work_struct *work)__releases(&gcwq->lock)__acquires(&gcwq->lock){       struct cpu_workqueue_struct *cwq =get_work_cwq(work);       struct global_cwq *gcwq = cwq->gcwq;       struct hlist_head *bwh =busy_worker_head(gcwq, work);       bool cpu_intensive = cwq->wq->flags& WQ_CPU_INTENSIVE;/* 取出用户driver设置的函数*/       work_func_t f = work->func;       int work_color;       struct worker *collision;#ifdefCONFIG_LOCKDEP       /*        *It is permissible to free the struct work_struct from        *inside the function that is called from it, this we need to        *take into account for lockdep too.  Toavoid bogus "held        *lock freed" warnings as well as problems when looking into        *work->lockdep_map, make a copy and use that here.        */       struct lockdep_map lockdep_map =work->lockdep_map;#endif       /*        *A single work shouldn't be executed concurrently by        *multiple workers on a single cpu.  Checkwhether anyone is        *already processing the work.  If so,defer the work to the        *currently executing one.        */       collision =__find_worker_executing_work(gcwq, bwh, work);       if (unlikely(collision)) {              move_linked_works(work,&collision->scheduled, NULL);              return;       }        /* claim and process */       debug_work_deactivate(work);       hlist_add_head(&worker->hentry,bwh);       worker->current_work = work;       worker->current_cwq = cwq;       work_color = get_work_color(work);        /* record the current cpu number in thework data and dequeue */       set_work_cpu(work, gcwq->cpu);       list_del_init(&work->entry);        /*        *If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,        *wake up another worker; otherwise, clear HIGHPRI_PENDING.        *//* 如果全局的gcwq有高优先级的work需要处理,唤醒它执行!*/       if (unlikely(gcwq->flags &GCWQ_HIGHPRI_PENDING)) {              struct work_struct *nwork = list_first_entry(&gcwq->worklist,                                          structwork_struct, entry);              if(!list_empty(&gcwq->worklist) &&                 get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)                     /*唤醒高优先级的work所对应的workthread来工作。*/                     wake_up_worker(gcwq);              else                     gcwq->flags &=~GCWQ_HIGHPRI_PENDING;       }        /*        *CPU intensive works don't participate in concurrency        *management.  They're the scheduler'sresponsibility.        *//*如果当前有对时间敏感的work,那么如果有空闲的workthread的话,也要唤醒相应work thread来工作。*/       if (unlikely(cpu_intensive))              worker_set_flags(worker, WORKER_CPU_INTENSIVE,true);        spin_unlock_irq(&gcwq->lock);        work_clear_pending(work);       lock_map_acquire_read(&cwq->wq->lockdep_map);       lock_map_acquire(&lockdep_map);       trace_workqueue_execute_start(work);       /* 历经千辛万苦,终于跑到要调用的work functionpointer了!!!*/f(work);       /*        *While we must be careful to not use "work" after this, the trace        *point will only record its address.        */       /* 后面就是一些删除work,资源清楚释放,标志重设的工作了。*/       trace_workqueue_execute_end(work);       lock_map_release(&lockdep_map);       lock_map_release(&cwq->wq->lockdep_map);        if (unlikely(in_atomic() ||lockdep_depth(current) > 0)) {              printk(KERN_ERR "BUG:workqueue leaked lock or atomic: "                     "%s/0x%08x/%d\n",                     current->comm, preempt_count(),task_pid_nr(current));              printk(KERN_ERR "    last function: ");              print_symbol("%s\n",(unsigned long)f);              debug_show_held_locks(current);              dump_stack();       }       spin_lock_irq(&gcwq->lock);       /* clear cpu intensive status */       if (unlikely(cpu_intensive))              worker_clr_flags(worker,WORKER_CPU_INTENSIVE);       /* we're done with it, release */       hlist_del_init(&worker->hentry);       worker->current_work = NULL;       worker->current_cwq = NULL;       cwq_dec_nr_in_flight(cwq, work_color,false);} 

创建work queue:

Work thread如何处理掉work已经分析完了,然而对于前面init_workqueues()提到的system_wq是如何得到的还不清楚,另外一个问题:为什么说work thread不依赖于work queue了,下面我们来分析alloc_workqueue():

#define alloc_workqueue(name, flags, max_active)            \       __alloc_workqueue_key((name), (flags),(max_active), NULL, NULL)structworkqueue_struct *__alloc_workqueue_key(const char *name,                                         unsigned int flags,                                          int max_active,                                          struct lock_class_key *key,                                          const char *lock_name){       struct workqueue_struct *wq;       unsigned int cpu;        /*        *Workqueues which may be used during memory reclaim should        *have a rescuer to guarantee forward progress.        *//*WQ_MEM_RECLAIM表示当前内存资源是否紧张,都要执行我这个work.*/       if (flags & WQ_MEM_RECLAIM)              flags |= WQ_RESCUER;        /*        *Unbound workqueues aren't concurrency managed and should be        *dispatched to workers immediately.        */       /* WQ_UNBOUND 表示work不依赖于如何CPU,可以在任意CPU上运行。*/       if (flags & WQ_UNBOUND)              flags |= WQ_HIGHPRI;       /* max_active 限制任意一个CPU上能同时执行的最大work数量。*/       max_active = max_active ?: WQ_DFL_ACTIVE;       max_active =wq_clamp_max_active(max_active, flags, name);       /* 分配 workqueue_struct,将当前workqueue相对应的信息如name, flags等保存起来,其实我们已经知道,在workthread中,这些信息会被用到。*/       wq = kzalloc(sizeof(*wq), GFP_KERNEL);       if (!wq)              goto err;       wq->flags = flags;       wq->saved_max_active = max_active;       mutex_init(&wq->flush_mutex);       atomic_set(&wq->nr_cwqs_to_flush,0);       INIT_LIST_HEAD(&wq->flusher_queue);       INIT_LIST_HEAD(&wq->flusher_overflow);       wq->name = name;       lockdep_init_map(&wq->lockdep_map,lock_name, key, 0);       INIT_LIST_HEAD(&wq->list);       if (alloc_cwqs(wq) < 0)              goto err;       /* 初始化per cpu上的cpu_workqueue_struct信息。*/       for_each_cwq_cpu(cpu, wq) {              struct cpu_workqueue_struct *cwq =get_cwq(cpu, wq);              struct global_cwq *gcwq =get_gcwq(cpu);              BUG_ON((unsigned long)cwq &WORK_STRUCT_FLAG_MASK);              cwq->gcwq = gcwq;              cwq->wq = wq;              cwq->flush_color = -1;              cwq->max_active = max_active;              INIT_LIST_HEAD(&cwq->delayed_works);       }        if (flags & WQ_RESCUER) {              struct worker *rescuer;              if(!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))                     goto err;              wq->rescuer = rescuer =alloc_worker();              if (!rescuer)                     goto err;              rescuer->task =kthread_create(rescuer_thread, wq, "%s", name);              if (IS_ERR(rescuer->task))                     goto err;              rescuer->task->flags |=PF_THREAD_BOUND;              wake_up_process(rescuer->task);       }        /*        *workqueue_lock protects global freeze state and workqueues        *list.  Grab it, set max_activeaccordingly and add the new        *workqueue to workqueues list.        */       spin_lock(&workqueue_lock);       if (workqueue_freezing &&wq->flags & WQ_FREEZABLE)              for_each_cwq_cpu(cpu, wq)                     get_cwq(cpu,wq)->max_active = 0;       /* 将当前wq添加到workqueues里去。*/       list_add(&wq->list,&workqueues);       spin_unlock(&workqueue_lock);       return wq;err:       if (wq) {              free_cwqs(wq);              free_mayday_mask(wq->mayday_mask);              kfree(wq->rescuer);              kfree(wq);       }       return NULL;}EXPORT_SYMBOL_GPL(__alloc_workqueue_key);

新的机制虽然仍然保留了create_workqueue()和 create_singlethread_workqueue()这两个接口,但他们的实现最终其实调用的都是alloc_workqueue(),只是传的flags不一样。如前面所说,新机制的work queue里只有flags才会影响调度的顺序,work queue已经不重要了。

#definecreate_workqueue(name)                              \alloc_workqueue((name),WQ_MEM_RECLAIM, 1)#definecreate_freezable_workqueue(name)                \alloc_workqueue((name),WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 1)#definecreate_singlethread_workqueue(name)                  \alloc_workqueue((name),WQ_UNBOUND | WQ_MEM_RECLAIM, 1)

挂起work:

再看work queue如何触发work:

queue_work –> queue_work_on-> __queue_work

static void__queue_work(unsigned int cpu, struct workqueue_struct *wq,                      struct work_struct *work){       struct global_cwq *gcwq;       struct cpu_workqueue_struct *cwq;       struct list_head *worklist;       unsigned int work_flags;       unsigned long flags;        debug_work_activate(work);        /* if dying, only works from the sameworkqueue are allowed */       if (unlikely(wq->flags & WQ_DYING)&&          WARN_ON_ONCE(!is_chained_work(wq)))              return;        /* determine gcwq to use */       /* 根据flags获取相应gcwq*/       if (!(wq->flags & WQ_UNBOUND)) {              struct global_cwq *last_gcwq;               if (unlikely(cpu ==WORK_CPU_UNBOUND))                     cpu =raw_smp_processor_id();               /*               * It's multi cpu.  If @wq is non-reentrant and @work               * was previously on a different cpu, it mightstill               * be running there, in which case the workneeds to               * be queued on that cpu to guaranteenon-reentrance.               */              gcwq = get_gcwq(cpu);              if (wq->flags &WQ_NON_REENTRANT &&                 (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {                     struct worker *worker;                      spin_lock_irqsave(&last_gcwq->lock,flags);                      worker =find_worker_executing_work(last_gcwq, work);                      if (worker &&worker->current_cwq->wq == wq)                            gcwq = last_gcwq;                     else {                            /* meh... notrunning there, queue here */                            spin_unlock_irqrestore(&last_gcwq->lock,flags);                            spin_lock_irqsave(&gcwq->lock,flags);                     }              } else                     spin_lock_irqsave(&gcwq->lock,flags);       } else {              gcwq = get_gcwq(WORK_CPU_UNBOUND);              spin_lock_irqsave(&gcwq->lock,flags);       }        /* gcwq determined, get cwq and queue */       cwq = get_cwq(gcwq->cpu, wq);       trace_workqueue_queue_work(cpu, cwq,work);        BUG_ON(!list_empty(&work->entry));        cwq->nr_in_flight[cwq->work_color]++;       work_flags =work_color_to_flags(cwq->work_color);        if (likely(cwq->nr_active <cwq->max_active)) {              trace_workqueue_activate_work(work);              cwq->nr_active++;              worklist =gcwq_determine_ins_pos(gcwq, cwq);       } else {              work_flags |= WORK_STRUCT_DELAYED;              worklist =&cwq->delayed_works;       }       /* 将当前work放到队列上等待执行。*/       insert_work(cwq, work, worklist,work_flags);        spin_unlock_irqrestore(&gcwq->lock,flags);} static voidinsert_work(struct cpu_workqueue_struct *cwq,                     struct work_struct *work,struct list_head *head,                     unsigned int extra_flags){       struct global_cwq *gcwq = cwq->gcwq;        /* we own @work, set data and link */       set_work_cwq(work, cwq, extra_flags);        /*        *Ensure that we get the right work->data if we see the        *result of list_add() below, see try_to_grab_pending().        */       smp_wmb();        list_add_tail(&work->entry, head);        /*        *Ensure either worker_sched_deactivated() sees the above        *list_add_tail() or we see zero nr_running to avoid workers        *lying around lazily while there are works to be processed.        */       smp_mb();       /* 如果当前有高优先级的work或者已经没有空闲的workthread了,well,那就再创建一个workthread来处理。*/       if (__need_more_worker(gcwq))              wake_up_worker(gcwq);}

至此,对work queue的工作机制都分析完了。可以看出,新的机制相对来说更灵活,而且基本上不会浪费内存资源,导致系统过量负载。

或许,不久的将来,create_workqueue()接口都将不复存在….

Reference:

http://lwn.net/Articles/403891/

http://gqf2008.iteye.com/blog/447060

kernel/documentation/Workqueue.txt

2012/08/10

人生不能缺乏的是雨夜--淅沥地,独自的雨夜。这样的雨夜里,

work queue in Linux 2.3.36

相关文章:

你感兴趣的文章:

标签云: