1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_lockd.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LDLM
39
40 #include "../../include/linux/libcfs/libcfs.h"
41 #include "../include/lustre_dlm.h"
42 #include "../include/obd_class.h"
43 #include <linux/list.h>
44 #include "ldlm_internal.h"
45
46 static int ldlm_num_threads;
47 module_param(ldlm_num_threads, int, 0444);
48 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
49
50 static char *ldlm_cpts;
51 module_param(ldlm_cpts, charp, 0444);
52 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
53
54 static struct mutex     ldlm_ref_mutex;
55 static int ldlm_refcount;
56
57 static struct kobject *ldlm_kobj;
58 struct kset *ldlm_ns_kset;
59 static struct kset *ldlm_svc_kset;
60
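/*
 * Context attached to a lock AST callback: the AST set argument being
 * processed and the lock the callback applies to.
 */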
61 struct ldlm_cb_async_args {
62         struct ldlm_cb_set_arg *ca_set_arg;
63         struct ldlm_lock       *ca_lock;
64 };
65
66 /* LDLM state */
67
68 static struct ldlm_state *ldlm_state;
69
70 #define ELT_STOPPED   0
71 #define ELT_READY     1
72 #define ELT_TERMINATE 2
73
74 struct ldlm_bl_pool {
75         spinlock_t              blp_lock;
76
77         /*
78          * blp_prio_list is used for callbacks that should be handled
79          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
80          * see bug 13843
81          */
82         struct list_head              blp_prio_list;
83
84         /*
85          * blp_list is used for all other callbacks which are likely
86          * to take longer to process.
87          */
88         struct list_head              blp_list;
89
90         wait_queue_head_t            blp_waitq;
91         struct completion       blp_comp;
92         atomic_t            blp_num_threads;
93         atomic_t            blp_busy_threads;
94         int                  blp_min_threads;
95         int                  blp_max_threads;
96 };
97
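/*
 * A unit of work for the blocking threads: either a single lock (blwi_lock)
 * to run the blocking AST on, or a list of blwi_count locks (blwi_head)
 * to cancel.
 */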
98 struct ldlm_bl_work_item {
99         struct list_head              blwi_entry;
100         struct ldlm_namespace  *blwi_ns;
101         struct ldlm_lock_desc   blwi_ld;
102         struct ldlm_lock       *blwi_lock;
103         struct list_head              blwi_head;
104         int                  blwi_count;
105         struct completion       blwi_comp;
106         enum ldlm_cancel_flags  blwi_flags;
107         int                  blwi_mem_pressure;
108 };
109
110 /**
111  * Callback handler for receiving incoming blocking ASTs.
112  *
113  * This can only happen on the client side.
114  */
115 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
116                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
117 {
118         int do_ast;
119
120         LDLM_DEBUG(lock, "client blocking AST callback handler");
121
122         lock_res_and_lock(lock);
123         ldlm_set_cbpending(lock);
124
125         if (ldlm_is_cancel_on_block(lock))
126                 ldlm_set_cancel(lock);
127
128         do_ast = !lock->l_readers && !lock->l_writers;
129         unlock_res_and_lock(lock);
130
131         if (do_ast) {
132                 CDEBUG(D_DLMTRACE,
133                        "Lock %p already unused, calling callback (%p)\n", lock,
134                        lock->l_blocking_ast);
135                 if (lock->l_blocking_ast)
136                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
137                                              LDLM_CB_BLOCKING);
138         } else {
139                 CDEBUG(D_DLMTRACE,
140                        "Lock %p is referenced, will be cancelled later\n",
141                        lock);
142         }
143
144         LDLM_DEBUG(lock, "client blocking callback handler END");
145         LDLM_LOCK_RELEASE(lock);
146 }
147
148 /**
149  * Callback handler for receiving incoming completion ASTs.
150  *
151  * This can only happen on the client side.
152  */
153 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
154                                     struct ldlm_namespace *ns,
155                                     struct ldlm_request *dlm_req,
156                                     struct ldlm_lock *lock)
157 {
158         int lvb_len;
159         LIST_HEAD(ast_list);
160         int rc = 0;
161
162         LDLM_DEBUG(lock, "client completion callback handler START");
163
164         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
165                 int to = cfs_time_seconds(1);
166
167                 while (to > 0) {
168                         set_current_state(TASK_INTERRUPTIBLE);
169                         schedule_timeout(to);
170                         if (lock->l_granted_mode == lock->l_req_mode ||
171                             ldlm_is_destroyed(lock))
172                                 break;
173                 }
174         }
175
176         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
177         if (lvb_len < 0) {
178                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
179                 rc = lvb_len;
180                 goto out;
181         } else if (lvb_len > 0) {
182                 if (lock->l_lvb_len > 0) {
183                         /* for extent lock, lvb contains ost_lvb{}. */
184                         LASSERT(lock->l_lvb_data);
185
186                         if (unlikely(lock->l_lvb_len < lvb_len)) {
187                                 LDLM_ERROR(lock, "Replied LVB is larger than expectation, expected = %d, replied = %d",
188                                            lock->l_lvb_len, lvb_len);
189                                 rc = -EINVAL;
190                                 goto out;
191                         }
192                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
193                                                      * variable length
194                                                      */
195                         void *lvb_data;
196
197                         lvb_data = kzalloc(lvb_len, GFP_NOFS);
198                         if (!lvb_data) {
199                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
200                                 rc = -ENOMEM;
201                                 goto out;
202                         }
203
204                         lock_res_and_lock(lock);
205                         LASSERT(!lock->l_lvb_data);
206                         lock->l_lvb_type = LVB_T_LAYOUT;
207                         lock->l_lvb_data = lvb_data;
208                         lock->l_lvb_len = lvb_len;
209                         unlock_res_and_lock(lock);
210                 }
211         }
212
213         lock_res_and_lock(lock);
214         if (ldlm_is_destroyed(lock) ||
215             lock->l_granted_mode == lock->l_req_mode) {
216                 /* bug 11300: the lock has already been granted */
217                 unlock_res_and_lock(lock);
218                 LDLM_DEBUG(lock, "Double grant race happened");
219                 rc = 0;
220                 goto out;
221         }
222
223         /* If we receive the completion AST before the actual enqueue has returned,
224          * then we might need to switch lock modes, resources, or extents.
225          */
226         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
227                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
228                 LDLM_DEBUG(lock, "completion AST, new lock mode");
229         }
230
231         if (lock->l_resource->lr_type != LDLM_PLAIN) {
232                 ldlm_convert_policy_to_local(req->rq_export,
233                                           dlm_req->lock_desc.l_resource.lr_type,
234                                           &dlm_req->lock_desc.l_policy_data,
235                                           &lock->l_policy_data);
236                 LDLM_DEBUG(lock, "completion AST, new policy data");
237         }
238
239         ldlm_resource_unlink_lock(lock);
240         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
241                    &lock->l_resource->lr_name,
242                    sizeof(lock->l_resource->lr_name)) != 0) {
243                 unlock_res_and_lock(lock);
244                 rc = ldlm_lock_change_resource(ns, lock,
245                                 &dlm_req->lock_desc.l_resource.lr_name);
246                 if (rc < 0) {
247                         LDLM_ERROR(lock, "Failed to allocate resource");
248                         goto out;
249                 }
250                 LDLM_DEBUG(lock, "completion AST, new resource");
251                 CERROR("change resource!\n");
252                 lock_res_and_lock(lock);
253         }
254
255         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
256                 /* BL_AST locks are not needed in LRU.
257                  * Let ldlm_cancel_lru() be fast.
258                  */
259                 ldlm_lock_remove_from_lru(lock);
260                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
261                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
262         }
263
264         if (lock->l_lvb_len > 0) {
265                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
266                                    lock->l_lvb_data, lvb_len);
267                 if (rc < 0) {
268                         unlock_res_and_lock(lock);
269                         goto out;
270                 }
271         }
272
273         ldlm_grant_lock(lock, &ast_list);
274         unlock_res_and_lock(lock);
275
276         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
277
278         /* Let Enqueue call osc_lock_upcall() and initialize l_ast_data */
279         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
280
281         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
282
283         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
284                           lock);
285         goto out;
286
287 out:
288         if (rc < 0) {
289                 lock_res_and_lock(lock);
290                 ldlm_set_failed(lock);
291                 unlock_res_and_lock(lock);
292                 wake_up(&lock->l_waitq);
293         }
294         LDLM_LOCK_RELEASE(lock);
295 }
296
297 /**
298  * Callback handler for receiving incoming glimpse ASTs.
299  *
300  * This can only happen on the client side.  After handling the glimpse AST
301  * we also consider dropping the lock here if it is unused locally for a
302  * long time.
303  */
304 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
305                                     struct ldlm_namespace *ns,
306                                     struct ldlm_request *dlm_req,
307                                     struct ldlm_lock *lock)
308 {
309         int rc = -ENOSYS;
310
311         LDLM_DEBUG(lock, "client glimpse AST callback handler");
312
313         if (lock->l_glimpse_ast)
314                 rc = lock->l_glimpse_ast(lock, req);
315
316         if (req->rq_repmsg) {
317                 ptlrpc_reply(req);
318         } else {
319                 req->rq_status = rc;
320                 ptlrpc_error(req);
321         }
322
323         lock_res_and_lock(lock);
324         if (lock->l_granted_mode == LCK_PW &&
325             !lock->l_readers && !lock->l_writers &&
326             cfs_time_after(cfs_time_current(),
327                            cfs_time_add(lock->l_last_used,
328                                         cfs_time_seconds(10)))) {
329                 unlock_res_and_lock(lock);
330                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
331                         ldlm_handle_bl_callback(ns, NULL, lock);
332
333                 return;
334         }
335         unlock_res_and_lock(lock);
336         LDLM_LOCK_RELEASE(lock);
337 }
338
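/* Send a reply with the given status, packing it first if necessary;
 * a no-op when the request needs no reply.
 */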
339 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
340 {
341         if (req->rq_no_reply)
342                 return 0;
343
344         req->rq_status = rc;
345         if (!req->rq_packed_final) {
346                 rc = lustre_pack_reply(req, 1, NULL, NULL);
347                 if (rc)
348                         return rc;
349         }
350         return ptlrpc_reply(req);
351 }
352
353 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
354                                enum ldlm_cancel_flags cancel_flags)
355 {
356         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
357
358         spin_lock(&blp->blp_lock);
359         if (blwi->blwi_lock && ldlm_is_discard_data(blwi->blwi_lock)) {
360                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
361                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
362         } else {
363                 /* other blocking callbacks are added to the regular list */
364                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
365         }
366         spin_unlock(&blp->blp_lock);
367
368         wake_up(&blp->blp_waitq);
369
370         /* cannot check blwi->blwi_flags as blwi could already be freed in
371          * LCF_ASYNC mode
372          */
373         if (!(cancel_flags & LCF_ASYNC))
374                 wait_for_completion(&blwi->blwi_comp);
375
376         return 0;
377 }
378
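/* Fill in a blocking work item: take over the \a cancels list when \a count
 * is non-zero, otherwise record the single \a lock.
 */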
379 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
380                              struct ldlm_namespace *ns,
381                              struct ldlm_lock_desc *ld,
382                              struct list_head *cancels, int count,
383                              struct ldlm_lock *lock,
384                              enum ldlm_cancel_flags cancel_flags)
385 {
386         init_completion(&blwi->blwi_comp);
387         INIT_LIST_HEAD(&blwi->blwi_head);
388
389         if (memory_pressure_get())
390                 blwi->blwi_mem_pressure = 1;
391
392         blwi->blwi_ns = ns;
393         blwi->blwi_flags = cancel_flags;
394         if (ld)
395                 blwi->blwi_ld = *ld;
396         if (count) {
397                 list_add(&blwi->blwi_head, cancels);
398                 list_del_init(cancels);
399                 blwi->blwi_count = count;
400         } else {
401                 blwi->blwi_lock = lock;
402         }
403 }
404
405 /**
406  * Queues a list of locks \a cancels containing \a count locks
407  * for later processing by a blocking thread.  If \a count is zero,
408  * then the lock referenced as \a lock is queued instead.
409  *
410  * The blocking thread will then call the lock's ->l_blocking_ast callback.
411  * If queueing the work item fails, an error is returned and the caller is
412  * expected to call ->l_blocking_ast itself.
413  */
414 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
415                              struct ldlm_lock_desc *ld,
416                              struct ldlm_lock *lock,
417                              struct list_head *cancels, int count,
418                              enum ldlm_cancel_flags cancel_flags)
419 {
420         if (cancels && count == 0)
421                 return 0;
422
423         if (cancel_flags & LCF_ASYNC) {
424                 struct ldlm_bl_work_item *blwi;
425
426                 blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
427                 if (!blwi)
428                         return -ENOMEM;
429                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
430
431                 return __ldlm_bl_to_thread(blwi, cancel_flags);
432         } else {
433                 /* if it is a synchronous call, do minimal memory allocation, as it
434                  * could be triggered from the kernel shrinker
435                  */
436                 struct ldlm_bl_work_item blwi;
437
438                 memset(&blwi, 0, sizeof(blwi));
439                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
440                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
441         }
442 }
443
444 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
445                            struct ldlm_lock *lock)
446 {
447         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
448 }
449
450 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
451                            struct list_head *cancels, int count,
452                            enum ldlm_cancel_flags cancel_flags)
453 {
454         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
455 }
456
457 int ldlm_bl_thread_wakeup(void)
458 {
459         wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
460         return 0;
461 }
462
463 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
464 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
465 {
466         struct obd_device *obd = req->rq_export->exp_obd;
467         char *key;
468         void *val;
469         int keylen, vallen;
470         int rc = -ENOSYS;
471
472         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
473
474         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
475
476         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
477         if (!key) {
478                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
479                 return -EFAULT;
480         }
481         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
482                                       RCL_CLIENT);
483         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
484         if (!val) {
485                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
486                 return -EFAULT;
487         }
488         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
489                                       RCL_CLIENT);
490
491         /* We are responsible for swabbing contents of val */
492
493         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
494                 /* Pass it on to mdc (the "export" in this case) */
495                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
496                                         req->rq_export,
497                                         sizeof(KEY_HSM_COPYTOOL_SEND),
498                                         KEY_HSM_COPYTOOL_SEND,
499                                         vallen, val, NULL);
500         else
501                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
502
503         return rc;
504 }
505
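/* Log a callback problem together with the peer NID, return code and lock handle. */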
506 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
507                                         const char *msg, int rc,
508                                         const struct lustre_handle *handle)
509 {
510         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
511                   "%s: [nid %s] [rc %d] [lock %#llx]",
512                   msg, libcfs_id2str(req->rq_peer), rc,
513                   handle ? handle->cookie : 0);
514         if (req->rq_no_reply)
515                 CWARN("No reply was sent, maybe cause bug 21636.\n");
516         else if (rc)
517                 CWARN("Send reply failed, maybe cause bug 21636.\n");
518 }
519
520 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
521 static int ldlm_callback_handler(struct ptlrpc_request *req)
522 {
523         struct ldlm_namespace *ns;
524         struct ldlm_request *dlm_req;
525         struct ldlm_lock *lock;
526         int rc;
527
528         /* Requests arrive in sender's byte order.  The ptlrpc service
529          * handler has already checked and, if necessary, byte-swapped the
530          * incoming request message body, but I am responsible for the
531          * message buffers.
532          */
533
534         /* do nothing for sec context finalize */
535         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
536                 return 0;
537
538         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
539
540         if (!req->rq_export) {
541                 rc = ldlm_callback_reply(req, -ENOTCONN);
542                 ldlm_callback_errmsg(req, "Operate on unconnected server",
543                                      rc, NULL);
544                 return 0;
545         }
546
547         LASSERT(req->rq_export->exp_obd);
548
549         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
550         case LDLM_BL_CALLBACK:
551                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
552                         if (cfs_fail_err)
553                                 ldlm_callback_reply(req, -(int)cfs_fail_err);
554                         return 0;
555                 }
556                 break;
557         case LDLM_CP_CALLBACK:
558                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
559                         return 0;
560                 break;
561         case LDLM_GL_CALLBACK:
562                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
563                         return 0;
564                 break;
565         case LDLM_SET_INFO:
566                 rc = ldlm_handle_setinfo(req);
567                 ldlm_callback_reply(req, rc);
568                 return 0;
569         default:
570                 CERROR("unknown opcode %u\n",
571                        lustre_msg_get_opc(req->rq_reqmsg));
572                 ldlm_callback_reply(req, -EPROTO);
573                 return 0;
574         }
575
576         ns = req->rq_export->exp_obd->obd_namespace;
577         LASSERT(ns);
578
579         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
580
581         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
582         if (!dlm_req) {
583                 rc = ldlm_callback_reply(req, -EPROTO);
584                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
585                                      NULL);
586                 return 0;
587         }
588
589         /* Force a known safe race, send a cancel to the server for a lock
590          * which the server has already started a blocking callback on.
591          */
592         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
593             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
594                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
595                 if (rc < 0)
596                         CERROR("ldlm_cli_cancel: %d\n", rc);
597         }
598
599         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
600         if (!lock) {
601                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
602                        dlm_req->lock_handle[0].cookie);
603                 rc = ldlm_callback_reply(req, -EINVAL);
604                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
605                                      &dlm_req->lock_handle[0]);
606                 return 0;
607         }
608
609         if (ldlm_is_fail_loc(lock) &&
610             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
611                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
612
613         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
614         lock_res_and_lock(lock);
615         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
616                                               LDLM_FL_AST_MASK);
617         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
618                 /* If somebody cancels the lock and the cache is already dropped,
619                  * or the lock failed before the cp_ast was received on the client,
620                  * we can tell the server we have no lock. Otherwise, we should
621                  * send the cancel after dropping the cache.
622                  */
623                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
624                     ldlm_is_failed(lock)) {
625                         LDLM_DEBUG(lock,
626                                    "callback on lock %#llx - lock disappeared",
627                                    dlm_req->lock_handle[0].cookie);
628                         unlock_res_and_lock(lock);
629                         LDLM_LOCK_RELEASE(lock);
630                         rc = ldlm_callback_reply(req, -EINVAL);
631                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
632                                              &dlm_req->lock_handle[0]);
633                         return 0;
634                 }
635                 /* BL_AST locks are not needed in LRU.
636                  * Let ldlm_cancel_lru() be fast.
637                  */
638                 ldlm_lock_remove_from_lru(lock);
639                 ldlm_set_bl_ast(lock);
640         }
641         unlock_res_and_lock(lock);
642
643         /* We want the ost thread to get this reply so that it can respond
644          * to ost requests (write cache writeback) that might be triggered
645          * in the callback.
646          *
647          * But we'd also like to be able to indicate in the reply that we're
648          * cancelling right now, because it's unused, or have an intent result
649          * in the reply, so we might have to push the responsibility for sending
650          * the reply down into the AST handlers, alas.
651          */
652
653         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
654         case LDLM_BL_CALLBACK:
655                 CDEBUG(D_INODE, "blocking ast\n");
656                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
657                 if (!ldlm_is_cancel_on_block(lock)) {
658                         rc = ldlm_callback_reply(req, 0);
659                         if (req->rq_no_reply || rc)
660                                 ldlm_callback_errmsg(req, "Normal process", rc,
661                                                      &dlm_req->lock_handle[0]);
662                 }
663                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
664                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
665                 break;
666         case LDLM_CP_CALLBACK:
667                 CDEBUG(D_INODE, "completion ast\n");
668                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
669                 ldlm_callback_reply(req, 0);
670                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
671                 break;
672         case LDLM_GL_CALLBACK:
673                 CDEBUG(D_INODE, "glimpse ast\n");
674                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
675                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
676                 break;
677         default:
678                 LBUG();                  /* checked above */
679         }
680
681         return 0;
682 }
683
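/*
 * Pick the next blocking work item.  Priority items are preferred, but an
 * ordinary blp_list item is taken at least once every blp_num_threads picks
 * so that the regular list cannot be starved.
 */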
684 static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
685                             struct ldlm_bl_work_item **p_blwi,
686                             struct obd_export **p_exp)
687 {
688         int num_th = atomic_read(&blp->blp_num_threads);
689         struct ldlm_bl_work_item *blwi = NULL;
690         static unsigned int num_bl;
691
692         spin_lock(&blp->blp_lock);
693         /* process a request from blp_list at least once every blp_num_threads requests */
694         if (!list_empty(&blp->blp_list) &&
695             (list_empty(&blp->blp_prio_list) || num_bl == 0))
696                 blwi = list_entry(blp->blp_list.next,
697                                   struct ldlm_bl_work_item, blwi_entry);
698         else if (!list_empty(&blp->blp_prio_list))
699                 blwi = list_entry(blp->blp_prio_list.next,
700                                   struct ldlm_bl_work_item, blwi_entry);
703
704         if (blwi) {
705                 if (++num_bl >= num_th)
706                         num_bl = 0;
707                 list_del(&blwi->blwi_entry);
708         }
709         spin_unlock(&blp->blp_lock);
710         *p_blwi = blwi;
711
712         return (*p_blwi || *p_exp) ? 1 : 0;
713 }
714
715 /* This only contains temporary data until the thread starts */
716 struct ldlm_bl_thread_data {
717         struct ldlm_bl_pool     *bltd_blp;
718         struct completion       bltd_comp;
719         int                     bltd_num;
720 };
721
722 static int ldlm_bl_thread_main(void *arg);
723
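/*
 * Start one more blocking thread, unless the pool is already at
 * blp_max_threads or, when \a check_busy is set, the existing threads
 * are not all busy.
 */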
724 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
725 {
726         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
727         struct task_struct *task;
728
729         init_completion(&bltd.bltd_comp);
730
731         bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
732         if (bltd.bltd_num >= blp->blp_max_threads) {
733                 atomic_dec(&blp->blp_num_threads);
734                 return 0;
735         }
736
737         LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
738         if (check_busy &&
739             atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
740                 atomic_dec(&blp->blp_num_threads);
741                 return 0;
742         }
743
744         task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
745                            bltd.bltd_num);
746         if (IS_ERR(task)) {
747                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
748                        bltd.bltd_num, PTR_ERR(task));
749                 atomic_dec(&blp->blp_num_threads);
750                 return PTR_ERR(task);
751         }
752         wait_for_completion(&bltd.bltd_comp);
753
754         return 0;
755 }
756
757 /* Not fatal if this races and we end up with a few too many threads */
758 static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
759                                       struct ldlm_bl_work_item *blwi)
760 {
761         if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
762                 return 0;
763
764         if (atomic_read(&blp->blp_busy_threads) <
765             atomic_read(&blp->blp_num_threads))
766                 return 0;
767
768         if (blwi && (!blwi->blwi_ns || blwi->blwi_mem_pressure))
769                 return 0;
770
771         return 1;
772 }
773
774 static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
775                                struct ldlm_bl_work_item *blwi)
776 {
777         if (!blwi->blwi_ns)
778                 /* added by ldlm_cleanup() */
779                 return LDLM_ITER_STOP;
780
781         if (blwi->blwi_mem_pressure)
782                 memory_pressure_set();
783
784         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
785
786         if (blwi->blwi_count) {
787                 int count;
788
789                 /*
790                  * In the special case where we cancel LRU locks
791                  * asynchronously, we pass the list of locks here.
792                  * The locks are thus marked LDLM_FL_CANCELING, but NOT
793                  * yet canceled locally.
794                  */
795                 count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
796                                                    blwi->blwi_count,
797                                                    LCF_BL_AST);
798                 ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
799                                      blwi->blwi_flags);
800         } else {
801                 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
802                                         blwi->blwi_lock);
803         }
804         if (blwi->blwi_mem_pressure)
805                 memory_pressure_clr();
806
807         if (blwi->blwi_flags & LCF_ASYNC)
808                 kfree(blwi);
809         else
810                 complete(&blwi->blwi_comp);
811
812         return 0;
813 }
814
815 /**
816  * Main blocking requests processing thread.
817  *
818  * Callers put locks into its queue by calling ldlm_bl_to_thread.
819  * This thread in the end ends up doing actual call to ->l_blocking_ast
820  * for queued locks.
821  */
822 static int ldlm_bl_thread_main(void *arg)
823 {
824         struct ldlm_bl_pool *blp;
825         struct ldlm_bl_thread_data *bltd = arg;
826
827         blp = bltd->bltd_blp;
828
829         complete(&bltd->bltd_comp);
830         /* cannot use bltd after this, it is only on caller's stack */
831
832         while (1) {
833                 struct l_wait_info lwi = { 0 };
834                 struct ldlm_bl_work_item *blwi = NULL;
835                 struct obd_export *exp = NULL;
836                 int rc;
837
838                 rc = ldlm_bl_get_work(blp, &blwi, &exp);
839                 if (!rc)
840                         l_wait_event_exclusive(blp->blp_waitq,
841                                                ldlm_bl_get_work(blp, &blwi,
842                                                                 &exp),
843                                                &lwi);
844                 atomic_inc(&blp->blp_busy_threads);
845
846                 if (ldlm_bl_thread_need_create(blp, blwi))
847                         /* discard the return value, we tried */
848                         ldlm_bl_thread_start(blp, true);
849
850                 if (blwi)
851                         rc = ldlm_bl_thread_blwi(blp, blwi);
852
853                 atomic_dec(&blp->blp_busy_threads);
854
855                 if (rc == LDLM_ITER_STOP)
856                         break;
857         }
858
859         atomic_dec(&blp->blp_num_threads);
860         complete(&blp->blp_comp);
861         return 0;
862 }
863
864 static int ldlm_setup(void);
865 static int ldlm_cleanup(void);
866
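/* Take a reference on the LDLM; the first reference performs the full setup. */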
867 int ldlm_get_ref(void)
868 {
869         int rc = 0;
870
871         mutex_lock(&ldlm_ref_mutex);
872         if (++ldlm_refcount == 1) {
873                 rc = ldlm_setup();
874                 if (rc)
875                         ldlm_refcount--;
876         }
877         mutex_unlock(&ldlm_ref_mutex);
878
879         return rc;
880 }
881
882 void ldlm_put_ref(void)
883 {
884         mutex_lock(&ldlm_ref_mutex);
885         if (ldlm_refcount == 1) {
886                 int rc = ldlm_cleanup();
887
888                 if (rc)
889                         CERROR("ldlm_cleanup failed: %d\n", rc);
890                 else
891                         ldlm_refcount--;
892         } else {
893                 ldlm_refcount--;
894         }
895         mutex_unlock(&ldlm_ref_mutex);
896 }
897
898 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
899                                                       struct attribute *attr,
900                                                       char *buf)
901 {
902         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
903 }
904
905 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
906                                                        struct attribute *attr,
907                                                        const char *buffer,
908                                                        size_t count)
909 {
910         int rc;
911         unsigned long val;
912
913         rc = kstrtoul(buffer, 10, &val);
914         if (rc)
915                 return rc;
916
917         ldlm_cancel_unused_locks_before_replay = val;
918
919         return count;
920 }
921 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
922
923 /* These are for root of /sys/fs/lustre/ldlm */
924 static struct attribute *ldlm_attrs[] = {
925         &lustre_attr_cancel_unused_locks_before_replay.attr,
926         NULL,
927 };
928
929 static struct attribute_group ldlm_attr_group = {
930         .attrs = ldlm_attrs,
931 };
932
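/*
 * Bring the LDLM up: sysfs and debugfs entries, the ldlm_cbd callback
 * service, the blocking thread pool and the lock pools.
 */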
933 static int ldlm_setup(void)
934 {
935         static struct ptlrpc_service_conf       conf;
936         struct ldlm_bl_pool                     *blp = NULL;
937         int rc = 0;
938         int i;
939
940         if (ldlm_state)
941                 return -EALREADY;
942
943         ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
944         if (!ldlm_state)
945                 return -ENOMEM;
946
947         ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
948         if (!ldlm_kobj) {
949                 rc = -ENOMEM;
950                 goto out;
951         }
952
953         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
954         if (rc)
955                 goto out;
956
957         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
958         if (!ldlm_ns_kset) {
959                 rc = -ENOMEM;
960                 goto out;
961         }
962
963         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
964         if (!ldlm_svc_kset) {
965                 rc = -ENOMEM;
966                 goto out;
967         }
968
969         rc = ldlm_debugfs_setup();
970         if (rc != 0)
971                 goto out;
972
973         memset(&conf, 0, sizeof(conf));
974         conf = (typeof(conf)) {
975                 .psc_name               = "ldlm_cbd",
976                 .psc_watchdog_factor    = 2,
977                 .psc_buf                = {
978                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
979                         .bc_buf_size            = LDLM_BUFSIZE,
980                         .bc_req_max_size        = LDLM_MAXREQSIZE,
981                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
982                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
983                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
984                 },
985                 .psc_thr                = {
986                         .tc_thr_name            = "ldlm_cb",
987                         .tc_thr_factor          = LDLM_THR_FACTOR,
988                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
989                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
990                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
991                         .tc_nthrs_user          = ldlm_num_threads,
992                         .tc_cpu_affinity        = 1,
993                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
994                 },
995                 .psc_cpt                = {
996                         .cc_pattern             = ldlm_cpts,
997                 },
998                 .psc_ops                = {
999                         .so_req_handler         = ldlm_callback_handler,
1000                 },
1001         };
1002         ldlm_state->ldlm_cb_service =
1003                         ptlrpc_register_service(&conf, ldlm_svc_kset,
1004                                                 ldlm_svc_debugfs_dir);
1005         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
1006                 CERROR("failed to start service\n");
1007                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
1008                 ldlm_state->ldlm_cb_service = NULL;
1009                 goto out;
1010         }
1011
1012         blp = kzalloc(sizeof(*blp), GFP_NOFS);
1013         if (!blp) {
1014                 rc = -ENOMEM;
1015                 goto out;
1016         }
1017         ldlm_state->ldlm_bl_pool = blp;
1018
1019         spin_lock_init(&blp->blp_lock);
1020         INIT_LIST_HEAD(&blp->blp_list);
1021         INIT_LIST_HEAD(&blp->blp_prio_list);
1022         init_waitqueue_head(&blp->blp_waitq);
1023         atomic_set(&blp->blp_num_threads, 0);
1024         atomic_set(&blp->blp_busy_threads, 0);
1025
1026         if (ldlm_num_threads == 0) {
1027                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1028                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1029         } else {
1030                 blp->blp_min_threads = min_t(int, LDLM_NTHRS_MAX,
1031                                              max_t(int, LDLM_NTHRS_INIT,
1032                                                    ldlm_num_threads));
1033
1034                 blp->blp_max_threads = blp->blp_min_threads;
1035         }
1036
1037         for (i = 0; i < blp->blp_min_threads; i++) {
1038                 rc = ldlm_bl_thread_start(blp, false);
1039                 if (rc < 0)
1040                         goto out;
1041         }
1042
1043         rc = ldlm_pools_init();
1044         if (rc) {
1045                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1046                 goto out;
1047         }
1048         return 0;
1049
1050  out:
1051         ldlm_cleanup();
1052         return rc;
1053 }
1054
1055 static int ldlm_cleanup(void)
1056 {
1057         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1058             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1059                 CERROR("ldlm still has namespaces; clean these up first.\n");
1060                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1061                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1062                 return -EBUSY;
1063         }
1064
1065         ldlm_pools_fini();
1066
1067         if (ldlm_state->ldlm_bl_pool) {
1068                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1069
1070                 while (atomic_read(&blp->blp_num_threads) > 0) {
1071                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1072
1073                         init_completion(&blp->blp_comp);
1074
1075                         spin_lock(&blp->blp_lock);
1076                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1077                         wake_up(&blp->blp_waitq);
1078                         spin_unlock(&blp->blp_lock);
1079
1080                         wait_for_completion(&blp->blp_comp);
1081                 }
1082
1083                 kfree(blp);
1084         }
1085
1086         if (ldlm_state->ldlm_cb_service)
1087                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1088
1089         if (ldlm_ns_kset)
1090                 kset_unregister(ldlm_ns_kset);
1091         if (ldlm_svc_kset)
1092                 kset_unregister(ldlm_svc_kset);
1093         if (ldlm_kobj)
1094                 kobject_put(ldlm_kobj);
1095
1096         ldlm_debugfs_cleanup();
1097
1098         kfree(ldlm_state);
1099         ldlm_state = NULL;
1100
1101         return 0;
1102 }
1103
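/* Module init: set up the namespace locks and create the resource, lock and
 * interval-node slab caches.
 */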
1104 int ldlm_init(void)
1105 {
1106         mutex_init(&ldlm_ref_mutex);
1107         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1108         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1109         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1110                                                sizeof(struct ldlm_resource), 0,
1111                                                SLAB_HWCACHE_ALIGN, NULL);
1112         if (!ldlm_resource_slab)
1113                 return -ENOMEM;
1114
1115         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1116                                            sizeof(struct ldlm_lock), 0,
1117                                            SLAB_HWCACHE_ALIGN |
1118                                            SLAB_DESTROY_BY_RCU, NULL);
1119         if (!ldlm_lock_slab) {
1120                 kmem_cache_destroy(ldlm_resource_slab);
1121                 return -ENOMEM;
1122         }
1123
1124         ldlm_interval_slab = kmem_cache_create("interval_node",
1125                                                sizeof(struct ldlm_interval),
1126                                                0, SLAB_HWCACHE_ALIGN, NULL);
1127         if (!ldlm_interval_slab) {
1128                 kmem_cache_destroy(ldlm_resource_slab);
1129                 kmem_cache_destroy(ldlm_lock_slab);
1130                 return -ENOMEM;
1131         }
1132 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1133         class_export_dump_hook = ldlm_dump_export_locks;
1134 #endif
1135         return 0;
1136 }
1137
1138 void ldlm_exit(void)
1139 {
1140         if (ldlm_refcount)
1141                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1142         kmem_cache_destroy(ldlm_resource_slab);
1143         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to call
1144          * synchronize_rcu() to wait for a grace period to elapse, so that
1145          * ldlm_lock_free() gets a chance to be called.
1146          */
1147         synchronize_rcu();
1148         kmem_cache_destroy(ldlm_lock_slab);
1149         kmem_cache_destroy(ldlm_interval_slab);
1150 }