]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - fs/ceph/mds_client.c
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
[karo-tx-linux.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9
10 #include "super.h"
11 #include "mds_client.h"
12
13 #include <linux/ceph/ceph_features.h>
14 #include <linux/ceph/messenger.h>
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/pagelist.h>
17 #include <linux/ceph/auth.h>
18 #include <linux/ceph/debugfs.h>
19
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
44
/*
 * State carried while encoding the cap-reconnect payload for an MDS.
 */
struct ceph_reconnect_state {
	struct ceph_pagelist *pagelist;	/* message payload being built */
	bool flock;	/* presumably: peer supports the flock reconnect
			 * format - TODO confirm against where this is set */
};
49
50 static void __wake_requests(struct ceph_mds_client *mdsc,
51                             struct list_head *head);
52
53 static const struct ceph_connection_operations mds_con_ops;
54
55
56 /*
57  * mds reply parsing
58  */
59
60 /*
61  * parse individual inode info
62  */
63 static int parse_reply_info_in(void **p, void *end,
64                                struct ceph_mds_reply_info_in *info,
65                                int features)
66 {
67         int err = -EIO;
68
69         info->in = *p;
70         *p += sizeof(struct ceph_mds_reply_inode) +
71                 sizeof(*info->in->fragtree.splits) *
72                 le32_to_cpu(info->in->fragtree.nsplits);
73
74         ceph_decode_32_safe(p, end, info->symlink_len, bad);
75         ceph_decode_need(p, end, info->symlink_len, bad);
76         info->symlink = *p;
77         *p += info->symlink_len;
78
79         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
80                 ceph_decode_copy_safe(p, end, &info->dir_layout,
81                                       sizeof(info->dir_layout), bad);
82         else
83                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
84
85         ceph_decode_32_safe(p, end, info->xattr_len, bad);
86         ceph_decode_need(p, end, info->xattr_len, bad);
87         info->xattr_data = *p;
88         *p += info->xattr_len;
89         return 0;
90 bad:
91         return err;
92 }
93
94 /*
95  * parse a normal reply, which may contain a (dir+)dentry and/or a
96  * target inode.
97  */
98 static int parse_reply_info_trace(void **p, void *end,
99                                   struct ceph_mds_reply_info_parsed *info,
100                                   int features)
101 {
102         int err;
103
104         if (info->head->is_dentry) {
105                 err = parse_reply_info_in(p, end, &info->diri, features);
106                 if (err < 0)
107                         goto out_bad;
108
109                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
110                         goto bad;
111                 info->dirfrag = *p;
112                 *p += sizeof(*info->dirfrag) +
113                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
114                 if (unlikely(*p > end))
115                         goto bad;
116
117                 ceph_decode_32_safe(p, end, info->dname_len, bad);
118                 ceph_decode_need(p, end, info->dname_len, bad);
119                 info->dname = *p;
120                 *p += info->dname_len;
121                 info->dlease = *p;
122                 *p += sizeof(*info->dlease);
123         }
124
125         if (info->head->is_target) {
126                 err = parse_reply_info_in(p, end, &info->targeti, features);
127                 if (err < 0)
128                         goto out_bad;
129         }
130
131         if (unlikely(*p != end))
132                 goto bad;
133         return 0;
134
135 bad:
136         err = -EIO;
137 out_bad:
138         pr_err("problem parsing mds trace %d\n", err);
139         return err;
140 }
141
/*
 * parse readdir results
 *
 * Wire layout: a ceph_mds_reply_dirfrag header (with replica list),
 * a u32 entry count, two flag bytes, then `num` (dname, lease, inode)
 * triples.  All name/lease/inode pointers stored in @info point into
 * the message buffer, which must therefore outlive @info.
 */
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info,
				int features)
{
	u32 num, i = 0;
	int err;

	/* dirfrag header for the dir itself, plus its dist array */
	info->dir_dir = *p;
	if (*p + sizeof(*info->dir_dir) > end)
		goto bad;
	*p += sizeof(*info->dir_dir) +
		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
	if (*p > end)
		goto bad;

	/* entry count followed by the dir_end/dir_complete flag bytes */
	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	info->dir_end = ceph_decode_8(p);
	info->dir_complete = ceph_decode_8(p);
	if (num == 0)
		goto done;

	/*
	 * alloc large array: a single allocation backing four parallel
	 * arrays (dir_in, dir_dname, dir_dname_len, dir_dlease), carved
	 * up by the pointer arithmetic below.  kcalloc checks the
	 * num * size multiplication for overflow.
	 */
	info->dir_nr = num;
	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
			       sizeof(*info->dir_dname) +
			       sizeof(*info->dir_dname_len) +
			       sizeof(*info->dir_dlease),
			       GFP_NOFS);
	if (info->dir_in == NULL) {
		err = -ENOMEM;
		goto out_bad;
	}
	info->dir_dname = (void *)(info->dir_in + num);
	info->dir_dname_len = (void *)(info->dir_dname + num);
	info->dir_dlease = (void *)(info->dir_dname_len + num);

	while (num) {
		/* dentry */
		ceph_decode_need(p, end, sizeof(u32)*2, bad);
		info->dir_dname_len[i] = ceph_decode_32(p);
		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
		info->dir_dname[i] = *p;
		*p += info->dir_dname_len[i];
		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
		     info->dir_dname[i]);
		/*
		 * NOTE(review): no explicit bounds check before advancing
		 * past the lease struct; an overrun is only caught by the
		 * following decode checks or the final *p != end test -
		 * confirm this cannot leave dir_dlease[i] past `end`.
		 */
		info->dir_dlease[i] = *p;
		*p += sizeof(struct ceph_mds_reply_lease);

		/* inode */
		err = parse_reply_info_in(p, end, &info->dir_in[i], features);
		if (err < 0)
			goto out_bad;
		i++;
		num--;
	}

done:
	/* the dir section must be consumed exactly */
	if (*p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}
213
214 /*
215  * parse fcntl F_GETLK results
216  */
217 static int parse_reply_info_filelock(void **p, void *end,
218                                      struct ceph_mds_reply_info_parsed *info,
219                                      int features)
220 {
221         if (*p + sizeof(*info->filelock_reply) > end)
222                 goto bad;
223
224         info->filelock_reply = *p;
225         *p += sizeof(*info->filelock_reply);
226
227         if (unlikely(*p != end))
228                 goto bad;
229         return 0;
230
231 bad:
232         return -EIO;
233 }
234
235 /*
236  * parse create results
237  */
238 static int parse_reply_info_create(void **p, void *end,
239                                   struct ceph_mds_reply_info_parsed *info,
240                                   int features)
241 {
242         if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
243                 if (*p == end) {
244                         info->has_create_ino = false;
245                 } else {
246                         info->has_create_ino = true;
247                         info->ino = ceph_decode_64(p);
248                 }
249         }
250
251         if (unlikely(*p != end))
252                 goto bad;
253         return 0;
254
255 bad:
256         return -EIO;
257 }
258
259 /*
260  * parse extra results
261  */
262 static int parse_reply_info_extra(void **p, void *end,
263                                   struct ceph_mds_reply_info_parsed *info,
264                                   int features)
265 {
266         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
267                 return parse_reply_info_filelock(p, end, info, features);
268         else if (info->head->op == CEPH_MDS_OP_READDIR)
269                 return parse_reply_info_dir(p, end, info, features);
270         else if (info->head->op == CEPH_MDS_OP_CREATE)
271                 return parse_reply_info_create(p, end, info, features);
272         else
273                 return -EIO;
274 }
275
/*
 * parse entire mds reply
 *
 * After the ceph_mds_reply_head, the reply consists of three
 * length-prefixed sections: the trace (dentry/inode metadata), the
 * op-specific "extra" section, and the snap realm blob.  On success
 * @info holds pointers into @msg's front buffer.
 */
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    int features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		/* section parser must consume exactly len bytes */
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	/* anything left over (or an overrun) means a malformed reply */
	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}
325
/*
 * Free parsed-reply state.  Only dir_in is heap-allocated (the single
 * blob backing the readdir arrays); all other pointers reference the
 * reply message buffer and are not owned here.
 */
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	kfree(info->dir_in);
}
330
331
332 /*
333  * sessions
334  */
335 static const char *session_state_name(int s)
336 {
337         switch (s) {
338         case CEPH_MDS_SESSION_NEW: return "new";
339         case CEPH_MDS_SESSION_OPENING: return "opening";
340         case CEPH_MDS_SESSION_OPEN: return "open";
341         case CEPH_MDS_SESSION_HUNG: return "hung";
342         case CEPH_MDS_SESSION_CLOSING: return "closing";
343         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
344         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
345         default: return "???";
346         }
347 }
348
349 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
350 {
351         if (atomic_inc_not_zero(&s->s_ref)) {
352                 dout("mdsc get_session %p %d -> %d\n", s,
353                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
354                 return s;
355         } else {
356                 dout("mdsc get_session %p 0 -- FAIL", s);
357                 return NULL;
358         }
359 }
360
/*
 * Drop a session reference; on the final put, destroy the auth
 * authorizer (if one was set up) and free the session.
 */
void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
	if (atomic_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
		     s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
			     s->s_mdsc->fsc->client->monc.auth,
			     s->s_auth.authorizer);
		kfree(s);
	}
}
373
374 /*
375  * called under mdsc->mutex
376  */
377 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
378                                                    int mds)
379 {
380         struct ceph_mds_session *session;
381
382         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
383                 return NULL;
384         session = mdsc->sessions[mds];
385         dout("lookup_mds_session %p %d\n", session,
386              atomic_read(&session->s_ref));
387         get_session(session);
388         return session;
389 }
390
391 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
392 {
393         if (mds >= mdsc->max_sessions)
394                 return false;
395         return mdsc->sessions[mds];
396 }
397
398 static int __verify_registered_session(struct ceph_mds_client *mdsc,
399                                        struct ceph_mds_session *s)
400 {
401         if (s->s_mds >= mdsc->max_sessions ||
402             mdsc->sessions[s->s_mds] != s)
403                 return -ENOENT;
404         return 0;
405 }
406
/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 *
 * On success the new session holds two references: one stashed in
 * mdsc->sessions[] and one handed to the caller.  Returns
 * ERR_PTR(-ENOMEM) on allocation failure.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_gen = 0;
	/* start already expired so caps/leases must be renewed first */
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	s->s_trim_caps = 0;
	atomic_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	s->s_num_cap_releases = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_LIST_HEAD(&s->s_cap_releases_done);
	INIT_LIST_HEAD(&s->s_cap_flushing);
	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

	dout("register_session mds%d\n", mds);
	if (mds >= mdsc->max_sessions) {
		/* grow sessions[] to the next power of two >= mds+1 */
		int newmax = 1 << get_count_order(mds+1);
		struct ceph_mds_session **sa;

		dout("register_session realloc to %d\n", newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (sa == NULL)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}
	mdsc->sessions[mds] = s;
	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
477
/*
 * called under mdsc->mutex
 *
 * Removes the session from sessions[], closes its connection, and
 * drops the sessions[] reference.  Callers holding their own
 * reference must still put it separately.
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);	/* the sessions[] ref */
}
490
491 /*
492  * drop session refs in request.
493  *
494  * should be last request ref, or hold mdsc->mutex
495  */
496 static void put_request_session(struct ceph_mds_request *req)
497 {
498         if (req->r_session) {
499                 ceph_put_mds_session(req->r_session);
500                 req->r_session = NULL;
501         }
502 }
503
/*
 * Final-put destructor for an MDS request (invoked via kref): release
 * the request/reply messages, inode and dentry references, cap pins,
 * paths and cap reservation, then free the request.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_put(req->r_reply);
		destroy_reply_info(&req->r_reply_info);
	}
	if (req->r_inode) {
		/* drop the CAP_PIN taken when the request was set up */
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_locked_dir)
		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_target_inode)
		iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		dput(req->r_old_dentry);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
}
543
544 /*
545  * lookup session, bump ref if found.
546  *
547  * called under mdsc->mutex.
548  */
549 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
550                                              u64 tid)
551 {
552         struct ceph_mds_request *req;
553         struct rb_node *n = mdsc->request_tree.rb_node;
554
555         while (n) {
556                 req = rb_entry(n, struct ceph_mds_request, r_node);
557                 if (tid < req->r_tid)
558                         n = n->rb_left;
559                 else if (tid > req->r_tid)
560                         n = n->rb_right;
561                 else {
562                         ceph_mdsc_get_request(req);
563                         return req;
564                 }
565         }
566         return NULL;
567 }
568
569 static void __insert_request(struct ceph_mds_client *mdsc,
570                              struct ceph_mds_request *new)
571 {
572         struct rb_node **p = &mdsc->request_tree.rb_node;
573         struct rb_node *parent = NULL;
574         struct ceph_mds_request *req = NULL;
575
576         while (*p) {
577                 parent = *p;
578                 req = rb_entry(parent, struct ceph_mds_request, r_node);
579                 if (new->r_tid < req->r_tid)
580                         p = &(*p)->rb_left;
581                 else if (new->r_tid > req->r_tid)
582                         p = &(*p)->rb_right;
583                 else
584                         BUG();
585         }
586
587         rb_link_node(&new->r_node, parent, p);
588         rb_insert_color(&new->r_node, &mdsc->request_tree);
589 }
590
/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps)
		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
				  req->r_num_caps);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);	/* ref held by the request tree */
	__insert_request(mdsc, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		/* hold the dir and track this as an in-flight unsafe
		 * dir operation until the MDS commits it */
		ihold(dir);
		spin_lock(&ci->i_unsafe_lock);
		req->r_unsafe_dir = dir;
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}
622
/*
 * Remove a request from the tid tree and drop its unsafe-dir
 * tracking.  Called under mdsc->mutex; drops the tree's reference
 * (taken in __register_request).
 */
