实战Java内存泄漏问题分析

hazelcast 提供了3中方法调用startCleanup:第一种是在ConcuurentMapManager的构造函数中,通过调用node的executorManager中的ScheduledExecutorService来创建每秒执行一次cleanup操作的线程(代码如下)。由于这是ConcuurentMapManager构造函数的代码,所以这种调用startCleanup的操作是默认就会有的。

node.executorManager.getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {           publicvoid run() {               for (CMap cMap : maps.values()) {                   cMap.startCleanup(false);               }           }        }, 1, 1, TimeUnit.SECONDS);

第二种是通过配置文件来触发startCleanup的执行,配置 PutOperationhandlerif overcapacity policy,我们系统的配置文件没有配置这方面的policy,所有这种方式在我们系统中没有使用。

第三种是自己直接写代码去调用startCleanup函数(public方法,线程安全的). 这个没有实现在我们的系统中。

所以我的调查方向放在了第一种调用的情况,hazelcast里面的ScheduledExecutorService是通过java.util.ScheduledThreadPoolExecutor 来实现的.

esScheduled = new ScheduledThreadPoolExecutor(5, new ExecutorThreadFactory(node.threadGroup,                node.getThreadPoolNamePrefix("scheduled"), classLoader), new RejectionHandler()) {            protected void beforeExecute(Thread t, Runnable r) {                threadPoolBeforeExecute(t, r);            }        }

查看ScheduledThreadPoolExecutor的实现,它把线程实现分成了3个部分: runnable tasks可执行任务, workers to execute the tasks执行任务的具体线程 以及 ScheduledThreadPoolExecutor 调度workers按照要求执行runnable tasks。我们通过scheduleAtFixdRate提交了task,scheduleAtFixedRate先把它打包成重复执行的ScheduleFutureTask

<pre name="code" class="java">    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                  long initialDelay,                                                  long period,                                                  TimeUnit unit) {        if (command == null || unit == null)            throw new NullPointerException();        if (period <= 0)            throw new IllegalArgumentException();        RunnableScheduledFuture<?> t = decorateTask(command,            new <strong>ScheduledFutureTas</strong>k<Object>(command,                                            null,                                            triggerTime(initialDelay, unit),                                            unit.toNanos(period)));        delayedExecute(t);        return t;}

ScheduleFutureTask的run方法实现重新schedule:

public void  run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic))  cancel(false); else if (!periodic)  ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) {  setNextRunTime(); <strong> reExecutePeriodic(outerTask);</strong> }}

delayedExecute里面如果当前worker的数目小于初始化定义的CorePool的数目,就创建新的worker线程,然后把task放到queue里面

private void delayedExecute(Runnable command) {        if (isShutdown()) {            reject(command);            return;        }        // Prestart a thread if necessary. We cannot prestart it        // running the task because the task (probably) shouldn't be        // run yet, so thread will just idle until delay elapses.        if (getPoolSize() < getCorePoolSize())            prestartCoreThread();       <strong> super.getQueue().add(command);</strong>} public boolean prestartCoreThread() {        return addIfUnderCorePoolSize(null);    }    private boolean addIfUnderCorePoolSize(Runnable firstTask) {        Thread t = null;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (poolSize < corePoolSize && runState == RUNNING)                t = addThread(firstTask);        } finally {            mainLock.unlock();        }        return t != null;    }private Thread addThread(Runnable firstTask) {        Worker w = new Worker(firstTask);        Thread t = threadFactory.newThread(w);        boolean workerStarted = false;        if (t != null) {            if (t.isAlive()) // precheck that t is startable                throw new IllegalThreadStateException();            w.thread = t;            workers.add(w);            int nt = ++poolSize;            if (nt > largestPoolSize)                largestPoolSize = nt;            try {                t.start();                workerStarted = true;            }            finally {                if (!workerStarted)                    workers.remove(w);            }        }        return t;}

所有启动的worker就做一件事情,从queue中取task执行

     try {                hasRun = true;                Runnable task = firstTask;                firstTask = null;                while (task != null || (task = <strong>getTask</strong>()) != null) {                    <strong>runTask(task);</strong>                    task = null;                }            } finally {                workerDone(this);            }        }    }    Runnable getTask() {       <strong> for (;;) {</strong>            try {                int state = runState;                if (state > SHUTDOWN)                    return null;                Runnable r;                if (state == SHUTDOWN)  // Help drain queue                    r = workQueue.poll();                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);                else                   <strong> r = workQueue.take();</strong>                if (r != null)                    return r;                if (workerCanExit()) {                    if (runState >= SHUTDOWN) // Wake up others                        interruptIdleWorkers();                    return null;                }                // Else retry            } catch (InterruptedException ie) {                // On interruption, re-check runState            }        }}private void runTask(Runnable task) {            final ReentrantLock runLock = this.runLock;            runLock.lock();            try {                if ((runState >= STOP ||                    (Thread.interrupted() && runState >= STOP)) &&                    hasRun)                    thread.interrupt();                boolean ran = false;                beforeExecute(thread, task);               <strong> try {                    task.run();                    ran = true;                    afterExecute(task, null);                    ++completedTasks;                } catch (RuntimeException ex) {                    if (!ran)                        afterExecute(task, ex);                    throw ex;                }</strong>            } finally {                runLock.unlock();            }        }

了解了java threadpool的工作原理之后,我们可以知道,startCleanup是代码pass给ScheduledThreadPoolExecutor的runnable task,它不被执行,可能的原因有:

1. ScheduledThreadPoolExecutor初始化时候出错,task完全没有提交成功。由于lastCleanup并不是系统应用的启动时间,已经过了几个月了,所以,很明显在系统初始化的时候,esScheduled(ScheduledThreadPoolExecutor)还是正常工作的,只是突然在2月4号停止了工作,所以这种可能性可以排除。2. Worker 没有正常工作,不在从ScheduledThreadPoolExecutor的queue里面取数据,这个很快就被我排除了:

首先heap dump中有5个pending workers in esScheduled (0/2/3/5/9):

其次从thread dump中可以看出,这五个线程都是在等着从queue里面取数据:

    ……   <strong> at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)[optimiz</strong>ed]    at java/util/concurrent/DelayQueue.take(DelayQueue.java:164)[optimized]    at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)[inlined]    at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)[optimized]    at java/util/concurrent/ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)[optimized]    at java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)    at java/lang/Thread.run(Thread.java:662)    at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)    -- end of tracehz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-2" id=51 idx=0xd8 tid=32639 prio=5 alive, parked, native_blockedhz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-3" id=52 idx=0xdc tid=32640 prio=5 alive, parked, native_blockedhz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-4" id=53 idx=0xe0 tid=32641 prio=5 alive, parked, native_blockedhz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-5" id=75590 idx=0x3cc tid=3308 prio=5 alive, parked, native_blocked

所以worker不正常也被排除了。

3. 我们提交给系统的runner task自动从queue里面消失了,从memory dump中确实发现queue没有tasks了而没有task的原因很明显是因为当前task执行完之后没有重新reschedule,至于原因,由于scheduledFutrueTask已经不存在,无法从memory dump和thread dump中分析出结果,成为了一个谜。。。。。。

public void  run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic))  cancel(false); else if (!periodic)  ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) {  setNextRunTime(); <strong> reExecutePeriodic(outerTask);</strong> }}

我想有一天和你去旅行。去那没有去过的地方,

实战Java内存泄漏问题分析

相关文章:

你感兴趣的文章:

标签云: