[karo-tx-linux.git] / drivers/staging/lustre/lustre/osc/osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <linux/libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <obd_lov.h>
48
49 #include <lustre_ha.h>
50 #include <lprocfs_status.h>
51 #include <lustre_log.h>
52 #include <lustre_debug.h>
53 #include <lustre_param.h>
54 #include <lustre_fid.h>
55 #include "osc_internal.h"
56 #include "osc_cl_internal.h"
57
58 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
59 static int brw_interpret(const struct lu_env *env,
60                          struct ptlrpc_request *req, void *data, int rc);
61 int osc_cleanup(struct obd_device *obd);
62
63 /* Pack OSC object metadata for disk storage (LE byte order). */
64 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
65                       struct lov_stripe_md *lsm)
66 {
67         int lmm_size;
68
69         lmm_size = sizeof(**lmmp);
70         if (lmmp == NULL)
71                 return lmm_size;
72
73         if (*lmmp != NULL && lsm == NULL) {
74                 OBD_FREE(*lmmp, lmm_size);
75                 *lmmp = NULL;
76                 return 0;
77         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
78                 return -EBADF;
79         }
80
81         if (*lmmp == NULL) {
82                 OBD_ALLOC(*lmmp, lmm_size);
83                 if (*lmmp == NULL)
84                         return -ENOMEM;
85         }
86
87         if (lsm)
88                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
89
90         return lmm_size;
91 }
92
93 /* Unpack OSC object metadata from disk storage (LE byte order). */
94 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
95                         struct lov_mds_md *lmm, int lmm_bytes)
96 {
97         int lsm_size;
98         struct obd_import *imp = class_exp2cliimp(exp);
99
100         if (lmm != NULL) {
101                 if (lmm_bytes < sizeof(*lmm)) {
102                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
103                                exp->exp_obd->obd_name, lmm_bytes,
104                                (int)sizeof(*lmm));
105                         return -EINVAL;
106                 }
107                 /* XXX LOV_MAGIC etc check? */
108
109                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
110                         CERROR("%s: zero lmm_object_id: rc = %d\n",
111                                exp->exp_obd->obd_name, -EINVAL);
112                         return -EINVAL;
113                 }
114         }
115
116         lsm_size = lov_stripe_md_size(1);
117         if (lsmp == NULL)
118                 return lsm_size;
119
120         if (*lsmp != NULL && lmm == NULL) {
121                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
122                 OBD_FREE(*lsmp, lsm_size);
123                 *lsmp = NULL;
124                 return 0;
125         }
126
127         if (*lsmp == NULL) {
128                 OBD_ALLOC(*lsmp, lsm_size);
129                 if (unlikely(*lsmp == NULL))
130                         return -ENOMEM;
131                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
132                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
133                         OBD_FREE(*lsmp, lsm_size);
134                         return -ENOMEM;
135                 }
136                 loi_init((*lsmp)->lsm_oinfo[0]);
137         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
138                 return -EBADF;
139         }
140
141         if (lmm != NULL)
142                 /* XXX zero *lsmp? */
143                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
144
145         if (imp != NULL &&
146             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
147                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
148         else
149                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
150
151         return lsm_size;
152 }
153
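/* Copy the capability @capa into the reserved RMF_CAPA1 field of the request
 * and flag it as present in the OST body (OBD_MD_FLOSSCAPA); a no-op when no
 * capability was supplied. */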
154 static inline void osc_pack_capa(struct ptlrpc_request *req,
155                                  struct ost_body *body, void *capa)
156 {
157         struct obd_capa *oc = (struct obd_capa *)capa;
158         struct lustre_capa *c;
159
160         if (!capa)
161                 return;
162
163         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
164         LASSERT(c);
165         capa_cpy(c, oc);
166         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
167         DEBUG_CAPA(D_SEC, c, "pack");
168 }
169
170 static inline void osc_pack_req_body(struct ptlrpc_request *req,
171                                      struct obd_info *oinfo)
172 {
173         struct ost_body *body;
174
175         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
176         LASSERT(body);
177
178         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
179                              oinfo->oi_oa);
180         osc_pack_capa(req, body, oinfo->oi_capa);
181 }
182
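/* Shrink the RMF_CAPA1 field to zero bytes when no capability is attached;
 * otherwise keep the size already reserved by the request format. */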
183 static inline void osc_set_capa_size(struct ptlrpc_request *req,
184                                      const struct req_msg_field *field,
185                                      struct obd_capa *oc)
186 {
187         if (oc == NULL)
188                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
189         else
190                 /* it is already calculated as sizeof struct obd_capa */
191                 ;
192 }
193
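/* Reply handler for an asynchronous OST_GETATTR: unpack the returned OST body
 * into the caller's obdo and invoke the caller's oi_cb_up callback with the
 * final return code. */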
194 static int osc_getattr_interpret(const struct lu_env *env,
195                                  struct ptlrpc_request *req,
196                                  struct osc_async_args *aa, int rc)
197 {
198         struct ost_body *body;
199
200         if (rc != 0)
201                 GOTO(out, rc);
202
203         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
204         if (body) {
205                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
206                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
207                                      aa->aa_oi->oi_oa, &body->oa);
208
209                 /* This should really be sent by the OST */
210                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
211                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
212         } else {
213                 CDEBUG(D_INFO, "can't unpack ost_body\n");
214                 rc = -EPROTO;
215                 aa->aa_oi->oi_oa->o_valid = 0;
216         }
217 out:
218         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
219         return rc;
220 }
221
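/* Pack an OST_GETATTR request for @oinfo and add it to @set; the reply is
 * processed later by osc_getattr_interpret(). */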
222 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
223                              struct ptlrpc_request_set *set)
224 {
225         struct ptlrpc_request *req;
226         struct osc_async_args *aa;
227         int                 rc;
228
229         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
230         if (req == NULL)
231                 return -ENOMEM;
232
233         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
234         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
235         if (rc) {
236                 ptlrpc_request_free(req);
237                 return rc;
238         }
239
240         osc_pack_req_body(req, oinfo);
241
242         ptlrpc_request_set_replen(req);
243         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
244
245         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
246         aa = ptlrpc_req_async_args(req);
247         aa->aa_oi = oinfo;
248
249         ptlrpc_set_add_req(set, req);
250         return 0;
251 }
252
253 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
254                        struct obd_info *oinfo)
255 {
256         struct ptlrpc_request *req;
257         struct ost_body       *body;
258         int                 rc;
259
260         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
261         if (req == NULL)
262                 return -ENOMEM;
263
264         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
265         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
266         if (rc) {
267                 ptlrpc_request_free(req);
268                 return rc;
269         }
270
271         osc_pack_req_body(req, oinfo);
272
273         ptlrpc_request_set_replen(req);
274
275         rc = ptlrpc_queue_wait(req);
276         if (rc)
277                 GOTO(out, rc);
278
279         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
280         if (body == NULL)
281                 GOTO(out, rc = -EPROTO);
282
283         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
284         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
285                              &body->oa);
286
287         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
288         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
289
290  out:
291         ptlrpc_req_finished(req);
292         return rc;
293 }
294
295 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
296                        struct obd_info *oinfo, struct obd_trans_info *oti)
297 {
298         struct ptlrpc_request *req;
299         struct ost_body       *body;
300         int                 rc;
301
302         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
303
304         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
305         if (req == NULL)
306                 return -ENOMEM;
307
308         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
309         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
310         if (rc) {
311                 ptlrpc_request_free(req);
312                 return rc;
313         }
314
315         osc_pack_req_body(req, oinfo);
316
317         ptlrpc_request_set_replen(req);
318
319         rc = ptlrpc_queue_wait(req);
320         if (rc)
321                 GOTO(out, rc);
322
323         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
324         if (body == NULL)
325                 GOTO(out, rc = -EPROTO);
326
327         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
328                              &body->oa);
329
330 out:
331         ptlrpc_req_finished(req);
332         return rc;
333 }
334
335 static int osc_setattr_interpret(const struct lu_env *env,
336                                  struct ptlrpc_request *req,
337                                  struct osc_setattr_args *sa, int rc)
338 {
339         struct ost_body *body;
340
341         if (rc != 0)
342                 GOTO(out, rc);
343
344         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
345         if (body == NULL)
346                 GOTO(out, rc = -EPROTO);
347
348         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
349                              &body->oa);
350 out:
351         rc = sa->sa_upcall(sa->sa_cookie, rc);
352         return rc;
353 }
354
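/* Send an OST_SETATTR request without waiting for the reply.  With a NULL
 * @rqset the request is handed straight to ptlrpcd and the reply is ignored;
 * otherwise @upcall is invoked with @cookie from osc_setattr_interpret(). */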
355 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
356                            struct obd_trans_info *oti,
357                            obd_enqueue_update_f upcall, void *cookie,
358                            struct ptlrpc_request_set *rqset)
359 {
360         struct ptlrpc_request   *req;
361         struct osc_setattr_args *sa;
362         int                   rc;
363
364         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
365         if (req == NULL)
366                 return -ENOMEM;
367
368         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
369         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
370         if (rc) {
371                 ptlrpc_request_free(req);
372                 return rc;
373         }
374
375         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
376                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
377
378         osc_pack_req_body(req, oinfo);
379
380         ptlrpc_request_set_replen(req);
381
382         /* Do the MDS-to-OST setattr asynchronously. */
383         if (!rqset) {
384                 /* Do not wait for response. */
385                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
386         } else {
387                 req->rq_interpret_reply =
388                         (ptlrpc_interpterer_t)osc_setattr_interpret;
389
390                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
391                 sa = ptlrpc_req_async_args(req);
392                 sa->sa_oa = oinfo->oi_oa;
393                 sa->sa_upcall = upcall;
394                 sa->sa_cookie = cookie;
395
396                 if (rqset == PTLRPCD_SET)
397                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
398                 else
399                         ptlrpc_set_add_req(rqset, req);
400         }
401
402         return 0;
403 }
404
405 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
406                              struct obd_trans_info *oti,
407                              struct ptlrpc_request_set *rqset)
408 {
409         return osc_setattr_async_base(exp, oinfo, oti,
410                                       oinfo->oi_cb_up, oinfo, rqset);
411 }
412
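/* Create an object on the OST with a synchronous OST_CREATE RPC, allocating
 * stripe metadata if the caller did not pass any in, and copy the returned
 * object id (and llog cookie, if any) back to the caller. */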
413 int osc_real_create(struct obd_export *exp, struct obdo *oa,
414                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
415 {
416         struct ptlrpc_request *req;
417         struct ost_body       *body;
418         struct lov_stripe_md  *lsm;
419         int                 rc;
420
421         LASSERT(oa);
422         LASSERT(ea);
423
424         lsm = *ea;
425         if (!lsm) {
426                 rc = obd_alloc_memmd(exp, &lsm);
427                 if (rc < 0)
428                         return rc;
429         }
430
431         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
432         if (req == NULL)
433                 GOTO(out, rc = -ENOMEM);
434
435         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
436         if (rc) {
437                 ptlrpc_request_free(req);
438                 GOTO(out, rc);
439         }
440
441         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
442         LASSERT(body);
443
444         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
445
446         ptlrpc_request_set_replen(req);
447
448         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
449             oa->o_flags == OBD_FL_DELORPHAN) {
450                 DEBUG_REQ(D_HA, req,
451                           "delorphan from OST integration");
452                 /* Don't resend the delorphan req */
453                 req->rq_no_resend = req->rq_no_delay = 1;
454         }
455
456         rc = ptlrpc_queue_wait(req);
457         if (rc)
458                 GOTO(out_req, rc);
459
460         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
461         if (body == NULL)
462                 GOTO(out_req, rc = -EPROTO);
463
464         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
465         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
466
467         oa->o_blksize = cli_brw_size(exp->exp_obd);
468         oa->o_valid |= OBD_MD_FLBLKSZ;
469
470         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
471          * have valid lsm_oinfo data structs, so don't go touching that.
472          * This needs to be fixed in a big way.
473          */
474         lsm->lsm_oi = oa->o_oi;
475         *ea = lsm;
476
477         if (oti != NULL) {
478                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
479
480                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
481                         if (!oti->oti_logcookies)
482                                 oti_alloc_cookies(oti, 1);
483                         *oti->oti_logcookies = oa->o_lcookie;
484                 }
485         }
486
487         CDEBUG(D_HA, "transno: "LPD64"\n",
488                lustre_msg_get_transno(req->rq_repmsg));
489 out_req:
490         ptlrpc_req_finished(req);
491 out:
492         if (rc && !*ea)
493                 obd_free_memmd(exp, &lsm);
494         return rc;
495 }
496
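/* Send an OST_PUNCH (truncate) request asynchronously; the reply is handled
 * by osc_setattr_interpret(), which invokes @upcall with @cookie. */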
497 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
498                    obd_enqueue_update_f upcall, void *cookie,
499                    struct ptlrpc_request_set *rqset)
500 {
501         struct ptlrpc_request   *req;
502         struct osc_setattr_args *sa;
503         struct ost_body  *body;
504         int                   rc;
505
506         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
507         if (req == NULL)
508                 return -ENOMEM;
509
510         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
511         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
512         if (rc) {
513                 ptlrpc_request_free(req);
514                 return rc;
515         }
516         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
517         ptlrpc_at_set_req_timeout(req);
518
519         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
520         LASSERT(body);
521         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
522                              oinfo->oi_oa);
523         osc_pack_capa(req, body, oinfo->oi_capa);
524
525         ptlrpc_request_set_replen(req);
526
527         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
528         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
529         sa = ptlrpc_req_async_args(req);
530         sa->sa_oa     = oinfo->oi_oa;
531         sa->sa_upcall = upcall;
532         sa->sa_cookie = cookie;
533         if (rqset == PTLRPCD_SET)
534                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
535         else
536                 ptlrpc_set_add_req(rqset, req);
537
538         return 0;
539 }
540
541 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
542                      struct obd_info *oinfo, struct obd_trans_info *oti,
543                      struct ptlrpc_request_set *rqset)
544 {
545         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
546         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
547         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
548         return osc_punch_base(exp, oinfo,
549                               oinfo->oi_cb_up, oinfo, rqset);
550 }
551
552 static int osc_sync_interpret(const struct lu_env *env,
553                               struct ptlrpc_request *req,
554                               void *arg, int rc)
555 {
556         struct osc_fsync_args *fa = arg;
557         struct ost_body *body;
558
559         if (rc)
560                 GOTO(out, rc);
561
562         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
563         if (body == NULL) {
564                 CERROR("can't unpack ost_body\n");
565                 GOTO(out, rc = -EPROTO);
566         }
567
568         *fa->fa_oi->oi_oa = body->oa;
569 out:
570         rc = fa->fa_upcall(fa->fa_cookie, rc);
571         return rc;
572 }
573
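/* Send an OST_SYNC request asynchronously; the range to sync is carried in
 * the size/blocks fields of the obdo and @upcall is invoked with @cookie
 * from osc_sync_interpret(). */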
574 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
575                   obd_enqueue_update_f upcall, void *cookie,
576                   struct ptlrpc_request_set *rqset)
577 {
578         struct ptlrpc_request *req;
579         struct ost_body       *body;
580         struct osc_fsync_args *fa;
581         int                 rc;
582
583         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
584         if (req == NULL)
585                 return -ENOMEM;
586
587         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
588         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
589         if (rc) {
590                 ptlrpc_request_free(req);
591                 return rc;
592         }
593
594         /* overload the size and blocks fields in the oa with start/end */
595         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
596         LASSERT(body);
597         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
598                              oinfo->oi_oa);
599         osc_pack_capa(req, body, oinfo->oi_capa);
600
601         ptlrpc_request_set_replen(req);
602         req->rq_interpret_reply = osc_sync_interpret;
603
604         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
605         fa = ptlrpc_req_async_args(req);
606         fa->fa_oi = oinfo;
607         fa->fa_upcall = upcall;
608         fa->fa_cookie = cookie;
609
610         if (rqset == PTLRPCD_SET)
611                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
612         else
613                 ptlrpc_set_add_req(rqset, req);
614
615         return 0;
616 }
617
618 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
619                     struct obd_info *oinfo, obd_size start, obd_size end,
620                     struct ptlrpc_request_set *set)
621 {
622         if (!oinfo->oi_oa) {
623                 CDEBUG(D_INFO, "oa NULL\n");
624                 return -EINVAL;
625         }
626
627         oinfo->oi_oa->o_size = start;
628         oinfo->oi_oa->o_blocks = end;
629         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
630
631         return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
632 }
633
634 /* Find and locally cancel the locks matching @mode in the resource derived
635  * from @oa. Found locks are added to the @cancels list. Returns the number
636  * of locks added to @cancels. */
637 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
638                                    struct list_head *cancels,
639                                    ldlm_mode_t mode, int lock_flags)
640 {
641         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
642         struct ldlm_res_id res_id;
643         struct ldlm_resource *res;
644         int count;
645
646         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
647          * export) but disabled through procfs (flag in NS).
648          *
649          * This is distinct from the case where ELC is not supported at all,
650          * in which case we still want to cancel locks in advance, but only
651          * locally, without sending any RPC. */
652         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
653                 return 0;
654
655         ostid_build_res_name(&oa->o_oi, &res_id);
656         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
657         if (res == NULL)
658                 return 0;
659
660         LDLM_RESOURCE_ADDREF(res);
661         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
662                                            lock_flags, 0, NULL);
663         LDLM_RESOURCE_DELREF(res);
664         ldlm_resource_putref(res);
665         return count;
666 }
667
668 static int osc_destroy_interpret(const struct lu_env *env,
669                                  struct ptlrpc_request *req, void *data,
670                                  int rc)
671 {
672         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
673
674         atomic_dec(&cli->cl_destroy_in_flight);
675         wake_up(&cli->cl_destroy_waitq);
676         return 0;
677 }
678
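/* Try to reserve a slot for another destroy RPC: return 1 if the number of
 * destroys in flight stays within cl_max_rpcs_in_flight, otherwise release
 * the reservation (waking a waiter if needed) and return 0. */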
679 static int osc_can_send_destroy(struct client_obd *cli)
680 {
681         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
682             cli->cl_max_rpcs_in_flight) {
683                 /* The destroy request can be sent */
684                 return 1;
685         }
686         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
687             cli->cl_max_rpcs_in_flight) {
688                 /*
689                  * The counter has been modified between the two atomic
690                  * operations.
691                  */
692                 wake_up(&cli->cl_destroy_waitq);
693         }
694         return 0;
695 }
696
697 int osc_create(const struct lu_env *env, struct obd_export *exp,
698                struct obdo *oa, struct lov_stripe_md **ea,
699                struct obd_trans_info *oti)
700 {
701         int rc = 0;
702
703         LASSERT(oa);
704         LASSERT(ea);
705         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
706
707         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
708             oa->o_flags == OBD_FL_RECREATE_OBJS) {
709                 return osc_real_create(exp, oa, ea, oti);
710         }
711
712         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
713                 return osc_real_create(exp, oa, ea, oti);
714
715         /* we should not get here anymore */
716         LBUG();
717
718         return rc;
719 }
720
721 /* Destroy requests can always be async on the client, and we don't even really
722  * care about the return code since the client cannot do anything at all about
723  * a destroy failure.
724  * When the MDS is unlinking a filename, it saves the file objects into a
725  * recovery llog, and these object records are cancelled when the OST reports
726  * they were destroyed and sync'd to disk (i.e. transaction committed).
727  * If the client dies, or the OST is down when the object should be destroyed,
728  * the records are not cancelled, and when the OST next reconnects to the MDS,
729  * it will retrieve the llog unlink logs and then send the log cancellation
730  * cookies to the MDS after committing the destroy transactions. */
731 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
732                        struct obdo *oa, struct lov_stripe_md *ea,
733                        struct obd_trans_info *oti, struct obd_export *md_export,
734                        void *capa)
735 {
736         struct client_obd     *cli = &exp->exp_obd->u.cli;
737         struct ptlrpc_request *req;
738         struct ost_body       *body;
739         LIST_HEAD(cancels);
740         int rc, count;
741
742         if (!oa) {
743                 CDEBUG(D_INFO, "oa NULL\n");
744                 return -EINVAL;
745         }
746
747         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
748                                         LDLM_FL_DISCARD_DATA);
749
750         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
751         if (req == NULL) {
752                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
753                 return -ENOMEM;
754         }
755
756         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
757         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
758                                0, &cancels, count);
759         if (rc) {
760                 ptlrpc_request_free(req);
761                 return rc;
762         }
763
764         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
765         ptlrpc_at_set_req_timeout(req);
766
767         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
768                 oa->o_lcookie = *oti->oti_logcookies;
769         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
770         LASSERT(body);
771         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
772
773         osc_pack_capa(req, body, (struct obd_capa *)capa);
774         ptlrpc_request_set_replen(req);
775
776         /* If osc_destroy is being used to destroy an unlink orphan on
777          * behalf of the MDT, it must not block here, because the request
778          * may have been issued by ptlrpcd, and it is not good to block a
779          * ptlrpcd thread (b=16006). */
780         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
781                 req->rq_interpret_reply = osc_destroy_interpret;
782                 if (!osc_can_send_destroy(cli)) {
783                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
784                                                           NULL);
785
786                         /*
787                          * Wait until the number of on-going destroy RPCs drops
788                          * under max_rpc_in_flight
789                          */
790                         l_wait_event_exclusive(cli->cl_destroy_waitq,
791                                                osc_can_send_destroy(cli), &lwi);
792                 }
793         }
794
795         /* Do not wait for response */
796         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
797         return 0;
798 }
799
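/* Fill in the dirty, undirty, grant and dropped-grant fields of @oa so the
 * server learns how much dirty data and grant this client currently holds;
 * the values are sanity-checked and clamped along the way. */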
800 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
801                                 long writing_bytes)
802 {
803         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
804
805         LASSERT(!(oa->o_valid & bits));
806
807         oa->o_valid |= bits;
808         client_obd_list_lock(&cli->cl_loi_list_lock);
809         oa->o_dirty = cli->cl_dirty;
810         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
811                      cli->cl_dirty_max)) {
812                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
813                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
814                 oa->o_undirty = 0;
815         } else if (unlikely(atomic_read(&obd_dirty_pages) -
816                             atomic_read(&obd_dirty_transit_pages) >
817                             (long)(obd_max_dirty_pages + 1))) {
818                 /* The atomic_read() and the atomic_inc() are not
819                  * covered by a lock, thus they may safely race and trip
820                  * this CERROR() unless we add in a small fudge factor (+1). */
821                 CERROR("dirty %d - %d > system dirty_max %d\n",
822                        atomic_read(&obd_dirty_pages),
823                        atomic_read(&obd_dirty_transit_pages),
824                        obd_max_dirty_pages);
825                 oa->o_undirty = 0;
826         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
827                 CERROR("dirty %lu - dirty_max %lu too big???\n",
828                        cli->cl_dirty, cli->cl_dirty_max);
829                 oa->o_undirty = 0;
830         } else {
831                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
832                                       PAGE_CACHE_SHIFT)*
833                                      (cli->cl_max_rpcs_in_flight + 1);
834                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
835         }
836         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
837         oa->o_dropped = cli->cl_lost_grant;
838         cli->cl_lost_grant = 0;
839         client_obd_list_unlock(&cli->cl_loi_list_lock);
840         CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
841                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
842
843 }
844
845 void osc_update_next_shrink(struct client_obd *cli)
846 {
847         cli->cl_next_shrink_grant =
848                 cfs_time_shift(cli->cl_grant_shrink_interval);
849         CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
850                cli->cl_next_shrink_grant);
851 }
852
853 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
854 {
855         client_obd_list_lock(&cli->cl_loi_list_lock);
856         cli->cl_avail_grant += grant;
857         client_obd_list_unlock(&cli->cl_loi_list_lock);
858 }
859
860 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
861 {
862         if (body->oa.o_valid & OBD_MD_FLGRANT) {
863                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
864                 __osc_update_grant(cli, body->oa.o_grant);
865         }
866 }
867
868 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
869                               obd_count keylen, void *key, obd_count vallen,
870                               void *val, struct ptlrpc_request_set *set);
871
872 static int osc_shrink_grant_interpret(const struct lu_env *env,
873                                       struct ptlrpc_request *req,
874                                       void *aa, int rc)
875 {
876         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
877         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
878         struct ost_body *body;
879
880         if (rc != 0) {
881                 __osc_update_grant(cli, oa->o_grant);
882                 GOTO(out, rc);
883         }
884
885         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
886         LASSERT(body);
887         osc_update_grant(cli, body);
888 out:
889         OBDO_FREE(oa);
890         return rc;
891 }
892
893 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
894 {
895         client_obd_list_lock(&cli->cl_loi_list_lock);
896         oa->o_grant = cli->cl_avail_grant / 4;
897         cli->cl_avail_grant -= oa->o_grant;
898         client_obd_list_unlock(&cli->cl_loi_list_lock);
899         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
900                 oa->o_valid |= OBD_MD_FLFLAGS;
901                 oa->o_flags = 0;
902         }
903         oa->o_flags |= OBD_FL_SHRINK_GRANT;
904         osc_update_next_shrink(cli);
905 }
906
907 /* Shrink the current grant, either from some large amount to enough for a
908  * full set of in-flight RPCs, or if we have already shrunk to that limit
909  * then to enough for a single RPC.  This avoids keeping more grant than
910  * needed, and avoids shrinking the grant piecemeal. */
911 static int osc_shrink_grant(struct client_obd *cli)
912 {
913         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
914                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
915
916         client_obd_list_lock(&cli->cl_loi_list_lock);
917         if (cli->cl_avail_grant <= target_bytes)
918                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
919         client_obd_list_unlock(&cli->cl_loi_list_lock);
920
921         return osc_shrink_grant_to_target(cli, target_bytes);
922 }
923
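/* Give grant back to the server until only @target_bytes remain, but never
 * shrink below the space needed for one full-sized RPC; the surplus is
 * returned via a KEY_GRANT_SHRINK set_info RPC. */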
924 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
925 {
926         int                     rc = 0;
927         struct ost_body *body;
928
929         client_obd_list_lock(&cli->cl_loi_list_lock);
930         /* Don't shrink if we are already at or below the desired limit.
931          * We don't want to shrink below a single RPC, as that will negatively
932          * impact block allocation and long-term performance. */
933         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
934                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
935
936         if (target_bytes >= cli->cl_avail_grant) {
937                 client_obd_list_unlock(&cli->cl_loi_list_lock);
938                 return 0;
939         }
940         client_obd_list_unlock(&cli->cl_loi_list_lock);
941
942         OBD_ALLOC_PTR(body);
943         if (!body)
944                 return -ENOMEM;
945
946         osc_announce_cached(cli, &body->oa, 0);
947
948         client_obd_list_lock(&cli->cl_loi_list_lock);
949         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
950         cli->cl_avail_grant = target_bytes;
951         client_obd_list_unlock(&cli->cl_loi_list_lock);
952         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
953                 body->oa.o_valid |= OBD_MD_FLFLAGS;
954                 body->oa.o_flags = 0;
955         }
956         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
957         osc_update_next_shrink(cli);
958
959         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
960                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
961                                 sizeof(*body), body, NULL);
962         if (rc != 0)
963                 __osc_update_grant(cli, body->oa.o_grant);
964         OBD_FREE_PTR(body);
965         return rc;
966 }
967
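/* Decide whether it is time to return grant to the server: the server must
 * support OBD_CONNECT_GRANT_SHRINK, the shrink interval must have expired,
 * the import must be FULL and we must hold more grant than one RPC needs. */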
968 static int osc_should_shrink_grant(struct client_obd *client)
969 {
970         cfs_time_t time = cfs_time_current();
971         cfs_time_t next_shrink = client->cl_next_shrink_grant;
972
973         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
974              OBD_CONNECT_GRANT_SHRINK) == 0)
975                 return 0;
976
977         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
978                 /* Get the current RPC size directly, instead of going via:
979                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
980                  * Keep comment here so that it can be found by searching. */
981                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
982
983                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
984                     client->cl_avail_grant > brw_size)
985                         return 1;
986                 else
987                         osc_update_next_shrink(client);
988         }
989         return 0;
990 }
991
992 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
993 {
994         struct client_obd *client;
995
996         list_for_each_entry(client, &item->ti_obd_list,
997                                 cl_grant_shrink_list) {
998                 if (osc_should_shrink_grant(client))
999                         osc_shrink_grant(client);
1000         }
1001         return 0;
1002 }
1003
1004 static int osc_add_shrink_grant(struct client_obd *client)
1005 {
1006         int rc;
1007
1008         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1009                                        TIMEOUT_GRANT,
1010                                        osc_grant_shrink_grant_cb, NULL,
1011                                        &client->cl_grant_shrink_list);
1012         if (rc) {
1013                 CERROR("add grant client %s error %d\n",
1014                         client->cl_import->imp_obd->obd_name, rc);
1015                 return rc;
1016         }
1017         CDEBUG(D_CACHE, "add grant client %s\n",
1018                client->cl_import->imp_obd->obd_name);
1019         osc_update_next_shrink(client);
1020         return 0;
1021 }
1022
1023 static int osc_del_shrink_grant(struct client_obd *client)
1024 {
1025         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1026                                          TIMEOUT_GRANT);
1027 }
1028
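/* Initialize the available grant from the connect data returned by the
 * server, derive the extent chunk size from the server block size, and
 * register the grant shrink timeout handler if the server supports it. */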
1029 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1030 {
1031         /*
1032          * ocd_grant is the total grant amount we expect to hold: if we've
1033          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1034          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1035          *
1036          * A race is tolerable here: if we're evicted but imp_state has
1037          * already left the EVICTED state, then cl_dirty must be 0 already.
1038          */
1039         client_obd_list_lock(&cli->cl_loi_list_lock);
1040         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1041                 cli->cl_avail_grant = ocd->ocd_grant;
1042         else
1043                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1044
1045         if (cli->cl_avail_grant < 0) {
1046                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1047                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1048                       ocd->ocd_grant, cli->cl_dirty);
1049                 /* workaround for servers which do not have the patch from
1050                  * LU-2679 */
1051                 cli->cl_avail_grant = ocd->ocd_grant;
1052         }
1053
1054         /* determine the appropriate chunk size used by osc_extent. */
1055         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1056         client_obd_list_unlock(&cli->cl_loi_list_lock);
1057
1058         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1059                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1060                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1061
1062         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1063             list_empty(&cli->cl_grant_shrink_list))
1064                 osc_add_shrink_grant(cli);
1065 }
1066
1067 /* We assume that this OSC got a short read because it read beyond the end
1068  * of a stripe file; i.e. Lustre is reading a sparse file via the LOV, and
1069  * it _knows_ it's reading inside the file, it's just that this stripe never
1070  * got written at or beyond this stripe offset yet. */
1071 static void handle_short_read(int nob_read, obd_count page_count,
1072                               struct brw_page **pga)
1073 {
1074         char *ptr;
1075         int i = 0;
1076
1077         /* skip bytes read OK */
1078         while (nob_read > 0) {
1079                 LASSERT(page_count > 0);
1080
1081                 if (pga[i]->count > nob_read) {
1082                         /* EOF inside this page */
1083                         ptr = kmap(pga[i]->pg) +
1084                                 (pga[i]->off & ~CFS_PAGE_MASK);
1085                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1086                         kunmap(pga[i]->pg);
1087                         page_count--;
1088                         i++;
1089                         break;
1090                 }
1091
1092                 nob_read -= pga[i]->count;
1093                 page_count--;
1094                 i++;
1095         }
1096
1097         /* zero remaining pages */
1098         while (page_count-- > 0) {
1099                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1100                 memset(ptr, 0, pga[i]->count);
1101                 kunmap(pga[i]->pg);
1102                 i++;
1103         }
1104 }
1105
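/* Sanity-check a BRW_WRITE reply: every per-niobuf return code must be zero
 * and the number of bulk bytes transferred must match what was requested. */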
1106 static int check_write_rcs(struct ptlrpc_request *req,
1107                            int requested_nob, int niocount,
1108                            obd_count page_count, struct brw_page **pga)
1109 {
1110         int     i;
1111         __u32   *remote_rcs;
1112
1113         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1114                                                   sizeof(*remote_rcs) *
1115                                                   niocount);
1116         if (remote_rcs == NULL) {
1117                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1118                 return(-EPROTO);
1119         }
1120
1121         /* return error if any niobuf was in error */
1122         for (i = 0; i < niocount; i++) {
1123                 if ((int)remote_rcs[i] < 0)
1124                         return(remote_rcs[i]);
1125
1126                 if (remote_rcs[i] != 0) {
1127                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1128                                 i, remote_rcs[i], req);
1129                         return(-EPROTO);
1130                 }
1131         }
1132
1133         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1134                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1135                        req->rq_bulk->bd_nob_transferred, requested_nob);
1136                 return(-EPROTO);
1137         }
1138
1139         return (0);
1140 }
1141
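/* Two brw pages may be merged into one remote niobuf only if their flags
 * match and the second page starts exactly where the first one ends. */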
1142 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1143 {
1144         if (p1->flag != p2->flag) {
1145                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1146                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1147
1148                 /* warn if we try to combine flags that we don't know to be
1149                  * safe to combine */
1150                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1151                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1152                               "report this at http://bugs.whamcloud.com/\n",
1153                               p1->flag, p2->flag);
1154                 }
1155                 return 0;
1156         }
1157
1158         return (p1->off + p1->count == p2->off);
1159 }
1160
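/* Compute the bulk checksum over the first @nob bytes of @pga with the
 * algorithm selected by @cksum_type; fault-injection hooks may corrupt the
 * data (reads) or the checksum (writes) for testing. */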
1161 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1162                                    struct brw_page **pga, int opc,
1163                                    cksum_type_t cksum_type)
1164 {
1165         __u32                           cksum;
1166         int                             i = 0;
1167         struct cfs_crypto_hash_desc     *hdesc;
1168         unsigned int                    bufsize;
1169         int                             err;
1170         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1171
1172         LASSERT(pg_count > 0);
1173
1174         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1175         if (IS_ERR(hdesc)) {
1176                 CERROR("Unable to initialize checksum hash %s\n",
1177                        cfs_crypto_hash_name(cfs_alg));
1178                 return PTR_ERR(hdesc);
1179         }
1180
1181         while (nob > 0 && pg_count > 0) {
1182                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1183
1184                 /* corrupt the data before we compute the checksum, to
1185                  * simulate an OST->client data error */
1186                 if (i == 0 && opc == OST_READ &&
1187                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1188                         unsigned char *ptr = kmap(pga[i]->pg);
1189                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1190                         memcpy(ptr + off, "bad1", min(4, nob));
1191                         kunmap(pga[i]->pg);
1192                 }
1193                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1194                                   pga[i]->off & ~CFS_PAGE_MASK,
1195                                   count);
1196                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1197                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1198
1199                 nob -= pga[i]->count;
1200                 pg_count--;
1201                 i++;
1202         }
1203
1204         bufsize = 4;
1205         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1206
1207         if (err)
1208                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1209
1210         /* For sends we only compute a wrong checksum instead of corrupting
1211          * the data, so it is still correct on a redo */
1212         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1213                 cksum++;
1214
1215         return cksum;
1216 }
1217
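/* Build a bulk read or write request for @page_count pages: allocate the
 * request and bulk descriptor, merge contiguous pages into remote niobufs,
 * announce cached/dirty state, optionally attach a bulk checksum, and set
 * up the per-request async args. */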
1218 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1219                                 struct lov_stripe_md *lsm, obd_count page_count,
1220                                 struct brw_page **pga,
1221                                 struct ptlrpc_request **reqp,
1222                                 struct obd_capa *ocapa, int reserve,
1223                                 int resend)
1224 {
1225         struct ptlrpc_request   *req;
1226         struct ptlrpc_bulk_desc *desc;
1227         struct ost_body  *body;
1228         struct obd_ioobj        *ioobj;
1229         struct niobuf_remote    *niobuf;
1230         int niocount, i, requested_nob, opc, rc;
1231         struct osc_brw_async_args *aa;
1232         struct req_capsule      *pill;
1233         struct brw_page *pg_prev;
1234
1235         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1236                 return -ENOMEM; /* Recoverable */
1237         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1238                 return -EINVAL; /* Fatal */
1239
1240         if ((cmd & OBD_BRW_WRITE) != 0) {
1241                 opc = OST_WRITE;
1242                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1243                                                 cli->cl_import->imp_rq_pool,
1244                                                 &RQF_OST_BRW_WRITE);
1245         } else {
1246                 opc = OST_READ;
1247                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1248         }
1249         if (req == NULL)
1250                 return -ENOMEM;
1251
1252         for (niocount = i = 1; i < page_count; i++) {
1253                 if (!can_merge_pages(pga[i - 1], pga[i]))
1254                         niocount++;
1255         }
1256
1257         pill = &req->rq_pill;
1258         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1259                              sizeof(*ioobj));
1260         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1261                              niocount * sizeof(*niobuf));
1262         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1263
1264         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1265         if (rc) {
1266                 ptlrpc_request_free(req);
1267                 return rc;
1268         }
1269         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1270         ptlrpc_at_set_req_timeout(req);
1271         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1272          * retry logic */
1273         req->rq_no_retry_einprogress = 1;
1274
1275         desc = ptlrpc_prep_bulk_imp(req, page_count,
1276                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1277                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1278                 OST_BULK_PORTAL);
1279
1280         if (desc == NULL)
1281                 GOTO(out, rc = -ENOMEM);
1282         /* NB request now owns desc and will free it when it gets freed */
1283
1284         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1285         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1286         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1287         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1288
1289         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1290
1291         obdo_to_ioobj(oa, ioobj);
1292         ioobj->ioo_bufcnt = niocount;
1293         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1294          * bulks that might be sent for this request.  The actual number is decided
1295          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1296          * "max - 1" for compatibility with old clients sending "0", and also so
1297          * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1298         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1299         osc_pack_capa(req, body, ocapa);
1300         LASSERT(page_count > 0);
1301         pg_prev = pga[0];
1302         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1303                 struct brw_page *pg = pga[i];
1304                 int poff = pg->off & ~CFS_PAGE_MASK;
1305
1306                 LASSERT(pg->count > 0);
1307                 /* make sure there is no gap in the middle of the page array */
1308                 LASSERTF(page_count == 1 ||
1309                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1310                           ergo(i > 0 && i < page_count - 1,
1311                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1312                           ergo(i == page_count - 1, poff == 0)),
1313                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1314                          i, page_count, pg, pg->off, pg->count);
1315                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1316                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1317                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1318                          i, page_count,
1319                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1320                          pg_prev->pg, page_private(pg_prev->pg),
1321                          pg_prev->pg->index, pg_prev->off);
1322                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1323                         (pg->flag & OBD_BRW_SRVLOCK));
1324
1325                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1326                 requested_nob += pg->count;
1327
1328                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1329                         niobuf--;
1330                         niobuf->len += pg->count;
1331                 } else {
1332                         niobuf->offset = pg->off;
1333                         niobuf->len    = pg->count;
1334                         niobuf->flags  = pg->flag;
1335                 }
1336                 pg_prev = pg;
1337         }
1338
1339         LASSERTF((void *)(niobuf - niocount) ==
1340                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1341                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1342                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1343
1344         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1345         if (resend) {
1346                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1347                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1348                         body->oa.o_flags = 0;
1349                 }
1350                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1351         }
1352
1353         if (osc_should_shrink_grant(cli))
1354                 osc_shrink_grant_local(cli, &body->oa);
1355
1356         /* size[REQ_REC_OFF] still sizeof (*body) */
1357         if (opc == OST_WRITE) {
1358                 if (cli->cl_checksum &&
1359                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1360                         /* store cl_cksum_type in a local variable since
1361                          * it can be changed via lprocfs */
1362                         cksum_type_t cksum_type = cli->cl_cksum_type;
1363
1364                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1365                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1366                                 body->oa.o_flags = 0;
1367                         }
1368                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1369                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1370                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1371                                                              page_count, pga,
1372                                                              OST_WRITE,
1373                                                              cksum_type);
1374                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1375                                body->oa.o_cksum);
1376                         /* save this in 'oa', too, for later checking */
1377                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1378                         oa->o_flags |= cksum_type_pack(cksum_type);
1379                 } else {
1380                         /* clear out the checksum flag, in case this is a
1381                          * resend but cl_checksum is no longer set. b=11238 */
1382                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1383                 }
1384                 oa->o_cksum = body->oa.o_cksum;
1385                 /* 1 RC per niobuf */
1386                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1387                                      sizeof(__u32) * niocount);
1388         } else {
1389                 if (cli->cl_checksum &&
1390                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1391                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1392                                 body->oa.o_flags = 0;
1393                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1394                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1395                 }
1396         }
1397         ptlrpc_request_set_replen(req);
1398
1399         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1400         aa = ptlrpc_req_async_args(req);
1401         aa->aa_oa = oa;
1402         aa->aa_requested_nob = requested_nob;
1403         aa->aa_nio_count = niocount;
1404         aa->aa_page_count = page_count;
1405         aa->aa_resends = 0;
1406         aa->aa_ppga = pga;
1407         aa->aa_cli = cli;
1408         INIT_LIST_HEAD(&aa->aa_oaps);
1409         if (ocapa && reserve)
1410                 aa->aa_ocapa = capa_get(ocapa);
1411
1412         *reqp = req;
1413         return 0;
1414
1415  out:
1416         ptlrpc_req_finished(req);
1417         return rc;
1418 }
1419
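/*
 * Compare the write checksum returned by the server against the one the
 * client computed when the request was built.  On a mismatch the bulk data
 * is checksummed again locally to narrow down where the corruption happened:
 * if the new value matches the server's, the pages changed after the client
 * checksummed them (e.g. mmap IO); if it still matches the original client
 * value, the data changed in transit.  Returns 0 when the checksums agree
 * and 1 on a mismatch (after logging the diagnosis).
 */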
1420 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1421                                 __u32 client_cksum, __u32 server_cksum, int nob,
1422                                 obd_count page_count, struct brw_page **pga,
1423                                 cksum_type_t client_cksum_type)
1424 {
1425         __u32 new_cksum;
1426         char *msg;
1427         cksum_type_t cksum_type;
1428
1429         if (server_cksum == client_cksum) {
1430                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1431                 return 0;
1432         }
1433
1434         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1435                                        oa->o_flags : 0);
1436         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1437                                       cksum_type);
1438
1439         if (cksum_type != client_cksum_type)
1440                 msg = "the server did not use the checksum type specified in "
1441                       "the original request - likely a protocol problem";
1442         else if (new_cksum == server_cksum)
1443                 msg = "changed on the client after we checksummed it - "
1444                       "likely false positive due to mmap IO (bug 11742)";
1445         else if (new_cksum == client_cksum)
1446                 msg = "changed in transit before arrival at OST";
1447         else
1448                 msg = "changed in transit AND doesn't match the original - "
1449                       "likely false positive due to mmap IO (bug 11742)";
1450
1451         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1452                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1453                            msg, libcfs_nid2str(peer->nid),
1454                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1455                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1456                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1457                            POSTID(&oa->o_oi), pga[0]->off,
1458                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1459         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1460                "client csum now %x\n", client_cksum, client_cksum_type,
1461                server_cksum, cksum_type, new_cksum);
1462         return 1;
1463 }
1464
1465 /* Note rc enters this function as number of bytes transferred */
1466 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1467 {
1468         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1469         const lnet_process_id_t *peer =
1470                         &req->rq_import->imp_connection->c_peer;
1471         struct client_obd *cli = aa->aa_cli;
1472         struct ost_body *body;
1473         __u32 client_cksum = 0;
1474
1475         if (rc < 0 && rc != -EDQUOT) {
1476                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1477                 return rc;
1478         }
1479
1480         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1481         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1482         if (body == NULL) {
1483                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1484                 return -EPROTO;
1485         }
1486
1487         /* set/clear over quota flag for a uid/gid */
1488         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1489             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1490                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1491
1492                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1493                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1494                        body->oa.o_flags);
1495                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1496         }
1497
1498         osc_update_grant(cli, body);
1499
1500         if (rc < 0)
1501                 return rc;
1502
1503         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1504                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1505
1506         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1507                 if (rc > 0) {
1508                         CERROR("Unexpected +ve rc %d\n", rc);
1509                         return -EPROTO;
1510                 }
1511                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1512
1513                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1514                         return -EAGAIN;
1515
1516                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1517                     check_write_checksum(&body->oa, peer, client_cksum,
1518                                          body->oa.o_cksum, aa->aa_requested_nob,
1519                                          aa->aa_page_count, aa->aa_ppga,
1520                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1521                         return -EAGAIN;
1522
1523                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1524                                      aa->aa_page_count, aa->aa_ppga);
1525                 GOTO(out, rc);
1526         }
1527
1528         /* The rest of this function executes only for OST_READs */
1529
1530         /* if unwrap_bulk failed, return -EAGAIN to retry */
1531         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1532         if (rc < 0)
1533                 GOTO(out, rc = -EAGAIN);
1534
1535         if (rc > aa->aa_requested_nob) {
1536                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1537                        aa->aa_requested_nob);
1538                 return -EPROTO;
1539         }
1540
1541         if (rc != req->rq_bulk->bd_nob_transferred) {
1542                 CERROR("Unexpected rc %d (%d transferred)\n",
1543                        rc, req->rq_bulk->bd_nob_transferred);
1544                 return -EPROTO;
1545         }
1546
1547         if (rc < aa->aa_requested_nob)
1548                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1549
1550         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1551                 static int cksum_counter;
1552                 __u32      server_cksum = body->oa.o_cksum;
1553                 char      *via;
1554                 char      *router;
1555                 cksum_type_t cksum_type;
1556
1557                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1558                                                body->oa.o_flags : 0);
1559                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1560                                                  aa->aa_ppga, OST_READ,
1561                                                  cksum_type);
1562
1563                 if (peer->nid == req->rq_bulk->bd_sender) {
1564                         via = router = "";
1565                 } else {
1566                         via = " via ";
1567                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1568                 }
1569
1570                 if (server_cksum == ~0 && rc > 0) {
1571                         CERROR("Protocol error: server %s set the 'checksum' "
1572                                "bit, but didn't send a checksum.  Not fatal, "
1573                                "but please notify on http://bugs.whamcloud.com/\n",
1574                                libcfs_nid2str(peer->nid));
1575                 } else if (server_cksum != client_cksum) {
1576                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1577                                            "%s%s%s inode "DFID" object "DOSTID
1578                                            " extent ["LPU64"-"LPU64"]\n",
1579                                            req->rq_import->imp_obd->obd_name,
1580                                            libcfs_nid2str(peer->nid),
1581                                            via, router,
1582                                            body->oa.o_valid & OBD_MD_FLFID ?
1583                                                 body->oa.o_parent_seq : (__u64)0,
1584                                            body->oa.o_valid & OBD_MD_FLFID ?
1585                                                 body->oa.o_parent_oid : 0,
1586                                            body->oa.o_valid & OBD_MD_FLFID ?
1587                                                 body->oa.o_parent_ver : 0,
1588                                            POSTID(&body->oa.o_oi),
1589                                            aa->aa_ppga[0]->off,
1590                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1591                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1592                                                                         1);
1593                         CERROR("client %x, server %x, cksum_type %x\n",
1594                                client_cksum, server_cksum, cksum_type);
1595                         cksum_counter = 0;
1596                         aa->aa_oa->o_cksum = client_cksum;
1597                         rc = -EAGAIN;
1598                 } else {
1599                         cksum_counter++;
1600                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1601                         rc = 0;
1602                 }
1603         } else if (unlikely(client_cksum)) {
1604                 static int cksum_missed;
1605
1606                 cksum_missed++;
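                /* Throttle the console noise: log only when cksum_missed
                 * is a power of two. */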
1607                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1608                         CERROR("Checksum %u requested from %s but not sent\n",
1609                                cksum_missed, libcfs_nid2str(peer->nid));
1610         } else {
1611                 rc = 0;
1612         }
1613 out:
1614         if (rc >= 0)
1615                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1616                                      aa->aa_oa, &body->oa);
1617
1618         return rc;
1619 }
1620
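/*
 * Issue a single synchronous bulk RPC and wait for its completion.  On a
 * bulk timeout or a recoverable error the request is rebuilt and resent,
 * waiting "resends" seconds between attempts, until either the resend limit
 * is reached (-EINPROGRESS is always retried) or the import generation
 * changes, i.e. the client was evicted in the meantime.
 */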
1621 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1622                             struct lov_stripe_md *lsm,
1623                             obd_count page_count, struct brw_page **pga,
1624                             struct obd_capa *ocapa)
1625 {
1626         struct ptlrpc_request *req;
1627         int                 rc;
1628         wait_queue_head_t           waitq;
1629         int                 generation, resends = 0;
1630         struct l_wait_info     lwi;
1631
1632         init_waitqueue_head(&waitq);
1633         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1634
1635 restart_bulk:
1636         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1637                                   page_count, pga, &req, ocapa, 0, resends);
1638         if (rc != 0)
1639                 return rc;
1640
1641         if (resends) {
1642                 req->rq_generation_set = 1;
1643                 req->rq_import_generation = generation;
1644                 req->rq_sent = cfs_time_current_sec() + resends;
1645         }
1646
1647         rc = ptlrpc_queue_wait(req);
1648
1649         if (rc == -ETIMEDOUT && req->rq_resend) {
1650                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1651                 ptlrpc_req_finished(req);
1652                 goto restart_bulk;
1653         }
1654
1655         rc = osc_brw_fini_request(req, rc);
1656
1657         ptlrpc_req_finished(req);
1658         /* When the server returns -EINPROGRESS, the client should always
1659          * retry, regardless of how many times the bulk was already resent. */
1660         if (osc_recoverable_error(rc)) {
1661                 resends++;
1662                 if (rc != -EINPROGRESS &&
1663                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1664                         CERROR("%s: too many resend retries for object: "
1665                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1666                                POSTID(&oa->o_oi), rc);
1667                         goto out;
1668                 }
1669                 if (generation !=
1670                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1671                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1672                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1673                                POSTID(&oa->o_oi), rc);
1674                         goto out;
1675                 }
1676
1677                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1678                                        NULL);
1679                 l_wait_event(waitq, 0, &lwi);
1680
1681                 goto restart_bulk;
1682         }
1683 out:
1684         if (rc == -EAGAIN || rc == -EINPROGRESS)
1685                 rc = -EIO;
1686         return rc;
1687 }
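The retry logic above roughly amounts to a bounded resend loop with a linear
backoff: the Nth resend waits N seconds, and the loop gives up once the resend
limit is hit (-EINPROGRESS excepted) or the import generation changes.  A
minimal user-space sketch of that pattern follows; try_once() and
should_retry() are hypothetical stand-ins for ptlrpc_queue_wait() and
client_should_resend(), not driver APIs:

#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static int attempts;

/* Hypothetical stand-in for issuing the RPC. */
static bool try_once(void)
{
        return ++attempts >= 3;         /* pretend the 3rd attempt succeeds */
}

/* Hypothetical stand-in for the resend-limit policy. */
static bool should_retry(int resends)
{
        return resends < 10;
}

static int send_with_linear_backoff(void)
{
        int resends = 0;

        for (;;) {
                if (try_once())
                        return 0;
                resends++;
                if (!should_retry(resends))
                        return -1;
                sleep(resends);         /* wait "resends" seconds, as above */
        }
}

int main(void)
{
        printf("rc = %d after %d attempts\n",
               send_with_linear_backoff(), attempts);
        return 0;
}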
1688
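/*
 * Rebuild a failed BRW request from its saved async args and resubmit it.
 * The new request takes over the page array, the oap and extent lists and
 * the capability from the old one, inherits its import generation, and is
 * delayed by min(aa_resends, rq_timeout) seconds before being handed back
 * to ptlrpcd.
 */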
1689 static int osc_brw_redo_request(struct ptlrpc_request *request,
1690                                 struct osc_brw_async_args *aa, int rc)
1691 {
1692         struct ptlrpc_request *new_req;
1693         struct osc_brw_async_args *new_aa;
1694         struct osc_async_page *oap;
1695
1696         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1697                   "redo for recoverable error %d", rc);
1698
1699         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1700                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1701                                   aa->aa_cli, aa->aa_oa,
1702                                   NULL /* lsm unused by osc currently */,
1703                                   aa->aa_page_count, aa->aa_ppga,
1704                                   &new_req, aa->aa_ocapa, 0, 1);
1705         if (rc)
1706                 return rc;
1707
1708         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1709                 if (oap->oap_request != NULL) {
1710                         LASSERTF(request == oap->oap_request,
1711                                  "request %p != oap_request %p\n",
1712                                  request, oap->oap_request);
1713                         if (oap->oap_interrupted) {
1714                                 ptlrpc_req_finished(new_req);
1715                                 return -EINTR;
1716                         }
1717                 }
1718         }
1719         /* The new request takes over the pga and oaps from the old request.
1720          * Note that copying a list_head doesn't work; it has to be moved. */
1721         aa->aa_resends++;
1722         new_req->rq_interpret_reply = request->rq_interpret_reply;
1723         new_req->rq_async_args = request->rq_async_args;
1724         /* Cap the resend delay to the current request timeout; this is
1725          * similar to what ptlrpc does (see after_reply()). */
1726         if (aa->aa_resends > new_req->rq_timeout)
1727                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1728         else
1729                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1730         new_req->rq_generation_set = 1;
1731         new_req->rq_import_generation = request->rq_import_generation;
1732
1733         new_aa = ptlrpc_req_async_args(new_req);
1734
1735         INIT_LIST_HEAD(&new_aa->aa_oaps);
1736         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1737         INIT_LIST_HEAD(&new_aa->aa_exts);
1738         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1739         new_aa->aa_resends = aa->aa_resends;
1740
1741         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1742                 if (oap->oap_request) {
1743                         ptlrpc_req_finished(oap->oap_request);
1744                         oap->oap_request = ptlrpc_request_addref(new_req);
1745                 }
1746         }
1747
1748         new_aa->aa_ocapa = aa->aa_ocapa;
1749         aa->aa_ocapa = NULL;
1750
1751         /* XXX: This code will run into problems if we ever support adding
1752          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1753          * waiting for all of them to finish. We should inherit the request
1754          * set from the old request. */
1755         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1756
1757         DEBUG_REQ(D_INFO, new_req, "new request");
1758         return 0;
1759 }
1760
1761 /*
1762  * Ugh, we want disk allocation on the target to happen in offset order, so
1763  * we follow Sedgewick's advice and stick to the dead simple shellsort -- it
1764  * does fine for our small page arrays and doesn't require allocation.  It is
1765  * an insertion sort that swaps elements that are a stride apart, shrinking
1766  * the stride down until it is 1 and the array is sorted.
1767  */
1768 static void sort_brw_pages(struct brw_page **array, int num)
1769 {
1770         int stride, i, j;
1771         struct brw_page *tmp;
1772
1773         if (num == 1)
1774                 return;
1775         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1776                 ;
1777
1778         do {
1779                 stride /= 3;
1780                 for (i = stride ; i < num ; i++) {
1781                         tmp = array[i];
1782                         j = i;
1783                         while (j >= stride && array[j - stride]->off > tmp->off) {
1784                                 array[j] = array[j - stride];
1785                                 j -= stride;
1786                         }
1787                         array[j] = tmp;
1788                 }
1789         } while (stride > 1);
1790 }
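For reference, the stride loop above generates the gap sequence 1, 4, 13,
40, ... (h := 3h + 1).  Below is a stand-alone sketch of the same sort on
plain integers; nothing in it is driver code, and the array in main() is made
up purely for illustration:

#include <stdio.h>

/* Shellsort with the same 1, 4, 13, 40, ... stride sequence as above. */
static void shellsort(int *a, int num)
{
        int stride, i, j, tmp;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        j = i;
                        while (j >= stride && a[j - stride] > tmp) {
                                a[j] = a[j - stride];
                                j -= stride;
                        }
                        a[j] = tmp;
                }
        } while (stride > 1);
}

int main(void)
{
        int v[] = { 40, 7, 13, 1, 22, 4 };
        int i;

        shellsort(v, 6);
        for (i = 0; i < 6; i++)
                printf("%d ", v[i]);
        printf("\n");
        return 0;
}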
1791
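/*
 * Return how many of the leading pages in @pg form one unfragmented region:
 * counting stops at the first page that does not end on a page boundary, or
 * whose successor does not start on one.  With 4096-byte pages, for example,
 * brw_pages covering [0, 4096), [4096, 8192) and [8192, 10240) yield 3,
 * while [0, 4096), [4096, 6144) and [8192, 12288) yield 2.
 */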
1792 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1793 {
1794         int count = 1;
1795         int offset;
1796         int i = 0;
1797
1798         LASSERT(pages > 0);
1799         offset = pg[i]->off & ~CFS_PAGE_MASK;
1800
1801         for (;;) {
1802                 pages--;
1803                 if (pages == 0)  /* that's all */
1804                         return count;
1805
1806                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1807                         return count;   /* doesn't end on page boundary */
1808
1809                 i++;
1810                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1811                 if (offset != 0)        /* doesn't start on page boundary */
1812                         return count;
1813
1814                 count++;
1815         }
1816 }
1817
1818 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1819 {
1820         struct brw_page **ppga;
1821         int i;
1822
1823         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1824         if (ppga == NULL)
1825                 return NULL;
1826
1827         for (i = 0; i < count; i++)
1828                 ppga[i] = pga + i;
1829         return ppga;
1830 }
1831
1832 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1833 {
1834         LASSERT(ppga != NULL);
1835         OBD_FREE(ppga, sizeof(*ppga) * count);
1836 }
1837
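/*
 * Synchronous bulk read/write entry point: sort the pages by offset, split
 * them into chunks of at most cl_max_pages_per_rpc unfragmented pages and
 * issue one BRW RPC per chunk.  The obdo is saved and restored across chunks
 * because osc_brw_internal() clobbers it.
 */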
1838 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1839                    obd_count page_count, struct brw_page *pga,
1840                    struct obd_trans_info *oti)
1841 {
1842         struct obdo *saved_oa = NULL;
1843         struct brw_page **ppga, **orig;
1844         struct obd_import *imp = class_exp2cliimp(exp);
1845         struct client_obd *cli;
1846         int rc, page_count_orig;
1847
1848         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1849         cli = &imp->imp_obd->u.cli;
1850
1851         if (cmd & OBD_BRW_CHECK) {
1852                 /* The caller just wants to know if there's a chance that this
1853                  * I/O can succeed */
1854
1855                 if (imp->imp_invalid)
1856                         return -EIO;
1857                 return 0;
1858         }
1859
1860         /* test_brw with a failed create can trip this, maybe others. */
1861         LASSERT(cli->cl_max_pages_per_rpc);
1862
1863         rc = 0;
1864
1865         orig = ppga = osc_build_ppga(pga, page_count);
1866         if (ppga == NULL)
1867                 return -ENOMEM;
1868         page_count_orig = page_count;
1869
1870         sort_brw_pages(ppga, page_count);
1871         while (page_count) {
1872                 obd_count pages_per_brw;
1873
1874                 if (page_count > cli->cl_max_pages_per_rpc)
1875                         pages_per_brw = cli->cl_max_pages_per_rpc;
1876                 else
1877                         pages_per_brw = page_count;
1878
1879                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1880
1881                 if (saved_oa != NULL) {
1882                         /* restore previously saved oa */
1883                         *oinfo->oi_oa = *saved_oa;
1884                 } else if (page_count > pages_per_brw) {
1885                         /* save a copy of oa (brw will clobber it) */
1886                         OBDO_ALLOC(saved_oa);
1887                         if (saved_oa == NULL)
1888                                 GOTO(out, rc = -ENOMEM);
1889                         *saved_oa = *oinfo->oi_oa;
1890                 }
1891
1892                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1893                                       pages_per_brw, ppga, oinfo->oi_capa);
1894
1895                 if (rc != 0)
1896                         break;
1897
1898                 page_count -= pages_per_brw;
1899                 ppga += pages_per_brw;
1900         }
1901
1902 out:
1903         osc_release_ppga(orig, page_count_orig);
1904
1905         if (saved_oa != NULL)
1906                 OBDO_FREE(saved_oa);
1907
1908         return rc;
1909 }
1910
1911 static int brw_interpret(const struct lu_env *env,
1912                          struct ptlrpc_request *req, void *data, int rc)
1913 {
1914         struct osc_brw_async_args *aa = data;
1915         struct osc_extent *ext;
1916         struct osc_extent *tmp;
1917         struct cl_object  *obj = NULL;
1918         struct client_obd *cli = aa->aa_cli;
1919
1920         rc = osc_brw_fini_request(req, rc);
1921         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1922         /* When the server returns -EINPROGRESS, the client should always
1923          * retry, regardless of how many times the bulk was already resent. */
1924         if (osc_recoverable_error(rc)) {
1925                 if (req->rq_import_generation !=
1926                     req->rq_import->imp_generation) {
1927                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1928                                ""DOSTID", rc = %d.\n",
1929                                req->rq_import->imp_obd->obd_name,
1930                                POSTID(&aa->aa_oa->o_oi), rc);
1931                 } else if (rc == -EINPROGRESS ||
1932                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1933                         rc = osc_brw_redo_request(req, aa, rc);
1934                 } else {
1935                         CERROR("%s: too many resend retries for object: "
1936                                ""LPU64":"LPU64", rc = %d.\n",
1937                                req->rq_import->imp_obd->obd_name,
1938                                POSTID(&aa->aa_oa->o_oi), rc);
1939                 }
1940
1941                 if (rc == 0)
1942                         return 0;
1943                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1944                         rc = -EIO;
1945         }
1946
1947         if (aa->aa_ocapa) {
1948                 capa_put(aa->aa_ocapa);
1949                 aa->aa_ocapa = NULL;
1950         }
1951
1952         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1953                 if (obj == NULL && rc == 0) {
1954                         obj = osc2cl(ext->oe_obj);
1955                         cl_object_get(obj);
1956                 }
1957
1958                 list_del_init(&ext->oe_link);
1959                 osc_extent_finish(env, ext, 1, rc);
1960         }
1961         LASSERT(list_empty(&aa->aa_exts));
1962         LASSERT(list_empty(&aa->aa_oaps));
1963
1964         if (obj != NULL) {
1965                 struct obdo *oa = aa->aa_oa;
1966                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1967                 unsigned long valid = 0;
1968
1969                 LASSERT(rc == 0);
1970                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1971                         attr->cat_blocks = oa->o_blocks;
1972                         valid |= CAT_BLOCKS;
1973                 }
1974                 if (oa->o_valid & OBD_MD_FLMTIME) {
1975                         attr->cat_mtime = oa->o_mtime;
1976                         valid |= CAT_MTIME;
1977                 }
1978                 if (oa->o_valid & OBD_MD_FLATIME) {
1979                         attr->cat_atime = oa->o_atime;
1980                         valid |= CAT_ATIME;
1981                 }
1982                 if (oa->o_valid & OBD_MD_FLCTIME) {
1983                         attr->cat_ctime = oa->o_ctime;
1984                         valid |= CAT_CTIME;
1985                 }
1986                 if (valid != 0) {
1987                         cl_object_attr_lock(obj);
1988                         cl_object_attr_set(env, obj, attr, valid);
1989                         cl_object_attr_unlock(obj);
1990                 }
1991                 cl_object_put(env, obj);
1992         }
1993         OBDO_FREE(aa->aa_oa);
1994
1995         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1996                           req->rq_bulk->bd_nob_transferred);
1997         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1998         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1999
2000         client_obd_list_lock(&cli->cl_loi_list_lock);
2001         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2002          * is called so we know whether to go to sync BRWs or wait for more
2003          * RPCs to complete */
2004         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2005                 cli->cl_w_in_flight--;
2006         else
2007                 cli->cl_r_in_flight--;
2008         osc_wake_cache_waiters(cli);
2009         client_obd_list_unlock(&cli->cl_loi_list_lock);
2010
2011         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2012         return rc;
2013 }
2014
2015 /**
2016  * Build an RPC from the list of extents @ext_list. The caller must ensure
2017  * that the total number of pages in this list does not exceed the maximum
2018  * pages per RPC. Extents in the list must be in OES_RPC state.
2019  */
2020 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2021                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
2022 {
2023         struct ptlrpc_request           *req = NULL;
2024         struct osc_extent               *ext;
2025         struct brw_page                 **pga = NULL;
2026         struct osc_brw_async_args       *aa = NULL;
2027         struct obdo                     *oa = NULL;
2028         struct osc_async_page           *oap;
2029         struct osc_async_page           *tmp;
2030         struct cl_req                   *clerq = NULL;
2031         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2032                                                                       CRT_READ;
2033         struct ldlm_lock                *lock = NULL;
2034         struct cl_req_attr              *crattr = NULL;
2035         obd_off                         starting_offset = OBD_OBJECT_EOF;
2036         obd_off                         ending_offset = 0;
2037         int                             mpflag = 0;
2038         int                             mem_tight = 0;
2039         int                             page_count = 0;
2040         int                             i;
2041         int                             rc;
2042         LIST_HEAD(rpc_list);
2043
2044         LASSERT(!list_empty(ext_list));
2045
2046         /* add pages into rpc_list to build BRW rpc */
2047         list_for_each_entry(ext, ext_list, oe_link) {
2048                 LASSERT(ext->oe_state == OES_RPC);
2049                 mem_tight |= ext->oe_memalloc;
2050                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2051                         ++page_count;
2052                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2053                         if (starting_offset > oap->oap_obj_off)
2054                                 starting_offset = oap->oap_obj_off;
2055                         else
2056                                 LASSERT(oap->oap_page_off == 0);
2057                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2058                                 ending_offset = oap->oap_obj_off +
2059                                                 oap->oap_count;
2060                         else
2061                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2062                                         PAGE_CACHE_SIZE);
2063                 }
2064         }
2065
2066         if (mem_tight)
2067                 mpflag = cfs_memory_pressure_get_and_set();
2068
2069         OBD_ALLOC(crattr, sizeof(*crattr));
2070         if (crattr == NULL)
2071                 GOTO(out, rc = -ENOMEM);
2072
2073         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2074         if (pga == NULL)
2075                 GOTO(out, rc = -ENOMEM);
2076
2077         OBDO_ALLOC(oa);
2078         if (oa == NULL)
2079                 GOTO(out, rc = -ENOMEM);
2080
2081         i = 0;
2082         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2083                 struct cl_page *page = oap2cl_page(oap);
2084                 if (clerq == NULL) {
2085                         clerq = cl_req_alloc(env, page, crt,
2086                                              1 /* only 1-object rpcs for now */);
2087                         if (IS_ERR(clerq))
2088                                 GOTO(out, rc = PTR_ERR(clerq));
2089                         lock = oap->oap_ldlm_lock;
2090                 }
2091                 if (mem_tight)
2092                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2093                 pga[i] = &oap->oap_brw_page;
2094                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2095                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2096                        pga[i]->pg, page_index(oap->oap_page), oap,
2097                        pga[i]->flag);
2098                 i++;
2099                 cl_req_page_add(env, clerq, page);
2100         }
2101
2102         /* always get the data for the obdo for the rpc */
2103         LASSERT(clerq != NULL);
2104         crattr->cra_oa = oa;
2105         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2106         if (lock) {
2107                 oa->o_handle = lock->l_remote_handle;
2108                 oa->o_valid |= OBD_MD_FLHANDLE;
2109         }
2110
2111         rc = cl_req_prep(env, clerq);
2112         if (rc != 0) {
2113                 CERROR("cl_req_prep failed: %d\n", rc);
2114                 GOTO(out, rc);
2115         }
2116
2117         sort_brw_pages(pga, page_count);
2118         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2119                         pga, &req, crattr->cra_capa, 1, 0);
2120         if (rc != 0) {
2121                 CERROR("prep_req failed: %d\n", rc);
2122                 GOTO(out, rc);
2123         }
2124
2125         req->rq_interpret_reply = brw_interpret;
2126
2127         if (mem_tight != 0)
2128                 req->rq_memalloc = 1;
2129
2130         /* Need to update the timestamps after the request is built in case
2131          * we race with setattr (locally or in queue at the OST).  If the OST
2132          * gets a later setattr before an earlier BRW (as determined by the
2133          * request xid), the OST will not use the BRW timestamps.  Sadly, there
2134          * is no obvious way to do this in a single call.  bug 10150 */
2135         cl_req_attr_set(env, clerq, crattr,
2136                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2137
2138         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2139
2140         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2141         aa = ptlrpc_req_async_args(req);
2142         INIT_LIST_HEAD(&aa->aa_oaps);
2143         list_splice_init(&rpc_list, &aa->aa_oaps);
2144         INIT_LIST_HEAD(&aa->aa_exts);
2145         list_splice_init(ext_list, &aa->aa_exts);
2146         aa->aa_clerq = clerq;
2147
2148         /* Queued sync pages can be torn down while the pages
2149          * are between the pending list and the RPC. */
2150         tmp = NULL;
2151         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2152                 /* only one oap gets a request reference */
2153                 if (tmp == NULL)
2154                         tmp = oap;
2155                 if (oap->oap_interrupted && !req->rq_intr) {
2156                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2157                                         oap, req);
2158                         ptlrpc_mark_interrupted(req);
2159                 }
2160         }
2161         if (tmp != NULL)
2162                 tmp->oap_request = ptlrpc_request_addref(req);
2163
2164         client_obd_list_lock(&cli->cl_loi_list_lock);
2165         starting_offset >>= PAGE_CACHE_SHIFT;
2166         if (cmd == OBD_BRW_READ) {
2167                 cli->cl_r_in_flight++;
2168                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2169                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2170                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2171                                       starting_offset + 1);
2172         } else {
2173                 cli->cl_w_in_flight++;
2174                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2175                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2176                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2177                                       starting_offset + 1);
2178         }
2179         client_obd_list_unlock(&cli->cl_loi_list_lock);
2180
2181         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2182                   page_count, aa, cli->cl_r_in_flight,
2183                   cli->cl_w_in_flight);
2184
2185         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2186          * see which CPU/NUMA node the majority of pages were allocated
2187          * on, and try to assign the async RPC to the CPU core
2188          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2189          *
2190          * But on the other hand, we expect that multiple ptlrpcd
2191          * threads and the initial write sponsor can run in parallel,
2192          * especially when data checksum is enabled, which is CPU-bound
2193          * operation and single ptlrpcd thread cannot process in time.
2194          * So more ptlrpcd threads sharing BRW load
2195          * (with PDL_POLICY_ROUND) seems better.
2196          */
2197         ptlrpcd_add_req(req, pol, -1);
2198         rc = 0;
2199
2200 out:
2201         if (mem_tight != 0)
2202                 cfs_memory_pressure_restore(mpflag);
2203
2204         if (crattr != NULL) {
2205                 capa_put(crattr->cra_capa);
2206                 OBD_FREE(crattr, sizeof(*crattr));
2207         }
2208
2209         if (rc != 0) {
2210                 LASSERT(req == NULL);
2211
2212                 if (oa)
2213                         OBDO_FREE(oa);
2214                 if (pga)
2215                         OBD_FREE(pga, sizeof(*pga) * page_count);
2216                 /* This should happen rarely and is pretty bad; it makes the
2217                  * pending list not follow the dirty order. */
2218                 while (!list_empty(ext_list)) {
2219                         ext = list_entry(ext_list->next, struct osc_extent,
2220                                              oe_link);
2221                         list_del_init(&ext->oe_link);
2222                         osc_extent_finish(env, ext, 0, rc);
2223                 }
2224                 if (clerq && !IS_ERR(clerq))
2225                         cl_req_completion(env, clerq, rc);
2226         }
2227         return rc;
2228 }
2229
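/*
 * Attach einfo->ei_cbdata to the lock's l_ast_data if nothing is set there
 * yet.  Returns 1 if the lock now carries that data (just set, or already
 * matching), and 0 if the lock already held different data.
 */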
2230 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2231                                         struct ldlm_enqueue_info *einfo)
2232 {
2233         void *data = einfo->ei_cbdata;
2234         int set = 0;
2235
2236         LASSERT(lock != NULL);
2237         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2238         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2239         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2240         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2241
2242         lock_res_and_lock(lock);
2243         spin_lock(&osc_ast_guard);
2244
2245         if (lock->l_ast_data == NULL)
2246                 lock->l_ast_data = data;
2247         if (lock->l_ast_data == data)
2248                 set = 1;
2249
2250         spin_unlock(&osc_ast_guard);
2251         unlock_res_and_lock(lock);
2252
2253         return set;
2254 }
2255
2256 static int osc_set_data_with_check(struct lustre_handle *lockh,
2257                                    struct ldlm_enqueue_info *einfo)
2258 {
2259         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2260         int set = 0;
2261
2262         if (lock != NULL) {
2263                 set = osc_set_lock_data_with_check(lock, einfo);
2264                 LDLM_LOCK_PUT(lock);
2265         } else
2266                 CERROR("lockh %p, data %p - client evicted?\n",
2267                        lockh, einfo->ei_cbdata);
2268         return set;
2269 }
2270
2271 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2272                              ldlm_iterator_t replace, void *data)
2273 {
2274         struct ldlm_res_id res_id;
2275         struct obd_device *obd = class_exp2obd(exp);
2276
2277         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2278         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2279         return 0;
2280 }
2281
2282 /* Find any ldlm lock of the inode in osc.
2283  * Return:  0    none found
2284  *          1    found one
2285  *        < 0    error */
2286 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2287                            ldlm_iterator_t replace, void *data)
2288 {
2289         struct ldlm_res_id res_id;
2290         struct obd_device *obd = class_exp2obd(exp);
2291         int rc = 0;
2292
2293         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2294         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2295         if (rc == LDLM_ITER_STOP)
2296                 return 1;
2297         if (rc == LDLM_ITER_CONTINUE)
2298                 return 0;
2299         return rc;
2300 }
2301
2302 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2303                             obd_enqueue_update_f upcall, void *cookie,
2304                             __u64 *flags, int agl, int rc)
2305 {
2306         int intent = *flags & LDLM_FL_HAS_INTENT;
2307
2308         if (intent) {
2309                 /* The request was created before ldlm_cli_enqueue call. */
2310                 if (rc == ELDLM_LOCK_ABORTED) {
2311                         struct ldlm_reply *rep;
2312                         rep = req_capsule_server_get(&req->rq_pill,
2313                                                      &RMF_DLM_REP);
2314
2315                         LASSERT(rep != NULL);
2316                         rep->lock_policy_res1 =
2317                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2318                         if (rep->lock_policy_res1)
2319                                 rc = rep->lock_policy_res1;
2320                 }
2321         }
2322
2323         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2324             (rc == 0)) {
2325                 *flags |= LDLM_FL_LVB_READY;
2326                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2327                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2328         }
2329
2330         /* Call the update callback. */
2331         rc = (*upcall)(cookie, rc);
2332         return rc;
2333 }
2334
2335 static int osc_enqueue_interpret(const struct lu_env *env,
2336                                  struct ptlrpc_request *req,
2337                                  struct osc_enqueue_args *aa, int rc)
2338 {
2339         struct ldlm_lock *lock;
2340         struct lustre_handle handle;
2341         __u32 mode;
2342         struct ost_lvb *lvb;
2343         __u32 lvb_len;
2344         __u64 *flags = aa->oa_flags;
2345
2346         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2347          * might be freed anytime after lock upcall has been called. */
2348         lustre_handle_copy(&handle, aa->oa_lockh);
2349         mode = aa->oa_ei->ei_mode;
2350
2351         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2352          * be valid. */
2353         lock = ldlm_handle2lock(&handle);
2354
2355         /* Take an additional reference so that a blocking AST that
2356          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2357          * to arrive after an upcall has been executed by
2358          * osc_enqueue_fini(). */
2359         ldlm_lock_addref(&handle, mode);
2360
2361         /* Let the CP AST grant the lock first. */
2362         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2363
2364         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2365                 lvb = NULL;
2366                 lvb_len = 0;
2367         } else {
2368                 lvb = aa->oa_lvb;
2369                 lvb_len = sizeof(*aa->oa_lvb);
2370         }
2371
2372         /* Complete obtaining the lock procedure. */
2373         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2374                                    mode, flags, lvb, lvb_len, &handle, rc);
2375         /* Complete osc stuff. */
2376         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2377                               flags, aa->oa_agl, rc);
2378
2379         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2380
2381         /* Release the lock for async request. */
2382         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2383                 /*
2384                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2385                  * not already released by
2386                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2387                  */
2388                 ldlm_lock_decref(&handle, mode);
2389
2390         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2391                  aa->oa_lockh, req, aa);
2392         ldlm_lock_decref(&handle, mode);
2393         LDLM_LOCK_PUT(lock);
2394         return rc;
2395 }
2396
2397 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2398                         struct lov_oinfo *loi, int flags,
2399                         struct ost_lvb *lvb, __u32 mode, int rc)
2400 {
2401         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2402
2403         if (rc == ELDLM_OK) {
2404                 __u64 tmp;
2405
2406                 LASSERT(lock != NULL);
2407                 loi->loi_lvb = *lvb;
2408                 tmp = loi->loi_lvb.lvb_size;
2409                 /* Extend KMS up to the end of this lock and no further.
2410                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2411                 if (tmp > lock->l_policy_data.l_extent.end)
2412                         tmp = lock->l_policy_data.l_extent.end + 1;
2413                 if (tmp >= loi->loi_kms) {
2414                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2415                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2416                         loi_kms_set(loi, tmp);
2417                 } else {
2418                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2419                                    LPU64"; leaving kms="LPU64", end="LPU64,
2420                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2421                                    lock->l_policy_data.l_extent.end);
2422                 }
2423                 ldlm_lock_allow_match(lock);
2424         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2425                 LASSERT(lock != NULL);
2426                 loi->loi_lvb = *lvb;
2427                 ldlm_lock_allow_match(lock);
2428                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2429                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2430                 rc = ELDLM_OK;
2431         }
2432
2433         if (lock != NULL) {
2434                 if (rc != ELDLM_OK)
2435                         ldlm_lock_fail_match(lock);
2436
2437                 LDLM_LOCK_PUT(lock);
2438         }
2439 }
2440 EXPORT_SYMBOL(osc_update_enqueue);
2441
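/* Sentinel request set: callers pass PTLRPCD_SET so that the enqueue request
 * is handed straight to ptlrpcd instead of being added to a real set. */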
2442 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2443
2444 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2445  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2446  * other synchronous requests; however, keeping some locks while trying to
2447  * obtain others may take a considerable amount of time in case of an OST
2448  * failure, and when other sync requests do not get a lock released by a
2449  * client in time, the client is excluded from the cluster -- such scenarios
2450  * make life difficult, so release locks just after they are obtained. */
2451 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2452                      __u64 *flags, ldlm_policy_data_t *policy,
2453                      struct ost_lvb *lvb, int kms_valid,
2454                      obd_enqueue_update_f upcall, void *cookie,
2455                      struct ldlm_enqueue_info *einfo,
2456                      struct lustre_handle *lockh,
2457                      struct ptlrpc_request_set *rqset, int async, int agl)
2458 {
2459         struct obd_device *obd = exp->exp_obd;
2460         struct ptlrpc_request *req = NULL;
2461         int intent = *flags & LDLM_FL_HAS_INTENT;
2462         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2463         ldlm_mode_t mode;
2464         int rc;
2465
2466         /* Filesystem lock extents are extended to page boundaries so that
2467          * dealing with the page cache is a little smoother.  */
2468         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2469         policy->l_extent.end |= ~CFS_PAGE_MASK;
2470
2471         /*
2472          * kms is not valid when either object is completely fresh (so that no
2473          * locks are cached), or object was evicted. In the latter case cached
2474          * lock cannot be used, because it would prime inode state with
2475          * potentially stale LVB.
2476          */
2477         if (!kms_valid)
2478                 goto no_match;
2479
2480         /* Next, search for already existing extent locks that will cover us */
2481         /* If we're trying to read, we also search for an existing PW lock.  The
2482          * VFS and page cache already protect us locally, so lots of readers/
2483          * writers can share a single PW lock.
2484          *
2485          * There are problems with conversion deadlocks, so instead of
2486          * converting a read lock to a write lock, we'll just enqueue a new
2487          * one.
2488          *
2489          * At some point we should cancel the read lock instead of making them
2490          * send us a blocking callback, but there are problems with canceling
2491          * locks out from other users right now, too. */
2492         mode = einfo->ei_mode;
2493         if (einfo->ei_mode == LCK_PR)
2494                 mode |= LCK_PW;
2495         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2496                                einfo->ei_type, policy, mode, lockh, 0);
2497         if (mode) {
2498                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2499
2500                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2501                         /* For AGL, if the enqueue RPC is sent but the lock is
2502                          * not granted, then skip processing this stripe.
2503                          * Return -ECANCELED to tell the caller. */
2504                         ldlm_lock_decref(lockh, mode);
2505                         LDLM_LOCK_PUT(matched);
2506                         return -ECANCELED;
2507                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2508                         *flags |= LDLM_FL_LVB_READY;
2509                         /* addref the lock only for non-async requests, and
2510                          * only when PW was matched but PR was asked for. */
2511                         if (!rqset && einfo->ei_mode != mode)
2512                                 ldlm_lock_addref(lockh, LCK_PR);
2513                         if (intent) {
2514                                 /* I would like to be able to ASSERT here that
2515                                  * rss <= kms, but I can't, for reasons which
2516                                  * are explained in lov_enqueue() */
2517                         }
2518
2519                         /* We already have a lock, and it's referenced.
2520                          *
2521                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2522                          * AGL upcall may change it to CLS_HELD directly. */
2523                         (*upcall)(cookie, ELDLM_OK);
2524
2525                         if (einfo->ei_mode != mode)
2526                                 ldlm_lock_decref(lockh, LCK_PW);
2527                         else if (rqset)
2528                                 /* For async requests, decref the lock. */
2529                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2530                         LDLM_LOCK_PUT(matched);
2531                         return ELDLM_OK;
2532                 } else {
2533                         ldlm_lock_decref(lockh, mode);
2534                         LDLM_LOCK_PUT(matched);
2535                 }
2536         }
2537
2538  no_match:
2539         if (intent) {
2540                 LIST_HEAD(cancels);
2541                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2542                                            &RQF_LDLM_ENQUEUE_LVB);
2543                 if (req == NULL)
2544                         return -ENOMEM;
2545
2546                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2547                 if (rc) {
2548                         ptlrpc_request_free(req);
2549                         return rc;
2550                 }
2551
2552                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2553                                      sizeof(*lvb));
2554                 ptlrpc_request_set_replen(req);
2555         }
2556
2557         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2558         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2559
2560         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2561                               sizeof(*lvb), LVB_T_OST, lockh, async);
2562         if (rqset) {
2563                 if (!rc) {
2564                         struct osc_enqueue_args *aa;
2565                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2566                         aa = ptlrpc_req_async_args(req);
2567                         aa->oa_ei = einfo;
2568                         aa->oa_exp = exp;
2569                         aa->oa_flags  = flags;
2570                         aa->oa_upcall = upcall;
2571                         aa->oa_cookie = cookie;
2572                         aa->oa_lvb    = lvb;
2573                         aa->oa_lockh  = lockh;
2574                         aa->oa_agl    = !!agl;
2575
2576                         req->rq_interpret_reply =
2577                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2578                         if (rqset == PTLRPCD_SET)
2579                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2580                         else
2581                                 ptlrpc_set_add_req(rqset, req);
2582                 } else if (intent) {
2583                         ptlrpc_req_finished(req);
2584                 }
2585                 return rc;
2586         }
2587
2588         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2589         if (intent)
2590                 ptlrpc_req_finished(req);
2591
2592         return rc;
2593 }
2594
2595 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2596                        struct ldlm_enqueue_info *einfo,
2597                        struct ptlrpc_request_set *rqset)
2598 {
2599         struct ldlm_res_id res_id;
2600         int rc;
2601
2602         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2603         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2604                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2605                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2606                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2607                               rqset, rqset != NULL, 0);
2608         return rc;
2609 }
2610
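/*
 * Match an already granted lock covering @policy instead of enqueuing a new
 * one.  A PR request also matches PW locks; when a PW lock satisfies a PR
 * request (and LDLM_FL_TEST_LOCK is not set) the references are switched so
 * the caller ends up holding a PR reference.  If @data is given and the lock
 * already carries different l_ast_data, the match is dropped and 0 is
 * returned; otherwise the matched mode (or 0 for no match) is returned.
 */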
2611 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2612                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2613                    int *flags, void *data, struct lustre_handle *lockh,
2614                    int unref)
2615 {
2616         struct obd_device *obd = exp->exp_obd;
2617         int lflags = *flags;
2618         ldlm_mode_t rc;
2619
2620         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2621                 return -EIO;
2622
2623         /* Filesystem lock extents are extended to page boundaries so that
2624          * dealing with the page cache is a little smoother */
2625         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2626         policy->l_extent.end |= ~CFS_PAGE_MASK;
2627
2628         /* Next, search for already existing extent locks that will cover us */
2629         /* If we're trying to read, we also search for an existing PW lock.  The
2630          * VFS and page cache already protect us locally, so lots of readers/
2631          * writers can share a single PW lock. */
2632         rc = mode;
2633         if (mode == LCK_PR)
2634                 rc |= LCK_PW;
2635         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2636                              res_id, type, policy, rc, lockh, unref);
2637         if (rc) {
2638                 if (data != NULL) {
2639                         if (!osc_set_data_with_check(lockh, data)) {
2640                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2641                                         ldlm_lock_decref(lockh, rc);
2642                                 return 0;
2643                         }
2644                 }
2645                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2646                         ldlm_lock_addref(lockh, LCK_PR);
2647                         ldlm_lock_decref(lockh, LCK_PW);
2648                 }
2649                 return rc;
2650         }
2651         return rc;
2652 }
2653
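     /* Release a lock reference; group locks are cancelled immediately rather
      * than being left around in the LRU. */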
2654 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2655 {
2656         if (unlikely(mode == LCK_GROUP))
2657                 ldlm_lock_decref_and_cancel(lockh, mode);
2658         else
2659                 ldlm_lock_decref(lockh, mode);
2660
2661         return 0;
2662 }
2663
2664 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2665                       __u32 mode, struct lustre_handle *lockh)
2666 {
2667         return osc_cancel_base(lockh, mode);
2668 }
2669
2670 static int osc_cancel_unused(struct obd_export *exp,
2671                              struct lov_stripe_md *lsm,
2672                              ldlm_cancel_flags_t flags,
2673                              void *opaque)
2674 {
2675         struct obd_device *obd = class_exp2obd(exp);
2676         struct ldlm_res_id res_id, *resp = NULL;
2677
2678         if (lsm != NULL) {
2679                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2680                 resp = &res_id;
2681         }
2682
2683         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2684 }
2685
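     /* Completion handler for osc_statfs_async(): copy the reply into the
      * caller's obd_statfs buffer and invoke the oi_cb_up upcall with the
      * final status. */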
2686 static int osc_statfs_interpret(const struct lu_env *env,
2687                                 struct ptlrpc_request *req,
2688                                 struct osc_async_args *aa, int rc)
2689 {
2690         struct obd_statfs *msfs;
2691
2692         if (rc == -EBADR)
2693                 /* The request has in fact never been sent
2694                  * due to issues at a higher level (LOV).
2695                  * Exit immediately since the caller is
2696                  * aware of the problem and takes care
2697                  * of the clean up */
2698                 return rc;
2699
2700         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2701             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2702                 GOTO(out, rc = 0);
2703
2704         if (rc != 0)
2705                 GOTO(out, rc);
2706
2707         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2708         if (msfs == NULL) {
2709                 GOTO(out, rc = -EPROTO);
2710         }
2711
2712         *aa->aa_oi->oi_osfs = *msfs;
2713 out:
2714         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2715         return rc;
2716 }
2717
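     /* Issue an OST_STATFS request without blocking; the reply is processed
      * by osc_statfs_interpret() via the supplied request set. */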
2718 static int osc_statfs_async(struct obd_export *exp,
2719                             struct obd_info *oinfo, __u64 max_age,
2720                             struct ptlrpc_request_set *rqset)
2721 {
2722         struct obd_device     *obd = class_exp2obd(exp);
2723         struct ptlrpc_request *req;
2724         struct osc_async_args *aa;
2725         int                 rc;
2726
2727         /* We could possibly pass max_age in the request (as an absolute
2728          * timestamp or a "seconds.usec ago") so the target can avoid doing
2729          * extra calls into the filesystem if that isn't necessary (e.g.
2730          * during mount that would help a bit).  Having relative timestamps
2731          * is not so great if request processing is slow, while absolute
2732          * timestamps are not ideal because they need time synchronization. */
2733         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2734         if (req == NULL)
2735                 return -ENOMEM;
2736
2737         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2738         if (rc) {
2739                 ptlrpc_request_free(req);
2740                 return rc;
2741         }
2742         ptlrpc_request_set_replen(req);
2743         req->rq_request_portal = OST_CREATE_PORTAL;
2744         ptlrpc_at_set_req_timeout(req);
2745
2746         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2747                 /* procfs requests should not wait, to avoid deadlock */
2748                 req->rq_no_resend = 1;
2749                 req->rq_no_delay = 1;
2750         }
2751
2752         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2753         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2754         aa = ptlrpc_req_async_args(req);
2755         aa->aa_oi = oinfo;
2756
2757         ptlrpc_set_add_req(rqset, req);
2758         return 0;
2759 }
2760
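     /* Synchronous OST_STATFS: send the request, wait for the reply and copy
      * the returned statistics into *osfs. */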
2761 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2762                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2763 {
2764         struct obd_device     *obd = class_exp2obd(exp);
2765         struct obd_statfs     *msfs;
2766         struct ptlrpc_request *req;
2767         struct obd_import     *imp = NULL;
2768         int rc;
2769
2770         /* Since the request might also come from lprocfs, we need to
2771          * sync this with client_disconnect_export() (bug 15684). */
2772         down_read(&obd->u.cli.cl_sem);
2773         if (obd->u.cli.cl_import)
2774                 imp = class_import_get(obd->u.cli.cl_import);
2775         up_read(&obd->u.cli.cl_sem);
2776         if (!imp)
2777                 return -ENODEV;
2778
2779         /* We could possibly pass max_age in the request (as an absolute
2780          * timestamp or a "seconds.usec ago") so the target can avoid doing
2781          * extra calls into the filesystem if that isn't necessary (e.g.
2782          * during mount that would help a bit).  Having relative timestamps
2783          * is not so great if request processing is slow, while absolute
2784          * timestamps are not ideal because they need time synchronization. */
2785         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2786
2787         class_import_put(imp);
2788
2789         if (req == NULL)
2790                 return -ENOMEM;
2791
2792         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2793         if (rc) {
2794                 ptlrpc_request_free(req);
2795                 return rc;
2796         }
2797         ptlrpc_request_set_replen(req);
2798         req->rq_request_portal = OST_CREATE_PORTAL;
2799         ptlrpc_at_set_req_timeout(req);
2800
2801         if (flags & OBD_STATFS_NODELAY) {
2802                 /* procfs requests should not wait, to avoid deadlock */
2803                 req->rq_no_resend = 1;
2804                 req->rq_no_delay = 1;
2805         }
2806
2807         rc = ptlrpc_queue_wait(req);
2808         if (rc)
2809                 GOTO(out, rc);
2810
2811         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2812         if (msfs == NULL) {
2813                 GOTO(out, rc = -EPROTO);
2814         }
2815
2816         *osfs = *msfs;
2817
2818  out:
2819         ptlrpc_req_finished(req);
2820         return rc;
2821 }
2822
2823 /* Retrieve object striping information.
2824  *
2825  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2826  * the maximum number of OST indices which will fit in the user buffer.
2827  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2828  */
2829 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2830 {
2831         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2832         struct lov_user_md_v3 lum, *lumk;
2833         struct lov_user_ost_data_v1 *lmm_objects;
2834         int rc = 0, lum_size;
2835
2836         if (!lsm)
2837                 return -ENODATA;
2838
2839         /* we only need the header part from user space to get lmm_magic and
2840          * lmm_stripe_count (the header part is common to v1 and v3) */
2841         lum_size = sizeof(struct lov_user_md_v1);
2842         if (copy_from_user(&lum, lump, lum_size))
2843                 return -EFAULT;
2844
2845         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2846             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2847                 return -EINVAL;
2848
2849         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2850         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2851         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2852         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2853
2854         /* we can use lov_mds_md_size() to compute lum_size
2855          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2856         if (lum.lmm_stripe_count > 0) {
2857                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2858                 OBD_ALLOC(lumk, lum_size);
2859                 if (!lumk)
2860                         return -ENOMEM;
2861
2862                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2863                         lmm_objects =
2864                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2865                 else
2866                         lmm_objects = &(lumk->lmm_objects[0]);
2867                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2868         } else {
2869                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2870                 lumk = &lum;
2871         }
2872
2873         lumk->lmm_oi = lsm->lsm_oi;
2874         lumk->lmm_stripe_count = 1;
2875
2876         if (copy_to_user(lump, lumk, lum_size))
2877                 rc = -EFAULT;
2878
2879         if (lumk != &lum)
2880                 OBD_FREE(lumk, lum_size);
2881
2882         return rc;
2883 }
2884
2885
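     /* Handle ioctls addressed to the OSC device: LOV config/stripe queries,
      * client recovery, import (de)activation, quotacheck polling and target
      * ping. */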
2886 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2887                          void *karg, void *uarg)
2888 {
2889         struct obd_device *obd = exp->exp_obd;
2890         struct obd_ioctl_data *data = karg;
2891         int err = 0;
2892
2893         if (!try_module_get(THIS_MODULE)) {
2894                 CERROR("Can't get module. Is it alive?\n");
2895                 return -EINVAL;
2896         }
2897         switch (cmd) {
2898         case OBD_IOC_LOV_GET_CONFIG: {
2899                 char *buf;
2900                 struct lov_desc *desc;
2901                 struct obd_uuid uuid;
2902
2903                 buf = NULL;
2904                 len = 0;
2905                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2906                         GOTO(out, err = -EINVAL);
2907
2908                 data = (struct obd_ioctl_data *)buf;
2909
2910                 if (sizeof(*desc) > data->ioc_inllen1) {
2911                         obd_ioctl_freedata(buf, len);
2912                         GOTO(out, err = -EINVAL);
2913                 }
2914
2915                 if (data->ioc_inllen2 < sizeof(uuid)) {
2916                         obd_ioctl_freedata(buf, len);
2917                         GOTO(out, err = -EINVAL);
2918                 }
2919
2920                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2921                 desc->ld_tgt_count = 1;
2922                 desc->ld_active_tgt_count = 1;
2923                 desc->ld_default_stripe_count = 1;
2924                 desc->ld_default_stripe_size = 0;
2925                 desc->ld_default_stripe_offset = 0;
2926                 desc->ld_pattern = 0;
2927                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2928
2929                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2930
2931                 err = copy_to_user((void *)uarg, buf, len);
2932                 if (err)
2933                         err = -EFAULT;
2934                 obd_ioctl_freedata(buf, len);
2935                 GOTO(out, err);
2936         }
2937         case LL_IOC_LOV_SETSTRIPE:
2938                 err = obd_alloc_memmd(exp, karg);
2939                 if (err > 0)
2940                         err = 0;
2941                 GOTO(out, err);
2942         case LL_IOC_LOV_GETSTRIPE:
2943                 err = osc_getstripe(karg, uarg);
2944                 GOTO(out, err);
2945         case OBD_IOC_CLIENT_RECOVER:
2946                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2947                                             data->ioc_inlbuf1, 0);
2948                 if (err > 0)
2949                         err = 0;
2950                 GOTO(out, err);
2951         case IOC_OSC_SET_ACTIVE:
2952                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2953                                                data->ioc_offset);
2954                 GOTO(out, err);
2955         case OBD_IOC_POLL_QUOTACHECK:
2956                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2957                 GOTO(out, err);
2958         case OBD_IOC_PING_TARGET:
2959                 err = ptlrpc_obd_ping(obd);
2960                 GOTO(out, err);
2961         default:
2962                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2963                        cmd, current_comm());
2964                 GOTO(out, err = -ENOTTY);
2965         }
2966 out:
2967         module_put(THIS_MODULE);
2968         return err;
2969 }
2970
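     /* obd_get_info handler: KEY_LOCK_TO_STRIPE is answered locally, while
      * KEY_LAST_ID and KEY_FIEMAP are forwarded to the OST. */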
2971 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2972                         obd_count keylen, void *key, __u32 *vallen, void *val,
2973                         struct lov_stripe_md *lsm)
2974 {
2975         if (!vallen || !val)
2976                 return -EFAULT;
2977
2978         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2979                 __u32 *stripe = val;
2980                 *vallen = sizeof(*stripe);
2981                 *stripe = 0;
2982                 return 0;
2983         } else if (KEY_IS(KEY_LAST_ID)) {
2984                 struct ptlrpc_request *req;
2985                 obd_id          *reply;
2986                 char              *tmp;
2987                 int                 rc;
2988
2989                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2990                                            &RQF_OST_GET_INFO_LAST_ID);
2991                 if (req == NULL)
2992                         return -ENOMEM;
2993
2994                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2995                                      RCL_CLIENT, keylen);
2996                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2997                 if (rc) {
2998                         ptlrpc_request_free(req);
2999                         return rc;
3000                 }
3001
3002                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3003                 memcpy(tmp, key, keylen);
3004
3005                 req->rq_no_delay = req->rq_no_resend = 1;
3006                 ptlrpc_request_set_replen(req);
3007                 rc = ptlrpc_queue_wait(req);
3008                 if (rc)
3009                         GOTO(out, rc);
3010
3011                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3012                 if (reply == NULL)
3013                         GOTO(out, rc = -EPROTO);
3014
3015                 *((obd_id *)val) = *reply;
3016         out:
3017                 ptlrpc_req_finished(req);
3018                 return rc;
3019         } else if (KEY_IS(KEY_FIEMAP)) {
3020                 struct ll_fiemap_info_key *fm_key =
3021                                 (struct ll_fiemap_info_key *)key;
3022                 struct ldlm_res_id       res_id;
3023                 ldlm_policy_data_t       policy;
3024                 struct lustre_handle     lockh;
3025                 ldlm_mode_t              mode = 0;
3026                 struct ptlrpc_request   *req;
3027                 struct ll_user_fiemap   *reply;
3028                 char                    *tmp;
3029                 int                      rc;
3030
3031                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3032                         goto skip_locking;
3033
3034                 policy.l_extent.start = fm_key->fiemap.fm_start &
3035                                                 CFS_PAGE_MASK;
3036
3037                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3038                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3039                         policy.l_extent.end = OBD_OBJECT_EOF;
3040                 else
3041                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3042                                 fm_key->fiemap.fm_length +
3043                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3044
3045                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3046                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3047                                        LDLM_FL_BLOCK_GRANTED |
3048                                        LDLM_FL_LVB_READY,
3049                                        &res_id, LDLM_EXTENT, &policy,
3050                                        LCK_PR | LCK_PW, &lockh, 0);
3051                 if (mode) { /* lock is cached on client */
3052                         if (mode != LCK_PR) {
3053                                 ldlm_lock_addref(&lockh, LCK_PR);
3054                                 ldlm_lock_decref(&lockh, LCK_PW);
3055                         }
3056                 } else { /* no cached lock, acquire it on the server side */
3057                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3058                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3059                 }
3060
3061 skip_locking:
3062                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3063                                            &RQF_OST_GET_INFO_FIEMAP);
3064                 if (req == NULL)
3065                         GOTO(drop_lock, rc = -ENOMEM);
3066
3067                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3068                                      RCL_CLIENT, keylen);
3069                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3070                                      RCL_CLIENT, *vallen);
3071                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3072                                      RCL_SERVER, *vallen);
3073
3074                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3075                 if (rc) {
3076                         ptlrpc_request_free(req);
3077                         GOTO(drop_lock, rc);
3078                 }
3079
3080                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3081                 memcpy(tmp, key, keylen);
3082                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3083                 memcpy(tmp, val, *vallen);
3084
3085                 ptlrpc_request_set_replen(req);
3086                 rc = ptlrpc_queue_wait(req);
3087                 if (rc)
3088                         GOTO(fini_req, rc);
3089
3090                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3091                 if (reply == NULL)
3092                         GOTO(fini_req, rc = -EPROTO);
3093
3094                 memcpy(val, reply, *vallen);
3095 fini_req:
3096                 ptlrpc_req_finished(req);
3097 drop_lock:
3098                 if (mode)
3099                         ldlm_lock_decref(&lockh, LCK_PR);
3100                 return rc;
3101         }
3102
3103         return -EINVAL;
3104 }
3105
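     /* obd_set_info_async handler: checksum, sptlrpc and cache keys are
      * handled locally; other keys (including grant shrink) are packed into
      * an OST_SET_INFO request and sent to the OST. */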
3106 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3107                               obd_count keylen, void *key, obd_count vallen,
3108                               void *val, struct ptlrpc_request_set *set)
3109 {
3110         struct ptlrpc_request *req;
3111         struct obd_device     *obd = exp->exp_obd;
3112         struct obd_import     *imp = class_exp2cliimp(exp);
3113         char              *tmp;
3114         int                 rc;
3115
3116         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3117
3118         if (KEY_IS(KEY_CHECKSUM)) {
3119                 if (vallen != sizeof(int))
3120                         return -EINVAL;
3121                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3122                 return 0;
3123         }
3124
3125         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3126                 sptlrpc_conf_client_adapt(obd);
3127                 return 0;
3128         }
3129
3130         if (KEY_IS(KEY_FLUSH_CTX)) {
3131                 sptlrpc_import_flush_my_ctx(imp);
3132                 return 0;
3133         }
3134
3135         if (KEY_IS(KEY_CACHE_SET)) {
3136                 struct client_obd *cli = &obd->u.cli;
3137
3138                 LASSERT(cli->cl_cache == NULL); /* only once */
3139                 cli->cl_cache = (struct cl_client_cache *)val;
3140                 atomic_inc(&cli->cl_cache->ccc_users);
3141                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3142
3143                 /* add this osc into entity list */
3144                 LASSERT(list_empty(&cli->cl_lru_osc));
3145                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3146                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3147                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3148
3149                 return 0;
3150         }
3151
3152         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3153                 struct client_obd *cli = &obd->u.cli;
3154                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3155                 int target = *(int *)val;
3156
3157                 nr = osc_lru_shrink(cli, min(nr, target));
3158                 *(int *)val -= nr;
3159                 return 0;
3160         }
3161
3162         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3163                 return -EINVAL;
3164
3165         /* We pass all other commands directly to OST. Since nobody calls osc
3166          * methods directly and everybody is supposed to go through LOV, we
3167          * assume lov checked invalid values for us.
3168          * The only recognised values so far are evict_by_nid and mds_conn.
3169          * Even if something bad goes through, we'd get a -EINVAL from OST
3170          * anyway. */
3171
3172         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3173                                                 &RQF_OST_SET_GRANT_INFO :
3174                                                 &RQF_OBD_SET_INFO);
3175         if (req == NULL)
3176                 return -ENOMEM;
3177
3178         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3179                              RCL_CLIENT, keylen);
3180         if (!KEY_IS(KEY_GRANT_SHRINK))
3181                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3182                                      RCL_CLIENT, vallen);
3183         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3184         if (rc) {
3185                 ptlrpc_request_free(req);
3186                 return rc;
3187         }
3188
3189         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3190         memcpy(tmp, key, keylen);
3191         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3192                                                         &RMF_OST_BODY :
3193                                                         &RMF_SETINFO_VAL);
3194         memcpy(tmp, val, vallen);
3195
3196         if (KEY_IS(KEY_GRANT_SHRINK)) {
3197                 struct osc_grant_args *aa;
3198                 struct obdo *oa;
3199
3200                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3201                 aa = ptlrpc_req_async_args(req);
3202                 OBDO_ALLOC(oa);
3203                 if (!oa) {
3204                         ptlrpc_req_finished(req);
3205                         return -ENOMEM;
3206                 }
3207                 *oa = ((struct ost_body *)val)->oa;
3208                 aa->aa_oa = oa;
3209                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3210         }
3211
3212         ptlrpc_request_set_replen(req);
3213         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3214                 LASSERT(set != NULL);
3215                 ptlrpc_set_add_req(set, req);
3216                 ptlrpc_check_set(NULL, set);
3217         } else
3218                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3219
3220         return 0;
3221 }
3222
3223
3224 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3225                          struct obd_device *disk_obd, int *index)
3226 {
3227         /* this code is not supposed to be used with LOD/OSP;
3228          * it will be removed soon */
3229         LBUG();
3230         return 0;
3231 }
3232
3233 static int osc_llog_finish(struct obd_device *obd, int count)
3234 {
3235         struct llog_ctxt *ctxt;
3236
3237         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3238         if (ctxt) {
3239                 llog_cat_close(NULL, ctxt->loc_handle);
3240                 llog_cleanup(NULL, ctxt);
3241         }
3242
3243         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3244         if (ctxt)
3245                 llog_cleanup(NULL, ctxt);
3246         return 0;
3247 }
3248
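     /* On reconnect, report the grant this client still holds (or a default
      * of twice the BRW size) and reset the lost-grant counter. */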
3249 static int osc_reconnect(const struct lu_env *env,
3250                          struct obd_export *exp, struct obd_device *obd,
3251                          struct obd_uuid *cluuid,
3252                          struct obd_connect_data *data,
3253                          void *localdata)
3254 {
3255         struct client_obd *cli = &obd->u.cli;
3256
3257         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3258                 long lost_grant;
3259
3260                 client_obd_list_lock(&cli->cl_loi_list_lock);
3261                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3262                                 2 * cli_brw_size(obd);
3263                 lost_grant = cli->cl_lost_grant;
3264                 cli->cl_lost_grant = 0;
3265                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3266
3267                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3268                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3269                        data->ocd_version, data->ocd_grant, lost_grant);
3270         }
3271
3272         return 0;
3273 }
3274
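     /* Flush pending size llog cancels to the target on the last disconnect,
      * then tear down the export; see the comment below for why the
      * grant-shrink list entry is removed only after the import is gone. */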
3275 static int osc_disconnect(struct obd_export *exp)
3276 {
3277         struct obd_device *obd = class_exp2obd(exp);
3278         struct llog_ctxt  *ctxt;
3279         int rc;
3280
3281         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3282         if (ctxt) {
3283                 if (obd->u.cli.cl_conn_count == 1) {
3284                         /* Flush any remaining cancel messages out to the
3285                          * target */
3286                         llog_sync(ctxt, exp, 0);
3287                 }
3288                 llog_ctxt_put(ctxt);
3289         } else {
3290                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3291                        obd);
3292         }
3293
3294         rc = client_disconnect_export(exp);
3295         /**
3296          * Initially we put del_shrink_grant before disconnect_export, but it
3297          * causes the following problem if setup (connect) and cleanup
3298          * (disconnect) are tangled together.
3299          *      connect p1                   disconnect p2
3300          *   ptlrpc_connect_import
3301          *     ...............         class_manual_cleanup
3302          *                                   osc_disconnect
3303          *                                   del_shrink_grant
3304          *   ptlrpc_connect_interrupt
3305          *     init_grant_shrink
3306          *   add this client to shrink list
3307          *                                    cleanup_osc
3308          * Bang! The pinger triggers the shrink.
3309          * So the osc should only be removed from the shrink list after we
3310          * are sure the import has been destroyed (bug 18662).
3311          */
3312         if (obd->u.cli.cl_import == NULL)
3313                 osc_del_shrink_grant(&obd->u.cli);
3314         return rc;
3315 }
3316
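     /* React to import state changes: reset grants on disconnect, flush
      * cached pages and locks on invalidation, and re-initialize grants and
      * the request portal when connect data arrives. */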
3317 static int osc_import_event(struct obd_device *obd,
3318                             struct obd_import *imp,
3319                             enum obd_import_event event)
3320 {
3321         struct client_obd *cli;
3322         int rc = 0;
3323
3324         LASSERT(imp->imp_obd == obd);
3325
3326         switch (event) {
3327         case IMP_EVENT_DISCON: {
3328                 cli = &obd->u.cli;
3329                 client_obd_list_lock(&cli->cl_loi_list_lock);
3330                 cli->cl_avail_grant = 0;
3331                 cli->cl_lost_grant = 0;
3332                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3333                 break;
3334         }
3335         case IMP_EVENT_INACTIVE: {
3336                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3337                 break;
3338         }
3339         case IMP_EVENT_INVALIDATE: {
3340                 struct ldlm_namespace *ns = obd->obd_namespace;
3341                 struct lu_env    *env;
3342                 int                 refcheck;
3343
3344                 env = cl_env_get(&refcheck);
3345                 if (!IS_ERR(env)) {
3346                         /* Reset grants */
3347                         cli = &obd->u.cli;
3348                         /* all pages go to failing rpcs due to the invalid
3349                          * import */
3350                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3351
3352                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3353                         cl_env_put(env, &refcheck);
3354                 } else
3355                         rc = PTR_ERR(env);
3356                 break;
3357         }
3358         case IMP_EVENT_ACTIVE: {
3359                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3360                 break;
3361         }
3362         case IMP_EVENT_OCD: {
3363                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3364
3365                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3366                         osc_init_grant(&obd->u.cli, ocd);
3367
3368                 /* See bug 7198 */
3369                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3370                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3371
3372                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3373                 break;
3374         }
3375         case IMP_EVENT_DEACTIVATE: {
3376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3377                 break;
3378         }
3379         case IMP_EVENT_ACTIVATE: {
3380                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3381                 break;
3382         }
3383         default:
3384                 CERROR("Unknown import event %d\n", event);
3385                 LBUG();
3386         }
3387         return rc;
3388 }
3389
3390 /**
3391  * Determine whether the lock can be canceled before replaying the lock
3392  * during recovery, see bug16774 for detailed information.
3393  *
3394  * \retval zero the lock can't be canceled
3395  * \retval other ok to cancel
3396  */
3397 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3398 {
3399         check_res_locked(lock->l_resource);
3400
3401         /*
3402          * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
3403          *
3404          * XXX as a future improvement, we could also cancel unused write
3405          * locks if they have no dirty data and no active mmaps.
3406          */
3407         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3408             (lock->l_granted_mode == LCK_PR ||
3409              lock->l_granted_mode == LCK_CR) &&
3410             (osc_dlm_lock_pageref(lock) == 0))
3411                 return 1;
3412
3413         return 0;
3414 }
3415
3416 static int brw_queue_work(const struct lu_env *env, void *data)
3417 {
3418         struct client_obd *cli = data;
3419
3420         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3421
3422         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3423         return 0;
3424 }
3425
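     /* Set up the OSC device: client obd, writeback work item, quota support,
      * procfs entries, the request pool used by brw_interpret() and the
      * cancel-for-recovery callback. */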
3426 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3427 {
3428         struct lprocfs_static_vars lvars = { 0 };
3429         struct client_obd         *cli = &obd->u.cli;
3430         void                   *handler;
3431         int                     rc;
3432
3433         rc = ptlrpcd_addref();
3434         if (rc)
3435                 return rc;
3436
3437         rc = client_obd_setup(obd, lcfg);
3438         if (rc)
3439                 GOTO(out_ptlrpcd, rc);
3440
3441         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3442         if (IS_ERR(handler))
3443                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3444         cli->cl_writeback_work = handler;
3445
3446         rc = osc_quota_setup(obd);
3447         if (rc)
3448                 GOTO(out_ptlrpcd_work, rc);
3449
3450         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3451         lprocfs_osc_init_vars(&lvars);
3452         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3453                 lproc_osc_attach_seqstat(obd);
3454                 sptlrpc_lprocfs_cliobd_attach(obd);
3455                 ptlrpc_lprocfs_register_obd(obd);
3456         }
3457
3458         /* We need to allocate a few extra requests, because brw_interpret
3459          * tries to create new requests before freeing previous ones.
3460          * Ideally we want to have 2x max_rpcs_in_flight reserved, but that
3461          * might waste too much RAM, so 2 extra is just a guess that should
3462          * still work. */
3463         cli->cl_import->imp_rq_pool =
3464                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3465                                     OST_MAXREQSIZE,
3466                                     ptlrpc_add_rqs_to_pool);
3467
3468         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3469         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3470         return rc;
3471
3472 out_ptlrpcd_work:
3473         ptlrpcd_destroy_work(handler);
3474 out_client_setup:
3475         client_obd_cleanup(obd);
3476 out_ptlrpcd:
3477         ptlrpcd_decref();
3478         return rc;
3479 }
3480
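     /* Two-stage pre-cleanup: deactivate the import early, then release the
      * writeback work, import, procfs entries and llog contexts when exports
      * are cleaned up. */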
3481 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3482 {
3483         int rc = 0;
3484
3485         switch (stage) {
3486         case OBD_CLEANUP_EARLY: {
3487                 struct obd_import *imp;
3488                 imp = obd->u.cli.cl_import;
3489                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3490                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3491                 ptlrpc_deactivate_import(imp);
3492                 spin_lock(&imp->imp_lock);
3493                 imp->imp_pingable = 0;
3494                 spin_unlock(&imp->imp_lock);
3495                 break;
3496         }
3497         case OBD_CLEANUP_EXPORTS: {
3498                 struct client_obd *cli = &obd->u.cli;
3499                 /* LU-464
3500                  * for echo client, export may be on zombie list, wait for
3501                  * zombie thread to cull it, because cli.cl_import will be
3502                  * cleared in client_disconnect_export():
3503                  *   class_export_destroy() -> obd_cleanup() ->
3504                  *   echo_device_free() -> echo_client_cleanup() ->
3505                  *   obd_disconnect() -> osc_disconnect() ->
3506                  *   client_disconnect_export()
3507                  */
3508                 obd_zombie_barrier();
3509                 if (cli->cl_writeback_work) {
3510                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3511                         cli->cl_writeback_work = NULL;
3512                 }
3513                 obd_cleanup_client_import(obd);
3514                 ptlrpc_lprocfs_unregister_obd(obd);
3515                 lprocfs_obd_cleanup(obd);
3516                 rc = obd_llog_finish(obd, 0);
3517                 if (rc != 0)
3518                         CERROR("failed to cleanup llogging subsystems\n");
3519                 break;
3520                 }
3521         }
3522         return rc;
3523 }
3524
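     /* Final cleanup: detach from the shared client cache LRU, free the quota
      * cache and drop the client obd and ptlrpcd references. */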
3525 int osc_cleanup(struct obd_device *obd)
3526 {
3527         struct client_obd *cli = &obd->u.cli;
3528         int rc;
3529
3530         /* lru cleanup */
3531         if (cli->cl_cache != NULL) {
3532                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3533                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3534                 list_del_init(&cli->cl_lru_osc);
3535                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3536                 cli->cl_lru_left = NULL;
3537                 atomic_dec(&cli->cl_cache->ccc_users);
3538                 cli->cl_cache = NULL;
3539         }
3540
3541         /* free memory of osc quota cache */
3542         osc_quota_cleanup(obd);
3543
3544         rc = client_obd_cleanup(obd);
3545
3546         ptlrpcd_decref();
3547         return rc;
3548 }
3549
3550 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3551 {
3552         struct lprocfs_static_vars lvars = { 0 };
3553         int rc = 0;
3554
3555         lprocfs_osc_init_vars(&lvars);
3556
3557         switch (lcfg->lcfg_command) {
3558         default:
3559                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3560                                               lcfg, obd);
3561                 if (rc > 0)
3562                         rc = 0;
3563                 break;
3564         }
3565
3566         return(rc);
3567 }
3568
3569 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3570 {
3571         return osc_process_config_base(obd, buf);
3572 }
3573
3574 struct obd_ops osc_obd_ops = {
3575         .o_owner           = THIS_MODULE,
3576         .o_setup           = osc_setup,
3577         .o_precleanup      = osc_precleanup,
3578         .o_cleanup         = osc_cleanup,
3579         .o_add_conn        = client_import_add_conn,
3580         .o_del_conn        = client_import_del_conn,
3581         .o_connect         = client_connect_import,
3582         .o_reconnect       = osc_reconnect,
3583         .o_disconnect      = osc_disconnect,
3584         .o_statfs          = osc_statfs,
3585         .o_statfs_async    = osc_statfs_async,
3586         .o_packmd          = osc_packmd,
3587         .o_unpackmd        = osc_unpackmd,
3588         .o_create          = osc_create,
3589         .o_destroy         = osc_destroy,
3590         .o_getattr         = osc_getattr,
3591         .o_getattr_async   = osc_getattr_async,
3592         .o_setattr         = osc_setattr,
3593         .o_setattr_async   = osc_setattr_async,
3594         .o_brw             = osc_brw,
3595         .o_punch           = osc_punch,
3596         .o_sync            = osc_sync,
3597         .o_enqueue         = osc_enqueue,
3598         .o_change_cbdata   = osc_change_cbdata,
3599         .o_find_cbdata     = osc_find_cbdata,
3600         .o_cancel          = osc_cancel,
3601         .o_cancel_unused   = osc_cancel_unused,
3602         .o_iocontrol       = osc_iocontrol,
3603         .o_get_info        = osc_get_info,
3604         .o_set_info_async  = osc_set_info_async,
3605         .o_import_event    = osc_import_event,
3606         .o_llog_init       = osc_llog_init,
3607         .o_llog_finish     = osc_llog_finish,
3608         .o_process_config  = osc_process_config,
3609         .o_quotactl        = osc_quotactl,
3610         .o_quotacheck      = osc_quotacheck,
3611 };
3612
3613 extern struct lu_kmem_descr osc_caches[];
3614 extern spinlock_t osc_ast_guard;
3615 extern struct lock_class_key osc_ast_guard_class;
3616
3617 int __init osc_init(void)
3618 {
3619         struct lprocfs_static_vars lvars = { 0 };
3620         int rc;
3621
3622         /* print an address of _any_ initialized kernel symbol from this
3623          * module, to allow debugging with gdb that doesn't support data
3624          * symbols from modules. */
3625         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3626
3627         rc = lu_kmem_init(osc_caches);
3628         if (rc)
3629                 return rc;
3630
3631         lprocfs_osc_init_vars(&lvars);
3632
3633         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3634                                  LUSTRE_OSC_NAME, &osc_device_type);
3635         if (rc) {
3636                 lu_kmem_fini(osc_caches);
3637                 return rc;
3638         }
3639
3640         spin_lock_init(&osc_ast_guard);
3641         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3642
3643         return rc;
3644 }
3645
3646 static void /*__exit*/ osc_exit(void)
3647 {
3648         class_unregister_type(LUSTRE_OSC_NAME);
3649         lu_kmem_fini(osc_caches);
3650 }
3651
3652 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3653 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3654 MODULE_LICENSE("GPL");
3655 MODULE_VERSION(LUSTRE_VERSION_STRING);
3656
3657 module_init(osc_init);
3658 module_exit(osc_exit);