ocfs2_dlm: Calling post handler function in assert master handler
[linux-drm-fsl-dcu.git] / fs / ocfs2 / dlm / dlmthread.c
index 5be9d14f12cbcddf4ef51cab689d5ce53ea1f1db..8ffa0916eb86fc1c111ae3964b156505f714c6e6 100644 (file)
@@ -39,6 +39,7 @@
 #include <linux/inet.h>
 #include <linux/timer.h>
 #include <linux/kthread.h>
+#include <linux/delay.h>
 
 
 #include "cluster/heartbeat.h"
@@ -53,7 +54,6 @@
 #include "cluster/masklog.h"
 
 static int dlm_thread(void *data);
-
 static void dlm_flush_asts(struct dlm_ctxt *dlm);
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
@@ -79,14 +79,33 @@ repeat:
        current->state = TASK_RUNNING;
 }
 
-
-static int __dlm_lockres_unused(struct dlm_lock_resource *res)
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
 {
        if (list_empty(&res->granted) &&
            list_empty(&res->converting) &&
-           list_empty(&res->blocked) &&
-           list_empty(&res->dirty))
-               return 1;
+           list_empty(&res->blocked))
+               return 0;
+       return 1;
+}
+
+/* "unused": the lockres has no locks, is not on the dirty list,
+ * has no inflight locks (in the gap between mastery and acquiring
+ * the first lock), and has no bits in its refmap.
+ * truly ready to be freed. */
+int __dlm_lockres_unused(struct dlm_lock_resource *res)
+{
+       if (!__dlm_lockres_has_locks(res) &&
+           (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
+               /* try not to scan the bitmap unless the first two
+                * conditions are already true */
+               int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
+               if (bit >= O2NM_MAX_NODES) {
+                       /* since the bit for dlm->node_num is not
+                        * set, inflight_locks better be zero */
+                       BUG_ON(res->inflight_locks != 0);
+                       return 1;
+               }
+       }
        return 0;
 }
 
@@ -104,18 +123,20 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 
        if (__dlm_lockres_unused(res)){
                if (list_empty(&res->purge)) {
-                       mlog(0, "putting lockres %.*s from purge list\n",
-                            res->lockname.len, res->lockname.name);
+                       mlog(0, "putting lockres %.*s:%p onto purge list\n",
+                            res->lockname.len, res->lockname.name, res);
 
                        res->last_used = jiffies;
+                       dlm_lockres_get(res);
                        list_add_tail(&res->purge, &dlm->purge_list);
                        dlm->purge_count++;
                }
        } else if (!list_empty(&res->purge)) {
-               mlog(0, "removing lockres %.*s from purge list\n",
-                    res->lockname.len, res->lockname.name);
+               mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
+                    res->lockname.len, res->lockname.name, res, res->owner);
 
                list_del_init(&res->purge);
+               dlm_lockres_put(res);
                dlm->purge_count--;
        }
 }
@@ -133,49 +154,65 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
        spin_unlock(&dlm->spinlock);
 }
 
-/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
- * to do migration, but will re-acquire before exit. */
-void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
+static int dlm_purge_lockres(struct dlm_ctxt *dlm,
+                            struct dlm_lock_resource *res)
 {
        int master;
-       int ret;
+       int ret = 0;
 
-       spin_lock(&lockres->spinlock);
-       master = lockres->owner == dlm->node_num;
-       spin_unlock(&lockres->spinlock);
-
-       mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
-            lockres->lockname.name, master);
-
-       /* Non master is the easy case -- no migration required, just
-        * quit. */
+       spin_lock(&res->spinlock);
+       if (!__dlm_lockres_unused(res)) {
+               spin_unlock(&res->spinlock);
+               mlog(0, "%s:%.*s: tried to purge but not unused\n",
+                    dlm->name, res->lockname.len, res->lockname.name);
+               return -ENOTEMPTY;
+       }
+       master = (res->owner == dlm->node_num);
        if (!master)
-               goto finish;
-
-       /* Wheee! Migrate lockres here! */
-       spin_unlock(&dlm->spinlock);
-again:
+               res->state |= DLM_LOCK_RES_DROPPING_REF;
+       spin_unlock(&res->spinlock);
 
-       ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
-       if (ret == -ENOTEMPTY) {
-               mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
-                    lockres->lockname.len, lockres->lockname.name);
+       mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
+            res->lockname.name, master);
 
-               BUG();
-       } else if (ret < 0) {
-               mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
-                    lockres->lockname.len, lockres->lockname.name);
-               goto again;
+       if (!master) {
+               spin_lock(&res->spinlock);
+               /* This ensures that clear refmap is sent after the set */
+               __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+               spin_unlock(&res->spinlock);
+               /* drop spinlock to do messaging, retake below */
+               spin_unlock(&dlm->spinlock);
+               /* clear our bit from the master's refmap, ignore errors */
+               ret = dlm_drop_lockres_ref(dlm, res);
+               if (ret < 0) {
+                       mlog_errno(ret);
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+               }
+               mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
+                    dlm->name, res->lockname.len, res->lockname.name, ret);
+               spin_lock(&dlm->spinlock);
        }
 
