]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - net/ceph/mon_client.c
Merge branch 'for-next' of git://git.kernel.org/pub/scm/linux/kernel/git/nab/target...
[karo-tx-linux.git] / net / ceph / mon_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/types.h>
5 #include <linux/slab.h>
6 #include <linux/random.h>
7 #include <linux/sched.h>
8
9 #include <linux/ceph/ceph_features.h>
10 #include <linux/ceph/mon_client.h>
11 #include <linux/ceph/libceph.h>
12 #include <linux/ceph/debugfs.h>
13 #include <linux/ceph/decode.h>
14 #include <linux/ceph/auth.h>
15
16 /*
17  * Interact with Ceph monitor cluster.  Handle requests for new map
18  * versions, and periodically resend as needed.  Also implement
19  * statfs() and umount().
20  *
21  * A small cluster of Ceph "monitors" are responsible for managing critical
22  * cluster configuration and state information.  An odd number (e.g., 3, 5)
23  * of cmon daemons use a modified version of the Paxos part-time parliament
24  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
25  * list of clients who have mounted the file system.
26  *
27  * We maintain an open, active session with a monitor at all times in order to
28  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
29  * TCP socket to ensure we detect a failure.  If the connection does break, we
30  * randomly hunt for a new monitor.  Once the connection is reestablished, we
31  * resend any outstanding requests.
32  */
33
34 static const struct ceph_connection_operations mon_con_ops;
35
36 static int __validate_auth(struct ceph_mon_client *monc);
37
38 /*
39  * Decode a monmap blob (e.g., during mount).
40  */
41 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
42 {
43         struct ceph_monmap *m = NULL;
44         int i, err = -EINVAL;
45         struct ceph_fsid fsid;
46         u32 epoch, num_mon;
47         u32 len;
48
49         ceph_decode_32_safe(&p, end, len, bad);
50         ceph_decode_need(&p, end, len, bad);
51
52         dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
53         p += sizeof(u16);  /* skip version */
54
55         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
56         ceph_decode_copy(&p, &fsid, sizeof(fsid));
57         epoch = ceph_decode_32(&p);
58
59         num_mon = ceph_decode_32(&p);
60         ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
61
62         if (num_mon >= CEPH_MAX_MON)
63                 goto bad;
64         m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
65         if (m == NULL)
66                 return ERR_PTR(-ENOMEM);
67         m->fsid = fsid;
68         m->epoch = epoch;
69         m->num_mon = num_mon;
70         ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
71         for (i = 0; i < num_mon; i++)
72                 ceph_decode_addr(&m->mon_inst[i].addr);
73
74         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
75              m->num_mon);
76         for (i = 0; i < m->num_mon; i++)
77                 dout("monmap_decode  mon%d is %s\n", i,
78                      ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
79         return m;
80
81 bad:
82         dout("monmap_decode failed with %d\n", err);
83         kfree(m);
84         return ERR_PTR(err);
85 }
86
87 /*
88  * return true if *addr is included in the monmap.
89  */
90 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
91 {
92         int i;
93
94         for (i = 0; i < m->num_mon; i++)
95                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
96                         return 1;
97         return 0;
98 }
99
100 /*
101  * Send an auth request.
102  */
103 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
104 {
105         monc->pending_auth = 1;
106         monc->m_auth->front.iov_len = len;
107         monc->m_auth->hdr.front_len = cpu_to_le32(len);
108         ceph_msg_revoke(monc->m_auth);
109         ceph_msg_get(monc->m_auth);  /* keep our ref */
110         ceph_con_send(&monc->con, monc->m_auth);
111 }
112
113 /*
114  * Close monitor session, if any.
115  */
116 static void __close_session(struct ceph_mon_client *monc)
117 {
118         dout("__close_session closing mon%d\n", monc->cur_mon);
119         ceph_msg_revoke(monc->m_auth);
120         ceph_msg_revoke_incoming(monc->m_auth_reply);
121         ceph_msg_revoke(monc->m_subscribe);
122         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
123         ceph_con_close(&monc->con);
124
125         monc->pending_auth = 0;
126         ceph_auth_reset(monc->auth);
127 }
128
129 /*
130  * Pick a new monitor at random and set cur_mon.  If we are repicking
131  * (i.e. cur_mon is already set), be sure to pick a different one.
132  */
133 static void pick_new_mon(struct ceph_mon_client *monc)
134 {
135         int old_mon = monc->cur_mon;
136
137         BUG_ON(monc->monmap->num_mon < 1);
138
139         if (monc->monmap->num_mon == 1) {
140                 monc->cur_mon = 0;
141         } else {
142                 int max = monc->monmap->num_mon;
143                 int o = -1;
144                 int n;
145
146                 if (monc->cur_mon >= 0) {
147                         if (monc->cur_mon < monc->monmap->num_mon)
148                                 o = monc->cur_mon;
149                         if (o >= 0)
150                                 max--;
151                 }
152
153                 n = prandom_u32() % max;
154                 if (o >= 0 && n >= o)
155                         n++;
156
157                 monc->cur_mon = n;
158         }
159
160         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
161              monc->cur_mon, monc->monmap->num_mon);
162 }
163
164 /*
165  * Open a session with a new monitor.
166  */
167 static void __open_session(struct ceph_mon_client *monc)
168 {
169         int ret;
170
171         pick_new_mon(monc);
172
173         monc->hunting = true;
174         if (monc->had_a_connection) {
175                 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
176                 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
177                         monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
178         }
179
180         monc->sub_renew_after = jiffies; /* i.e., expired */
181         monc->sub_renew_sent = 0;
182
183         dout("%s opening mon%d\n", __func__, monc->cur_mon);
184         ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
185                       &monc->monmap->mon_inst[monc->cur_mon].addr);
186
187         /*
188          * send an initial keepalive to ensure our timestamp is valid
189          * by the time we are in an OPENED state
190          */
191         ceph_con_keepalive(&monc->con);
192
193         /* initiate authentication handshake */
194         ret = ceph_auth_build_hello(monc->auth,
195                                     monc->m_auth->front.iov_base,
196                                     monc->m_auth->front_alloc_len);
197         BUG_ON(ret <= 0);
198         __send_prepared_auth_request(monc, ret);
199 }
200
201 static void reopen_session(struct ceph_mon_client *monc)
202 {
203         if (!monc->hunting)
204                 pr_info("mon%d %s session lost, hunting for new mon\n",
205                     monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
206
207         __close_session(monc);
208         __open_session(monc);
209 }
210
211 /*
212  * Reschedule delayed work timer.
213  */
214 static void __schedule_delayed(struct ceph_mon_client *monc)
215 {
216         unsigned long delay;
217
218         if (monc->hunting)
219                 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
220         else
221                 delay = CEPH_MONC_PING_INTERVAL;
222
223         dout("__schedule_delayed after %lu\n", delay);
224         mod_delayed_work(system_wq, &monc->delayed_work,
225                          round_jiffies_relative(delay));
226 }
227
228 const char *ceph_sub_str[] = {
229         [CEPH_SUB_MONMAP] = "monmap",
230         [CEPH_SUB_OSDMAP] = "osdmap",
231         [CEPH_SUB_FSMAP]  = "fsmap.user",
232         [CEPH_SUB_MDSMAP] = "mdsmap",
233 };
234
235 /*
236  * Send subscribe request for one or more maps, according to
237  * monc->subs.
238  */
239 static void __send_subscribe(struct ceph_mon_client *monc)
240 {
241         struct ceph_msg *msg = monc->m_subscribe;
242         void *p = msg->front.iov_base;
243         void *const end = p + msg->front_alloc_len;
244         int num = 0;
245         int i;
246
247         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
248
249         BUG_ON(monc->cur_mon < 0);
250
251         if (!monc->sub_renew_sent)
252                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
253
254         msg->hdr.version = cpu_to_le16(2);
255
256         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
257                 if (monc->subs[i].want)
258                         num++;
259         }
260         BUG_ON(num < 1); /* monmap sub is always there */
261         ceph_encode_32(&p, num);
262         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
263                 char buf[32];
264                 int len;
265
266                 if (!monc->subs[i].want)
267                         continue;
268
269                 len = sprintf(buf, "%s", ceph_sub_str[i]);
270                 if (i == CEPH_SUB_MDSMAP &&
271                     monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
272                         len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
273
274                 dout("%s %s start %llu flags 0x%x\n", __func__, buf,
275                      le64_to_cpu(monc->subs[i].item.start),
276                      monc->subs[i].item.flags);
277                 ceph_encode_string(&p, end, buf, len);
278                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
279                 p += sizeof(monc->subs[i].item);
280         }
281
282         BUG_ON(p > end);
283         msg->front.iov_len = p - msg->front.iov_base;
284         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
285         ceph_msg_revoke(msg);
286         ceph_con_send(&monc->con, ceph_msg_get(msg));
287 }
288
289 static void handle_subscribe_ack(struct ceph_mon_client *monc,
290                                  struct ceph_msg *msg)
291 {
292         unsigned int seconds;
293         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
294
295         if (msg->front.iov_len < sizeof(*h))
296                 goto bad;
297         seconds = le32_to_cpu(h->duration);
298
299         mutex_lock(&monc->mutex);
300         if (monc->sub_renew_sent) {
301                 /*
302                  * This is only needed for legacy (infernalis or older)
303                  * MONs -- see delayed_work().
304                  */
305                 monc->sub_renew_after = monc->sub_renew_sent +
306                                             (seconds >> 1) * HZ - 1;
307                 dout("%s sent %lu duration %d renew after %lu\n", __func__,
308                      monc->sub_renew_sent, seconds, monc->sub_renew_after);
309                 monc->sub_renew_sent = 0;
310         } else {
311                 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
312                      monc->sub_renew_sent, monc->sub_renew_after);
313         }
314         mutex_unlock(&monc->mutex);
315         return;
316 bad:
317         pr_err("got corrupt subscribe-ack msg\n");
318         ceph_msg_dump(msg);
319 }
320
321 /*
322  * Register interest in a map
323  *
324  * @sub: one of CEPH_SUB_*
325  * @epoch: X for "every map since X", or 0 for "just the latest"
326  */
327 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
328                                  u32 epoch, bool continuous)
329 {
330         __le64 start = cpu_to_le64(epoch);
331         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
332
333         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
334              epoch, continuous);
335
336         if (monc->subs[sub].want &&
337             monc->subs[sub].item.start == start &&
338             monc->subs[sub].item.flags == flags)
339                 return false;
340
341         monc->subs[sub].item.start = start;
342         monc->subs[sub].item.flags = flags;
343         monc->subs[sub].want = true;
344
345         return true;
346 }
347
348 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
349                         bool continuous)
350 {
351         bool need_request;
352
353         mutex_lock(&monc->mutex);
354         need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
355         mutex_unlock(&monc->mutex);
356
357         return need_request;
358 }
359 EXPORT_SYMBOL(ceph_monc_want_map);
360
361 /*
362  * Keep track of which maps we have
363  *
364  * @sub: one of CEPH_SUB_*
365  */
366 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
367                                 u32 epoch)
368 {
369         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
370
371         if (monc->subs[sub].want) {
372                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
373                         monc->subs[sub].want = false;
374                 else
375                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
376         }
377
378         monc->subs[sub].have = epoch;
379 }
380
381 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
382 {
383         mutex_lock(&monc->mutex);
384         __ceph_monc_got_map(monc, sub, epoch);
385         mutex_unlock(&monc->mutex);
386 }
387 EXPORT_SYMBOL(ceph_monc_got_map);
388
389 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
390 {
391         mutex_lock(&monc->mutex);
392         __send_subscribe(monc);
393         mutex_unlock(&monc->mutex);
394 }
395 EXPORT_SYMBOL(ceph_monc_renew_subs);
396
397 /*
398  * Wait for an osdmap with a given epoch.
399  *
400  * @epoch: epoch to wait for
401  * @timeout: in jiffies, 0 means "wait forever"
402  */
403 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
404                           unsigned long timeout)
405 {
406         unsigned long started = jiffies;
407         long ret;
408
409         mutex_lock(&monc->mutex);
410         while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
411                 mutex_unlock(&monc->mutex);
412
413                 if (timeout && time_after_eq(jiffies, started + timeout))
414                         return -ETIMEDOUT;
415
416                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
417                                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
418                                      ceph_timeout_jiffies(timeout));
419                 if (ret < 0)
420                         return ret;
421
422                 mutex_lock(&monc->mutex);
423         }
424
425         mutex_unlock(&monc->mutex);
426         return 0;
427 }
428 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
429
430 /*
431  * Open a session with a random monitor.  Request monmap and osdmap,
432  * which are waited upon in __ceph_open_session().
433  */
434 int ceph_monc_open_session(struct ceph_mon_client *monc)
435 {
436         mutex_lock(&monc->mutex);
437         __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
438         __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
439         __open_session(monc);
440         __schedule_delayed(monc);
441         mutex_unlock(&monc->mutex);
442         return 0;
443 }
444 EXPORT_SYMBOL(ceph_monc_open_session);
445
446 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
447                                  struct ceph_msg *msg)
448 {
449         struct ceph_client *client = monc->client;
450         struct ceph_monmap *monmap = NULL, *old = monc->monmap;
451         void *p, *end;
452
453         mutex_lock(&monc->mutex);
454
455         dout("handle_monmap\n");
456         p = msg->front.iov_base;
457         end = p + msg->front.iov_len;
458
459         monmap = ceph_monmap_decode(p, end);
460         if (IS_ERR(monmap)) {
461                 pr_err("problem decoding monmap, %d\n",
462                        (int)PTR_ERR(monmap));
463                 goto out;
464         }
465
466         if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
467                 kfree(monmap);
468                 goto out;
469         }
470
471         client->monc.monmap = monmap;
472         kfree(old);
473
474         __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
475         client->have_fsid = true;
476
477 out:
478         mutex_unlock(&monc->mutex);
479         wake_up_all(&client->auth_wq);
480 }
481
482 /*
483  * generic requests (currently statfs, mon_get_version)
484  */
485 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
486
487 static void release_generic_request(struct kref *kref)
488 {
489         struct ceph_mon_generic_request *req =
490                 container_of(kref, struct ceph_mon_generic_request, kref);
491
492         dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
493              req->reply);
494         WARN_ON(!RB_EMPTY_NODE(&req->node));
495
496         if (req->reply)
497                 ceph_msg_put(req->reply);
498         if (req->request)
499                 ceph_msg_put(req->request);
500
501         kfree(req);
502 }
503
504 static void put_generic_request(struct ceph_mon_generic_request *req)
505 {
506         if (req)
507                 kref_put(&req->kref, release_generic_request);
508 }
509
510 static void get_generic_request(struct ceph_mon_generic_request *req)
511 {
512         kref_get(&req->kref);
513 }
514
515 static struct ceph_mon_generic_request *
516 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
517 {
518         struct ceph_mon_generic_request *req;
519
520         req = kzalloc(sizeof(*req), gfp);
521         if (!req)
522                 return NULL;
523
524         req->monc = monc;
525         kref_init(&req->kref);
526         RB_CLEAR_NODE(&req->node);
527         init_completion(&req->completion);
528
529         dout("%s greq %p\n", __func__, req);
530         return req;
531 }
532
533 static void register_generic_request(struct ceph_mon_generic_request *req)
534 {
535         struct ceph_mon_client *monc = req->monc;
536
537         WARN_ON(req->tid);
538
539         get_generic_request(req);
540         req->tid = ++monc->last_tid;
541         insert_generic_request(&monc->generic_request_tree, req);
542 }
543
544 static void send_generic_request(struct ceph_mon_client *monc,
545                                  struct ceph_mon_generic_request *req)
546 {
547         WARN_ON(!req->tid);
548
549         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
550         req->request->hdr.tid = cpu_to_le64(req->tid);
551         ceph_con_send(&monc->con, ceph_msg_get(req->request));
552 }
553
554 static void __finish_generic_request(struct ceph_mon_generic_request *req)
555 {
556         struct ceph_mon_client *monc = req->monc;
557
558         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
559         erase_generic_request(&monc->generic_request_tree, req);
560
561         ceph_msg_revoke(req->request);
562         ceph_msg_revoke_incoming(req->reply);
563 }
564
565 static void finish_generic_request(struct ceph_mon_generic_request *req)
566 {
567         __finish_generic_request(req);
568         put_generic_request(req);
569 }
570
571 static void complete_generic_request(struct ceph_mon_generic_request *req)
572 {
573         if (req->complete_cb)
574                 req->complete_cb(req);
575         else
576                 complete_all(&req->completion);
577         put_generic_request(req);
578 }
579
580 static void cancel_generic_request(struct ceph_mon_generic_request *req)
581 {
582         struct ceph_mon_client *monc = req->monc;
583         struct ceph_mon_generic_request *lookup_req;
584
585         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
586
587         mutex_lock(&monc->mutex);
588         lookup_req = lookup_generic_request(&monc->generic_request_tree,
589                                             req->tid);
590         if (lookup_req) {
591                 WARN_ON(lookup_req != req);
592                 finish_generic_request(req);
593         }
594
595         mutex_unlock(&monc->mutex);
596 }
597
598 static int wait_generic_request(struct ceph_mon_generic_request *req)
599 {
600         int ret;
601
602         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
603         ret = wait_for_completion_interruptible(&req->completion);
604         if (ret)
605                 cancel_generic_request(req);
606         else
607                 ret = req->result; /* completed */
608
609         return ret;
610 }
611
612 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
613                                          struct ceph_msg_header *hdr,
614                                          int *skip)
615 {
616         struct ceph_mon_client *monc = con->private;
617         struct ceph_mon_generic_request *req;
618         u64 tid = le64_to_cpu(hdr->tid);
619         struct ceph_msg *m;
620
621         mutex_lock(&monc->mutex);
622         req = lookup_generic_request(&monc->generic_request_tree, tid);
623         if (!req) {
624                 dout("get_generic_reply %lld dne\n", tid);
625                 *skip = 1;
626                 m = NULL;
627         } else {
628                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
629                 *skip = 0;
630                 m = ceph_msg_get(req->reply);
631                 /*
632                  * we don't need to track the connection reading into
633                  * this reply because we only have one open connection
634                  * at a time, ever.
635                  */
636         }
637         mutex_unlock(&monc->mutex);
638         return m;
639 }
640
641 /*
642  * statfs
643  */
644 static void handle_statfs_reply(struct ceph_mon_client *monc,
645                                 struct ceph_msg *msg)
646 {
647         struct ceph_mon_generic_request *req;
648         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
649         u64 tid = le64_to_cpu(msg->hdr.tid);
650
651         dout("%s msg %p tid %llu\n", __func__, msg, tid);
652
653         if (msg->front.iov_len != sizeof(*reply))
654                 goto bad;
655
656         mutex_lock(&monc->mutex);
657         req = lookup_generic_request(&monc->generic_request_tree, tid);
658         if (!req) {
659                 mutex_unlock(&monc->mutex);
660                 return;
661         }
662
663         req->result = 0;
664         *req->u.st = reply->st; /* struct */
665         __finish_generic_request(req);
666         mutex_unlock(&monc->mutex);
667
668         complete_generic_request(req);
669         return;
670
671 bad:
672         pr_err("corrupt statfs reply, tid %llu\n", tid);
673         ceph_msg_dump(msg);
674 }
675
676 /*
677  * Do a synchronous statfs().
678  */
679 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
680 {
681         struct ceph_mon_generic_request *req;
682         struct ceph_mon_statfs *h;
683         int ret = -ENOMEM;
684
685         req = alloc_generic_request(monc, GFP_NOFS);
686         if (!req)
687                 goto out;
688
689         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
690                                     true);
691         if (!req->request)
692                 goto out;
693
694         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
695         if (!req->reply)
696                 goto out;
697
698         req->u.st = buf;
699
700         mutex_lock(&monc->mutex);
701         register_generic_request(req);
702         /* fill out request */
703         h = req->request->front.iov_base;
704         h->monhdr.have_version = 0;
705         h->monhdr.session_mon = cpu_to_le16(-1);
706         h->monhdr.session_mon_tid = 0;
707         h->fsid = monc->monmap->fsid;
708         send_generic_request(monc, req);
709         mutex_unlock(&monc->mutex);
710
711         ret = wait_generic_request(req);
712 out:
713         put_generic_request(req);
714         return ret;
715 }
716 EXPORT_SYMBOL(ceph_monc_do_statfs);
717
718 static void handle_get_version_reply(struct ceph_mon_client *monc,
719                                      struct ceph_msg *msg)
720 {
721         struct ceph_mon_generic_request *req;
722         u64 tid = le64_to_cpu(msg->hdr.tid);
723         void *p = msg->front.iov_base;
724         void *end = p + msg->front_alloc_len;
725         u64 handle;
726
727         dout("%s msg %p tid %llu\n", __func__, msg, tid);
728
729         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
730         handle = ceph_decode_64(&p);
731         if (tid != 0 && tid != handle)
732                 goto bad;
733
734         mutex_lock(&monc->mutex);
735         req = lookup_generic_request(&monc->generic_request_tree, handle);
736         if (!req) {
737                 mutex_unlock(&monc->mutex);
738                 return;
739         }
740
741         req->result = 0;
742         req->u.newest = ceph_decode_64(&p);
743         __finish_generic_request(req);
744         mutex_unlock(&monc->mutex);
745
746         complete_generic_request(req);
747         return;
748
749 bad:
750         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
751         ceph_msg_dump(msg);
752 }
753
754 static struct ceph_mon_generic_request *
755 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
756                         ceph_monc_callback_t cb, u64 private_data)
757 {
758         struct ceph_mon_generic_request *req;
759
760         req = alloc_generic_request(monc, GFP_NOIO);
761         if (!req)
762                 goto err_put_req;
763
764         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
765                                     sizeof(u64) + sizeof(u32) + strlen(what),
766                                     GFP_NOIO, true);
767         if (!req->request)
768                 goto err_put_req;
769
770         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
771                                   true);
772         if (!req->reply)
773                 goto err_put_req;
774
775         req->complete_cb = cb;
776         req->private_data = private_data;
777
778         mutex_lock(&monc->mutex);
779         register_generic_request(req);
780         {
781                 void *p = req->request->front.iov_base;
782                 void *const end = p + req->request->front_alloc_len;
783
784                 ceph_encode_64(&p, req->tid); /* handle */
785                 ceph_encode_string(&p, end, what, strlen(what));
786                 WARN_ON(p != end);
787         }
788         send_generic_request(monc, req);
789         mutex_unlock(&monc->mutex);
790
791         return req;
792
793 err_put_req:
794         put_generic_request(req);
795         return ERR_PTR(-ENOMEM);
796 }
797
798 /*
799  * Send MMonGetVersion and wait for the reply.
800  *
801  * @what: one of "mdsmap", "osdmap" or "monmap"
802  */
803 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
804                           u64 *newest)
805 {
806         struct ceph_mon_generic_request *req;
807         int ret;
808
809         req = __ceph_monc_get_version(monc, what, NULL, 0);
810         if (IS_ERR(req))
811                 return PTR_ERR(req);
812
813         ret = wait_generic_request(req);
814         if (!ret)
815                 *newest = req->u.newest;
816
817         put_generic_request(req);
818         return ret;
819 }
820 EXPORT_SYMBOL(ceph_monc_get_version);
821
822 /*
823  * Send MMonGetVersion,
824  *
825  * @what: one of "mdsmap", "osdmap" or "monmap"
826  */
827 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
828                                 ceph_monc_callback_t cb, u64 private_data)
829 {
830         struct ceph_mon_generic_request *req;
831
832         req = __ceph_monc_get_version(monc, what, cb, private_data);
833         if (IS_ERR(req))
834                 return PTR_ERR(req);
835
836         put_generic_request(req);
837         return 0;
838 }
839 EXPORT_SYMBOL(ceph_monc_get_version_async);
840
841 static void handle_command_ack(struct ceph_mon_client *monc,
842                                struct ceph_msg *msg)
843 {
844         struct ceph_mon_generic_request *req;
845         void *p = msg->front.iov_base;
846         void *const end = p + msg->front_alloc_len;
847         u64 tid = le64_to_cpu(msg->hdr.tid);
848
849         dout("%s msg %p tid %llu\n", __func__, msg, tid);
850
851         ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
852                                                             sizeof(u32), bad);
853         p += sizeof(struct ceph_mon_request_header);
854
855         mutex_lock(&monc->mutex);
856         req = lookup_generic_request(&monc->generic_request_tree, tid);
857         if (!req) {
858                 mutex_unlock(&monc->mutex);
859                 return;
860         }
861
862         req->result = ceph_decode_32(&p);
863         __finish_generic_request(req);
864         mutex_unlock(&monc->mutex);
865
866         complete_generic_request(req);
867         return;
868
869 bad:
870         pr_err("corrupt mon_command ack, tid %llu\n", tid);
871         ceph_msg_dump(msg);
872 }
873
874 int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
875                             struct ceph_entity_addr *client_addr)
876 {
877         struct ceph_mon_generic_request *req;
878         struct ceph_mon_command *h;
879         int ret = -ENOMEM;
880         int len;
881
882         req = alloc_generic_request(monc, GFP_NOIO);
883         if (!req)
884                 goto out;
885
886         req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
887         if (!req->request)
888                 goto out;
889
890         req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
891                                   true);
892         if (!req->reply)
893                 goto out;
894
895         mutex_lock(&monc->mutex);
896         register_generic_request(req);
897         h = req->request->front.iov_base;
898         h->monhdr.have_version = 0;
899         h->monhdr.session_mon = cpu_to_le16(-1);
900         h->monhdr.session_mon_tid = 0;
901         h->fsid = monc->monmap->fsid;
902         h->num_strs = cpu_to_le32(1);
903         len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
904                                  \"blacklistop\": \"add\", \
905                                  \"addr\": \"%pISpc/%u\" }",
906                       &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
907         h->str_len = cpu_to_le32(len);
908         send_generic_request(monc, req);
909         mutex_unlock(&monc->mutex);
910
911         ret = wait_generic_request(req);
912 out:
913         put_generic_request(req);
914         return ret;
915 }
916 EXPORT_SYMBOL(ceph_monc_blacklist_add);
917
918 /*
919  * Resend pending generic requests.
920  */
921 static void __resend_generic_request(struct ceph_mon_client *monc)
922 {
923         struct ceph_mon_generic_request *req;
924         struct rb_node *p;
925
926         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
927                 req = rb_entry(p, struct ceph_mon_generic_request, node);
928                 ceph_msg_revoke(req->request);
929                 ceph_msg_revoke_incoming(req->reply);
930                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
931         }
932 }
933
934 /*
935  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
936  * renew/retry subscription as needed (in case it is timing out, or we
937  * got an ENOMEM).  And keep the monitor connection alive.
938  */
939 static void delayed_work(struct work_struct *work)
940 {
941         struct ceph_mon_client *monc =
942                 container_of(work, struct ceph_mon_client, delayed_work.work);
943
944         dout("monc delayed_work\n");
945         mutex_lock(&monc->mutex);
946         if (monc->hunting) {
947                 dout("%s continuing hunt\n", __func__);
948                 reopen_session(monc);
949         } else {
950                 int is_auth = ceph_auth_is_authenticated(monc->auth);
951                 if (ceph_con_keepalive_expired(&monc->con,
952                                                CEPH_MONC_PING_TIMEOUT)) {
953                         dout("monc keepalive timeout\n");
954                         is_auth = 0;
955                         reopen_session(monc);
956                 }
957
958                 if (!monc->hunting) {
959                         ceph_con_keepalive(&monc->con);
960                         __validate_auth(monc);
961                 }
962
963                 if (is_auth &&
964                     !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
965                         unsigned long now = jiffies;
966
967                         dout("%s renew subs? now %lu renew after %lu\n",
968                              __func__, now, monc->sub_renew_after);
969                         if (time_after_eq(now, monc->sub_renew_after))
970                                 __send_subscribe(monc);
971                 }
972         }
973         __schedule_delayed(monc);
974         mutex_unlock(&monc->mutex);
975 }
976
977 /*
978  * On startup, we build a temporary monmap populated with the IPs
979  * provided by mount(2).
980  */
981 static int build_initial_monmap(struct ceph_mon_client *monc)
982 {
983         struct ceph_options *opt = monc->client->options;
984         struct ceph_entity_addr *mon_addr = opt->mon_addr;
985         int num_mon = opt->num_mon;
986         int i;
987
988         /* build initial monmap */
989         monc->monmap = kzalloc(sizeof(*monc->monmap) +
990                                num_mon*sizeof(monc->monmap->mon_inst[0]),
991                                GFP_KERNEL);
992         if (!monc->monmap)
993                 return -ENOMEM;
994         for (i = 0; i < num_mon; i++) {
995                 monc->monmap->mon_inst[i].addr = mon_addr[i];
996                 monc->monmap->mon_inst[i].addr.nonce = 0;
997                 monc->monmap->mon_inst[i].name.type =
998                         CEPH_ENTITY_TYPE_MON;
999                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
1000         }
1001         monc->monmap->num_mon = num_mon;
1002         return 0;
1003 }
1004
1005 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1006 {
1007         int err = 0;
1008
1009         dout("init\n");
1010         memset(monc, 0, sizeof(*monc));
1011         monc->client = cl;
1012         monc->monmap = NULL;
1013         mutex_init(&monc->mutex);
1014
1015         err = build_initial_monmap(monc);
1016         if (err)
1017                 goto out;
1018
1019         /* connection */
1020         /* authentication */
1021         monc->auth = ceph_auth_init(cl->options->name,
1022                                     cl->options->key);
1023         if (IS_ERR(monc->auth)) {
1024                 err = PTR_ERR(monc->auth);
1025                 goto out_monmap;
1026         }
1027         monc->auth->want_keys =
1028                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1029                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1030
1031         /* msgs */
1032         err = -ENOMEM;
1033         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1034                                      sizeof(struct ceph_mon_subscribe_ack),
1035                                      GFP_KERNEL, true);
1036         if (!monc->m_subscribe_ack)
1037                 goto out_auth;
1038
1039         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1040                                          GFP_KERNEL, true);
1041         if (!monc->m_subscribe)
1042                 goto out_subscribe_ack;
1043
1044         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1045                                           GFP_KERNEL, true);
1046         if (!monc->m_auth_reply)
1047                 goto out_subscribe;
1048
1049         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1050         monc->pending_auth = 0;
1051         if (!monc->m_auth)
1052                 goto out_auth_reply;
1053
1054         ceph_con_init(&monc->con, monc, &mon_con_ops,
1055                       &monc->client->msgr);
1056
1057         monc->cur_mon = -1;
1058         monc->had_a_connection = false;
1059         monc->hunt_mult = 1;
1060
1061         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1062         monc->generic_request_tree = RB_ROOT;
1063         monc->last_tid = 0;
1064
1065         monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1066
1067         return 0;
1068
1069 out_auth_reply:
1070         ceph_msg_put(monc->m_auth_reply);
1071 out_subscribe:
1072         ceph_msg_put(monc->m_subscribe);
1073 out_subscribe_ack:
1074         ceph_msg_put(monc->m_subscribe_ack);
1075 out_auth:
1076         ceph_auth_destroy(monc->auth);
1077 out_monmap:
1078         kfree(monc->monmap);
1079 out:
1080         return err;
1081 }
1082 EXPORT_SYMBOL(ceph_monc_init);
1083
1084 void ceph_monc_stop(struct ceph_mon_client *monc)
1085 {
1086         dout("stop\n");
1087         cancel_delayed_work_sync(&monc->delayed_work);
1088
1089         mutex_lock(&monc->mutex);
1090         __close_session(monc);
1091         monc->cur_mon = -1;
1092         mutex_unlock(&monc->mutex);
1093
1094         /*
1095          * flush msgr queue before we destroy ourselves to ensure that:
1096          *  - any work that references our embedded con is finished.
1097          *  - any osd_client or other work that may reference an authorizer
1098          *    finishes before we shut down the auth subsystem.
1099          */
1100         ceph_msgr_flush();
1101
1102         ceph_auth_destroy(monc->auth);
1103
1104         WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1105
1106         ceph_msg_put(monc->m_auth);
1107         ceph_msg_put(monc->m_auth_reply);
1108         ceph_msg_put(monc->m_subscribe);
1109         ceph_msg_put(monc->m_subscribe_ack);
1110
1111         kfree(monc->monmap);
1112 }
1113 EXPORT_SYMBOL(ceph_monc_stop);
1114
1115 static void finish_hunting(struct ceph_mon_client *monc)
1116 {
1117         if (monc->hunting) {
1118                 dout("%s found mon%d\n", __func__, monc->cur_mon);
1119                 monc->hunting = false;
1120                 monc->had_a_connection = true;
1121                 monc->hunt_mult /= 2; /* reduce by 50% */
1122                 if (monc->hunt_mult < 1)
1123                         monc->hunt_mult = 1;
1124         }
1125 }
1126
1127 static void handle_auth_reply(struct ceph_mon_client *monc,
1128                               struct ceph_msg *msg)
1129 {
1130         int ret;
1131         int was_auth = 0;
1132
1133         mutex_lock(&monc->mutex);
1134         was_auth = ceph_auth_is_authenticated(monc->auth);
1135         monc->pending_auth = 0;
1136         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1137                                      msg->front.iov_len,
1138                                      monc->m_auth->front.iov_base,
1139                                      monc->m_auth->front_alloc_len);
1140         if (ret > 0) {
1141                 __send_prepared_auth_request(monc, ret);
1142                 goto out;
1143         }
1144
1145         finish_hunting(monc);
1146
1147         if (ret < 0) {
1148                 monc->client->auth_err = ret;
1149         } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
1150                 dout("authenticated, starting session\n");
1151
1152                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1153                 monc->client->msgr.inst.name.num =
1154                                         cpu_to_le64(monc->auth->global_id);
1155
1156                 __send_subscribe(monc);
1157                 __resend_generic_request(monc);
1158
1159                 pr_info("mon%d %s session established\n", monc->cur_mon,
1160                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
1161         }
1162
1163 out:
1164         mutex_unlock(&monc->mutex);
1165         if (monc->client->auth_err < 0)
1166                 wake_up_all(&monc->client->auth_wq);
1167 }
1168
1169 static int __validate_auth(struct ceph_mon_client *monc)
1170 {
1171         int ret;
1172
1173         if (monc->pending_auth)
1174                 return 0;
1175
1176         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1177                               monc->m_auth->front_alloc_len);
1178         if (ret <= 0)
1179                 return ret; /* either an error, or no need to authenticate */
1180         __send_prepared_auth_request(monc, ret);
1181         return 0;
1182 }
1183
1184 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1185 {
1186         int ret;
1187
1188         mutex_lock(&monc->mutex);
1189         ret = __validate_auth(monc);
1190         mutex_unlock(&monc->mutex);
1191         return ret;
1192 }
1193 EXPORT_SYMBOL(ceph_monc_validate_auth);
1194
1195 /*
1196  * handle incoming message
1197  */
1198 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1199 {
1200         struct ceph_mon_client *monc = con->private;
1201         int type = le16_to_cpu(msg->hdr.type);
1202
1203         if (!monc)
1204                 return;
1205
1206         switch (type) {
1207         case CEPH_MSG_AUTH_REPLY:
1208                 handle_auth_reply(monc, msg);
1209                 break;
1210
1211         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1212                 handle_subscribe_ack(monc, msg);
1213                 break;
1214
1215         case CEPH_MSG_STATFS_REPLY:
1216                 handle_statfs_reply(monc, msg);
1217                 break;
1218
1219         case CEPH_MSG_MON_GET_VERSION_REPLY:
1220                 handle_get_version_reply(monc, msg);
1221                 break;
1222
1223         case CEPH_MSG_MON_COMMAND_ACK:
1224                 handle_command_ack(monc, msg);
1225                 break;
1226
1227         case CEPH_MSG_MON_MAP:
1228                 ceph_monc_handle_map(monc, msg);
1229                 break;
1230
1231         case CEPH_MSG_OSD_MAP:
1232                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1233                 break;
1234
1235         default:
1236                 /* can the chained handler handle it? */
1237                 if (monc->client->extra_mon_dispatch &&
1238                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1239                         break;
1240                         
1241                 pr_err("received unknown message type %d %s\n", type,
1242                        ceph_msg_type_name(type));
1243         }
1244         ceph_msg_put(msg);
1245 }
1246
1247 /*
1248  * Allocate memory for incoming message
1249  */
1250 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1251                                       struct ceph_msg_header *hdr,
1252                                       int *skip)
1253 {
1254         struct ceph_mon_client *monc = con->private;
1255         int type = le16_to_cpu(hdr->type);
1256         int front_len = le32_to_cpu(hdr->front_len);
1257         struct ceph_msg *m = NULL;
1258
1259         *skip = 0;
1260
1261         switch (type) {
1262         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1263                 m = ceph_msg_get(monc->m_subscribe_ack);
1264                 break;
1265         case CEPH_MSG_STATFS_REPLY:
1266         case CEPH_MSG_MON_COMMAND_ACK:
1267                 return get_generic_reply(con, hdr, skip);
1268         case CEPH_MSG_AUTH_REPLY:
1269                 m = ceph_msg_get(monc->m_auth_reply);
1270                 break;
1271         case CEPH_MSG_MON_GET_VERSION_REPLY:
1272                 if (le64_to_cpu(hdr->tid) != 0)
1273                         return get_generic_reply(con, hdr, skip);
1274
1275                 /*
1276                  * Older OSDs don't set reply tid even if the orignal
1277                  * request had a non-zero tid.  Workaround this weirdness
1278                  * by falling through to the allocate case.
1279                  */
1280         case CEPH_MSG_MON_MAP:
1281         case CEPH_MSG_MDS_MAP:
1282         case CEPH_MSG_OSD_MAP:
1283         case CEPH_MSG_FS_MAP_USER:
1284                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1285                 if (!m)
1286                         return NULL;    /* ENOMEM--return skip == 0 */
1287                 break;
1288         }
1289
1290         if (!m) {
1291                 pr_info("alloc_msg unknown type %d\n", type);
1292                 *skip = 1;
1293         } else if (front_len > m->front_alloc_len) {
1294                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1295                         front_len, m->front_alloc_len,
1296                         (unsigned int)con->peer_name.type,
1297                         le64_to_cpu(con->peer_name.num));
1298                 ceph_msg_put(m);
1299                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1300         }
1301
1302         return m;
1303 }
1304
1305 /*
1306  * If the monitor connection resets, pick a new monitor and resubmit
1307  * any pending requests.
1308  */
1309 static void mon_fault(struct ceph_connection *con)
1310 {
1311         struct ceph_mon_client *monc = con->private;
1312
1313         mutex_lock(&monc->mutex);
1314         dout("%s mon%d\n", __func__, monc->cur_mon);
1315         if (monc->cur_mon >= 0) {
1316                 if (!monc->hunting) {
1317                         dout("%s hunting for new mon\n", __func__);
1318                         reopen_session(monc);
1319                         __schedule_delayed(monc);
1320                 } else {
1321                         dout("%s already hunting\n", __func__);
1322                 }
1323         }
1324         mutex_unlock(&monc->mutex);
1325 }
1326
1327 /*
1328  * We can ignore refcounting on the connection struct, as all references
1329  * will come from the messenger workqueue, which is drained prior to
1330  * mon_client destruction.
1331  */
1332 static struct ceph_connection *con_get(struct ceph_connection *con)
1333 {
1334         return con;
1335 }
1336
1337 static void con_put(struct ceph_connection *con)
1338 {
1339 }
1340
1341 static const struct ceph_connection_operations mon_con_ops = {
1342         .get = con_get,
1343         .put = con_put,
1344         .dispatch = dispatch,
1345         .fault = mon_fault,
1346         .alloc_msg = mon_alloc_msg,
1347 };