static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &mdsc->request_tree);
	RB_CLEAR_NODE(&req->r_node);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);

		/* drop the hold taken in __register_request */
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	ceph_mdsc_put_request(req);
}
643
644 /*
645  * Choose mds to send request to next.  If there is a hint set in the
646  * request (e.g., due to a prior forward hint from the mds), use that.
647  * Otherwise, consult frag tree and/or caps to identify the
648  * appropriate mds.  If all else fails, choose randomly.
649  *
650  * Called under mdsc->mutex.
651  */
652 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
653 {
654         /*
655          * we don't need to worry about protecting the d_parent access
656          * here because we never renaming inside the snapped namespace
657          * except to resplice to another snapdir, and either the old or new
658          * result is a valid result.
659          */
660         while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
661                 dentry = dentry->d_parent;
662         return dentry;
663 }
664
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = req->r_direct_is_hash;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("choose_mds using resend_mds mds%d\n",
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/* pick the inode whose caps / frag tree should guide the choice */
	inode = NULL;
	if (req->r_inode) {
		inode = req->r_inode;
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent = req->r_dentry->d_parent;
		struct inode *dir = parent->d_inode;

		if (dir->i_sb != mdsc->fsc->sb) {
			/* not this fs! */
			inode = req->r_dentry->d_inode;
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			struct dentry *dn = get_nonsnap_parent(parent);
			inode = dn->d_inode;
			dout("__choose_mds using nonsnap parent %p\n", inode);
		} else if (req->r_dentry->d_inode) {
			/* dentry target */
			inode = req->r_dentry->d_inode;
		} else {
			/* dir + name */
			inode = dir;
			hash = ceph_dentry_hash(dir, req->r_dentry);
			is_hash = true;
		}
	}

	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
	     (int)hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	/* for hashed dir operations, consult the directory's frag tree */
	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (%d/%d)\n",
				     inode, ceph_vinop(inode),
				     frag.frag, mds,
				     (int)r, frag.ndist);
				/* only use it if that mds is actually up */
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					return mds;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			mode = USE_AUTH_MDS;
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (auth)\n",
				     inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					return mds;
			}
		}
	}

	/* otherwise, follow the mds holding our cap (auth cap if asked) */
	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
	return mds;

random:
	/* last resort: any mds the map offers */
	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("choose_mds chose random mds%d\n", mds);
	return mds;
}
787
788
789 /*
790  * session messages
791  */
792 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
793 {
794         struct ceph_msg *msg;
795         struct ceph_mds_session_head *h;
796
797         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
798                            false);
799         if (!msg) {
800                 pr_err("create_session_msg ENOMEM creating msg\n");
801                 return NULL;
802         }
803         h = msg->front.iov_base;
804         h->op = cpu_to_le32(op);
805         h->seq = cpu_to_le64(seq);
806         return msg;
807 }
808
809 /*
810  * send session open request.
811  *
812  * called under mdsc->mutex
813  */
814 static int __open_session(struct ceph_mds_client *mdsc,
815                           struct ceph_mds_session *session)
816 {
817         struct ceph_msg *msg;
818         int mstate;
819         int mds = session->s_mds;
820
821         /* wait for mds to go active? */
822         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
823         dout("open_session to mds%d (%s)\n", mds,
824              ceph_mds_state_name(mstate));
825         session->s_state = CEPH_MDS_SESSION_OPENING;
826         session->s_renew_requested = jiffies;
827
828         /* send connect message */
829         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
830         if (!msg)
831                 return -ENOMEM;
832         ceph_con_send(&session->s_con, msg);
833         return 0;
834 }
835
836 /*
837  * open sessions for any export targets for the given mds
838  *
839  * called under mdsc->mutex
840  */
841 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
842                                           struct ceph_mds_session *session)
843 {
844         struct ceph_mds_info *mi;
845         struct ceph_mds_session *ts;
846         int i, mds = session->s_mds;
847         int target;
848
849         if (mds >= mdsc->mdsmap->m_max_mds)
850                 return;
851         mi = &mdsc->mdsmap->m_info[mds];
852         dout("open_export_target_sessions for mds%d (%d targets)\n",
853              session->s_mds, mi->num_export_targets);
854
855         for (i = 0; i < mi->num_export_targets; i++) {
856                 target = mi->export_targets[i];
857                 ts = __ceph_lookup_mds_session(mdsc, target);
858                 if (!ts) {
859                         ts = register_session(mdsc, target);
860                         if (IS_ERR(ts))
861                                 return;
862                 }
863                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
864                     session->s_state == CEPH_MDS_SESSION_CLOSING)
865                         __open_session(mdsc, session);
866                 else
867                         dout(" mds%d target mds%d %p is %s\n", session->s_mds,
868                              i, ts, session_state_name(ts->s_state));
869                 ceph_put_mds_session(ts);
870         }
871 }
872
/*
 * Public wrapper: take mdsc->mutex (required by
 * __open_export_target_sessions) and open sessions to all of
 * @session's export targets.
 */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}
880
881 /*
882  * session caps
883  */
884
885 /*
886  * Free preallocated cap messages assigned to this session
887  */
888 static void cleanup_cap_releases(struct ceph_mds_session *session)
889 {
890         struct ceph_msg *msg;
891
892         spin_lock(&session->s_cap_lock);
893         while (!list_empty(&session->s_cap_releases)) {
894                 msg = list_first_entry(&session->s_cap_releases,
895                                        struct ceph_msg, list_head);
896                 list_del_init(&msg->list_head);
897                 ceph_msg_put(msg);
898         }
899         while (!list_empty(&session->s_cap_releases_done)) {
900                 msg = list_first_entry(&session->s_cap_releases_done,
901                                        struct ceph_msg, list_head);
902                 list_del_init(&msg->list_head);
903                 ceph_msg_put(msg);
904         }
905         spin_unlock(&session->s_cap_lock);
906 }
907
/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * For each cap we grab an inode reference, record our position in
 * session->s_cap_iterator, drop s_cap_lock, and invoke @cb.  Releasing
 * the previous iteration's inode/cap references is deferred until the
 * lock is not held.  Iteration stops early if @cb returns < 0, and that
 * value is returned; otherwise returns 0.
 *
 * Caller must hold session s_mutex.
 */
static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                /* pin the inode so it can't go away once we drop the lock */
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                /* mark our position; a racing removal leaves the list
                 * unlink to us (see the cap->ci == NULL check below) */
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                /* drop the previous iteration's refs with no locks held */
                if (last_inode) {
                        iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                /* cap->ci cleared => cap was removed while we held it;
                 * finish taking it off the session list ourselves */
                if (cap->ci == NULL) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        cap->session = NULL;
                        old_cap = cap;  /* put_cap it w/o locks held */
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        /* release refs carried out of the final iteration */
        if (last_inode)
                iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}
975
/*
 * iterate_session_caps callback: forcibly remove @cap from @inode.  If
 * that was the inode's last real cap, also discard any dirty/flushing
 * cap state (the data can no longer be written back through this
 * session) and drop the inode references that state held, one iput()
 * per reference dropped.  Always returns 0 so iteration continues.
 */
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        int drop = 0;   /* number of inode refs to drop at the end */

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
        __ceph_remove_cap(cap);
        if (!__ceph_is_any_real_caps(ci)) {
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;

                spin_lock(&mdsc->cap_dirty_lock);
                /* dirty state: forget it; nobody will flush it now */
                if (!list_empty(&ci->i_dirty_item)) {
                        pr_info(" dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = 1;
                }
                /* in-flight flush: abandon it and fix the global count */
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_info(" dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = 1;
                }
                /* dirty pagecache refs also go; count an extra ref drop */
                if (drop && ci->i_wrbuffer_ref) {
                        pr_info(" dropping dirty data for %p %lld\n",
                                inode, ceph_ino(inode));
                        ci->i_wrbuffer_ref = 0;
                        ci->i_wrbuffer_ref_head = 0;
                        drop++;
                }
                spin_unlock(&mdsc->cap_dirty_lock);
        }
        spin_unlock(&ci->i_ceph_lock);
        /* iput outside the spinlocks, once per reference dropped above */
        while (drop--)
                iput(inode);
        return 0;
}
1022
1023 /*
1024  * caller must hold session s_mutex
1025  */
static void remove_session_caps(struct ceph_mds_session *session)
{
        dout("remove_session_caps on %p\n", session);
        /* strip every cap; the callback accounts for dirty state too */
        iterate_session_caps(session, remove_session_caps_cb, NULL);
        /* nothing may remain attached to the session afterwards */
        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
        cleanup_cap_releases(session);
}
1034
1035 /*
1036  * wake up any threads waiting on this session's caps.  if the cap is
1037  * old (didn't get renewed on the client reconnect), remove it now.
1038  *
1039  * caller must hold s_mutex.
1040  */
1041 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1042                               void *arg)
1043 {
1044         struct ceph_inode_info *ci = ceph_inode(inode);
1045
1046         wake_up_all(&ci->i_cap_wq);
1047         if (arg) {
1048                 spin_lock(&ci->i_ceph_lock);
1049                 ci->i_wanted_max_size = 0;
1050                 ci->i_requested_max_size = 0;
1051                 spin_unlock(&ci->i_ceph_lock);
1052         }
1053         return 0;
1054 }
1055
/*
 * Wake all threads waiting on caps held by this session; see
 * wake_up_session_cb for the extra work done when @reconnect != 0.
 */
