]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/osc/osc_request.c
staging/lustre/osc: some cleanup to reduce stack overflow chance
[karo-tx-linux.git] / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <linux/libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <obd_lov.h>
48
49 #ifdef  __CYGWIN__
50 # include <ctype.h>
51 #endif
52
53 #include <lustre_ha.h>
54 #include <lprocfs_status.h>
55 #include <lustre_log.h>
56 #include <lustre_debug.h>
57 #include <lustre_param.h>
58 #include <lustre_fid.h>
59 #include "osc_internal.h"
60 #include "osc_cl_internal.h"
61
62 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
63 static int brw_interpret(const struct lu_env *env,
64                          struct ptlrpc_request *req, void *data, int rc);
65 int osc_cleanup(struct obd_device *obd);
66
67 /* Pack OSC object metadata for disk storage (LE byte order). */
68 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
69                       struct lov_stripe_md *lsm)
70 {
71         int lmm_size;
72         ENTRY;
73
74         lmm_size = sizeof(**lmmp);
75         if (lmmp == NULL)
76                 RETURN(lmm_size);
77
78         if (*lmmp != NULL && lsm == NULL) {
79                 OBD_FREE(*lmmp, lmm_size);
80                 *lmmp = NULL;
81                 RETURN(0);
82         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
83                 RETURN(-EBADF);
84         }
85
86         if (*lmmp == NULL) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (*lmmp == NULL)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm)
93                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
94
95         RETURN(lmm_size);
96 }
97
98 /* Unpack OSC object metadata from disk storage (LE byte order). */
99 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
100                         struct lov_mds_md *lmm, int lmm_bytes)
101 {
102         int lsm_size;
103         struct obd_import *imp = class_exp2cliimp(exp);
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof(*lmm)) {
108                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
109                                exp->exp_obd->obd_name, lmm_bytes,
110                                (int)sizeof(*lmm));
111                         RETURN(-EINVAL);
112                 }
113                 /* XXX LOV_MAGIC etc check? */
114
115                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
116                         CERROR("%s: zero lmm_object_id: rc = %d\n",
117                                exp->exp_obd->obd_name, -EINVAL);
118                         RETURN(-EINVAL);
119                 }
120         }
121
122         lsm_size = lov_stripe_md_size(1);
123         if (lsmp == NULL)
124                 RETURN(lsm_size);
125
126         if (*lsmp != NULL && lmm == NULL) {
127                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
128                 OBD_FREE(*lsmp, lsm_size);
129                 *lsmp = NULL;
130                 RETURN(0);
131         }
132
133         if (*lsmp == NULL) {
134                 OBD_ALLOC(*lsmp, lsm_size);
135                 if (unlikely(*lsmp == NULL))
136                         RETURN(-ENOMEM);
137                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
139                         OBD_FREE(*lsmp, lsm_size);
140                         RETURN(-ENOMEM);
141                 }
142                 loi_init((*lsmp)->lsm_oinfo[0]);
143         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
144                 RETURN(-EBADF);
145         }
146
147         if (lmm != NULL)
148                 /* XXX zero *lsmp? */
149                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
150
151         if (imp != NULL &&
152             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
153                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
154         else
155                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
156
157         RETURN(lsm_size);
158 }
159
160 static inline void osc_pack_capa(struct ptlrpc_request *req,
161                                  struct ost_body *body, void *capa)
162 {
163         struct obd_capa *oc = (struct obd_capa *)capa;
164         struct lustre_capa *c;
165
166         if (!capa)
167                 return;
168
169         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
170         LASSERT(c);
171         capa_cpy(c, oc);
172         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
173         DEBUG_CAPA(D_SEC, c, "pack");
174 }
175
176 static inline void osc_pack_req_body(struct ptlrpc_request *req,
177                                      struct obd_info *oinfo)
178 {
179         struct ost_body *body;
180
181         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
182         LASSERT(body);
183
184         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
185         osc_pack_capa(req, body, oinfo->oi_capa);
186 }
187
188 static inline void osc_set_capa_size(struct ptlrpc_request *req,
189                                      const struct req_msg_field *field,
190                                      struct obd_capa *oc)
191 {
192         if (oc == NULL)
193                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
194         else
195                 /* it is already calculated as sizeof struct obd_capa */
196                 ;
197 }
198
199 static int osc_getattr_interpret(const struct lu_env *env,
200                                  struct ptlrpc_request *req,
201                                  struct osc_async_args *aa, int rc)
202 {
203         struct ost_body *body;
204         ENTRY;
205
206         if (rc != 0)
207                 GOTO(out, rc);
208
209         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
210         if (body) {
211                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
212                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
213
214                 /* This should really be sent by the OST */
215                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
216                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
217         } else {
218                 CDEBUG(D_INFO, "can't unpack ost_body\n");
219                 rc = -EPROTO;
220                 aa->aa_oi->oi_oa->o_valid = 0;
221         }
222 out:
223         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
224         RETURN(rc);
225 }
226
227 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
228                              struct ptlrpc_request_set *set)
229 {
230         struct ptlrpc_request *req;
231         struct osc_async_args *aa;
232         int                 rc;
233         ENTRY;
234
235         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
236         if (req == NULL)
237                 RETURN(-ENOMEM);
238
239         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
240         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
241         if (rc) {
242                 ptlrpc_request_free(req);
243                 RETURN(rc);
244         }
245
246         osc_pack_req_body(req, oinfo);
247
248         ptlrpc_request_set_replen(req);
249         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
250
251         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
252         aa = ptlrpc_req_async_args(req);
253         aa->aa_oi = oinfo;
254
255         ptlrpc_set_add_req(set, req);
256         RETURN(0);
257 }
258
259 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
260                        struct obd_info *oinfo)
261 {
262         struct ptlrpc_request *req;
263         struct ost_body       *body;
264         int                 rc;
265         ENTRY;
266
267         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
268         if (req == NULL)
269                 RETURN(-ENOMEM);
270
271         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
272         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
273         if (rc) {
274                 ptlrpc_request_free(req);
275                 RETURN(rc);
276         }
277
278         osc_pack_req_body(req, oinfo);
279
280         ptlrpc_request_set_replen(req);
281
282         rc = ptlrpc_queue_wait(req);
283         if (rc)
284                 GOTO(out, rc);
285
286         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
287         if (body == NULL)
288                 GOTO(out, rc = -EPROTO);
289
290         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
291         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
292
293         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
294         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
295
296         EXIT;
297  out:
298         ptlrpc_req_finished(req);
299         return rc;
300 }
301
302 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
303                        struct obd_info *oinfo, struct obd_trans_info *oti)
304 {
305         struct ptlrpc_request *req;
306         struct ost_body       *body;
307         int                 rc;
308         ENTRY;
309
310         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
311
312         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
313         if (req == NULL)
314                 RETURN(-ENOMEM);
315
316         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
317         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
318         if (rc) {
319                 ptlrpc_request_free(req);
320                 RETURN(rc);
321         }
322
323         osc_pack_req_body(req, oinfo);
324
325         ptlrpc_request_set_replen(req);
326
327         rc = ptlrpc_queue_wait(req);
328         if (rc)
329                 GOTO(out, rc);
330
331         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
332         if (body == NULL)
333                 GOTO(out, rc = -EPROTO);
334
335         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
336
337         EXIT;
338 out:
339         ptlrpc_req_finished(req);
340         RETURN(rc);
341 }
342
343 static int osc_setattr_interpret(const struct lu_env *env,
344                                  struct ptlrpc_request *req,
345                                  struct osc_setattr_args *sa, int rc)
346 {
347         struct ost_body *body;
348         ENTRY;
349
350         if (rc != 0)
351                 GOTO(out, rc);
352
353         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
354         if (body == NULL)
355                 GOTO(out, rc = -EPROTO);
356
357         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
358 out:
359         rc = sa->sa_upcall(sa->sa_cookie, rc);
360         RETURN(rc);
361 }
362
363 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
364                            struct obd_trans_info *oti,
365                            obd_enqueue_update_f upcall, void *cookie,
366                            struct ptlrpc_request_set *rqset)
367 {
368         struct ptlrpc_request   *req;
369         struct osc_setattr_args *sa;
370         int                   rc;
371         ENTRY;
372
373         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
374         if (req == NULL)
375                 RETURN(-ENOMEM);
376
377         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
378         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
379         if (rc) {
380                 ptlrpc_request_free(req);
381                 RETURN(rc);
382         }
383
384         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
385                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
386
387         osc_pack_req_body(req, oinfo);
388
389         ptlrpc_request_set_replen(req);
390
391         /* do mds to ost setattr asynchronously */
392         if (!rqset) {
393                 /* Do not wait for response. */
394                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
395         } else {
396                 req->rq_interpret_reply =
397                         (ptlrpc_interpterer_t)osc_setattr_interpret;
398
399                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
400                 sa = ptlrpc_req_async_args(req);
401                 sa->sa_oa = oinfo->oi_oa;
402                 sa->sa_upcall = upcall;
403                 sa->sa_cookie = cookie;
404
405                 if (rqset == PTLRPCD_SET)
406                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
407                 else
408                         ptlrpc_set_add_req(rqset, req);
409         }
410
411         RETURN(0);
412 }
413
414 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
415                              struct obd_trans_info *oti,
416                              struct ptlrpc_request_set *rqset)
417 {
418         return osc_setattr_async_base(exp, oinfo, oti,
419                                       oinfo->oi_cb_up, oinfo, rqset);
420 }
421
422 int osc_real_create(struct obd_export *exp, struct obdo *oa,
423                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
424 {
425         struct ptlrpc_request *req;
426         struct ost_body       *body;
427         struct lov_stripe_md  *lsm;
428         int                 rc;
429         ENTRY;
430
431         LASSERT(oa);
432         LASSERT(ea);
433
434         lsm = *ea;
435         if (!lsm) {
436                 rc = obd_alloc_memmd(exp, &lsm);
437                 if (rc < 0)
438                         RETURN(rc);
439         }
440
441         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
442         if (req == NULL)
443                 GOTO(out, rc = -ENOMEM);
444
445         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
446         if (rc) {
447                 ptlrpc_request_free(req);
448                 GOTO(out, rc);
449         }
450
451         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
452         LASSERT(body);
453         lustre_set_wire_obdo(&body->oa, oa);
454
455         ptlrpc_request_set_replen(req);
456
457         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
458             oa->o_flags == OBD_FL_DELORPHAN) {
459                 DEBUG_REQ(D_HA, req,
460                           "delorphan from OST integration");
461                 /* Don't resend the delorphan req */
462                 req->rq_no_resend = req->rq_no_delay = 1;
463         }
464
465         rc = ptlrpc_queue_wait(req);
466         if (rc)
467                 GOTO(out_req, rc);
468
469         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
470         if (body == NULL)
471                 GOTO(out_req, rc = -EPROTO);
472
473         lustre_get_wire_obdo(oa, &body->oa);
474
475         oa->o_blksize = cli_brw_size(exp->exp_obd);
476         oa->o_valid |= OBD_MD_FLBLKSZ;
477
478         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
479          * have valid lsm_oinfo data structs, so don't go touching that.
480          * This needs to be fixed in a big way.
481          */
482         lsm->lsm_oi = oa->o_oi;
483         *ea = lsm;
484
485         if (oti != NULL) {
486                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
487
488                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
489                         if (!oti->oti_logcookies)
490                                 oti_alloc_cookies(oti, 1);
491                         *oti->oti_logcookies = oa->o_lcookie;
492                 }
493         }
494
495         CDEBUG(D_HA, "transno: "LPD64"\n",
496                lustre_msg_get_transno(req->rq_repmsg));
497 out_req:
498         ptlrpc_req_finished(req);
499 out:
500         if (rc && !*ea)
501                 obd_free_memmd(exp, &lsm);
502         RETURN(rc);
503 }
504
505 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
506                    obd_enqueue_update_f upcall, void *cookie,
507                    struct ptlrpc_request_set *rqset)
508 {
509         struct ptlrpc_request   *req;
510         struct osc_setattr_args *sa;
511         struct ost_body  *body;
512         int                   rc;
513         ENTRY;
514
515         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
516         if (req == NULL)
517                 RETURN(-ENOMEM);
518
519         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
520         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
521         if (rc) {
522                 ptlrpc_request_free(req);
523                 RETURN(rc);
524         }
525         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
526         ptlrpc_at_set_req_timeout(req);
527
528         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
529         LASSERT(body);
530         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
531         osc_pack_capa(req, body, oinfo->oi_capa);
532
533         ptlrpc_request_set_replen(req);
534
535         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
536         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
537         sa = ptlrpc_req_async_args(req);
538         sa->sa_oa     = oinfo->oi_oa;
539         sa->sa_upcall = upcall;
540         sa->sa_cookie = cookie;
541         if (rqset == PTLRPCD_SET)
542                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
543         else
544                 ptlrpc_set_add_req(rqset, req);
545
546         RETURN(0);
547 }
548
549 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
550                      struct obd_info *oinfo, struct obd_trans_info *oti,
551                      struct ptlrpc_request_set *rqset)
552 {
553         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
554         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
555         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
556         return osc_punch_base(exp, oinfo,
557                               oinfo->oi_cb_up, oinfo, rqset);
558 }
559
560 static int osc_sync_interpret(const struct lu_env *env,
561                               struct ptlrpc_request *req,
562                               void *arg, int rc)
563 {
564         struct osc_fsync_args *fa = arg;
565         struct ost_body *body;
566         ENTRY;
567
568         if (rc)
569                 GOTO(out, rc);
570
571         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
572         if (body == NULL) {
573                 CERROR ("can't unpack ost_body\n");
574                 GOTO(out, rc = -EPROTO);
575         }
576
577         *fa->fa_oi->oi_oa = body->oa;
578 out:
579         rc = fa->fa_upcall(fa->fa_cookie, rc);
580         RETURN(rc);
581 }
582
583 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
584                   obd_enqueue_update_f upcall, void *cookie,
585                   struct ptlrpc_request_set *rqset)
586 {
587         struct ptlrpc_request *req;
588         struct ost_body       *body;
589         struct osc_fsync_args *fa;
590         int                 rc;
591         ENTRY;
592
593         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
594         if (req == NULL)
595                 RETURN(-ENOMEM);
596
597         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
598         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
599         if (rc) {
600                 ptlrpc_request_free(req);
601                 RETURN(rc);
602         }
603
604         /* overload the size and blocks fields in the oa with start/end */
605         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
606         LASSERT(body);
607         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
608         osc_pack_capa(req, body, oinfo->oi_capa);
609
610         ptlrpc_request_set_replen(req);
611         req->rq_interpret_reply = osc_sync_interpret;
612
613         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
614         fa = ptlrpc_req_async_args(req);
615         fa->fa_oi = oinfo;
616         fa->fa_upcall = upcall;
617         fa->fa_cookie = cookie;
618
619         if (rqset == PTLRPCD_SET)
620                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
621         else
622                 ptlrpc_set_add_req(rqset, req);
623
624         RETURN (0);
625 }
626
627 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
628                     struct obd_info *oinfo, obd_size start, obd_size end,
629                     struct ptlrpc_request_set *set)
630 {
631         ENTRY;
632
633         if (!oinfo->oi_oa) {
634                 CDEBUG(D_INFO, "oa NULL\n");
635                 RETURN(-EINVAL);
636         }
637
638         oinfo->oi_oa->o_size = start;
639         oinfo->oi_oa->o_blocks = end;
640         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
641
642         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
643 }
644
645 /* Find and cancel locally locks matched by @mode in the resource found by
646  * @objid. Found locks are added into @cancel list. Returns the amount of
647  * locks added to @cancels list. */
648 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
649                                    struct list_head *cancels,
650                                    ldlm_mode_t mode, int lock_flags)
651 {
652         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
653         struct ldlm_res_id res_id;
654         struct ldlm_resource *res;
655         int count;
656         ENTRY;
657
658         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
659          * export) but disabled through procfs (flag in NS).
660          *
661          * This distinguishes from a case when ELC is not supported originally,
662          * when we still want to cancel locks in advance and just cancel them
663          * locally, without sending any RPC. */
664         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
665                 RETURN(0);
666
667         ostid_build_res_name(&oa->o_oi, &res_id);
668         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
669         if (res == NULL)
670                 RETURN(0);
671
672         LDLM_RESOURCE_ADDREF(res);
673         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
674                                            lock_flags, 0, NULL);
675         LDLM_RESOURCE_DELREF(res);
676         ldlm_resource_putref(res);
677         RETURN(count);
678 }
679
680 static int osc_destroy_interpret(const struct lu_env *env,
681                                  struct ptlrpc_request *req, void *data,
682                                  int rc)
683 {
684         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
685
686         atomic_dec(&cli->cl_destroy_in_flight);
687         wake_up(&cli->cl_destroy_waitq);
688         return 0;
689 }
690
691 static int osc_can_send_destroy(struct client_obd *cli)
692 {
693         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
694             cli->cl_max_rpcs_in_flight) {
695                 /* The destroy request can be sent */
696                 return 1;
697         }
698         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
699             cli->cl_max_rpcs_in_flight) {
700                 /*
701                  * The counter has been modified between the two atomic
702                  * operations.
703                  */
704                 wake_up(&cli->cl_destroy_waitq);
705         }
706         return 0;
707 }
708
709 int osc_create(const struct lu_env *env, struct obd_export *exp,
710                struct obdo *oa, struct lov_stripe_md **ea,
711                struct obd_trans_info *oti)
712 {
713         int rc = 0;
714         ENTRY;
715
716         LASSERT(oa);
717         LASSERT(ea);
718         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
719
720         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
721             oa->o_flags == OBD_FL_RECREATE_OBJS) {
722                 RETURN(osc_real_create(exp, oa, ea, oti));
723         }
724
725         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
726                 RETURN(osc_real_create(exp, oa, ea, oti));
727
728         /* we should not get here anymore */
729         LBUG();
730
731         RETURN(rc);
732 }
733
734 /* Destroy requests can be async always on the client, and we don't even really
735  * care about the return code since the client cannot do anything at all about
736  * a destroy failure.
737  * When the MDS is unlinking a filename, it saves the file objects into a
738  * recovery llog, and these object records are cancelled when the OST reports
739  * they were destroyed and sync'd to disk (i.e. transaction committed).
740  * If the client dies, or the OST is down when the object should be destroyed,
741  * the records are not cancelled, and when the OST reconnects to the MDS next,
742  * it will retrieve the llog unlink logs and then sends the log cancellation
743  * cookies to the MDS after committing destroy transactions. */
744 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
745                        struct obdo *oa, struct lov_stripe_md *ea,
746                        struct obd_trans_info *oti, struct obd_export *md_export,
747                        void *capa)
748 {
749         struct client_obd     *cli = &exp->exp_obd->u.cli;
750         struct ptlrpc_request *req;
751         struct ost_body       *body;
752         LIST_HEAD(cancels);
753         int rc, count;
754         ENTRY;
755
756         if (!oa) {
757                 CDEBUG(D_INFO, "oa NULL\n");
758                 RETURN(-EINVAL);
759         }
760
761         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
762                                         LDLM_FL_DISCARD_DATA);
763
764         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
765         if (req == NULL) {
766                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
767                 RETURN(-ENOMEM);
768         }
769
770         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
771         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
772                                0, &cancels, count);
773         if (rc) {
774                 ptlrpc_request_free(req);
775                 RETURN(rc);
776         }
777
778         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
779         ptlrpc_at_set_req_timeout(req);
780
781         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
782                 oa->o_lcookie = *oti->oti_logcookies;
783         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
784         LASSERT(body);
785         lustre_set_wire_obdo(&body->oa, oa);
786
787         osc_pack_capa(req, body, (struct obd_capa *)capa);
788         ptlrpc_request_set_replen(req);
789
790         /* If osc_destory is for destroying the unlink orphan,
791          * sent from MDT to OST, which should not be blocked here,
792          * because the process might be triggered by ptlrpcd, and
793          * it is not good to block ptlrpcd thread (b=16006)*/
794         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
795                 req->rq_interpret_reply = osc_destroy_interpret;
796                 if (!osc_can_send_destroy(cli)) {
797                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
798                                                           NULL);
799
800                         /*
801                          * Wait until the number of on-going destroy RPCs drops
802                          * under max_rpc_in_flight
803                          */
804                         l_wait_event_exclusive(cli->cl_destroy_waitq,
805                                                osc_can_send_destroy(cli), &lwi);
806                 }
807         }
808
809         /* Do not wait for response */
810         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
811         RETURN(0);
812 }
813
814 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
815                                 long writing_bytes)
816 {
817         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
818
819         LASSERT(!(oa->o_valid & bits));
820
821         oa->o_valid |= bits;
822         client_obd_list_lock(&cli->cl_loi_list_lock);
823         oa->o_dirty = cli->cl_dirty;
824         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
825                      cli->cl_dirty_max)) {
826                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
827                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
828                 oa->o_undirty = 0;
829         } else if (unlikely(atomic_read(&obd_dirty_pages) -
830                             atomic_read(&obd_dirty_transit_pages) >
831                             (long)(obd_max_dirty_pages + 1))) {
832                 /* The atomic_read() allowing the atomic_inc() are
833                  * not covered by a lock thus they may safely race and trip
834                  * this CERROR() unless we add in a small fudge factor (+1). */
835                 CERROR("dirty %d - %d > system dirty_max %d\n",
836                        atomic_read(&obd_dirty_pages),
837                        atomic_read(&obd_dirty_transit_pages),
838                        obd_max_dirty_pages);
839                 oa->o_undirty = 0;
840         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
841                 CERROR("dirty %lu - dirty_max %lu too big???\n",
842                        cli->cl_dirty, cli->cl_dirty_max);
843                 oa->o_undirty = 0;
844         } else {
845                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
846                                       PAGE_CACHE_SHIFT)*
847                                      (cli->cl_max_rpcs_in_flight + 1);
848                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
849         }
850         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
851         oa->o_dropped = cli->cl_lost_grant;
852         cli->cl_lost_grant = 0;
853         client_obd_list_unlock(&cli->cl_loi_list_lock);
854         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
855                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
856
857 }
858
859 void osc_update_next_shrink(struct client_obd *cli)
860 {
861         cli->cl_next_shrink_grant =
862                 cfs_time_shift(cli->cl_grant_shrink_interval);
863         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
864                cli->cl_next_shrink_grant);
865 }
866
867 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
868 {
869         client_obd_list_lock(&cli->cl_loi_list_lock);
870         cli->cl_avail_grant += grant;
871         client_obd_list_unlock(&cli->cl_loi_list_lock);
872 }
873
874 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
875 {
876         if (body->oa.o_valid & OBD_MD_FLGRANT) {
877                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
878                 __osc_update_grant(cli, body->oa.o_grant);
879         }
880 }
881
/* Forward declaration: osc_shrink_grant_to_target() below calls this to
 * send the KEY_GRANT_SHRINK request before the definition is seen. */
