Merge tag 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/dledford/rdma
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index b348b4adef29a48246709cc7f32cf576865753eb..5763825d09bf776bfa89f0007be1805203d96adf 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -153,18 +153,76 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
+                                          gfp_t flags)
 {
        struct svc_rdma_op_ctxt *ctxt;
 
-       ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
-                               GFP_KERNEL | __GFP_NOFAIL);
-       ctxt->xprt = xprt;
-       INIT_LIST_HEAD(&ctxt->dto_q);
+       ctxt = kmalloc(sizeof(*ctxt), flags);
+       if (ctxt) {
+               ctxt->xprt = xprt;
+               INIT_LIST_HEAD(&ctxt->free);
+               INIT_LIST_HEAD(&ctxt->dto_q);
+       }
+       return ctxt;
+}
+
+static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
+{
+       unsigned int i;
+
+       /* Each RPC/RDMA credit can consume a number of send
+        * and receive WQEs. One ctxt is allocated for each.
+        */
+       i = xprt->sc_sq_depth + xprt->sc_rq_depth;
+
+       while (i--) {
+               struct svc_rdma_op_ctxt *ctxt;
+
+               ctxt = alloc_ctxt(xprt, GFP_KERNEL);
+               if (!ctxt) {
+                       dprintk("svcrdma: No memory for RDMA ctxt\n");
+                       return false;
+               }
+               list_add(&ctxt->free, &xprt->sc_ctxts);
+       }
+       return true;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+       struct svc_rdma_op_ctxt *ctxt = NULL;
+
+       spin_lock_bh(&xprt->sc_ctxt_lock);
+       xprt->sc_ctxt_used++;
+       if (list_empty(&xprt->sc_ctxts))
+               goto out_empty;
+
+       ctxt = list_first_entry(&xprt->sc_ctxts,
+                               struct svc_rdma_op_ctxt, free);
+       list_del_init(&ctxt->free);
+       spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+out:
        ctxt->count = 0;
        ctxt->frmr = NULL;
-       atomic_inc(&xprt->sc_ctxt_used);
        return ctxt;
+
+out_empty:
+       /* Either pre-allocation missed the mark, or send
+        * queue accounting is broken.
+        */
+       spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+       ctxt = alloc_ctxt(xprt, GFP_NOIO);
+       if (ctxt)
+               goto out;
+
+       spin_lock_bh(&xprt->sc_ctxt_lock);
+       xprt->sc_ctxt_used--;
+       spin_unlock_bh(&xprt->sc_ctxt_lock);
+       WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
+       return NULL;
 }
 
 void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
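Reviewer note, not part of the patch: with __GFP_NOFAIL gone, svc_rdma_get_context() can now return NULL (the GFP_NOIO fallback may also fail), so every caller needs an allocation-failure path. A minimal sketch:

        struct svc_rdma_op_ctxt *ctxt;

        ctxt = svc_rdma_get_context(rdma);
        if (!ctxt)
                return -ENOMEM; /* hypothetical: recovery is caller-specific */
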
@@ -174,11 +232,11 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
        for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
                /*
                 * Unmap the DMA addr in the SGE if the lkey matches
-                * the sc_dma_lkey, otherwise, ignore it since it is
+                * the local_dma_lkey, otherwise, ignore it since it is
                 * an FRMR lkey and will be unmapped later when the
                 * last WR that uses it completes.
                 */
-               if (ctxt->sge[i].lkey == xprt->sc_dma_lkey) {
+               if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) {
                        atomic_dec(&xprt->sc_dma_used);
                        ib_dma_unmap_page(xprt->sc_cm_id->device,
                                            ctxt->sge[i].addr,
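For reference (sketch assembled from the receive path later in this patch): the sges this test matches were mapped by the server itself with the PD-wide lkey, so the ctxt owns the DMA mapping:

        ctxt->sge[sge_no].addr = pa;    /* pa from ib_dma_map_page() */
        ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
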
@@ -190,35 +248,108 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
 
 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
 {
-       struct svcxprt_rdma *xprt;
+       struct svcxprt_rdma *xprt = ctxt->xprt;
        int i;
 
-       xprt = ctxt->xprt;
        if (free_pages)
                for (i = 0; i < ctxt->count; i++)
                        put_page(ctxt->pages[i]);
 
-       kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
-       atomic_dec(&xprt->sc_ctxt_used);
+       spin_lock_bh(&xprt->sc_ctxt_lock);
+       xprt->sc_ctxt_used--;
+       list_add(&ctxt->free, &xprt->sc_ctxts);
+       spin_unlock_bh(&xprt->sc_ctxt_lock);
 }
 
-/*
- * Temporary NFS req mappings are shared across all transport
- * instances. These are short lived and should be bounded by the number
- * of concurrent server threads * depth of the SQ.
- */
-struct svc_rdma_req_map *svc_rdma_get_req_map(void)
+static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
+{
+       while (!list_empty(&xprt->sc_ctxts)) {
+               struct svc_rdma_op_ctxt *ctxt;
+
+               ctxt = list_first_entry(&xprt->sc_ctxts,
+                                       struct svc_rdma_op_ctxt, free);
+               list_del(&ctxt->free);
+               kfree(ctxt);
+       }
+}
+
+static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
 {
        struct svc_rdma_req_map *map;
-       map = kmem_cache_alloc(svc_rdma_map_cachep,
-                              GFP_KERNEL | __GFP_NOFAIL);
+
+       map = kmalloc(sizeof(*map), flags);
+       if (map)
+               INIT_LIST_HEAD(&map->free);
+       return map;
+}
+
+static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
+{
+       unsigned int i;
+
+       /* One for each receive buffer on this connection. */
+       i = xprt->sc_max_requests;
+
+       while (i--) {
+               struct svc_rdma_req_map *map;
+
+               map = alloc_req_map(GFP_KERNEL);
+               if (!map) {
+                       dprintk("svcrdma: No memory for request map\n");
+                       return false;
+               }
+               list_add(&map->free, &xprt->sc_maps);
+       }
+       return true;
+}
+
+struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
+{
+       struct svc_rdma_req_map *map = NULL;
+
+       spin_lock(&xprt->sc_map_lock);
+       if (list_empty(&xprt->sc_maps))
+               goto out_empty;
+
+       map = list_first_entry(&xprt->sc_maps,
+                              struct svc_rdma_req_map, free);
+       list_del_init(&map->free);
+       spin_unlock(&xprt->sc_map_lock);
+
+out:
        map->count = 0;
        return map;
+
+out_empty:
+       spin_unlock(&xprt->sc_map_lock);
+
+       /* Pre-allocation amount was incorrect */
+       map = alloc_req_map(GFP_NOIO);
+       if (map)
+               goto out;
+
+       WARN_ONCE(1, "svcrdma: empty request map list?\n");
+       return NULL;
+}
+
+void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
+                         struct svc_rdma_req_map *map)
+{
+       spin_lock(&xprt->sc_map_lock);
+       list_add(&map->free, &xprt->sc_maps);
+       spin_unlock(&xprt->sc_map_lock);
 }
 
-void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
+static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
 {
-       kmem_cache_free(svc_rdma_map_cachep, map);
+       while (!list_empty(&xprt->sc_maps)) {
+               struct svc_rdma_req_map *map;
+
+               map = list_first_entry(&xprt->sc_maps,
+                                      struct svc_rdma_req_map, free);
+               list_del(&map->free);
+               kfree(map);
+       }
 }
 
 /* ib_cq event handler */
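Usage sketch (illustrative; the real call sites are converted elsewhere in this series): both get and put now take the transport, and the get side can fail:

        struct svc_rdma_req_map *vec;

        vec = svc_rdma_get_req_map(rdma);
        if (!vec)
                return -ENOMEM; /* hypothetical error handling */
        /* ... build the sge vector for the reply ... */
        svc_rdma_put_req_map(rdma, vec);
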
@@ -386,46 +517,44 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 static void process_context(struct svcxprt_rdma *xprt,
                            struct svc_rdma_op_ctxt *ctxt)
 {
+       struct svc_rdma_op_ctxt *read_hdr;
+       int free_pages = 0;
+
        svc_rdma_unmap_dma(ctxt);
 
        switch (ctxt->wr_op) {
        case IB_WR_SEND:
-               if (ctxt->frmr)
-                       pr_err("svcrdma: SEND: ctxt->frmr != NULL\n");
-               svc_rdma_put_context(ctxt, 1);
+               free_pages = 1;
                break;
 
        case IB_WR_RDMA_WRITE:
-               if (ctxt->frmr)
-                       pr_err("svcrdma: WRITE: ctxt->frmr != NULL\n");
-               svc_rdma_put_context(ctxt, 0);
                break;
 
        case IB_WR_RDMA_READ:
        case IB_WR_RDMA_READ_WITH_INV:
                svc_rdma_put_frmr(xprt, ctxt->frmr);
-               if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
-                       struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
-                       if (read_hdr) {
-                               spin_lock_bh(&xprt->sc_rq_dto_lock);
-                               set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-                               list_add_tail(&read_hdr->dto_q,
-                                             &xprt->sc_read_complete_q);
-                               spin_unlock_bh(&xprt->sc_rq_dto_lock);
-                       } else {
-                               pr_err("svcrdma: ctxt->read_hdr == NULL\n");
-                       }
-                       svc_xprt_enqueue(&xprt->sc_xprt);
-               }
+
+               if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
+                       break;
+
+               read_hdr = ctxt->read_hdr;
                svc_rdma_put_context(ctxt, 0);
-               break;
+
+               spin_lock_bh(&xprt->sc_rq_dto_lock);
+               set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+               list_add_tail(&read_hdr->dto_q,
+                             &xprt->sc_read_complete_q);
+               spin_unlock_bh(&xprt->sc_rq_dto_lock);
+               svc_xprt_enqueue(&xprt->sc_xprt);
+               return;
 
        default:
-               printk(KERN_ERR "svcrdma: unexpected completion type, "
-                      "opcode=%d\n",
-                      ctxt->wr_op);
+               dprintk("svcrdma: unexpected completion opcode=%d\n",
+                       ctxt->wr_op);
                break;
        }
+
+       svc_rdma_put_context(ctxt, free_pages);
 }
 
 /*
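Reviewer summary of the rewritten dispatch (the defensive frmr/read_hdr NULL checks are dropped; read_hdr is assumed valid on the last READ ctxt):

        /* IB_WR_SEND                 - common put below, pages freed
         * IB_WR_RDMA_WRITE           - common put below, pages kept
         * IB_WR_RDMA_READ[_WITH_INV] - put frmr; if this is the last ctxt,
         *                              put it here, queue read_hdr on
         *                              sc_read_complete_q, enqueue the xprt,
         *                              and return before the common put
         */
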
@@ -523,19 +652,15 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
        INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
        INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
        INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
+       INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+       INIT_LIST_HEAD(&cma_xprt->sc_maps);
        init_waitqueue_head(&cma_xprt->sc_send_wait);
 
        spin_lock_init(&cma_xprt->sc_lock);
        spin_lock_init(&cma_xprt->sc_rq_dto_lock);
        spin_lock_init(&cma_xprt->sc_frmr_q_lock);
-
-       cma_xprt->sc_ord = svcrdma_ord;
-
-       cma_xprt->sc_max_req_size = svcrdma_max_req_size;
-       cma_xprt->sc_max_requests = svcrdma_max_requests;
-       cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
-       atomic_set(&cma_xprt->sc_sq_count, 0);
-       atomic_set(&cma_xprt->sc_ctxt_used, 0);
+       spin_lock_init(&cma_xprt->sc_ctxt_lock);
+       spin_lock_init(&cma_xprt->sc_map_lock);
 
        if (listener)
                set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
@@ -543,7 +668,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
        return cma_xprt;
 }
 
-int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 {
        struct ib_recv_wr recv_wr, *bad_recv_wr;
        struct svc_rdma_op_ctxt *ctxt;
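The new gfp_t argument lets connection setup sleep with GFP_KERNEL while replenishment from the I/O path avoids reclaim recursion. A hedged sketch of such a caller (the actual conversions happen elsewhere in this series):

        int ret;

        ret = svc_rdma_post_recv(rdma, GFP_NOIO);
        if (ret)
                pr_err("svcrdma: failed to post a receive: %d\n", ret);
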
@@ -561,7 +686,9 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
                        pr_err("svcrdma: Too many sges (%d)\n", sge_no);
                        goto err_put_ctxt;
                }
-               page = alloc_page(GFP_KERNEL | __GFP_NOFAIL);
+               page = alloc_page(flags);
+               if (!page)
+                       goto err_put_ctxt;
                ctxt->pages[sge_no] = page;
                pa = ib_dma_map_page(xprt->sc_cm_id->device,
                                     page, 0, PAGE_SIZE,
@@ -571,7 +698,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
                atomic_inc(&xprt->sc_dma_used);
                ctxt->sge[sge_no].addr = pa;
                ctxt->sge[sge_no].length = PAGE_SIZE;
-               ctxt->sge[sge_no].lkey = xprt->sc_dma_lkey;
+               ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
                ctxt->count = sge_no + 1;
                buflen += PAGE_SIZE;
        }
@@ -886,11 +1013,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        struct rdma_conn_param conn_param;
        struct ib_cq_init_attr cq_attr = {};
        struct ib_qp_init_attr qp_attr;
-       struct ib_device_attr devattr;
-       int uninitialized_var(dma_mr_acc);
-       int need_dma_mr = 0;
-       int ret;
-       int i;
+       struct ib_device *dev;
+       unsigned int i;
+       int ret = 0;
 
        listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
        clear_bit(XPT_CONN, &xprt->xpt_flags);
@@ -910,37 +1035,42 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
                newxprt, newxprt->sc_cm_id);
 
-       ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
-       if (ret) {
-               dprintk("svcrdma: could not query device attributes on "
-                       "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
-               goto errout;
-       }
+       dev = newxprt->sc_cm_id->device;
 
        /* Qualify the transport resource defaults with the
         * capabilities of this particular device */
-       newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+       newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
                                  (size_t)RPCSVC_MAXPAGES);
-       newxprt->sc_max_sge_rd = min_t(size_t, devattr.max_sge_rd,
+       newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
                                       RPCSVC_MAXPAGES);
-       newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
-                                  (size_t)svcrdma_max_requests);
-       newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+       newxprt->sc_max_req_size = svcrdma_max_req_size;
+       newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
+                                        svcrdma_max_requests);
+       newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
+                                           svcrdma_max_bc_requests);
+       newxprt->sc_rq_depth = newxprt->sc_max_requests +
+                              newxprt->sc_max_bc_requests;
+       newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
+
+       if (!svc_rdma_prealloc_ctxts(newxprt))
+               goto errout;
+       if (!svc_rdma_prealloc_maps(newxprt))
+               goto errout;
 
        /*
         * Limit ORD based on client limit, local device limit, and
         * configured svcrdma limit.
         */
-       newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord);
+       newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
        newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);
 
-       newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+       newxprt->sc_pd = ib_alloc_pd(dev);
        if (IS_ERR(newxprt->sc_pd)) {
                dprintk("svcrdma: error creating PD for connect request\n");
                goto errout;
        }
        cq_attr.cqe = newxprt->sc_sq_depth;
-       newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+       newxprt->sc_sq_cq = ib_create_cq(dev,
                                         sq_comp_handler,
                                         cq_event_handler,
                                         newxprt,
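Worked example of the new WQE accounting (values illustrative, not guaranteed by this hunk):

        /* If svcrdma_max_requests = 32 and svcrdma_max_bc_requests = 2,
         * and neither is clamped by dev->attrs.max_qp_wr, then:
         *
         *   sc_rq_depth = 32 + 2 = 34 receive WQEs
         *   sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * 34 send WQEs
         *
         * svc_rdma_prealloc_ctxts() then reserves one ctxt per WQE,
         * sc_sq_depth + sc_rq_depth in total.
         */
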
@@ -949,8 +1079,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                dprintk("svcrdma: error creating SQ CQ for connect request\n");
                goto errout;
        }
-       cq_attr.cqe = newxprt->sc_max_requests;
-       newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+       cq_attr.cqe = newxprt->sc_rq_depth;
+       newxprt->sc_rq_cq = ib_create_cq(dev,
                                         rq_comp_handler,
                                         cq_event_handler,
                                         newxprt,
@@ -964,7 +1094,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        qp_attr.event_handler = qp_event_handler;
        qp_attr.qp_context = &newxprt->sc_xprt;
        qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
-       qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+       qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
        qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
        qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
        qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -978,7 +1108,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                "    cap.max_send_sge = %d\n"
                "    cap.max_recv_sge = %d\n",
                newxprt->sc_cm_id, newxprt->sc_pd,
-               newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+               dev, newxprt->sc_pd->device,
                qp_attr.cap.max_send_wr,
                qp_attr.cap.max_recv_wr,
                qp_attr.cap.max_send_sge,
@@ -1014,9 +1144,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
         *      of an RDMA_READ. IB does not.
         */
        newxprt->sc_reader = rdma_read_chunk_lcl;
-       if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
+       if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
                newxprt->sc_frmr_pg_list_len =
-                       devattr.max_fast_reg_page_list_len;
+                       dev->attrs.max_fast_reg_page_list_len;
                newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
                newxprt->sc_reader = rdma_read_chunk_frmr;
        }
@@ -1024,44 +1154,16 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        /*
         * Determine if a DMA MR is required and if so, what privs are required
         */
-       if (!rdma_protocol_iwarp(newxprt->sc_cm_id->device,
-                                newxprt->sc_cm_id->port_num) &&
-           !rdma_ib_or_roce(newxprt->sc_cm_id->device,
-                            newxprt->sc_cm_id->port_num))
+       if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
+           !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
                goto errout;
 
-       if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG) ||
-           !(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
-               need_dma_mr = 1;
-               dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
-               if (rdma_protocol_iwarp(newxprt->sc_cm_id->device,
-                                       newxprt->sc_cm_id->port_num) &&
-                   !(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG))
-                       dma_mr_acc |= IB_ACCESS_REMOTE_WRITE;
-       }
-
-       if (rdma_protocol_iwarp(newxprt->sc_cm_id->device,
-                               newxprt->sc_cm_id->port_num))
+       if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
                newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
 
-       /* Create the DMA MR if needed, otherwise, use the DMA LKEY */
-       if (need_dma_mr) {
-               /* Register all of physical memory */
-               newxprt->sc_phys_mr =
-                       ib_get_dma_mr(newxprt->sc_pd, dma_mr_acc);
-               if (IS_ERR(newxprt->sc_phys_mr)) {
-                       dprintk("svcrdma: Failed to create DMA MR ret=%d\n",
-                               ret);
-                       goto errout;
-               }
-               newxprt->sc_dma_lkey = newxprt->sc_phys_mr->lkey;
-       } else
-               newxprt->sc_dma_lkey =
-                       newxprt->sc_cm_id->device->local_dma_lkey;
-
        /* Post receive buffers */
-       for (i = 0; i < newxprt->sc_max_requests; i++) {
-               ret = svc_rdma_post_recv(newxprt);
+       for (i = 0; i < newxprt->sc_rq_depth; i++) {
+               ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
                if (ret) {
                        dprintk("svcrdma: failure posting receive buffers\n");
                        goto errout;
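Reviewer observation (the guard below is hypothetical, not in the patch): sc_max_requests and sc_max_bc_requests are each clamped to dev->attrs.max_qp_wr, but their sum is not, so sc_rq_depth could in principle still exceed the device's per-QP receive WR limit:

        if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr)
                goto errout;    /* illustrative sanity check */
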
@@ -1160,12 +1262,14 @@ static void __svc_rdma_free(struct work_struct *work)
 {
        struct svcxprt_rdma *rdma =
                container_of(work, struct svcxprt_rdma, sc_work);
-       dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+       struct svc_xprt *xprt = &rdma->sc_xprt;
+
+       dprintk("svcrdma: %s(%p)\n", __func__, rdma);
 
        /* We should only be called from kref_put */
-       if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0)
+       if (atomic_read(&xprt->xpt_ref.refcount) != 0)
                pr_err("svcrdma: sc_xprt still in use? (%d)\n",
-                      atomic_read(&rdma->sc_xprt.xpt_ref.refcount));
+                      atomic_read(&xprt->xpt_ref.refcount));
 
        /*
         * Destroy queued, but not processed read completions. Note
@@ -1193,15 +1297,22 @@ static void __svc_rdma_free(struct work_struct *work)
        }
 
        /* Warn if we leaked a resource or under-referenced */
-       if (atomic_read(&rdma->sc_ctxt_used) != 0)
+       if (rdma->sc_ctxt_used != 0)
                pr_err("svcrdma: ctxt still in use? (%d)\n",
-                      atomic_read(&rdma->sc_ctxt_used));
+                      rdma->sc_ctxt_used);
        if (atomic_read(&rdma->sc_dma_used) != 0)
                pr_err("svcrdma: dma still in use? (%d)\n",
                       atomic_read(&rdma->sc_dma_used));
 
-       /* De-allocate fastreg mr */
+       /* Final put of backchannel client transport */
+       if (xprt->xpt_bc_xprt) {
+               xprt_put(xprt->xpt_bc_xprt);
+               xprt->xpt_bc_xprt = NULL;
+       }
+
        rdma_dealloc_frmr_q(rdma);
+       svc_rdma_destroy_ctxts(rdma);
+       svc_rdma_destroy_maps(rdma);
 
        /* Destroy the QP if present (not a listener) */
        if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
@@ -1213,9 +1324,6 @@ static void __svc_rdma_free(struct work_struct *work)
        if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
                ib_destroy_cq(rdma->sc_rq_cq);
 
-       if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
-               ib_dereg_mr(rdma->sc_phys_mr);
-
        if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
                ib_dealloc_pd(rdma->sc_pd);
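Ordering note (sketch): the new destroy calls run before ib_dealloc_pd() because the pre-allocated FRMRs and the QP hold references on the PD; the ctxt and map lists carry no verbs resources, but releasing them in the same pass keeps per-connection teardown in one place.

        /* Teardown order: FRMRs, ctxts, maps -> QP -> CQs -> PD */
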
 
@@ -1321,7 +1429,9 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
        int length;
        int ret;
 
-       p = alloc_page(GFP_KERNEL | __GFP_NOFAIL);
+       p = alloc_page(GFP_KERNEL);
+       if (!p)
+               return;
        va = page_address(p);
 
        /* XDR encode error */
@@ -1341,7 +1451,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
                return;
        }
        atomic_inc(&xprt->sc_dma_used);
-       ctxt->sge[0].lkey = xprt->sc_dma_lkey;
+       ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = length;
 
        /* Prepare SEND WR */