static void wake_up_session_caps(struct ceph_mds_session *session,
                                 int reconnect)
{
        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
        /* smuggle the reconnect flag through the callback's void *arg */
        iterate_session_caps(session, wake_up_session_cb,
                             (void *)(unsigned long)reconnect);
}
1063
1064 /*
1065  * Send periodic message to MDS renewing all currently held caps.  The
1066  * ack will reset the expiration for all caps from this session.
1067  *
1068  * caller holds s_mutex
1069  */
1070 static int send_renew_caps(struct ceph_mds_client *mdsc,
1071                            struct ceph_mds_session *session)
1072 {
1073         struct ceph_msg *msg;
1074         int state;
1075
1076         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1077             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1078                 pr_info("mds%d caps stale\n", session->s_mds);
1079         session->s_renew_requested = jiffies;
1080
1081         /* do not try to renew caps until a recovering mds has reconnected
1082          * with its clients. */
1083         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1084         if (state < CEPH_MDS_STATE_RECONNECT) {
1085                 dout("send_renew_caps ignoring mds%d (%s)\n",
1086                      session->s_mds, ceph_mds_state_name(state));
1087                 return 0;
1088         }
1089
1090         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1091                 ceph_mds_state_name(state));
1092         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1093                                  ++session->s_renew_seq);
1094         if (!msg)
1095                 return -ENOMEM;
1096         ceph_con_send(&session->s_con, msg);
1097         return 0;
1098 }
1099
1100 /*
1101  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1102  *
1103  * Called under session->s_mutex
1104  */
1105 static void renewed_caps(struct ceph_mds_client *mdsc,
1106                          struct ceph_mds_session *session, int is_renew)
1107 {
1108         int was_stale;
1109         int wake = 0;
1110
1111         spin_lock(&session->s_cap_lock);
1112         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1113
1114         session->s_cap_ttl = session->s_renew_requested +
1115                 mdsc->mdsmap->m_session_timeout*HZ;
1116
1117         if (was_stale) {
1118                 if (time_before(jiffies, session->s_cap_ttl)) {
1119                         pr_info("mds%d caps renewed\n", session->s_mds);
1120                         wake = 1;
1121                 } else {
1122                         pr_info("mds%d caps still stale\n", session->s_mds);
1123                 }
1124         }
1125         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1126              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1127              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1128         spin_unlock(&session->s_cap_lock);
1129
1130         if (wake)
1131                 wake_up_session_caps(session, 0);
1132 }
1133
1134 /*
1135  * send a session close request
1136  */
1137 static int request_close_session(struct ceph_mds_client *mdsc,
1138                                  struct ceph_mds_session *session)
1139 {
1140         struct ceph_msg *msg;
1141
1142         dout("request_close_session mds%d state %s seq %lld\n",
1143              session->s_mds, session_state_name(session->s_state),
1144              session->s_seq);
1145         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1146         if (!msg)
1147                 return -ENOMEM;
1148         ceph_con_send(&session->s_con, msg);
1149         return 0;
1150 }
1151
1152 /*
1153  * Called with s_mutex held.
1154  */
1155 static int __close_session(struct ceph_mds_client *mdsc,
1156                          struct ceph_mds_session *session)
1157 {
1158         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1159                 return 0;
1160         session->s_state = CEPH_MDS_SESSION_CLOSING;
1161         return request_close_session(mdsc, session);
1162 }
1163
1164 /*
1165  * Trim old(er) caps.
1166  *
1167  * Because we can't cache an inode without one or more caps, we do
1168  * this indirectly: if a cap is unused, we prune its aliases, at which
1169  * point the inode will hopefully get dropped to.
1170  *
1171  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1172  * memory pressure from the MDS, though, so it needn't be perfect.
1173  */
/*
 * iterate_session_caps callback for trimming.  Drops this session's cap
 * on @inode when it is safe to do so: not dirty, and not covering caps
 * we are actively using that no other mds also issues.  Returns -1 to
 * stop iteration once the trim quota (s_trim_caps) is exhausted,
 * otherwise 0.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
        struct ceph_mds_session *session = arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int used, oissued, mine;

        if (session->s_trim_caps <= 0)
                return -1;

        spin_lock(&ci->i_ceph_lock);
        mine = cap->issued | cap->implemented;
        used = __ceph_caps_used(ci);
        /* caps issued by sessions other than this one */
        oissued = __ceph_caps_issued_other(ci, cap);

        dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
             ceph_cap_string(used));
        if (ci->i_dirty_caps)
                goto out;   /* dirty caps */
        if ((used & ~oissued) & mine)
                goto out;   /* we need these caps */

        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&ci->i_ceph_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
                     inode, cap, atomic_read(&inode->i_count));
                /* lock already released on this path */
                return 0;
        }

out:
        spin_unlock(&ci->i_ceph_lock);
        return 0;
}
1213
1214 /*
1215  * Trim session cap count down to some max number.
1216  */
1217 static int trim_caps(struct ceph_mds_client *mdsc,
1218                      struct ceph_mds_session *session,
1219                      int max_caps)
1220 {
1221         int trim_caps = session->s_nr_caps - max_caps;
1222
1223         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1224              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1225         if (trim_caps > 0) {
1226                 session->s_trim_caps = trim_caps;
1227                 iterate_session_caps(session, trim_caps_cb, session);
1228                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1229                      session->s_mds, session->s_nr_caps, max_caps,
1230                         trim_caps - session->s_trim_caps);
1231                 session->s_trim_caps = 0;
1232         }
1233         return 0;
1234 }
1235
1236 /*
1237  * Allocate cap_release messages.  If there is a partially full message
1238  * in the queue, try to allocate enough to cover it's remainder, so that
1239  * we can send it immediately.
1240  *
1241  * Called under s_mutex.
1242  */
/*
 * Allocate cap_release messages so that s_num_cap_releases covers
 * s_nr_caps plus a safety margin.  If the head of s_cap_releases is
 * partially filled, allocate enough extra to cover its remainder and
 * then move it to s_cap_releases_done so it can be sent immediately.
 *
 * Returns 0 on success, -ENOMEM if a message allocation failed.
 *
 * Called under s_mutex.
 */
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg, *partial = NULL;
        struct ceph_mds_cap_release *head;
        int err = -ENOMEM;
        int extra = mdsc->fsc->mount_options->cap_release_safety;
        int num;

        dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
             extra);

        spin_lock(&session->s_cap_lock);

        /* note a partially-filled message at the head of the queue */
        if (!list_empty(&session->s_cap_releases)) {
                msg = list_first_entry(&session->s_cap_releases,
                                       struct ceph_msg,
                                 list_head);
                head = msg->front.iov_base;
                num = le32_to_cpu(head->num);
                if (num) {
                        dout(" partial %p with (%d/%d)\n", msg, num,
                             (int)CEPH_CAPS_PER_RELEASE);
                        extra += CEPH_CAPS_PER_RELEASE - num;
                        partial = msg;
                }
        }
        /* allocate until the preallocated slots cover caps + extra;
         * the lock must be dropped around each GFP_NOFS allocation */
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
                                   GFP_NOFS, false);
                if (!msg)
                        goto out_unlocked;
                dout("add_cap_releases %p msg %p now %d\n", session, msg,
                     (int)msg->front.iov_len);
                head = msg->front.iov_base;
                head->num = cpu_to_le32(0);
                msg->front.iov_len = sizeof(*head);
                spin_lock(&session->s_cap_lock);
                list_add(&msg->list_head, &session->s_cap_releases);
                session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
        }

        /* queue the partial message for immediate send, and stop
         * counting its unfilled slots as available */
        if (partial) {
                head = partial->front.iov_base;
                num = le32_to_cpu(head->num);
                dout(" queueing partial %p with %d/%d\n", partial, num,
                     (int)CEPH_CAPS_PER_RELEASE);
                list_move_tail(&partial->list_head,
                               &session->s_cap_releases_done);
                session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
        }
        err = 0;
        spin_unlock(&session->s_cap_lock);
