]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - fs/dlm/dir.c
Merge remote-tracking branch 'dlm/next'
[karo-tx-linux.git] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26 /*
27  * We use the upper 16 bits of the hash value to select the directory node.
28  * Low bits are used for distribution of rsb's among hash buckets on each node.
29  *
30  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
31  * num_nodes to the hash value.  This value in the desired range is used as an
32  * offset into the sorted list of nodeid's to give the particular nodeid.
33  */
34
35 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
36 {
37         uint32_t node;
38
39         if (ls->ls_num_nodes == 1)
40                 return dlm_our_nodeid();
41         else {
42                 node = (hash >> 16) % ls->ls_total_weight;
43                 return ls->ls_node_array[node];
44         }
45 }
46
47 int dlm_dir_nodeid(struct dlm_rsb *r)
48 {
49         return r->res_dir_nodeid;
50 }
51
52 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
53 {
54         struct dlm_rsb *r;
55
56         down_read(&ls->ls_root_sem);
57         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
58                 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
59         }
60         up_read(&ls->ls_root_sem);
61 }
62
63 int dlm_recover_directory(struct dlm_ls *ls)
64 {
65         struct dlm_member *memb;
66         char *b, *last_name = NULL;
67         int error = -ENOMEM, last_len, nodeid, result;
68         uint16_t namelen;
69         unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
70
71         log_rinfo(ls, "dlm_recover_directory");
72
73         if (dlm_no_directory(ls))
74                 goto out_status;
75
76         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
77         if (!last_name)
78                 goto out;
79
80         list_for_each_entry(memb, &ls->ls_nodes, list) {
81                 if (memb->nodeid == dlm_our_nodeid())
82                         continue;
83
84                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
85                 last_len = 0;
86
87                 for (;;) {
88                         int left;
89                         error = dlm_recovery_stopped(ls);
90                         if (error)
91                                 goto out_free;
92
93                         error = dlm_rcom_names(ls, memb->nodeid,
94                                                last_name, last_len);
95                         if (error)
96                                 goto out_free;
97
98                         cond_resched();
99
100                         /*
101                          * pick namelen/name pairs out of received buffer
102                          */
103
104                         b = ls->ls_recover_buf->rc_buf;
105                         left = ls->ls_recover_buf->rc_header.h_length;
106                         left -= sizeof(struct dlm_rcom);
107
108                         for (;;) {
109                                 __be16 v;
110
111                                 error = -EINVAL;
112                                 if (left < sizeof(__be16))
113                                         goto out_free;
114
115                                 memcpy(&v, b, sizeof(__be16));
116                                 namelen = be16_to_cpu(v);
117                                 b += sizeof(__be16);
118                                 left -= sizeof(__be16);
119
120                                 /* namelen of 0xFFFFF marks end of names for
121                                    this node; namelen of 0 marks end of the
122                                    buffer */
123
124                                 if (namelen == 0xFFFF)
125                                         goto done;
126                                 if (!namelen)
127                                         break;
128
129                                 if (namelen > left)
130                                         goto out_free;
131
132                                 if (namelen > DLM_RESNAME_MAXLEN)
133                                         goto out_free;
134
135                                 error = dlm_master_lookup(ls, memb->nodeid,
136                                                           b, namelen,
137                                                           DLM_LU_RECOVER_DIR,
138                                                           &nodeid, &result);
139                                 if (error) {
140                                         log_error(ls, "recover_dir lookup %d",
141                                                   error);
142                                         goto out_free;
143                                 }
144
145                                 /* The name was found in rsbtbl, but the
146                                  * master nodeid is different from
147                                  * memb->nodeid which says it is the master.
148                                  * This should not happen. */
149
150                                 if (result == DLM_LU_MATCH &&
151                                     nodeid != memb->nodeid) {
152                                         count_bad++;
153                                         log_error(ls, "recover_dir lookup %d "
154                                                   "nodeid %d memb %d bad %u",
155                                                   result, nodeid, memb->nodeid,
156                                                   count_bad);
157                                         print_hex_dump_bytes("dlm_recover_dir ",
158                                                              DUMP_PREFIX_NONE,
159                                                              b, namelen);
160                                 }
161
162                                 /* The name was found in rsbtbl, and the
163                                  * master nodeid matches memb->nodeid. */
164
165                                 if (result == DLM_LU_MATCH &&
166                                     nodeid == memb->nodeid) {
167                                         count_match++;
168                                 }
169
170                                 /* The name was not found in rsbtbl and was
171                                  * added with memb->nodeid as the master. */
172
173                                 if (result == DLM_LU_ADD) {
174                                         count_add++;
175                                 }
176
177                                 last_len = namelen;
178                                 memcpy(last_name, b, namelen);
179                                 b += namelen;
180                                 left -= namelen;
181                                 count++;
182                         }
183                 }
184          done:
185                 ;
186         }
187
188  out_status:
189         error = 0;
190         dlm_set_recover_status(ls, DLM_RS_DIR);
191
192         log_rinfo(ls, "dlm_recover_directory %u in %u new",
193                   count, count_add);
194  out_free:
195         kfree(last_name);
196  out:
197         return error;
198 }
199
200 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201 {
202         struct dlm_rsb *r;
203         uint32_t hash, bucket;
204         int rv;
205
206         hash = jhash(name, len, 0);
207         bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209         spin_lock(&ls->ls_rsbtbl[bucket].lock);
210         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211         if (rv)
212                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213                                          name, len, &r);
214         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216         if (!rv)
217                 return r;
218
219         down_read(&ls->ls_root_sem);
220         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222                         up_read(&ls->ls_root_sem);
223                         log_debug(ls, "find_rsb_root revert to root_list %s",
224                                   r->res_name);
225                         return r;
226                 }
227         }
228         up_read(&ls->ls_root_sem);
229         return NULL;
230 }
231
232 /* Find the rsb where we left off (or start again), then send rsb names
233    for rsb's we're master of and whose directory node matches the requesting
234    node.  inbuf is the rsb name last sent, inlen is the name's length */
235
236 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237                            char *outbuf, int outlen, int nodeid)
238 {
239         struct list_head *list;
240         struct dlm_rsb *r;
241         int offset = 0, dir_nodeid;
242         __be16 be_namelen;
243
244         down_read(&ls->ls_root_sem);
245
246         if (inlen > 1) {
247                 r = find_rsb_root(ls, inbuf, inlen);
248                 if (!r) {
249                         inbuf[inlen - 1] = '\0';
250                         log_error(ls, "copy_master_names from %d start %d %s",
251                                   nodeid, inlen, inbuf);
252                         goto out;
253                 }
254                 list = r->res_root_list.next;
255         } else {
256                 list = ls->ls_root_list.next;
257         }
258
259         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260                 r = list_entry(list, struct dlm_rsb, res_root_list);
261                 if (r->res_nodeid)
262                         continue;
263
264                 dir_nodeid = dlm_dir_nodeid(r);
265                 if (dir_nodeid != nodeid)
266                         continue;
267
268                 /*
269                  * The block ends when we can't fit the following in the
270                  * remaining buffer space:
271                  * namelen (uint16_t) +
272                  * name (r->res_length) +
273                  * end-of-block record 0x0000 (uint16_t)
274                  */
275
276                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277                         /* Write end-of-block record */
278                         be_namelen = cpu_to_be16(0);
279                         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280                         offset += sizeof(__be16);
281                         ls->ls_recover_dir_sent_msg++;
282                         goto out;
283                 }
284
285                 be_namelen = cpu_to_be16(r->res_length);
286                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287                 offset += sizeof(__be16);
288                 memcpy(outbuf + offset, r->res_name, r->res_length);
289                 offset += r->res_length;
290                 ls->ls_recover_dir_sent_res++;
291         }
292
293         /*
294          * If we've reached the end of the list (and there's room) write a
295          * terminating record.
296          */
297
298         if ((list == &ls->ls_root_list) &&
299             (offset + sizeof(uint16_t) <= outlen)) {
300                 be_namelen = cpu_to_be16(0xFFFF);
301                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302                 offset += sizeof(__be16);
303                 ls->ls_recover_dir_sent_msg++;
304         }
305  out:
306         up_read(&ls->ls_root_sem);
307 }
308