]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/lov/lov_obd.c
staging: add Lustre file system client support
[karo-tx-linux.git] / drivers / staging / lustre / lustre / lov / lov_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/lov/lov_obd.c
37  *
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Peter Braam <braam@clusterfs.com>
40  * Author: Mike Shaver <shaver@clusterfs.com>
41  * Author: Nathan Rutman <nathan@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOV
45 #include <linux/libcfs/libcfs.h>
46
47 #include <obd_support.h>
48 #include <lustre_lib.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_idl.h>
51 #include <lustre_dlm.h>
52 #include <lustre_mds.h>
53 #include <lustre_debug.h>
54 #include <obd_class.h>
55 #include <obd_lov.h>
56 #include <obd_ost.h>
57 #include <lprocfs_status.h>
58 #include <lustre_param.h>
59 #include <cl_object.h>
60 #include <lclient.h>
61 #include <lustre/ll_fiemap.h>
62 #include <lustre_log.h>
63 #include <lustre_fid.h>
64
65 #include "lov_internal.h"
66
67 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
68    Any function that expects lov_tgts to remain stationary must take a ref. */
69 static void lov_getref(struct obd_device *obd)
70 {
71         struct lov_obd *lov = &obd->u.lov;
72
73         /* nobody gets through here until lov_putref is done */
74         mutex_lock(&lov->lov_lock);
75         atomic_inc(&lov->lov_refcount);
76         mutex_unlock(&lov->lov_lock);
77         return;
78 }
79
80 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt);
81
82 static void lov_putref(struct obd_device *obd)
83 {
84         struct lov_obd *lov = &obd->u.lov;
85
86         mutex_lock(&lov->lov_lock);
87         /* ok to dec to 0 more than once -- ltd_exp's will be null */
88         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
89                 LIST_HEAD(kill);
90                 int i;
91                 struct lov_tgt_desc *tgt, *n;
92                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
93                        lov->lov_death_row);
94                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
95                         tgt = lov->lov_tgts[i];
96
97                         if (!tgt || !tgt->ltd_reap)
98                                 continue;
99                         list_add(&tgt->ltd_kill, &kill);
100                         /* XXX - right now there is a dependency on ld_tgt_count
101                          * being the maximum tgt index for computing the
102                          * mds_max_easize. So we can't shrink it. */
103                         lov_ost_pool_remove(&lov->lov_packed, i);
104                         lov->lov_tgts[i] = NULL;
105                         lov->lov_death_row--;
106                 }
107                 mutex_unlock(&lov->lov_lock);
108
109                 list_for_each_entry_safe(tgt, n, &kill, ltd_kill) {
110                         list_del(&tgt->ltd_kill);
111                         /* Disconnect */
112                         __lov_del_obd(obd, tgt);
113                 }
114         } else {
115                 mutex_unlock(&lov->lov_lock);
116         }
117 }
118
119 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
120                               enum obd_notify_event ev);
121 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
122                       enum obd_notify_event ev, void *data);
123
124
125 #define MAX_STRING_SIZE 128
126 int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
127                     struct obd_connect_data *data)
128 {
129         struct lov_obd *lov = &obd->u.lov;
130         struct obd_uuid *tgt_uuid;
131         struct obd_device *tgt_obd;
132         static struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
133         struct obd_import *imp;
134         proc_dir_entry_t *lov_proc_dir;
135         int rc;
136         ENTRY;
137
138         if (!lov->lov_tgts[index])
139                 RETURN(-EINVAL);
140
141         tgt_uuid = &lov->lov_tgts[index]->ltd_uuid;
142         tgt_obd = lov->lov_tgts[index]->ltd_obd;
143
144         if (!tgt_obd->obd_set_up) {
145                 CERROR("Target %s not set up\n", obd_uuid2str(tgt_uuid));
146                 RETURN(-EINVAL);
147         }
148
149         /* override the sp_me from lov */
150         tgt_obd->u.cli.cl_sp_me = lov->lov_sp_me;
151
152         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
153                 data->ocd_index = index;
154
155         /*
156          * Divine LOV knows that OBDs under it are OSCs.
157          */
158         imp = tgt_obd->u.cli.cl_import;
159
160         if (activate) {
161                 tgt_obd->obd_no_recov = 0;
162                 /* FIXME this is probably supposed to be
163                    ptlrpc_set_import_active.  Horrible naming. */
164                 ptlrpc_activate_import(imp);
165         }
166
167         rc = obd_register_observer(tgt_obd, obd);
168         if (rc) {
169                 CERROR("Target %s register_observer error %d\n",
170                        obd_uuid2str(tgt_uuid), rc);
171                 RETURN(rc);
172         }
173
174
175         if (imp->imp_invalid) {
176                 CDEBUG(D_CONFIG, "not connecting OSC %s; administratively "
177                        "disabled\n", obd_uuid2str(tgt_uuid));
178                 RETURN(0);
179         }
180
181         rc = obd_connect(NULL, &lov->lov_tgts[index]->ltd_exp, tgt_obd,
182                          &lov_osc_uuid, data, NULL);
183         if (rc || !lov->lov_tgts[index]->ltd_exp) {
184                 CERROR("Target %s connect error %d\n",
185                        obd_uuid2str(tgt_uuid), rc);
186                 RETURN(-ENODEV);
187         }
188
189         lov->lov_tgts[index]->ltd_reap = 0;
190
191         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
192                obd_uuid2str(tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
193
194         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
195         if (lov_proc_dir) {
196                 struct obd_device *osc_obd = lov->lov_tgts[index]->ltd_exp->exp_obd;
197                 proc_dir_entry_t *osc_symlink;
198
199                 LASSERT(osc_obd != NULL);
200                 LASSERT(osc_obd->obd_magic == OBD_DEVICE_MAGIC);
201                 LASSERT(osc_obd->obd_type->typ_name != NULL);
202
203                 osc_symlink = lprocfs_add_symlink(osc_obd->obd_name,
204                                                   lov_proc_dir,
205                                                   "../../../%s/%s",
206                                                   osc_obd->obd_type->typ_name,
207                                                   osc_obd->obd_name);
208                 if (osc_symlink == NULL) {
209                         CERROR("could not register LOV target "
210                                 "/proc/fs/lustre/%s/%s/target_obds/%s.",
211                                 obd->obd_type->typ_name, obd->obd_name,
212                                 osc_obd->obd_name);
213                         lprocfs_remove(&lov_proc_dir);
214                 }
215         }
216
217         RETURN(0);
218 }
219
220 static int lov_connect(const struct lu_env *env,
221                        struct obd_export **exp, struct obd_device *obd,
222                        struct obd_uuid *cluuid, struct obd_connect_data *data,
223                        void *localdata)
224 {
225         struct lov_obd *lov = &obd->u.lov;
226         struct lov_tgt_desc *tgt;
227         struct lustre_handle conn;
228         int i, rc;
229         ENTRY;
230
231         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
232
233         rc = class_connect(&conn, obd, cluuid);
234         if (rc)
235                 RETURN(rc);
236
237         *exp = class_conn2export(&conn);
238
239         /* Why should there ever be more than 1 connect? */
240         lov->lov_connects++;
241         LASSERT(lov->lov_connects == 1);
242
243         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
244         if (data)
245                 lov->lov_ocd = *data;
246
247         obd_getref(obd);
248         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
249                 tgt = lov->lov_tgts[i];
250                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
251                         continue;
252                 /* Flags will be lowest common denominator */
253                 rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
254                 if (rc) {
255                         CERROR("%s: lov connect tgt %d failed: %d\n",
256                                obd->obd_name, i, rc);
257                         continue;
258                 }
259                 /* connect to administrative disabled ost */
260                 if (!lov->lov_tgts[i]->ltd_exp)
261                         continue;
262
263                 rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
264                                 OBD_NOTIFY_CONNECT, (void *)&i);
265                 if (rc) {
266                         CERROR("%s error sending notify %d\n",
267                                obd->obd_name, rc);
268                 }
269         }
270         obd_putref(obd);
271
272         RETURN(0);
273 }
274
275 static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
276 {
277         proc_dir_entry_t *lov_proc_dir;
278         struct lov_obd *lov = &obd->u.lov;
279         struct obd_device *osc_obd;
280         int rc;
281         ENTRY;
282
283         osc_obd = class_exp2obd(tgt->ltd_exp);
284         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
285                obd->obd_name, osc_obd->obd_name);
286
287         if (tgt->ltd_active) {
288                 tgt->ltd_active = 0;
289                 lov->desc.ld_active_tgt_count--;
290                 tgt->ltd_exp->exp_obd->obd_inactive = 1;
291         }
292
293         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
294         if (lov_proc_dir) {
295                 proc_dir_entry_t *osc_symlink;
296
297                 osc_symlink = lprocfs_srch(lov_proc_dir, osc_obd->obd_name);
298                 if (osc_symlink) {
299                         lprocfs_remove(&osc_symlink);
300                 } else {
301                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing.",
302                                obd->obd_type->typ_name, obd->obd_name,
303                                osc_obd->obd_name);
304                 }
305         }
306
307         if (osc_obd) {
308                 /* Pass it on to our clients.
309                  * XXX This should be an argument to disconnect,
310                  * XXX not a back-door flag on the OBD.  Ah well.
311                  */
312                 osc_obd->obd_force = obd->obd_force;
313                 osc_obd->obd_fail = obd->obd_fail;
314                 osc_obd->obd_no_recov = obd->obd_no_recov;
315         }
316
317         obd_register_observer(osc_obd, NULL);
318
319         rc = obd_disconnect(tgt->ltd_exp);
320         if (rc) {
321                 CERROR("Target %s disconnect error %d\n",
322                        tgt->ltd_uuid.uuid, rc);
323                 rc = 0;
324         }
325
326         tgt->ltd_exp = NULL;
327         RETURN(0);
328 }
329
330 static int lov_disconnect(struct obd_export *exp)
331 {
332         struct obd_device *obd = class_exp2obd(exp);
333         struct lov_obd *lov = &obd->u.lov;
334         int i, rc;
335         ENTRY;
336
337         if (!lov->lov_tgts)
338                 goto out;
339
340         /* Only disconnect the underlying layers on the final disconnect. */
341         lov->lov_connects--;
342         if (lov->lov_connects != 0) {
343                 /* why should there be more than 1 connect? */
344                 CERROR("disconnect #%d\n", lov->lov_connects);
345                 goto out;
346         }
347
348         /* Let's hold another reference so lov_del_obd doesn't spin through
349            putref every time */
350         obd_getref(obd);
351
352         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
353                 if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
354                         /* Disconnection is the last we know about an obd */
355                         lov_del_target(obd, i, 0, lov->lov_tgts[i]->ltd_gen);
356                 }
357         }
358         obd_putref(obd);
359
360 out:
361         rc = class_disconnect(exp); /* bz 9811 */
362         RETURN(rc);
363 }
364
365 /* Error codes:
366  *
367  *  -EINVAL  : UUID can't be found in the LOV's target list
368  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
369  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
370  *  any >= 0 : is log target index
371  */
372 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
373                               enum obd_notify_event ev)
374 {
375         struct lov_obd *lov = &obd->u.lov;
376         struct lov_tgt_desc *tgt;
377         int index, activate, active;
378         ENTRY;
379
380         CDEBUG(D_INFO, "Searching in lov %p for uuid %s event(%d)\n",
381                lov, uuid->uuid, ev);
382
383         obd_getref(obd);
384         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
385                 tgt = lov->lov_tgts[index];
386                 if (!tgt)
387                         continue;
388                 /*
389                  * LU-642, initially inactive OSC could miss the obd_connect,
390                  * we make up for it here.
391                  */
392                 if (ev == OBD_NOTIFY_ACTIVATE && tgt->ltd_exp == NULL &&
393                     obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
394                         struct obd_uuid lov_osc_uuid = {"LOV_OSC_UUID"};
395
396                         obd_connect(NULL, &tgt->ltd_exp, tgt->ltd_obd,
397                                     &lov_osc_uuid, &lov->lov_ocd, NULL);
398                 }
399                 if (!tgt->ltd_exp)
400                         continue;
401
402                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
403                        index, obd_uuid2str(&tgt->ltd_uuid),
404                        tgt->ltd_exp->exp_handle.h_cookie);
405                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
406                         break;
407         }
408
409         if (index == lov->desc.ld_tgt_count)
410                 GOTO(out, index = -EINVAL);
411
412         if (ev == OBD_NOTIFY_DEACTIVATE || ev == OBD_NOTIFY_ACTIVATE) {
413                 activate = (ev == OBD_NOTIFY_ACTIVATE) ? 1 : 0;
414
415                 if (lov->lov_tgts[index]->ltd_activate == activate) {
416                         CDEBUG(D_INFO, "OSC %s already %sactivate!\n",
417                                uuid->uuid, activate ? "" : "de");
418                 } else {
419                         lov->lov_tgts[index]->ltd_activate = activate;
420                         CDEBUG(D_CONFIG, "%sactivate OSC %s\n",
421                                activate ? "" : "de", obd_uuid2str(uuid));
422                 }
423
424         } else if (ev == OBD_NOTIFY_INACTIVE || ev == OBD_NOTIFY_ACTIVE) {
425                 active = (ev == OBD_NOTIFY_ACTIVE) ? 1 : 0;
426
427                 if (lov->lov_tgts[index]->ltd_active == active) {
428                         CDEBUG(D_INFO, "OSC %s already %sactive!\n",
429                                uuid->uuid, active ? "" : "in");
430                         GOTO(out, index);
431                 } else {
432                         CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n",
433                                obd_uuid2str(uuid), active ? "" : "in");
434                 }
435
436                 lov->lov_tgts[index]->ltd_active = active;
437                 if (active) {
438                         lov->desc.ld_active_tgt_count++;
439                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
440                 } else {
441                         lov->desc.ld_active_tgt_count--;
442                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
443                 }
444         } else {
445                 CERROR("Unknown event(%d) for uuid %s", ev, uuid->uuid);
446         }
447
448  out:
449         obd_putref(obd);
450         RETURN(index);
451 }
452
453 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
454                       enum obd_notify_event ev, void *data)
455 {
456         int rc = 0;
457         struct lov_obd *lov = &obd->u.lov;
458         ENTRY;
459
460         down_read(&lov->lov_notify_lock);
461         if (!lov->lov_connects) {
462                 up_read(&lov->lov_notify_lock);
463                 RETURN(rc);
464         }
465
466         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE ||
467             ev == OBD_NOTIFY_ACTIVATE || ev == OBD_NOTIFY_DEACTIVATE) {
468                 struct obd_uuid *uuid;
469
470                 LASSERT(watched);
471
472                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
473                         up_read(&lov->lov_notify_lock);
474                         CERROR("unexpected notification of %s %s!\n",
475                                watched->obd_type->typ_name,
476                                watched->obd_name);
477                         RETURN(-EINVAL);
478                 }
479                 uuid = &watched->u.cli.cl_target_uuid;
480
481                 /* Set OSC as active before notifying the observer, so the
482                  * observer can use the OSC normally.
483                  */
484                 rc = lov_set_osc_active(obd, uuid, ev);
485                 if (rc < 0) {
486                         up_read(&lov->lov_notify_lock);
487                         CERROR("event(%d) of %s failed: %d\n", ev,
488                                obd_uuid2str(uuid), rc);
489                         RETURN(rc);
490                 }
491                 /* active event should be pass lov target index as data */
492                 data = &rc;
493         }
494
495         /* Pass the notification up the chain. */
496         if (watched) {
497                 rc = obd_notify_observer(obd, watched, ev, data);
498         } else {
499                 /* NULL watched means all osc's in the lov (only for syncs) */
500                 /* sync event should be send lov idx as data */
501                 struct lov_obd *lov = &obd->u.lov;
502                 int i, is_sync;
503
504                 data = &i;
505                 is_sync = (ev == OBD_NOTIFY_SYNC) ||
506                           (ev == OBD_NOTIFY_SYNC_NONBLOCK);
507
508                 obd_getref(obd);
509                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
510                         if (!lov->lov_tgts[i])
511                                 continue;
512
513                         /* don't send sync event if target not
514                          * connected/activated */
515                         if (is_sync &&  !lov->lov_tgts[i]->ltd_active)
516                                 continue;
517
518                         rc = obd_notify_observer(obd, lov->lov_tgts[i]->ltd_obd,
519                                                  ev, data);
520                         if (rc) {
521                                 CERROR("%s: notify %s of %s failed %d\n",
522                                        obd->obd_name,
523                                        obd->obd_observer->obd_name,
524                                        lov->lov_tgts[i]->ltd_obd->obd_name,
525                                        rc);
526                         }
527                 }
528                 obd_putref(obd);
529         }
530
531         up_read(&lov->lov_notify_lock);
532         RETURN(rc);
533 }
534
535 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
536                           __u32 index, int gen, int active)
537 {
538         struct lov_obd *lov = &obd->u.lov;
539         struct lov_tgt_desc *tgt;
540         struct obd_device *tgt_obd;
541         int rc;
542         ENTRY;
543
544         CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
545                uuidp->uuid, index, gen, active);
546
547         if (gen <= 0) {
548                 CERROR("request to add OBD %s with invalid generation: %d\n",
549                        uuidp->uuid, gen);
550                 RETURN(-EINVAL);
551         }
552
553         tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME,
554                                         &obd->obd_uuid);
555         if (tgt_obd == NULL)
556                 RETURN(-EINVAL);
557
558         mutex_lock(&lov->lov_lock);
559
560         if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
561                 tgt = lov->lov_tgts[index];
562                 CERROR("UUID %s already assigned at LOV target index %d\n",
563                        obd_uuid2str(&tgt->ltd_uuid), index);
564                 mutex_unlock(&lov->lov_lock);
565                 RETURN(-EEXIST);
566         }
567
568         if (index >= lov->lov_tgt_size) {
569                 /* We need to reallocate the lov target array. */
570                 struct lov_tgt_desc **newtgts, **old = NULL;
571                 __u32 newsize, oldsize = 0;
572
573                 newsize = max(lov->lov_tgt_size, (__u32)2);
574                 while (newsize < index + 1)
575                         newsize = newsize << 1;
576                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
577                 if (newtgts == NULL) {
578                         mutex_unlock(&lov->lov_lock);
579                         RETURN(-ENOMEM);
580                 }
581
582                 if (lov->lov_tgt_size) {
583                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
584                                lov->lov_tgt_size);
585                         old = lov->lov_tgts;
586                         oldsize = lov->lov_tgt_size;
587                 }
588
589                 lov->lov_tgts = newtgts;
590                 lov->lov_tgt_size = newsize;
591                 smp_rmb();
592                 if (old)
593                         OBD_FREE(old, sizeof(*old) * oldsize);
594
595                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
596                        lov->lov_tgts, lov->lov_tgt_size);
597         }
598
599         OBD_ALLOC_PTR(tgt);
600         if (!tgt) {
601                 mutex_unlock(&lov->lov_lock);
602                 RETURN(-ENOMEM);
603         }
604
605         rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size);
606         if (rc) {
607                 mutex_unlock(&lov->lov_lock);
608                 OBD_FREE_PTR(tgt);
609                 RETURN(rc);
610         }
611
612         tgt->ltd_uuid = *uuidp;
613         tgt->ltd_obd = tgt_obd;
614         /* XXX - add a sanity check on the generation number. */
615         tgt->ltd_gen = gen;
616         tgt->ltd_index = index;
617         tgt->ltd_activate = active;
618         lov->lov_tgts[index] = tgt;
619         if (index >= lov->desc.ld_tgt_count)
620                 lov->desc.ld_tgt_count = index + 1;
621
622         mutex_unlock(&lov->lov_lock);
623
624         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
625                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
626
627         rc = obd_notify(obd, tgt_obd, OBD_NOTIFY_CREATE, &index);
628
629         if (lov->lov_connects == 0) {
630                 /* lov_connect hasn't been called yet. We'll do the
631                    lov_connect_obd on this target when that fn first runs,
632                    because we don't know the connect flags yet. */
633                 RETURN(0);
634         }
635
636         obd_getref(obd);
637
638         rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
639         if (rc)
640                 GOTO(out, rc);
641
642         /* connect to administrative disabled ost */
643         if (!tgt->ltd_exp)
644                 GOTO(out, rc = 0);
645
646         if (lov->lov_cache != NULL) {
647                 rc = obd_set_info_async(NULL, tgt->ltd_exp,
648                                 sizeof(KEY_CACHE_SET), KEY_CACHE_SET,
649                                 sizeof(struct cl_client_cache), lov->lov_cache,
650                                 NULL);
651                 if (rc < 0)
652                         GOTO(out, rc);
653         }
654
655         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
656                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
657                         (void *)&index);
658
659 out:
660         if (rc) {
661                 CERROR("add failed (%d), deleting %s\n", rc,
662                        obd_uuid2str(&tgt->ltd_uuid));
663                 lov_del_target(obd, index, 0, 0);
664         }
665         obd_putref(obd);
666         RETURN(rc);
667 }
668
669 /* Schedule a target for deletion */
670 int lov_del_target(struct obd_device *obd, __u32 index,
671                    struct obd_uuid *uuidp, int gen)
672 {
673         struct lov_obd *lov = &obd->u.lov;
674         int count = lov->desc.ld_tgt_count;
675         int rc = 0;
676         ENTRY;
677
678         if (index >= count) {
679                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
680                        index, count);
681                 RETURN(-EINVAL);
682         }
683
684         /* to make sure there's no ongoing lov_notify() now */
685         down_write(&lov->lov_notify_lock);
686         obd_getref(obd);
687
688         if (!lov->lov_tgts[index]) {
689                 CERROR("LOV target at index %d is not setup.\n", index);
690                 GOTO(out, rc = -EINVAL);
691         }
692
693         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
694                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
695                        lov_uuid2str(lov, index), index,
696                        obd_uuid2str(uuidp));
697                 GOTO(out, rc = -EINVAL);
698         }
699
700         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
701                lov_uuid2str(lov, index), index,
702                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
703                lov->lov_tgts[index]->ltd_active);
704
705         lov->lov_tgts[index]->ltd_reap = 1;
706         lov->lov_death_row++;
707         /* we really delete it from obd_putref */
708 out:
709         obd_putref(obd);
710         up_write(&lov->lov_notify_lock);
711
712         RETURN(rc);
713 }
714
715 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
716 {
717         struct obd_device *osc_obd;
718
719         LASSERT(tgt);
720         LASSERT(tgt->ltd_reap);
721
722         osc_obd = class_exp2obd(tgt->ltd_exp);
723
724         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
725                tgt->ltd_uuid.uuid,
726                osc_obd ? osc_obd->obd_name : "<no obd>");
727
728         if (tgt->ltd_exp)
729                 lov_disconnect_obd(obd, tgt);
730
731         OBD_FREE_PTR(tgt);
732
733         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
734            do it ourselves. And we can't do it from lov_cleanup,
735            because we just lost our only reference to it. */
736         if (osc_obd)
737                 class_manual_cleanup(osc_obd);
738 }
739
740 void lov_fix_desc_stripe_size(__u64 *val)
741 {
742         if (*val < LOV_DEFAULT_STRIPE_SIZE) {
743                 LCONSOLE_WARN("Increasing default stripe size to min %u\n",
744                               LOV_DEFAULT_STRIPE_SIZE);
745                 *val = LOV_DEFAULT_STRIPE_SIZE;
746         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
747                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
748                 LCONSOLE_WARN("Changing default stripe size to "LPU64" (a "
749                               "multiple of %u)\n",
750                               *val, LOV_MIN_STRIPE_SIZE);
751         }
752 }
753
/* Sanitise a default stripe count in place: zero becomes one. */
void lov_fix_desc_stripe_count(__u32 *val)
{
        if (!*val)
                *val = 1;
}
759
760 void lov_fix_desc_pattern(__u32 *val)
761 {
762         /* from lov_setstripe */
763         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
764                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
765                 *val = 0;
766         }
767 }
768
769 void lov_fix_desc_qos_maxage(__u32 *val)
770 {
771         /* fix qos_maxage */
772         if (*val == 0)
773                 *val = QOS_DEFAULT_MAXAGE;
774 }
775
776 void lov_fix_desc(struct lov_desc *desc)
777 {
778         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
779         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
780         lov_fix_desc_pattern(&desc->ld_pattern);
781         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
782 }
783
784 int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
785 {
786         struct lprocfs_static_vars lvars = { 0 };
787         struct lov_desc *desc;
788         struct lov_obd *lov = &obd->u.lov;
789         int rc;
790         ENTRY;
791
792         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
793                 CERROR("LOV setup requires a descriptor\n");
794                 RETURN(-EINVAL);
795         }
796
797         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
798
799         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
800                 CERROR("descriptor size wrong: %d > %d\n",
801                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
802                 RETURN(-EINVAL);
803         }
804
805         if (desc->ld_magic != LOV_DESC_MAGIC) {
806                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
807                             CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
808                                    obd->obd_name, desc);
809                             lustre_swab_lov_desc(desc);
810                 } else {
811                         CERROR("%s: Bad lov desc magic: %#x\n",
812                                obd->obd_name, desc->ld_magic);
813                         RETURN(-EINVAL);
814                 }
815         }
816
817         lov_fix_desc(desc);
818
819         desc->ld_active_tgt_count = 0;
820         lov->desc = *desc;
821         lov->lov_tgt_size = 0;
822
823         mutex_init(&lov->lov_lock);
824         atomic_set(&lov->lov_refcount, 0);
825         lov->lov_sp_me = LUSTRE_SP_CLI;
826
827         init_rwsem(&lov->lov_notify_lock);
828
829         lov->lov_pools_hash_body = cfs_hash_create("POOLS", HASH_POOLS_CUR_BITS,
830                                                    HASH_POOLS_MAX_BITS,
831                                                    HASH_POOLS_BKT_BITS, 0,
832                                                    CFS_HASH_MIN_THETA,
833                                                    CFS_HASH_MAX_THETA,
834                                                    &pool_hash_operations,
835                                                    CFS_HASH_DEFAULT);
836         INIT_LIST_HEAD(&lov->lov_pool_list);
837         lov->lov_pool_count = 0;
838         rc = lov_ost_pool_init(&lov->lov_packed, 0);
839         if (rc)
840                 GOTO(out, rc);
841
842         lprocfs_lov_init_vars(&lvars);
843         lprocfs_obd_setup(obd, lvars.obd_vars);
844 #ifdef LPROCFS
845         {
846                 int rc;
847
848                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
849                                         0444, &lov_proc_target_fops, obd);
850                 if (rc)
851                         CWARN("Error adding the target_obd file\n");
852         }
853 #endif
854         lov->lov_pool_proc_entry = lprocfs_register("pools",
855                                                     obd->obd_proc_entry,
856                                                     NULL, NULL);
857
858         RETURN(0);
859
860 out:
861         return rc;
862 }
863
864 static int lov_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
865 {
866         int rc = 0;
867         struct lov_obd *lov = &obd->u.lov;
868
869         ENTRY;
870
871         switch (stage) {
872         case OBD_CLEANUP_EARLY: {
873                 int i;
874                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
875                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active)
876                                 continue;
877                         obd_precleanup(class_exp2obd(lov->lov_tgts[i]->ltd_exp),
878                                        OBD_CLEANUP_EARLY);
879                 }
880                 break;
881         }
882         case OBD_CLEANUP_EXPORTS:
883                 rc = obd_llog_finish(obd, 0);
884                 if (rc != 0)
885                         CERROR("failed to cleanup llogging subsystems\n");
886                 break;
887         }
888         RETURN(rc);
889 }
890
891 static int lov_cleanup(struct obd_device *obd)
892 {
893         struct lov_obd *lov = &obd->u.lov;
894         struct list_head *pos, *tmp;
895         struct pool_desc *pool;
896         ENTRY;
897
898         list_for_each_safe(pos, tmp, &lov->lov_pool_list) {
899                 pool = list_entry(pos, struct pool_desc, pool_list);
900                 /* free pool structs */
901                 CDEBUG(D_INFO, "delete pool %p\n", pool);
902                 /* In the function below, .hs_keycmp resolves to
903                  * pool_hashkey_keycmp() */
904                 /* coverity[overrun-buffer-val] */
905                 lov_pool_del(obd, pool->pool_name);
906         }
907         cfs_hash_putref(lov->lov_pools_hash_body);
908         lov_ost_pool_free(&lov->lov_packed);
909
910         lprocfs_obd_cleanup(obd);
911         if (lov->lov_tgts) {
912                 int i;
913                 obd_getref(obd);
914                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
915                         if (!lov->lov_tgts[i])
916                                 continue;
917
918                         /* Inactive targets may never have connected */
919                         if (lov->lov_tgts[i]->ltd_active ||
920                             atomic_read(&lov->lov_refcount))
921                             /* We should never get here - these
922                                should have been removed in the
923                              disconnect. */
924                                 CERROR("lov tgt %d not cleaned!"
925                                        " deathrow=%d, lovrc=%d\n",
926                                        i, lov->lov_death_row,
927                                        atomic_read(&lov->lov_refcount));
928                         lov_del_target(obd, i, 0, 0);
929                 }
930                 obd_putref(obd);
931                 OBD_FREE(lov->lov_tgts, sizeof(*lov->lov_tgts) *
932                          lov->lov_tgt_size);
933                 lov->lov_tgt_size = 0;
934         }
935         RETURN(0);
936 }
937
938 int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
939                             __u32 *indexp, int *genp)
940 {
941         struct obd_uuid obd_uuid;
942         int cmd;
943         int rc = 0;
944         ENTRY;
945
946         switch(cmd = lcfg->lcfg_command) {
947         case LCFG_LOV_ADD_OBD:
948         case LCFG_LOV_ADD_INA:
949         case LCFG_LOV_DEL_OBD: {
950                 __u32 index;
951                 int gen;
952                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
953                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
954                         GOTO(out, rc = -EINVAL);
955
956                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
957
958                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", indexp) != 1)
959                         GOTO(out, rc = -EINVAL);
960                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", genp) != 1)
961                         GOTO(out, rc = -EINVAL);
962                 index = *indexp;
963                 gen = *genp;
964                 if (cmd == LCFG_LOV_ADD_OBD)
965                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
966                 else if (cmd == LCFG_LOV_ADD_INA)
967                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
968                 else
969                         rc = lov_del_target(obd, index, &obd_uuid, gen);
970                 GOTO(out, rc);
971         }
972         case LCFG_PARAM: {
973                 struct lprocfs_static_vars lvars = { 0 };
974                 struct lov_desc *desc = &(obd->u.lov.desc);
975
976                 if (!desc)
977                         GOTO(out, rc = -EINVAL);
978
979                 lprocfs_lov_init_vars(&lvars);
980
981                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
982                                               lcfg, obd);
983                 if (rc > 0)
984                         rc = 0;
985                 GOTO(out, rc);
986         }
987         case LCFG_POOL_NEW:
988         case LCFG_POOL_ADD:
989         case LCFG_POOL_DEL:
990         case LCFG_POOL_REM:
991                 GOTO(out, rc);
992
993         default: {
994                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
995                 GOTO(out, rc = -EINVAL);
996
997         }
998         }
999 out:
1000         RETURN(rc);
1001 }
1002
1003 static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
1004                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
1005 {
1006         struct lov_stripe_md *obj_mdp, *lsm;
1007         struct lov_obd *lov = &exp->exp_obd->u.lov;
1008         unsigned ost_idx;
1009         int rc, i;
1010         ENTRY;
1011
1012         LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
1013                 src_oa->o_flags & OBD_FL_RECREATE_OBJS);
1014
1015         OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
1016         if (obj_mdp == NULL)
1017                 RETURN(-ENOMEM);
1018
1019         ost_idx = src_oa->o_nlink;
1020         lsm = *ea;
1021         if (lsm == NULL)
1022                 GOTO(out, rc = -EINVAL);
1023         if (ost_idx >= lov->desc.ld_tgt_count ||
1024             !lov->lov_tgts[ost_idx])
1025                 GOTO(out, rc = -EINVAL);
1026
1027         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1028                 if (lsm->lsm_oinfo[i]->loi_ost_idx == ost_idx) {
1029                         if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) !=
1030                                         ostid_id(&src_oa->o_oi))
1031                                 GOTO(out, rc = -EINVAL);
1032                         break;
1033                 }
1034         }
1035         if (i == lsm->lsm_stripe_count)
1036                 GOTO(out, rc = -EINVAL);
1037
1038         rc = obd_create(NULL, lov->lov_tgts[ost_idx]->ltd_exp,
1039                         src_oa, &obj_mdp, oti);
1040 out:
1041         OBD_FREE(obj_mdp, sizeof(*obj_mdp));
1042         RETURN(rc);
1043 }
1044
1045 /* the LOV expects oa->o_id to be set to the LOV object id */
1046 static int lov_create(const struct lu_env *env, struct obd_export *exp,
1047                       struct obdo *src_oa, struct lov_stripe_md **ea,
1048                       struct obd_trans_info *oti)
1049 {
1050         struct lov_obd *lov;
1051         int rc = 0;
1052         ENTRY;
1053
1054         LASSERT(ea != NULL);
1055         if (exp == NULL)
1056                 RETURN(-EINVAL);
1057
1058         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1059             src_oa->o_flags == OBD_FL_DELORPHAN) {
1060                 /* should be used with LOV anymore */
1061                 LBUG();
1062         }
1063
1064         lov = &exp->exp_obd->u.lov;
1065         if (!lov->desc.ld_active_tgt_count)
1066                 RETURN(-EIO);
1067
1068         obd_getref(exp->exp_obd);
1069         /* Recreate a specific object id at the given OST index */
1070         if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
1071             (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
1072                  rc = lov_recreate(exp, src_oa, ea, oti);
1073         }
1074
1075         obd_putref(exp->exp_obd);
1076         RETURN(rc);
1077 }
1078
/*
 * Assert that @lsmp is non-NULL and carries one of the layout magics
 * LOV_MAGIC_V1 or LOV_MAGIC_V3.  @lsmp is evaluated more than once, so it
 * must be free of side effects.
 */