-       spin_lock(&dlm->spinlock);
-
-finish:
-       if (!list_empty(&lockres->purge)) {
-               list_del_init(&lockres->purge);
+       if (!list_empty(&res->purge)) {
+               mlog(0, "removing lockres %.*s:%p from purgelist, "
+                    "master = %d\n", res->lockname.len, res->lockname.name,
+                    res, master);
+               list_del_init(&res->purge);
+               dlm_lockres_put(res);
                dlm->purge_count--;
        }
-       __dlm_unhash_lockres(lockres);
+       __dlm_unhash_lockres(res);
+
+       /* lockres is not in the hash now.  drop the flag and wake up
+        * any processes waiting in dlm_get_lock_resource. */
+       if (!master) {
+               spin_lock(&res->spinlock);
+               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+               spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
+       }
+       return 0;
 }
 
 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -219,13 +256,17 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
                        break;
                }
 
+               mlog(0, "removing lockres %.*s:%p from purgelist\n",
+                    lockres->lockname.len, lockres->lockname.name, lockres);
                list_del_init(&lockres->purge);
+               dlm_lockres_put(lockres);
                dlm->purge_count--;
 
                /* This may drop and reacquire the dlm spinlock if it
                 * has to do migration. */
                mlog(0, "calling dlm_purge_lockres!\n");
-               dlm_purge_lockres(dlm, lockres);
+               if (dlm_purge_lockres(dlm, lockres))
+                       BUG();
                mlog(0, "DONE calling dlm_purge_lockres!\n");
 
                /* Avoid adding any scheduling latencies */
@@ -318,8 +359,7 @@ converting:
 
                target->ml.type = target->ml.convert_type;
                target->ml.convert_type = LKM_IVMODE;
-               list_del_init(&target->list);
-               list_add_tail(&target->list, &res->granted);
+               list_move_tail(&target->list, &res->granted);
 
                BUG_ON(!target->lksb);
                target->lksb->status = DLM_NORMAL;
@@ -380,8 +420,7 @@ blocked:
                     target->ml.type, target->ml.node);
 
                // target->ml.type is already correct
-               list_del_init(&target->list);
-               list_add_tail(&target->list, &res->granted);
+               list_move_tail(&target->list, &res->granted);
 
                BUG_ON(!target->lksb);
                target->lksb->status = DLM_NORMAL;
@@ -420,10 +459,17 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
        assert_spin_locked(&res->spinlock);
 
        /* don't shuffle secondary queues */
-       if ((res->owner == dlm->node_num) &&
-           !(res->state & DLM_LOCK_RES_DIRTY)) {
-               list_add_tail(&res->dirty, &dlm->dirty_list);
-               res->state |= DLM_LOCK_RES_DIRTY;
+       if ((res->owner == dlm->node_num)) {
+               if (res->state & (DLM_LOCK_RES_MIGRATING |
+                                 DLM_LOCK_RES_BLOCK_DIRTY))
+                   return;
+
+               if (list_empty(&res->dirty)) {
+                       /* ref for dirty_list */
+                       dlm_lockres_get(res);
+                       list_add_tail(&res->dirty, &dlm->dirty_list);
+                       res->state |= DLM_LOCK_RES_DIRTY;
+               }
        }
 }
 
@@ -602,10 +648,12 @@ static int dlm_thread(void *data)
                        dlm_lockres_get(res);
 
                        spin_lock(&res->spinlock);
-                       res->state &= ~DLM_LOCK_RES_DIRTY;
+                       /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
                        list_del_init(&res->dirty);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);
+                       /* Drop dirty_list ref */
+                       dlm_lockres_put(res);
 
                        /* lockres can be re-dirtied/re-added to the
                         * dirty_list in this gap, but that is ok */
@@ -624,10 +672,11 @@ static int dlm_thread(void *data)
                        /* it is now ok to move lockreses in these states
                         * to the dirty list, assuming that they will only be
                         * dirty for a short while. */
+                       BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
-                                         DLM_LOCK_RES_MIGRATING |
                                          DLM_LOCK_RES_RECOVERING)) {
                                /* move it to the tail and keep going */
+                               res->state &= ~DLM_LOCK_RES_DIRTY;
                                spin_unlock(&res->spinlock);
                                mlog(0, "delaying list shuffling for in-"
                                     "progress lockres %.*s, state=%d\n",
@@ -642,11 +691,13 @@ static int dlm_thread(void *data)
                         * spinlock and do NOT have the dlm lock.
                         * safe to reserve/queue asts and run the lists. */
 
-                       mlog(0, "calling dlm_shuffle_lists with dlm=%p, "
-                            "res=%p\n", dlm, res);
+                       mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
+                            "res=%.*s\n", dlm->name,
+                            res->lockname.len, res->lockname.name);
 
                        /* called while holding lockres lock */
                        dlm_shuffle_lists(dlm, res);
+                       res->state &= ~DLM_LOCK_RES_DIRTY;
                        spin_unlock(&res->spinlock);
 
                        dlm_lockres_calc_usage(dlm, res);
@@ -658,8 +709,7 @@ in_progress:
                         * it on the back of the list */
                        if (delay) {
                                spin_lock(&res->spinlock);
-                               list_add_tail(&res->dirty, &dlm->dirty_list);
-                               res->state |= DLM_LOCK_RES_DIRTY;
+                               __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                        }
                        dlm_lockres_put(res);
@@ -677,7 +727,7 @@ in_progress:
 
                /* yield and continue right away if there is more work to do */
                if (!n) {
-                       yield();
+                       cond_resched();
                        continue;
                }