882 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
883                               obd_count keylen, void *key, obd_count vallen,
884                               void *val, struct ptlrpc_request_set *set);
885
886 static int osc_shrink_grant_interpret(const struct lu_env *env,
887                                       struct ptlrpc_request *req,
888                                       void *aa, int rc)
889 {
890         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
891         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
892         struct ost_body *body;
893
894         if (rc != 0) {
895                 __osc_update_grant(cli, oa->o_grant);
896                 GOTO(out, rc);
897         }
898
899         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
900         LASSERT(body);
901         osc_update_grant(cli, body);
902 out:
903         OBDO_FREE(oa);
904         return rc;
905 }
906
907 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
908 {
909         client_obd_list_lock(&cli->cl_loi_list_lock);
910         oa->o_grant = cli->cl_avail_grant / 4;
911         cli->cl_avail_grant -= oa->o_grant;
912         client_obd_list_unlock(&cli->cl_loi_list_lock);
913         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
914                 oa->o_valid |= OBD_MD_FLFLAGS;
915                 oa->o_flags = 0;
916         }
917         oa->o_flags |= OBD_FL_SHRINK_GRANT;
918         osc_update_next_shrink(cli);
919 }
920
921 /* Shrink the current grant, either from some large amount to enough for a
922  * full set of in-flight RPCs, or if we have already shrunk to that limit
923  * then to enough for a single RPC.  This avoids keeping more grant than
924  * needed, and avoids shrinking the grant piecemeal. */
925 static int osc_shrink_grant(struct client_obd *cli)
926 {
927         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
928                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
929
930         client_obd_list_lock(&cli->cl_loi_list_lock);
931         if (cli->cl_avail_grant <= target_bytes)
932                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
933         client_obd_list_unlock(&cli->cl_loi_list_lock);
934
935         return osc_shrink_grant_to_target(cli, target_bytes);
936 }
937
938 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
939 {
940         int                     rc = 0;
941         struct ost_body *body;
942         ENTRY;
943
944         client_obd_list_lock(&cli->cl_loi_list_lock);
945         /* Don't shrink if we are already above or below the desired limit
946          * We don't want to shrink below a single RPC, as that will negatively
947          * impact block allocation and long-term performance. */
948         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
949                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
950
951         if (target_bytes >= cli->cl_avail_grant) {
952                 client_obd_list_unlock(&cli->cl_loi_list_lock);
953                 RETURN(0);
954         }
955         client_obd_list_unlock(&cli->cl_loi_list_lock);
956
957         OBD_ALLOC_PTR(body);
958         if (!body)
959                 RETURN(-ENOMEM);
960
961         osc_announce_cached(cli, &body->oa, 0);
962
963         client_obd_list_lock(&cli->cl_loi_list_lock);
964         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
965         cli->cl_avail_grant = target_bytes;
966         client_obd_list_unlock(&cli->cl_loi_list_lock);
967         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
968                 body->oa.o_valid |= OBD_MD_FLFLAGS;
969                 body->oa.o_flags = 0;
970         }
971         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
972         osc_update_next_shrink(cli);
973
974         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
975                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976                                 sizeof(*body), body, NULL);
977         if (rc != 0)
978                 __osc_update_grant(cli, body->oa.o_grant);
979         OBD_FREE_PTR(body);
980         RETURN(rc);
981 }
982
983 static int osc_should_shrink_grant(struct client_obd *client)
984 {
985         cfs_time_t time = cfs_time_current();
986         cfs_time_t next_shrink = client->cl_next_shrink_grant;
987
988         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
989              OBD_CONNECT_GRANT_SHRINK) == 0)
990                 return 0;
991
992         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993                 /* Get the current RPC size directly, instead of going via:
994                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
995                  * Keep comment here so that it can be found by searching. */
996                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
997
998                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
999                     client->cl_avail_grant > brw_size)
1000                         return 1;
1001                 else
1002                         osc_update_next_shrink(client);
1003         }
1004         return 0;
1005 }
1006
1007 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1008 {
1009         struct client_obd *client;
1010
1011         list_for_each_entry(client, &item->ti_obd_list,
1012                                 cl_grant_shrink_list) {
1013                 if (osc_should_shrink_grant(client))
1014                         osc_shrink_grant(client);
1015         }
1016         return 0;
1017 }
1018
1019 static int osc_add_shrink_grant(struct client_obd *client)
1020 {
1021         int rc;
1022
1023         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1024                                        TIMEOUT_GRANT,
1025                                        osc_grant_shrink_grant_cb, NULL,
1026                                        &client->cl_grant_shrink_list);
1027         if (rc) {
1028                 CERROR("add grant client %s error %d\n",
1029                         client->cl_import->imp_obd->obd_name, rc);
1030                 return rc;
1031         }
1032         CDEBUG(D_CACHE, "add grant client %s \n",
1033                client->cl_import->imp_obd->obd_name);
1034         osc_update_next_shrink(client);
1035         return 0;
1036 }
1037
1038 static int osc_del_shrink_grant(struct client_obd *client)
1039 {
1040         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1041                                          TIMEOUT_GRANT);
1042 }
1043
1044 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1045 {
1046         /*
1047          * ocd_grant is the total grant amount we're expect to hold: if we've
1048          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1049          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1050          *
1051          * race is tolerable here: if we're evicted, but imp_state already
1052          * left EVICTED state, then cl_dirty must be 0 already.
1053          */
1054         client_obd_list_lock(&cli->cl_loi_list_lock);
1055         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1056                 cli->cl_avail_grant = ocd->ocd_grant;
1057         else
1058                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1059
1060         if (cli->cl_avail_grant < 0) {
1061                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1062                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1063                       ocd->ocd_grant, cli->cl_dirty);
1064                 /* workaround for servers which do not have the patch from
1065                  * LU-2679 */
1066                 cli->cl_avail_grant = ocd->ocd_grant;
1067         }
1068
1069         /* determine the appropriate chunk size used by osc_extent. */
1070         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1071         client_obd_list_unlock(&cli->cl_loi_list_lock);
1072
1073         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1074                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1075                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1076
1077         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1078             list_empty(&cli->cl_grant_shrink_list))
1079                 osc_add_shrink_grant(cli);
1080 }
1081
1082 /* We assume that the reason this OSC got a short read is because it read
1083  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1084  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1085  * this stripe never got written at or beyond this stripe offset yet. */
1086 static void handle_short_read(int nob_read, obd_count page_count,
1087                               struct brw_page **pga)
1088 {
1089         char *ptr;
1090         int i = 0;
1091
1092         /* skip bytes read OK */
1093         while (nob_read > 0) {
1094                 LASSERT (page_count > 0);
1095
1096                 if (pga[i]->count > nob_read) {
1097                         /* EOF inside this page */
1098                         ptr = kmap(pga[i]->pg) +
1099                                 (pga[i]->off & ~CFS_PAGE_MASK);
1100                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1101                         kunmap(pga[i]->pg);
1102                         page_count--;
1103                         i++;
1104                         break;
1105                 }
1106
1107                 nob_read -= pga[i]->count;
1108                 page_count--;
1109                 i++;
1110         }
1111
1112         /* zero remaining pages */
1113         while (page_count-- > 0) {
1114                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1115                 memset(ptr, 0, pga[i]->count);
1116                 kunmap(pga[i]->pg);
1117                 i++;
1118         }
1119 }
1120
1121 static int check_write_rcs(struct ptlrpc_request *req,
1122                            int requested_nob, int niocount,
1123                            obd_count page_count, struct brw_page **pga)
1124 {
1125         int     i;
1126         __u32   *remote_rcs;
1127
1128         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1129                                                   sizeof(*remote_rcs) *
1130                                                   niocount);
1131         if (remote_rcs == NULL) {
1132                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1133                 return(-EPROTO);
1134         }
1135
1136         /* return error if any niobuf was in error */
1137         for (i = 0; i < niocount; i++) {
1138                 if ((int)remote_rcs[i] < 0)
1139                         return(remote_rcs[i]);
1140
1141                 if (remote_rcs[i] != 0) {
1142                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1143                                 i, remote_rcs[i], req);
1144                         return(-EPROTO);
1145                 }
1146         }
1147
1148         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1149                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1150                        req->rq_bulk->bd_nob_transferred, requested_nob);
1151                 return(-EPROTO);
1152         }
1153
1154         return (0);
1155 }
1156
1157 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1158 {
1159         if (p1->flag != p2->flag) {
1160                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1161                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1162
1163                 /* warn if we try to combine flags that we don't know to be
1164                  * safe to combine */
1165                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1166                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1167                               "report this at http://bugs.whamcloud.com/\n",
1168                               p1->flag, p2->flag);
1169                 }
1170                 return 0;
1171         }
1172
1173         return (p1->off + p1->count == p2->off);
1174 }
1175
1176 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1177                                    struct brw_page **pga, int opc,
1178                                    cksum_type_t cksum_type)
1179 {
1180         __u32                           cksum;
1181         int                             i = 0;
1182         struct cfs_crypto_hash_desc     *hdesc;
1183         unsigned int                    bufsize;
1184         int                             err;
1185         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1186
1187         LASSERT(pg_count > 0);
1188
1189         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1190         if (IS_ERR(hdesc)) {
1191                 CERROR("Unable to initialize checksum hash %s\n",
1192                        cfs_crypto_hash_name(cfs_alg));
1193                 return PTR_ERR(hdesc);
1194         }
1195
1196         while (nob > 0 && pg_count > 0) {
1197                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1198
1199                 /* corrupt the data before we compute the checksum, to
1200                  * simulate an OST->client data error */
1201                 if (i == 0 && opc == OST_READ &&
1202                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1203                         unsigned char *ptr = kmap(pga[i]->pg);
1204                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1205                         memcpy(ptr + off, "bad1", min(4, nob));
1206                         kunmap(pga[i]->pg);
1207                 }
1208                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1209                                   pga[i]->off & ~CFS_PAGE_MASK,
1210                                   count);
1211                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1212                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1213
1214                 nob -= pga[i]->count;
1215                 pg_count--;
1216                 i++;
1217         }
1218
1219         bufsize = 4;
1220         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1221
1222         if (err)
1223                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1224
1225         /* For sending we only compute the wrong checksum instead
1226          * of corrupting the data so it is still correct on a redo */
1227         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1228                 cksum++;
1229
1230         return cksum;
1231 }
1232
/*
 * Build, but do not send, a bulk read or write (BRW) request for the pages
 * in pga.
 *
 * cmd         OBD_BRW_WRITE set selects OST_WRITE, otherwise OST_READ
 * cli         client the request is built for
 * oa          object attributes copied into the request body; the pointer is
 *             also stored in the request's async args, and for writes its
 *             checksum fields are updated here
 * lsm         not referenced in this function
 * page_count  number of entries in pga, must be > 0
 * pga         pages to transfer, in strictly increasing offset order with no
 *             gap inside the array (both are asserted below)
 * reqp        receives the new request on success
 * ocapa       capability packed into the request
 * reserve     if set (and ocapa is not NULL) a capa reference is taken and
 *             stored in the async args
 * resend      if set, the request is flagged OBD_FL_RECOV_RESEND
 *
 * Returns 0 on success, -ENOMEM if the request or the bulk descriptor cannot
 * be allocated, or the error from ptlrpc_request_pack().  Two fault
 * injection points can force -ENOMEM / -EINVAL at entry.
 */
1233 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1234                                 struct lov_stripe_md *lsm, obd_count page_count,
1235                                 struct brw_page **pga,
1236                                 struct ptlrpc_request **reqp,
1237                                 struct obd_capa *ocapa, int reserve,
1238                                 int resend)
1239 {
1240         struct ptlrpc_request   *req;
1241         struct ptlrpc_bulk_desc *desc;
1242         struct ost_body  *body;
1243         struct obd_ioobj        *ioobj;
1244         struct niobuf_remote    *niobuf;
1245         int niocount, i, requested_nob, opc, rc;
1246         struct osc_brw_async_args *aa;
1247         struct req_capsule      *pill;
1248         struct brw_page *pg_prev;
1249
1250         ENTRY;
1251         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1252                 RETURN(-ENOMEM); /* Recoverable */
1253         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1254                 RETURN(-EINVAL); /* Fatal */
1255
        /* writes are allocated from the import's request pool, reads are
         * allocated normally */
1256         if ((cmd & OBD_BRW_WRITE) != 0) {
1257                 opc = OST_WRITE;
1258                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1259                                                 cli->cl_import->imp_rq_pool,
1260                                                 &RQF_OST_BRW_WRITE);
1261         } else {
1262                 opc = OST_READ;
1263                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1264         }
1265         if (req == NULL)
1266                 RETURN(-ENOMEM);
1267
        /* count the remote niobufs needed: each run of mergeable pages
         * shares a single niobuf */
1268         for (niocount = i = 1; i < page_count; i++) {
1269                 if (!can_merge_pages(pga[i - 1], pga[i]))
1270                         niocount++;
1271         }
1272
1273         pill = &req->rq_pill;
1274         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1275                              sizeof(*ioobj));
1276         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1277                              niocount * sizeof(*niobuf));
1278         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1279
1280         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1281         if (rc) {
1282                 ptlrpc_request_free(req);
1283                 RETURN(rc);
1284         }
1285         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1286         ptlrpc_at_set_req_timeout(req);
1287         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1288          * retry logic */
1289         req->rq_no_retry_einprogress = 1;
1290
1291         desc = ptlrpc_prep_bulk_imp(req, page_count,
1292                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1293                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1294                 OST_BULK_PORTAL);
1295
1296         if (desc == NULL)
1297                 GOTO(out, rc = -ENOMEM);
1298         /* NB request now owns desc and will free it when it gets freed */
1299
1300         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1301         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1302         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1303         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1304
1305         lustre_set_wire_obdo(&body->oa, oa);
1306
1307         obdo_to_ioobj(oa, ioobj);
1308         ioobj->ioo_bufcnt = niocount;
1309         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1310          * that might be send for this request.  The actual number is decided
1311          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1312          * "max - 1" for old client compatibility sending "0", and also so the
1313          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1314         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1315         osc_pack_capa(req, body, ocapa);
1316         LASSERT(page_count > 0);
1317         pg_prev = pga[0];
        /* add every page to the bulk descriptor and fill in the niobufs;
         * niobuf steps back by one whenever a page merges into the previous
         * niobuf, so it ends up niocount entries past the start */
1318         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1319                 struct brw_page *pg = pga[i];
1320                 int poff = pg->off & ~CFS_PAGE_MASK;
1321
1322                 LASSERT(pg->count > 0);
1323                 /* make sure there is no gap in the middle of page array */
1324                 LASSERTF(page_count == 1 ||
1325                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1326                           ergo(i > 0 && i < page_count - 1,
1327                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1328                           ergo(i == page_count - 1, poff == 0)),
1329                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1330                          i, page_count, pg, pg->off, pg->count);
1331                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1332                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1333                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1334                          i, page_count,
1335                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1336                          pg_prev->pg, page_private(pg_prev->pg),
1337                          pg_prev->pg->index, pg_prev->off);
1338                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1339                         (pg->flag & OBD_BRW_SRVLOCK));
1340
1341                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1342                 requested_nob += pg->count;
1343
1344                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1345                         niobuf--;
1346                         niobuf->len += pg->count;
1347                 } else {
1348                         niobuf->offset = pg->off;
1349                         niobuf->len    = pg->count;
1350                         niobuf->flags  = pg->flag;
1351                 }
1352                 pg_prev = pg;
1353         }
1354
1355         LASSERTF((void *)(niobuf - niocount) ==
1356                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1357                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1358                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1359
1360         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1361         if (resend) {
1362                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1363                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1364                         body->oa.o_flags = 0;
1365                 }
1366                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1367         }
1368
        /* piggy-back a grant shrink on this request when one is due */
1369         if (osc_should_shrink_grant(cli))
1370                 osc_shrink_grant_local(cli, &body->oa);
1371
1372         /* size[REQ_REC_OFF] still sizeof (*body) */
1373         if (opc == OST_WRITE) {
1374                 if (cli->cl_checksum &&
1375                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1376                         /* store cl_cksum_type in a local variable since
1377                          * it can be changed via lprocfs */
1378                         cksum_type_t cksum_type = cli->cl_cksum_type;
1379
                        /* NOTE(review): this masks the caller's
                         * oa->o_flags while resetting the flags of the wire
                         * copy - confirm that touching both is intended */
1380                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1381                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1382                                 body->oa.o_flags = 0;
1383                         }
1384                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1385                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1386                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1387                                                              page_count, pga,
1388                                                              OST_WRITE,
1389                                                              cksum_type);
1390                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1391                                body->oa.o_cksum);
1392                         /* save this in 'oa', too, for later checking */
1393                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1394                         oa->o_flags |= cksum_type_pack(cksum_type);
1395                 } else {
1396                         /* clear out the checksum flag, in case this is a
1397                          * resend but cl_checksum is no longer set. b=11238 */
1398                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1399                 }
1400                 oa->o_cksum = body->oa.o_cksum;
1401                 /* 1 RC per niobuf */
1402                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1403                                      sizeof(__u32) * niocount);
1404         } else {
                /* for reads only ask for a checksum of the chosen type;
                 * nothing is computed here */
1405                 if (cli->cl_checksum &&
1406                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1407                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1408                                 body->oa.o_flags = 0;
1409                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1410                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1411                 }
1412         }
1413         ptlrpc_request_set_replen(req);
1414
        /* keep what the completion side needs in the request's async args */
