dlm: adopt orphan locks
authorDavid Teigland <teigland@redhat.com>
Fri, 17 Oct 2014 16:05:50 +0000 (11:05 -0500)
committerDavid Teigland <teigland@redhat.com>
Wed, 19 Nov 2014 20:48:02 +0000 (14:48 -0600)
A process may exit, leaving an orphan lock in the lockspace.
This adds the capability for another process to acquire the
orphan lock.  Acquiring the orphan just moves the lock from
the orphan list onto the acquiring process's list of locks.

An adopting process must specify the resource name and mode
of the lock it wants to adopt.  If a matching lock is found,
the lock is moved to the caller's 's list of locks, and the
lkid of the lock is returned like the lkid of a new lock.

If an orphan with a different mode is found, then -EAGAIN is
returned.  If no orphan lock is found on the resource, then
-ENOENT is returned.  No async completion is used because
the result is immediately available.

Also, when orphans are purged, allow a zero nodeid to refer
to the local nodeid so the caller does not need to look up
the local nodeid.

Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/lock.c
fs/dlm/lock.h
fs/dlm/user.c
include/uapi/linux/dlmconstants.h

index 83f3d5520307f86c1e4ba1079b8635358d7bac08..35502d4046f573fac4909cceec1117ef033338e6 100644 (file)
@@ -5886,6 +5886,78 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        return error;
 }
 
+/*
+ * The caller asks for an orphan lock on a given resource with a given mode.
+ * If a matching lock exists, it's moved to the owner's list of locks and
+ * the lkid is returned.
+ */
+
+int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+                    int mode, uint32_t flags, void *name, unsigned int namelen,
+                    unsigned long timeout_cs, uint32_t *lkid)
+{
+       struct dlm_lkb *lkb;
+       struct dlm_user_args *ua;
+       int found_other_mode = 0;
+       int found = 0;
+       int rv = 0;
+
+       mutex_lock(&ls->ls_orphans_mutex);
+       list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) {
+               if (lkb->lkb_resource->res_length != namelen)
+                       continue;
+               if (memcmp(lkb->lkb_resource->res_name, name, namelen))
+                       continue;
+               if (lkb->lkb_grmode != mode) {
+                       found_other_mode = 1;
+                       continue;
+               }
+
+               found = 1;
+               list_del_init(&lkb->lkb_ownqueue);
+               lkb->lkb_flags &= ~DLM_IFL_ORPHAN;
+               *lkid = lkb->lkb_id;
+               break;
+       }
+       mutex_unlock(&ls->ls_orphans_mutex);
+
+       if (!found && found_other_mode) {
+               rv = -EAGAIN;
+               goto out;
+       }
+
+       if (!found) {
+               rv = -ENOENT;
+               goto out;
+       }
+
+       lkb->lkb_exflags = flags;
+       lkb->lkb_ownpid = (int) current->pid;
+
+       ua = lkb->lkb_ua;
+
+       ua->proc = ua_tmp->proc;
+       ua->xid = ua_tmp->xid;
+       ua->castparam = ua_tmp->castparam;
+       ua->castaddr = ua_tmp->castaddr;
+       ua->bastparam = ua_tmp->bastparam;
+       ua->bastaddr = ua_tmp->bastaddr;
+       ua->user_lksb = ua_tmp->user_lksb;
+
+       /*
+        * The lkb reference from the ls_orphans list was not
+        * removed above, and is now considered the reference
+        * for the proc locks list.
+        */
+
+       spin_lock(&ua->proc->locks_spin);
+       list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
+       spin_unlock(&ua->proc->locks_spin);
+ out:
+       kfree(ua_tmp);
+       return rv;
+}
+
 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                    uint32_t flags, uint32_t lkid, char *lvb_in)
 {
@@ -6029,7 +6101,7 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
        struct dlm_args args;
        int error;
 
-       hold_lkb(lkb);
+       hold_lkb(lkb); /* reference for the ls_orphans list */
        mutex_lock(&ls->ls_orphans_mutex);
        list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
        mutex_unlock(&ls->ls_orphans_mutex);
@@ -6217,7 +6289,7 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
 {
        int error = 0;
 
-       if (nodeid != dlm_our_nodeid()) {
+       if (nodeid && (nodeid != dlm_our_nodeid())) {
                error = send_purge(ls, nodeid, pid);
        } else {
                dlm_lock_recovery(ls);
index 5e0c72e36a9b3acd7cbd125da1399a75356ee596..ed8ebd3a8593a6a27690cbb149108b929e84d07f 100644 (file)
@@ -49,6 +49,9 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
        unsigned long timeout_cs);
+int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+       int mode, uint32_t flags, void *name, unsigned int namelen,
+       unsigned long timeout_cs, uint32_t *lkid);
 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        uint32_t flags, uint32_t lkid, char *lvb_in);
 int dlm_user_cancel(struct dlm_ls *ls,  struct dlm_user_args *ua_tmp,
index 142e21655eed76e2479404d592f5596c4b745f37..fb85f32e9eca66f238d3cfb97a82781be626d0b2 100644 (file)
@@ -238,6 +238,7 @@ static int device_user_lock(struct dlm_user_proc *proc,
 {
        struct dlm_ls *ls;
        struct dlm_user_args *ua;
+       uint32_t lkid;
        int error = -ENOMEM;
 
        ls = dlm_find_lockspace_local(proc->lockspace);
@@ -260,12 +261,20 @@ static int device_user_lock(struct dlm_user_proc *proc,
        ua->bastaddr = params->bastaddr;
        ua->xid = params->xid;
 
-       if (params->flags & DLM_LKF_CONVERT)
+       if (params->flags & DLM_LKF_CONVERT) {
                error = dlm_user_convert(ls, ua,
                                         params->mode, params->flags,
                                         params->lkid, params->lvb,
                                         (unsigned long) params->timeout);
-       else {
+       } else if (params->flags & DLM_LKF_ORPHAN) {
+               error = dlm_user_adopt_orphan(ls, ua,
+                                        params->mode, params->flags,
+                                        params->name, params->namelen,
+                                        (unsigned long) params->timeout,
+                                        &lkid);
+               if (!error)
+                       error = lkid;
+       } else {
                error = dlm_user_request(ls, ua,
                                         params->mode, params->flags,
                                         params->name, params->namelen,
index 47bf08dc75665a1a2163ff63145cca89bf900a7e..2857bdc5b27b871f11f1ee7fba6d16949b7cbe9f 100644 (file)
  *
  * DLM_LKF_ORPHAN
  *
- * not yet implemented
+ * Acquire an orphan lock.
  *
  * DLM_LKF_ALTPR
  *