out_unlocked:
        return err;
}
1300
1301 /*
1302  * flush all dirty inode data to disk.
1303  *
1304  * returns true if we've flushed through want_flush_seq
1305  */
/*
 * flush all dirty inode data to disk.
 *
 * Walks every session and checks the oldest entry on its
 * s_cap_flushing list; if any inode is still flushing a seq <=
 * @want_flush_seq, we are not done.
 *
 * returns true if we've flushed through want_flush_seq
 */
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{
        int mds, ret = 1;

        dout("check_cap_flush want %lld\n", want_flush_seq);
        mutex_lock(&mdsc->mutex);
        for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
                struct ceph_mds_session *session = mdsc->sessions[mds];

                if (!session)
                        continue;
                /* hold a session ref; mdsc->mutex must be dropped
                 * before taking session->s_mutex */
                get_session(session);
                mutex_unlock(&mdsc->mutex);

                mutex_lock(&session->s_mutex);
                if (!list_empty(&session->s_cap_flushing)) {
                        /* s_cap_flushing head is the oldest flush */
                        struct ceph_inode_info *ci =
                                list_entry(session->s_cap_flushing.next,
                                           struct ceph_inode_info,
                                           i_flushing_item);
                        struct inode *inode = &ci->vfs_inode;

                        spin_lock(&ci->i_ceph_lock);
                        if (ci->i_cap_flush_seq <= want_flush_seq) {
                                dout("check_cap_flush still flushing %p "
                                     "seq %lld <= %lld to mds%d\n", inode,
                                     ci->i_cap_flush_seq, want_flush_seq,
                                     session->s_mds);
                                ret = 0;
                        }
                        spin_unlock(&ci->i_ceph_lock);
                }
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);

                /* mdsc->mutex is not held here; only retake it if we
                 * are continuing the scan */
                if (!ret)
                        return ret;
                mutex_lock(&mdsc->mutex);
        }

        mutex_unlock(&mdsc->mutex);
        dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
        return ret;
}
1350
1351 /*
1352  * called under s_mutex
1353  */
/*
 * Send every completed cap_release message queued on
 * s_cap_releases_done to the mds.  s_cap_lock is dropped around each
 * ceph_con_send and retaken to pop the next message.
 *
 * called under s_mutex
 */
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                            struct ceph_mds_session *session)
{
        struct ceph_msg *msg;

        dout("send_cap_releases mds%d\n", session->s_mds);
        spin_lock(&session->s_cap_lock);
        while (!list_empty(&session->s_cap_releases_done)) {
                msg = list_first_entry(&session->s_cap_releases_done,
                                 struct ceph_msg, list_head);
                list_del_init(&msg->list_head);
                spin_unlock(&session->s_cap_lock);
                /* front length reflects however many releases were packed */
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                ceph_con_send(&session->s_con, msg);
                spin_lock(&session->s_cap_lock);
        }
        spin_unlock(&session->s_cap_lock);
}
1373
1374 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1375                                  struct ceph_mds_session *session)
1376 {
1377         struct ceph_msg *msg;
1378         struct ceph_mds_cap_release *head;
1379         unsigned num;
1380
1381         dout("discard_cap_releases mds%d\n", session->s_mds);
1382         spin_lock(&session->s_cap_lock);
1383
1384         /* zero out the in-progress message */
1385         msg = list_first_entry(&session->s_cap_releases,
1386                                struct ceph_msg, list_head);
1387         head = msg->front.iov_base;
1388         num = le32_to_cpu(head->num);
1389         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1390         head->num = cpu_to_le32(0);
1391         session->s_num_cap_releases += num;
1392
1393         /* requeue completed messages */
1394         while (!list_empty(&session->s_cap_releases_done)) {
1395                 msg = list_first_entry(&session->s_cap_releases_done,
1396                                  struct ceph_msg, list_head);
1397                 list_del_init(&msg->list_head);
1398
1399                 head = msg->front.iov_base;
1400                 num = le32_to_cpu(head->num);
1401                 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1402                      num);
1403                 session->s_num_cap_releases += num;
1404                 head->num = cpu_to_le32(0);
1405                 msg->front.iov_len = sizeof(*head);
1406                 list_add(&msg->list_head, &session->s_cap_releases);
1407         }
1408
1409         spin_unlock(&session->s_cap_lock);
1410 }
1411
1412 /*
1413  * requests
1414  */
1415
1416 /*
1417  * Create an mds request.
1418  */
1419 struct ceph_mds_request *
1420 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1421 {
1422         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1423
1424         if (!req)
1425                 return ERR_PTR(-ENOMEM);
1426
1427         mutex_init(&req->r_fill_mutex);
1428         req->r_mdsc = mdsc;
1429         req->r_started = jiffies;
1430         req->r_resend_mds = -1;
1431         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1432         req->r_fmode = -1;
1433         kref_init(&req->r_kref);
1434         INIT_LIST_HEAD(&req->r_wait);
1435         init_completion(&req->r_completion);
1436         init_completion(&req->r_safe_completion);
1437         INIT_LIST_HEAD(&req->r_unsafe_item);
1438
1439         req->r_op = op;
1440         req->r_direct_mode = mode;
1441         return req;
1442 }
1443
1444 /*
1445  * return oldest (lowest) request, tid in request tree, 0 if none.
1446  *
1447  * called under mdsc->mutex.
1448  */
1449 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1450 {
1451         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1452                 return NULL;
1453         return rb_entry(rb_first(&mdsc->request_tree),
1454                         struct ceph_mds_request, r_node);
1455 }
1456
1457 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1458 {
1459         struct ceph_mds_request *req = __get_oldest_req(mdsc);
1460
1461         if (req)
1462                 return req->r_tid;
1463         return 0;
1464 }
1465
1466 /*
1467  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1468  * on build_path_from_dentry in fs/cifs/dir.c.
1469  *
1470  * If @stop_on_nosnap, generate path relative to the first non-snapped
1471  * inode.
1472  *
1473  * Encode hidden .snap dirs as a double /, i.e.
1474  *   foo/.snap/bar -> foo//bar
1475  */
/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * Two passes under a rename_lock seqcount: first to measure the length,
 * then to fill the buffer backwards from the tail.  If a concurrent
 * rename changes the tree between (or during) the passes we detect it
 * (pos != 0 or seq retry) and start over.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 *
 * On success *base is the ino of the dentry the path is relative to and
 * *plen the path length; returns the path or an ERR_PTR.
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
                           int stop_on_nosnap)
{
        struct dentry *temp;
        char *path;
        int len, pos;
        unsigned seq;

        if (dentry == NULL)
                return ERR_PTR(-EINVAL);

retry:
        len = 0;
        seq = read_seqbegin(&rename_lock);
        rcu_read_lock();
        /* pass 1: measure */
        for (temp = dentry; !IS_ROOT(temp);) {
                struct inode *inode = temp->d_inode;
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
                        len++;  /* slash only */
                else if (stop_on_nosnap && inode &&
                         ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                else
                        len += 1 + temp->d_name.len;
                temp = temp->d_parent;
        }
        rcu_read_unlock();
        if (len)
                len--;  /* no leading '/' */

        path = kmalloc(len+1, GFP_NOFS);
        if (path == NULL)
                return ERR_PTR(-ENOMEM);
        pos = len;
        path[pos] = 0;  /* trailing null */
        rcu_read_lock();
        /* pass 2: fill backwards from the end of the buffer */
        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
                struct inode *inode;

                spin_lock(&temp->d_lock);
                inode = temp->d_inode;
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
                        dout("build_path path+%d: %p SNAPDIR\n",
                             pos, temp);
                } else if (stop_on_nosnap && inode &&
                           ceph_snap(inode) == CEPH_NOSNAP) {
                        spin_unlock(&temp->d_lock);
                        break;
                } else {
                        pos -= temp->d_name.len;
                        if (pos < 0) {
                                /* name grew since pass 1; bail to retry check */
                                spin_unlock(&temp->d_lock);
                                break;
                        }
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
                }
                spin_unlock(&temp->d_lock);
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
        }
        rcu_read_unlock();
        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
                pr_err("build_path did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
                   lock the dentries above us to prevent this, but
                   retrying should be harmless) */
                kfree(path);
                goto retry;
        }

        *base = ceph_ino(temp->d_inode);
        *plen = len;
        dout("build_path on %p %d built %llx '%.*s'\n",
             dentry, dentry->d_count, *base, len, path);
        return path;
}
1556
/*
 * Express @dentry as (ino, path) for an mds request.  In the common
 * non-snapped case this is just the parent's ino plus the dentry name
 * (no allocation, *pfreepath untouched); otherwise a full path is built
 * and *pfreepath is set so the caller knows to kfree *ppath.
 *
 * NOTE(review): d_parent is read without d_lock here -- presumably safe
 * in this caller's context, but worth confirming against rename races.
 */
static int build_dentry_path(struct dentry *dentry,
                             const char **ppath, int *ppathlen, u64 *pino,
                             int *pfreepath)
{
        char *path;

        if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
                *pino = ceph_ino(dentry->d_parent->d_inode);
                *ppath = dentry->d_name.name;
                *ppathlen = dentry->d_name.len;
                return 0;
        }
        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
        if (IS_ERR(path))
                return PTR_ERR(path);
        *ppath = path;
        *pfreepath = 1;
        return 0;
}
1576
/*
 * Express @inode as (ino, path) for an mds request.  Non-snapped inodes
 * are addressed by ino alone (empty path); snapped inodes get a full
 * path built from an aliasing dentry, with *pfreepath set so the caller
 * kfrees *ppath.
 */
static int build_inode_path(struct inode *inode,
                            const char **ppath, int *ppathlen, u64 *pino,
                            int *pfreepath)
{
        struct dentry *dentry;
        char *path;

        if (ceph_snap(inode) == CEPH_NOSNAP) {
                *pino = ceph_ino(inode);
                *ppathlen = 0;
                return 0;
        }
        /* d_find_alias may return NULL; ceph_mdsc_build_path rejects a
         * NULL dentry with -EINVAL, and dput(NULL) is a no-op */
        dentry = d_find_alias(inode);
        path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
        dput(dentry);
        if (IS_ERR(path))
                return PTR_ERR(path);
        *ppath = path;
        *pfreepath = 1;
        return 0;
}
1598
1599 /*
1600  * request arguments may be specified via an inode *, a dentry *, or
1601  * an explicit ino+path.
1602  */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
                                  const char *rpath, u64 rino,
                                  const char **ppath, int *pathlen,
                                  u64 *ino, int *freepath)
{
        int r = 0;

        /* precedence: inode, then dentry, then explicit ino+path */
        if (rinode) {
                r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
                dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
                     ceph_snap(rinode));
        } else if (rdentry) {
                r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
                dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
                     *ppath);
        } else if (rpath || rino) {
                /* explicit path is borrowed, not copied: *freepath stays 0 */
                *ino = rino;
                *ppath = rpath;
                *pathlen = rpath ? strlen(rpath) : 0;
                dout(" path %.*s\n", *pathlen, rpath);
        }

        return r;
}
1627
1628 /*
1629  * called under mdsc->mutex
1630  */
/*
 * Build a CEPH_MSG_CLIENT_REQUEST message for @req, destined for @mds.
 *
 * The message front holds a ceph_mds_request_head followed by two
 * encoded filepaths and up to four cap/dentry releases.  On success,
 * req->r_request_release_offset records where the release section
 * starts so a later replay can truncate it off (see
 * __prepare_send_request).  Returns the message or an ERR_PTR.
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;	/* primary path (r_inode/r_dentry) */
	const char *path2 = NULL;	/* secondary path (r_old_dentry) */
	u64 ino1 = 0, ino2 = 0;		/* base inos for the encoded paths */
	int pathlen1 = 0, pathlen2 = 0;
	int freepath1 = 0, freepath2 = 0;	/* nonzero: we must kfree pathN */
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	/* resolve the two (ino, path) pairs the request refers to */
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* worst-case front length: head plus two encoded filepaths
	 * (each carries a length and base ino; see ceph_encode_filepath) */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += req->r_dentry->d_name.len;
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	/* fill in the fixed-size head, then encode past it */
	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
		       mds, req->r_dentry_drop, req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_old_dentry->d_inode,
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
	head->num_releases = cpu_to_le16(releases);

	/* trim the front down to what we actually encoded */
	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	/* attach any page data carried with the request */
	msg->pages = req->r_pages;
	msg->nr_pages = req->r_num_pages;
	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		kfree((char *)path2);
out_free1:
	if (freepath1)
		kfree((char *)path1);
