ocfs2_dlm: Calling post handler function in assert master handler
[linux-drm-fsl-dcu.git] / fs / ocfs2 / dlm / dlmthread.c
index 610dc76a851b985a5364c88006cfb47b549196c5..8ffa0916eb86fc1c111ae3964b156505f714c6e6 100644 (file)
@@ -54,9 +54,6 @@
 #include "cluster/masklog.h"
 
 static int dlm_thread(void *data);
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-                                 struct dlm_lock_resource *lockres);
-
 static void dlm_flush_asts(struct dlm_ctxt *dlm);
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@ repeat:
        current->state = TASK_RUNNING;
 }
 
-
-int __dlm_lockres_unused(struct dlm_lock_resource *res)
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
 {
        if (list_empty(&res->granted) &&
            list_empty(&res->converting) &&
-           list_empty(&res->blocked) &&
-           list_empty(&res->dirty))
-               return 1;
+           list_empty(&res->blocked))
+               return 0;
+       return 1;
+}
+
+/* "unused": the lockres has no locks, is not on the dirty list,
+ * has no inflight locks (in the gap between mastery and acquiring
+ * the first lock), and has no bits set in its refmap.
+ * Only then is it truly ready to be freed. */
+int __dlm_lockres_unused(struct dlm_lock_resource *res)
+{
+       if (!__dlm_lockres_has_locks(res) &&
+           (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
+               /* try not to scan the bitmap unless the first two
+                * conditions are already true */
+               int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
+               if (bit >= O2NM_MAX_NODES) {
+                       /* since the bit for dlm->node_num is not
+                        * set, inflight_locks better be zero */
+                       BUG_ON(res->inflight_locks != 0);
+                       return 1;
+               }
+       }
        return 0;
 }
 
@@ -107,31 +123,20 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 
        if (__dlm_lockres_unused(res)){
                if (list_empty(&res->purge)) {
-                       mlog(0, "putting lockres %.*s from purge list\n",
-                            res->lockname.len, res->lockname.name);
+                       mlog(0, "putting lockres %.*s:%p onto purge list\n",
+                            res->lockname.len, res->lockname.name, res);
 
                        res->last_used = jiffies;
+                       dlm_lockres_get(res);
                        list_add_tail(&res->purge, &dlm->purge_list);
                        dlm->purge_count++;
-
-                       /* if this node is not the owner, there is
-                        * no way to keep track of who the owner could be.
-                        * unhash it to avoid serious problems. */
-                       if (res->owner != dlm->node_num) {
-                               mlog(0, "%s:%.*s: doing immediate "
-                                    "purge of lockres owned by %u\n",
-                                    dlm->name, res->lockname.len,
-                                    res->lockname.name, res->owner);
-
-                               dlm_purge_lockres_now(dlm, res);
-                       }
                }
        } else if (!list_empty(&res->purge)) {
-               mlog(0, "removing lockres %.*s from purge list, "
-                    "owner=%u\n", res->lockname.len, res->lockname.name,
-                    res->owner);
+               mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
+                    res->lockname.len, res->lockname.name, res, res->owner);
 
                list_del_init(&res->purge);
+               dlm_lockres_put(res);
                dlm->purge_count--;
        }
 }
@@ -149,68 +154,65 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
        spin_unlock(&dlm->spinlock);
 }
 
-/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
- * to do migration, but will re-acquire before exit. */
-void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
+static int dlm_purge_lockres(struct dlm_ctxt *dlm,
+                            struct dlm_lock_resource *res)
 {
        int master;
-       int ret;
+       int ret = 0;
 
-       spin_lock(&lockres->spinlock);
-       master = lockres->owner == dlm->node_num;
-       spin_unlock(&lockres->spinlock);
-
-       mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
-            lockres->lockname.name, master);
-
-       /* Non master is the easy case -- no migration required, just
-        * quit. */
+       spin_lock(&res->spinlock);
+       if (!__dlm_lockres_unused(res)) {
+               spin_unlock(&res->spinlock);
+               mlog(0, "%s:%.*s: tried to purge but not unused\n",
+                    dlm->name, res->lockname.len, res->lockname.name);
+               return -ENOTEMPTY;
+       }
+       master = (res->owner == dlm->node_num);
        if (!master)
-               goto finish;
-
-       /* Wheee! Migrate lockres here! */
-       spin_unlock(&dlm->spinlock);
-again:
+               res->state |= DLM_LOCK_RES_DROPPING_REF;
+       spin_unlock(&res->spinlock);
 
-       ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
-       if (ret == -ENOTEMPTY) {
-               mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
-                    lockres->lockname.len, lockres->lockname.name);
+       mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
+            res->lockname.name, master);
 
-               BUG();
-       } else if (ret < 0) {
-               mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
-                    lockres->lockname.len, lockres->lockname.name);
-               msleep(100);
-               goto again;
+       if (!master) {
+               spin_lock(&res->spinlock);
+               /* This ensures that clear refmap is sent after the set */
+               __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+               spin_unlock(&res->spinlock);
+               /* drop spinlock to do messaging, retake below */
+               spin_unlock(&dlm->spinlock);
+               /* clear our bit from the master's refmap, ignore errors */
+               ret = dlm_drop_lockres_ref(dlm, res);
+               if (ret < 0) {
+                       mlog_errno(ret);
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+               }
+               mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
+                    dlm->name, res->lockname.len, res->lockname.name, ret);
+               spin_lock(&dlm->spinlock);
        }
 
