1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ldlm/ldlm_lockd.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LDLM
43
44 # include <linux/libcfs/libcfs.h>
45
46 #include <lustre_dlm.h>
47 #include <obd_class.h>
48 #include <linux/list.h>
49 #include "ldlm_internal.h"
50
51 static int ldlm_num_threads;
52 CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
53                 "number of DLM service threads to start");
54
55 static char *ldlm_cpts;
56 CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
57                 "CPU partitions ldlm threads should run on");
58
59 extern struct kmem_cache *ldlm_resource_slab;
60 extern struct kmem_cache *ldlm_lock_slab;
61 static struct mutex     ldlm_ref_mutex;
62 static int ldlm_refcount;
63
64 struct ldlm_cb_async_args {
65         struct ldlm_cb_set_arg *ca_set_arg;
66         struct ldlm_lock       *ca_lock;
67 };
68
69 /* LDLM state */
70
71 static struct ldlm_state *ldlm_state;
72
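/* Round a timeout given in jiffies up to a whole number of seconds,
 * returned in jiffies. */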
73 inline cfs_time_t round_timeout(cfs_time_t timeout)
74 {
75         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
76 }
77
78 /* timeout for initial callback (AST) reply (bz10399) */
79 static inline unsigned int ldlm_get_rq_timeout(void)
80 {
81         /* Non-AT value */
82         unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
83
84         return timeout < 1 ? 1 : timeout;
85 }
86
87 #define ELT_STOPPED   0
88 #define ELT_READY     1
89 #define ELT_TERMINATE 2
90
91 struct ldlm_bl_pool {
92         spinlock_t              blp_lock;
93
94         /*
95          * blp_prio_list is used for callbacks that should be handled
96          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
97          * see bug 13843
98          */
99         struct list_head              blp_prio_list;
100
101         /*
102          * blp_list is used for all other callbacks which are likely
103          * to take longer to process.
104          */
105         struct list_head              blp_list;
106
107         wait_queue_head_t            blp_waitq;
108         struct completion       blp_comp;
109         atomic_t            blp_num_threads;
110         atomic_t            blp_busy_threads;
111         int                  blp_min_threads;
112         int                  blp_max_threads;
113 };
114
115 struct ldlm_bl_work_item {
116         struct list_head              blwi_entry;
117         struct ldlm_namespace  *blwi_ns;
118         struct ldlm_lock_desc   blwi_ld;
119         struct ldlm_lock       *blwi_lock;
120         struct list_head              blwi_head;
121         int                  blwi_count;
122         struct completion       blwi_comp;
123         ldlm_cancel_flags_t     blwi_flags;
124         int                  blwi_mem_pressure;
125 };
126
127
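/*
 * Client-side no-op stubs: the list of locks waiting on blocking-AST replies
 * is only maintained on the server side, so there is nothing to delete or
 * refresh here.
 */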
128 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
129 {
130         RETURN(0);
131 }
132
133 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
134 {
135         RETURN(0);
136 }
137
138
139
140 /**
141  * Callback handler for receiving incoming blocking ASTs.
142  *
143  * This can only happen on the client side.
144  */
145 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
146                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
147 {
148         int do_ast;
149         ENTRY;
150
151         LDLM_DEBUG(lock, "client blocking AST callback handler");
152
153         lock_res_and_lock(lock);
154         lock->l_flags |= LDLM_FL_CBPENDING;
155
156         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
157                 lock->l_flags |= LDLM_FL_CANCEL;
158
159         do_ast = (!lock->l_readers && !lock->l_writers);
160         unlock_res_and_lock(lock);
161
162         if (do_ast) {
163                 CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
164                        lock, lock->l_blocking_ast);
165                 if (lock->l_blocking_ast != NULL)
166                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
167                                              LDLM_CB_BLOCKING);
168         } else {
169                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
170                        lock);
171         }
172
173         LDLM_DEBUG(lock, "client blocking callback handler END");
174         LDLM_LOCK_RELEASE(lock);
175         EXIT;
176 }
177
178 /**
179  * Callback handler for receiving incoming completion ASTs.
180  *
181  * This can only happen on the client side.
182  */
183 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
184                                     struct ldlm_namespace *ns,
185                                     struct ldlm_request *dlm_req,
186                                     struct ldlm_lock *lock)
187 {
188         int lvb_len;
189         LIST_HEAD(ast_list);
190         int rc = 0;
191         ENTRY;
192
193         LDLM_DEBUG(lock, "client completion callback handler START");
194
195         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
196                 int to = cfs_time_seconds(1);
197                 while (to > 0) {
198                         schedule_timeout_and_set_state(
199                                 TASK_INTERRUPTIBLE, to);
200                         if (lock->l_granted_mode == lock->l_req_mode ||
201                             lock->l_destroyed)
202                                 break;
203                 }
204         }
205
206         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
207         if (lvb_len < 0) {
208                 LDLM_ERROR(lock, "Failed to get lvb_len, rc = %d", lvb_len);
209                 GOTO(out, rc = lvb_len);
210         } else if (lvb_len > 0) {
211                 if (lock->l_lvb_len > 0) {
212                         /* for extent lock, lvb contains ost_lvb{}. */
213                         LASSERT(lock->l_lvb_data != NULL);
214
215                         if (unlikely(lock->l_lvb_len < lvb_len)) {
216                                 LDLM_ERROR(lock, "Replied LVB is larger than "
217                                            "expected: expected = %d, "
218                                            "replied = %d",
219                                            lock->l_lvb_len, lvb_len);
220                                 GOTO(out, rc = -EINVAL);
221                         }
222                 } else if (ldlm_has_layout(lock)) { /* for a layout lock, the lvb
223                                                      * has a variable length */
224                         void *lvb_data;
225
226                         OBD_ALLOC(lvb_data, lvb_len);
227                         if (lvb_data == NULL) {
228                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
229                                 GOTO(out, rc = -ENOMEM);
230                         }
231
232                         lock_res_and_lock(lock);
233                         LASSERT(lock->l_lvb_data == NULL);
234                         lock->l_lvb_data = lvb_data;
235                         lock->l_lvb_len = lvb_len;
236                         unlock_res_and_lock(lock);
237                 }
238         }
239
240         lock_res_and_lock(lock);
241         if (lock->l_destroyed ||
242             lock->l_granted_mode == lock->l_req_mode) {
243                 /* bug 11300: the lock has already been granted */
244                 unlock_res_and_lock(lock);
245                 LDLM_DEBUG(lock, "Double grant race happened");
246                 GOTO(out, rc = 0);
247         }
248
249         /* If we receive the completion AST before the actual enqueue has
250          * returned, we might need to switch lock modes, resources, or extents. */
251         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
252                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
253                 LDLM_DEBUG(lock, "completion AST, new lock mode");
254         }
255
256         if (lock->l_resource->lr_type != LDLM_PLAIN) {
257                 ldlm_convert_policy_to_local(req->rq_export,
258                                           dlm_req->lock_desc.l_resource.lr_type,
259                                           &dlm_req->lock_desc.l_policy_data,
260                                           &lock->l_policy_data);
261                 LDLM_DEBUG(lock, "completion AST, new policy data");
262         }
263
264         ldlm_resource_unlink_lock(lock);
265         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
266                    &lock->l_resource->lr_name,
267                    sizeof(lock->l_resource->lr_name)) != 0) {
268                 unlock_res_and_lock(lock);
269                 rc = ldlm_lock_change_resource(ns, lock,
270                                 &dlm_req->lock_desc.l_resource.lr_name);
271                 if (rc < 0) {
272                         LDLM_ERROR(lock, "Failed to allocate resource");
273                         GOTO(out, rc);
274                 }
275                 LDLM_DEBUG(lock, "completion AST, new resource");
276                 CERROR("change resource!\n");
277                 lock_res_and_lock(lock);
278         }
279
280         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
281                 /* BL_AST locks are not needed in LRU.
282                  * Let ldlm_cancel_lru() be fast. */
283                 ldlm_lock_remove_from_lru(lock);
284                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
285                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
286         }
287
288         if (lock->l_lvb_len > 0) {
289                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
290                                    lock->l_lvb_data, lvb_len);
291                 if (rc < 0) {
292                         unlock_res_and_lock(lock);
293                         GOTO(out, rc);
294                 }
295         }
296
297         ldlm_grant_lock(lock, &ast_list);
298         unlock_res_and_lock(lock);
299
300         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
301
302         /* Let the enqueue path call osc_lock_upcall() and initialize
303          * l_ast_data */
304         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
305
306         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
307
308         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
309                           lock);
310         GOTO(out, rc);
311
312 out:
313         if (rc < 0) {
314                 lock_res_and_lock(lock);
315                 lock->l_flags |= LDLM_FL_FAILED;
316                 unlock_res_and_lock(lock);
317                 wake_up(&lock->l_waitq);
318         }
319         LDLM_LOCK_RELEASE(lock);
320 }
321
322 /**
323  * Callback handler for receiving incoming glimpse ASTs.
324  *
325  * This can only happen on the client side.  After handling the glimpse AST
326  * we also consider dropping the lock here if it is unused locally for a
327  * long time.
328  */
329 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
330                                     struct ldlm_namespace *ns,
331                                     struct ldlm_request *dlm_req,
332                                     struct ldlm_lock *lock)
333 {
334         int rc = -ENOSYS;
335         ENTRY;
336
337         LDLM_DEBUG(lock, "client glimpse AST callback handler");
338
339         if (lock->l_glimpse_ast != NULL)
340                 rc = lock->l_glimpse_ast(lock, req);
341
342         if (req->rq_repmsg != NULL) {
343                 ptlrpc_reply(req);
344         } else {
345                 req->rq_status = rc;
346                 ptlrpc_error(req);
347         }
348
349         lock_res_and_lock(lock);
350         if (lock->l_granted_mode == LCK_PW &&
351             !lock->l_readers && !lock->l_writers &&
352             cfs_time_after(cfs_time_current(),
353                            cfs_time_add(lock->l_last_used,
354                                         cfs_time_seconds(10)))) {
355                 unlock_res_and_lock(lock);
356                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
357                         ldlm_handle_bl_callback(ns, NULL, lock);
358
359                 EXIT;
360                 return;
361         }
362         unlock_res_and_lock(lock);
363         LDLM_LOCK_RELEASE(lock);
364         EXIT;
365 }
366
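/* Send a reply carrying status \a rc, packing the reply message first if
 * necessary; does nothing if the request is marked as requiring no reply. */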
367 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
368 {
369         if (req->rq_no_reply)
370                 return 0;
371
372         req->rq_status = rc;
373         if (!req->rq_packed_final) {
374                 rc = lustre_pack_reply(req, 1, NULL, NULL);
375                 if (rc)
376                         return rc;
377         }
378         return ptlrpc_reply(req);
379 }
380
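/* Queue a blocking-AST work item on the blocking thread pool (priority list
 * for LDLM_FL_DISCARD_DATA locks) and wake a worker; in synchronous mode,
 * wait for the item to be processed before returning. */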
381 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
382                                ldlm_cancel_flags_t cancel_flags)
383 {
384         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
385         ENTRY;
386
387         spin_lock(&blp->blp_lock);
388         if (blwi->blwi_lock &&
389             blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
390                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
391                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
392         } else {
393                 /* other blocking callbacks are added to the regular list */
394                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
395         }
396         spin_unlock(&blp->blp_lock);
397
398         wake_up(&blp->blp_waitq);
399
400         /* cannot check blwi->blwi_flags as blwi could already be freed in
401            LCF_ASYNC mode */
402         if (!(cancel_flags & LCF_ASYNC))
403                 wait_for_completion(&blwi->blwi_comp);
404
405         RETURN(0);
406 }
407
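/* Fill in a blocking-AST work item, either for a list of \a count locks to
 * cancel or, if \a count is zero, for the single lock \a lock. */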
408 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
409                              struct ldlm_namespace *ns,
410                              struct ldlm_lock_desc *ld,
411                              struct list_head *cancels, int count,
412                              struct ldlm_lock *lock,
413                              ldlm_cancel_flags_t cancel_flags)
414 {
415         init_completion(&blwi->blwi_comp);
416         INIT_LIST_HEAD(&blwi->blwi_head);
417
418         if (memory_pressure_get())
419                 blwi->blwi_mem_pressure = 1;
420
421         blwi->blwi_ns = ns;
422         blwi->blwi_flags = cancel_flags;
423         if (ld != NULL)
424                 blwi->blwi_ld = *ld;
425         if (count) {
426                 list_add(&blwi->blwi_head, cancels);
427                 list_del_init(cancels);
428                 blwi->blwi_count = count;
429         } else {
430                 blwi->blwi_lock = lock;
431         }
432 }
433
434 /**
435  * Queues a list of locks \a cancels containing \a count locks
436  * for later processing by a blocking thread.  If \a count is zero,
437  * then the lock referenced as \a lock is queued instead.
438  *
439  * The blocking thread will then call the ->l_blocking_ast callback on the lock.
440  * If the list addition fails, an error is returned and the caller is expected
441  * to call ->l_blocking_ast itself.
442  */
443 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
444                              struct ldlm_lock_desc *ld,
445                              struct ldlm_lock *lock,
446                              struct list_head *cancels, int count,
447                              ldlm_cancel_flags_t cancel_flags)
448 {
449         ENTRY;
450
451         if (cancels && count == 0)
452                 RETURN(0);
453
454         if (cancel_flags & LCF_ASYNC) {
455                 struct ldlm_bl_work_item *blwi;
456
457                 OBD_ALLOC(blwi, sizeof(*blwi));
458                 if (blwi == NULL)
459                         RETURN(-ENOMEM);
460                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
461
462                 RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
463         } else {
464                 /* if it is a synchronous call, do minimal memory allocation, as
465                  * it could be triggered from the kernel shrinker
466                  */
467                 struct ldlm_bl_work_item blwi;
468
469                 memset(&blwi, 0, sizeof(blwi));
470                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
471                 RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
472         }
473 }
474
475
476 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
477                            struct ldlm_lock *lock)
478 {
479         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
480 }
481
482 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
483                            struct list_head *cancels, int count,
484                            ldlm_cancel_flags_t cancel_flags)
485 {
486         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
487 }
488
489 /* Setinfo coming from a server (e.g. MDT) to a client (e.g. MDC). */
490 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
491 {
492         struct obd_device *obd = req->rq_export->exp_obd;
493         char *key;
494         void *val;
495         int keylen, vallen;
496         int rc = -ENOSYS;
497         ENTRY;
498
499         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
500
501         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
502
503         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
504         if (key == NULL) {
505                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
506                 RETURN(-EFAULT);
507         }
508         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
509                                       RCL_CLIENT);
510         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
511         if (val == NULL) {
512                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
513                 RETURN(-EFAULT);
514         }
515         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
516                                       RCL_CLIENT);
517
518         /* We are responsible for swabbing contents of val */
519
520         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
521                 /* Pass it on to mdc (the "export" in this case) */
522                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
523                                         req->rq_export,
524                                         sizeof(KEY_HSM_COPYTOOL_SEND),
525                                         KEY_HSM_COPYTOOL_SEND,
526                                         vallen, val, NULL);
527         else
528                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
529
530         return rc;
531 }
532
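/* Log a callback-handling error together with the peer NID and lock handle,
 * noting whether a reply was actually sent. */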
533 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
534                                         const char *msg, int rc,
535                                         struct lustre_handle *handle)
536 {
537         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
538                   "%s: [nid %s] [rc %d] [lock "LPX64"]",
539                   msg, libcfs_id2str(req->rq_peer), rc,
540                   handle ? handle->cookie : 0);
541         if (req->rq_no_reply)
542                 CWARN("No reply was sent, possibly due to bug 21636.\n");
543         else if (rc)
544                 CWARN("Sending the reply failed, possibly due to bug 21636.\n");
545 }
546
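/* Quota check callback: record the quota check status reported by the server
 * in the client OBD. */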
547 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
548 {
549         struct obd_quotactl *oqctl;
550         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
551
552         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
553         if (oqctl == NULL) {
554                 CERROR("Can't unpack obd_quotactl\n");
555                 RETURN(-EPROTO);
556         }
557
558         cli->cl_qchk_stat = oqctl->qc_stat;
559         return 0;
560 }
561
562 /* TODO: handle requests in a way similar to the MDT: see mdt_handle_common() */
563 static int ldlm_callback_handler(struct ptlrpc_request *req)
564 {
565         struct ldlm_namespace *ns;
566         struct ldlm_request *dlm_req;
567         struct ldlm_lock *lock;
568         int rc;
569         ENTRY;
570
571         /* Requests arrive in sender's byte order.  The ptlrpc service
572          * handler has already checked and, if necessary, byte-swapped the
573          * incoming request message body, but I am responsible for the
574          * message buffers. */
575
576         /* do nothing for sec context finalize */
577         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
578                 RETURN(0);
579
580         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
581
582         if (req->rq_export == NULL) {
583                 rc = ldlm_callback_reply(req, -ENOTCONN);
584                 ldlm_callback_errmsg(req, "Operate on unconnected server",
585                                      rc, NULL);
586                 RETURN(0);
587         }
588
589         LASSERT(req->rq_export != NULL);
590         LASSERT(req->rq_export->exp_obd != NULL);
591
592         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
593         case LDLM_BL_CALLBACK:
594                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
595                         RETURN(0);
596                 break;
597         case LDLM_CP_CALLBACK:
598                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
599                         RETURN(0);
600                 break;
601         case LDLM_GL_CALLBACK:
602                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
603                         RETURN(0);
604                 break;
605         case LDLM_SET_INFO:
606                 rc = ldlm_handle_setinfo(req);
607                 ldlm_callback_reply(req, rc);
608                 RETURN(0);
609         case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
610                 CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
611                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
612                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
613                         RETURN(0);
614                 rc = llog_origin_handle_cancel(req);
615                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
616                         RETURN(0);
617                 ldlm_callback_reply(req, rc);
618                 RETURN(0);
619         case LLOG_ORIGIN_HANDLE_CREATE:
620                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
621                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
622                         RETURN(0);
623                 rc = llog_origin_handle_open(req);
624                 ldlm_callback_reply(req, rc);
625                 RETURN(0);
626         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
627                 req_capsule_set(&req->rq_pill,
628                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
629                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
630                         RETURN(0);
631                 rc = llog_origin_handle_next_block(req);
632                 ldlm_callback_reply(req, rc);
633                 RETURN(0);
634         case LLOG_ORIGIN_HANDLE_READ_HEADER:
635                 req_capsule_set(&req->rq_pill,
636                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
637                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
638                         RETURN(0);
639                 rc = llog_origin_handle_read_header(req);
640                 ldlm_callback_reply(req, rc);
641                 RETURN(0);
642         case LLOG_ORIGIN_HANDLE_CLOSE:
643                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
644                         RETURN(0);
645                 rc = llog_origin_handle_close(req);
646                 ldlm_callback_reply(req, rc);
647                 RETURN(0);
648         case OBD_QC_CALLBACK:
649                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
650                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
651                         RETURN(0);
652                 rc = ldlm_handle_qc_callback(req);
653                 ldlm_callback_reply(req, rc);
654                 RETURN(0);
655         default:
656                 CERROR("unknown opcode %u\n",
657                        lustre_msg_get_opc(req->rq_reqmsg));
658                 ldlm_callback_reply(req, -EPROTO);
659                 RETURN(0);
660         }
661
662         ns = req->rq_export->exp_obd->obd_namespace;
663         LASSERT(ns != NULL);
664
665         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
666
667         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
668         if (dlm_req == NULL) {
669                 rc = ldlm_callback_reply(req, -EPROTO);
670                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
671                                      NULL);
672                 RETURN(0);
673         }
674
675         /* Force a known safe race: send a cancel to the server for a lock
676          * on which the server has already started a blocking callback. */
677         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
678             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
679                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
680                 if (rc < 0)
681                         CERROR("ldlm_cli_cancel: %d\n", rc);
682         }
683
684         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
685         if (!lock) {
686                 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
687                        "disappeared\n", dlm_req->lock_handle[0].cookie);
688                 rc = ldlm_callback_reply(req, -EINVAL);
689                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
690                                      &dlm_req->lock_handle[0]);
691                 RETURN(0);
692         }
693
694         if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
695             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
696                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
697
698         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
699         lock_res_and_lock(lock);
700         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
701                                               LDLM_AST_FLAGS);
702         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
703                 /* If somebody cancels the lock while the cache is already dropped,
704                  * or the lock fails before the cp_ast is received on the client,
705                  * we can tell the server we have no lock. Otherwise, we
706                  * should send the cancel after dropping the cache. */
707                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
708                     (lock->l_flags & LDLM_FL_BL_DONE)) ||
709                     (lock->l_flags & LDLM_FL_FAILED)) {
710                         LDLM_DEBUG(lock, "callback on lock "
711                                    LPX64" - lock disappeared\n",
712                                    dlm_req->lock_handle[0].cookie);
713                         unlock_res_and_lock(lock);
714                         LDLM_LOCK_RELEASE(lock);
715                         rc = ldlm_callback_reply(req, -EINVAL);
716                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
717                                              &dlm_req->lock_handle[0]);
718                         RETURN(0);
719                 }
720                 /* BL_AST locks are not needed in LRU.
721                  * Let ldlm_cancel_lru() be fast. */
722                 ldlm_lock_remove_from_lru(lock);
723                 lock->l_flags |= LDLM_FL_BL_AST;
724         }
725         unlock_res_and_lock(lock);
726
727         /* We want the ost thread to get this reply so that it can respond
728          * to ost requests (write cache writeback) that might be triggered
729          * in the callback.
730          *
731          * But we'd also like to be able to indicate in the reply that we're
732          * cancelling right now, because it's unused, or have an intent result
733          * in the reply, so we might have to push the responsibility for sending
734          * the reply down into the AST handlers, alas. */
735
736         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
737         case LDLM_BL_CALLBACK:
738                 CDEBUG(D_INODE, "blocking ast\n");
739                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
740                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
741                         rc = ldlm_callback_reply(req, 0);
742                         if (req->rq_no_reply || rc)
743                                 ldlm_callback_errmsg(req, "Normal process", rc,
744                                                      &dlm_req->lock_handle[0]);
745                 }
746                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
747                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
748                 break;
749         case LDLM_CP_CALLBACK:
750                 CDEBUG(D_INODE, "completion ast\n");
751                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
752                 ldlm_callback_reply(req, 0);
753                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
754                 break;
755         case LDLM_GL_CALLBACK:
756                 CDEBUG(D_INODE, "glimpse ast\n");
757                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
758                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
759                 break;
760         default:
761                 LBUG();                  /* checked above */
762         }
763
764         RETURN(0);
765 }
766
767
768 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
769 {
770         struct ldlm_bl_work_item *blwi = NULL;
771         static unsigned int num_bl = 0;
772
773         spin_lock(&blp->blp_lock);
774         /* process a request from blp_list at least once every blp_num_threads iterations */
775         if (!list_empty(&blp->blp_list) &&
776             (list_empty(&blp->blp_prio_list) || num_bl == 0))
777                 blwi = list_entry(blp->blp_list.next,
778                                       struct ldlm_bl_work_item, blwi_entry);
779         else
780                 if (!list_empty(&blp->blp_prio_list))
781                         blwi = list_entry(blp->blp_prio_list.next,
782                                               struct ldlm_bl_work_item,
783                                               blwi_entry);
784
785         if (blwi) {
786                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
787                         num_bl = 0;
788                 list_del(&blwi->blwi_entry);
789         }
790         spin_unlock(&blp->blp_lock);
791
792         return blwi;
793 }
794
795 /* This only contains temporary data until the thread starts */
796 struct ldlm_bl_thread_data {
797         char                    bltd_name[CFS_CURPROC_COMM_MAX];
798         struct ldlm_bl_pool     *bltd_blp;
799         struct completion       bltd_comp;
800         int                     bltd_num;
801 };
802
803 static int ldlm_bl_thread_main(void *arg);
804
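/* Start one additional blocking-AST thread and wait until it has registered
 * itself with the pool. */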
805 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
806 {
807         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
808         task_t *task;
809
810         init_completion(&bltd.bltd_comp);
811         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
812         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1,
813                 "ldlm_bl_%02d", bltd.bltd_num);
814         task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name);
815         if (IS_ERR(task)) {
816                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
817                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
818                 return PTR_ERR(task);
819         }
820         wait_for_completion(&bltd.bltd_comp);
821
822         return 0;
823 }
824
825 /**
826  * Main blocking requests processing thread.
827  *
828  * Callers put locks into its queue by calling ldlm_bl_to_thread.
829  * This thread in the end ends up doing actual call to ->l_blocking_ast
830  * for queued locks.
831  */
832 static int ldlm_bl_thread_main(void *arg)
833 {
834         struct ldlm_bl_pool *blp;
835         ENTRY;
836
837         {
838                 struct ldlm_bl_thread_data *bltd = arg;
839
840                 blp = bltd->bltd_blp;
841
842                 atomic_inc(&blp->blp_num_threads);
843                 atomic_inc(&blp->blp_busy_threads);
844
845                 complete(&bltd->bltd_comp);
846                 /* cannot use bltd after this; it lives only on the caller's stack */
847         }
848
849         while (1) {
850                 struct l_wait_info lwi = { 0 };
851                 struct ldlm_bl_work_item *blwi = NULL;
852                 int busy;
853
854                 blwi = ldlm_bl_get_work(blp);
855
856                 if (blwi == NULL) {
857                         atomic_dec(&blp->blp_busy_threads);
858                         l_wait_event_exclusive(blp->blp_waitq,
859                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
860                                          &lwi);
861                         busy = atomic_inc_return(&blp->blp_busy_threads);
862                 } else {
863                         busy = atomic_read(&blp->blp_busy_threads);
864                 }
865
866                 if (blwi->blwi_ns == NULL)
867                         /* added by ldlm_cleanup() */
868                         break;
869
870                 /* Not fatal if this races and we start a few too many threads */
871                 if (unlikely(busy < blp->blp_max_threads &&
872                              busy >= atomic_read(&blp->blp_num_threads) &&
873                              !blwi->blwi_mem_pressure))
874                         /* discard the return value, we tried */
875                         ldlm_bl_thread_start(blp);
876
877                 if (blwi->blwi_mem_pressure)
878                         memory_pressure_set();
879
880                 if (blwi->blwi_count) {
881                         int count;
882                         /* In the special case where we cancel LRU locks
883                          * asynchronously, the list of locks is passed in here.
884                          * The locks are thus marked LDLM_FL_CANCELING, but NOT
885                          * yet canceled locally. */
886                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
887                                                            blwi->blwi_count,
888                                                            LCF_BL_AST);
889                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
890                                              blwi->blwi_flags);
891                 } else {
892                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
893                                                 blwi->blwi_lock);
894                 }
895                 if (blwi->blwi_mem_pressure)
896                         memory_pressure_clr();
897
898                 if (blwi->blwi_flags & LCF_ASYNC)
899                         OBD_FREE(blwi, sizeof(*blwi));
900                 else
901                         complete(&blwi->blwi_comp);
902         }
903
904         atomic_dec(&blp->blp_busy_threads);
905         atomic_dec(&blp->blp_num_threads);
906         complete(&blp->blp_comp);
907         RETURN(0);
908 }
909
910
911 static int ldlm_setup(void);
912 static int ldlm_cleanup(void);
913
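/* Take a reference on the LDLM module state; the first reference triggers
 * the full ldlm_setup(). */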
914 int ldlm_get_ref(void)
915 {
916         int rc = 0;
917         ENTRY;
918         mutex_lock(&ldlm_ref_mutex);
919         if (++ldlm_refcount == 1) {
920                 rc = ldlm_setup();
921                 if (rc)
922                         ldlm_refcount--;
923         }
924         mutex_unlock(&ldlm_ref_mutex);
925
926         RETURN(rc);
927 }
928 EXPORT_SYMBOL(ldlm_get_ref);
929
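/* Drop a reference on the LDLM module state; the last reference shuts it
 * down via ldlm_cleanup(). */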
930 void ldlm_put_ref(void)
931 {
932         ENTRY;
933         mutex_lock(&ldlm_ref_mutex);
934         if (ldlm_refcount == 1) {
935                 int rc = ldlm_cleanup();
936                 if (rc)
937                         CERROR("ldlm_cleanup failed: %d\n", rc);
938                 else
939                         ldlm_refcount--;
940         } else {
941                 ldlm_refcount--;
942         }
943         mutex_unlock(&ldlm_ref_mutex);
944
945         EXIT;
946 }
947 EXPORT_SYMBOL(ldlm_put_ref);
948
949 /*
950  * Export handle<->lock hash operations.
951  */
952 static unsigned
953 ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
954 {
955         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
956 }
957
958 static void *
959 ldlm_export_lock_key(struct hlist_node *hnode)
960 {
961         struct ldlm_lock *lock;
962
963         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
964         return &lock->l_remote_handle;
965 }
966
967 static void
968 ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
969 {
970         struct ldlm_lock     *lock;
971
972         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
973         lock->l_remote_handle = *(struct lustre_handle *)key;
974 }
975
976 static int
977 ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
978 {
979         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
980 }
981
982 static void *
983 ldlm_export_lock_object(struct hlist_node *hnode)
984 {
985         return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
986 }
987
988 static void
989 ldlm_export_lock_get(cfs_hash_t *hs, struct hlist_node *hnode)
990 {
991         struct ldlm_lock *lock;
992
993         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
994         LDLM_LOCK_GET(lock);
995 }
996
997 static void
998 ldlm_export_lock_put(cfs_hash_t *hs, struct hlist_node *hnode)
999 {
1000         struct ldlm_lock *lock;
1001
1002         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
1003         LDLM_LOCK_RELEASE(lock);
1004 }
1005
1006 static cfs_hash_ops_t ldlm_export_lock_ops = {
1007         .hs_hash        = ldlm_export_lock_hash,
1008         .hs_key  = ldlm_export_lock_key,
1009         .hs_keycmp      = ldlm_export_lock_keycmp,
1010         .hs_keycpy      = ldlm_export_lock_keycpy,
1011         .hs_object      = ldlm_export_lock_object,
1012         .hs_get  = ldlm_export_lock_get,
1013         .hs_put  = ldlm_export_lock_put,
1014         .hs_put_locked  = ldlm_export_lock_put,
1015 };
1016
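/* Create the per-export remote-handle -> lock hash used to look up locks
 * granted to this export. */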
1017 int ldlm_init_export(struct obd_export *exp)
1018 {
1019         ENTRY;
1020
1021         exp->exp_lock_hash =
1022                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
1023                                 HASH_EXP_LOCK_CUR_BITS,
1024                                 HASH_EXP_LOCK_MAX_BITS,
1025                                 HASH_EXP_LOCK_BKT_BITS, 0,
1026                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
1027                                 &ldlm_export_lock_ops,
1028                                 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
1029                                 CFS_HASH_NBLK_CHANGE);
1030
1031         if (!exp->exp_lock_hash)
1032                 RETURN(-ENOMEM);
1033
1034         RETURN(0);
1035 }
1036 EXPORT_SYMBOL(ldlm_init_export);
1037
1038 void ldlm_destroy_export(struct obd_export *exp)
1039 {
1040         ENTRY;
1041         cfs_hash_putref(exp->exp_lock_hash);
1042         exp->exp_lock_hash = NULL;
1043
1044         ldlm_destroy_flock_export(exp);
1045         EXIT;
1046 }
1047 EXPORT_SYMBOL(ldlm_destroy_export);
1048
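/* Bring up the client-side LDLM state: register the "ldlm_cbd" callback
 * service, create the blocking-AST thread pool and initialize the lock
 * pools. */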
1049 static int ldlm_setup(void)
1050 {
1051         static struct ptlrpc_service_conf       conf;
1052         struct ldlm_bl_pool                     *blp = NULL;
1053         int rc = 0;
1054         int i;
1055         ENTRY;
1056
1057         if (ldlm_state != NULL)
1058                 RETURN(-EALREADY);
1059
1060         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1061         if (ldlm_state == NULL)
1062                 RETURN(-ENOMEM);
1063
1064 #ifdef LPROCFS
1065         rc = ldlm_proc_setup();
1066         if (rc != 0)
1067                 GOTO(out, rc);
1068 #endif
1069
1070         memset(&conf, 0, sizeof(conf));
1071         conf = (typeof(conf)) {
1072                 .psc_name               = "ldlm_cbd",
1073                 .psc_watchdog_factor    = 2,
1074                 .psc_buf                = {
1075                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
1076                         .bc_buf_size            = LDLM_BUFSIZE,
1077                         .bc_req_max_size        = LDLM_MAXREQSIZE,
1078                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
1079                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
1080                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
1081                 },
1082                 .psc_thr                = {
1083                         .tc_thr_name            = "ldlm_cb",
1084                         .tc_thr_factor          = LDLM_THR_FACTOR,
1085                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
1086                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
1087                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
1088                         .tc_nthrs_user          = ldlm_num_threads,
1089                         .tc_cpu_affinity        = 1,
1090                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
1091                 },
1092                 .psc_cpt                = {
1093                         .cc_pattern             = ldlm_cpts,
1094                 },
1095                 .psc_ops                = {
1096                         .so_req_handler         = ldlm_callback_handler,
1097                 },
1098         };
1099         ldlm_state->ldlm_cb_service = \
1100                         ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
1101         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
1102                 CERROR("failed to start service\n");
1103                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
1104                 ldlm_state->ldlm_cb_service = NULL;
1105                 GOTO(out, rc);
1106         }
1107
1108
1109         OBD_ALLOC(blp, sizeof(*blp));
1110         if (blp == NULL)
1111                 GOTO(out, rc = -ENOMEM);
1112         ldlm_state->ldlm_bl_pool = blp;
1113
1114         spin_lock_init(&blp->blp_lock);
1115         INIT_LIST_HEAD(&blp->blp_list);
1116         INIT_LIST_HEAD(&blp->blp_prio_list);
1117         init_waitqueue_head(&blp->blp_waitq);
1118         atomic_set(&blp->blp_num_threads, 0);
1119         atomic_set(&blp->blp_busy_threads, 0);
1120
1121         if (ldlm_num_threads == 0) {
1122                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1123                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1124         } else {
1125                 blp->blp_min_threads = blp->blp_max_threads = \
1126                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
1127                                                          ldlm_num_threads));
1128         }
1129
1130         for (i = 0; i < blp->blp_min_threads; i++) {
1131                 rc = ldlm_bl_thread_start(blp);
1132                 if (rc < 0)
1133                         GOTO(out, rc);
1134         }
1135
1136
1137         rc = ldlm_pools_init();
1138         if (rc) {
1139                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1140                 GOTO(out, rc);
1141         }
1142         RETURN(0);
1143
1144  out:
1145         ldlm_cleanup();
1146         RETURN(rc);
1147 }
1148
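/* Undo ldlm_setup(): stop the blocking-AST threads, unregister the callback
 * service and free the LDLM state.  Fails with -EBUSY while namespaces still
 * exist. */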
1149 static int ldlm_cleanup(void)
1150 {
1151         ENTRY;
1152
1153         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1154             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1155                 CERROR("ldlm still has namespaces; clean these up first.\n");
1156                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1157                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1158                 RETURN(-EBUSY);
1159         }
1160
1161         ldlm_pools_fini();
1162
1163         if (ldlm_state->ldlm_bl_pool != NULL) {
1164                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1165
1166                 while (atomic_read(&blp->blp_num_threads) > 0) {
1167                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1168
1169                         init_completion(&blp->blp_comp);
1170
1171                         spin_lock(&blp->blp_lock);
1172                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1173                         wake_up(&blp->blp_waitq);
1174                         spin_unlock(&blp->blp_lock);
1175
1176                         wait_for_completion(&blp->blp_comp);
1177                 }
1178
1179                 OBD_FREE(blp, sizeof(*blp));
1180         }
1181
1182         if (ldlm_state->ldlm_cb_service != NULL)
1183                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1184
1185         ldlm_proc_cleanup();
1186
1187
1188         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
1189         ldlm_state = NULL;
1190
1191         RETURN(0);
1192 }
1193
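/* Module initialization: set up the global mutexes and the resource, lock
 * and interval-node slab caches. */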
1194 int ldlm_init(void)
1195 {
1196         mutex_init(&ldlm_ref_mutex);
1197         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1198         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1199         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1200                                                sizeof(struct ldlm_resource), 0,
1201                                                SLAB_HWCACHE_ALIGN, NULL);
1202         if (ldlm_resource_slab == NULL)
1203                 return -ENOMEM;
1204
1205         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1206                               sizeof(struct ldlm_lock), 0,
1207                               SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
1208         if (ldlm_lock_slab == NULL) {
1209                 kmem_cache_destroy(ldlm_resource_slab);
1210                 return -ENOMEM;
1211         }
1212
1213         ldlm_interval_slab = kmem_cache_create("interval_node",
1214                                         sizeof(struct ldlm_interval),
1215                                         0, SLAB_HWCACHE_ALIGN, NULL);
1216         if (ldlm_interval_slab == NULL) {
1217                 kmem_cache_destroy(ldlm_resource_slab);
1218                 kmem_cache_destroy(ldlm_lock_slab);
1219                 return -ENOMEM;
1220         }
1221 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1222         class_export_dump_hook = ldlm_dump_export_locks;
1223 #endif
1224         return 0;
1225 }
1226
1227 void ldlm_exit(void)
1228 {
1229         if (ldlm_refcount)
1230                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1231         kmem_cache_destroy(ldlm_resource_slab);
1232         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to call
1233          * synchronize_rcu() to wait for a grace period to elapse, so that
1234          * ldlm_lock_free() gets a chance to be called. */
1235         synchronize_rcu();
1236         kmem_cache_destroy(ldlm_lock_slab);
1237         kmem_cache_destroy(ldlm_interval_slab);
1238 }