out:
	return msg;
}
1735
1736 /*
1737  * called under mdsc->mutex if error, under no mutex if
1738  * success.
1739  */
1740 static void complete_request(struct ceph_mds_client *mdsc,
1741                              struct ceph_mds_request *req)
1742 {
1743         if (req->r_callback)
1744                 req->r_callback(mdsc, req);
1745         else
1746                 complete_all(&req->r_completion);
1747 }
1748
1749 /*
1750  * called under mdsc->mutex
1751  */
/*
 * Prepare req->r_request for (re)transmission to @mds.
 *
 * A request that already got an unsafe reply is a replay: the original
 * message is reused (REPLAY flag set, cap releases stripped) rather
 * than regenerated.  Otherwise any stale message is dropped and a
 * fresh one is built with create_request_message().
 *
 * Returns 0 on success, or a negative error after completing the
 * request with that error.
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		/* record the cap mseq this attempt is sent under, so an
		 * ESTALE reply can tell whether the cap has since
		 * changed (see handle_reply) */
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (req->r_got_unsafe) {
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;
		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
		msg->front.iov_len = req->r_request_release_offset;
		return 0;
	}

	if (req->r_request) {
		/* drop the message from a previous attempt */
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		complete_request(mdsc, req);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (req->r_got_unsafe)
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (req->r_locked_dir)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;
	rhead->ino = 0;

	dout(" r_locked_dir = %p\n", req->r_locked_dir);
	return 0;
}
1825
1826 /*
1827  * send request, or put it on the appropriate wait list.
1828  */
/*
 * Attempt to send @req: pick a target mds, make sure there is an open
 * session with it, and transmit.  If no usable mds or session exists
 * yet, park the request on mdsc->waiting_for_map or the session's
 * s_waiting list to be retried later via __wake_requests()/
 * kick_requests().
 *
 * Returns 0 if sent, -EAGAIN if queued, or a negative error after
 * completing the request with that error.
 */
static int __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = -EAGAIN;

	/* already failed or answered?  nothing to do */
	if (req->r_err || req->r_got_result)
		goto out;

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -EIO;
		goto finish;
	}

	/* drop the session ref held from a previous attempt */
	put_request_session(req);

	mds = __choose_mds(mdsc, req);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		goto out;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = get_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/* session not usable yet: start an open if needed and
		 * queue the request on the session's wait list */
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING)
			__open_session(mdsc, session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __prepare_send_request(mdsc, req, mds);
	if (!err) {
		/* the connection takes its own ref on the message */
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
out:
	return err;

finish:
	req->r_err = err;
	complete_request(mdsc, req);
	goto out;
}
1900
1901 /*
1902  * called under mdsc->mutex
1903  */
1904 static void __wake_requests(struct ceph_mds_client *mdsc,
1905                             struct list_head *head)
1906 {
1907         struct ceph_mds_request *req;
1908         LIST_HEAD(tmp_list);
1909
1910         list_splice_init(head, &tmp_list);
1911
1912         while (!list_empty(&tmp_list)) {
1913                 req = list_entry(tmp_list.next,
1914                                  struct ceph_mds_request, r_wait);
1915                 list_del_init(&req->r_wait);
1916                 __do_request(mdsc, req);
1917         }
1918 }
1919
1920 /*
1921  * Wake up threads with requests pending for @mds, so that they can
1922  * resubmit their requests to a possibly different mds.
1923  */
1924 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1925 {
1926         struct ceph_mds_request *req;
1927         struct rb_node *p;
1928
1929         dout("kick_requests mds%d\n", mds);
1930         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1931                 req = rb_entry(p, struct ceph_mds_request, r_node);
1932                 if (req->r_got_unsafe)
1933                         continue;
1934                 if (req->r_session &&
1935                     req->r_session->s_mds == mds) {
1936                         dout(" kicking tid %llu\n", req->r_tid);
1937                         __do_request(mdsc, req);
1938                 }
1939         }
1940 }
1941
/*
 * Register @req (with no parent dir) and try to send it without
 * waiting for the reply; the outcome is delivered via the request's
 * callback or completion (see complete_request).
 */
void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}
1951
1952 /*
1953  * Synchrously perform an mds request.  Take care of all of the
1954  * session setup, forwarding, retry details.
1955  */
/*
 * Synchronously perform an mds request: pin the involved inodes,
 * register and send the request, then wait (killably, honoring
 * req->r_timeout when set) for the reply.
 *
 * @dir: parent directory inode passed through to __register_request()
 *       (may be NULL).
 *
 * Returns the MDS result on a real reply, or a negative error if the
 * send failed or the wait was interrupted/timed out.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_locked_dir)
		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_old_dentry)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	if (req->r_err) {
		/* failed before it was ever sent */
		err = req->r_err;
		__unregister_request(mdsc, req);
		dout("do_request early error %d\n", err);
		goto out;
	}

	/* wait */
	mutex_unlock(&mdsc->mutex);
	dout("do_request waiting\n");
	if (req->r_timeout) {
		err = (long)wait_for_completion_killable_timeout(
			&req->r_completion, req->r_timeout);
		if (err == 0)
			err = -EIO;	/* timed out */
	} else {
		err = wait_for_completion_killable(&req->r_completion);
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (req->r_got_result) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		req->r_aborted = true;
		mutex_unlock(&req->r_fill_mutex);

		/* CEPH_MDS_OP_WRITE is tested as a flag bit within the
		 * op code: an aborted namespace-modifying op must
		 * invalidate the dir's cached state */
		if (req->r_locked_dir &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

out:
	mutex_unlock(&mdsc->mutex);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}
2027
2028 /*
2029  * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS
2030  * namespace request.
2031  */
2032 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2033 {
2034         struct inode *inode = req->r_locked_dir;
2035         struct ceph_inode_info *ci = ceph_inode(inode);
2036
2037         dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
2038         spin_lock(&ci->i_ceph_lock);
2039         ceph_dir_clear_complete(inode);
2040         ci->i_release_count++;
2041         spin_unlock(&ci->i_ceph_lock);
2042
2043         if (req->r_dentry)
2044                 ceph_invalidate_dentry_lease(req->r_dentry);
2045         if (req->r_old_dentry)
2046                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2047 }
2048
2049 /*
2050  * Handle mds reply.
2051  *
2052  * We take the session mutex and parse and process the reply immediately.
2053  * This preserves the logical ordering of replies, capabilities, etc., sent
2054  * by the MDS as they are applied to our local cache.
2055  */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	u64 tid;
	int err, result;
	int mds = session->s_mds;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((req->r_got_unsafe && !head->safe) ||
	    (req->r_got_safe && head->safe)) {
		pr_warning("got a dup %s reply on %llu from mds%d\n",
			   head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	/* an unsafe reply must never follow the safe one */
	if (req->r_got_safe && !head->safe) {
		pr_warning("got unsafe after safe on %llu from mds%d\n",
			   tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	/*
	 * Handle an ESTALE
	 * if we're not talking to the authority, send to them
	 * if the authority has changed while we weren't looking,
	 * send to new authority
	 * Otherwise we just have to return an ESTALE
	 */
	if (result == -ESTALE) {
		dout("got ESTALE on request %llu", req->r_tid);
		if (!req->r_inode) {
			/* do nothing; not an authority problem */
		} else if (req->r_direct_mode != USE_AUTH_MDS) {
			dout("not using auth, setting for that now");
			req->r_direct_mode = USE_AUTH_MDS;
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			goto out;
		} else  {
			struct ceph_inode_info *ci = ceph_inode(req->r_inode);
			struct ceph_cap *cap = NULL;

			if (req->r_session)
				cap = ceph_get_cap_for_mds(ci,
						   req->r_session->s_mds);

			dout("already using auth");
			/* resend if the auth cap changed since this
			 * attempt was sent (mseq was recorded in
			 * __prepare_send_request) */
			if ((!cap || cap != ci->i_auth_cap) ||
			    (cap->mseq != req->r_sent_on_mseq)) {
				dout("but cap changed, so resending");
				__do_request(mdsc, req);
				mutex_unlock(&mdsc->mutex);
				goto out;
			}
		}
		dout("have to return ESTALE on request %llu", req->r_tid);
	}


	if (head->safe) {
		req->r_got_safe = true;
		__unregister_request(mdsc, req);
		complete_all(&req->r_safe_completion);

		if (req->r_got_unsafe) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);
			list_del_init(&req->r_unsafe_item);

			/* last unsafe request during umount? */
			if (mdsc->stopping && !__get_oldest_req(mdsc))
				complete_all(&mdsc->safe_umount_waiters);
			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		/* track unsafe requests per session so they can be
		 * replayed if the mds restarts (replay_unsafe_requests) */
		req->r_got_unsafe = true;
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	/* process under the session mutex to preserve the ordering of
	 * replies, caps, and leases from this mds */
	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
			       rinfo->snapblob + rinfo->snapblob_len,
			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
		/* keep snap_rwsem held (read) across the fill below */
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP) &&
		    rinfo->dir_nr)
			ceph_readdir_prepopulate(req, req->r_session);
		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
out_err:
	/* record the outcome, unless the request was aborted meanwhile */
	mutex_lock(&mdsc->mutex);
	if (!req->r_aborted) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = msg;
			ceph_msg_get(msg);
			req->r_got_result = true;
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	ceph_add_cap_releases(mdsc, req->r_session);
	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);
out:
	ceph_mdsc_put_request(req);
	return;
}
2234
2235
2236
2237 /*
2238  * handle mds notification that our request has been forwarded.
2239  */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;	/* mds the request was forwarded to */
	u32 fwd_seq;	/* how many times it has been forwarded */
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	}

	if (req->r_aborted) {
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		/* stale notification; we already saw a newer forward */
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(req->r_got_result);
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;	/* hint for the next send */
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	/* only reachable from decode failures above, before mdsc->mutex
	 * is taken */
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
2287
2288 /*
2289  * handle a mds session control message
2290  */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	u32 op;
	u64 seq;
	int mds = session->s_mds;
	struct ceph_mds_session_head *h = msg->front.iov_base;
	int wake = 0;	/* nonzero: rekick requests waiting on this session */

	/* decode */
	if (msg->front.iov_len != sizeof(*h))
		goto bad;
	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	/* unregister a closed session under mdsc->mutex before taking
	 * the session mutex below */
	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE)
		__unregister_session(mdsc, session);
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     session_state_name(session->s_state), seq);

	/* any message from the mds means it is no longer hung */
	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		/* only honor a renewal matching our outstanding request */
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		remove_session_caps(session);
		wake = 1; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		kick_requests(mdsc, mds);
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		/* bump the cap generation and expire the ttl so stale
		 * caps are ignored until renewed */
		spin_lock(&session->s_gen_ttl_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = jiffies - 1;
		spin_unlock(&session->s_gen_ttl_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		/* retry requests that were parked on this session */
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);
	}
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	ceph_msg_dump(msg);
	return;
}
2383
2384
2385 /*
2386  * called under session->mutex.
2387  */
2388 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2389                                    struct ceph_mds_session *session)
2390 {
2391         struct ceph_mds_request *req, *nreq;
2392         int err;
2393
2394         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2395
2396         mutex_lock(&mdsc->mutex);
2397         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2398                 err = __prepare_send_request(mdsc, req, session->s_mds);
2399                 if (!err) {
2400                         ceph_msg_get(req->r_request);
2401                         ceph_con_send(&session->s_con, req->r_request);
2402                 }
2403         }
2404         mutex_unlock(&mdsc->mutex);
2405 }
2406
2407 /*
2408  * Encode information about a cap for a reconnect with the MDS.
2409  */
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
			  void *arg)
{
	union {
		struct ceph_mds_cap_reconnect v2;	/* carries a flock blob */
		struct ceph_mds_cap_reconnect_v1 v1;	/* legacy, no locks */
	} rec;
	size_t reclen;
	struct ceph_inode_info *ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	char *path;
	int pathlen, err;
	u64 pathbase;
	struct dentry *dentry;

	ci = cap->ci;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));
	/* encoding order: ino, path string, cap record[, lock blob] */
	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
	if (err)
		return err;

	dentry = d_find_alias(inode);
	if (dentry) {
		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_dput;
		}
	} else {
		/* no alias: encode an empty path */
		path = NULL;
		pathlen = 0;
	}
	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
	if (err)
		goto out_free;

	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */

	/* fill the v2 record when the mds supports flock state, else v1 */
	if (recon_state->flock) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(pathbase);
		rec.v2.flock_len = 0;	/* recomputed below */
		reclen = sizeof(rec.v2);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(inode->i_size);
		ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(pathbase);
		reclen = sizeof(rec.v1);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->flock) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_pagelist_cursor trunc_point;

		/*
		 * The lock count can change while flocks are dropped
		 * (between counting and encoding): on -ENOSPC, truncate
		 * the pagelist back to trunc_point and retry with a
		 * fresh count.
		 *
		 * NOTE(review): flock_len is assigned a host-order
		 * value with no cpu_to_le32 here, unlike the other v2
		 * fields — verify on big-endian.
		 */
		ceph_pagelist_set_cursor(pagelist, &trunc_point);
		do {
			lock_flocks();
			ceph_count_locks(inode, &num_fcntl_locks,
					 &num_flock_locks);
			rec.v2.flock_len = (2*sizeof(u32) +
					    (num_fcntl_locks+num_flock_locks) *
					    sizeof(struct ceph_filelock));
			unlock_flocks();

			/* pre-alloc pagelist */
			ceph_pagelist_truncate(pagelist, &trunc_point);
			err = ceph_pagelist_append(pagelist, &rec, reclen);
			if (!err)
				err = ceph_pagelist_reserve(pagelist,
							    rec.v2.flock_len);

			/* encode locks */
			if (!err) {
				lock_flocks();
				err = ceph_encode_locks(inode,
							pagelist,
							num_fcntl_locks,
							num_flock_locks);
				unlock_flocks();
			}
		} while (err == -ENOSPC);
	} else {
		err = ceph_pagelist_append(pagelist, &rec, reclen);
	}