#define ASSERT_LSM_MAGIC(lsmp)						\
do {									\
	LASSERT((lsmp) != NULL);					\
	LASSERTF(((lsmp)->lsm_magic == LOV_MAGIC_V1 ||			\
		  (lsmp)->lsm_magic == LOV_MAGIC_V3),			\
		 "%p->lsm_magic=%x\n",					\
		 (lsmp), (lsmp)->lsm_magic);				\
} while (0)
1086
1087 static int lov_destroy(const struct lu_env *env, struct obd_export *exp,
1088                        struct obdo *oa, struct lov_stripe_md *lsm,
1089                        struct obd_trans_info *oti, struct obd_export *md_exp,
1090                        void *capa)
1091 {
1092         struct lov_request_set *set;
1093         struct obd_info oinfo;
1094         struct lov_request *req;
1095         struct list_head *pos;
1096         struct lov_obd *lov;
1097         int rc = 0, err = 0;
1098         ENTRY;
1099
1100         ASSERT_LSM_MAGIC(lsm);
1101
1102         if (!exp || !exp->exp_obd)
1103                 RETURN(-ENODEV);
1104
1105         if (oa->o_valid & OBD_MD_FLCOOKIE) {
1106                 LASSERT(oti);
1107                 LASSERT(oti->oti_logcookies);
1108         }
1109
1110         lov = &exp->exp_obd->u.lov;
1111         obd_getref(exp->exp_obd);
1112         rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);
1113         if (rc)
1114                 GOTO(out, rc);
1115
1116         list_for_each (pos, &set->set_list) {
1117                 req = list_entry(pos, struct lov_request, rq_link);
1118
1119                 if (oa->o_valid & OBD_MD_FLCOOKIE)
1120                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1121
1122                 err = obd_destroy(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1123                                   req->rq_oi.oi_oa, NULL, oti, NULL, capa);
1124                 err = lov_update_common_set(set, req, err);
1125                 if (err) {
1126                         CERROR("%s: destroying objid "DOSTID" subobj "
1127                                DOSTID" on OST idx %d: rc = %d\n",
1128                                exp->exp_obd->obd_name, POSTID(&oa->o_oi),
1129                                POSTID(&req->rq_oi.oi_oa->o_oi),
1130                                req->rq_idx, err);
1131                         if (!rc)
1132                                 rc = err;
1133                 }
1134         }
1135
1136         if (rc == 0) {
1137                 LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
1138                 rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);
1139         }
1140         err = lov_fini_destroy_set(set);
1141 out:
1142         obd_putref(exp->exp_obd);
1143         RETURN(rc ? rc : err);
1144 }
1145
1146 static int lov_getattr(const struct lu_env *env, struct obd_export *exp,
1147                        struct obd_info *oinfo)
1148 {
1149         struct lov_request_set *set;
1150         struct lov_request *req;
1151         struct list_head *pos;
1152         struct lov_obd *lov;
1153         int err = 0, rc = 0;
1154         ENTRY;
1155
1156         LASSERT(oinfo);
1157         ASSERT_LSM_MAGIC(oinfo->oi_md);
1158
1159         if (!exp || !exp->exp_obd)
1160                 RETURN(-ENODEV);
1161
1162         lov = &exp->exp_obd->u.lov;
1163
1164         rc = lov_prep_getattr_set(exp, oinfo, &set);
1165         if (rc)
1166                 RETURN(rc);
1167
1168         list_for_each (pos, &set->set_list) {
1169                 req = list_entry(pos, struct lov_request, rq_link);
1170
1171                 CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx"
1172                        " %u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1173                        POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1174
1175                 rc = obd_getattr(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1176                                  &req->rq_oi);
1177                 err = lov_update_common_set(set, req, rc);
1178                 if (err) {
1179                         CERROR("%s: getattr objid "DOSTID" subobj "
1180                                DOSTID" on OST idx %d: rc = %d\n",
1181                                exp->exp_obd->obd_name,
1182                                POSTID(&oinfo->oi_oa->o_oi),
1183                                POSTID(&req->rq_oi.oi_oa->o_oi),
1184                                req->rq_idx, err);
1185                         break;
1186                 }
1187         }
1188
1189         rc = lov_fini_getattr_set(set);
1190         if (err)
1191                 rc = err;
1192         RETURN(rc);
1193 }
1194
1195 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
1196                                  void *data, int rc)
1197 {
1198         struct lov_request_set *lovset = (struct lov_request_set *)data;
1199         int err;
1200         ENTRY;
1201
1202         /* don't do attribute merge if this aysnc op failed */
1203         if (rc)
1204                 atomic_set(&lovset->set_completes, 0);
1205         err = lov_fini_getattr_set(lovset);
1206         RETURN(rc ? rc : err);
1207 }
1208
1209 static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
1210                               struct ptlrpc_request_set *rqset)
1211 {
1212         struct lov_request_set *lovset;
1213         struct lov_obd *lov;
1214         struct list_head *pos;
1215         struct lov_request *req;
1216         int rc = 0, err;
1217         ENTRY;
1218
1219         LASSERT(oinfo);
1220         ASSERT_LSM_MAGIC(oinfo->oi_md);
1221
1222         if (!exp || !exp->exp_obd)
1223                 RETURN(-ENODEV);
1224
1225         lov = &exp->exp_obd->u.lov;
1226
1227         rc = lov_prep_getattr_set(exp, oinfo, &lovset);
1228         if (rc)
1229                 RETURN(rc);
1230
1231         CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes\n",
1232                POSTID(&oinfo->oi_md->lsm_oi), oinfo->oi_md->lsm_stripe_count,
1233                oinfo->oi_md->lsm_stripe_size);
1234
1235         list_for_each(pos, &lovset->set_list) {
1236                 req = list_entry(pos, struct lov_request, rq_link);
1237
1238                 CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx"
1239                        "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1240                        POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1241                 rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1242                                        &req->rq_oi, rqset);
1243                 if (rc) {
1244                         CERROR("%s: getattr objid "DOSTID" subobj"
1245                                DOSTID" on OST idx %d: rc = %d\n",
1246                                exp->exp_obd->obd_name,
1247                                POSTID(&oinfo->oi_oa->o_oi),
1248                                POSTID(&req->rq_oi.oi_oa->o_oi),
1249                                req->rq_idx, rc);
1250                         GOTO(out, rc);
1251                 }
1252         }
1253
1254         if (!list_empty(&rqset->set_requests)) {
1255                 LASSERT(rc == 0);
1256                 LASSERT (rqset->set_interpret == NULL);
1257                 rqset->set_interpret = lov_getattr_interpret;
1258                 rqset->set_arg = (void *)lovset;
1259                 RETURN(rc);
1260         }
1261 out:
1262         if (rc)
1263                 atomic_set(&lovset->set_completes, 0);
1264         err = lov_fini_getattr_set(lovset);
1265         RETURN(rc ? rc : err);
1266 }
1267
1268 static int lov_setattr(const struct lu_env *env, struct obd_export *exp,
1269                        struct obd_info *oinfo, struct obd_trans_info *oti)
1270 {
1271         struct lov_request_set *set;
1272         struct lov_obd *lov;
1273         struct list_head *pos;
1274         struct lov_request *req;
1275         int err = 0, rc = 0;
1276         ENTRY;
1277
1278         LASSERT(oinfo);
1279         ASSERT_LSM_MAGIC(oinfo->oi_md);
1280
1281         if (!exp || !exp->exp_obd)
1282                 RETURN(-ENODEV);
1283
1284         /* for now, we only expect the following updates here */
1285         LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |
1286                                             OBD_MD_FLMODE | OBD_MD_FLATIME |
1287                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
1288                                             OBD_MD_FLFLAGS | OBD_MD_FLSIZE |
1289                                             OBD_MD_FLGROUP | OBD_MD_FLUID |
1290                                             OBD_MD_FLGID | OBD_MD_FLFID |
1291                                             OBD_MD_FLGENER)));
1292         lov = &exp->exp_obd->u.lov;
1293         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1294         if (rc)
1295                 RETURN(rc);
1296
1297         list_for_each (pos, &set->set_list) {
1298                 req = list_entry(pos, struct lov_request, rq_link);
1299
1300                 rc = obd_setattr(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1301                                  &req->rq_oi, NULL);
1302                 err = lov_update_setattr_set(set, req, rc);
1303                 if (err) {
1304                         CERROR("%s: setattr objid "DOSTID" subobj "
1305                                DOSTID" on OST idx %d: rc = %d\n",
1306                                exp->exp_obd->obd_name,
1307                                POSTID(&set->set_oi->oi_oa->o_oi),
1308                                POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx,
1309                                err);
1310                         if (!rc)
1311                                 rc = err;
1312                 }
1313         }
1314         err = lov_fini_setattr_set(set);
1315         if (!rc)
1316                 rc = err;
1317         RETURN(rc);
1318 }
1319
1320 static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,
1321                                  void *data, int rc)
1322 {
1323         struct lov_request_set *lovset = (struct lov_request_set *)data;
1324         int err;
1325         ENTRY;
1326
1327         if (rc)
1328                 atomic_set(&lovset->set_completes, 0);
1329         err = lov_fini_setattr_set(lovset);
1330         RETURN(rc ? rc : err);
1331 }
1332
1333 /* If @oti is given, the request goes from MDS and responses from OSTs are not
1334    needed. Otherwise, a client is waiting for responses. */
1335 static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
1336                              struct obd_trans_info *oti,
1337                              struct ptlrpc_request_set *rqset)
1338 {
1339         struct lov_request_set *set;
1340         struct lov_request *req;
1341         struct list_head *pos;
1342         struct lov_obd *lov;
1343         int rc = 0;
1344         ENTRY;
1345
1346         LASSERT(oinfo);
1347         ASSERT_LSM_MAGIC(oinfo->oi_md);
1348         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
1349                 LASSERT(oti);
1350                 LASSERT(oti->oti_logcookies);
1351         }
1352
1353         if (!exp || !exp->exp_obd)
1354                 RETURN(-ENODEV);
1355
1356         lov = &exp->exp_obd->u.lov;
1357         rc = lov_prep_setattr_set(exp, oinfo, oti, &set);
1358         if (rc)
1359                 RETURN(rc);
1360
1361         CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes\n",
1362                POSTID(&oinfo->oi_md->lsm_oi),
1363                oinfo->oi_md->lsm_stripe_count,
1364                oinfo->oi_md->lsm_stripe_size);
1365
1366         list_for_each(pos, &set->set_list) {
1367                 req = list_entry(pos, struct lov_request, rq_link);
1368
1369                 if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1370                         oti->oti_logcookies = set->set_cookies + req->rq_stripe;
1371
1372                 CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx"
1373                        "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
1374                        POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
1375
1376                 rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1377                                        &req->rq_oi, oti, rqset);
1378                 if (rc) {
1379                         CERROR("error: setattr objid "DOSTID" subobj"
1380                                DOSTID" on OST idx %d: rc = %d\n",
1381                                POSTID(&set->set_oi->oi_oa->o_oi),
1382                                POSTID(&req->rq_oi.oi_oa->o_oi),
1383                                req->rq_idx, rc);
1384                         break;
1385                 }
1386         }
1387
1388         /* If we are not waiting for responses on async requests, return. */
1389         if (rc || !rqset || list_empty(&rqset->set_requests)) {
1390                 int err;
1391                 if (rc)
1392                         atomic_set(&set->set_completes, 0);
1393                 err = lov_fini_setattr_set(set);
1394                 RETURN(rc ? rc : err);
1395         }
1396
1397         LASSERT(rqset->set_interpret == NULL);
1398         rqset->set_interpret = lov_setattr_interpret;
1399         rqset->set_arg = (void *)set;
1400
1401         RETURN(0);
1402 }
1403
1404 static int lov_punch_interpret(struct ptlrpc_request_set *rqset,
1405                                void *data, int rc)
1406 {
1407         struct lov_request_set *lovset = (struct lov_request_set *)data;
1408         int err;
1409         ENTRY;
1410
1411         if (rc)
1412                 atomic_set(&lovset->set_completes, 0);
1413         err = lov_fini_punch_set(lovset);
1414         RETURN(rc ? rc : err);
1415 }
1416
1417 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1418  * we can send this 'punch' to just the authoritative node and the nodes
1419  * that the punch will affect. */
1420 static int lov_punch(const struct lu_env *env, struct obd_export *exp,
1421                      struct obd_info *oinfo, struct obd_trans_info *oti,
1422                      struct ptlrpc_request_set *rqset)
1423 {
1424         struct lov_request_set *set;
1425         struct lov_obd *lov;
1426         struct list_head *pos;
1427         struct lov_request *req;
1428         int rc = 0;
1429         ENTRY;
1430
1431         LASSERT(oinfo);
1432         ASSERT_LSM_MAGIC(oinfo->oi_md);
1433
1434         if (!exp || !exp->exp_obd)
1435                 RETURN(-ENODEV);
1436
1437         lov = &exp->exp_obd->u.lov;
1438         rc = lov_prep_punch_set(exp, oinfo, oti, &set);
1439         if (rc)
1440                 RETURN(rc);
1441
1442         list_for_each (pos, &set->set_list) {
1443                 req = list_entry(pos, struct lov_request, rq_link);
1444
1445                 rc = obd_punch(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1446                                &req->rq_oi, NULL, rqset);
1447                 if (rc) {
1448                         CERROR("%s: punch objid "DOSTID" subobj "DOSTID
1449                                " on OST idx %d: rc = %d\n",
1450                                exp->exp_obd->obd_name,
1451                                POSTID(&set->set_oi->oi_oa->o_oi),
1452                                POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx, rc);
1453                         break;
1454                 }
1455         }
1456
1457         if (rc || list_empty(&rqset->set_requests)) {
1458                 int err;
1459                 err = lov_fini_punch_set(set);
1460                 RETURN(rc ? rc : err);
1461         }
1462
1463         LASSERT(rqset->set_interpret == NULL);
1464         rqset->set_interpret = lov_punch_interpret;
1465         rqset->set_arg = (void *)set;
1466
1467         RETURN(0);
1468 }
1469
1470 static int lov_sync_interpret(struct ptlrpc_request_set *rqset,
1471                               void *data, int rc)
1472 {
1473         struct lov_request_set *lovset = data;
1474         int err;
1475         ENTRY;
1476
1477         if (rc)
1478                 atomic_set(&lovset->set_completes, 0);
1479         err = lov_fini_sync_set(lovset);
1480         RETURN(rc ?: err);
1481 }
1482
1483 static int lov_sync(const struct lu_env *env, struct obd_export *exp,
1484                     struct obd_info *oinfo, obd_off start, obd_off end,
1485                     struct ptlrpc_request_set *rqset)
1486 {
1487         struct lov_request_set *set = NULL;
1488         struct lov_obd *lov;
1489         struct list_head *pos;
1490         struct lov_request *req;
1491         int rc = 0;
1492         ENTRY;
1493
1494         ASSERT_LSM_MAGIC(oinfo->oi_md);
1495         LASSERT(rqset != NULL);
1496
1497         if (!exp->exp_obd)
1498                 RETURN(-ENODEV);
1499
1500         lov = &exp->exp_obd->u.lov;
1501         rc = lov_prep_sync_set(exp, oinfo, start, end, &set);
1502         if (rc)
1503                 RETURN(rc);
1504
1505         CDEBUG(D_INFO, "fsync objid "DOSTID" ["LPX64", "LPX64"]\n",
1506                POSTID(&set->set_oi->oi_oa->o_oi), start, end);
1507
1508         list_for_each (pos, &set->set_list) {
1509                 req = list_entry(pos, struct lov_request, rq_link);
1510
1511                 rc = obd_sync(env, lov->lov_tgts[req->rq_idx]->ltd_exp,
1512                               &req->rq_oi, req->rq_oi.oi_policy.l_extent.start,
1513                               req->rq_oi.oi_policy.l_extent.end, rqset);
1514                 if (rc) {
1515                         CERROR("%s: fsync objid "DOSTID" subobj "DOSTID
1516                                " on OST idx %d: rc = %d\n",
1517                                exp->exp_obd->obd_name,
1518                                POSTID(&set->set_oi->oi_oa->o_oi),
1519                                POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx,
1520                                rc);
1521                         break;
1522                 }
1523         }
1524
1525         /* If we are not waiting for responses on async requests, return. */
1526         if (rc || list_empty(&rqset->set_requests)) {
1527                 int err = lov_fini_sync_set(set);
1528
1529                 RETURN(rc ?: err);
1530         }
1531
1532         LASSERT(rqset->set_interpret == NULL);
1533         rqset->set_interpret = lov_sync_interpret;
1534         rqset->set_arg = (void *)set;
1535
1536         RETURN(0);
1537 }
1538
1539 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
1540                          obd_count oa_bufs, struct brw_page *pga)
1541 {
1542         struct obd_info oinfo = { { { 0 } } };
1543         int i, rc = 0;
1544
1545         oinfo.oi_oa = lov_oinfo->oi_oa;
1546
1547         /* The caller just wants to know if there's a chance that this
1548          * I/O can succeed */
1549         for (i = 0; i < oa_bufs; i++) {
1550                 int stripe = lov_stripe_number(lov_oinfo->oi_md, pga[i].off);
1551                 int ost = lov_oinfo->oi_md->lsm_oinfo[stripe]->loi_ost_idx;
1552                 obd_off start, end;
1553
1554                 if (!lov_stripe_intersects(lov_oinfo->oi_md, i, pga[i].off,
1555                                            pga[i].off + pga[i].count - 1,
1556                                            &start, &end))
1557                         continue;
1558
1559                 if (!lov->lov_tgts[ost] || !lov->lov_tgts[ost]->ltd_active) {
1560                         CDEBUG(D_HA, "lov idx %d inactive\n", ost);
1561                         return -EIO;
1562                 }
1563
1564                 rc = obd_brw(OBD_BRW_CHECK, lov->lov_tgts[ost]->ltd_exp, &oinfo,
1565                              1, &pga[i], NULL);
1566                 if (rc)
1567                         break;
1568         }
1569         return rc;
1570 }
1571
1572 static int lov_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1573                    obd_count oa_bufs, struct brw_page *pga,
1574                    struct obd_trans_info *oti)
1575 {
1576         struct lov_request_set *set;
1577         struct lov_request *req;
1578         struct list_head *pos;
1579         struct lov_obd *lov = &exp->exp_obd->u.lov;
1580         int err, rc = 0;
1581         ENTRY;
1582
1583         ASSERT_LSM_MAGIC(oinfo->oi_md);
1584
1585         if (cmd == OBD_BRW_CHECK) {
1586                 rc = lov_brw_check(lov, oinfo, oa_bufs, pga);
1587                 RETURN(rc);
1588         }
1589
1590         rc = lov_prep_brw_set(exp, oinfo, oa_bufs, pga, oti, &set);
1591         if (rc)
1592                 RETURN(rc);
1593
1594         list_for_each (pos, &set->set_list) {
1595                 struct obd_export *sub_exp;
1596                 struct brw_page *sub_pga;
1597                 req = list_entry(pos, struct lov_request, rq_link);
1598
1599                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
1600                 sub_pga = set->set_pga + req->rq_pgaidx;
1601                 rc = obd_brw(cmd, sub_exp, &req->rq_oi, req->rq_oabufs,
1602                              sub_pga, oti);
1603                 if (rc)
1604                         break;
1605                 lov_update_common_set(set, req, rc);
1606         }
1607
1608         err = lov_fini_brw_set(set);
1609         if (!rc)
1610                 rc = err;
1611         RETURN(rc);
1612 }
1613
1614 static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
1615                                  void *data, int rc)
1616 {
1617         struct lov_request_set *lovset = (struct lov_request_set *)data;
1618         ENTRY;
1619         rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
1620         RETURN(rc);
1621 }
1622
1623 static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
1624                        struct ldlm_enqueue_info *einfo,
1625                        struct ptlrpc_request_set *rqset)
1626 {
1627         ldlm_mode_t mode = einfo->ei_mode;
1628         struct lov_request_set *set;
1629         struct lov_request *req;
1630         struct list_head *pos;
1631         struct lov_obd *lov;
1632         ldlm_error_t rc;
1633         ENTRY;
1634
1635         LASSERT(oinfo);
1636         ASSERT_LSM_MAGIC(oinfo->oi_md);
1637         LASSERT(mode == (mode & -mode));
1638
1639         /* we should never be asked to replay a lock this way. */
1640         LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
1641
1642         if (!exp || !exp->exp_obd)
1643                 RETURN(-ENODEV);
1644
1645         lov = &exp->exp_obd->u.lov;
1646         rc = lov_prep_enqueue_set(exp, oinfo, einfo, &set);
1647         if (rc)
1648                 RETURN(rc);
1649
1650         list_for_each (pos, &set->set_list) {
1651                 req = list_entry(pos, struct lov_request, rq_link);
1652
1653                 rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
1654                                  &req->rq_oi, einfo, rqset);
1655                 if (rc != ELDLM_OK)
1656                         GOTO(out, rc);
1657         }
1658
1659         if (rqset && !list_empty(&rqset->set_requests)) {
1660                 LASSERT(rc == 0);
1661                 LASSERT(rqset->set_interpret == NULL);
1662                 rqset->set_interpret = lov_enqueue_interpret;
1663                 rqset->set_arg = (void *)set;
1664                 RETURN(rc);
1665         }
1666 out:
1667         rc = lov_fini_enqueue_set(set, mode, rc, rqset);
1668         RETURN(rc);
1669 }
1670
1671 static int lov_change_cbdata(struct obd_export *exp,
1672                              struct lov_stripe_md *lsm, ldlm_iterator_t it,
1673                              void *data)
1674 {
1675         struct lov_obd *lov;
1676         int rc = 0, i;
1677         ENTRY;
1678
1679         ASSERT_LSM_MAGIC(lsm);
1680
1681         if (!exp || !exp->exp_obd)
1682                 RETURN(-ENODEV);
1683
1684         lov = &exp->exp_obd->u.lov;
1685         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1686                 struct lov_stripe_md submd;
1687                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1688
1689                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
1690                         CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
1691                         continue;
1692                 }
1693
1694                 submd.lsm_oi = loi->loi_oi;
1695                 submd.lsm_stripe_count = 0;
1696                 rc = obd_change_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1697                                        &submd, it, data);
1698         }
1699         RETURN(rc);
1700 }
1701
1702 /* find any ldlm lock of the inode in lov
1703  * return 0    not find
1704  *      1    find one
1705  *      < 0    error */
1706 static int lov_find_cbdata(struct obd_export *exp,
1707                            struct lov_stripe_md *lsm, ldlm_iterator_t it,
1708                            void *data)
1709 {
1710         struct lov_obd *lov;
1711         int rc = 0, i;
1712         ENTRY;
1713
1714         ASSERT_LSM_MAGIC(lsm);
1715
1716         if (!exp || !exp->exp_obd)
1717                 RETURN(-ENODEV);
1718
1719         lov = &exp->exp_obd->u.lov;
1720         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1721                 struct lov_stripe_md submd;
1722                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1723
1724                 if (!lov->lov_tgts[loi->loi_ost_idx]) {
1725                         CDEBUG(D_HA, "lov idx %d NULL \n", loi->loi_ost_idx);
1726                         continue;
1727                 }
1728                 submd.lsm_oi = loi->loi_oi;
1729                 submd.lsm_stripe_count = 0;
1730                 rc = obd_find_cbdata(lov->lov_tgts[loi->loi_ost_idx]->ltd_exp,
1731                                      &submd, it, data);
1732                 if (rc != 0)
1733                         RETURN(rc);
1734         }
1735         RETURN(rc);
1736 }
1737
1738 static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
1739                       __u32 mode, struct lustre_handle *lockh)
1740 {
1741         struct lov_request_set *set;
1742         struct obd_info oinfo;
1743         struct lov_request *req;
1744         struct list_head *pos;
1745         struct lov_obd *lov;
1746         struct lustre_handle *lov_lockhp;
1747         int err = 0, rc = 0;
1748         ENTRY;
1749
1750         ASSERT_LSM_MAGIC(lsm);
1751
1752         if (!exp || !exp->exp_obd)
1753                 RETURN(-ENODEV);
1754
1755         LASSERT(lockh);
1756         lov = &exp->exp_obd->u.lov;
1757         rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
1758         if (rc)
1759                 RETURN(rc);
1760
1761         list_for_each(pos, &set->set_list) {
1762                 req = list_entry(pos, struct lov_request, rq_link);
1763                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
1764
1765                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
1766                                 req->rq_oi.oi_md, mode, lov_lockhp);
1767                 rc = lov_update_common_set(set, req, rc);
1768                 if (rc) {
1769                         CERROR("%s: cancel objid "DOSTID" subobj "
1770                                DOSTID" on OST idx %d: rc = %d\n",
1771                                exp->exp_obd->obd_name, POSTID(&lsm->lsm_oi),
1772                                POSTID(&req->rq_oi.oi_md->lsm_oi),
1773                                req->rq_idx, rc);
1774                         err = rc;
1775                 }
1776
1777         }
1778         lov_fini_cancel_set(set);
1779         RETURN(err);
1780 }
1781
1782 static int lov_cancel_unused(struct obd_export *exp,
1783                              struct lov_stripe_md *lsm,
1784                              ldlm_cancel_flags_t flags, void *opaque)
1785 {
1786         struct lov_obd *lov;
1787         int rc = 0, i;
1788         ENTRY;
1789
1790         if (!exp || !exp->exp_obd)
1791                 RETURN(-ENODEV);
1792
1793         lov = &exp->exp_obd->u.lov;
1794         if (lsm == NULL) {
1795                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1796                         int err;
1797                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
1798                                 continue;
1799
1800                         err = obd_cancel_unused(lov->lov_tgts[i]->ltd_exp, NULL,
1801                                                 flags, opaque);
1802                         if (!rc)
1803                                 rc = err;
1804                 }
1805                 RETURN(rc);
1806         }
1807
1808         ASSERT_LSM_MAGIC(lsm);
1809
1810         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1811                 struct lov_stripe_md submd;
1812                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1813                 int idx = loi->loi_ost_idx;
1814                 int err;
1815
1816                 if (!lov->lov_tgts[idx]) {
1817                         CDEBUG(D_HA, "lov idx %d NULL\n", idx);
1818                         continue;
1819                 }
1820
1821                 if (!lov->lov_tgts[idx]->ltd_active)
1822                         CDEBUG(D_HA, "lov idx %d inactive\n", idx);
1823
1824                 submd.lsm_oi = loi->loi_oi;
1825                 submd.lsm_stripe_count = 0;
1826                 err = obd_cancel_unused(lov->lov_tgts[idx]->ltd_exp,
1827                                         &submd, flags, opaque);
1828                 if (err && lov->lov_tgts[idx]->ltd_active) {
1829                         CERROR("%s: cancel unused objid "DOSTID
1830                                " subobj "DOSTID" on OST idx %d: rc = %d\n",
1831                                exp->exp_obd->obd_name, POSTID(&lsm->lsm_oi),
1832                                POSTID(&loi->loi_oi), idx, err);
1833                         if (!rc)
1834                                 rc = err;
1835                 }
1836         }
1837         RETURN(rc);
1838 }
1839
1840 int lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
1841 {
1842         struct lov_request_set *lovset = (struct lov_request_set *)data;
1843         int err;
1844         ENTRY;
1845
1846         if (rc)
1847                 atomic_set(&lovset->set_completes, 0);
1848
1849         err = lov_fini_statfs_set(lovset);
1850         RETURN(rc ? rc : err);
1851 }
1852
1853 static int lov_statfs_async(struct obd_export *exp, struct obd_info *oinfo,
1854                             __u64 max_age, struct ptlrpc_request_set *rqset)
1855 {
1856         struct obd_device      *obd = class_exp2obd(exp);
1857         struct lov_request_set *set;
1858         struct lov_request *req;
1859         struct list_head *pos;
1860         struct lov_obd *lov;
1861         int rc = 0;
1862         ENTRY;
1863
1864         LASSERT(oinfo != NULL);
1865         LASSERT(oinfo->oi_osfs != NULL);
1866
1867         lov = &obd->u.lov;
1868         rc = lov_prep_statfs_set(obd, oinfo, &set);
1869         if (rc)
1870                 RETURN(rc);
1871
1872         list_for_each (pos, &set->set_list) {
1873                 req = list_entry(pos, struct lov_request, rq_link);
1874                 rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
1875                                       &req->rq_oi, max_age, rqset);
1876                 if (rc)
1877                         break;
1878         }
1879
1880         if (rc || list_empty(&rqset->set_requests)) {
1881                 int err;
1882                 if (rc)
1883                         atomic_set(&set->set_completes, 0);
1884                 err = lov_fini_statfs_set(set);
1885                 RETURN(rc ? rc : err);
1886         }
1887
1888         LASSERT(rqset->set_interpret == NULL);
1889         rqset->set_interpret = lov_statfs_interpret;
1890         rqset->set_arg = (void *)set;
1891         RETURN(0);
1892 }
1893
1894 static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
1895                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1896 {
1897         struct ptlrpc_request_set *set = NULL;
1898         struct obd_info oinfo = { { { 0 } } };
1899         int rc = 0;
1900         ENTRY;
1901
1902
1903         /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
1904          * statfs requests */
1905         set = ptlrpc_prep_set();
1906         if (set == NULL)
1907                 RETURN(-ENOMEM);
1908
1909         oinfo.oi_osfs = osfs;
1910         oinfo.oi_flags = flags;
1911         rc = lov_statfs_async(exp, &oinfo, max_age, set);
1912         if (rc == 0)
1913                 rc = ptlrpc_set_wait(set);
1914         ptlrpc_set_destroy(set);
1915
1916         RETURN(rc);
1917 }
1918
1919 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1920                          void *karg, void *uarg)
1921 {
1922         struct obd_device *obddev = class_exp2obd(exp);
1923         struct lov_obd *lov = &obddev->u.lov;
1924         int i = 0, rc = 0, count = lov->desc.ld_tgt_count;
1925         struct obd_uuid *uuidp;
1926         ENTRY;
1927
1928         switch (cmd) {
1929         case IOC_OBD_STATFS: {
1930                 struct obd_ioctl_data *data = karg;
1931                 struct obd_device *osc_obd;
1932                 struct obd_statfs stat_buf = {0};
1933                 __u32 index;
1934                 __u32 flags;
1935
1936                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
1937                 if ((index >= count))
1938                         RETURN(-ENODEV);
1939
1940                 if (!lov->lov_tgts[index])
1941                         /* Try again with the next index */
1942                         RETURN(-EAGAIN);
1943                 if (!lov->lov_tgts[index]->ltd_active)
1944                         RETURN(-ENODATA);
1945
1946                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
1947                 if (!osc_obd)
1948                         RETURN(-EINVAL);
1949
1950                 /* copy UUID */
1951                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
1952                                      min((int) data->ioc_plen2,
1953                                          (int) sizeof(struct obd_uuid))))
1954                         RETURN(-EFAULT);
1955
1956                 flags = uarg ? *(__u32*)uarg : 0;
1957                 /* got statfs data */
1958                 rc = obd_statfs(NULL, lov->lov_tgts[index]->ltd_exp, &stat_buf,
1959                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1960                                 flags);
1961                 if (rc)
1962                         RETURN(rc);
1963                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1964                                      min((int) data->ioc_plen1,
1965                                          (int) sizeof(stat_buf))))
1966                         RETURN(-EFAULT);
1967                 break;
1968         }
1969         case OBD_IOC_LOV_GET_CONFIG: {
1970                 struct obd_ioctl_data *data;
1971                 struct lov_desc *desc;
1972                 char *buf = NULL;
1973                 __u32 *genp;
1974
1975                 len = 0;
1976                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1977                         RETURN(-EINVAL);
1978
1979                 data = (struct obd_ioctl_data *)buf;
1980
1981                 if (sizeof(*desc) > data->ioc_inllen1) {
1982                         obd_ioctl_freedata(buf, len);
1983                         RETURN(-EINVAL);
1984                 }
1985
1986                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1987                         obd_ioctl_freedata(buf, len);
1988                         RETURN(-EINVAL);
1989                 }
1990
1991                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1992                         obd_ioctl_freedata(buf, len);
1993                         RETURN(-EINVAL);
1994                 }
1995
1996                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1997                 memcpy(desc, &(lov->desc), sizeof(*desc));
1998
1999                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2000                 genp = (__u32 *)data->ioc_inlbuf3;
2001                 /* the uuid will be empty for deleted OSTs */
2002                 for (i = 0; i < count; i++, uuidp++, genp++) {
2003                         if (!lov->lov_tgts[i])
2004                                 continue;
2005                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
2006                         *genp = lov->lov_tgts[i]->ltd_gen;
2007                 }
2008
2009                 if (copy_to_user((void *)uarg, buf, len))
2010                         rc = -EFAULT;
2011                 obd_ioctl_freedata(buf, len);
2012                 break;
2013         }
2014         case LL_IOC_LOV_SETSTRIPE:
2015                 rc = lov_setstripe(exp, len, karg, uarg);
2016                 break;
2017         case LL_IOC_LOV_GETSTRIPE:
2018                 rc = lov_getstripe(exp, karg, uarg);
2019                 break;
2020         case LL_IOC_LOV_SETEA:
2021                 rc = lov_setea(exp, karg, uarg);
2022                 break;
2023         case OBD_IOC_QUOTACTL: {
2024                 struct if_quotactl *qctl = karg;
2025                 struct lov_tgt_desc *tgt = NULL;
2026                 struct obd_quotactl *oqctl;
2027
2028                 if (qctl->qc_valid == QC_OSTIDX) {
2029                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
2030                                 RETURN(-EINVAL);
2031
2032                         tgt = lov->lov_tgts[qctl->qc_idx];
2033                         if (!tgt || !tgt->ltd_exp)
2034                                 RETURN(-EINVAL);
2035                 } else if (qctl->qc_valid == QC_UUID) {
2036                         for (i = 0; i < count; i++) {
2037                                 tgt = lov->lov_tgts[i];
2038                                 if (!tgt ||
2039                                     !obd_uuid_equals(&tgt->ltd_uuid,
2040                                                      &qctl->obd_uuid))
2041                                         continue;
2042
2043                                 if (tgt->ltd_exp == NULL)
2044                                         RETURN(-EINVAL);
2045
2046                                 break;
2047                         }
2048                 } else {
2049                         RETURN(-EINVAL);
2050                 }
2051
2052                 if (i >= count)
2053                         RETURN(-EAGAIN);
2054
2055                 LASSERT(tgt && tgt->ltd_exp);
2056                 OBD_ALLOC_PTR(oqctl);
2057                 if (!oqctl)
2058                         RETURN(-ENOMEM);
2059
2060                 QCTL_COPY(oqctl, qctl);
2061                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
2062                 if (rc == 0) {
2063                         QCTL_COPY(qctl, oqctl);
2064                         qctl->qc_valid = QC_OSTIDX;
2065                         qctl->obd_uuid = tgt->ltd_uuid;
2066                 }
2067                 OBD_FREE_PTR(oqctl);
2068                 break;
2069         }
2070         default: {
2071                 int set = 0;
2072
2073                 if (count == 0)
2074                         RETURN(-ENOTTY);
2075
2076                 for (i = 0; i < count; i++) {
2077                         int err;
2078                         struct obd_device *osc_obd;
2079
2080                         /* OST was disconnected */
2081                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
2082                                 continue;
2083
2084                         /* ll_umount_begin() sets force flag but for lov, not
2085                          * osc. Let's pass it through */
2086                         osc_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
2087                         osc_obd->obd_force = obddev->obd_force;
2088                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
2089                                             len, karg, uarg);
2090                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
2091                                 RETURN(err);
2092                         } else if (err) {
2093                                 if (lov->lov_tgts[i]->ltd_active) {
2094                                         CDEBUG(err == -ENOTTY ?
2095                                                D_IOCTL : D_WARNING,
2096                                                "iocontrol OSC %s on OST "
2097                                                "idx %d cmd %x: err = %d\n",
2098                                                lov_uuid2str(lov, i),
2099                                                i, cmd, err);
2100                                         if (!rc)
2101                                                 rc = err;
2102                                 }
2103                         } else {
2104                                 set = 1;
2105                         }
2106                 }
2107                 if (!set && !rc)
2108                         rc = -EIO;
2109         }
2110         }
2111
2112         RETURN(rc);
2113 }
2114
2115 #define FIEMAP_BUFFER_SIZE 4096
2116
2117 /**
2118  * Non-zero fe_logical indicates that this is a continuation FIEMAP
2119  * call. The local end offset and the device are sent in the first
2120  * fm_extent. This function calculates the stripe number from the index.
2121  * This function returns a stripe_no on which mapping is to be restarted.
2122  *
2123  * This function returns fm_end_offset which is the in-OST offset at which
2124  * mapping should be restarted. If fm_end_offset=0 is returned then caller
2125  * will re-calculate proper offset in next stripe.
2126  * Note that the first extent is passed to lov_get_info via the value field.
2127  *
2128  * \param fiemap fiemap request header
2129  * \param lsm striping information for the file
2130  * \param fm_start logical start of mapping
2131  * \param fm_end logical end of mapping
2132  * \param start_stripe starting stripe will be returned in this
2133  */
2134 obd_size fiemap_calc_fm_end_offset(struct ll_user_fiemap *fiemap,
2135                                    struct lov_stripe_md *lsm, obd_size fm_start,
2136                                    obd_size fm_end, int *start_stripe)
2137 {
2138         obd_size local_end = fiemap->fm_extents[0].fe_logical;
2139         obd_off lun_start, lun_end;
2140         obd_size fm_end_offset;
2141         int stripe_no = -1, i;
2142
2143         if (fiemap->fm_extent_count == 0 ||
2144             fiemap->fm_extents[0].fe_logical == 0)
2145                 return 0;
2146
2147         /* Find out stripe_no from ost_index saved in the fe_device */
2148         for (i = 0; i < lsm->lsm_stripe_count; i++) {
2149                 if (lsm->lsm_oinfo[i]->loi_ost_idx ==
2150                                         fiemap->fm_extents[0].fe_device) {
2151                         stripe_no = i;
2152                         break;
2153                 }
2154         }
2155         if (stripe_no == -1)
2156                 return -EINVAL;
2157
2158         /* If we have finished mapping on previous device, shift logical
2159          * offset to start of next device */
2160         if ((lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
2161                                    &lun_start, &lun_end)) != 0 &&
2162                                    local_end < lun_end) {
2163                 fm_end_offset = local_end;
2164                 *start_stripe = stripe_no;
2165         } else {
2166                 /* This is a special value to indicate that caller should
2167                  * calculate offset in next stripe. */
2168                 fm_end_offset = 0;
2169                 *start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
2170         }
2171
2172         return fm_end_offset;
2173 }
2174
2175 /**
2176  * We calculate on which OST the mapping will end. If the length of mapping
2177  * is greater than (stripe_size * stripe_count) then the last_stripe will
2178  * will be one just before start_stripe. Else we check if the mapping
2179  * intersects each OST and find last_stripe.
2180  * This function returns the last_stripe and also sets the stripe_count
2181  * over which the mapping is spread
2182  *
2183  * \param lsm striping information for the file
2184  * \param fm_start logical start of mapping
2185  * \param fm_end logical end of mapping
2186  * \param start_stripe starting stripe of the mapping
2187  * \param stripe_count the number of stripes across which to map is returned
2188  *
2189  * \retval last_stripe return the last stripe of the mapping
2190  */
2191 int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, obd_size fm_start,
2192                             obd_size fm_end, int start_stripe,
2193                             int *stripe_count)
2194 {
2195         int last_stripe;
2196         obd_off obd_start, obd_end;
2197         int i, j;
2198
2199         if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
2200                 last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
2201                                                               start_stripe - 1);
2202                 *stripe_count = lsm->lsm_stripe_count;
2203         } else {
2204                 for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
2205                      i = (i + 1) % lsm->lsm_stripe_count, j++) {
2206                         if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
2207                                                    &obd_start, &obd_end)) == 0)
2208                                 break;
2209                 }
2210                 *stripe_count = j;
2211                 last_stripe = (start_stripe + j - 1) %lsm->lsm_stripe_count;
2212         }
2213
2214         return last_stripe;
2215 }
2216
2217 /**
2218  * Set fe_device and copy extents from local buffer into main return buffer.
2219  *
2220  * \param fiemap fiemap request header
2221  * \param lcl_fm_ext array of local fiemap extents to be copied
2222  * \param ost_index OST index to be written into the fm_device field for each
2223                     extent
2224  * \param ext_count number of extents to be copied
2225  * \param current_extent where to start copying in main extent array
2226  */
2227 void fiemap_prepare_and_copy_exts(struct ll_user_fiemap *fiemap,
2228                                   struct ll_fiemap_extent *lcl_fm_ext,
2229                                   int ost_index, unsigned int ext_count,
2230                                   int current_extent)
2231 {
2232         char *to;
2233         int ext;
2234
2235         for (ext = 0; ext < ext_count; ext++) {
2236                 lcl_fm_ext[ext].fe_device = ost_index;
2237                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
2238         }
2239
2240         /* Copy fm_extent's from fm_local to return buffer */
2241         to = (char *)fiemap + fiemap_count_to_size(current_extent);
2242         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct ll_fiemap_extent));
2243 }
2244
2245 /**
2246  * Break down the FIEMAP request and send appropriate calls to individual OSTs.
2247  * This also handles the restarting of FIEMAP calls in case mapping overflows
2248  * the available number of extents in single call.
2249  */
2250 static int lov_fiemap(struct lov_obd *lov, __u32 keylen, void *key,
2251                       __u32 *vallen, void *val, struct lov_stripe_md *lsm)
2252 {
2253         struct ll_fiemap_info_key *fm_key = key;
2254         struct ll_user_fiemap *fiemap = val;
2255         struct ll_user_fiemap *fm_local = NULL;
2256         struct ll_fiemap_extent *lcl_fm_ext;
2257         int count_local;
2258         unsigned int get_num_extents = 0;
2259         int ost_index = 0, actual_start_stripe, start_stripe;
2260         obd_size fm_start, fm_end, fm_length, fm_end_offset;
2261         obd_size curr_loc;
2262         int current_extent = 0, rc = 0, i;
2263         int ost_eof = 0; /* EOF for object */
2264         int ost_done = 0; /* done with required mapping for this OST? */
2265         int last_stripe;
2266         int cur_stripe = 0, cur_stripe_wrap = 0, stripe_count;
2267         unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
2268
2269         if (lsm == NULL)
2270                 GOTO(out, rc = 0);
2271
2272         if (fiemap_count_to_size(fm_key->fiemap.fm_extent_count) < buffer_size)
2273                 buffer_size = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
2274
2275         OBD_ALLOC_LARGE(fm_local, buffer_size);
2276         if (fm_local == NULL)
2277                 GOTO(out, rc = -ENOMEM);
2278         lcl_fm_ext = &fm_local->fm_extents[0];
2279
2280         count_local = fiemap_size_to_count(buffer_size);
2281
2282         memcpy(fiemap, &fm_key->fiemap, sizeof(*fiemap));
2283         fm_start = fiemap->fm_start;
2284         fm_length = fiemap->fm_length;
2285         /* Calculate start stripe, last stripe and length of mapping */
2286         actual_start_stripe = start_stripe = lov_stripe_number(lsm, fm_start);
2287         fm_end = (fm_length == ~0ULL ? fm_key->oa.o_size :
2288                                                 fm_start + fm_length - 1);
2289         /* If fm_length != ~0ULL but fm_start+fm_length-1 exceeds file size */
2290         if (fm_end > fm_key->oa.o_size)
2291                 fm_end = fm_key->oa.o_size;
2292
2293         last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
2294                                             actual_start_stripe, &stripe_count);
2295
2296         fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start,
2297                                                   fm_end, &start_stripe);
2298         if (fm_end_offset == -EINVAL)
2299                 GOTO(out, rc = -EINVAL);
2300
2301         if (fiemap->fm_extent_count == 0) {
2302                 get_num_extents = 1;
2303                 count_local = 0;
2304         }
2305
2306         /* Check each stripe */
2307         for (cur_stripe = start_stripe, i = 0; i < stripe_count;
2308              i++, cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
2309                 obd_size req_fm_len; /* Stores length of required mapping */
2310                 obd_size len_mapped_single_call;
2311                 obd_off lun_start, lun_end, obd_object_end;
2312                 unsigned int ext_count;
2313
2314                 cur_stripe_wrap = cur_stripe;
2315
2316                 /* Find out range of mapping on this stripe */
2317                 if ((lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
2318                                            &lun_start, &obd_object_end)) == 0)
2319                         continue;
2320
2321                 /* If this is a continuation FIEMAP call and we are on
2322                  * starting stripe then lun_start needs to be set to
2323                  * fm_end_offset */
2324                 if (fm_end_offset != 0 && cur_stripe == start_stripe)
2325                         lun_start = fm_end_offset;
2326
2327                 if (fm_length != ~0ULL) {
2328                         /* Handle fm_start + fm_length overflow */
2329                         if (fm_start + fm_length < fm_start)
2330                                 fm_length = ~0ULL - fm_start;
2331                         lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
2332                                                      cur_stripe);
2333                 } else {
2334                         lun_end = ~0ULL;
2335                 }
2336
2337                 if (lun_start == lun_end)
2338                         continue;
2339
2340                 req_fm_len = obd_object_end - lun_start;
2341                 fm_local->fm_length = 0;
2342                 len_mapped_single_call = 0;
2343
2344                 /* If the output buffer is very large and the objects have many
2345                  * extents we may need to loop on a single OST repeatedly */
2346                 ost_eof = 0;
2347                 ost_done = 0;
2348                 do {
2349                         if (get_num_extents == 0) {
2350                                 /* Don't get too many extents. */
2351                                 if (current_extent + count_local >
2352                                     fiemap->fm_extent_count)
2353                                         count_local = fiemap->fm_extent_count -
2354                                                                  current_extent;
2355                         }
2356
2357                         lun_start += len_mapped_single_call;
2358                         fm_local->fm_length = req_fm_len - len_mapped_single_call;
2359                         req_fm_len = fm_local->fm_length;
2360                         fm_local->fm_extent_count = count_local;
2361                         fm_local->fm_mapped_extents = 0;
2362                         fm_local->fm_flags = fiemap->fm_flags;
2363
2364                         fm_key->oa.o_oi = lsm->lsm_oinfo[cur_stripe]->loi_oi;
2365                         ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;
2366
2367                         if (ost_index < 0 || ost_index >=lov->desc.ld_tgt_count)
2368                                 GOTO(out, rc = -EINVAL);
2369
2370                         /* If OST is inactive, return extent with UNKNOWN flag */
2371                         if (!lov->lov_tgts[ost_index]->ltd_active) {
2372                                 fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
2373                                 fm_local->fm_mapped_extents = 1;
2374
2375                                 lcl_fm_ext[0].fe_logical = lun_start;
2376                                 lcl_fm_ext[0].fe_length = obd_object_end -
2377                                                                       lun_start;
2378                                 lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
2379
2380                                 goto inactive_tgt;
2381                         }
2382
2383                         fm_local->fm_start = lun_start;
2384                         fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
2385                         memcpy(&fm_key->fiemap, fm_local, sizeof(*fm_local));
2386                         *vallen=fiemap_count_to_size(fm_local->fm_extent_count);
2387                         rc = obd_get_info(NULL,
2388                                           lov->lov_tgts[ost_index]->ltd_exp,
2389                                           keylen, key, vallen, fm_local, lsm);
2390                         if (rc != 0)
2391                                 GOTO(out, rc);
2392
2393 inactive_tgt:
2394                         ext_count = fm_local->fm_mapped_extents;
2395                         if (ext_count == 0) {
2396                                 ost_done = 1;
2397                                 /* If last stripe has hole at the end,
2398                                  * then we need to return */
2399                                 if (cur_stripe_wrap == last_stripe) {
2400                                         fiemap->fm_mapped_extents = 0;
2401                                         goto finish;
2402                                 }
2403                                 break;
2404                         }
2405
2406                         /* If we just need num of extents then go to next device */
2407                         if (get_num_extents) {
2408                                 current_extent += ext_count;
2409                                 break;
2410                         }
2411
2412                         len_mapped_single_call = lcl_fm_ext[ext_count-1].fe_logical -
2413                                   lun_start + lcl_fm_ext[ext_count - 1].fe_length;
2414
2415                         /* Have we finished mapping on this device? */
2416                         if (req_fm_len <= len_mapped_single_call)
2417                                 ost_done = 1;
2418
2419                         /* Clear the EXTENT_LAST flag which can be present on
2420                          * last extent */
2421                         if (lcl_fm_ext[ext_count-1].fe_flags & FIEMAP_EXTENT_LAST)
2422                                 lcl_fm_ext[ext_count - 1].fe_flags &=
2423                                                             ~FIEMAP_EXTENT_LAST;
2424
2425                         curr_loc = lov_stripe_size(lsm,
2426                                            lcl_fm_ext[ext_count - 1].fe_logical+
2427                                            lcl_fm_ext[ext_count - 1].fe_length,
2428                                            cur_stripe);
2429                         if (curr_loc >= fm_key->oa.o_size)
2430                                 ost_eof = 1;
2431
2432                         fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
2433                                                      ost_index, ext_count,
2434                                                      current_extent);
2435
2436                         current_extent += ext_count;
2437
2438                         /* Ran out of available extents? */
2439                         if (current_extent >= fiemap->fm_extent_count)
2440                                 goto finish;
2441                 } while (ost_done == 0 && ost_eof == 0);
2442
2443                 if (cur_stripe_wrap == last_stripe)
2444                         goto finish;
2445         }
2446
2447 finish:
2448         /* Indicate that we are returning device offsets unless file just has
2449          * single stripe */
2450         if (lsm->lsm_stripe_count > 1)
2451                 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
2452
2453         if (get_num_extents)
2454                 goto skip_last_device_calc;
2455
2456         /* Check if we have reached the last stripe and whether mapping for that
2457          * stripe is done. */
2458         if (cur_stripe_wrap == last_stripe) {
2459                 if (ost_done || ost_eof)
2460                         fiemap->fm_extents[current_extent - 1].fe_flags |=
2461                                                              FIEMAP_EXTENT_LAST;
2462         }
2463
2464 skip_last_device_calc:
2465         fiemap->fm_mapped_extents = current_extent;
2466
2467 out:
2468         OBD_FREE_LARGE(fm_local, buffer_size);
2469         return rc;
2470 }
2471
2472 static int lov_get_info(const struct lu_env *env, struct obd_export *exp,
2473                         __u32 keylen, void *key, __u32 *vallen, void *val,
2474                         struct lov_stripe_md *lsm)
2475 {
2476         struct obd_device *obddev = class_exp2obd(exp);
2477         struct lov_obd *lov = &obddev->u.lov;
2478         int i, rc;
2479         ENTRY;
2480
2481         if (!vallen || !val)
2482                 RETURN(-EFAULT);
2483
2484         obd_getref(obddev);
2485
2486         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2487                 struct {
2488                         char name[16];
2489                         struct ldlm_lock *lock;
2490                 } *data = key;
2491                 struct ldlm_res_id *res_id = &data->lock->l_resource->lr_name;
2492                 struct lov_oinfo *loi;
2493                 __u32 *stripe = val;
2494
2495                 if (*vallen < sizeof(*stripe))
2496                         GOTO(out, rc = -EFAULT);
2497                 *vallen = sizeof(*stripe);
2498
2499                 /* XXX This is another one of those bits that will need to
2500                  * change if we ever actually support nested LOVs.  It uses
2501                  * the lock's export to find out which stripe it is. */
2502                 /* XXX - it's assumed all the locks for deleted OSTs have
2503                  * been cancelled. Also, the export for deleted OSTs will
2504                  * be NULL and won't match the lock's export. */
2505                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
2506                         loi = lsm->lsm_oinfo[i];
2507                         if (!lov->lov_tgts[loi->loi_ost_idx])
2508                                 continue;
2509                         if (lov->lov_tgts[loi->loi_ost_idx]->ltd_exp ==
2510                             data->lock->l_conn_export &&
2511                             ostid_res_name_eq(&loi->loi_oi, res_id)) {
2512                                 *stripe = i;
2513                                 GOTO(out, rc = 0);
2514                         }
2515                 }
2516                 LDLM_ERROR(data->lock, "lock on inode without such object");
2517                 dump_lsm(D_ERROR, lsm);
2518                 GOTO(out, rc = -ENXIO);
2519         } else if (KEY_IS(KEY_LAST_ID)) {
2520                 struct obd_id_info *info = val;
2521                 __u32 size = sizeof(obd_id);
2522                 struct lov_tgt_desc *tgt;
2523
2524                 LASSERT(*vallen == sizeof(struct obd_id_info));
2525                 tgt = lov->lov_tgts[info->idx];
2526
2527                 if (!tgt || !tgt->ltd_active)
2528                         GOTO(out, rc = -ESRCH);
2529
2530                 rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
2531                                   &size, info->data, NULL);
2532                 GOTO(out, rc = 0);
2533         } else if (KEY_IS(KEY_LOVDESC)) {
2534                 struct lov_desc *desc_ret = val;
2535                 *desc_ret = lov->desc;
2536
2537                 GOTO(out, rc = 0);
2538         } else if (KEY_IS(KEY_FIEMAP)) {
2539                 rc = lov_fiemap(lov, keylen, key, vallen, val, lsm);
2540                 GOTO(out, rc);
2541         } else if (KEY_IS(KEY_CONNECT_FLAG)) {
2542                 struct lov_tgt_desc *tgt;
2543                 __u64 ost_idx = *((__u64*)val);
2544
2545                 LASSERT(*vallen == sizeof(__u64));
2546                 LASSERT(ost_idx < lov->desc.ld_tgt_count);
2547                 tgt = lov->lov_tgts[ost_idx];
2548
2549                 if (!tgt || !tgt->ltd_exp)
2550                         GOTO(out, rc = -ESRCH);
2551
2552                 *((__u64 *)val) = exp_connect_flags(tgt->ltd_exp);
2553                 GOTO(out, rc = 0);
2554         } else if (KEY_IS(KEY_TGT_COUNT)) {
2555                 *((int *)val) = lov->desc.ld_tgt_count;
2556                 GOTO(out, rc = 0);
2557         }
2558
2559         rc = -EINVAL;
2560
2561 out:
2562         obd_putref(obddev);
2563         RETURN(rc);
2564 }
2565
2566 static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
2567                               obd_count keylen, void *key, obd_count vallen,
2568                               void *val, struct ptlrpc_request_set *set)
2569 {
2570         struct obd_device *obddev = class_exp2obd(exp);
2571         struct lov_obd *lov = &obddev->u.lov;
2572         obd_count count;
2573         int i, rc = 0, err;
2574         struct lov_tgt_desc *tgt;
2575         unsigned incr, check_uuid,
2576                  do_inactive, no_set;
2577         unsigned next_id = 0,  mds_con = 0, capa = 0;
2578         ENTRY;
2579
2580         incr = check_uuid = do_inactive = no_set = 0;
2581         if (set == NULL) {
2582                 no_set = 1;
2583                 set = ptlrpc_prep_set();
2584                 if (!set)
2585                         RETURN(-ENOMEM);
2586         }
2587
2588         obd_getref(obddev);
2589         count = lov->desc.ld_tgt_count;
2590
2591         if (KEY_IS(KEY_NEXT_ID)) {
2592                 count = vallen / sizeof(struct obd_id_info);
2593                 vallen = sizeof(obd_id);
2594                 incr = sizeof(struct obd_id_info);
2595                 do_inactive = 1;
2596                 next_id = 1;
2597         } else if (KEY_IS(KEY_CHECKSUM)) {
2598                 do_inactive = 1;
2599         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2600                 /* use defaults:  do_inactive = incr = 0; */
2601         } else if (KEY_IS(KEY_MDS_CONN)) {
2602                 mds_con = 1;
2603         } else if (KEY_IS(KEY_CAPA_KEY)) {
2604                 capa = 1;
2605         } else if (KEY_IS(KEY_CACHE_SET)) {
2606                 LASSERT(lov->lov_cache == NULL);
2607                 lov->lov_cache = val;
2608                 do_inactive = 1;
2609         }
2610
2611         for (i = 0; i < count; i++, val = (char *)val + incr) {
2612                 if (next_id) {
2613                         tgt = lov->lov_tgts[((struct obd_id_info*)val)->idx];
2614                 } else {
2615                         tgt = lov->lov_tgts[i];
2616                 }
2617                 /* OST was disconnected */
2618                 if (!tgt || !tgt->ltd_exp)
2619                         continue;
2620
2621                 /* OST is inactive and we don't want inactive OSCs */
2622                 if (!tgt->ltd_active && !do_inactive)
2623                         continue;
2624
2625                 if (mds_con) {
2626                         struct mds_group_info *mgi;
2627
2628                         LASSERT(vallen == sizeof(*mgi));
2629                         mgi = (struct mds_group_info *)val;
2630
2631                         /* Only want a specific OSC */
2632                         if (mgi->uuid && !obd_uuid_equals(mgi->uuid,
2633                                                 &tgt->ltd_uuid))
2634                                 continue;
2635
2636                         err = obd_set_info_async(env, tgt->ltd_exp,
2637                                          keylen, key, sizeof(int),
2638                                          &mgi->group, set);
2639                 } else if (next_id) {
2640                         err = obd_set_info_async(env, tgt->ltd_exp,
2641                                          keylen, key, vallen,
2642                                          ((struct obd_id_info*)val)->data, set);
2643                 } else if (capa) {
2644                         struct mds_capa_info *info = (struct mds_capa_info*)val;
2645
2646                         LASSERT(vallen == sizeof(*info));
2647
2648                          /* Only want a specific OSC */
2649                         if (info->uuid &&
2650                             !obd_uuid_equals(info->uuid, &tgt->ltd_uuid))
2651                                 continue;
2652
2653                         err = obd_set_info_async(env, tgt->ltd_exp, keylen,
2654                                                  key, sizeof(*info->capa),
2655                                                  info->capa, set);
2656                 } else {
2657                         /* Only want a specific OSC */
2658                         if (check_uuid &&
2659                             !obd_uuid_equals(val, &tgt->ltd_uuid))
2660                                 continue;
2661
2662                         err = obd_set_info_async(env, tgt->ltd_exp,
2663                                          keylen, key, vallen, val, set);
2664                 }
2665
2666                 if (!rc)
2667                         rc = err;
2668         }
2669
2670         obd_putref(obddev);
2671         if (no_set) {
2672                 err = ptlrpc_set_wait(set);
2673                 if (!rc)
2674                         rc = err;
2675                 ptlrpc_set_destroy(set);
2676         }
2677         RETURN(rc);
2678 }
2679
2680 static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
2681                            int cmd, __u64 *offset)
2682 {
2683         __u32 ssize = lsm->lsm_stripe_size;
2684         __u64 start;
2685
2686         start = *offset;
2687         lov_do_div64(start, ssize);
2688         start = start * ssize;
2689
2690         CDEBUG(D_DLMTRACE, "offset "LPU64", stripe %u, start "LPU64
2691                            ", end "LPU64"\n", *offset, ssize, start,
2692                            start + ssize - 1);
2693         if (cmd == OBD_CALC_STRIPE_END) {
2694                 *offset = start + ssize - 1;
2695         } else if (cmd == OBD_CALC_STRIPE_START) {
2696                 *offset = start;
2697         } else {
2698                 LBUG();
2699         }
2700
2701         RETURN(0);
2702 }
2703
2704 void lov_stripe_lock(struct lov_stripe_md *md)
2705 {
2706         LASSERT(md->lsm_lock_owner != current_pid());
2707         spin_lock(&md->lsm_lock);
2708         LASSERT(md->lsm_lock_owner == 0);
2709         md->lsm_lock_owner = current_pid();
2710 }
2711 EXPORT_SYMBOL(lov_stripe_lock);
2712
2713 void lov_stripe_unlock(struct lov_stripe_md *md)
2714 {
2715         LASSERT(md->lsm_lock_owner == current_pid());
2716         md->lsm_lock_owner = 0;
2717         spin_unlock(&md->lsm_lock);
2718 }
2719 EXPORT_SYMBOL(lov_stripe_unlock);
2720
2721 static int lov_quotactl(struct obd_device *obd, struct obd_export *exp,
2722                         struct obd_quotactl *oqctl)
2723 {
2724         struct lov_obd      *lov = &obd->u.lov;
2725         struct lov_tgt_desc *tgt;
2726         __u64           curspace = 0;
2727         __u64           bhardlimit = 0;
2728         int               i, rc = 0;
2729         ENTRY;
2730
2731         if (oqctl->qc_cmd != LUSTRE_Q_QUOTAON &&
2732             oqctl->qc_cmd != LUSTRE_Q_QUOTAOFF &&
2733             oqctl->qc_cmd != Q_GETOQUOTA &&
2734             oqctl->qc_cmd != Q_INITQUOTA &&
2735             oqctl->qc_cmd != LUSTRE_Q_SETQUOTA &&
2736             oqctl->qc_cmd != Q_FINVALIDATE) {
2737                 CERROR("bad quota opc %x for lov obd", oqctl->qc_cmd);
2738                 RETURN(-EFAULT);
2739         }
2740
2741         /* for lov tgt */
2742         obd_getref(obd);
2743         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2744                 int err;
2745
2746                 tgt = lov->lov_tgts[i];
2747
2748                 if (!tgt)
2749                         continue;
2750
2751                 if (!tgt->ltd_active || tgt->ltd_reap) {
2752                         if (oqctl->qc_cmd == Q_GETOQUOTA &&
2753                             lov->lov_tgts[i]->ltd_activate) {
2754                                 rc = -EREMOTEIO;
2755                                 CERROR("ost %d is inactive\n", i);
2756                         } else {
2757                                 CDEBUG(D_HA, "ost %d is inactive\n", i);
2758                         }
2759                         continue;
2760                 }
2761
2762                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2763                 if (err) {
2764                         if (tgt->ltd_active && !rc)
2765                                 rc = err;
2766                         continue;
2767                 }
2768
2769                 if (oqctl->qc_cmd == Q_GETOQUOTA) {
2770                         curspace += oqctl->qc_dqblk.dqb_curspace;
2771                         bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit;
2772                 }
2773         }
2774         obd_putref(obd);
2775
2776         if (oqctl->qc_cmd == Q_GETOQUOTA) {
2777                 oqctl->qc_dqblk.dqb_curspace = curspace;
2778                 oqctl->qc_dqblk.dqb_bhardlimit = bhardlimit;
2779         }
2780         RETURN(rc);
2781 }
2782
2783 static int lov_quotacheck(struct obd_device *obd, struct obd_export *exp,
2784                           struct obd_quotactl *oqctl)
2785 {
2786         struct lov_obd *lov = &obd->u.lov;
2787         int          i, rc = 0;
2788         ENTRY;
2789
2790         obd_getref(obd);
2791
2792         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2793                 if (!lov->lov_tgts[i])
2794                         continue;
2795
2796                 /* Skip quota check on the administratively disabled OSTs. */
2797                 if (!lov->lov_tgts[i]->ltd_activate) {
2798                         CWARN("lov idx %d was administratively disabled, "
2799                               "skip quotacheck on it.\n", i);
2800                         continue;
2801                 }
2802
2803                 if (!lov->lov_tgts[i]->ltd_active) {
2804                         CERROR("lov idx %d inactive\n", i);
2805                         rc = -EIO;
2806                         goto out;
2807                 }
2808         }
2809
2810         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2811                 int err;
2812
2813                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_activate)
2814                         continue;
2815
2816                 err = obd_quotacheck(lov->lov_tgts[i]->ltd_exp, oqctl);
2817                 if (err && !rc)
2818                         rc = err;
2819         }
2820
2821 out:
2822         obd_putref(obd);
2823
2824         RETURN(rc);
2825 }
2826
2827 struct obd_ops lov_obd_ops = {
2828         .o_owner               = THIS_MODULE,
2829         .o_setup               = lov_setup,
2830         .o_precleanup     = lov_precleanup,
2831         .o_cleanup           = lov_cleanup,
2832         //.o_process_config      = lov_process_config,
2833         .o_connect           = lov_connect,
2834         .o_disconnect     = lov_disconnect,
2835         .o_statfs             = lov_statfs,
2836         .o_statfs_async = lov_statfs_async,
2837         .o_packmd             = lov_packmd,
2838         .o_unpackmd         = lov_unpackmd,
2839         .o_create             = lov_create,
2840         .o_destroy           = lov_destroy,
2841         .o_getattr           = lov_getattr,
2842         .o_getattr_async       = lov_getattr_async,
2843         .o_setattr           = lov_setattr,
2844         .o_setattr_async       = lov_setattr_async,
2845         .o_brw           = lov_brw,
2846         .o_merge_lvb       = lov_merge_lvb,
2847         .o_adjust_kms     = lov_adjust_kms,
2848         .o_punch               = lov_punch,
2849         .o_sync         = lov_sync,
2850         .o_enqueue           = lov_enqueue,
2851         .o_change_cbdata       = lov_change_cbdata,
2852         .o_find_cbdata   = lov_find_cbdata,
2853         .o_cancel             = lov_cancel,
2854         .o_cancel_unused       = lov_cancel_unused,
2855         .o_iocontrol       = lov_iocontrol,
2856         .o_get_info         = lov_get_info,
2857         .o_set_info_async      = lov_set_info_async,
2858         .o_extent_calc   = lov_extent_calc,
2859         .o_llog_init       = lov_llog_init,
2860         .o_llog_finish   = lov_llog_finish,
2861         .o_notify             = lov_notify,
2862         .o_pool_new         = lov_pool_new,
2863         .o_pool_rem         = lov_pool_remove,
2864         .o_pool_add         = lov_pool_add,
2865         .o_pool_del         = lov_pool_del,
2866         .o_getref             = lov_getref,
2867         .o_putref             = lov_putref,
2868         .o_quotactl         = lov_quotactl,
2869         .o_quotacheck     = lov_quotacheck,
2870 };
2871
2872 struct kmem_cache *lov_oinfo_slab;
2873
2874 extern struct lu_kmem_descr lov_caches[];
2875
2876 int __init lov_init(void)
2877 {
2878         struct lprocfs_static_vars lvars = { 0 };
2879         int rc;
2880         ENTRY;
2881
2882         /* print an address of _any_ initialized kernel symbol from this
2883          * module, to allow debugging with gdb that doesn't support data
2884          * symbols from modules.*/
2885         CDEBUG(D_INFO, "Lustre LOV module (%p).\n", &lov_caches);
2886
2887         rc = lu_kmem_init(lov_caches);
2888         if (rc)
2889                 return rc;
2890
2891         lov_oinfo_slab = kmem_cache_create("lov_oinfo",
2892                                               sizeof(struct lov_oinfo),
2893                                               0, SLAB_HWCACHE_ALIGN, NULL);
2894         if (lov_oinfo_slab == NULL) {
2895                 lu_kmem_fini(lov_caches);
2896                 return -ENOMEM;
2897         }
2898         lprocfs_lov_init_vars(&lvars);
2899
2900         rc = class_register_type(&lov_obd_ops, NULL, lvars.module_vars,
2901                                  LUSTRE_LOV_NAME, &lov_device_type);
2902
2903         if (rc) {
2904                 kmem_cache_destroy(lov_oinfo_slab);
2905                 lu_kmem_fini(lov_caches);
2906         }
2907
2908         RETURN(rc);
2909 }
2910
2911 static void /*__exit*/ lov_exit(void)
2912 {
2913         class_unregister_type(LUSTRE_LOV_NAME);
2914         kmem_cache_destroy(lov_oinfo_slab);
2915
2916         lu_kmem_fini(lov_caches);
2917 }
2918
2919 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2920 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
2921 MODULE_LICENSE("GPL");
2922
2923 cfs_module(lov, LUSTRE_VERSION_STRING, lov_init, lov_exit);