1415         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1416         aa = ptlrpc_req_async_args(req);
1417         aa->aa_oa = oa;
1418         aa->aa_requested_nob = requested_nob;
1419         aa->aa_nio_count = niocount;
1420         aa->aa_page_count = page_count;
1421         aa->aa_resends = 0;
1422         aa->aa_ppga = pga;
1423         aa->aa_cli = cli;
1424         INIT_LIST_HEAD(&aa->aa_oaps);
1425         if (ocapa && reserve)
1426                 aa->aa_ocapa = capa_get(ocapa);
1427
1428         *reqp = req;
1429         RETURN(0);
1430
1431  out:
1432         ptlrpc_req_finished(req);
1433         RETURN(rc);
1434 }
1435
1436 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1437                                 __u32 client_cksum, __u32 server_cksum, int nob,
1438                                 obd_count page_count, struct brw_page **pga,
1439                                 cksum_type_t client_cksum_type)
1440 {
1441         __u32 new_cksum;
1442         char *msg;
1443         cksum_type_t cksum_type;
1444
1445         if (server_cksum == client_cksum) {
1446                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1447                 return 0;
1448         }
1449
1450         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1451                                        oa->o_flags : 0);
1452         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1453                                       cksum_type);
1454
1455         if (cksum_type != client_cksum_type)
1456                 msg = "the server did not use the checksum type specified in "
1457                       "the original request - likely a protocol problem";
1458         else if (new_cksum == server_cksum)
1459                 msg = "changed on the client after we checksummed it - "
1460                       "likely false positive due to mmap IO (bug 11742)";
1461         else if (new_cksum == client_cksum)
1462                 msg = "changed in transit before arrival at OST";
1463         else
1464                 msg = "changed in transit AND doesn't match the original - "
1465                       "likely false positive due to mmap IO (bug 11742)";
1466
1467         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1468                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1469                            msg, libcfs_nid2str(peer->nid),
1470                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1471                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1472                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1473                            POSTID(&oa->o_oi), pga[0]->off,
1474                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1475         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1476                "client csum now %x\n", client_cksum, client_cksum_type,
1477                server_cksum, cksum_type, new_cksum);
1478         return 1;
1479 }
1480
1481 /* Note rc enters this function as number of bytes transferred */
1482 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1483 {
1484         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1485         const lnet_process_id_t *peer =
1486                         &req->rq_import->imp_connection->c_peer;
1487         struct client_obd *cli = aa->aa_cli;
1488         struct ost_body *body;
1489         __u32 client_cksum = 0;
1490         ENTRY;
1491
1492         if (rc < 0 && rc != -EDQUOT) {
1493                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1494                 RETURN(rc);
1495         }
1496
1497         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1498         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1499         if (body == NULL) {
1500                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1501                 RETURN(-EPROTO);
1502         }
1503
1504         /* set/clear over quota flag for a uid/gid */
1505         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1506             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1507                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1508
1509                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1510                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1511                        body->oa.o_flags);
1512                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1513         }
1514
1515         osc_update_grant(cli, body);
1516
1517         if (rc < 0)
1518                 RETURN(rc);
1519
1520         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1521                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1522
1523         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1524                 if (rc > 0) {
1525                         CERROR("Unexpected +ve rc %d\n", rc);
1526                         RETURN(-EPROTO);
1527                 }
1528                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1529
1530                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1531                         RETURN(-EAGAIN);
1532
1533                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1534                     check_write_checksum(&body->oa, peer, client_cksum,
1535                                          body->oa.o_cksum, aa->aa_requested_nob,
1536                                          aa->aa_page_count, aa->aa_ppga,
1537                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1538                         RETURN(-EAGAIN);
1539
1540                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1541                                      aa->aa_page_count, aa->aa_ppga);
1542                 GOTO(out, rc);
1543         }
1544
1545         /* The rest of this function executes only for OST_READs */
1546
1547         /* if unwrap_bulk failed, return -EAGAIN to retry */
1548         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1549         if (rc < 0)
1550                 GOTO(out, rc = -EAGAIN);
1551
1552         if (rc > aa->aa_requested_nob) {
1553                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1554                        aa->aa_requested_nob);
1555                 RETURN(-EPROTO);
1556         }
1557
1558         if (rc != req->rq_bulk->bd_nob_transferred) {
1559                 CERROR ("Unexpected rc %d (%d transferred)\n",
1560                         rc, req->rq_bulk->bd_nob_transferred);
1561                 return (-EPROTO);
1562         }
1563
1564         if (rc < aa->aa_requested_nob)
1565                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1566
1567         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1568                 static int cksum_counter;
1569                 __u32      server_cksum = body->oa.o_cksum;
1570                 char      *via;
1571                 char      *router;
1572                 cksum_type_t cksum_type;
1573
1574                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1575                                                body->oa.o_flags : 0);
1576                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1577                                                  aa->aa_ppga, OST_READ,
1578                                                  cksum_type);
1579
1580                 if (peer->nid == req->rq_bulk->bd_sender) {
1581                         via = router = "";
1582                 } else {
1583                         via = " via ";
1584                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1585                 }
1586
1587                 if (server_cksum == ~0 && rc > 0) {
1588                         CERROR("Protocol error: server %s set the 'checksum' "
1589                                "bit, but didn't send a checksum.  Not fatal, "
1590                                "but please notify on http://bugs.whamcloud.com/\n",
1591                                libcfs_nid2str(peer->nid));
1592                 } else if (server_cksum != client_cksum) {
1593                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1594                                            "%s%s%s inode "DFID" object "DOSTID
1595                                            " extent ["LPU64"-"LPU64"]\n",
1596                                            req->rq_import->imp_obd->obd_name,
1597                                            libcfs_nid2str(peer->nid),
1598                                            via, router,
1599                                            body->oa.o_valid & OBD_MD_FLFID ?
1600                                                 body->oa.o_parent_seq : (__u64)0,
1601                                            body->oa.o_valid & OBD_MD_FLFID ?
1602                                                 body->oa.o_parent_oid : 0,
1603                                            body->oa.o_valid & OBD_MD_FLFID ?
1604                                                 body->oa.o_parent_ver : 0,
1605                                            POSTID(&body->oa.o_oi),
1606                                            aa->aa_ppga[0]->off,
1607                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1608                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1609                                                                         1);
1610                         CERROR("client %x, server %x, cksum_type %x\n",
1611                                client_cksum, server_cksum, cksum_type);
1612                         cksum_counter = 0;
1613                         aa->aa_oa->o_cksum = client_cksum;
1614                         rc = -EAGAIN;
1615                 } else {
1616                         cksum_counter++;
1617                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1618                         rc = 0;
1619                 }
1620         } else if (unlikely(client_cksum)) {
1621                 static int cksum_missed;
1622
1623                 cksum_missed++;
1624                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1625                         CERROR("Checksum %u requested from %s but not sent\n",
1626                                cksum_missed, libcfs_nid2str(peer->nid));
1627         } else {
1628                 rc = 0;
1629         }
1630 out:
1631         if (rc >= 0)
1632                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1633
1634         RETURN(rc);
1635 }
1636
1637 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1638                             struct lov_stripe_md *lsm,
1639                             obd_count page_count, struct brw_page **pga,
1640                             struct obd_capa *ocapa)
1641 {
1642         struct ptlrpc_request *req;
1643         int                 rc;
1644         wait_queue_head_t           waitq;
1645         int                 generation, resends = 0;
1646         struct l_wait_info     lwi;
1647
1648         ENTRY;
1649
1650         init_waitqueue_head(&waitq);
1651         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1652
1653 restart_bulk:
1654         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1655                                   page_count, pga, &req, ocapa, 0, resends);
1656         if (rc != 0)
1657                 return (rc);
1658
1659         if (resends) {
1660                 req->rq_generation_set = 1;
1661                 req->rq_import_generation = generation;
1662                 req->rq_sent = cfs_time_current_sec() + resends;
1663         }
1664
1665         rc = ptlrpc_queue_wait(req);
1666
1667         if (rc == -ETIMEDOUT && req->rq_resend) {
1668                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1669                 ptlrpc_req_finished(req);
1670                 goto restart_bulk;
1671         }
1672
1673         rc = osc_brw_fini_request(req, rc);
1674
1675         ptlrpc_req_finished(req);
1676         /* When server return -EINPROGRESS, client should always retry
1677          * regardless of the number of times the bulk was resent already.*/
1678         if (osc_recoverable_error(rc)) {
1679                 resends++;
1680                 if (rc != -EINPROGRESS &&
1681                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1682                         CERROR("%s: too many resend retries for object: "
1683                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1684                                POSTID(&oa->o_oi), rc);
1685                         goto out;
1686                 }
1687                 if (generation !=
1688                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1689                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1690                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1691                                POSTID(&oa->o_oi), rc);
1692                         goto out;
1693                 }
1694
1695                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1696                                        NULL);
1697                 l_wait_event(waitq, 0, &lwi);
1698
1699                 goto restart_bulk;
1700         }
1701 out:
1702         if (rc == -EAGAIN || rc == -EINPROGRESS)
1703                 rc = -EIO;
1704         RETURN (rc);
1705 }
1706
1707 static int osc_brw_redo_request(struct ptlrpc_request *request,
1708                                 struct osc_brw_async_args *aa, int rc)
1709 {
1710         struct ptlrpc_request *new_req;
1711         struct osc_brw_async_args *new_aa;
1712         struct osc_async_page *oap;
1713         ENTRY;
1714
1715         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1716                   "redo for recoverable error %d", rc);
1717
1718         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1719                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1720                                   aa->aa_cli, aa->aa_oa,
1721                                   NULL /* lsm unused by osc currently */,
1722                                   aa->aa_page_count, aa->aa_ppga,
1723                                   &new_req, aa->aa_ocapa, 0, 1);
1724         if (rc)
1725                 RETURN(rc);
1726
1727         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1728                 if (oap->oap_request != NULL) {
1729                         LASSERTF(request == oap->oap_request,
1730                                  "request %p != oap_request %p\n",
1731                                  request, oap->oap_request);
1732                         if (oap->oap_interrupted) {
1733                                 ptlrpc_req_finished(new_req);
1734                                 RETURN(-EINTR);
1735                         }
1736                 }
1737         }
1738         /* New request takes over pga and oaps from old request.
1739          * Note that copying a list_head doesn't work, need to move it... */
1740         aa->aa_resends++;
1741         new_req->rq_interpret_reply = request->rq_interpret_reply;
1742         new_req->rq_async_args = request->rq_async_args;
1743         /* cap resend delay to the current request timeout, this is similar to
1744          * what ptlrpc does (see after_reply()) */
1745         if (aa->aa_resends > new_req->rq_timeout)
1746                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1747         else
1748                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1749         new_req->rq_generation_set = 1;
1750         new_req->rq_import_generation = request->rq_import_generation;
1751
1752         new_aa = ptlrpc_req_async_args(new_req);
1753
1754         INIT_LIST_HEAD(&new_aa->aa_oaps);
1755         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1756         INIT_LIST_HEAD(&new_aa->aa_exts);
1757         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1758         new_aa->aa_resends = aa->aa_resends;
1759
1760         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1761                 if (oap->oap_request) {
1762                         ptlrpc_req_finished(oap->oap_request);
1763                         oap->oap_request = ptlrpc_request_addref(new_req);
1764                 }
1765         }
1766
1767         new_aa->aa_ocapa = aa->aa_ocapa;
1768         aa->aa_ocapa = NULL;
1769
1770         /* XXX: This code will run into problem if we're going to support
1771          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1772          * and wait for all of them to be finished. We should inherit request
1773          * set from old request. */
1774         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1775
1776         DEBUG_REQ(D_INFO, new_req, "new request");
1777         RETURN(0);
1778 }
1779
1780 /*
1781  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1782  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1783  * fine for our small page arrays and doesn't require allocation.  its an
1784  * insertion sort that swaps elements that are strides apart, shrinking the
1785  * stride down until its '1' and the array is sorted.
1786  */
1787 static void sort_brw_pages(struct brw_page **array, int num)
1788 {
1789         int stride, i, j;
1790         struct brw_page *tmp;
1791
1792         if (num == 1)
1793                 return;
1794         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1795                 ;
1796
1797         do {
1798                 stride /= 3;
1799                 for (i = stride ; i < num ; i++) {
1800                         tmp = array[i];
1801                         j = i;
1802                         while (j >= stride && array[j - stride]->off > tmp->off) {
1803                                 array[j] = array[j - stride];
1804                                 j -= stride;
1805                         }
1806                         array[j] = tmp;
1807                 }
1808         } while (stride > 1);
1809 }
1810
1811 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1812 {
1813         int count = 1;
1814         int offset;
1815         int i = 0;
1816
1817         LASSERT (pages > 0);
1818         offset = pg[i]->off & ~CFS_PAGE_MASK;
1819
1820         for (;;) {
1821                 pages--;
1822                 if (pages == 0)  /* that's all */
1823                         return count;
1824
1825                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1826                         return count;   /* doesn't end on page boundary */
1827
1828                 i++;
1829                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1830                 if (offset != 0)        /* doesn't start on page boundary */
1831                         return count;
1832
1833                 count++;
1834         }
1835 }
1836
1837 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1838 {
1839         struct brw_page **ppga;
1840         int i;
1841
1842         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1843         if (ppga == NULL)
1844                 return NULL;
1845
1846         for (i = 0; i < count; i++)
1847                 ppga[i] = pga + i;
1848         return ppga;
1849 }
1850
1851 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1852 {
1853         LASSERT(ppga != NULL);
1854         OBD_FREE(ppga, sizeof(*ppga) * count);
1855 }
1856
1857 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1858                    obd_count page_count, struct brw_page *pga,
1859                    struct obd_trans_info *oti)
1860 {
1861         struct obdo *saved_oa = NULL;
1862         struct brw_page **ppga, **orig;
1863         struct obd_import *imp = class_exp2cliimp(exp);
1864         struct client_obd *cli;
1865         int rc, page_count_orig;
1866         ENTRY;
1867
1868         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1869         cli = &imp->imp_obd->u.cli;
1870
1871         if (cmd & OBD_BRW_CHECK) {
1872                 /* The caller just wants to know if there's a chance that this
1873                  * I/O can succeed */
1874
1875                 if (imp->imp_invalid)
1876                         RETURN(-EIO);
1877                 RETURN(0);
1878         }
1879
1880         /* test_brw with a failed create can trip this, maybe others. */
1881         LASSERT(cli->cl_max_pages_per_rpc);
1882
1883         rc = 0;
1884
1885         orig = ppga = osc_build_ppga(pga, page_count);
1886         if (ppga == NULL)
1887                 RETURN(-ENOMEM);
1888         page_count_orig = page_count;
1889
1890         sort_brw_pages(ppga, page_count);
1891         while (page_count) {
1892                 obd_count pages_per_brw;
1893
1894                 if (page_count > cli->cl_max_pages_per_rpc)
1895                         pages_per_brw = cli->cl_max_pages_per_rpc;
1896                 else
1897                         pages_per_brw = page_count;
1898
1899                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1900
1901                 if (saved_oa != NULL) {
1902                         /* restore previously saved oa */
1903                         *oinfo->oi_oa = *saved_oa;
1904                 } else if (page_count > pages_per_brw) {
1905                         /* save a copy of oa (brw will clobber it) */
1906                         OBDO_ALLOC(saved_oa);
1907                         if (saved_oa == NULL)
1908                                 GOTO(out, rc = -ENOMEM);
1909                         *saved_oa = *oinfo->oi_oa;
1910                 }
1911
1912                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1913                                       pages_per_brw, ppga, oinfo->oi_capa);
1914
1915                 if (rc != 0)
1916                         break;
1917
1918                 page_count -= pages_per_brw;
1919                 ppga += pages_per_brw;
1920         }
1921
1922 out:
1923         osc_release_ppga(orig, page_count_orig);
1924
1925         if (saved_oa != NULL)
1926                 OBDO_FREE(saved_oa);
1927
1928         RETURN(rc);
1929 }
1930
1931 static int brw_interpret(const struct lu_env *env,
1932                          struct ptlrpc_request *req, void *data, int rc)
1933 {
1934         struct osc_brw_async_args *aa = data;
1935         struct osc_extent *ext;
1936         struct osc_extent *tmp;
1937         struct cl_object  *obj = NULL;
1938         struct client_obd *cli = aa->aa_cli;
1939         ENTRY;
1940
1941         rc = osc_brw_fini_request(req, rc);
1942         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1943         /* When server return -EINPROGRESS, client should always retry
1944          * regardless of the number of times the bulk was resent already. */
1945         if (osc_recoverable_error(rc)) {
1946                 if (req->rq_import_generation !=
1947                     req->rq_import->imp_generation) {
1948                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1949                                ""DOSTID", rc = %d.\n",
1950                                req->rq_import->imp_obd->obd_name,
1951                                POSTID(&aa->aa_oa->o_oi), rc);
1952                 } else if (rc == -EINPROGRESS ||
1953                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1954                         rc = osc_brw_redo_request(req, aa, rc);
1955                 } else {
1956                         CERROR("%s: too many resent retries for object: "
1957                                ""LPU64":"LPU64", rc = %d.\n",
1958                                req->rq_import->imp_obd->obd_name,
1959                                POSTID(&aa->aa_oa->o_oi), rc);
1960                 }
1961
1962                 if (rc == 0)
1963                         RETURN(0);
1964                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1965                         rc = -EIO;
1966         }
1967
1968         if (aa->aa_ocapa) {
1969                 capa_put(aa->aa_ocapa);
1970                 aa->aa_ocapa = NULL;
1971         }
1972
1973         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1974                 if (obj == NULL && rc == 0) {
1975                         obj = osc2cl(ext->oe_obj);
1976                         cl_object_get(obj);
1977                 }
1978
1979                 list_del_init(&ext->oe_link);
1980                 osc_extent_finish(env, ext, 1, rc);
1981         }
1982         LASSERT(list_empty(&aa->aa_exts));
1983         LASSERT(list_empty(&aa->aa_oaps));
1984
1985         if (obj != NULL) {
1986                 struct obdo *oa = aa->aa_oa;
1987                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1988                 unsigned long valid = 0;
1989
1990                 LASSERT(rc == 0);
1991                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1992                         attr->cat_blocks = oa->o_blocks;
1993                         valid |= CAT_BLOCKS;
1994                 }
1995                 if (oa->o_valid & OBD_MD_FLMTIME) {
1996                         attr->cat_mtime = oa->o_mtime;
1997                         valid |= CAT_MTIME;
1998                 }
1999                 if (oa->o_valid & OBD_MD_FLATIME) {
2000                         attr->cat_atime = oa->o_atime;
2001                         valid |= CAT_ATIME;
2002                 }
2003                 if (oa->o_valid & OBD_MD_FLCTIME) {
2004                         attr->cat_ctime = oa->o_ctime;
2005                         valid |= CAT_CTIME;
2006                 }
2007                 if (valid != 0) {
2008                         cl_object_attr_lock(obj);
2009                         cl_object_attr_set(env, obj, attr, valid);
2010                         cl_object_attr_unlock(obj);
2011                 }
2012                 cl_object_put(env, obj);
2013         }
2014         OBDO_FREE(aa->aa_oa);
2015
2016         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2017                           req->rq_bulk->bd_nob_transferred);
2018         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2019         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2020
2021         client_obd_list_lock(&cli->cl_loi_list_lock);
2022         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2023          * is called so we know whether to go to sync BRWs or wait for more
2024          * RPCs to complete */
2025         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2026                 cli->cl_w_in_flight--;
2027         else
2028                 cli->cl_r_in_flight--;
2029         osc_wake_cache_waiters(cli);
2030         client_obd_list_unlock(&cli->cl_loi_list_lock);
2031
2032         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2033         RETURN(rc);
2034 }
2035
2036 /**
2037  * Build an RPC by the list of extent @ext_list. The caller must ensure
2038  * that the total pages in this list are NOT over max pages per RPC.
2039  * Extents in the list must be in OES_RPC state.
2040  */
2041 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2042                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
2043 {
2044         struct ptlrpc_request           *req = NULL;
2045         struct osc_extent               *ext;
2046         struct brw_page                 **pga = NULL;
2047         struct osc_brw_async_args       *aa = NULL;
2048         struct obdo                     *oa = NULL;
2049         struct osc_async_page           *oap;
2050         struct osc_async_page           *tmp;
2051         struct cl_req                   *clerq = NULL;
2052         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2053                                                                       CRT_READ;
2054         struct ldlm_lock                *lock = NULL;
2055         struct cl_req_attr              *crattr = NULL;
2056         obd_off                         starting_offset = OBD_OBJECT_EOF;
2057         obd_off                         ending_offset = 0;
2058         int                             mpflag = 0;
2059         int                             mem_tight = 0;
2060         int                             page_count = 0;
2061         int                             i;
2062         int                             rc;
2063         LIST_HEAD(rpc_list);
2064
2065         ENTRY;
2066         LASSERT(!list_empty(ext_list));
2067
2068         /* add pages into rpc_list to build BRW rpc */
2069         list_for_each_entry(ext, ext_list, oe_link) {
2070                 LASSERT(ext->oe_state == OES_RPC);
2071                 mem_tight |= ext->oe_memalloc;
2072                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2073                         ++page_count;
2074                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2075                         if (starting_offset > oap->oap_obj_off)
2076                                 starting_offset = oap->oap_obj_off;
2077                         else
2078                                 LASSERT(oap->oap_page_off == 0);
2079                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2080                                 ending_offset = oap->oap_obj_off +
2081                                                 oap->oap_count;
2082                         else
2083                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2084                                         PAGE_CACHE_SIZE);
2085                 }
2086         }
2087
2088         if (mem_tight)
2089                 mpflag = cfs_memory_pressure_get_and_set();
2090
2091         OBD_ALLOC(crattr, sizeof(*crattr));
2092         if (crattr == NULL)
2093                 GOTO(out, rc = -ENOMEM);
2094
2095         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2096         if (pga == NULL)
2097                 GOTO(out, rc = -ENOMEM);
2098
2099         OBDO_ALLOC(oa);
2100         if (oa == NULL)
2101                 GOTO(out, rc = -ENOMEM);
2102
2103         i = 0;
2104         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2105                 struct cl_page *page = oap2cl_page(oap);
2106                 if (clerq == NULL) {
2107                         clerq = cl_req_alloc(env, page, crt,
2108                                              1 /* only 1-object rpcs for now */);
2109                         if (IS_ERR(clerq))
2110                                 GOTO(out, rc = PTR_ERR(clerq));
2111                         lock = oap->oap_ldlm_lock;
2112                 }
2113                 if (mem_tight)
2114                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2115                 pga[i] = &oap->oap_brw_page;
2116                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2117                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2118                        pga[i]->pg, page_index(oap->oap_page), oap,
2119                        pga[i]->flag);
2120                 i++;
2121                 cl_req_page_add(env, clerq, page);
2122         }
2123
2124         /* always get the data for the obdo for the rpc */
2125         LASSERT(clerq != NULL);
2126         crattr->cra_oa = oa;
2127         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2128         if (lock) {
2129                 oa->o_handle = lock->l_remote_handle;
2130                 oa->o_valid |= OBD_MD_FLHANDLE;
2131         }
2132
2133         rc = cl_req_prep(env, clerq);
2134         if (rc != 0) {
2135                 CERROR("cl_req_prep failed: %d\n", rc);
2136                 GOTO(out, rc);
2137         }
2138
2139         sort_brw_pages(pga, page_count);
2140         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2141                         pga, &req, crattr->cra_capa, 1, 0);
2142         if (rc != 0) {
2143                 CERROR("prep_req failed: %d\n", rc);
2144                 GOTO(out, rc);
2145         }
2146
2147         req->rq_interpret_reply = brw_interpret;
2148
2149         if (mem_tight != 0)
2150                 req->rq_memalloc = 1;
2151
2152         /* Need to update the timestamps after the request is built in case
2153          * we race with setattr (locally or in queue at OST).  If OST gets
2154          * later setattr before earlier BRW (as determined by the request xid),
2155          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2156          * way to do this in a single call.  bug 10150 */
2157         cl_req_attr_set(env, clerq, crattr,
2158                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2159
2160         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2161
2162         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2163         aa = ptlrpc_req_async_args(req);
2164         INIT_LIST_HEAD(&aa->aa_oaps);
2165         list_splice_init(&rpc_list, &aa->aa_oaps);
2166         INIT_LIST_HEAD(&aa->aa_exts);
2167         list_splice_init(ext_list, &aa->aa_exts);
2168         aa->aa_clerq = clerq;
2169
2170         /* queued sync pages can be torn down while the pages
2171          * were between the pending list and the rpc */
2172         tmp = NULL;
2173         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2174                 /* only one oap gets a request reference */
2175                 if (tmp == NULL)
2176                         tmp = oap;
2177                 if (oap->oap_interrupted && !req->rq_intr) {
2178                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2179                                         oap, req);
2180                         ptlrpc_mark_interrupted(req);
2181                 }
2182         }
2183         if (tmp != NULL)
2184                 tmp->oap_request = ptlrpc_request_addref(req);
2185
2186         client_obd_list_lock(&cli->cl_loi_list_lock);
2187         starting_offset >>= PAGE_CACHE_SHIFT;
2188         if (cmd == OBD_BRW_READ) {
2189                 cli->cl_r_in_flight++;
2190                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2191                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2192                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2193                                       starting_offset + 1);
2194         } else {
2195                 cli->cl_w_in_flight++;
2196                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2197                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2198                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2199                                       starting_offset + 1);
2200         }
2201         client_obd_list_unlock(&cli->cl_loi_list_lock);
2202
2203         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2204                   page_count, aa, cli->cl_r_in_flight,
2205                   cli->cl_w_in_flight);
2206
2207         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2208          * see which CPU/NUMA node the majority of pages were allocated
2209          * on, and try to assign the async RPC to the CPU core
2210          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2211          *
2212          * But on the other hand, we expect that multiple ptlrpcd
2213          * threads and the initial write sponsor can run in parallel,
2214          * especially when data checksum is enabled, which is CPU-bound
2215          * operation and single ptlrpcd thread cannot process in time.
2216          * So more ptlrpcd threads sharing BRW load
2217          * (with PDL_POLICY_ROUND) seems better.
2218          */
2219         ptlrpcd_add_req(req, pol, -1);
2220         rc = 0;
2221         EXIT;
2222
2223 out:
2224         if (mem_tight != 0)
2225                 cfs_memory_pressure_restore(mpflag);
2226
2227         if (crattr != NULL) {
2228                 capa_put(crattr->cra_capa);
2229                 OBD_FREE(crattr, sizeof(*crattr));
2230         }
2231
2232         if (rc != 0) {
2233                 LASSERT(req == NULL);
2234
2235                 if (oa)
2236                         OBDO_FREE(oa);
2237                 if (pga)
2238                         OBD_FREE(pga, sizeof(*pga) * page_count);
2239                 /* this should happen rarely and is pretty bad, it makes the
2240                  * pending list not follow the dirty order */
2241                 while (!list_empty(ext_list)) {
2242                         ext = list_entry(ext_list->next, struct osc_extent,
2243                                              oe_link);
2244                         list_del_init(&ext->oe_link);
2245                         osc_extent_finish(env, ext, 0, rc);
2246                 }
2247                 if (clerq && !IS_ERR(clerq))
2248                         cl_req_completion(env, clerq, rc);
2249         }
2250         RETURN(rc);
2251 }
2252
/*
 * Bind the caller's callback data (einfo->ei_cbdata) to @lock.
 *
 * The lock's blocking, completion and glimpse ASTs and its resource type
 * are asserted to match @einfo.  With the resource/lock lock and
 * osc_ast_guard held, l_ast_data is set to the callback data if it is
 * still NULL.
 *
 * Returns 1 if l_ast_data equals the callback data afterwards, 0 if the
 * lock already carries different data.
 */
