Merge tag 'v3.5-rc1'
[linux-drm-fsl-dcu.git] / net / ceph / osd_client.c
index 1ffebed5ce0f9a629ad2733349b8e33c326850d5..db2da54f7336f12a31932791aa67e5c3ff3edbb9 100644 (file)
@@ -139,15 +139,14 @@ void ceph_osdc_release_request(struct kref *kref)
 
        if (req->r_request)
                ceph_msg_put(req->r_request);
-       if (req->r_reply)
-               ceph_msg_put(req->r_reply);
        if (req->r_con_filling_msg) {
-               dout("release_request revoking pages %p from con %p\n",
+               dout("%s revoking pages %p from con %p\n", __func__,
                     req->r_pages, req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg,
-                                     req->r_reply);
-               ceph_con_put(req->r_con_filling_msg);
+               ceph_msg_revoke_incoming(req->r_reply);
+               req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
        }
+       if (req->r_reply)
+               ceph_msg_put(req->r_reply);
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
                                         req->r_num_pages);
@@ -624,7 +623,7 @@ static void osd_reset(struct ceph_connection *con)
 /*
  * Track open sessions with osds.
  */
-static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 {
        struct ceph_osd *osd;
 
@@ -634,15 +633,14 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
+       osd->o_osd = onum;
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
-       ceph_con_init(osdc->client->msgr, &osd->o_con);
-       osd->o_con.private = osd;
-       osd->o_con.ops = &osd_con_ops;
-       osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+       ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr,
+               CEPH_ENTITY_TYPE_OSD, onum);
 
        INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
@@ -853,7 +851,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
-               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               ceph_msg_revoke(req->r_request);
 
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests) &&
@@ -880,7 +878,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 static void __cancel_request(struct ceph_osd_request *req)
 {
        if (req->r_sent && req->r_osd) {
-               ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+               ceph_msg_revoke(req->r_request);
                req->r_sent = 0;
        }
 }
@@ -998,15 +996,13 @@ static int __map_request(struct ceph_osd_client *osdc,
        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
-               req->r_osd = create_osd(osdc);
+               req->r_osd = create_osd(osdc, o);
                if (!req->r_osd) {
                        list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
                }
 
                dout("map_request osd %p is osd%d\n", req->r_osd, o);
-               req->r_osd->o_osd = o;
-               req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);
 
                ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
@@ -1216,7 +1212,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        if (req->r_con_filling_msg == con && req->r_reply == msg) {
                dout(" dropping con_filling_msg ref %p\n", con);
                req->r_con_filling_msg = NULL;
-               ceph_con_put(con);
+               con->ops->put(con);
        }
 
        if (!req->r_got_reply) {
@@ -1391,7 +1387,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                             epoch, maplen);
                        newmap = osdmap_apply_incremental(&p, next,
                                                          osdc->osdmap,
-                                                         osdc->client->msgr);
+                                                         &osdc->client->msgr);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
@@ -2025,10 +2021,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        }
 
        if (req->r_con_filling_msg) {
-               dout("get_reply revoking msg %p from old con %p\n",
+               dout("%s revoking msg %p from old con %p\n", __func__,
                     req->r_reply, req->r_con_filling_msg);
-               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
-               ceph_con_put(req->r_con_filling_msg);
+               ceph_msg_revoke_incoming(req->r_reply);
+               req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
                req->r_con_filling_msg = NULL;
        }
 
@@ -2063,7 +2059,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 #endif
        }
        *skip = 0;
-       req->r_con_filling_msg = ceph_con_get(con);
+       req->r_con_filling_msg = con->ops->get(con);
        dout("get_reply tid %lld %p\n", tid, m);
 
 out:
@@ -2080,6 +2076,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
 
+       *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY: