]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - net/ceph/mon_client.c
libceph: flush msgr queue during mon_client shutdown
[karo-tx-linux.git] / net / ceph / mon_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/types.h>
5 #include <linux/slab.h>
6 #include <linux/random.h>
7 #include <linux/sched.h>
8
9 #include <linux/ceph/mon_client.h>
10 #include <linux/ceph/libceph.h>
11 #include <linux/ceph/debugfs.h>
12 #include <linux/ceph/decode.h>
13 #include <linux/ceph/auth.h>
14
15 /*
16  * Interact with Ceph monitor cluster.  Handle requests for new map
17  * versions, and periodically resend as needed.  Also implement
18  * statfs() and umount().
19  *
20  * A small cluster of Ceph "monitors" are responsible for managing critical
21  * cluster configuration and state information.  An odd number (e.g., 3, 5)
22  * of cmon daemons use a modified version of the Paxos part-time parliament
23  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
24  * list of clients who have mounted the file system.
25  *
26  * We maintain an open, active session with a monitor at all times in order to
27  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
28  * TCP socket to ensure we detect a failure.  If the connection does break, we
29  * randomly hunt for a new monitor.  Once the connection is reestablished, we
30  * resend any outstanding requests.
31  */
32
33 static const struct ceph_connection_operations mon_con_ops;
34
35 static int __validate_auth(struct ceph_mon_client *monc);
36
37 /*
38  * Decode a monmap blob (e.g., during mount).
39  */
40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
41 {
42         struct ceph_monmap *m = NULL;
43         int i, err = -EINVAL;
44         struct ceph_fsid fsid;
45         u32 epoch, num_mon;
46         u16 version;
47         u32 len;
48
49         ceph_decode_32_safe(&p, end, len, bad);
50         ceph_decode_need(&p, end, len, bad);
51
52         dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
53
54         ceph_decode_16_safe(&p, end, version, bad);
55
56         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
57         ceph_decode_copy(&p, &fsid, sizeof(fsid));
58         epoch = ceph_decode_32(&p);
59
60         num_mon = ceph_decode_32(&p);
61         ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
62
63         if (num_mon >= CEPH_MAX_MON)
64                 goto bad;
65         m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
66         if (m == NULL)
67                 return ERR_PTR(-ENOMEM);
68         m->fsid = fsid;
69         m->epoch = epoch;
70         m->num_mon = num_mon;
71         ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
72         for (i = 0; i < num_mon; i++)
73                 ceph_decode_addr(&m->mon_inst[i].addr);
74
75         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
76              m->num_mon);
77         for (i = 0; i < m->num_mon; i++)
78                 dout("monmap_decode  mon%d is %s\n", i,
79                      ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
80         return m;
81
82 bad:
83         dout("monmap_decode failed with %d\n", err);
84         kfree(m);
85         return ERR_PTR(err);
86 }
87
88 /*
89  * return true if *addr is included in the monmap.
90  */
91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
92 {
93         int i;
94
95         for (i = 0; i < m->num_mon; i++)
96                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
97                         return 1;
98         return 0;
99 }
100
101 /*
102  * Send an auth request.
103  */
104 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
105 {
106         monc->pending_auth = 1;
107         monc->m_auth->front.iov_len = len;
108         monc->m_auth->hdr.front_len = cpu_to_le32(len);
109         ceph_msg_revoke(monc->m_auth);
110         ceph_msg_get(monc->m_auth);  /* keep our ref */
111         ceph_con_send(&monc->con, monc->m_auth);
112 }
113
114 /*
115  * Close monitor session, if any.
116  */
117 static void __close_session(struct ceph_mon_client *monc)
118 {
119         dout("__close_session closing mon%d\n", monc->cur_mon);
120         ceph_msg_revoke(monc->m_auth);
121         ceph_con_close(&monc->con);
122         monc->con.private = NULL;
123         monc->cur_mon = -1;
124         monc->pending_auth = 0;
125         ceph_auth_reset(monc->auth);
126 }
127
128 /*
129  * Open a session with a (new) monitor.
130  */
131 static int __open_session(struct ceph_mon_client *monc)
132 {
133         char r;
134         int ret;
135
136         if (monc->cur_mon < 0) {
137                 get_random_bytes(&r, 1);
138                 monc->cur_mon = r % monc->monmap->num_mon;
139                 dout("open_session num=%d r=%d -> mon%d\n",
140                      monc->monmap->num_mon, r, monc->cur_mon);
141                 monc->sub_sent = 0;
142                 monc->sub_renew_after = jiffies;  /* i.e., expired */
143                 monc->want_next_osdmap = !!monc->want_next_osdmap;
144
145                 ceph_con_init(&monc->con, monc, &mon_con_ops,
146                         &monc->client->msgr,
147                         CEPH_ENTITY_TYPE_MON, monc->cur_mon);
148
149                 dout("open_session mon%d opening\n", monc->cur_mon);
150                 ceph_con_open(&monc->con,
151                               &monc->monmap->mon_inst[monc->cur_mon].addr);
152
153                 /* initiatiate authentication handshake */
154                 ret = ceph_auth_build_hello(monc->auth,
155                                             monc->m_auth->front.iov_base,
156                                             monc->m_auth->front_max);
157                 __send_prepared_auth_request(monc, ret);
158         } else {
159                 dout("open_session mon%d already open\n", monc->cur_mon);
160         }
161         return 0;
162 }
163
164 static bool __sub_expired(struct ceph_mon_client *monc)
165 {
166         return time_after_eq(jiffies, monc->sub_renew_after);
167 }
168
169 /*
170  * Reschedule delayed work timer.
171  */
172 static void __schedule_delayed(struct ceph_mon_client *monc)
173 {
174         unsigned delay;
175
176         if (monc->cur_mon < 0 || __sub_expired(monc))
177                 delay = 10 * HZ;
178         else
179                 delay = 20 * HZ;
180         dout("__schedule_delayed after %u\n", delay);
181         schedule_delayed_work(&monc->delayed_work, delay);
182 }
183
184 /*
185  * Send subscribe request for mdsmap and/or osdmap.
186  */
187 static void __send_subscribe(struct ceph_mon_client *monc)
188 {
189         dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
190              (unsigned)monc->sub_sent, __sub_expired(monc),
191              monc->want_next_osdmap);
192         if ((__sub_expired(monc) && !monc->sub_sent) ||
193             monc->want_next_osdmap == 1) {
194                 struct ceph_msg *msg = monc->m_subscribe;
195                 struct ceph_mon_subscribe_item *i;
196                 void *p, *end;
197                 int num;
198
199                 p = msg->front.iov_base;
200                 end = p + msg->front_max;
201
202                 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
203                 ceph_encode_32(&p, num);
204
205                 if (monc->want_next_osdmap) {
206                         dout("__send_subscribe to 'osdmap' %u\n",
207                              (unsigned)monc->have_osdmap);
208                         ceph_encode_string(&p, end, "osdmap", 6);
209                         i = p;
210                         i->have = cpu_to_le64(monc->have_osdmap);
211                         i->onetime = 1;
212                         p += sizeof(*i);
213                         monc->want_next_osdmap = 2;  /* requested */
214                 }
215                 if (monc->want_mdsmap) {
216                         dout("__send_subscribe to 'mdsmap' %u+\n",
217                              (unsigned)monc->have_mdsmap);
218                         ceph_encode_string(&p, end, "mdsmap", 6);
219                         i = p;
220                         i->have = cpu_to_le64(monc->have_mdsmap);
221                         i->onetime = 0;
222                         p += sizeof(*i);
223                 }
224                 ceph_encode_string(&p, end, "monmap", 6);
225                 i = p;
226                 i->have = 0;
227                 i->onetime = 0;
228                 p += sizeof(*i);
229
230                 msg->front.iov_len = p - msg->front.iov_base;
231                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
232                 ceph_msg_revoke(msg);
233                 ceph_con_send(&monc->con, ceph_msg_get(msg));
234
235                 monc->sub_sent = jiffies | 1;  /* never 0 */
236         }
237 }
238
239 static void handle_subscribe_ack(struct ceph_mon_client *monc,
240                                  struct ceph_msg *msg)
241 {
242         unsigned seconds;
243         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
244
245         if (msg->front.iov_len < sizeof(*h))
246                 goto bad;
247         seconds = le32_to_cpu(h->duration);
248
249         mutex_lock(&monc->mutex);
250         if (monc->hunting) {
251                 pr_info("mon%d %s session established\n",
252                         monc->cur_mon,
253                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
254                 monc->hunting = false;
255         }
256         dout("handle_subscribe_ack after %d seconds\n", seconds);
257         monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
258         monc->sub_sent = 0;
259         mutex_unlock(&monc->mutex);
260         return;
261 bad:
262         pr_err("got corrupt subscribe-ack msg\n");
263         ceph_msg_dump(msg);
264 }
265
266 /*
267  * Keep track of which maps we have
268  */
269 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
270 {
271         mutex_lock(&monc->mutex);
272         monc->have_mdsmap = got;
273         mutex_unlock(&monc->mutex);
274         return 0;
275 }
276 EXPORT_SYMBOL(ceph_monc_got_mdsmap);
277
278 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
279 {
280         mutex_lock(&monc->mutex);
281         monc->have_osdmap = got;
282         monc->want_next_osdmap = 0;
283         mutex_unlock(&monc->mutex);
284         return 0;
285 }
286
287 /*
288  * Register interest in the next osdmap
289  */
290 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
291 {
292         dout("request_next_osdmap have %u\n", monc->have_osdmap);
293         mutex_lock(&monc->mutex);
294         if (!monc->want_next_osdmap)
295                 monc->want_next_osdmap = 1;
296         if (monc->want_next_osdmap < 2)
297                 __send_subscribe(monc);
298         mutex_unlock(&monc->mutex);
299 }
300
301 /*
302  *
303  */
304 int ceph_monc_open_session(struct ceph_mon_client *monc)
305 {
306         mutex_lock(&monc->mutex);
307         __open_session(monc);
308         __schedule_delayed(monc);
309         mutex_unlock(&monc->mutex);
310         return 0;
311 }
312 EXPORT_SYMBOL(ceph_monc_open_session);
313
314 /*
315  * The monitor responds with mount ack indicate mount success.  The
316  * included client ticket allows the client to talk to MDSs and OSDs.
317  */
318 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
319                                  struct ceph_msg *msg)
320 {
321         struct ceph_client *client = monc->client;
322         struct ceph_monmap *monmap = NULL, *old = monc->monmap;
323         void *p, *end;
324
325         mutex_lock(&monc->mutex);
326
327         dout("handle_monmap\n");
328         p = msg->front.iov_base;
329         end = p + msg->front.iov_len;
330
331         monmap = ceph_monmap_decode(p, end);
332         if (IS_ERR(monmap)) {
333                 pr_err("problem decoding monmap, %d\n",
334                        (int)PTR_ERR(monmap));
335                 goto out;
336         }
337
338         if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
339                 kfree(monmap);
340                 goto out;
341         }
342
343         client->monc.monmap = monmap;
344         kfree(old);
345
346         if (!client->have_fsid) {
347                 client->have_fsid = true;
348                 mutex_unlock(&monc->mutex);
349                 /*
350                  * do debugfs initialization without mutex to avoid
351                  * creating a locking dependency
352                  */
353                 ceph_debugfs_client_init(client);
354                 goto out_unlocked;
355         }
356 out:
357         mutex_unlock(&monc->mutex);
358 out_unlocked:
359         wake_up_all(&client->auth_wq);
360 }
361
362 /*
363  * generic requests (e.g., statfs, poolop)
364  */
365 static struct ceph_mon_generic_request *__lookup_generic_req(
366         struct ceph_mon_client *monc, u64 tid)
367 {
368         struct ceph_mon_generic_request *req;
369         struct rb_node *n = monc->generic_request_tree.rb_node;
370
371         while (n) {
372                 req = rb_entry(n, struct ceph_mon_generic_request, node);
373                 if (tid < req->tid)
374                         n = n->rb_left;
375                 else if (tid > req->tid)
376                         n = n->rb_right;
377                 else
378                         return req;
379         }
380         return NULL;
381 }
382
383 static void __insert_generic_request(struct ceph_mon_client *monc,
384                             struct ceph_mon_generic_request *new)
385 {
386         struct rb_node **p = &monc->generic_request_tree.rb_node;
387         struct rb_node *parent = NULL;
388         struct ceph_mon_generic_request *req = NULL;
389
390         while (*p) {
391                 parent = *p;
392                 req = rb_entry(parent, struct ceph_mon_generic_request, node);
393                 if (new->tid < req->tid)
394                         p = &(*p)->rb_left;
395                 else if (new->tid > req->tid)
396                         p = &(*p)->rb_right;
397                 else
398                         BUG();
399         }
400
401         rb_link_node(&new->node, parent, p);
402         rb_insert_color(&new->node, &monc->generic_request_tree);
403 }
404
405 static void release_generic_request(struct kref *kref)
406 {
407         struct ceph_mon_generic_request *req =
408                 container_of(kref, struct ceph_mon_generic_request, kref);
409
410         if (req->reply)
411                 ceph_msg_put(req->reply);
412         if (req->request)
413                 ceph_msg_put(req->request);
414
415         kfree(req);
416 }
417
418 static void put_generic_request(struct ceph_mon_generic_request *req)
419 {
420         kref_put(&req->kref, release_generic_request);
421 }
422
423 static void get_generic_request(struct ceph_mon_generic_request *req)
424 {
425         kref_get(&req->kref);
426 }
427
428 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
429                                          struct ceph_msg_header *hdr,
430                                          int *skip)
431 {
432         struct ceph_mon_client *monc = con->private;
433         struct ceph_mon_generic_request *req;
434         u64 tid = le64_to_cpu(hdr->tid);
435         struct ceph_msg *m;
436
437         mutex_lock(&monc->mutex);
438         req = __lookup_generic_req(monc, tid);
439         if (!req) {
440                 dout("get_generic_reply %lld dne\n", tid);
441                 *skip = 1;
442                 m = NULL;
443         } else {
444                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
445                 *skip = 0;
446                 m = ceph_msg_get(req->reply);
447                 /*
448                  * we don't need to track the connection reading into
449                  * this reply because we only have one open connection
450                  * at a time, ever.
451                  */
452         }
453         mutex_unlock(&monc->mutex);
454         return m;
455 }
456
457 static int do_generic_request(struct ceph_mon_client *monc,
458                               struct ceph_mon_generic_request *req)
459 {
460         int err;
461
462         /* register request */
463         mutex_lock(&monc->mutex);
464         req->tid = ++monc->last_tid;
465         req->request->hdr.tid = cpu_to_le64(req->tid);
466         __insert_generic_request(monc, req);
467         monc->num_generic_requests++;
468         ceph_con_send(&monc->con, ceph_msg_get(req->request));
469         mutex_unlock(&monc->mutex);
470
471         err = wait_for_completion_interruptible(&req->completion);
472
473         mutex_lock(&monc->mutex);
474         rb_erase(&req->node, &monc->generic_request_tree);
475         monc->num_generic_requests--;
476         mutex_unlock(&monc->mutex);
477
478         if (!err)
479                 err = req->result;
480         return err;
481 }
482
483 /*
484  * statfs
485  */
486 static void handle_statfs_reply(struct ceph_mon_client *monc,
487                                 struct ceph_msg *msg)
488 {
489         struct ceph_mon_generic_request *req;
490         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
491         u64 tid = le64_to_cpu(msg->hdr.tid);
492
493         if (msg->front.iov_len != sizeof(*reply))
494                 goto bad;
495         dout("handle_statfs_reply %p tid %llu\n", msg, tid);
496
497         mutex_lock(&monc->mutex);
498         req = __lookup_generic_req(monc, tid);
499         if (req) {
500                 *(struct ceph_statfs *)req->buf = reply->st;
501                 req->result = 0;
502                 get_generic_request(req);
503         }
504         mutex_unlock(&monc->mutex);
505         if (req) {
506                 complete_all(&req->completion);
507                 put_generic_request(req);
508         }
509         return;
510
511 bad:
512         pr_err("corrupt generic reply, tid %llu\n", tid);
513         ceph_msg_dump(msg);
514 }
515
516 /*
517  * Do a synchronous statfs().
518  */
519 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
520 {
521         struct ceph_mon_generic_request *req;
522         struct ceph_mon_statfs *h;
523         int err;
524
525         req = kzalloc(sizeof(*req), GFP_NOFS);
526         if (!req)
527                 return -ENOMEM;
528
529         kref_init(&req->kref);
530         req->buf = buf;
531         req->buf_len = sizeof(*buf);
532         init_completion(&req->completion);
533
534         err = -ENOMEM;
535         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
536                                     true);
537         if (!req->request)
538                 goto out;
539         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
540                                   true);
541         if (!req->reply)
542                 goto out;
543
544         /* fill out request */
545         h = req->request->front.iov_base;
546         h->monhdr.have_version = 0;
547         h->monhdr.session_mon = cpu_to_le16(-1);
548         h->monhdr.session_mon_tid = 0;
549         h->fsid = monc->monmap->fsid;
550
551         err = do_generic_request(monc, req);
552
553 out:
554         kref_put(&req->kref, release_generic_request);
555         return err;
556 }
557 EXPORT_SYMBOL(ceph_monc_do_statfs);
558
559 /*
560  * pool ops
561  */
562 static int get_poolop_reply_buf(const char *src, size_t src_len,
563                                 char *dst, size_t dst_len)
564 {
565         u32 buf_len;
566
567         if (src_len != sizeof(u32) + dst_len)
568                 return -EINVAL;
569
570         buf_len = le32_to_cpu(*(u32 *)src);
571         if (buf_len != dst_len)
572                 return -EINVAL;
573
574         memcpy(dst, src + sizeof(u32), dst_len);
575         return 0;
576 }
577
578 static void handle_poolop_reply(struct ceph_mon_client *monc,
579                                 struct ceph_msg *msg)
580 {
581         struct ceph_mon_generic_request *req;
582         struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
583         u64 tid = le64_to_cpu(msg->hdr.tid);
584
585         if (msg->front.iov_len < sizeof(*reply))
586                 goto bad;
587         dout("handle_poolop_reply %p tid %llu\n", msg, tid);
588
589         mutex_lock(&monc->mutex);
590         req = __lookup_generic_req(monc, tid);
591         if (req) {
592                 if (req->buf_len &&
593                     get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
594                                      msg->front.iov_len - sizeof(*reply),
595                                      req->buf, req->buf_len) < 0) {
596                         mutex_unlock(&monc->mutex);
597                         goto bad;
598                 }
599                 req->result = le32_to_cpu(reply->reply_code);
600                 get_generic_request(req);
601         }
602         mutex_unlock(&monc->mutex);
603         if (req) {
604                 complete(&req->completion);
605                 put_generic_request(req);
606         }
607         return;
608
609 bad:
610         pr_err("corrupt generic reply, tid %llu\n", tid);
611         ceph_msg_dump(msg);
612 }
613
614 /*
615  * Do a synchronous pool op.
616  */
617 int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
618                         u32 pool, u64 snapid,
619                         char *buf, int len)
620 {
621         struct ceph_mon_generic_request *req;
622         struct ceph_mon_poolop *h;
623         int err;
624
625         req = kzalloc(sizeof(*req), GFP_NOFS);
626         if (!req)
627                 return -ENOMEM;
628
629         kref_init(&req->kref);
630         req->buf = buf;
631         req->buf_len = len;
632         init_completion(&req->completion);
633
634         err = -ENOMEM;
635         req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
636                                     true);
637         if (!req->request)
638                 goto out;
639         req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
640                                   true);
641         if (!req->reply)
642                 goto out;
643
644         /* fill out request */
645         req->request->hdr.version = cpu_to_le16(2);
646         h = req->request->front.iov_base;
647         h->monhdr.have_version = 0;
648         h->monhdr.session_mon = cpu_to_le16(-1);
649         h->monhdr.session_mon_tid = 0;
650         h->fsid = monc->monmap->fsid;
651         h->pool = cpu_to_le32(pool);
652         h->op = cpu_to_le32(op);
653         h->auid = 0;
654         h->snapid = cpu_to_le64(snapid);
655         h->name_len = 0;
656
657         err = do_generic_request(monc, req);
658
659 out:
660         kref_put(&req->kref, release_generic_request);
661         return err;
662 }
663
664 int ceph_monc_create_snapid(struct ceph_mon_client *monc,
665                             u32 pool, u64 *snapid)
666 {
667         return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
668                                    pool, 0, (char *)snapid, sizeof(*snapid));
669
670 }
671 EXPORT_SYMBOL(ceph_monc_create_snapid);
672
673 int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
674                             u32 pool, u64 snapid)
675 {
676         return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
677                                    pool, snapid, 0, 0);
678
679 }
680
681 /*
682  * Resend pending generic requests.
683  */
684 static void __resend_generic_request(struct ceph_mon_client *monc)
685 {
686         struct ceph_mon_generic_request *req;
687         struct rb_node *p;
688
689         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
690                 req = rb_entry(p, struct ceph_mon_generic_request, node);
691                 ceph_msg_revoke(req->request);
692                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
693         }
694 }
695
696 /*
697  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
698  * renew/retry subscription as needed (in case it is timing out, or we
699  * got an ENOMEM).  And keep the monitor connection alive.
700  */
701 static void delayed_work(struct work_struct *work)
702 {
703         struct ceph_mon_client *monc =
704                 container_of(work, struct ceph_mon_client, delayed_work.work);
705
706         dout("monc delayed_work\n");
707         mutex_lock(&monc->mutex);
708         if (monc->hunting) {
709                 __close_session(monc);
710                 __open_session(monc);  /* continue hunting */
711         } else {
712                 ceph_con_keepalive(&monc->con);
713
714                 __validate_auth(monc);
715
716                 if (monc->auth->ops->is_authenticated(monc->auth))
717                         __send_subscribe(monc);
718         }
719         __schedule_delayed(monc);
720         mutex_unlock(&monc->mutex);
721 }
722
723 /*
724  * On startup, we build a temporary monmap populated with the IPs
725  * provided by mount(2).
726  */
727 static int build_initial_monmap(struct ceph_mon_client *monc)
728 {
729         struct ceph_options *opt = monc->client->options;
730         struct ceph_entity_addr *mon_addr = opt->mon_addr;
731         int num_mon = opt->num_mon;
732         int i;
733
734         /* build initial monmap */
735         monc->monmap = kzalloc(sizeof(*monc->monmap) +
736                                num_mon*sizeof(monc->monmap->mon_inst[0]),
737                                GFP_KERNEL);
738         if (!monc->monmap)
739                 return -ENOMEM;
740         for (i = 0; i < num_mon; i++) {
741                 monc->monmap->mon_inst[i].addr = mon_addr[i];
742                 monc->monmap->mon_inst[i].addr.nonce = 0;
743                 monc->monmap->mon_inst[i].name.type =
744                         CEPH_ENTITY_TYPE_MON;
745                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
746         }
747         monc->monmap->num_mon = num_mon;
748         monc->have_fsid = false;
749         return 0;
750 }
751
752 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
753 {
754         int err = 0;
755
756         dout("init\n");
757         memset(monc, 0, sizeof(*monc));
758         monc->client = cl;
759         monc->monmap = NULL;
760         mutex_init(&monc->mutex);
761
762         err = build_initial_monmap(monc);
763         if (err)
764                 goto out;
765
766         /* connection */
767         /* authentication */
768         monc->auth = ceph_auth_init(cl->options->name,
769                                     cl->options->key);
770         if (IS_ERR(monc->auth)) {
771                 err = PTR_ERR(monc->auth);
772                 goto out_monmap;
773         }
774         monc->auth->want_keys =
775                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
776                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
777
778         /* msgs */
779         err = -ENOMEM;
780         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
781                                      sizeof(struct ceph_mon_subscribe_ack),
782                                      GFP_NOFS, true);
783         if (!monc->m_subscribe_ack)
784                 goto out_auth;
785
786         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
787                                          true);
788         if (!monc->m_subscribe)
789                 goto out_subscribe_ack;
790
791         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
792                                           true);
793         if (!monc->m_auth_reply)
794                 goto out_subscribe;
795
796         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
797         monc->pending_auth = 0;
798         if (!monc->m_auth)
799                 goto out_auth_reply;
800
801         monc->cur_mon = -1;
802         monc->hunting = true;
803         monc->sub_renew_after = jiffies;
804         monc->sub_sent = 0;
805
806         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
807         monc->generic_request_tree = RB_ROOT;
808         monc->num_generic_requests = 0;
809         monc->last_tid = 0;
810
811         monc->have_mdsmap = 0;
812         monc->have_osdmap = 0;
813         monc->want_next_osdmap = 1;
814         return 0;
815
816 out_auth_reply:
817         ceph_msg_put(monc->m_auth_reply);
818 out_subscribe:
819         ceph_msg_put(monc->m_subscribe);
820 out_subscribe_ack:
821         ceph_msg_put(monc->m_subscribe_ack);
822 out_auth:
823         ceph_auth_destroy(monc->auth);
824 out_monmap:
825         kfree(monc->monmap);
826 out:
827         return err;
828 }
829 EXPORT_SYMBOL(ceph_monc_init);
830
831 void ceph_monc_stop(struct ceph_mon_client *monc)
832 {
833         dout("stop\n");
834         cancel_delayed_work_sync(&monc->delayed_work);
835
836         mutex_lock(&monc->mutex);
837         __close_session(monc);
838
839         mutex_unlock(&monc->mutex);
840
841         /*
842          * flush msgr queue before we destroy ourselves to ensure that:
843          *  - any work that references our embedded con is finished.
844          *  - any osd_client or other work that may reference an authorizer
845          *    finishes before we shut down the auth subsystem.
846          */
847         ceph_msgr_flush();
848
849         ceph_auth_destroy(monc->auth);
850
851         ceph_msg_put(monc->m_auth);
852         ceph_msg_put(monc->m_auth_reply);
853         ceph_msg_put(monc->m_subscribe);
854         ceph_msg_put(monc->m_subscribe_ack);
855
856         kfree(monc->monmap);
857 }
858 EXPORT_SYMBOL(ceph_monc_stop);
859
860 static void handle_auth_reply(struct ceph_mon_client *monc,
861                               struct ceph_msg *msg)
862 {
863         int ret;
864         int was_auth = 0;
865
866         mutex_lock(&monc->mutex);
867         if (monc->auth->ops)
868                 was_auth = monc->auth->ops->is_authenticated(monc->auth);
869         monc->pending_auth = 0;
870         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
871                                      msg->front.iov_len,
872                                      monc->m_auth->front.iov_base,
873                                      monc->m_auth->front_max);
874         if (ret < 0) {
875                 monc->client->auth_err = ret;
876                 wake_up_all(&monc->client->auth_wq);
877         } else if (ret > 0) {
878                 __send_prepared_auth_request(monc, ret);
879         } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
880                 dout("authenticated, starting session\n");
881
882                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
883                 monc->client->msgr.inst.name.num =
884                                         cpu_to_le64(monc->auth->global_id);
885
886                 __send_subscribe(monc);
887                 __resend_generic_request(monc);
888         }
889         mutex_unlock(&monc->mutex);
890 }
891
892 static int __validate_auth(struct ceph_mon_client *monc)
893 {
894         int ret;
895
896         if (monc->pending_auth)
897                 return 0;
898
899         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
900                               monc->m_auth->front_max);
901         if (ret <= 0)
902                 return ret; /* either an error, or no need to authenticate */
903         __send_prepared_auth_request(monc, ret);
904         return 0;
905 }
906
907 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
908 {
909         int ret;
910
911         mutex_lock(&monc->mutex);
912         ret = __validate_auth(monc);
913         mutex_unlock(&monc->mutex);
914         return ret;
915 }
916 EXPORT_SYMBOL(ceph_monc_validate_auth);
917
918 /*
919  * handle incoming message
920  */
921 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
922 {
923         struct ceph_mon_client *monc = con->private;
924         int type = le16_to_cpu(msg->hdr.type);
925
926         if (!monc)
927                 return;
928
929         switch (type) {
930         case CEPH_MSG_AUTH_REPLY:
931                 handle_auth_reply(monc, msg);
932                 break;
933
934         case CEPH_MSG_MON_SUBSCRIBE_ACK:
935                 handle_subscribe_ack(monc, msg);
936                 break;
937
938         case CEPH_MSG_STATFS_REPLY:
939                 handle_statfs_reply(monc, msg);
940                 break;
941
942         case CEPH_MSG_POOLOP_REPLY:
943                 handle_poolop_reply(monc, msg);
944                 break;
945
946         case CEPH_MSG_MON_MAP:
947                 ceph_monc_handle_map(monc, msg);
948                 break;
949
950         case CEPH_MSG_OSD_MAP:
951                 ceph_osdc_handle_map(&monc->client->osdc, msg);
952                 break;
953
954         default:
955                 /* can the chained handler handle it? */
956                 if (monc->client->extra_mon_dispatch &&
957                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
958                         break;
959                         
960                 pr_err("received unknown message type %d %s\n", type,
961                        ceph_msg_type_name(type));
962         }
963         ceph_msg_put(msg);
964 }
965
966 /*
967  * Allocate memory for incoming message
968  */
969 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
970                                       struct ceph_msg_header *hdr,
971                                       int *skip)
972 {
973         struct ceph_mon_client *monc = con->private;
974         int type = le16_to_cpu(hdr->type);
975         int front_len = le32_to_cpu(hdr->front_len);
976         struct ceph_msg *m = NULL;
977
978         *skip = 0;
979
980         switch (type) {
981         case CEPH_MSG_MON_SUBSCRIBE_ACK:
982                 m = ceph_msg_get(monc->m_subscribe_ack);
983                 break;
984         case CEPH_MSG_POOLOP_REPLY:
985         case CEPH_MSG_STATFS_REPLY:
986                 return get_generic_reply(con, hdr, skip);
987         case CEPH_MSG_AUTH_REPLY:
988                 m = ceph_msg_get(monc->m_auth_reply);
989                 break;
990         case CEPH_MSG_MON_MAP:
991         case CEPH_MSG_MDS_MAP:
992         case CEPH_MSG_OSD_MAP:
993                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
994                 if (!m)
995                         return NULL;    /* ENOMEM--return skip == 0 */
996                 break;
997         }
998
999         if (!m) {
1000                 pr_info("alloc_msg unknown type %d\n", type);
1001                 *skip = 1;
1002         }
1003         return m;
1004 }
1005
1006 /*
1007  * If the monitor connection resets, pick a new monitor and resubmit
1008  * any pending requests.
1009  */
1010 static void mon_fault(struct ceph_connection *con)
1011 {
1012         struct ceph_mon_client *monc = con->private;
1013
1014         if (!monc)
1015                 return;
1016
1017         dout("mon_fault\n");
1018         mutex_lock(&monc->mutex);
1019         if (!con->private)
1020                 goto out;
1021
1022         if (!monc->hunting)
1023                 pr_info("mon%d %s session lost, "
1024                         "hunting for new mon\n", monc->cur_mon,
1025                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
1026
1027         __close_session(monc);
1028         if (!monc->hunting) {
1029                 /* start hunting */
1030                 monc->hunting = true;
1031                 __open_session(monc);
1032         } else {
1033                 /* already hunting, let's wait a bit */
1034                 __schedule_delayed(monc);
1035         }
1036 out:
1037         mutex_unlock(&monc->mutex);
1038 }
1039
1040 /*
1041  * We can ignore refcounting on the connection struct, as all references
1042  * will come from the messenger workqueue, which is drained prior to
1043  * mon_client destruction.
1044  */
1045 static struct ceph_connection *con_get(struct ceph_connection *con)
1046 {
1047         return con;
1048 }
1049
1050 static void con_put(struct ceph_connection *con)
1051 {
1052 }
1053
1054 static const struct ceph_connection_operations mon_con_ops = {
1055         .get = con_get,
1056         .put = con_put,
1057         .dispatch = dispatch,
1058         .fault = mon_fault,
1059         .alloc_msg = mon_alloc_msg,
1060 };