2253 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2254                                         struct ldlm_enqueue_info *einfo)
2255 {
2256         void *data = einfo->ei_cbdata;
2257         int set = 0;
2258
2259         LASSERT(lock != NULL);
2260         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2261         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2262         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2263         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2264
2265         lock_res_and_lock(lock);
2266         spin_lock(&osc_ast_guard);
2267
             /* Claim the lock only if nobody has attached data to it yet. */
2268         if (lock->l_ast_data == NULL)
2269                 lock->l_ast_data = data;
2270         if (lock->l_ast_data == data)
2271                 set = 1;
2272
2273         spin_unlock(&osc_ast_guard);
2274         unlock_res_and_lock(lock);
2275
2276         return set;
2277 }
2278
/*
 * Handle-based wrapper around osc_set_lock_data_with_check().
 *
 * Resolves @lockh to a lock, attaches einfo->ei_cbdata to it and drops the
 * reference taken by ldlm_handle2lock().  If the handle does not resolve,
 * an error is logged and 0 is returned.
 *
 * Returns 1 if the lock now carries the caller's data, 0 otherwise.
 */
2279 static int osc_set_data_with_check(struct lustre_handle *lockh,
2280                                    struct ldlm_enqueue_info *einfo)
2281 {
2282         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2283         int set = 0;
2284
2285         if (lock != NULL) {
2286                 set = osc_set_lock_data_with_check(lock, einfo);
2287                 LDLM_LOCK_PUT(lock);
2288         } else
2289                 CERROR("lockh %p, data %p - client evicted?\n",
2290                        lockh, einfo->ei_cbdata);
2291         return set;
2292 }
2293
/*
 * Run the @replace iterator, with @data as its argument, over the locks of
 * the resource named after lsm->lsm_oi in this export's namespace.
 *
 * The iterator's result is ignored; this function always returns 0.
 */
2294 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2295                              ldlm_iterator_t replace, void *data)
2296 {
2297         struct ldlm_res_id res_id;
2298         struct obd_device *obd = class_exp2obd(exp);
2299
2300         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2301         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2302         return 0;
2303 }
2304
2305 /* Find any ldlm lock of the inode in osc.
2306  * return 0    none found (iterator returned LDLM_ITER_CONTINUE)
2307  *      1    found one (iterator returned LDLM_ITER_STOP)
2308  *      < 0    error (any other iterator result is passed through) */
2309 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2310                            ldlm_iterator_t replace, void *data)
2311 {
2312         struct ldlm_res_id res_id;
2313         struct obd_device *obd = class_exp2obd(exp);
2314         int rc = 0;
2315
2316         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2317         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2318         if (rc == LDLM_ITER_STOP)
2319                 return(1);
2320         if (rc == LDLM_ITER_CONTINUE)
2321                 return(0);
2322         return(rc);
2323 }
2324
/*
 * OSC-side completion of a lock enqueue.
 *
 * For intent enqueues that ended with ELDLM_LOCK_ABORTED, a non-zero
 * lock_policy_res1 from the server reply replaces @rc.  LDLM_FL_LVB_READY
 * is set in *@flags when the enqueue succeeded (rc == 0), or when a
 * non-AGL intent enqueue was aborted.  Finally the caller's @upcall is
 * invoked with @cookie and the resulting rc.
 *
 * Returns whatever @upcall returns.
 */
2325 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2326                             obd_enqueue_update_f upcall, void *cookie,
2327                             __u64 *flags, int agl, int rc)
2328 {
2329         int intent = *flags & LDLM_FL_HAS_INTENT;
2330         ENTRY;
2331
2332         if (intent) {
2333                 /* The request was created before ldlm_cli_enqueue call. */
2334                 if (rc == ELDLM_LOCK_ABORTED) {
2335                         struct ldlm_reply *rep;
2336                         rep = req_capsule_server_get(&req->rq_pill,
2337                                                      &RMF_DLM_REP);
2338
2339                         LASSERT(rep != NULL);
2340                         if (rep->lock_policy_res1)
2341                                 rc = rep->lock_policy_res1;
2342                 }
2343         }
2344
2345         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2346             (rc == 0)) {
2347                 *flags |= LDLM_FL_LVB_READY;
2348                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2349                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2350         }
2351
2352         /* Call the update callback. */
2353         rc = (*upcall)(cookie, rc);
2354         RETURN(rc);
2355 }
2356
/*
 * Reply interpreter for an asynchronous lock enqueue; osc_enqueue_base()
 * installs it as rq_interpret_reply and fills @aa.
 *
 * Completes the ldlm part of the enqueue with ldlm_cli_enqueue_fini(),
 * then the OSC part (including the caller's upcall) with
 * osc_enqueue_fini().  An extra lock reference is held across both steps
 * and dropped at the end.
 *
 * Returns the result of osc_enqueue_fini().
 */
2357 static int osc_enqueue_interpret(const struct lu_env *env,
2358                                  struct ptlrpc_request *req,
2359                                  struct osc_enqueue_args *aa, int rc)
2360 {
2361         struct ldlm_lock *lock;
2362         struct lustre_handle handle;
2363         __u32 mode;
2364         struct ost_lvb *lvb;
2365         __u32 lvb_len;
2366         __u64 *flags = aa->oa_flags;
2367
2368         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2369          * might be freed anytime after lock upcall has been called. */
2370         lustre_handle_copy(&handle, aa->oa_lockh);
2371         mode = aa->oa_ei->ei_mode;
2372
2373         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2374          * be valid. */
2375         lock = ldlm_handle2lock(&handle);
2376
2377         /* Take an additional reference so that a blocking AST that
2378          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2379          * to arrive after an upcall has been executed by
2380          * osc_enqueue_fini(). */
2381         ldlm_lock_addref(&handle, mode);
2382
2383         /* Let CP AST to grant the lock first. */
2384         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2385
             /* An aborted AGL enqueue passes no LVB buffer to the ldlm layer. */
2386         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2387                 lvb = NULL;
2388                 lvb_len = 0;
2389         } else {
2390                 lvb = aa->oa_lvb;
2391                 lvb_len = sizeof(*aa->oa_lvb);
2392         }
2393
2394         /* Complete obtaining the lock procedure. */
2395         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2396                                    mode, flags, lvb, lvb_len, &handle, rc);
2397         /* Complete osc stuff. */
2398         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2399                               flags, aa->oa_agl, rc);
2400
2401         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2402
2403         /* Release the lock for async request. */
2404         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2405                 /*
2406                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2407                  * not already released by
2408                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2409                  */
2410                 ldlm_lock_decref(&handle, mode);
2411
2412         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2413                  aa->oa_lockh, req, aa);
             /* Drop the additional reference taken above, then the handle2lock ref. */
2414         ldlm_lock_decref(&handle, mode);
2415         LDLM_LOCK_PUT(lock);
2416         return rc;
2417 }
2418
/*
 * Update the per-object info @loi after an enqueue finished with @rc.
 *
 * - rc == ELDLM_OK: *@lvb is copied into loi->loi_lvb, the known minimum
 *   size (kms) is raised if the size bounded by the lock extent is not
 *   smaller than the current kms, and the lock is allowed to be matched.
 * - rc == ELDLM_LOCK_ABORTED with LDLM_FL_HAS_INTENT in @flags (glimpse):
 *   only the LVB is stored, kms is left alone, and the result is treated
 *   as success.
 * - any other rc: if the handle resolves to a lock, matching on it is
 *   failed via ldlm_lock_fail_match().
 *
 * The reference obtained from ldlm_handle2lock() is dropped before return.
 * @mode is not used by this function.
 */