out_free:
	kfree(path);
out_dput:
	dput(dentry);
	return err;
}
2516
2517
2518 /*
2519  * If an MDS fails and recovers, clients need to reconnect in order to
2520  * reestablish shared state.  This includes all caps issued through
2521  * this session _and_ the snap_realm hierarchy.  Because it's not
2522  * clear which snap realms the mds cares about, we send everything we
2523  * know about.. that ensures we'll then get any new info the
2524  * recovering MDS might have.
2525  *
2526  * This is a relatively heavyweight operation, but it's rare.
2527  *
2528  * called with mdsc->mutex held.
2529  */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *session)
{
        struct ceph_msg *reply;
        struct rb_node *p;
        int mds = session->s_mds;
        int err = -ENOMEM;
        struct ceph_pagelist *pagelist;
        struct ceph_reconnect_state recon_state;

        pr_info("mds%d reconnect start\n", mds);

        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
        if (!pagelist)
                goto fail_nopagelist;
        ceph_pagelist_init(pagelist);

        /* zero-length front; the entire reconnect payload is carried in
         * the pagelist attached to the message below */
        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
        if (!reply)
                goto fail_nomsg;

        mutex_lock(&session->s_mutex);
        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
        session->s_seq = 0;

        /* tear down the old connection and redial the (possibly new)
         * address for this mds rank from the current mdsmap */
        ceph_con_close(&session->s_con);
        ceph_con_open(&session->s_con,
                      CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        /* replay unsafe requests */
        replay_unsafe_requests(mdsc, session);

        /* hold snap_rwsem so the realm tree is stable while we encode it */
        down_read(&mdsc->snap_rwsem);

        dout("session %p state %s\n", session,
             session_state_name(session->s_state));

        /* drop old cap expires; we're about to reestablish that state */
        discard_cap_releases(mdsc, session);

        /* traverse this session's caps: count first, then one record
         * per cap appended by encode_caps_cb */
        err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
        if (err)
                goto fail;

        recon_state.pagelist = pagelist;
        recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
        if (err < 0)
                goto fail;

        /*
         * snaprealms.  we provide mds with the ino, seq (version), and
         * parent for all of our realms.  If the mds has any newer info,
         * it will tell us.
         */
        for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
                struct ceph_snap_realm *realm =
                        rb_entry(p, struct ceph_snap_realm, node);
                struct ceph_mds_snaprealm_reconnect sr_rec;

                dout(" adding snap realm %llx seq %lld parent %llx\n",
                     realm->ino, realm->seq, realm->parent_ino);
                sr_rec.ino = cpu_to_le64(realm->ino);
                sr_rec.seq = cpu_to_le64(realm->seq);
                sr_rec.parent = cpu_to_le64(realm->parent_ino);
                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
                if (err)
                        goto fail;
        }

        reply->pagelist = pagelist;
        /* encoding v2 additionally carries file-lock state (see
         * recon_state.flock handling in encode_caps_cb) */
        if (recon_state.flock)
                reply->hdr.version = cpu_to_le16(2);
        reply->hdr.data_len = cpu_to_le32(pagelist->length);
        reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);

        mutex_unlock(&session->s_mutex);

        /* kick anything that was blocked waiting on this session */
        mutex_lock(&mdsc->mutex);
        __wake_requests(mdsc, &session->s_waiting);
        mutex_unlock(&mdsc->mutex);

        up_read(&mdsc->snap_rwsem);
        return;

fail:
        ceph_msg_put(reply);
        up_read(&mdsc->snap_rwsem);
        mutex_unlock(&session->s_mutex);
fail_nomsg:
        ceph_pagelist_release(pagelist);
        kfree(pagelist);
fail_nopagelist:
        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
        return;
}
2629
2630
2631 /*
2632  * compare old and new mdsmaps, kicking requests
2633  * and closing out old connections as necessary
2634  *
2635  * called under mdsc->mutex.
2636  */
static void check_new_map(struct ceph_mds_client *mdsc,
                          struct ceph_mdsmap *newmap,
                          struct ceph_mdsmap *oldmap)
{
        int i;
        int oldstate, newstate;
        struct ceph_mds_session *s;

        dout("check_new_map new %u old %u\n",
             newmap->m_epoch, oldmap->m_epoch);

        /* pass 1: compare each rank's old vs new state/address for the
         * ranks we have sessions with */
        for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i] == NULL)
                        continue;
                s = mdsc->sessions[i];
                oldstate = ceph_mdsmap_get_state(oldmap, i);
                newstate = ceph_mdsmap_get_state(newmap, i);

                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
                     i, ceph_mds_state_name(oldstate),
                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
                     ceph_mds_state_name(newstate),
                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                     session_state_name(s->s_state));

                /* rank disappeared or moved to a different address? */
                if (i >= newmap->m_max_mds ||
                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
                           ceph_mdsmap_get_addr(newmap, i),
                           sizeof(struct ceph_entity_addr))) {
                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
                                /* the session never opened, just close it
                                 * out now */
                                __wake_requests(mdsc, &s->s_waiting);
                                __unregister_session(mdsc, s);
                                /* NOTE(review): s is still dereferenced
                                 * further down after being unregistered;
                                 * presumably a reference keeps it alive
                                 * here -- TODO confirm */
                        } else {
                                /* just close it */
                                mutex_unlock(&mdsc->mutex);
                                mutex_lock(&s->s_mutex);
                                mutex_lock(&mdsc->mutex);
                                ceph_con_close(&s->s_con);
                                mutex_unlock(&s->s_mutex);
                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
                        }

                        /* kick any requests waiting on the recovering mds */
                        kick_requests(mdsc, i);
                } else if (oldstate == newstate) {
                        continue;  /* nothing new with this mds */
                }

                /*
                 * send reconnect?
                 */
                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
                    newstate >= CEPH_MDS_STATE_RECONNECT) {
                        /* send_mds_reconnect takes s_mutex; drop ours */
                        mutex_unlock(&mdsc->mutex);
                        send_mds_reconnect(mdsc, s);
                        mutex_lock(&mdsc->mutex);
                }

                /*
                 * kick request on any mds that has gone active.
                 */
                if (oldstate < CEPH_MDS_STATE_ACTIVE &&
                    newstate >= CEPH_MDS_STATE_ACTIVE) {
                        if (oldstate != CEPH_MDS_STATE_CREATING &&
                            oldstate != CEPH_MDS_STATE_STARTING)
                                pr_info("mds%d recovery completed\n", s->s_mds);
                        kick_requests(mdsc, i);
                        ceph_kick_flushing_caps(mdsc, s);
                        wake_up_session_caps(s, 1);
                }
        }

        /* pass 2: open sessions to export targets of any mds that is now
         * flagged laggy, so we can migrate caps away from it */
        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
                s = mdsc->sessions[i];
                if (!s)
                        continue;
                if (!ceph_mdsmap_is_laggy(newmap, i))
                        continue;
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG ||
                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
                        dout(" connecting to export targets of laggy mds%d\n",
                             i);
                        __open_export_target_sessions(mdsc, s);
                }
        }
}
2726
2727
2728
2729 /*
2730  * leases
2731  */
2732
2733 /*
2734  * caller must hold session s_mutex, dentry->d_lock
2735  */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
        struct ceph_dentry_info *di = ceph_dentry(dentry);

        /* drop our reference on the lease's session and forget it; the
         * caller holds dentry->d_lock (see comment above) */
        ceph_put_mds_session(di->lease_session);
        di->lease_session = NULL;
}
2743
/*
 * Handle an incoming CEPH_MSG_CLIENT_LEASE from the mds: revoke or
 * renew the dentry lease named in the message, replying with a
 * REVOKE_ACK when appropriate.
 */
static void handle_lease(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session,
                         struct ceph_msg *msg)
{
        struct super_block *sb = mdsc->fsc->sb;
        struct inode *inode;
        struct dentry *parent, *dentry;
        struct ceph_dentry_info *di;
        int mds = session->s_mds;
        struct ceph_mds_lease *h = msg->front.iov_base;
        u32 seq;
        struct ceph_vino vino;
        struct qstr dname;
        int release = 0;

        dout("handle_lease from mds%d\n", mds);

        /* decode: fixed header, then u32 name length, then name bytes */
        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
                goto bad;
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
        seq = le32_to_cpu(h->seq);
        dname.name = (void *)h + sizeof(*h) + sizeof(u32);
        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
        /* encoded name length must match the remaining payload exactly */
        if (dname.len != get_unaligned_le32(h+1))
                goto bad;

        mutex_lock(&session->s_mutex);
        session->s_seq++;

        /* lookup inode */
        inode = ceph_find_inode(sb, vino);
        dout("handle_lease %s, ino %llx %p %.*s\n",
             ceph_lease_op_name(h->action), vino.ino, inode,
             dname.len, dname.name);
        if (inode == NULL) {
                dout("handle_lease no inode %llx\n", vino.ino);
                goto release;
        }

