]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - drivers/staging/lustre/lustre/obdclass/llog.c
staging: add Lustre file system client support
[karo-tx-linux.git] / drivers / staging / lustre / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45  * Author: Mikhail Pershin <tappro@whamcloud.com>
46  */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50
51 #include <obd_class.h>
52 #include <lustre_log.h>
53 #include "llog_internal.h"
54
55 /*
56  * Allocate a new log or catalog handle
57  * Used inside llog_open().
58  */
59 struct llog_handle *llog_alloc_handle(void)
60 {
61         struct llog_handle *loghandle;
62
63         OBD_ALLOC_PTR(loghandle);
64         if (loghandle == NULL)
65                 return ERR_PTR(-ENOMEM);
66
67         init_rwsem(&loghandle->lgh_lock);
68         spin_lock_init(&loghandle->lgh_hdr_lock);
69         INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
70         atomic_set(&loghandle->lgh_refcount, 1);
71
72         return loghandle;
73 }
74
75 /*
76  * Free llog handle and header data if exists. Used in llog_close() only
77  */
78 void llog_free_handle(struct llog_handle *loghandle)
79 {
80         LASSERT(loghandle != NULL);
81
82         /* failed llog_init_handle */
83         if (!loghandle->lgh_hdr)
84                 goto out;
85
86         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
87                 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
88         else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
89                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
90         LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
91         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
92 out:
93         OBD_FREE_PTR(loghandle);
94 }
95
96 void llog_handle_get(struct llog_handle *loghandle)
97 {
98         atomic_inc(&loghandle->lgh_refcount);
99 }
100
101 void llog_handle_put(struct llog_handle *loghandle)
102 {
103         LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
104         if (atomic_dec_and_test(&loghandle->lgh_refcount))
105                 llog_free_handle(loghandle);
106 }
107
108 /* returns negative on error; 0 if success; 1 if success & log destroyed */
109 int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
110                     int index)
111 {
112         struct llog_log_hdr *llh = loghandle->lgh_hdr;
113         int rc = 0;
114         ENTRY;
115
116         CDEBUG(D_RPCTRACE, "Canceling %d in log "DOSTID"\n",
117                index, POSTID(&loghandle->lgh_id.lgl_oi));
118
119         if (index == 0) {
120                 CERROR("Can't cancel index 0 which is header\n");
121                 RETURN(-EINVAL);
122         }
123
124         spin_lock(&loghandle->lgh_hdr_lock);
125         if (!ext2_clear_bit(index, llh->llh_bitmap)) {
126                 spin_unlock(&loghandle->lgh_hdr_lock);
127                 CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
128                 RETURN(-ENOENT);
129         }
130
131         llh->llh_count--;
132
133         if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
134             (llh->llh_count == 1) &&
135             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
136                 spin_unlock(&loghandle->lgh_hdr_lock);
137                 rc = llog_destroy(env, loghandle);
138                 if (rc < 0) {
139                         CERROR("%s: can't destroy empty llog #"DOSTID
140                                "#%08x: rc = %d\n",
141                                loghandle->lgh_ctxt->loc_obd->obd_name,
142                                POSTID(&loghandle->lgh_id.lgl_oi),
143                                loghandle->lgh_id.lgl_ogen, rc);
144                         GOTO(out_err, rc);
145                 }
146                 RETURN(1);
147         }
148         spin_unlock(&loghandle->lgh_hdr_lock);
149
150         rc = llog_write(env, loghandle, &llh->llh_hdr, NULL, 0, NULL, 0);
151         if (rc < 0) {
152                 CERROR("%s: fail to write header for llog #"DOSTID
153                        "#%08x: rc = %d\n",
154                        loghandle->lgh_ctxt->loc_obd->obd_name,
155                        POSTID(&loghandle->lgh_id.lgl_oi),
156                        loghandle->lgh_id.lgl_ogen, rc);
157                 GOTO(out_err, rc);
158         }
159         RETURN(0);
160 out_err:
161         spin_lock(&loghandle->lgh_hdr_lock);
162         ext2_set_bit(index, llh->llh_bitmap);
163         llh->llh_count++;
164         spin_unlock(&loghandle->lgh_hdr_lock);
165         return rc;
166 }
167 EXPORT_SYMBOL(llog_cancel_rec);
168
169 static int llog_read_header(const struct lu_env *env,
170                             struct llog_handle *handle,
171                             struct obd_uuid *uuid)
172 {
173         struct llog_operations *lop;
174         int rc;
175
176         rc = llog_handle2ops(handle, &lop);
177         if (rc)
178                 RETURN(rc);
179
180         if (lop->lop_read_header == NULL)
181                 RETURN(-EOPNOTSUPP);
182
183         rc = lop->lop_read_header(env, handle);
184         if (rc == LLOG_EEMPTY) {
185                 struct llog_log_hdr *llh = handle->lgh_hdr;
186
187                 handle->lgh_last_idx = 0; /* header is record with index 0 */
188                 llh->llh_count = 1;      /* for the header record */
189                 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
190                 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
191                 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
192                 llh->llh_timestamp = cfs_time_current_sec();
193                 if (uuid)
194                         memcpy(&llh->llh_tgtuuid, uuid,
195                                sizeof(llh->llh_tgtuuid));
196                 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
197                 ext2_set_bit(0, llh->llh_bitmap);
198                 rc = 0;
199         }
200         return rc;
201 }
202
203 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
204                      int flags, struct obd_uuid *uuid)
205 {
206         struct llog_log_hdr     *llh;
207         int                      rc;
208
209         ENTRY;
210         LASSERT(handle->lgh_hdr == NULL);
211
212         OBD_ALLOC_PTR(llh);
213         if (llh == NULL)
214                 RETURN(-ENOMEM);
215         handle->lgh_hdr = llh;
216         /* first assign flags to use llog_client_ops */
217         llh->llh_flags = flags;
218         rc = llog_read_header(env, handle, uuid);
219         if (rc == 0) {
220                 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
221                               flags & LLOG_F_IS_CAT) ||
222                              (llh->llh_flags & LLOG_F_IS_CAT &&
223                               flags & LLOG_F_IS_PLAIN))) {
224                         CERROR("%s: llog type is %s but initializing %s\n",
225                                handle->lgh_ctxt->loc_obd->obd_name,
226                                llh->llh_flags & LLOG_F_IS_CAT ?
227                                "catalog" : "plain",
228                                flags & LLOG_F_IS_CAT ? "catalog" : "plain");
229                         GOTO(out, rc = -EINVAL);
230                 } else if (llh->llh_flags &
231                            (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
232                         /*
233                          * it is possible to open llog without specifying llog
234                          * type so it is taken from llh_flags
235                          */
236                         flags = llh->llh_flags;
237                 } else {
238                         /* for some reason the llh_flags has no type set */
239                         CERROR("llog type is not specified!\n");
240                         GOTO(out, rc = -EINVAL);
241                 }
242                 if (unlikely(uuid &&
243                              !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
244                         CERROR("%s: llog uuid mismatch: %s/%s\n",
245                                handle->lgh_ctxt->loc_obd->obd_name,
246                                (char *)uuid->uuid,
247                                (char *)llh->llh_tgtuuid.uuid);
248                         GOTO(out, rc = -EEXIST);
249                 }
250         }
251         if (flags & LLOG_F_IS_CAT) {
252                 LASSERT(list_empty(&handle->u.chd.chd_head));
253                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
254                 llh->llh_size = sizeof(struct llog_logid_rec);
255         } else if (!(flags & LLOG_F_IS_PLAIN)) {
256                 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
257                        handle->lgh_ctxt->loc_obd->obd_name,
258                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
259                 rc = -EINVAL;
260         }
261 out:
262         if (rc) {
263                 OBD_FREE_PTR(llh);
264                 handle->lgh_hdr = NULL;
265         }
266         RETURN(rc);
267 }
268 EXPORT_SYMBOL(llog_init_handle);
269
270 int llog_copy_handler(const struct lu_env *env,
271                       struct llog_handle *llh,
272                       struct llog_rec_hdr *rec,
273                       void *data)
274 {
275         struct llog_rec_hdr local_rec = *rec;
276         struct llog_handle *local_llh = (struct llog_handle *)data;
277         char *cfg_buf = (char*) (rec + 1);
278         struct lustre_cfg *lcfg;
279         int rc = 0;
280         ENTRY;
281
282         /* Append all records */
283         local_rec.lrh_len -= sizeof(*rec) + sizeof(struct llog_rec_tail);
284         rc = llog_write(env, local_llh, &local_rec, NULL, 0,
285                         (void *)cfg_buf, -1);
286
287         lcfg = (struct lustre_cfg *)cfg_buf;
288         CDEBUG(D_INFO, "idx=%d, rc=%d, len=%d, cmd %x %s %s\n",
289                rec->lrh_index, rc, rec->lrh_len, lcfg->lcfg_command,
290                lustre_cfg_string(lcfg, 0), lustre_cfg_string(lcfg, 1));
291
292         RETURN(rc);
293 }
294 EXPORT_SYMBOL(llog_copy_handler);
295
296 static int llog_process_thread(void *arg)
297 {
298         struct llog_process_info        *lpi = arg;
299         struct llog_handle              *loghandle = lpi->lpi_loghandle;
300         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
301         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
302         char                            *buf;
303         __u64                            cur_offset = LLOG_CHUNK_SIZE;
304         __u64                            last_offset;
305         int                              rc = 0, index = 1, last_index;
306         int                              saved_index = 0;
307         int                              last_called_index = 0;
308
309         ENTRY;
310
311         LASSERT(llh);
312
313         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
314         if (!buf) {
315                 lpi->lpi_rc = -ENOMEM;
316                 RETURN(0);
317         }
318
319         if (cd != NULL) {
320                 last_called_index = cd->lpcd_first_idx;
321                 index = cd->lpcd_first_idx + 1;
322         }
323         if (cd != NULL && cd->lpcd_last_idx)
324                 last_index = cd->lpcd_last_idx;
325         else
326                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
327
328         while (rc == 0) {
329                 struct llog_rec_hdr *rec;
330
331                 /* skip records not set in bitmap */
332                 while (index <= last_index &&
333                        !ext2_test_bit(index, llh->llh_bitmap))
334                         ++index;
335
336                 LASSERT(index <= last_index + 1);
337                 if (index == last_index + 1)
338                         break;
339 repeat:
340                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
341                        index, last_index);
342
343                 /* get the buf with our target record; avoid old garbage */
344                 memset(buf, 0, LLOG_CHUNK_SIZE);
345                 last_offset = cur_offset;
346                 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
347                                      index, &cur_offset, buf, LLOG_CHUNK_SIZE);
348                 if (rc)
349                         GOTO(out, rc);
350
351                 /* NB: when rec->lrh_len is accessed it is already swabbed
352                  * since it is used at the "end" of the loop and the rec
353                  * swabbing is done at the beginning of the loop. */
354                 for (rec = (struct llog_rec_hdr *)buf;
355                      (char *)rec < buf + LLOG_CHUNK_SIZE;
356                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)){
357
358                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
359                                rec, rec->lrh_type);
360
361                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
362                                 lustre_swab_llog_rec(rec);
363
364                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
365                                rec->lrh_type, rec->lrh_index);
366
367                         if (rec->lrh_index == 0) {
368                                 /* probably another rec just got added? */
369                                 if (index <= loghandle->lgh_last_idx)
370                                         GOTO(repeat, rc = 0);
371                                 GOTO(out, rc = 0); /* no more records */
372                         }
373                         if (rec->lrh_len == 0 ||
374                             rec->lrh_len > LLOG_CHUNK_SIZE) {
375                                 CWARN("invalid length %d in llog record for "
376                                       "index %d/%d\n", rec->lrh_len,
377                                       rec->lrh_index, index);
378                                 GOTO(out, rc = -EINVAL);
379                         }
380
381                         if (rec->lrh_index < index) {
382                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
383                                        rec->lrh_index);
384                                 continue;
385                         }
386
387                         CDEBUG(D_OTHER,
388                                "lrh_index: %d lrh_len: %d (%d remains)\n",
389                                rec->lrh_index, rec->lrh_len,
390                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
391
392                         loghandle->lgh_cur_idx = rec->lrh_index;
393                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
394                                                     last_offset;
395
396                         /* if set, process the callback on this record */
397                         if (ext2_test_bit(index, llh->llh_bitmap)) {
398                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
399                                                  lpi->lpi_cbdata);
400                                 last_called_index = index;
401                                 if (rc == LLOG_PROC_BREAK) {
402                                         GOTO(out, rc);
403                                 } else if (rc == LLOG_DEL_RECORD) {
404                                         llog_cancel_rec(lpi->lpi_env,
405                                                         loghandle,
406                                                         rec->lrh_index);
407                                         rc = 0;
408                                 }
409                                 if (rc)
410                                         GOTO(out, rc);
411                         } else {
412                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
413                         }
414
415                         /* next record, still in buffer? */
416                         ++index;
417                         if (index > last_index)
418                                 GOTO(out, rc = 0);
419                 }
420         }
421
422 out:
423         if (cd != NULL)
424                 cd->lpcd_last_idx = last_called_index;
425
426         OBD_FREE(buf, LLOG_CHUNK_SIZE);
427         lpi->lpi_rc = rc;
428         return 0;
429 }
430
431 static int llog_process_thread_daemonize(void *arg)
432 {
433         struct llog_process_info        *lpi = arg;
434         struct lu_env                    env;
435         int                              rc;
436
437         unshare_fs_struct();
438
439         /* client env has no keys, tags is just 0 */
440         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
441         if (rc)
442                 goto out;
443         lpi->lpi_env = &env;
444
445         rc = llog_process_thread(arg);
446
447         lu_env_fini(&env);
448 out:
449         complete(&lpi->lpi_completion);
450         return rc;
451 }
452
453 int llog_process_or_fork(const struct lu_env *env,
454                          struct llog_handle *loghandle,
455                          llog_cb_t cb, void *data, void *catdata, bool fork)
456 {
457         struct llog_process_info *lpi;
458         int                   rc;
459
460         ENTRY;
461
462         OBD_ALLOC_PTR(lpi);
463         if (lpi == NULL) {
464                 CERROR("cannot alloc pointer\n");
465                 RETURN(-ENOMEM);
466         }
467         lpi->lpi_loghandle = loghandle;
468         lpi->lpi_cb     = cb;
469         lpi->lpi_cbdata    = data;
470         lpi->lpi_catdata   = catdata;
471
472         if (fork) {
473                 /* The new thread can't use parent env,
474                  * init the new one in llog_process_thread_daemonize. */
475                 lpi->lpi_env = NULL;
476                 init_completion(&lpi->lpi_completion);
477                 rc = PTR_ERR(kthread_run(llog_process_thread_daemonize, lpi,
478                                              "llog_process_thread"));
479                 if (IS_ERR_VALUE(rc)) {
480                         CERROR("%s: cannot start thread: rc = %d\n",
481                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
482                         OBD_FREE_PTR(lpi);
483                         RETURN(rc);
484                 }
485                 wait_for_completion(&lpi->lpi_completion);
486         } else {
487                 lpi->lpi_env = env;
488                 llog_process_thread(lpi);
489         }
490         rc = lpi->lpi_rc;
491         OBD_FREE_PTR(lpi);
492         RETURN(rc);
493 }
494 EXPORT_SYMBOL(llog_process_or_fork);
495
496 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
497                  llog_cb_t cb, void *data, void *catdata)
498 {
499         return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
500 }
501 EXPORT_SYMBOL(llog_process);
502
503 inline int llog_get_size(struct llog_handle *loghandle)
504 {
505         if (loghandle && loghandle->lgh_hdr)
506                 return loghandle->lgh_hdr->llh_count;
507         return 0;
508 }
509 EXPORT_SYMBOL(llog_get_size);
510
511 int llog_reverse_process(const struct lu_env *env,
512                          struct llog_handle *loghandle, llog_cb_t cb,
513                          void *data, void *catdata)
514 {
515         struct llog_log_hdr *llh = loghandle->lgh_hdr;
516         struct llog_process_cat_data *cd = catdata;
517         void *buf;
518         int rc = 0, first_index = 1, index, idx;
519         ENTRY;
520
521         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
522         if (!buf)
523                 RETURN(-ENOMEM);
524
525         if (cd != NULL)
526                 first_index = cd->lpcd_first_idx + 1;
527         if (cd != NULL && cd->lpcd_last_idx)
528                 index = cd->lpcd_last_idx;
529         else
530                 index = LLOG_BITMAP_BYTES * 8 - 1;
531
532         while (rc == 0) {
533                 struct llog_rec_hdr *rec;
534                 struct llog_rec_tail *tail;
535
536                 /* skip records not set in bitmap */
537                 while (index >= first_index &&
538                        !ext2_test_bit(index, llh->llh_bitmap))
539                         --index;
540
541                 LASSERT(index >= first_index - 1);
542                 if (index == first_index - 1)
543                         break;
544
545                 /* get the buf with our target record; avoid old garbage */
546                 memset(buf, 0, LLOG_CHUNK_SIZE);
547                 rc = llog_prev_block(env, loghandle, index, buf,
548                                      LLOG_CHUNK_SIZE);
549                 if (rc)
550                         GOTO(out, rc);
551
552                 rec = buf;
553                 idx = rec->lrh_index;
554                 CDEBUG(D_RPCTRACE, "index %u : idx %u\n", index, idx);
555                 while (idx < index) {
556                         rec = (void *)rec + rec->lrh_len;
557                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
558                                 lustre_swab_llog_rec(rec);
559                         idx ++;
560                 }
561                 LASSERT(idx == index);
562                 tail = (void *)rec + rec->lrh_len - sizeof(*tail);
563
564                 /* process records in buffer, starting where we found one */
565                 while ((void *)tail > buf) {
566                         if (tail->lrt_index == 0)
567                                 GOTO(out, rc = 0); /* no more records */
568
569                         /* if set, process the callback on this record */
570                         if (ext2_test_bit(index, llh->llh_bitmap)) {
571                                 rec = (void *)tail - tail->lrt_len +
572                                       sizeof(*tail);
573
574                                 rc = cb(env, loghandle, rec, data);
575                                 if (rc == LLOG_PROC_BREAK) {
576                                         GOTO(out, rc);
577                                 } else if (rc == LLOG_DEL_RECORD) {
578                                         llog_cancel_rec(env, loghandle,
579                                                         tail->lrt_index);
580                                         rc = 0;
581                                 }
582                                 if (rc)
583                                         GOTO(out, rc);
584                         }
585
586                         /* previous record, still in buffer? */
587                         --index;
588                         if (index < first_index)
589                                 GOTO(out, rc = 0);
590                         tail = (void *)tail - tail->lrt_len;
591                 }
592         }
593
594 out:
595         if (buf)
596                 OBD_FREE(buf, LLOG_CHUNK_SIZE);
597         RETURN(rc);
598 }
599 EXPORT_SYMBOL(llog_reverse_process);
600
601 /**
602  * new llog API
603  *
604  * API functions:
605  *      llog_open - open llog, may not exist
606  *      llog_exist - check if llog exists
607  *      llog_close - close opened llog, pair for open, frees llog_handle
608  *      llog_declare_create - declare llog creation
609  *      llog_create - create new llog on disk, need transaction handle
610  *      llog_declare_write_rec - declaration of llog write
611  *      llog_write_rec - write llog record on disk, need transaction handle
612  *      llog_declare_add - declare llog catalog record addition
613  *      llog_add - add llog record in catalog, need transaction handle
614  */
615 int llog_exist(struct llog_handle *loghandle)
616 {
617         struct llog_operations  *lop;
618         int                      rc;
619
620         ENTRY;
621
622         rc = llog_handle2ops(loghandle, &lop);
623         if (rc)
624                 RETURN(rc);
625         if (lop->lop_exist == NULL)
626                 RETURN(-EOPNOTSUPP);
627
628         rc = lop->lop_exist(loghandle);
629         RETURN(rc);
630 }
631 EXPORT_SYMBOL(llog_exist);
632
633 int llog_declare_create(const struct lu_env *env,
634                         struct llog_handle *loghandle, struct thandle *th)
635 {
636         struct llog_operations  *lop;
637         int                      raised, rc;
638
639         ENTRY;
640
641         rc = llog_handle2ops(loghandle, &lop);
642         if (rc)
643                 RETURN(rc);
644         if (lop->lop_declare_create == NULL)
645                 RETURN(-EOPNOTSUPP);
646
647         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
648         if (!raised)
649                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
650         rc = lop->lop_declare_create(env, loghandle, th);
651         if (!raised)
652                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
653         RETURN(rc);
654 }
655 EXPORT_SYMBOL(llog_declare_create);
656
657 int llog_create(const struct lu_env *env, struct llog_handle *handle,
658                 struct thandle *th)
659 {
660         struct llog_operations  *lop;
661         int                      raised, rc;
662
663         ENTRY;
664
665         rc = llog_handle2ops(handle, &lop);
666         if (rc)
667                 RETURN(rc);
668         if (lop->lop_create == NULL)
669                 RETURN(-EOPNOTSUPP);
670
671         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
672         if (!raised)
673                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
674         rc = lop->lop_create(env, handle, th);
675         if (!raised)
676                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
677         RETURN(rc);
678 }
679 EXPORT_SYMBOL(llog_create);
680
681 int llog_declare_write_rec(const struct lu_env *env,
682                            struct llog_handle *handle,
683                            struct llog_rec_hdr *rec, int idx,
684                            struct thandle *th)
685 {
686         struct llog_operations  *lop;
687         int                      raised, rc;
688
689         ENTRY;
690
691         rc = llog_handle2ops(handle, &lop);
692         if (rc)
693                 RETURN(rc);
694         LASSERT(lop);
695         if (lop->lop_declare_write_rec == NULL)
696                 RETURN(-EOPNOTSUPP);
697
698         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
699         if (!raised)
700                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
701         rc = lop->lop_declare_write_rec(env, handle, rec, idx, th);
702         if (!raised)
703                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
704         RETURN(rc);
705 }
706 EXPORT_SYMBOL(llog_declare_write_rec);
707
708 int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
709                    struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
710                    int numcookies, void *buf, int idx, struct thandle *th)
711 {
712         struct llog_operations  *lop;
713         int                      raised, rc, buflen;
714
715         ENTRY;
716
717         rc = llog_handle2ops(handle, &lop);
718         if (rc)
719                 RETURN(rc);
720
721         LASSERT(lop);
722         if (lop->lop_write_rec == NULL)
723                 RETURN(-EOPNOTSUPP);
724
725         if (buf)
726                 buflen = rec->lrh_len + sizeof(struct llog_rec_hdr) +
727                          sizeof(struct llog_rec_tail);
728         else
729                 buflen = rec->lrh_len;
730         LASSERT(cfs_size_round(buflen) == buflen);
731
732         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
733         if (!raised)
734                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
735         rc = lop->lop_write_rec(env, handle, rec, logcookies, numcookies,
736                                 buf, idx, th);
737         if (!raised)
738                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
739         RETURN(rc);
740 }
741 EXPORT_SYMBOL(llog_write_rec);
742
743 int llog_add(const struct lu_env *env, struct llog_handle *lgh,
744              struct llog_rec_hdr *rec, struct llog_cookie *logcookies,
745              void *buf, struct thandle *th)
746 {
747         int raised, rc;
748
749         ENTRY;
750
751         if (lgh->lgh_logops->lop_add == NULL)
752                 RETURN(-EOPNOTSUPP);
753
754         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
755         if (!raised)
756                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
757         rc = lgh->lgh_logops->lop_add(env, lgh, rec, logcookies, buf, th);
758         if (!raised)
759                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
760         RETURN(rc);
761 }
762 EXPORT_SYMBOL(llog_add);
763
764 int llog_declare_add(const struct lu_env *env, struct llog_handle *lgh,
765                      struct llog_rec_hdr *rec, struct thandle *th)
766 {
767         int raised, rc;
768
769         ENTRY;
770
771         if (lgh->lgh_logops->lop_declare_add == NULL)
772                 RETURN(-EOPNOTSUPP);
773
774         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
775         if (!raised)
776                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
777         rc = lgh->lgh_logops->lop_declare_add(env, lgh, rec, th);
778         if (!raised)
779                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
780         RETURN(rc);
781 }
782 EXPORT_SYMBOL(llog_declare_add);
783
784 /**
785  * Helper function to open llog or create it if doesn't exist.
786  * It hides all transaction handling from caller.
787  */
788 int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
789                      struct llog_handle **res, struct llog_logid *logid,
790                      char *name)
791 {
792         struct thandle  *th;
793         int              rc;
794
795         ENTRY;
796
797         rc = llog_open(env, ctxt, res, logid, name, LLOG_OPEN_NEW);
798         if (rc)
799                 RETURN(rc);
800
801         if (llog_exist(*res))
802                 RETURN(0);
803
804         if ((*res)->lgh_obj != NULL) {
805                 struct dt_device *d;
806
807                 d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
808
809                 th = dt_trans_create(env, d);
810                 if (IS_ERR(th))
811                         GOTO(out, rc = PTR_ERR(th));
812
813                 rc = llog_declare_create(env, *res, th);
814                 if (rc == 0) {
815                         rc = dt_trans_start_local(env, d, th);
816                         if (rc == 0)
817                                 rc = llog_create(env, *res, th);
818                 }
819                 dt_trans_stop(env, d, th);
820         } else {
821                 /* lvfs compat code */
822                 LASSERT((*res)->lgh_file == NULL);
823                 rc = llog_create(env, *res, NULL);
824         }
825 out:
826         if (rc)
827                 llog_close(env, *res);
828         RETURN(rc);
829 }
830 EXPORT_SYMBOL(llog_open_create);
831
832 /**
833  * Helper function to delete existent llog.
834  */
835 int llog_erase(const struct lu_env *env, struct llog_ctxt *ctxt,
836                struct llog_logid *logid, char *name)
837 {
838         struct llog_handle      *handle;
839         int                      rc = 0, rc2;
840
841         ENTRY;
842
843         /* nothing to erase */
844         if (name == NULL && logid == NULL)
845                 RETURN(0);
846
847         rc = llog_open(env, ctxt, &handle, logid, name, LLOG_OPEN_EXISTS);
848         if (rc < 0)
849                 RETURN(rc);
850
851         rc = llog_init_handle(env, handle, LLOG_F_IS_PLAIN, NULL);
852         if (rc == 0)
853                 rc = llog_destroy(env, handle);
854
855         rc2 = llog_close(env, handle);
856         if (rc == 0)
857                 rc = rc2;
858         RETURN(rc);
859 }
860 EXPORT_SYMBOL(llog_erase);
861
862 /*
863  * Helper function for write record in llog.
864  * It hides all transaction handling from caller.
865  * Valid only with local llog.
866  */
867 int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
868                struct llog_rec_hdr *rec, struct llog_cookie *reccookie,
869                int cookiecount, void *buf, int idx)
870 {
871         int rc;
872
873         ENTRY;
874
875         LASSERT(loghandle);
876         LASSERT(loghandle->lgh_ctxt);
877
878         if (loghandle->lgh_obj != NULL) {
879                 struct dt_device        *dt;
880                 struct thandle          *th;
881
882                 dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
883
884                 th = dt_trans_create(env, dt);
885                 if (IS_ERR(th))
886                         RETURN(PTR_ERR(th));
887
888                 rc = llog_declare_write_rec(env, loghandle, rec, idx, th);
889                 if (rc)
890                         GOTO(out_trans, rc);
891
892                 rc = dt_trans_start_local(env, dt, th);
893                 if (rc)
894                         GOTO(out_trans, rc);
895
896                 down_write(&loghandle->lgh_lock);
897                 rc = llog_write_rec(env, loghandle, rec, reccookie,
898                                     cookiecount, buf, idx, th);
899                 up_write(&loghandle->lgh_lock);
900 out_trans:
901                 dt_trans_stop(env, dt, th);
902         } else { /* lvfs compatibility */
903                 down_write(&loghandle->lgh_lock);
904                 rc = llog_write_rec(env, loghandle, rec, reccookie,
905                                     cookiecount, buf, idx, NULL);
906                 up_write(&loghandle->lgh_lock);
907         }
908         RETURN(rc);
909 }
910 EXPORT_SYMBOL(llog_write);
911
912 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
913               struct llog_handle **lgh, struct llog_logid *logid,
914               char *name, enum llog_open_param open_param)
915 {
916         int      raised;
917         int      rc;
918
919         ENTRY;
920
921         LASSERT(ctxt);
922         LASSERT(ctxt->loc_logops);
923
924         if (ctxt->loc_logops->lop_open == NULL) {
925                 *lgh = NULL;
926                 RETURN(-EOPNOTSUPP);
927         }
928
929         *lgh = llog_alloc_handle();
930         if (*lgh == NULL)
931                 RETURN(-ENOMEM);
932         (*lgh)->lgh_ctxt = ctxt;
933         (*lgh)->lgh_logops = ctxt->loc_logops;
934
935         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
936         if (!raised)
937                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
938         rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
939         if (!raised)
940                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
941         if (rc) {
942                 llog_free_handle(*lgh);
943                 *lgh = NULL;
944         }
945         RETURN(rc);
946 }
947 EXPORT_SYMBOL(llog_open);
948
949 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
950 {
951         struct llog_operations  *lop;
952         int                      rc;
953
954         ENTRY;
955
956         rc = llog_handle2ops(loghandle, &lop);
957         if (rc)
958                 GOTO(out, rc);
959         if (lop->lop_close == NULL)
960                 GOTO(out, rc = -EOPNOTSUPP);
961         rc = lop->lop_close(env, loghandle);
962 out:
963         llog_handle_put(loghandle);
964         RETURN(rc);
965 }
966 EXPORT_SYMBOL(llog_close);