2419 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2420                         struct lov_oinfo *loi, int flags,
2421                         struct ost_lvb *lvb, __u32 mode, int rc)
2422 {
2423         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2424
2425         if (rc == ELDLM_OK) {
2426                 __u64 tmp;
2427
2428                 LASSERT(lock != NULL);
2429                 loi->loi_lvb = *lvb;
2430                 tmp = loi->loi_lvb.lvb_size;
2431                 /* Extend KMS up to the end of this lock and no further
2432                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2433                 if (tmp > lock->l_policy_data.l_extent.end)
2434                         tmp = lock->l_policy_data.l_extent.end + 1;
2435                 if (tmp >= loi->loi_kms) {
2436                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2437                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2438                         loi_kms_set(loi, tmp);
2439                 } else {
2440                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2441                                    LPU64"; leaving kms="LPU64", end="LPU64,
2442                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2443                                    lock->l_policy_data.l_extent.end);
2444                 }
2445                 ldlm_lock_allow_match(lock);
2446         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2447                 LASSERT(lock != NULL);
2448                 loi->loi_lvb = *lvb;
2449                 ldlm_lock_allow_match(lock);
2450                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2451                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2452                 rc = ELDLM_OK;
2453         }
2454
2455         if (lock != NULL) {
2456                 if (rc != ELDLM_OK)
2457                         ldlm_lock_fail_match(lock);
2458
2459                 LDLM_LOCK_PUT(lock);
2460         }
2461 }
2462 EXPORT_SYMBOL(osc_update_enqueue);
2463
/* Sentinel request-set pointer: osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to a real request set. */
2464 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2465
2466 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2467  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2468  * other synchronous requests, however keeping some locks and trying to obtain
2469  * others may take a considerable amount of time in a case of ost failure; and
2470  * when other sync requests do not get released lock from a client, the client
2471  * is excluded from the cluster -- such scenarios make the life difficult, so
2472  * release locks just after they are obtained. */
/*
 * Obtain an extent lock on @res_id covering @policy.
 *
 * The extent in @policy is first widened to page boundaries.  Unless
 * @kms_valid is zero, an already cached lock is searched for with
 * ldlm_lock_match(); a PR request may also be satisfied by a PW lock.  If
 * a usable lock is found, @upcall is called with ELDLM_OK and no RPC is
 * sent.  Otherwise a new lock is enqueued via ldlm_cli_enqueue(); for
 * intent enqueues the request is pre-allocated here.
 *
 * With a non-NULL @rqset the enqueue completes asynchronously in
 * osc_enqueue_interpret(); otherwise osc_enqueue_fini() is run before
 * returning.
 *
 * Returns ELDLM_OK when a cached lock was reused, -ECANCELED for an AGL
 * request that matched a lock whose LVB is not ready, -ENOMEM or the
 * ldlm_prep_enqueue_req() error if the intent request cannot be set up,
 * the ldlm_cli_enqueue() result for asynchronous requests, and the
 * osc_enqueue_fini() (i.e. upcall) result otherwise.
 */
2473 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2474                      __u64 *flags, ldlm_policy_data_t *policy,
2475                      struct ost_lvb *lvb, int kms_valid,
2476                      obd_enqueue_update_f upcall, void *cookie,
2477                      struct ldlm_enqueue_info *einfo,
2478                      struct lustre_handle *lockh,
2479                      struct ptlrpc_request_set *rqset, int async, int agl)
2480 {
2481         struct obd_device *obd = exp->exp_obd;
2482         struct ptlrpc_request *req = NULL;
2483         int intent = *flags & LDLM_FL_HAS_INTENT;
2484         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2485         ldlm_mode_t mode;
2486         int rc;
2487         ENTRY;
2488
2489         /* Filesystem lock extents are extended to page boundaries so that
2490          * dealing with the page cache is a little smoother.  */
2491         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2492         policy->l_extent.end |= ~CFS_PAGE_MASK;
2493
2494         /*
2495          * kms is not valid when either object is completely fresh (so that no
2496          * locks are cached), or object was evicted. In the latter case cached
2497          * lock cannot be used, because it would prime inode state with
2498          * potentially stale LVB.
2499          */
2500         if (!kms_valid)
2501                 goto no_match;
2502
2503         /* Next, search for already existing extent locks that will cover us */
2504         /* If we're trying to read, we also search for an existing PW lock.  The
2505          * VFS and page cache already protect us locally, so lots of readers/
2506          * writers can share a single PW lock.
2507          *
2508          * There are problems with conversion deadlocks, so instead of
2509          * converting a read lock to a write lock, we'll just enqueue a new
2510          * one.
2511          *
2512          * At some point we should cancel the read lock instead of making them
2513          * send us a blocking callback, but there are problems with canceling
2514          * locks out from other users right now, too. */
2515         mode = einfo->ei_mode;
2516         if (einfo->ei_mode == LCK_PR)
2517                 mode |= LCK_PW;
2518         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2519                                einfo->ei_type, policy, mode, lockh, 0);
2520         if (mode) {
2521                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2522
2523                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2524                         /* For AGL, if enqueue RPC is sent but the lock is not
2525                          * granted, then skip to process this stripe.
2526                          * Return -ECANCELED to tell the caller. */
2527                         ldlm_lock_decref(lockh, mode);
2528                         LDLM_LOCK_PUT(matched);
2529                         RETURN(-ECANCELED);
2530                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2531                         *flags |= LDLM_FL_LVB_READY;
2532                         /* addref the lock only if not async requests and PW
2533                          * lock is matched whereas we asked for PR. */
2534                         if (!rqset && einfo->ei_mode != mode)
2535                                 ldlm_lock_addref(lockh, LCK_PR);
2536                         if (intent) {
2537                                 /* I would like to be able to ASSERT here that
2538                                  * rss <= kms, but I can't, for reasons which
2539                                  * are explained in lov_enqueue() */
2540                         }
2541
2542                         /* We already have a lock, and it's referenced.
2543                          *
2544                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2545                          * AGL upcall may change it to CLS_HELD directly. */
2546                         (*upcall)(cookie, ELDLM_OK);
2547
2548                         if (einfo->ei_mode != mode)
2549                                 ldlm_lock_decref(lockh, LCK_PW);
2550                         else if (rqset)
2551                                 /* For async requests, decref the lock. */
2552                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2553                         LDLM_LOCK_PUT(matched);
2554                         RETURN(ELDLM_OK);
2555                 } else {
                             /* The matched lock carries someone else's data:
                              * drop it and fall through to a new enqueue. */
2556                         ldlm_lock_decref(lockh, mode);
2557                         LDLM_LOCK_PUT(matched);
2558                 }
2559         }
2560
2561  no_match:
2562         if (intent) {
2563                 LIST_HEAD(cancels);
2564                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2565                                            &RQF_LDLM_ENQUEUE_LVB);
2566                 if (req == NULL)
2567                         RETURN(-ENOMEM);
2568
2569                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2570                 if (rc) {
2571                         ptlrpc_request_free(req);
2572                         RETURN(rc);
2573                 }
2574
2575                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2576                                      sizeof *lvb);
2577                 ptlrpc_request_set_replen(req);
2578         }
2579
2580         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2581         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2582
2583         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2584                               sizeof(*lvb), LVB_T_OST, lockh, async);
2585         if (rqset) {
2586                 if (!rc) {
2587                         struct osc_enqueue_args *aa;
2588                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2589                         aa = ptlrpc_req_async_args(req);
2590                         aa->oa_ei = einfo;
2591                         aa->oa_exp = exp;
2592                         aa->oa_flags  = flags;
2593                         aa->oa_upcall = upcall;
2594                         aa->oa_cookie = cookie;
2595                         aa->oa_lvb    = lvb;
2596                         aa->oa_lockh  = lockh;
2597                         aa->oa_agl    = !!agl;
2598
2599                         req->rq_interpret_reply =
2600                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2601                         if (rqset == PTLRPCD_SET)
2602                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2603                         else
2604                                 ptlrpc_set_add_req(rqset, req);
2605                 } else if (intent) {
2606                         ptlrpc_req_finished(req);
2607                 }
2608                 RETURN(rc);
2609         }
2610
2611         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2612         if (intent)
2613                 ptlrpc_req_finished(req);
2614
2615         RETURN(rc);
2616 }
2617
/*
 * obd_info-based entry point for enqueuing a lock.
 *
 * Builds the resource name from oinfo->oi_md->lsm_oi and forwards to
 * osc_enqueue_base() using the flags, policy, upcall and lock handle held
 * in @oinfo and the LVB / kms_valid of the first stripe (lsm_oinfo[0]).
 * @oinfo itself is the upcall cookie.  The request is asynchronous exactly
 * when @rqset is non-NULL; AGL is never requested from here.
 *
 * Returns the result of osc_enqueue_base().
 */
2618 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2619                        struct ldlm_enqueue_info *einfo,
2620                        struct ptlrpc_request_set *rqset)
2621 {
2622         struct ldlm_res_id res_id;
2623         int rc;
2624         ENTRY;
2625
2626         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2627         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2628                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2629                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2630                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2631                               rqset, rqset != NULL, 0);
2632         RETURN(rc);
2633 }
2634
/*
 * Look for an already granted lock on @res_id that covers @policy.
 *
 * The extent in @policy is widened to page boundaries in place.  A PR
 * request may also be satisfied by a PW lock.  If @data is non-NULL it is
 * passed to osc_set_data_with_check() (i.e. it is used as a
 * struct ldlm_enqueue_info pointer) and the match is rejected when the
 * lock already carries different callback data.
 *
 * Unless LDLM_FL_TEST_LOCK is set in *@flags, a match in a mode other than
 * the requested one has its PW reference exchanged for a PR reference.
 *
 * Returns -EIO under OBD_FAIL_OSC_MATCH fault injection, 0 when nothing
 * (acceptable) was matched, otherwise the mode of the matched lock.
 */
2635 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2636                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2637                    int *flags, void *data, struct lustre_handle *lockh,
2638                    int unref)
2639 {
2640         struct obd_device *obd = exp->exp_obd;
2641         int lflags = *flags;
2642         ldlm_mode_t rc;
2643         ENTRY;
2644
2645         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2646                 RETURN(-EIO);
2647
2648         /* Filesystem lock extents are extended to page boundaries so that
2649          * dealing with the page cache is a little smoother */
2650         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2651         policy->l_extent.end |= ~CFS_PAGE_MASK;
2652
2653         /* Next, search for already existing extent locks that will cover us */
2654         /* If we're trying to read, we also search for an existing PW lock.  The
2655          * VFS and page cache already protect us locally, so lots of readers/
2656          * writers can share a single PW lock. */
2657         rc = mode;
2658         if (mode == LCK_PR)
2659                 rc |= LCK_PW;
2660         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2661                              res_id, type, policy, rc, lockh, unref);
2662         if (rc) {
2663                 if (data != NULL) {
2664                         if (!osc_set_data_with_check(lockh, data)) {
                                     /* Reference is dropped only when one was
                                      * requested (not a test-only match). */
2665                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2666                                         ldlm_lock_decref(lockh, rc);
2667                                 RETURN(0);
2668                         }
2669                 }
2670                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2671                         ldlm_lock_addref(lockh, LCK_PR);
2672                         ldlm_lock_decref(lockh, LCK_PW);
2673                 }
2674                 RETURN(rc);
2675         }
2676         RETURN(rc);
2677 }
2678
/*
 * Drop one reference of @mode on the lock behind @lockh.  Group locks
 * (LCK_GROUP) are additionally cancelled via ldlm_lock_decref_and_cancel().
 *
 * Always returns 0.
 */
2679 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2680 {
2681         ENTRY;
2682
2683         if (unlikely(mode == LCK_GROUP))
2684                 ldlm_lock_decref_and_cancel(lockh, mode);
2685         else
2686                 ldlm_lock_decref(lockh, mode);
2687
2688         RETURN(0);
2689 }
2690
/*
 * Thin wrapper around osc_cancel_base(); @exp and @md are not used.
 * Returns the result of osc_cancel_base().
 */
2691 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2692                       __u32 mode, struct lustre_handle *lockh)
2693 {
2694         ENTRY;
2695         RETURN(osc_cancel_base(lockh, mode));
2696 }
2697
/*
 * Cancel unused locks in this export's namespace through
 * ldlm_cli_cancel_unused().
 *
 * With a non-NULL @lsm only the resource named after lsm->lsm_oi is
 * targeted; with a NULL @lsm a NULL resource id is passed down.
 *
 * Returns the result of ldlm_cli_cancel_unused().
 */
2698 static int osc_cancel_unused(struct obd_export *exp,
2699                              struct lov_stripe_md *lsm,
2700                              ldlm_cancel_flags_t flags,
2701                              void *opaque)
2702 {
2703         struct obd_device *obd = class_exp2obd(exp);
2704         struct ldlm_res_id res_id, *resp = NULL;
2705
2706         if (lsm != NULL) {
2707                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2708                 resp = &res_id;
2709         }
2710
2711         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2712 }
2713
/*
 * Reply interpreter for the asynchronous statfs request queued by
 * osc_statfs_async().
 *
 * -EBADR is returned immediately without calling the upcall.  -ENOTCONN and
 * -EAGAIN are turned into success when the caller asked for
 * OBD_STATFS_NODELAY.  On success the server's obd_statfs is copied into
 * aa->aa_oi->oi_osfs; a missing reply buffer yields -EPROTO.
 *
 * Except for the -EBADR case, returns the result of the oi_cb_up upcall.
 */
2714 static int osc_statfs_interpret(const struct lu_env *env,
2715                                 struct ptlrpc_request *req,
2716                                 struct osc_async_args *aa, int rc)
2717 {
2718         struct obd_statfs *msfs;
2719         ENTRY;
2720
2721         if (rc == -EBADR)
2722                 /* The request has in fact never been sent
2723                  * due to issues at a higher level (LOV).
2724                  * Exit immediately since the caller is
2725                  * aware of the problem and takes care
2726                  * of the clean up */
2727                  RETURN(rc);
2728
2729         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2730             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2731                 GOTO(out, rc = 0);
2732
2733         if (rc != 0)
2734                 GOTO(out, rc);
2735
2736         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2737         if (msfs == NULL) {
2738                 GOTO(out, rc = -EPROTO);
2739         }
2740
2741         *aa->aa_oi->oi_osfs = *msfs;
2742 out:
2743         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2744         RETURN(rc);
2745 }
2746
/*
 * Queue an OST_STATFS request on @rqset; the reply is handled by
 * osc_statfs_interpret() with @oinfo as its argument.
 *
 * @max_age is currently not sent to the server (see the comment below).
 *
 * Returns 0 once the request has been added to @rqset, -ENOMEM if the
 * request cannot be allocated, or the ptlrpc_request_pack() error.
 */