        /* dentry: find the parent alias, then the named child under it */
        parent = d_find_alias(inode);
        if (!parent) {
                dout("no parent dentry on inode %p\n", inode);
                WARN_ON(1);
                goto release;  /* hrm... */
        }
        dname.hash = full_name_hash(dname.name, dname.len);
        dentry = d_lookup(parent, &dname);
        dput(parent);
        if (!dentry)
                goto release;

        spin_lock(&dentry->d_lock);
        di = ceph_dentry(dentry);
        switch (h->action) {
        case CEPH_MDS_LEASE_REVOKE:
                if (di->lease_session == session) {
                        /* report our newer seq back if we have one */
                        if (ceph_seq_cmp(di->lease_seq, seq) > 0)
                                h->seq = cpu_to_le32(di->lease_seq);
                        __ceph_mdsc_drop_dentry_lease(dentry);
                }
                release = 1;
                break;

        case CEPH_MDS_LEASE_RENEW:
                /* only honor the renewal if it matches the renew we
                 * actually have outstanding for this session/gen */
                if (di->lease_session == session &&
                    di->lease_gen == session->s_cap_gen &&
                    di->lease_renew_from &&
                    di->lease_renew_after == 0) {
                        unsigned long duration =
                                le32_to_cpu(h->duration_ms) * HZ / 1000;

                        di->lease_seq = seq;
                        dentry->d_time = di->lease_renew_from + duration;
                        /* next renew at the lease's half-life */
                        di->lease_renew_after = di->lease_renew_from +
                                (duration >> 1);
                        di->lease_renew_from = 0;
                }
                break;
        }
        spin_unlock(&dentry->d_lock);
        dput(dentry);

        if (!release)
                goto out;

release:
        /* let's just reuse the same message */
        h->action = CEPH_MDS_LEASE_REVOKE_ACK;
        ceph_msg_get(msg);
        ceph_con_send(&session->s_con, msg);

out:
        iput(inode);
        mutex_unlock(&session->s_mutex);
        return;

bad:
        pr_err("corrupt lease message\n");
        ceph_msg_dump(msg);
}
2847
2848 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2849                               struct inode *inode,
2850                               struct dentry *dentry, char action,
2851                               u32 seq)
2852 {
2853         struct ceph_msg *msg;
2854         struct ceph_mds_lease *lease;
2855         int len = sizeof(*lease) + sizeof(u32);
2856         int dnamelen = 0;
2857
2858         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2859              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2860         dnamelen = dentry->d_name.len;
2861         len += dnamelen;
2862
2863         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2864         if (!msg)
2865                 return;
2866         lease = msg->front.iov_base;
2867         lease->action = action;
2868         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2869         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2870         lease->seq = cpu_to_le32(seq);
2871         put_unaligned_le32(dnamelen, lease + 1);
2872         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2873
2874         /*
2875          * if this is a preemptive lease RELEASE, no need to
2876          * flush request stream, since the actual request will
2877          * soon follow.
2878          */
2879         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2880
2881         ceph_con_send(&session->s_con, msg);
2882 }
2883
2884 /*
2885  * Preemptively release a lease we expect to invalidate anyway.
2886  * Pass @inode always, @dentry is optional.
2887  */
2888 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2889                              struct dentry *dentry)
2890 {
2891         struct ceph_dentry_info *di;
2892         struct ceph_mds_session *session;
2893         u32 seq;
2894
2895         BUG_ON(inode == NULL);
2896         BUG_ON(dentry == NULL);
2897
2898         /* is dentry lease valid? */
2899         spin_lock(&dentry->d_lock);
2900         di = ceph_dentry(dentry);
2901         if (!di || !di->lease_session ||
2902             di->lease_session->s_mds < 0 ||
2903             di->lease_gen != di->lease_session->s_cap_gen ||
2904             !time_before(jiffies, dentry->d_time)) {
2905                 dout("lease_release inode %p dentry %p -- "
2906                      "no lease\n",
2907                      inode, dentry);
2908                 spin_unlock(&dentry->d_lock);
2909                 return;
2910         }
2911
2912         /* we do have a lease on this dentry; note mds and seq */
2913         session = ceph_get_mds_session(di->lease_session);
2914         seq = di->lease_seq;
2915         __ceph_mdsc_drop_dentry_lease(dentry);
2916         spin_unlock(&dentry->d_lock);
2917
2918         dout("lease_release inode %p dentry %p to mds%d\n",
2919              inode, dentry, session->s_mds);
2920         ceph_mdsc_lease_send_msg(session, inode, dentry,
2921                                  CEPH_MDS_LEASE_RELEASE, seq);
2922         ceph_put_mds_session(session);
2923 }
2924
2925 /*
2926  * drop all leases (and dentry refs) in preparation for umount
2927  */
2928 static void drop_leases(struct ceph_mds_client *mdsc)
2929 {
2930         int i;
2931
2932         dout("drop_leases\n");
2933         mutex_lock(&mdsc->mutex);
2934         for (i = 0; i < mdsc->max_sessions; i++) {
2935                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2936                 if (!s)
2937                         continue;
2938                 mutex_unlock(&mdsc->mutex);
2939                 mutex_lock(&s->s_mutex);
2940                 mutex_unlock(&s->s_mutex);
2941                 ceph_put_mds_session(s);
2942                 mutex_lock(&mdsc->mutex);
2943         }
2944         mutex_unlock(&mdsc->mutex);
2945 }
2946
2947
2948
2949 /*
2950  * delayed work -- periodically trim expired leases, renew caps with mds
2951  */
2952 static void schedule_delayed(struct ceph_mds_client *mdsc)
2953 {
2954         int delay = 5;
2955         unsigned hz = round_jiffies_relative(HZ * delay);
2956         schedule_delayed_work(&mdsc->delayed_work, hz);
2957 }
2958
/*
 * Periodic housekeeping: flush delayed caps, nudge hung/closing
 * sessions, renew caps (or at least keepalive) on each live session,
 * and send any queued cap releases.  Re-arms itself at the end.
 */