-       spin_lock(&dlm->spinlock);
-
-finish:
-       if (!list_empty(&lockres->purge)) {
-               list_del_init(&lockres->purge);
+       if (!list_empty(&res->purge)) {
+               mlog(0, "removing lockres %.*s:%p from purgelist, "
+                    "master = %d\n", res->lockname.len, res->lockname.name,
+                    res, master);
+               list_del_init(&res->purge);
+               dlm_lockres_put(res);
                dlm->purge_count--;
        }
-       __dlm_unhash_lockres(lockres);
-}
-
-/* make an unused lockres go away immediately.
- * as soon as the dlm spinlock is dropped, this lockres
- * will not be found. kfree still happens on last put. */
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-                                 struct dlm_lock_resource *lockres)
-{
-       assert_spin_locked(&dlm->spinlock);
-       assert_spin_locked(&lockres->spinlock);
-
-       BUG_ON(!__dlm_lockres_unused(lockres));
+       __dlm_unhash_lockres(res);
 
-       if (!list_empty(&lockres->purge)) {
-               list_del_init(&lockres->purge);
-               dlm->purge_count--;
+       /* lockres is not in the hash now.  drop the flag and wake up
+        * any processes waiting in dlm_get_lock_resource. */
+       if (!master) {
+               spin_lock(&res->spinlock);
+               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+               spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
        }
-       __dlm_unhash_lockres(lockres);
+       return 0;
 }
 
 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -254,13 +256,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
                        break;
                }
 
+               mlog(0, "removing lockres %.*s:%p from purgelist\n",
+                    lockres->lockname.len, lockres->lockname.name, lockres);
                list_del_init(&lockres->purge);
+               dlm_lockres_put(lockres);
                dlm->purge_count--;
 
                /* This may drop and reacquire the dlm spinlock if it
                 * has to do migration. */
                mlog(0, "calling dlm_purge_lockres!\n");
-               dlm_purge_lockres(dlm, lockres);
+               if (dlm_purge_lockres(dlm, lockres))
+                       BUG();
                mlog(0, "DONE calling dlm_purge_lockres!\n");
 
                /* Avoid adding any scheduling latencies */
@@ -453,12 +459,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
        assert_spin_locked(&res->spinlock);
 
        /* don't shuffle secondary queues */
-       if ((res->owner == dlm->node_num) &&
-           !(res->state & DLM_LOCK_RES_DIRTY)) {
-               /* ref for dirty_list */
-               dlm_lockres_get(res);
-               list_add_tail(&res->dirty, &dlm->dirty_list);
-               res->state |= DLM_LOCK_RES_DIRTY;
+       if ((res->owner == dlm->node_num)) {
+               if (res->state & (DLM_LOCK_RES_MIGRATING |
+                                 DLM_LOCK_RES_BLOCK_DIRTY))
+                   return;
+
+               if (list_empty(&res->dirty)) {
+                       /* ref for dirty_list */
+                       dlm_lockres_get(res);
+                       list_add_tail(&res->dirty, &dlm->dirty_list);
+                       res->state |= DLM_LOCK_RES_DIRTY;
+               }
        }
 }
 
@@ -637,7 +648,7 @@ static int dlm_thread(void *data)
                        dlm_lockres_get(res);
 
                        spin_lock(&res->spinlock);
-                       res->state &= ~DLM_LOCK_RES_DIRTY;
+                       /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
                        list_del_init(&res->dirty);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);
@@ -661,10 +672,11 @@ static int dlm_thread(void *data)
                        /* it is now ok to move lockreses in these states
                         * to the dirty list, assuming that they will only be
                         * dirty for a short while. */
+                       BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
-                                         DLM_LOCK_RES_MIGRATING |
                                          DLM_LOCK_RES_RECOVERING)) {
                                /* move it to the tail and keep going */
+                               res->state &= ~DLM_LOCK_RES_DIRTY;
                                spin_unlock(&res->spinlock);
                                mlog(0, "delaying list shuffling for in-"
                                     "progress lockres %.*s, state=%d\n",
@@ -685,6 +697,7 @@ static int dlm_thread(void *data)
 
                        /* called while holding lockres lock */
                        dlm_shuffle_lists(dlm, res);
+                       res->state &= ~DLM_LOCK_RES_DIRTY;
                        spin_unlock(&res->spinlock);
 
                        dlm_lockres_calc_usage(dlm, res);
@@ -695,11 +708,8 @@ in_progress:
                        /* if the lock was in-progress, stick
                         * it on the back of the list */
                        if (delay) {
-                               /* ref for dirty_list */
-                               dlm_lockres_get(res);
                                spin_lock(&res->spinlock);
-                               list_add_tail(&res->dirty, &dlm->dirty_list);
-                               res->state |= DLM_LOCK_RES_DIRTY;
+                               __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                        }
                        dlm_lockres_put(res);
@@ -717,7 +727,7 @@ in_progress:
 
                /* yield and continue right away if there is more work to do */
                if (!n) {
-                       yield();
+                       cond_resched();
                        continue;
                }