2747 static int osc_statfs_async(struct obd_export *exp,
2748                             struct obd_info *oinfo, __u64 max_age,
2749                             struct ptlrpc_request_set *rqset)
2750 {
2751         struct obd_device     *obd = class_exp2obd(exp);
2752         struct ptlrpc_request *req;
2753         struct osc_async_args *aa;
2754         int                 rc;
2755         ENTRY;
2756
2757         /* We could possibly pass max_age in the request (as an absolute
2758          * timestamp or a "seconds.usec ago") so the target can avoid doing
2759          * extra calls into the filesystem if that isn't necessary (e.g.
2760          * during mount that would help a bit).  Having relative timestamps
2761          * is not so great if request processing is slow, while absolute
2762          * timestamps are not ideal because they need time synchronization. */
2763         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2764         if (req == NULL)
2765                 RETURN(-ENOMEM);
2766
2767         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2768         if (rc) {
2769                 ptlrpc_request_free(req);
2770                 RETURN(rc);
2771         }
2772         ptlrpc_request_set_replen(req);
2773         req->rq_request_portal = OST_CREATE_PORTAL;
2774         ptlrpc_at_set_req_timeout(req);
2775
2776         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2777                 /* procfs requests must not wait for statfs, to avoid deadlock */
2778                 req->rq_no_resend = 1;
2779                 req->rq_no_delay = 1;
2780         }
2781
2782         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2783         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2784         aa = ptlrpc_req_async_args(req);
2785         aa->aa_oi = oinfo;
2786
2787         ptlrpc_set_add_req(rqset, req);
2788         RETURN(0);
2789 }
2790
/*
 * Synchronous statfs: send OST_STATFS and copy the reply into @osfs.
 *
 * The client import is looked up and referenced under cl_sem; the
 * reference is dropped right after the request has been allocated.
 * @max_age is currently not sent to the server (see the comment below).
 *
 * Returns 0 on success, -ENODEV if the device has no import, -ENOMEM if
 * the request cannot be allocated, -EPROTO if the reply carries no
 * obd_statfs buffer, or the ptlrpc_request_pack() / ptlrpc_queue_wait()
 * error.
 */
2791 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2792                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2793 {
2794         struct obd_device     *obd = class_exp2obd(exp);
2795         struct obd_statfs     *msfs;
2796         struct ptlrpc_request *req;
2797         struct obd_import     *imp = NULL;
2798         int rc;
2799         ENTRY;
2800
2801         /* Since the request might also come from lprocfs, we need to
2802          * sync this with client_disconnect_export (Bug15684). */
2803         down_read(&obd->u.cli.cl_sem);
2804         if (obd->u.cli.cl_import)
2805                 imp = class_import_get(obd->u.cli.cl_import);
2806         up_read(&obd->u.cli.cl_sem);
2807         if (!imp)
2808                 RETURN(-ENODEV);
2809
2810         /* We could possibly pass max_age in the request (as an absolute
2811          * timestamp or a "seconds.usec ago") so the target can avoid doing
2812          * extra calls into the filesystem if that isn't necessary (e.g.
2813          * during mount that would help a bit).  Having relative timestamps
2814          * is not so great if request processing is slow, while absolute
2815          * timestamps are not ideal because they need time synchronization. */
2816         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2817
2818         class_import_put(imp);
2819
2820         if (req == NULL)
2821                 RETURN(-ENOMEM);
2822
2823         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2824         if (rc) {
2825                 ptlrpc_request_free(req);
2826                 RETURN(rc);
2827         }
2828         ptlrpc_request_set_replen(req);
2829         req->rq_request_portal = OST_CREATE_PORTAL;
2830         ptlrpc_at_set_req_timeout(req);
2831
2832         if (flags & OBD_STATFS_NODELAY) {
2833                 /* procfs requests must not wait for statfs, to avoid deadlock */
2834                 req->rq_no_resend = 1;
2835                 req->rq_no_delay = 1;
2836         }
2837
2838         rc = ptlrpc_queue_wait(req);
2839         if (rc)
2840                 GOTO(out, rc);
2841
2842         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2843         if (msfs == NULL) {
2844                 GOTO(out, rc = -EPROTO);
2845         }
2846
2847         *osfs = *msfs;
2848
2849         EXIT;
2850  out:
2851         ptlrpc_req_finished(req);
2852         return rc;
2853 }
2854
2855 /* Retrieve object striping information.
2856  *
2857  * @lump is a user-space pointer to a struct whose header (lmm_magic,
2858  * lmm_stripe_count) is read first; lmm_magic must be LOV_USER_MAGIC_V1 or
2859  * LOV_USER_MAGIC_V3 (we only use 1 slot here).
2860  */
/*
 * Returns 0 on success, -ENODATA if @lsm is NULL, -EFAULT if user memory
 * cannot be read or written, -EINVAL for an unknown magic and -ENOMEM if
 * the reply buffer cannot be allocated.
 *
 * NOTE(review): in the allocated case only lmm_oi, lmm_stripe_count and the
 * first object id are written here; the rest of the buffer relies on
 * OBD_ALLOC returning zeroed memory -- confirm.
 */
2861 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2862 {
2863         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
             /* Zero-initialised: only the v1-sized header is filled from user
              * space below, but a v3-sized struct may be copied back out, so
              * the tail must not carry stale kernel stack contents. */
2864         struct lov_user_md_v3 lum = { 0 }, *lumk;
2865         struct lov_user_ost_data_v1 *lmm_objects;
2866         int rc = 0, lum_size;
2867         ENTRY;
2868
2869         if (!lsm)
2870                 RETURN(-ENODATA);
2871
2872         /* we only need the header part from user space to get lmm_magic and
2873          * lmm_stripe_count, (the header part is common to v1 and v3) */
2874         lum_size = sizeof(struct lov_user_md_v1);
2875         if (copy_from_user(&lum, lump, lum_size))
2876                 RETURN(-EFAULT);
2877
2878         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2879             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2880                 RETURN(-EINVAL);
2881
2882         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2883         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2884         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2885         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2886
2887         /* we can use lov_mds_md_size() to compute lum_size
2888          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2889         if (lum.lmm_stripe_count > 0) {
2890                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2891                 OBD_ALLOC(lumk, lum_size);
2892                 if (!lumk)
2893                         RETURN(-ENOMEM);
2894
2895                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2896                         lmm_objects =
2897                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2898                 else
2899                         lmm_objects = &(lumk->lmm_objects[0]);
2900                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2901         } else {
                     /* No room for object entries: answer from the stack copy. */
2902                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2903                 lumk = &lum;
2904         }
2905
2906         lumk->lmm_oi = lsm->lsm_oi;
2907         lumk->lmm_stripe_count = 1;
2908
2909         if (copy_to_user(lump, lumk, lum_size))
2910                 rc = -EFAULT;
2911
2912         if (lumk != &lum)
2913                 OBD_FREE(lumk, lum_size);
2914
2915         RETURN(rc);
2916 }
2917
2918
/*
 * ioctl dispatcher for the OSC device.
 *
 * A module reference is held for the duration of the call and dropped at
 * the "out" label.  @karg is interpreted according to @cmd (as
 * struct obd_ioctl_data, or passed on to the per-command helper); @uarg is
 * the user-space argument.
 *
 * Returns -EINVAL if the module reference cannot be taken, -ENOTTY for an
 * unrecognised @cmd, otherwise the result of the selected command
 * (positive results of LL_IOC_LOV_SETSTRIPE and OBD_IOC_CLIENT_RECOVER are
 * folded to 0).
 */
2919 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2920                          void *karg, void *uarg)
2921 {
2922         struct obd_device *obd = exp->exp_obd;
2923         struct obd_ioctl_data *data = karg;
2924         int err = 0;
2925         ENTRY;
2926
2927         if (!try_module_get(THIS_MODULE)) {
2928                 CERROR("Can't get module. Is it alive?\n");
2929                 return -EINVAL;
2930         }
2931         switch (cmd) {
2932         case OBD_IOC_LOV_GET_CONFIG: {
2933                 char *buf;
2934                 struct lov_desc *desc;
2935                 struct obd_uuid uuid;
2936
                     /* Re-read the ioctl data from user space into a fresh
                      * buffer; @len and @data are overwritten for this case. */
2937                 buf = NULL;
2938                 len = 0;
2939                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2940                         GOTO(out, err = -EINVAL);
2941
2942                 data = (struct obd_ioctl_data *)buf;
2943
                     /* Both inline buffers must be large enough for what is
                      * written into them below. */
2944                 if (sizeof(*desc) > data->ioc_inllen1) {
2945                         obd_ioctl_freedata(buf, len);
2946                         GOTO(out, err = -EINVAL);
2947                 }
2948
2949                 if (data->ioc_inllen2 < sizeof(uuid)) {
2950                         obd_ioctl_freedata(buf, len);
2951                         GOTO(out, err = -EINVAL);
2952                 }
2953
                     /* Describe this OSC as a one-target configuration. */
2954                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2955                 desc->ld_tgt_count = 1;
2956                 desc->ld_active_tgt_count = 1;
2957                 desc->ld_default_stripe_count = 1;
2958                 desc->ld_default_stripe_size = 0;
2959                 desc->ld_default_stripe_offset = 0;
2960                 desc->ld_pattern = 0;
2961                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2962
2963                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2964
2965                 err = copy_to_user((void *)uarg, buf, len);
2966                 if (err)
2967                         err = -EFAULT;
2968                 obd_ioctl_freedata(buf, len);
2969                 GOTO(out, err);
2970         }
2971         case LL_IOC_LOV_SETSTRIPE:
2972                 err = obd_alloc_memmd(exp, karg);
2973                 if (err > 0)
2974                         err = 0;
2975                 GOTO(out, err);
2976         case LL_IOC_LOV_GETSTRIPE:
2977                 err = osc_getstripe(karg, uarg);
2978                 GOTO(out, err);
2979         case OBD_IOC_CLIENT_RECOVER:
2980                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2981                                             data->ioc_inlbuf1, 0);
2982                 if (err > 0)
2983                         err = 0;
2984                 GOTO(out, err);
2985         case IOC_OSC_SET_ACTIVE:
2986                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2987                                                data->ioc_offset);
2988                 GOTO(out, err);
2989         case OBD_IOC_POLL_QUOTACHECK:
2990                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2991                 GOTO(out, err);
2992         case OBD_IOC_PING_TARGET:
2993                 err = ptlrpc_obd_ping(obd);
2994                 GOTO(out, err);
2995         default:
2996                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2997                        cmd, current_comm());
2998                 GOTO(out, err = -ENOTTY);
2999         }
3000 out:
3001         module_put(THIS_MODULE);
3002         return err;
3003 }
3004
3005 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3006                         obd_count keylen, void *key, __u32 *vallen, void *val,
3007                         struct lov_stripe_md *lsm)
3008 {
3009         ENTRY;
3010         if (!vallen || !val)
3011                 RETURN(-EFAULT);
3012
3013         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3014                 __u32 *stripe = val;
3015                 *vallen = sizeof(*stripe);
3016                 *stripe = 0;
3017                 RETURN(0);
3018         } else if (KEY_IS(KEY_LAST_ID)) {
3019                 struct ptlrpc_request *req;
3020                 obd_id          *reply;
3021                 char              *tmp;
3022                 int                 rc;
3023
3024                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3025                                            &RQF_OST_GET_INFO_LAST_ID);
3026                 if (req == NULL)
3027                         RETURN(-ENOMEM);
3028
3029                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3030                                      RCL_CLIENT, keylen);
3031                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3032                 if (rc) {
3033                         ptlrpc_request_free(req);
3034                         RETURN(rc);
3035                 }
3036
3037                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3038                 memcpy(tmp, key, keylen);
3039
3040                 req->rq_no_delay = req->rq_no_resend = 1;
3041                 ptlrpc_request_set_replen(req);
3042                 rc = ptlrpc_queue_wait(req);
3043                 if (rc)
3044                         GOTO(out, rc);
3045
3046                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3047                 if (reply == NULL)
3048                         GOTO(out, rc = -EPROTO);
3049
3050                 *((obd_id *)val) = *reply;
3051         out:
3052                 ptlrpc_req_finished(req);
3053                 RETURN(rc);
3054         } else if (KEY_IS(KEY_FIEMAP)) {
3055                 struct ptlrpc_request *req;
3056                 struct ll_user_fiemap *reply;
3057                 char *tmp;
3058                 int rc;
3059
3060                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3061                                            &RQF_OST_GET_INFO_FIEMAP);
3062                 if (req == NULL)
3063                         RETURN(-ENOMEM);
3064
3065                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3066                                      RCL_CLIENT, keylen);
3067                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3068                                      RCL_CLIENT, *vallen);
3069                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3070                                      RCL_SERVER, *vallen);
3071
3072                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3073                 if (rc) {
3074                         ptlrpc_request_free(req);
3075                         RETURN(rc);
3076                 }
3077
3078                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3079                 memcpy(tmp, key, keylen);
3080                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3081                 memcpy(tmp, val, *vallen);
3082
3083                 ptlrpc_request_set_replen(req);
3084                 rc = ptlrpc_queue_wait(req);
3085                 if (rc)
3086                         GOTO(out1, rc);
3087
3088                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3089                 if (reply == NULL)
3090                         GOTO(out1, rc = -EPROTO);
3091
3092                 memcpy(val, reply, *vallen);
3093         out1:
3094                 ptlrpc_req_finished(req);
3095
3096                 RETURN(rc);
3097         }
3098
3099         RETURN(-EINVAL);
3100 }
3101
3102 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3103                               obd_count keylen, void *key, obd_count vallen,
3104                               void *val, struct ptlrpc_request_set *set)
3105 {
3106         struct ptlrpc_request *req;
3107         struct obd_device     *obd = exp->exp_obd;
3108         struct obd_import     *imp = class_exp2cliimp(exp);
3109         char              *tmp;
3110         int                 rc;
3111         ENTRY;
3112
3113         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3114
3115         if (KEY_IS(KEY_CHECKSUM)) {
3116                 if (vallen != sizeof(int))
3117                         RETURN(-EINVAL);
3118                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3119                 RETURN(0);
3120         }
3121
3122         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3123                 sptlrpc_conf_client_adapt(obd);
3124                 RETURN(0);
3125         }
3126
3127         if (KEY_IS(KEY_FLUSH_CTX)) {
3128                 sptlrpc_import_flush_my_ctx(imp);
3129                 RETURN(0);
3130         }
3131
3132         if (KEY_IS(KEY_CACHE_SET)) {
3133                 struct client_obd *cli = &obd->u.cli;
3134
3135                 LASSERT(cli->cl_cache == NULL); /* only once */
3136                 cli->cl_cache = (struct cl_client_cache *)val;
3137                 atomic_inc(&cli->cl_cache->ccc_users);
3138                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3139
3140                 /* add this osc into entity list */
3141                 LASSERT(list_empty(&cli->cl_lru_osc));
3142                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3143                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3144                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3145
3146                 RETURN(0);
3147         }
3148
3149         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3150                 struct client_obd *cli = &obd->u.cli;
3151                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3152                 int target = *(int *)val;
3153
3154                 nr = osc_lru_shrink(cli, min(nr, target));
3155                 *(int *)val -= nr;
3156                 RETURN(0);
3157         }
3158
3159         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3160                 RETURN(-EINVAL);
3161
3162         /* We pass all other commands directly to OST. Since nobody calls osc
3163            methods directly and everybody is supposed to go through LOV, we
3164            assume lov checked invalid values for us.
3165            The only recognised values so far are evict_by_nid and mds_conn.
3166            Even if something bad goes through, we'd get a -EINVAL from OST
3167            anyway. */
3168
3169         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3170                                                 &RQF_OST_SET_GRANT_INFO :
3171                                                 &RQF_OBD_SET_INFO);
3172         if (req == NULL)
3173                 RETURN(-ENOMEM);
3174
3175         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3176                              RCL_CLIENT, keylen);
3177         if (!KEY_IS(KEY_GRANT_SHRINK))
3178                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3179                                      RCL_CLIENT, vallen);
3180         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3181         if (rc) {
3182                 ptlrpc_request_free(req);
3183                 RETURN(rc);
3184         }
3185
3186         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3187         memcpy(tmp, key, keylen);
3188         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3189                                                         &RMF_OST_BODY :
3190                                                         &RMF_SETINFO_VAL);
3191         memcpy(tmp, val, vallen);
3192
3193         if (KEY_IS(KEY_GRANT_SHRINK)) {
3194                 struct osc_grant_args *aa;
3195                 struct obdo *oa;
3196
3197                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3198                 aa = ptlrpc_req_async_args(req);
3199                 OBDO_ALLOC(oa);
3200                 if (!oa) {
3201                         ptlrpc_req_finished(req);
3202                         RETURN(-ENOMEM);
3203                 }
3204                 *oa = ((struct ost_body *)val)->oa;
3205                 aa->aa_oa = oa;
3206                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3207         }
3208
3209         ptlrpc_request_set_replen(req);
3210         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3211                 LASSERT(set != NULL);
3212                 ptlrpc_set_add_req(set, req);
3213                 ptlrpc_check_set(NULL, set);
3214         } else
3215                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3216
3217         RETURN(0);
3218 }
3219
3220
/**
 * llog initialisation hook of the OSC.
 *
 * This path is not supposed to be used together with LOD/OSP and is
 * slated for removal, so reaching it trips LBUG().  The trailing
 * return supplies the int result the signature requires.
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
			 struct obd_device *disk_obd, int *index)
{
	LBUG();
	return 0;
}
3229
3230 static int osc_llog_finish(struct obd_device *obd, int count)
3231 {
3232         struct llog_ctxt *ctxt;
3233
3234         ENTRY;
3235
3236         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3237         if (ctxt) {
3238                 llog_cat_close(NULL, ctxt->loc_handle);
3239                 llog_cleanup(NULL, ctxt);
3240         }
3241
3242         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3243         if (ctxt)
3244                 llog_cleanup(NULL, ctxt);
3245         RETURN(0);
3246 }
3247
3248 static int osc_reconnect(const struct lu_env *env,
3249                          struct obd_export *exp, struct obd_device *obd,
3250                          struct obd_uuid *cluuid,
3251                          struct obd_connect_data *data,
3252                          void *localdata)
3253 {
3254         struct client_obd *cli = &obd->u.cli;
3255
3256         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3257                 long lost_grant;
3258
3259                 client_obd_list_lock(&cli->cl_loi_list_lock);
3260                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3261                                 2 * cli_brw_size(obd);
3262                 lost_grant = cli->cl_lost_grant;
3263                 cli->cl_lost_grant = 0;
3264                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3265
3266                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3267                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3268                        data->ocd_version, data->ocd_grant, lost_grant);
3269         }
3270
3271         RETURN(0);
3272 }
3273
3274 static int osc_disconnect(struct obd_export *exp)
3275 {
3276         struct obd_device *obd = class_exp2obd(exp);
3277         struct llog_ctxt  *ctxt;
3278         int rc;
3279
3280         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3281         if (ctxt) {
3282                 if (obd->u.cli.cl_conn_count == 1) {
3283                         /* Flush any remaining cancel messages out to the
3284                          * target */
3285                         llog_sync(ctxt, exp, 0);
3286                 }
3287                 llog_ctxt_put(ctxt);
3288         } else {
3289                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3290                        obd);
3291         }
3292
3293         rc = client_disconnect_export(exp);
3294         /**
3295          * Initially we put del_shrink_grant before disconnect_export, but it
3296          * causes the following problem if setup (connect) and cleanup
3297          * (disconnect) are tangled together.
3298          *      connect p1                   disconnect p2
3299          *   ptlrpc_connect_import
3300          *     ...............         class_manual_cleanup
3301          *                                   osc_disconnect
3302          *                                   del_shrink_grant
3303          *   ptlrpc_connect_interrupt
3304          *     init_grant_shrink
3305          *   add this client to shrink list
3306          *                                    cleanup_osc
3307          * Bang! pinger trigger the shrink.
3308          * So the osc should be disconnected from the shrink list, after we
3309          * are sure the import has been destroyed. BUG18662
3310          */
3311         if (obd->u.cli.cl_import == NULL)
3312                 osc_del_shrink_grant(&obd->u.cli);
3313         return rc;
3314 }
3315
3316 static int osc_import_event(struct obd_device *obd,
3317                             struct obd_import *imp,
3318                             enum obd_import_event event)
3319 {
3320         struct client_obd *cli;
3321         int rc = 0;
3322
3323         ENTRY;
3324         LASSERT(imp->imp_obd == obd);
3325
3326         switch (event) {
3327         case IMP_EVENT_DISCON: {
3328                 cli = &obd->u.cli;
3329                 client_obd_list_lock(&cli->cl_loi_list_lock);
3330                 cli->cl_avail_grant = 0;
3331                 cli->cl_lost_grant = 0;
3332                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3333                 break;
3334         }
3335         case IMP_EVENT_INACTIVE: {
3336                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3337                 break;
3338         }
3339         case IMP_EVENT_INVALIDATE: {
3340                 struct ldlm_namespace *ns = obd->obd_namespace;
3341                 struct lu_env    *env;
3342                 int                 refcheck;
3343
3344                 env = cl_env_get(&refcheck);
3345                 if (!IS_ERR(env)) {
3346                         /* Reset grants */
3347                         cli = &obd->u.cli;
3348                         /* all pages go to failing rpcs due to the invalid
3349                          * import */
3350                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3351
3352                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3353                         cl_env_put(env, &refcheck);
3354                 } else
3355                         rc = PTR_ERR(env);
3356                 break;
3357         }
3358         case IMP_EVENT_ACTIVE: {
3359                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3360                 break;
3361         }
3362         case IMP_EVENT_OCD: {
3363                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3364
3365                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3366                         osc_init_grant(&obd->u.cli, ocd);
3367
3368                 /* See bug 7198 */
3369                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3370                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3371
3372                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3373                 break;
3374         }
3375         case IMP_EVENT_DEACTIVATE: {
3376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3377                 break;
3378         }
3379         case IMP_EVENT_ACTIVATE: {
3380                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3381                 break;
3382         }
3383         default:
3384                 CERROR("Unknown import event %d\n", event);
3385                 LBUG();
3386         }
3387         RETURN(rc);
3388 }
3389
3390 /**
3391  * Determine whether the lock can be canceled before replaying the lock
3392  * during recovery, see bug16774 for detailed information.
3393  *
3394  * \retval zero the lock can't be canceled
3395  * \retval other ok to cancel
3396  */
3397 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3398 {
3399         check_res_locked(lock->l_resource);
3400
3401         /*
3402          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3403          *
3404          * XXX as a future improvement, we can also cancel unused write lock
3405          * if it doesn't have dirty data and active mmaps.
3406          */
3407         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3408             (lock->l_granted_mode == LCK_PR ||
3409              lock->l_granted_mode == LCK_CR) &&
3410             (osc_dlm_lock_pageref(lock) == 0))
3411                 RETURN(1);
3412
3413         RETURN(0);
3414 }
3415
3416 static int brw_queue_work(const struct lu_env *env, void *data)
3417 {
3418         struct client_obd *cli = data;
3419
3420         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3421
3422         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3423         RETURN(0);
3424 }
3425
3426 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3427 {
3428         struct lprocfs_static_vars lvars = { 0 };
3429         struct client_obd         *cli = &obd->u.cli;
3430         void                   *handler;
3431         int                     rc;
3432         ENTRY;
3433
3434         rc = ptlrpcd_addref();
3435         if (rc)
3436                 RETURN(rc);
3437
3438         rc = client_obd_setup(obd, lcfg);
3439         if (rc)
3440                 GOTO(out_ptlrpcd, rc);
3441
3442         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3443         if (IS_ERR(handler))
3444                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3445         cli->cl_writeback_work = handler;
3446
3447         rc = osc_quota_setup(obd);
3448         if (rc)
3449                 GOTO(out_ptlrpcd_work, rc);
3450
3451         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3452         lprocfs_osc_init_vars(&lvars);
3453         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3454                 lproc_osc_attach_seqstat(obd);
3455                 sptlrpc_lprocfs_cliobd_attach(obd);
3456                 ptlrpc_lprocfs_register_obd(obd);
3457         }
3458
3459         /* We need to allocate a few requests more, because
3460          * brw_interpret tries to create new requests before freeing
3461          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3462          * reserved, but I'm afraid that might be too much wasted RAM
3463          * in fact, so 2 is just my guess and still should work. */
3464         cli->cl_import->imp_rq_pool =
3465                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3466                                     OST_MAXREQSIZE,
3467                                     ptlrpc_add_rqs_to_pool);
3468
3469         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3470         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3471         RETURN(rc);
3472
3473 out_ptlrpcd_work:
3474         ptlrpcd_destroy_work(handler);
3475 out_client_setup:
3476         client_obd_cleanup(obd);
3477 out_ptlrpcd:
3478         ptlrpcd_decref();
3479         RETURN(rc);
3480 }
3481
3482 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3483 {
3484         int rc = 0;
3485         ENTRY;
3486
3487         switch (stage) {
3488         case OBD_CLEANUP_EARLY: {
3489                 struct obd_import *imp;
3490                 imp = obd->u.cli.cl_import;
3491                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3492                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3493                 ptlrpc_deactivate_import(imp);
3494                 spin_lock(&imp->imp_lock);
3495                 imp->imp_pingable = 0;
3496                 spin_unlock(&imp->imp_lock);
3497                 break;
3498         }
3499         case OBD_CLEANUP_EXPORTS: {
3500                 struct client_obd *cli = &obd->u.cli;
3501                 /* LU-464
3502                  * for echo client, export may be on zombie list, wait for
3503                  * zombie thread to cull it, because cli.cl_import will be
3504                  * cleared in client_disconnect_export():
3505                  *   class_export_destroy() -> obd_cleanup() ->
3506                  *   echo_device_free() -> echo_client_cleanup() ->
3507                  *   obd_disconnect() -> osc_disconnect() ->
3508                  *   client_disconnect_export()
3509                  */
3510                 obd_zombie_barrier();
3511                 if (cli->cl_writeback_work) {
3512                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3513                         cli->cl_writeback_work = NULL;
3514                 }
3515                 obd_cleanup_client_import(obd);
3516                 ptlrpc_lprocfs_unregister_obd(obd);
3517                 lprocfs_obd_cleanup(obd);
3518                 rc = obd_llog_finish(obd, 0);
3519                 if (rc != 0)
3520                         CERROR("failed to cleanup llogging subsystems\n");
3521                 break;
3522                 }
3523         }
3524         RETURN(rc);
3525 }
3526
3527 int osc_cleanup(struct obd_device *obd)
3528 {
3529         struct client_obd *cli = &obd->u.cli;
3530         int rc;
3531
3532         ENTRY;
3533
3534         /* lru cleanup */
3535         if (cli->cl_cache != NULL) {
3536                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3537                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3538                 list_del_init(&cli->cl_lru_osc);
3539                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3540                 cli->cl_lru_left = NULL;
3541                 atomic_dec(&cli->cl_cache->ccc_users);
3542                 cli->cl_cache = NULL;
3543         }
3544
3545         /* free memory of osc quota cache */
3546         osc_quota_cleanup(obd);
3547
3548         rc = client_obd_cleanup(obd);
3549
3550         ptlrpcd_decref();
3551         RETURN(rc);
3552 }
3553
3554 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3555 {
3556         struct lprocfs_static_vars lvars = { 0 };
3557         int rc = 0;
3558
3559         lprocfs_osc_init_vars(&lvars);
3560
3561         switch (lcfg->lcfg_command) {
3562         default:
3563                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3564                                               lcfg, obd);
3565                 if (rc > 0)
3566                         rc = 0;
3567                 break;
3568         }
3569
3570         return(rc);
3571 }
3572
3573 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3574 {
3575         return osc_process_config_base(obd, buf);
3576 }
3577
3578 struct obd_ops osc_obd_ops = {
3579         .o_owner                = THIS_MODULE,
3580         .o_setup                = osc_setup,
3581         .o_precleanup      = osc_precleanup,
3582         .o_cleanup            = osc_cleanup,
3583         .o_add_conn          = client_import_add_conn,
3584         .o_del_conn          = client_import_del_conn,
3585         .o_connect            = client_connect_import,
3586         .o_reconnect        = osc_reconnect,
3587         .o_disconnect      = osc_disconnect,
3588         .o_statfs              = osc_statfs,
3589         .o_statfs_async  = osc_statfs_async,
3590         .o_packmd              = osc_packmd,
3591         .o_unpackmd          = osc_unpackmd,
3592         .o_create              = osc_create,
3593         .o_destroy            = osc_destroy,
3594         .o_getattr            = osc_getattr,
3595         .o_getattr_async        = osc_getattr_async,
3596         .o_setattr            = osc_setattr,
3597         .o_setattr_async        = osc_setattr_async,
3598         .o_brw            = osc_brw,
3599         .o_punch                = osc_punch,
3600         .o_sync          = osc_sync,
3601         .o_enqueue            = osc_enqueue,
3602         .o_change_cbdata        = osc_change_cbdata,
3603         .o_find_cbdata    = osc_find_cbdata,
3604         .o_cancel              = osc_cancel,
3605         .o_cancel_unused        = osc_cancel_unused,
3606         .o_iocontrol        = osc_iocontrol,
3607         .o_get_info          = osc_get_info,
3608         .o_set_info_async       = osc_set_info_async,
3609         .o_import_event  = osc_import_event,
3610         .o_llog_init        = osc_llog_init,
3611         .o_llog_finish    = osc_llog_finish,
3612         .o_process_config       = osc_process_config,
3613         .o_quotactl          = osc_quotactl,
3614         .o_quotacheck      = osc_quotacheck,
3615 };
3616
3617 extern struct lu_kmem_descr osc_caches[];
3618 extern spinlock_t osc_ast_guard;
3619 extern struct lock_class_key osc_ast_guard_class;
3620
3621 int __init osc_init(void)
3622 {
3623         struct lprocfs_static_vars lvars = { 0 };
3624         int rc;
3625         ENTRY;
3626
3627         /* print an address of _any_ initialized kernel symbol from this
3628          * module, to allow debugging with gdb that doesn't support data
3629          * symbols from modules.*/
3630         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3631
3632         rc = lu_kmem_init(osc_caches);
3633
3634         lprocfs_osc_init_vars(&lvars);
3635
3636         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3637                                  LUSTRE_OSC_NAME, &osc_device_type);
3638         if (rc) {
3639                 lu_kmem_fini(osc_caches);
3640                 RETURN(rc);
3641         }
3642
3643         spin_lock_init(&osc_ast_guard);
3644         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3645
3646         RETURN(rc);
3647 }
3648
3649 static void /*__exit*/ osc_exit(void)
3650 {
3651         class_unregister_type(LUSTRE_OSC_NAME);
3652         lu_kmem_fini(osc_caches);
3653 }
3654
3655 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3656 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3657 MODULE_LICENSE("GPL");
3658
3659 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);