static void delayed_work(struct work_struct *work)
{
        int i;
        struct ceph_mds_client *mdsc =
                container_of(work, struct ceph_mds_client, delayed_work.work);
        int renew_interval;
        int renew_caps;

        dout("mdsc delayed_work\n");
        ceph_check_delayed_caps(mdsc);

        mutex_lock(&mdsc->mutex);
        /* renew caps every quarter of the mds session timeout */
        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
                                   mdsc->last_renew_caps);
        if (renew_caps)
                mdsc->last_renew_caps = jiffies;

        for (i = 0; i < mdsc->max_sessions; i++) {
                struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
                if (s == NULL)
                        continue;
                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
                        dout("resending session close request for mds%d\n",
                             s->s_mds);
                        request_close_session(mdsc, s);
                        ceph_put_mds_session(s);
                        continue;
                }
                /* mark an open session hung if its ttl has expired */
                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
                        if (s->s_state == CEPH_MDS_SESSION_OPEN) {
                                s->s_state = CEPH_MDS_SESSION_HUNG;
                                pr_info("mds%d hung\n", s->s_mds);
                        }
                }
                if (s->s_state < CEPH_MDS_SESSION_OPEN) {
                        /* this mds is failed or recovering, just wait */
                        ceph_put_mds_session(s);
                        continue;
                }
                /* drop mdsc->mutex before taking the session mutex */
                mutex_unlock(&mdsc->mutex);

                mutex_lock(&s->s_mutex);
                if (renew_caps)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
                ceph_add_cap_releases(mdsc, s);
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG)
                        ceph_send_cap_releases(mdsc, s);
                mutex_unlock(&s->s_mutex);
                ceph_put_mds_session(s);

                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);

        schedule_delayed(mdsc);
}
3019
3020 int ceph_mdsc_init(struct ceph_fs_client *fsc)
3021
3022 {
3023         struct ceph_mds_client *mdsc;
3024
3025         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3026         if (!mdsc)
3027                 return -ENOMEM;
3028         mdsc->fsc = fsc;
3029         fsc->mdsc = mdsc;
3030         mutex_init(&mdsc->mutex);
3031         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3032         if (mdsc->mdsmap == NULL)
3033                 return -ENOMEM;
3034
3035         init_completion(&mdsc->safe_umount_waiters);
3036         init_waitqueue_head(&mdsc->session_close_wq);
3037         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3038         mdsc->sessions = NULL;
3039         mdsc->max_sessions = 0;
3040         mdsc->stopping = 0;
3041         init_rwsem(&mdsc->snap_rwsem);
3042         mdsc->snap_realms = RB_ROOT;
3043         INIT_LIST_HEAD(&mdsc->snap_empty);
3044         spin_lock_init(&mdsc->snap_empty_lock);
3045         mdsc->last_tid = 0;
3046         mdsc->request_tree = RB_ROOT;
3047         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3048         mdsc->last_renew_caps = jiffies;
3049         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3050         spin_lock_init(&mdsc->cap_delay_lock);
3051         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3052         spin_lock_init(&mdsc->snap_flush_lock);
3053         mdsc->cap_flush_seq = 0;
3054         INIT_LIST_HEAD(&mdsc->cap_dirty);
3055         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3056         mdsc->num_cap_flushing = 0;
3057         spin_lock_init(&mdsc->cap_dirty_lock);
3058         init_waitqueue_head(&mdsc->cap_flushing_wq);
3059         spin_lock_init(&mdsc->dentry_lru_lock);
3060         INIT_LIST_HEAD(&mdsc->dentry_lru);
3061
3062         ceph_caps_init(mdsc);
3063         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3064
3065         return 0;
3066 }
3067
3068 /*
3069  * Wait for safe replies on open mds requests.  If we time out, drop
3070  * all requests from the tree to avoid dangling dentry refs.
3071  */
static void wait_requests(struct ceph_mds_client *mdsc)
{
        struct ceph_mds_request *req;
        struct ceph_fs_client *fsc = mdsc->fsc;

        mutex_lock(&mdsc->mutex);
        if (__get_oldest_req(mdsc)) {
                mutex_unlock(&mdsc->mutex);

                /* give outstanding requests up to mount_timeout seconds
                 * to complete safely */
                dout("wait_requests waiting for requests\n");
                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
                                    fsc->client->options->mount_timeout * HZ);

                /* tear down remaining requests */
                mutex_lock(&mdsc->mutex);
                while ((req = __get_oldest_req(mdsc))) {
                        dout("wait_requests timed out on tid %llu\n",
                             req->r_tid);
                        __unregister_request(mdsc, req);
                }
        }
        mutex_unlock(&mdsc->mutex);
        dout("wait_requests done\n");
}
3096
3097 /*
3098  * called before mount is ro, and before dentries are torn down.
3099  * (hmm, does this still race with new lookups?)
3100  */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
        dout("pre_umount\n");
        /* stop new cap release batching etc.; checked by other paths */
        mdsc->stopping = 1;

        drop_leases(mdsc);
        ceph_flush_dirty_caps(mdsc);
        wait_requests(mdsc);

        /*
         * wait for reply handlers to drop their request refs and
         * their inode/dcache refs
         */
        ceph_msgr_flush();
}
3116
3117 /*
3118  * wait for all write mds requests to flush.
3119  */
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
{
        struct ceph_mds_request *req = NULL, *nextreq;
        struct rb_node *n;

        mutex_lock(&mdsc->mutex);
        dout("wait_unsafe_requests want %lld\n", want_tid);
restart:
        req = __get_oldest_req(mdsc);
        while (req && req->r_tid <= want_tid) {
                /* find next request before we potentially drop the mutex,
                 * since the tree can change while we sleep */
                n = rb_next(&req->r_node);
                if (n)
                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
                else
                        nextreq = NULL;
                if ((req->r_op & CEPH_MDS_OP_WRITE)) {
                        /* write op: pin req (and nextreq) so they survive
                         * the unlocked wait below */
                        ceph_mdsc_get_request(req);
                        if (nextreq)
                                ceph_mdsc_get_request(nextreq);
                        mutex_unlock(&mdsc->mutex);
                        dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
                             req->r_tid, want_tid);
                        wait_for_completion(&req->r_safe_completion);
                        mutex_lock(&mdsc->mutex);
                        ceph_mdsc_put_request(req);
                        if (!nextreq)
                                break;  /* next dne before, so we're done! */
                        if (RB_EMPTY_NODE(&nextreq->r_node)) {
                                /* next request was removed from tree */
                                ceph_mdsc_put_request(nextreq);
                                goto restart;
                        }
                        ceph_mdsc_put_request(nextreq);  /* won't go away */
                }
                req = nextreq;
        }
        mutex_unlock(&mdsc->mutex);
        dout("wait_unsafe_requests done\n");
}
3161
3162 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3163 {
3164         u64 want_tid, want_flush;
3165
3166         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3167                 return;
3168
3169         dout("sync\n");
3170         mutex_lock(&mdsc->mutex);
3171         want_tid = mdsc->last_tid;
3172         want_flush = mdsc->cap_flush_seq;
3173         mutex_unlock(&mdsc->mutex);
3174         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3175
3176         ceph_flush_dirty_caps(mdsc);
3177
3178         wait_unsafe_requests(mdsc, want_tid);
3179         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3180 }
3181
3182 /*
3183  * true if all sessions are closed, or we force unmount
3184  */
3185 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3186 {
3187         int i, n = 0;
3188
3189         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3190                 return true;
3191
3192         mutex_lock(&mdsc->mutex);
3193         for (i = 0; i < mdsc->max_sessions; i++)
3194                 if (mdsc->sessions[i])
3195                         n++;
3196         mutex_unlock(&mdsc->mutex);
3197         return n == 0;
3198 }
3199
3200 /*
3201  * called after sb is ro.
3202  */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
        struct ceph_mds_session *session;
        int i;
        struct ceph_fs_client *fsc = mdsc->fsc;
        unsigned long timeout = fsc->client->options->mount_timeout * HZ;

        dout("close_sessions\n");

        /* close sessions: politely ask each mds to close */
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                session = __ceph_lookup_mds_session(mdsc, i);
                if (!session)
                        continue;
                /* drop mdsc->mutex before taking session->s_mutex */
                mutex_unlock(&mdsc->mutex);
                mutex_lock(&session->s_mutex);
                __close_session(mdsc, session);
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);
                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);

        dout("waiting for sessions to close\n");
        wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
                           timeout);

        /* tear down remaining sessions that never acknowledged the close */
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i]) {
                        session = get_session(mdsc->sessions[i]);
                        __unregister_session(mdsc, session);
                        mutex_unlock(&mdsc->mutex);
                        mutex_lock(&session->s_mutex);
                        remove_session_caps(session);
                        mutex_unlock(&session->s_mutex);
                        ceph_put_mds_session(session);
                        mutex_lock(&mdsc->mutex);
                }
        }
        /* all caps should have been flushed/released by now */
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
        mutex_unlock(&mdsc->mutex);

        ceph_cleanup_empty_realms(mdsc);

        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

        dout("stopped\n");
}
3254
/*
 * Final teardown of mdsc state; no sessions remain at this point
 * (ceph_mdsc_close_sessions ran earlier on the umount path).
 */
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
        ceph_caps_finalize(mdsc);
}
3264
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
        struct ceph_mds_client *mdsc = fsc->mdsc;

        dout("mdsc_destroy %p\n", mdsc);
        ceph_mdsc_stop(mdsc);

        /* flush out any connection work with references to us */
        ceph_msgr_flush();

        fsc->mdsc = NULL;
        kfree(mdsc);
        /* NOTE(review): this dout prints the pointer value after kfree;
         * it never dereferences it, but strictly the value is stale */
        dout("mdsc_destroy %p done\n", mdsc);
}
3279
3280
3281 /*
3282  * handle mds map update.
3283  */
3284 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3285 {
3286         u32 epoch;
3287         u32 maplen;
3288         void *p = msg->front.iov_base;
3289         void *end = p + msg->front.iov_len;
3290         struct ceph_mdsmap *newmap, *oldmap;
3291         struct ceph_fsid fsid;
3292         int err = -EINVAL;
3293
3294         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3295         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3296         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3297                 return;
3298         epoch = ceph_decode_32(&p);
3299         maplen = ceph_decode_32(&p);
3300         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3301
3302         /* do we need it? */
3303         ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3304         mutex_lock(&mdsc->mutex);
3305         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3306                 dout("handle_map epoch %u <= our %u\n",
3307                      epoch, mdsc->mdsmap->m_epoch);
3308                 mutex_unlock(&mdsc->mutex);
3309                 return;
3310         }
3311
3312         newmap = ceph_mdsmap_decode(&p, end);
3313         if (IS_ERR(newmap)) {
3314                 err = PTR_ERR(newmap);
3315                 goto bad_unlock;
3316         }
3317
3318         /* swap into place */
3319         if (mdsc->mdsmap) {
3320                 oldmap = mdsc->mdsmap;
3321                 mdsc->mdsmap = newmap;
3322                 check_new_map(mdsc, newmap, oldmap);
3323                 ceph_mdsmap_destroy(oldmap);
3324         } else {
3325                 mdsc->mdsmap = newmap;  /* first mds map */
3326         }
3327         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3328
3329         __wake_requests(mdsc, &mdsc->waiting_for_map);
3330
3331         mutex_unlock(&mdsc->mutex);
3332         schedule_delayed(mdsc);
3333         return;
3334
3335 bad_unlock:
3336         mutex_unlock(&mdsc->mutex);
3337 bad:
3338         pr_err("error decoding mdsmap %d\n", err);
3339         return;
3340 }
3341
3342 static struct ceph_connection *con_get(struct ceph_connection *con)
3343 {
3344         struct ceph_mds_session *s = con->private;
3345
3346         if (get_session(s)) {
3347                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3348                 return con;
3349         }
3350         dout("mdsc con_get %p FAIL\n", s);
3351         return NULL;
3352 }
3353
3354 static void con_put(struct ceph_connection *con)
3355 {
3356         struct ceph_mds_session *s = con->private;
3357
3358         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3359         ceph_put_mds_session(s);
3360 }
3361
3362 /*
3363  * if the client is unresponsive for long enough, the mds will kill
3364  * the session entirely.
3365  */
3366 static void peer_reset(struct ceph_connection *con)
3367 {
3368         struct ceph_mds_session *s = con->private;
3369         struct ceph_mds_client *mdsc = s->s_mdsc;
3370
3371         pr_warning("mds%d closed our session\n", s->s_mds);
3372         send_mds_reconnect(mdsc, s);
3373 }
3374
3375 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3376 {
3377         struct ceph_mds_session *s = con->private;
3378         struct ceph_mds_client *mdsc = s->s_mdsc;
3379         int type = le16_to_cpu(msg->hdr.type);
3380
3381         mutex_lock(&mdsc->mutex);
3382         if (__verify_registered_session(mdsc, s) < 0) {
3383                 mutex_unlock(&mdsc->mutex);
3384                 goto out;
3385         }
3386         mutex_unlock(&mdsc->mutex);
3387
3388         switch (type) {
3389         case CEPH_MSG_MDS_MAP:
3390                 ceph_mdsc_handle_map(mdsc, msg);
3391                 break;
3392         case CEPH_MSG_CLIENT_SESSION:
3393                 handle_session(s, msg);
3394                 break;
3395         case CEPH_MSG_CLIENT_REPLY:
3396                 handle_reply(s, msg);
3397                 break;
3398         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3399                 handle_forward(mdsc, s, msg);
3400                 break;
3401         case CEPH_MSG_CLIENT_CAPS:
3402                 ceph_handle_caps(s, msg);
3403                 break;
3404         case CEPH_MSG_CLIENT_SNAP:
3405                 ceph_handle_snap(mdsc, s, msg);
3406                 break;
3407         case CEPH_MSG_CLIENT_LEASE:
3408                 handle_lease(mdsc, s, msg);
3409                 break;
3410
3411         default:
3412                 pr_err("received unknown message type %d %s\n", type,
3413                        ceph_msg_type_name(type));
3414         }
3415 out:
3416         ceph_msg_put(msg);
3417 }
3418
3419 /*
3420  * authentication
3421  */
3422
3423 /*
3424  * Note: returned pointer is the address of a structure that's
3425  * managed separately.  Caller must *not* attempt to free it.
3426  */
3427 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3428                                         int *proto, int force_new)
3429 {
3430         struct ceph_mds_session *s = con->private;
3431         struct ceph_mds_client *mdsc = s->s_mdsc;
3432         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3433         struct ceph_auth_handshake *auth = &s->s_auth;
3434
3435         if (force_new && auth->authorizer) {
3436                 if (ac->ops && ac->ops->destroy_authorizer)
3437                         ac->ops->destroy_authorizer(ac, auth->authorizer);
3438                 auth->authorizer = NULL;
3439         }
3440         if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
3441                 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3442                                                         auth);
3443                 if (ret)
3444                         return ERR_PTR(ret);
3445         }
3446         *proto = ac->protocol;
3447
3448         return auth;
3449 }
3450
3451
3452 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3453 {
3454         struct ceph_mds_session *s = con->private;
3455         struct ceph_mds_client *mdsc = s->s_mdsc;
3456         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3457
3458         return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3459 }
3460
3461 static int invalidate_authorizer(struct ceph_connection *con)
3462 {
3463         struct ceph_mds_session *s = con->private;
3464         struct ceph_mds_client *mdsc = s->s_mdsc;
3465         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3466
3467         if (ac->ops->invalidate_authorizer)
3468                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3469
3470         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3471 }
3472
3473 static const struct ceph_connection_operations mds_con_ops = {
3474         .get = con_get,
3475         .put = con_put,
3476         .dispatch = dispatch,
3477         .get_authorizer = get_authorizer,
3478         .verify_authorizer_reply = verify_authorizer_reply,
3479         .invalidate_authorizer = invalidate_authorizer,
3480         .peer_reset = peer_reset,
3481 };
3482
3483 /* eof */