1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/rcupdate.h>
29 #include <linux/utsname.h>
30 #include <linux/workqueue.h>
31 #include <linux/in.h>
32 #include <linux/in6.h>
33 #include <linux/un.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41
42 #include "sunrpc.h"
43 #include "netns.h"
44
45 #ifdef RPC_DEBUG
46 # define RPCDBG_FACILITY        RPCDBG_CALL
47 #endif
48
49 #define dprint_status(t)                                        \
50         dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
51                         __func__, t->tk_status)
52
53 /*
54  * destroy_wait lets rpc_shutdown_client() wait for a client's tasks to exit
55  */
56
57 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60 static void     call_start(struct rpc_task *task);
61 static void     call_reserve(struct rpc_task *task);
62 static void     call_reserveresult(struct rpc_task *task);
63 static void     call_allocate(struct rpc_task *task);
64 static void     call_decode(struct rpc_task *task);
65 static void     call_bind(struct rpc_task *task);
66 static void     call_bind_status(struct rpc_task *task);
67 static void     call_transmit(struct rpc_task *task);
68 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
69 static void     call_bc_transmit(struct rpc_task *task);
70 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
71 static void     call_status(struct rpc_task *task);
72 static void     call_transmit_status(struct rpc_task *task);
73 static void     call_refresh(struct rpc_task *task);
74 static void     call_refreshresult(struct rpc_task *task);
75 static void     call_timeout(struct rpc_task *task);
76 static void     call_connect(struct rpc_task *task);
77 static void     call_connect_status(struct rpc_task *task);
78
79 static __be32   *rpc_encode_header(struct rpc_task *task);
80 static __be32   *rpc_verify_header(struct rpc_task *task);
81 static int      rpc_ping(struct rpc_clnt *clnt);
82
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85         struct net *net = rpc_net_ns(clnt);
86         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88         spin_lock(&sn->rpc_client_lock);
89         list_add(&clnt->cl_clients, &sn->all_clients);
90         spin_unlock(&sn->rpc_client_lock);
91 }
92
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95         struct net *net = rpc_net_ns(clnt);
96         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98         spin_lock(&sn->rpc_client_lock);
99         list_del(&clnt->cl_clients);
100         spin_unlock(&sn->rpc_client_lock);
101 }
102
103 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104 {
105         rpc_remove_client_dir(clnt);
106 }
107
108 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
109 {
110         struct net *net = rpc_net_ns(clnt);
111         struct super_block *pipefs_sb;
112
113         pipefs_sb = rpc_get_sb_net(net);
114         if (pipefs_sb) {
115                 __rpc_clnt_remove_pipedir(clnt);
116                 rpc_put_sb_net(net);
117         }
118 }
119
120 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
121                                     struct rpc_clnt *clnt)
122 {
123         static uint32_t clntid;
124         const char *dir_name = clnt->cl_program->pipe_dir_name;
125         char name[15];
126         struct dentry *dir, *dentry;
127
128         dir = rpc_d_lookup_sb(sb, dir_name);
129         if (dir == NULL) {
130                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
131                 return dir;
132         }
133         for (;;) {
134                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
135                 name[sizeof(name) - 1] = '\0';
136                 dentry = rpc_create_client_dir(dir, name, clnt);
137                 if (!IS_ERR(dentry))
138                         break;
139                 if (dentry == ERR_PTR(-EEXIST))
140                         continue;
141                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
142                                 " %s/%s, error %ld\n",
143                                 dir_name, name, PTR_ERR(dentry));
144                 break;
145         }
146         dput(dir);
147         return dentry;
148 }
149
150 static int
151 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
152 {
153         struct dentry *dentry;
154
155         if (clnt->cl_program->pipe_dir_name != NULL) {
156                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
157                 if (IS_ERR(dentry))
158                         return PTR_ERR(dentry);
159         }
160         return 0;
161 }
162
163 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
164 {
165         if (clnt->cl_program->pipe_dir_name == NULL)
166                 return 1;
167
168         switch (event) {
169         case RPC_PIPEFS_MOUNT:
170                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
171                         return 1;
172                 if (atomic_read(&clnt->cl_count) == 0)
173                         return 1;
174                 break;
175         case RPC_PIPEFS_UMOUNT:
176                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
177                         return 1;
178                 break;
179         }
180         return 0;
181 }
182
183 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
184                                    struct super_block *sb)
185 {
186         struct dentry *dentry;
187         int err = 0;
188
189         switch (event) {
190         case RPC_PIPEFS_MOUNT:
191                 dentry = rpc_setup_pipedir_sb(sb, clnt);
192                 if (!dentry)
193                         return -ENOENT;
194                 if (IS_ERR(dentry))
195                         return PTR_ERR(dentry);
196                 break;
197         case RPC_PIPEFS_UMOUNT:
198                 __rpc_clnt_remove_pipedir(clnt);
199                 break;
200         default:
201                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
202                 return -ENOTSUPP;
203         }
204         return err;
205 }
206
207 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
208                                 struct super_block *sb)
209 {
210         int error = 0;
211
212         for (;; clnt = clnt->cl_parent) {
213                 if (!rpc_clnt_skip_event(clnt, event))
214                         error = __rpc_clnt_handle_event(clnt, event, sb);
215                 if (error || clnt == clnt->cl_parent)
216                         break;
217         }
218         return error;
219 }
220
221 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
222 {
223         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
224         struct rpc_clnt *clnt;
225
226         spin_lock(&sn->rpc_client_lock);
227         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
228                 if (rpc_clnt_skip_event(clnt, event))
229                         continue;
230                 spin_unlock(&sn->rpc_client_lock);
231                 return clnt;
232         }
233         spin_unlock(&sn->rpc_client_lock);
234         return NULL;
235 }
236
237 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
238                             void *ptr)
239 {
240         struct super_block *sb = ptr;
241         struct rpc_clnt *clnt;
242         int error = 0;
243
244         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
245                 error = __rpc_pipefs_event(clnt, event, sb);
246                 if (error)
247                         break;
248         }
249         return error;
250 }
251
252 static struct notifier_block rpc_clients_block = {
253         .notifier_call  = rpc_pipefs_event,
254         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
255 };
256
257 int rpc_clients_notifier_register(void)
258 {
259         return rpc_pipefs_notifier_register(&rpc_clients_block);
260 }
261
262 void rpc_clients_notifier_unregister(void)
263 {
264         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
265 }
266
267 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
268                 struct rpc_xprt *xprt,
269                 const struct rpc_timeout *timeout)
270 {
271         struct rpc_xprt *old;
272
273         spin_lock(&clnt->cl_lock);
274         old = rcu_dereference_protected(clnt->cl_xprt,
275                         lockdep_is_held(&clnt->cl_lock));
276
277         if (!xprt_bound(xprt))
278                 clnt->cl_autobind = 1;
279
280         clnt->cl_timeout = timeout;
281         rcu_assign_pointer(clnt->cl_xprt, xprt);
282         spin_unlock(&clnt->cl_lock);
283
284         return old;
285 }
286
287 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
288 {
289         clnt->cl_nodelen = strlen(nodename);
290         if (clnt->cl_nodelen > UNX_MAXNODENAME)
291                 clnt->cl_nodelen = UNX_MAXNODENAME;
292         memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
293 }
294
295 static int rpc_client_register(struct rpc_clnt *clnt,
296                                rpc_authflavor_t pseudoflavor,
297                                const char *client_name)
298 {
299         struct rpc_auth_create_args auth_args = {
300                 .pseudoflavor = pseudoflavor,
301                 .target_name = client_name,
302         };
303         struct rpc_auth *auth;
304         struct net *net = rpc_net_ns(clnt);
305         struct super_block *pipefs_sb;
306         int err;
307
308         pipefs_sb = rpc_get_sb_net(net);
309         if (pipefs_sb) {
310                 err = rpc_setup_pipedir(pipefs_sb, clnt);
311                 if (err)
312                         goto out;
313         }
314
315         rpc_register_client(clnt);
316         if (pipefs_sb)
317                 rpc_put_sb_net(net);
318
319         auth = rpcauth_create(&auth_args, clnt);
320         if (IS_ERR(auth)) {
321                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
322                                 pseudoflavor);
323                 err = PTR_ERR(auth);
324                 goto err_auth;
325         }
326         return 0;
327 err_auth:
328         pipefs_sb = rpc_get_sb_net(net);
329         rpc_unregister_client(clnt);
330         __rpc_clnt_remove_pipedir(clnt);
331 out:
332         if (pipefs_sb)
333                 rpc_put_sb_net(net);
334         return err;
335 }
336
337 static DEFINE_IDA(rpc_clids);
338
339 static int rpc_alloc_clid(struct rpc_clnt *clnt)
340 {
341         int clid;
342
343         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
344         if (clid < 0)
345                 return clid;
346         clnt->cl_clid = clid;
347         return 0;
348 }
349
350 static void rpc_free_clid(struct rpc_clnt *clnt)
351 {
352         ida_simple_remove(&rpc_clids, clnt->cl_clid);
353 }
354
355 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
356                 struct rpc_xprt *xprt,
357                 struct rpc_clnt *parent)
358 {
359         const struct rpc_program *program = args->program;
360         const struct rpc_version *version;
361         struct rpc_clnt *clnt = NULL;
362         const struct rpc_timeout *timeout;
363         int err;
364
365         /* sanity check the name before trying to print it */
366         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
367                         program->name, args->servername, xprt);
368
369         err = rpciod_up();
370         if (err)
371                 goto out_no_rpciod;
372
373         err = -EINVAL;
374         if (args->version >= program->nrvers)
375                 goto out_err;
376         version = program->version[args->version];
377         if (version == NULL)
378                 goto out_err;
379
380         err = -ENOMEM;
381         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
382         if (!clnt)
383                 goto out_err;
384         clnt->cl_parent = parent ? : clnt;
385
386         err = rpc_alloc_clid(clnt);
387         if (err)
388                 goto out_no_clid;
389
390         clnt->cl_procinfo = version->procs;
391         clnt->cl_maxproc  = version->nrprocs;
392         clnt->cl_prog     = args->prognumber ? : program->number;
393         clnt->cl_vers     = version->number;
394         clnt->cl_stats    = program->stats;
395         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
396         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
397         err = -ENOMEM;
398         if (clnt->cl_metrics == NULL)
399                 goto out_no_stats;
400         clnt->cl_program  = program;
401         INIT_LIST_HEAD(&clnt->cl_tasks);
402         spin_lock_init(&clnt->cl_lock);
403
404         timeout = xprt->timeout;
405         if (args->timeout != NULL) {
406                 memcpy(&clnt->cl_timeout_default, args->timeout,
407                                 sizeof(clnt->cl_timeout_default));
408                 timeout = &clnt->cl_timeout_default;
409         }
410
411         rpc_clnt_set_transport(clnt, xprt, timeout);
412
413         clnt->cl_rtt = &clnt->cl_rtt_default;
414         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
415
416         atomic_set(&clnt->cl_count, 1);
417
418         /* save the nodename */
419         rpc_clnt_set_nodename(clnt, utsname()->nodename);
420
421         err = rpc_client_register(clnt, args->authflavor, args->client_name);
422         if (err)
423                 goto out_no_path;
424         if (parent)
425                 atomic_inc(&parent->cl_count);
426         return clnt;
427
428 out_no_path:
429         rpc_free_iostats(clnt->cl_metrics);
430 out_no_stats:
431         rpc_free_clid(clnt);
432 out_no_clid:
433         kfree(clnt);
434 out_err:
435         rpciod_down();
436 out_no_rpciod:
437         xprt_put(xprt);
438         return ERR_PTR(err);
439 }
440
441 /**
442  * rpc_create - create an RPC client and transport with one call
443  * @args: rpc_clnt create argument structure
444  *
445  * Creates and initializes an RPC transport and an RPC client.
446  *
447  * It can ping the server in order to determine if it is up, and to see if
448  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
449  * this behavior so asynchronous tasks can also use rpc_create.
450  */
451 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
452 {
453         struct rpc_xprt *xprt;
454         struct rpc_clnt *clnt;
455         struct xprt_create xprtargs = {
456                 .net = args->net,
457                 .ident = args->protocol,
458                 .srcaddr = args->saddress,
459                 .dstaddr = args->address,
460                 .addrlen = args->addrsize,
461                 .servername = args->servername,
462                 .bc_xprt = args->bc_xprt,
463         };
464         char servername[48];
465
466         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
467                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
468         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
469                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
470         /*
471          * If the caller chooses not to specify a hostname, whip
472          * up a string representation of the passed-in address.
473          */
474         if (xprtargs.servername == NULL) {
475                 struct sockaddr_un *sun =
476                                 (struct sockaddr_un *)args->address;
477                 struct sockaddr_in *sin =
478                                 (struct sockaddr_in *)args->address;
479                 struct sockaddr_in6 *sin6 =
480                                 (struct sockaddr_in6 *)args->address;
481
482                 servername[0] = '\0';
483                 switch (args->address->sa_family) {
484                 case AF_LOCAL:
485                         snprintf(servername, sizeof(servername), "%s",
486                                  sun->sun_path);
487                         break;
488                 case AF_INET:
489                         snprintf(servername, sizeof(servername), "%pI4",
490                                  &sin->sin_addr.s_addr);
491                         break;
492                 case AF_INET6:
493                         snprintf(servername, sizeof(servername), "%pI6",
494                                  &sin6->sin6_addr);
495                         break;
496                 default:
497                         /* caller wants default server name, but
498                          * address family isn't recognized. */
499                         return ERR_PTR(-EINVAL);
500                 }
501                 xprtargs.servername = servername;
502         }
503
504         xprt = xprt_create_transport(&xprtargs);
505         if (IS_ERR(xprt))
506                 return (struct rpc_clnt *)xprt;
507
508         /*
509          * By default, kernel RPC client connects from a reserved port.
510          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
511          * but it is always enabled for rpciod, which handles the connect
512          * operation.
513          */
514         xprt->resvport = 1;
515         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
516                 xprt->resvport = 0;
517
518         clnt = rpc_new_client(args, xprt, NULL);
519         if (IS_ERR(clnt))
520                 return clnt;
521
522         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
523                 int err = rpc_ping(clnt);
524                 if (err != 0) {
525                         rpc_shutdown_client(clnt);
526                         return ERR_PTR(err);
527                 }
528         }
529
530         clnt->cl_softrtry = 1;
531         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
532                 clnt->cl_softrtry = 0;
533
534         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
535                 clnt->cl_autobind = 1;
536         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
537                 clnt->cl_discrtry = 1;
538         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
539                 clnt->cl_chatty = 1;
540
541         return clnt;
542 }
543 EXPORT_SYMBOL_GPL(rpc_create);
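
/*
 * Illustrative sketch of typical rpc_create() usage: a consumer fills in
 * struct rpc_create_args and checks the result.  "example_program" and
 * "server_addr" are hypothetical caller-defined objects, and
 * XPRT_TRANSPORT_TCP is assumed to come from <linux/sunrpc/xprt.h>.
 *
 *      struct rpc_create_args args = {
 *              .net            = &init_net,
 *              .protocol       = XPRT_TRANSPORT_TCP,
 *              .address        = (struct sockaddr *)&server_addr,
 *              .addrsize       = sizeof(server_addr),
 *              .servername     = "server.example.com",
 *              .program        = &example_program,
 *              .version        = 3,
 *              .authflavor     = RPC_AUTH_UNIX,
 *              .flags          = RPC_CLNT_CREATE_NOPING,
 *      };
 *      struct rpc_clnt *clnt;
 *
 *      clnt = rpc_create(&args);
 *      if (IS_ERR(clnt))
 *              return PTR_ERR(clnt);
 *      ...
 *      rpc_shutdown_client(clnt);
 */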
544
545 /*
546  * This function clones the RPC client structure. It allows us to share the
547  * same transport while varying parameters such as the authentication
548  * flavour.
549  */
550 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
551                                            struct rpc_clnt *clnt)
552 {
553         struct rpc_xprt *xprt;
554         struct rpc_clnt *new;
555         int err;
556
557         err = -ENOMEM;
558         rcu_read_lock();
559         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
560         rcu_read_unlock();
561         if (xprt == NULL)
562                 goto out_err;
563         args->servername = xprt->servername;
564
565         new = rpc_new_client(args, xprt, clnt);
566         if (IS_ERR(new)) {
567                 err = PTR_ERR(new);
568                 goto out_err;
569         }
570
571         /* Turn off autobind on clones */
572         new->cl_autobind = 0;
573         new->cl_softrtry = clnt->cl_softrtry;
574         new->cl_discrtry = clnt->cl_discrtry;
575         new->cl_chatty = clnt->cl_chatty;
576         return new;
577
578 out_err:
579         dprintk("RPC:       %s: returned error %d\n", __func__, err);
580         return ERR_PTR(err);
581 }
582
583 /**
584  * rpc_clone_client - Clone an RPC client structure
585  *
586  * @clnt: RPC client whose parameters are copied
587  *
588  * Returns a fresh RPC client or an ERR_PTR.
589  */
590 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
591 {
592         struct rpc_create_args args = {
593                 .program        = clnt->cl_program,
594                 .prognumber     = clnt->cl_prog,
595                 .version        = clnt->cl_vers,
596                 .authflavor     = clnt->cl_auth->au_flavor,
597         };
598         return __rpc_clone_client(&args, clnt);
599 }
600 EXPORT_SYMBOL_GPL(rpc_clone_client);
601
602 /**
603  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
604  *
605  * @clnt: RPC client whose parameters are copied
606  * @flavor: security flavor for new client
607  *
608  * Returns a fresh RPC client or an ERR_PTR.
609  */
610 struct rpc_clnt *
611 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
612 {
613         struct rpc_create_args args = {
614                 .program        = clnt->cl_program,
615                 .prognumber     = clnt->cl_prog,
616                 .version        = clnt->cl_vers,
617                 .authflavor     = flavor,
618         };
619         return __rpc_clone_client(&args, clnt);
620 }
621 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
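
/*
 * Illustrative sketch: a caller holding an AUTH_UNIX client clones it with
 * a stronger security flavor.  This assumes the RPC_AUTH_GSS_KRB5
 * pseudoflavor and the rpcsec_gss module are available.
 *
 *      struct rpc_clnt *krb5_clnt;
 *
 *      krb5_clnt = rpc_clone_client_set_auth(clnt, RPC_AUTH_GSS_KRB5);
 *      if (IS_ERR(krb5_clnt))
 *              return PTR_ERR(krb5_clnt);
 *      ...
 *      rpc_shutdown_client(krb5_clnt);
 */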
622
623 /**
624  * rpc_switch_client_transport: switch the RPC transport on the fly
625  * @clnt: pointer to a struct rpc_clnt
626  * @args: pointer to the new transport arguments
627  * @timeout: pointer to the new timeout parameters
628  *
629  * This function allows the caller to switch the RPC transport for the
630  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
631  * server, for instance.  It assumes that the caller has ensured that
632  * there are no active RPC tasks by using some form of locking.
633  *
634  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
635  * negative errno is returned, and "clnt" continues to use the old
636  * xprt.
637  */
638 int rpc_switch_client_transport(struct rpc_clnt *clnt,
639                 struct xprt_create *args,
640                 const struct rpc_timeout *timeout)
641 {
642         const struct rpc_timeout *old_timeo;
643         rpc_authflavor_t pseudoflavor;
644         struct rpc_xprt *xprt, *old;
645         struct rpc_clnt *parent;
646         int err;
647
648         xprt = xprt_create_transport(args);
649         if (IS_ERR(xprt)) {
650                 dprintk("RPC:       failed to create new xprt for clnt %p\n",
651                         clnt);
652                 return PTR_ERR(xprt);
653         }
654
655         pseudoflavor = clnt->cl_auth->au_flavor;
656
657         old_timeo = clnt->cl_timeout;
658         old = rpc_clnt_set_transport(clnt, xprt, timeout);
659
660         rpc_unregister_client(clnt);
661         __rpc_clnt_remove_pipedir(clnt);
662
663         /*
664          * A new transport was created.  "clnt" therefore
665          * becomes the root of a new cl_parent tree.  clnt's
666          * children, if it has any, still point to the old xprt.
667          */
668         parent = clnt->cl_parent;
669         clnt->cl_parent = clnt;
670
671         /*
672          * The old rpc_auth cache cannot be re-used.  GSS
673          * contexts in particular are between a single
674          * client and server.
675          */
676         err = rpc_client_register(clnt, pseudoflavor, NULL);
677         if (err)
678                 goto out_revert;
679
680         synchronize_rcu();
681         if (parent != clnt)
682                 rpc_release_client(parent);
683         xprt_put(old);
684         dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
685         return 0;
686
687 out_revert:
688         rpc_clnt_set_transport(clnt, old, old_timeo);
689         clnt->cl_parent = parent;
690         rpc_client_register(clnt, pseudoflavor, NULL);
691         xprt_put(xprt);
692         dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
693         return err;
694 }
695 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
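
/*
 * Illustrative sketch: once the caller has quiesced all RPC tasks (its
 * responsibility, as noted above), the client can be pointed at a
 * replacement server.  "new_addr" and the server name are hypothetical.
 *
 *      struct xprt_create xargs = {
 *              .net            = rpc_net_ns(clnt),
 *              .ident          = XPRT_TRANSPORT_TCP,
 *              .dstaddr        = (struct sockaddr *)&new_addr,
 *              .addrlen        = sizeof(new_addr),
 *              .servername     = "mirror.example.com",
 *      };
 *      int err;
 *
 *      err = rpc_switch_client_transport(clnt, &xargs, clnt->cl_timeout);
 *      if (err)
 *              return err;
 */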
696
697 /*
698  * Kill all tasks for the given client.
699  * XXX: kill their descendants as well?
700  */
701 void rpc_killall_tasks(struct rpc_clnt *clnt)
702 {
703         struct rpc_task *rovr;
704
705
706         if (list_empty(&clnt->cl_tasks))
707                 return;
708         dprintk("RPC:       killing all tasks for client %p\n", clnt);
709         /*
710          * Spin lock all_tasks to prevent changes...
711          */
712         spin_lock(&clnt->cl_lock);
713         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
714                 if (!RPC_IS_ACTIVATED(rovr))
715                         continue;
716                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
717                         rovr->tk_flags |= RPC_TASK_KILLED;
718                         rpc_exit(rovr, -EIO);
719                         if (RPC_IS_QUEUED(rovr))
720                                 rpc_wake_up_queued_task(rovr->tk_waitqueue,
721                                                         rovr);
722                 }
723         }
724         spin_unlock(&clnt->cl_lock);
725 }
726 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
727
728 /*
729  * Properly shut down an RPC client, terminating all outstanding
730  * requests.
731  */
732 void rpc_shutdown_client(struct rpc_clnt *clnt)
733 {
734         might_sleep();
735
736         dprintk_rcu("RPC:       shutting down %s client for %s\n",
737                         clnt->cl_program->name,
738                         rcu_dereference(clnt->cl_xprt)->servername);
739
740         while (!list_empty(&clnt->cl_tasks)) {
741                 rpc_killall_tasks(clnt);
742                 wait_event_timeout(destroy_wait,
743                         list_empty(&clnt->cl_tasks), 1*HZ);
744         }
745
746         rpc_release_client(clnt);
747 }
748 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
749
750 /*
751  * Free an RPC client
752  */
753 static void
754 rpc_free_client(struct rpc_clnt *clnt)
755 {
756         dprintk_rcu("RPC:       destroying %s client for %s\n",
757                         clnt->cl_program->name,
758                         rcu_dereference(clnt->cl_xprt)->servername);
759         if (clnt->cl_parent != clnt)
760                 rpc_release_client(clnt->cl_parent);
761         rpc_clnt_remove_pipedir(clnt);
762         rpc_unregister_client(clnt);
763         rpc_free_iostats(clnt->cl_metrics);
764         clnt->cl_metrics = NULL;
765         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
766         rpciod_down();
767         rpc_free_clid(clnt);
768         kfree(clnt);
769 }
770
771 /*
772  * Release the client's authentication handle, then free the RPC client
773  */
774 static void
775 rpc_free_auth(struct rpc_clnt *clnt)
776 {
777         if (clnt->cl_auth == NULL) {
778                 rpc_free_client(clnt);
779                 return;
780         }
781
782         /*
783          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
784          *       release remaining GSS contexts. This mechanism ensures
785          *       that it can do so safely.
786          */
787         atomic_inc(&clnt->cl_count);
788         rpcauth_release(clnt->cl_auth);
789         clnt->cl_auth = NULL;
790         if (atomic_dec_and_test(&clnt->cl_count))
791                 rpc_free_client(clnt);
792 }
793
794 /*
795  * Release reference to the RPC client
796  */
797 void
798 rpc_release_client(struct rpc_clnt *clnt)
799 {
800         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
801
802         if (list_empty(&clnt->cl_tasks))
803                 wake_up(&destroy_wait);
804         if (atomic_dec_and_test(&clnt->cl_count))
805                 rpc_free_auth(clnt);
806 }
807 EXPORT_SYMBOL_GPL(rpc_release_client);
808
809 /**
810  * rpc_bind_new_program - bind a new RPC program to an existing client
811  * @old: old rpc_client
812  * @program: rpc program to set
813  * @vers: rpc program version
814  *
815  * Clones the rpc client and sets up a new RPC program. This is mainly
816  * of use for enabling different RPC programs to share the same transport.
817  * The Sun NFSv2/v3 ACL protocol can do this.
818  */
819 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
820                                       const struct rpc_program *program,
821                                       u32 vers)
822 {
823         struct rpc_create_args args = {
824                 .program        = program,
825                 .prognumber     = program->number,
826                 .version        = vers,
827                 .authflavor     = old->cl_auth->au_flavor,
828         };
829         struct rpc_clnt *clnt;
830         int err;
831
832         clnt = __rpc_clone_client(&args, old);
833         if (IS_ERR(clnt))
834                 goto out;
835         err = rpc_ping(clnt);
836         if (err != 0) {
837                 rpc_shutdown_client(clnt);
838                 clnt = ERR_PTR(err);
839         }
840 out:
841         return clnt;
842 }
843 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
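
/*
 * Illustrative sketch: how a sideband protocol such as the NFSv3 ACL
 * program might reuse an existing client's transport.  "nfsacl_program"
 * stands in for a caller-defined struct rpc_program.
 *
 *      struct rpc_clnt *acl_clnt;
 *
 *      acl_clnt = rpc_bind_new_program(nfs_clnt, &nfsacl_program, 3);
 *      if (IS_ERR(acl_clnt))
 *              return PTR_ERR(acl_clnt);
 */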
844
845 void rpc_task_release_client(struct rpc_task *task)
846 {
847         struct rpc_clnt *clnt = task->tk_client;
848
849         if (clnt != NULL) {
850                 /* Remove from client task list */
851                 spin_lock(&clnt->cl_lock);
852                 list_del(&task->tk_task);
853                 spin_unlock(&clnt->cl_lock);
854                 task->tk_client = NULL;
855
856                 rpc_release_client(clnt);
857         }
858 }
859
860 static
861 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
862 {
863         if (clnt != NULL) {
864                 rpc_task_release_client(task);
865                 task->tk_client = clnt;
866                 atomic_inc(&clnt->cl_count);
867                 if (clnt->cl_softrtry)
868                         task->tk_flags |= RPC_TASK_SOFT;
869                 if (clnt->cl_noretranstimeo)
870                         task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
871                 if (sk_memalloc_socks()) {
872                         struct rpc_xprt *xprt;
873
874                         rcu_read_lock();
875                         xprt = rcu_dereference(clnt->cl_xprt);
876                         if (xprt->swapper)
877                                 task->tk_flags |= RPC_TASK_SWAPPER;
878                         rcu_read_unlock();
879                 }
880                 /* Add to the client's list of all tasks */
881                 spin_lock(&clnt->cl_lock);
882                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
883                 spin_unlock(&clnt->cl_lock);
884         }
885 }
886
887 void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
888 {
889         rpc_task_release_client(task);
890         rpc_task_set_client(task, clnt);
891 }
892 EXPORT_SYMBOL_GPL(rpc_task_reset_client);
893
894
895 static void
896 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
897 {
898         if (msg != NULL) {
899                 task->tk_msg.rpc_proc = msg->rpc_proc;
900                 task->tk_msg.rpc_argp = msg->rpc_argp;
901                 task->tk_msg.rpc_resp = msg->rpc_resp;
902                 if (msg->rpc_cred != NULL)
903                         task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
904         }
905 }
906
907 /*
908  * Default callback for async RPC calls
909  */
910 static void
911 rpc_default_callback(struct rpc_task *task, void *data)
912 {
913 }
914
915 static const struct rpc_call_ops rpc_default_ops = {
916         .rpc_call_done = rpc_default_callback,
917 };
918
919 /**
920  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
921  * @task_setup_data: pointer to task initialisation data
922  */
923 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
924 {
925         struct rpc_task *task;
926
927         task = rpc_new_task(task_setup_data);
928         if (IS_ERR(task))
929                 goto out;
930
931         rpc_task_set_client(task, task_setup_data->rpc_client);
932         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
933
934         if (task->tk_action == NULL)
935                 rpc_call_start(task);
936
937         atomic_inc(&task->tk_count);
938         rpc_execute(task);
939 out:
940         return task;
941 }
942 EXPORT_SYMBOL_GPL(rpc_run_task);
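
/*
 * Illustrative sketch, mirroring what rpc_call_async() below does
 * internally; "msg", "my_ops" and "my_data" are hypothetical
 * caller-supplied objects.
 *
 *      struct rpc_task_setup setup = {
 *              .rpc_client     = clnt,
 *              .rpc_message    = &msg,
 *              .callback_ops   = &my_ops,
 *              .callback_data  = my_data,
 *              .flags          = RPC_TASK_ASYNC,
 *      };
 *      struct rpc_task *task;
 *
 *      task = rpc_run_task(&setup);
 *      if (IS_ERR(task))
 *              return PTR_ERR(task);
 *      rpc_put_task(task);
 */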
943
944 /**
945  * rpc_call_sync - Perform a synchronous RPC call
946  * @clnt: pointer to RPC client
947  * @msg: RPC call parameters
948  * @flags: RPC call flags
949  */
950 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
951 {
952         struct rpc_task *task;
953         struct rpc_task_setup task_setup_data = {
954                 .rpc_client = clnt,
955                 .rpc_message = msg,
956                 .callback_ops = &rpc_default_ops,
957                 .flags = flags,
958         };
959         int status;
960
961         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
962         if (flags & RPC_TASK_ASYNC) {
963                 rpc_release_calldata(task_setup_data.callback_ops,
964                         task_setup_data.callback_data);
965                 return -EINVAL;
966         }
967
968         task = rpc_run_task(&task_setup_data);
969         if (IS_ERR(task))
970                 return PTR_ERR(task);
971         status = task->tk_status;
972         rpc_put_task(task);
973         return status;
974 }
975 EXPORT_SYMBOL_GPL(rpc_call_sync);
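
/*
 * Illustrative sketch of a synchronous call.  The procedure table entry
 * and the argument/result structures belong to the caller's protocol and
 * are hypothetical here.
 *
 *      struct rpc_message msg = {
 *              .rpc_proc       = &example_procedures[EXAMPLEPROC_GETATTR],
 *              .rpc_argp       = &argp,
 *              .rpc_resp       = &resp,
 *      };
 *      int status;
 *
 *      status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
 *      if (status < 0)
 *              return status;
 */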
976
977 /**
978  * rpc_call_async - Perform an asynchronous RPC call
979  * @clnt: pointer to RPC client
980  * @msg: RPC call parameters
981  * @flags: RPC call flags
982  * @tk_ops: RPC call ops
983  * @data: user call data
984  */
985 int
986 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
987                const struct rpc_call_ops *tk_ops, void *data)
988 {
989         struct rpc_task *task;
990         struct rpc_task_setup task_setup_data = {
991                 .rpc_client = clnt,
992                 .rpc_message = msg,
993                 .callback_ops = tk_ops,
994                 .callback_data = data,
995                 .flags = flags|RPC_TASK_ASYNC,
996         };
997
998         task = rpc_run_task(&task_setup_data);
999         if (IS_ERR(task))
1000                 return PTR_ERR(task);
1001         rpc_put_task(task);
1002         return 0;
1003 }
1004 EXPORT_SYMBOL_GPL(rpc_call_async);
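
/*
 * Illustrative sketch of an asynchronous call.  The completion callback
 * typically runs from the rpciod workqueue once the reply has been
 * decoded; "struct example_req" is hypothetical.
 *
 *      static void example_call_done(struct rpc_task *task, void *calldata)
 *      {
 *              struct example_req *req = calldata;
 *
 *              req->status = task->tk_status;
 *      }
 *
 *      static const struct rpc_call_ops example_call_ops = {
 *              .rpc_call_done  = example_call_done,
 *      };
 *
 *      err = rpc_call_async(clnt, &msg, RPC_TASK_SOFT, &example_call_ops, req);
 */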
1005
1006 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1007 /**
1008  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1009  * rpc_execute against it
1010  * @req: RPC request
1011  * @tk_ops: RPC call ops
1012  */
1013 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
1014                                 const struct rpc_call_ops *tk_ops)
1015 {
1016         struct rpc_task *task;
1017         struct xdr_buf *xbufp = &req->rq_snd_buf;
1018         struct rpc_task_setup task_setup_data = {
1019                 .callback_ops = tk_ops,
1020         };
1021
1022         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1023         /*
1024          * Create an rpc_task to send the data
1025          */
1026         task = rpc_new_task(&task_setup_data);
1027         if (IS_ERR(task)) {
1028                 xprt_free_bc_request(req);
1029                 goto out;
1030         }
1031         task->tk_rqstp = req;
1032
1033         /*
1034          * Set up the xdr_buf length.
1035          * This also indicates that the buffer is XDR encoded already.
1036          */
1037         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1038                         xbufp->tail[0].iov_len;
1039
1040         task->tk_action = call_bc_transmit;
1041         atomic_inc(&task->tk_count);
1042         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1043         rpc_execute(task);
1044
1045 out:
1046         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1047         return task;
1048 }
1049 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1050
1051 void
1052 rpc_call_start(struct rpc_task *task)
1053 {
1054         task->tk_action = call_start;
1055 }
1056 EXPORT_SYMBOL_GPL(rpc_call_start);
1057
1058 /**
1059  * rpc_peeraddr - extract remote peer address from clnt's xprt
1060  * @clnt: RPC client structure
1061  * @buf: target buffer
1062  * @bufsize: length of target buffer
1063  *
1064  * Returns the number of bytes that are actually in the stored address.
1065  */
1066 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1067 {
1068         size_t bytes;
1069         struct rpc_xprt *xprt;
1070
1071         rcu_read_lock();
1072         xprt = rcu_dereference(clnt->cl_xprt);
1073
1074         bytes = xprt->addrlen;
1075         if (bytes > bufsize)
1076                 bytes = bufsize;
1077         memcpy(buf, &xprt->addr, bytes);
1078         rcu_read_unlock();
1079
1080         return bytes;
1081 }
1082 EXPORT_SYMBOL_GPL(rpc_peeraddr);
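
/*
 * Illustrative sketch: copy the peer's address into caller-provided storage.
 *
 *      struct sockaddr_storage peer;
 *      size_t len;
 *
 *      len = rpc_peeraddr(clnt, (struct sockaddr *)&peer, sizeof(peer));
 */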
1083
1084 /**
1085  * rpc_peeraddr2str - return remote peer address in printable format
1086  * @clnt: RPC client structure
1087  * @format: address format
1088  *
1089  * NB: the lifetime of the memory referenced by the returned pointer is
1090  * the same as the rpc_xprt itself.  As long as the caller uses this
1091  * pointer, it must hold the RCU read lock.
1092  */
1093 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1094                              enum rpc_display_format_t format)
1095 {
1096         struct rpc_xprt *xprt;
1097
1098         xprt = rcu_dereference(clnt->cl_xprt);
1099
1100         if (xprt->address_strings[format] != NULL)
1101                 return xprt->address_strings[format];
1102         else
1103                 return "unprintable";
1104 }
1105 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
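
/*
 * Illustrative sketch: the returned string lives in the rpc_xprt, so the
 * RCU read lock must be held across its use.  RPC_DISPLAY_ADDR is one of
 * the enum rpc_display_format_t values.
 *
 *      rcu_read_lock();
 *      dprintk("RPC: peer is %s\n",
 *              rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
 *      rcu_read_unlock();
 */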
1106
1107 static const struct sockaddr_in rpc_inaddr_loopback = {
1108         .sin_family             = AF_INET,
1109         .sin_addr.s_addr        = htonl(INADDR_ANY),
1110 };
1111
1112 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1113         .sin6_family            = AF_INET6,
1114         .sin6_addr              = IN6ADDR_ANY_INIT,
1115 };
1116
1117 /*
1118  * Try a getsockname() on a connected datagram socket.  Using a
1119  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1120  * This conserves the ephemeral port number space.
1121  *
1122  * Returns zero and fills in "buf" if successful; otherwise, a
1123  * negative errno is returned.
1124  */
1125 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1126                         struct sockaddr *buf, int buflen)
1127 {
1128         struct socket *sock;
1129         int err;
1130
1131         err = __sock_create(net, sap->sa_family,
1132                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1133         if (err < 0) {
1134                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1135                 goto out;
1136         }
1137
1138         switch (sap->sa_family) {
1139         case AF_INET:
1140                 err = kernel_bind(sock,
1141                                 (struct sockaddr *)&rpc_inaddr_loopback,
1142                                 sizeof(rpc_inaddr_loopback));
1143                 break;
1144         case AF_INET6:
1145                 err = kernel_bind(sock,
1146                                 (struct sockaddr *)&rpc_in6addr_loopback,
1147                                 sizeof(rpc_in6addr_loopback));
1148                 break;
1149         default:
1150                 err = -EAFNOSUPPORT;
1151                 goto out;
1152         }
1153         if (err < 0) {
1154                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1155                 goto out_release;
1156         }
1157
1158         err = kernel_connect(sock, sap, salen, 0);
1159         if (err < 0) {
1160                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1161                 goto out_release;
1162         }
1163
1164         err = kernel_getsockname(sock, buf, &buflen);
1165         if (err < 0) {
1166                 dprintk("RPC:       getsockname failed (%d)\n", err);
1167                 goto out_release;
1168         }
1169
1170         err = 0;
1171         if (buf->sa_family == AF_INET6) {
1172                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1173                 sin6->sin6_scope_id = 0;
1174         }
1175         dprintk("RPC:       %s succeeded\n", __func__);
1176
1177 out_release:
1178         sock_release(sock);
1179 out:
1180         return err;
1181 }
1182
1183 /*
1184  * Scraping a connected socket failed, so we don't have a usable
1185  * local address.  Fallback: generate an address that will prevent
1186  * the server from calling us back.
1187  *
1188  * Returns zero and fills in "buf" if successful; otherwise, a
1189  * negative errno is returned.
1190  */
1191 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1192 {
1193         switch (family) {
1194         case AF_INET:
1195                 if (buflen < sizeof(rpc_inaddr_loopback))
1196                         return -EINVAL;
1197                 memcpy(buf, &rpc_inaddr_loopback,
1198                                 sizeof(rpc_inaddr_loopback));
1199                 break;
1200         case AF_INET6:
1201                 if (buflen < sizeof(rpc_in6addr_loopback))
1202                         return -EINVAL;
1203                 memcpy(buf, &rpc_in6addr_loopback,
1204                                 sizeof(rpc_in6addr_loopback));
                break;
1205         default:
1206                 dprintk("RPC:       %s: address family not supported\n",
1207                         __func__);
1208                 return -EAFNOSUPPORT;
1209         }
1210         dprintk("RPC:       %s: succeeded\n", __func__);
1211         return 0;
1212 }
1213
1214 /**
1215  * rpc_localaddr - discover local endpoint address for an RPC client
1216  * @clnt: RPC client structure
1217  * @buf: target buffer
1218  * @buflen: size of target buffer, in bytes
1219  *
1220  * Returns zero and fills in "buf" and "buflen" if successful;
1221  * otherwise, a negative errno is returned.
1222  *
1223  * This works even if the underlying transport is not currently connected,
1224  * or if the upper layer never previously provided a source address.
1225  *
1226  * The result of this function call is transient: multiple calls in
1227  * succession may give different results, depending on how local
1228  * networking configuration changes over time.
1229  */
1230 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1231 {
1232         struct sockaddr_storage address;
1233         struct sockaddr *sap = (struct sockaddr *)&address;
1234         struct rpc_xprt *xprt;
1235         struct net *net;
1236         size_t salen;
1237         int err;
1238
1239         rcu_read_lock();
1240         xprt = rcu_dereference(clnt->cl_xprt);
1241         salen = xprt->addrlen;
1242         memcpy(sap, &xprt->addr, salen);
1243         net = get_net(xprt->xprt_net);
1244         rcu_read_unlock();
1245
1246         rpc_set_port(sap, 0);
1247         err = rpc_sockname(net, sap, salen, buf, buflen);
1248         put_net(net);
1249         if (err != 0)
1250                 /* Couldn't discover local address, return ANYADDR */
1251                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1252         return 0;
1253 }
1254 EXPORT_SYMBOL_GPL(rpc_localaddr);
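
/*
 * Illustrative sketch: discover the local endpoint address for a client.
 *
 *      struct sockaddr_storage local;
 *      int err;
 *
 *      err = rpc_localaddr(clnt, (struct sockaddr *)&local, sizeof(local));
 *      if (err)
 *              return err;
 */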
1255
1256 void
1257 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1258 {
1259         struct rpc_xprt *xprt;
1260
1261         rcu_read_lock();
1262         xprt = rcu_dereference(clnt->cl_xprt);
1263         if (xprt->ops->set_buffer_size)
1264                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1265         rcu_read_unlock();
1266 }
1267 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1268
1269 /**
1270  * rpc_protocol - Get transport protocol number for an RPC client
1271  * @clnt: RPC client to query
1272  *
1273  */
1274 int rpc_protocol(struct rpc_clnt *clnt)
1275 {
1276         int protocol;
1277
1278         rcu_read_lock();
1279         protocol = rcu_dereference(clnt->cl_xprt)->prot;
1280         rcu_read_unlock();
1281         return protocol;
1282 }
1283 EXPORT_SYMBOL_GPL(rpc_protocol);
1284
1285 /**
1286  * rpc_net_ns - Get the network namespace for this RPC client
1287  * @clnt: RPC client to query
1288  *
1289  */
1290 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1291 {
1292         struct net *ret;
1293
1294         rcu_read_lock();
1295         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1296         rcu_read_unlock();
1297         return ret;
1298 }
1299 EXPORT_SYMBOL_GPL(rpc_net_ns);
1300
1301 /**
1302  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1303  * @clnt: RPC client to query
1304  *
1305  * For stream transports, this is one RPC record fragment (see RFC
1306  * 1831), as we don't support multi-record requests yet.  For datagram
1307  * transports, this is the size of an IP packet minus the IP, UDP, and
1308  * RPC header sizes.
1309  */
1310 size_t rpc_max_payload(struct rpc_clnt *clnt)
1311 {
1312         size_t ret;
1313
1314         rcu_read_lock();
1315         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1316         rcu_read_unlock();
1317         return ret;
1318 }
1319 EXPORT_SYMBOL_GPL(rpc_max_payload);
1320
1321 /**
1322  * rpc_get_timeout - Get timeout for transport in units of HZ
1323  * @clnt: RPC client to query
1324  */
1325 unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1326 {
1327         unsigned long ret;
1328
1329         rcu_read_lock();
1330         ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1331         rcu_read_unlock();
1332         return ret;
1333 }
1334 EXPORT_SYMBOL_GPL(rpc_get_timeout);
1335
1336 /**
1337  * rpc_force_rebind - force transport to check that remote port is unchanged
1338  * @clnt: client to rebind
1339  *
1340  */
1341 void rpc_force_rebind(struct rpc_clnt *clnt)
1342 {
1343         if (clnt->cl_autobind) {
1344                 rcu_read_lock();
1345                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1346                 rcu_read_unlock();
1347         }
1348 }
1349 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1350
1351 /*
1352  * Restart an (async) RPC call from the call_prepare state.
1353  * Usually called from within the exit handler.
1354  */
1355 int
1356 rpc_restart_call_prepare(struct rpc_task *task)
1357 {
1358         if (RPC_ASSASSINATED(task))
1359                 return 0;
1360         task->tk_action = call_start;
1361         if (task->tk_ops->rpc_call_prepare != NULL)
1362                 task->tk_action = rpc_prepare_task;
1363         return 1;
1364 }
1365 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1366
1367 /*
1368  * Restart an (async) RPC call. Usually called from within the
1369  * exit handler.
1370  */
1371 int
1372 rpc_restart_call(struct rpc_task *task)
1373 {
1374         if (RPC_ASSASSINATED(task))
1375                 return 0;
1376         task->tk_action = call_start;
1377         return 1;
1378 }
1379 EXPORT_SYMBOL_GPL(rpc_restart_call);
1380
1381 #ifdef RPC_DEBUG
1382 static const char *rpc_proc_name(const struct rpc_task *task)
1383 {
1384         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1385
1386         if (proc) {
1387                 if (proc->p_name)
1388                         return proc->p_name;
1389                 else
1390                         return "NULL";
1391         } else
1392                 return "no proc";
1393 }
1394 #endif
1395
1396 /*
1397  * 0.  Initial state
1398  *
1399  *     Other FSM states can be visited zero or more times, but
1400  *     this state is visited exactly once for each RPC.
1401  */
1402 static void
1403 call_start(struct rpc_task *task)
1404 {
1405         struct rpc_clnt *clnt = task->tk_client;
1406
1407         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1408                         clnt->cl_program->name, clnt->cl_vers,
1409                         rpc_proc_name(task),
1410                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
1411
1412         /* Increment call count */
1413         task->tk_msg.rpc_proc->p_count++;
1414         clnt->cl_stats->rpccnt++;
1415         task->tk_action = call_reserve;
1416 }
1417
1418 /*
1419  * 1.   Reserve an RPC call slot
1420  */
1421 static void
1422 call_reserve(struct rpc_task *task)
1423 {
1424         dprint_status(task);
1425
1426         task->tk_status  = 0;
1427         task->tk_action  = call_reserveresult;
1428         xprt_reserve(task);
1429 }
1430
1431 static void call_retry_reserve(struct rpc_task *task);
1432
1433 /*
1434  * 1b.  Grok the result of xprt_reserve()
1435  */
1436 static void
1437 call_reserveresult(struct rpc_task *task)
1438 {
1439         int status = task->tk_status;
1440
1441         dprint_status(task);
1442
1443         /*
1444          * After a call to xprt_reserve(), we must have either
1445          * a request slot or else an error status.
1446          */
1447         task->tk_status = 0;
1448         if (status >= 0) {
1449                 if (task->tk_rqstp) {
1450                         task->tk_action = call_refresh;
1451                         return;
1452                 }
1453
1454                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1455                                 __func__, status);
1456                 rpc_exit(task, -EIO);
1457                 return;
1458         }
1459
1460         /*
1461          * Even though there was an error, we may have acquired
1462          * a request slot somehow.  Make sure not to leak it.
1463          */
1464         if (task->tk_rqstp) {
1465                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1466                                 __func__, status);
1467                 xprt_release(task);
1468         }
1469
1470         switch (status) {
1471         case -ENOMEM:
1472                 rpc_delay(task, HZ >> 2);
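                /* fall through */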
1473         case -EAGAIN:   /* woken up; retry */
1474                 task->tk_action = call_retry_reserve;
1475                 return;
1476         case -EIO:      /* probably a shutdown */
1477                 break;
1478         default:
1479                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1480                                 __func__, status);
1481                 break;
1482         }
1483         rpc_exit(task, status);
1484 }
1485
1486 /*
1487  * 1c.  Retry reserving an RPC call slot
1488  */
1489 static void
1490 call_retry_reserve(struct rpc_task *task)
1491 {
1492         dprint_status(task);
1493
1494         task->tk_status  = 0;
1495         task->tk_action  = call_reserveresult;
1496         xprt_retry_reserve(task);
1497 }
1498
1499 /*
1500  * 2.   Bind and/or refresh the credentials
1501  */
1502 static void
1503 call_refresh(struct rpc_task *task)
1504 {
1505         dprint_status(task);
1506
1507         task->tk_action = call_refreshresult;
1508         task->tk_status = 0;
1509         task->tk_client->cl_stats->rpcauthrefresh++;
1510         rpcauth_refreshcred(task);
1511 }
1512
1513 /*
1514  * 2a.  Process the results of a credential refresh
1515  */
1516 static void
1517 call_refreshresult(struct rpc_task *task)
1518 {
1519         int status = task->tk_status;
1520
1521         dprint_status(task);
1522
1523         task->tk_status = 0;
1524         task->tk_action = call_refresh;
1525         switch (status) {
1526         case 0:
1527                 if (rpcauth_uptodatecred(task))
1528                         task->tk_action = call_allocate;
1529                 return;
1530         case -ETIMEDOUT:
1531                 rpc_delay(task, 3*HZ);
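                /* fall through */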
1532         case -EAGAIN:
1533                 status = -EACCES;
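                /* fall through */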
1534         case -EKEYEXPIRED:
1535                 if (!task->tk_cred_retry)
1536                         break;
1537                 task->tk_cred_retry--;
1538                 dprintk("RPC: %5u %s: retry refresh creds\n",
1539                                 task->tk_pid, __func__);
1540                 return;
1541         }
1542         dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1543                                 task->tk_pid, __func__, status);
1544         rpc_exit(task, status);
1545 }
1546
1547 /*
1548  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1549  *      (Note: buffer memory is freed in xprt_release).
1550  */
1551 static void
1552 call_allocate(struct rpc_task *task)
1553 {
1554         unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1555         struct rpc_rqst *req = task->tk_rqstp;
1556         struct rpc_xprt *xprt = req->rq_xprt;
1557         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1558
1559         dprint_status(task);
1560
1561         task->tk_status = 0;
1562         task->tk_action = call_bind;
1563
1564         if (req->rq_buffer)
1565                 return;
1566
1567         if (proc->p_proc != 0) {
1568                 BUG_ON(proc->p_arglen == 0);
1569                 if (proc->p_decode != NULL)
1570                         BUG_ON(proc->p_replen == 0);
1571         }
1572
1573         /*
1574          * Calculate the size (in quads) of the RPC call
1575          * and reply headers, and convert both values
1576          * to byte sizes.
1577          */
1578         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1579         req->rq_callsize <<= 2;
1580         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1581         req->rq_rcvsize <<= 2;
1582
1583         req->rq_buffer = xprt->ops->buf_alloc(task,
1584                                         req->rq_callsize + req->rq_rcvsize);
1585         if (req->rq_buffer != NULL)
1586                 return;
1587
1588         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1589
1590         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1591                 task->tk_action = call_allocate;
1592                 rpc_delay(task, HZ>>4);
1593                 return;
1594         }
1595
1596         rpc_exit(task, -ERESTARTSYS);
1597 }
1598
1599 static inline int
1600 rpc_task_need_encode(struct rpc_task *task)
1601 {
1602         return task->tk_rqstp->rq_snd_buf.len == 0;
1603 }
1604
1605 static inline void
1606 rpc_task_force_reencode(struct rpc_task *task)
1607 {
1608         task->tk_rqstp->rq_snd_buf.len = 0;
1609         task->tk_rqstp->rq_bytes_sent = 0;
1610 }
1611
1612 static inline void
1613 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1614 {
1615         buf->head[0].iov_base = start;
1616         buf->head[0].iov_len = len;
1617         buf->tail[0].iov_len = 0;
1618         buf->page_len = 0;
1619         buf->flags = 0;
1620         buf->len = 0;
1621         buf->buflen = len;
1622 }
1623
1624 /*
1625  * 3.   Encode arguments of an RPC call
1626  */
1627 static void
1628 rpc_xdr_encode(struct rpc_task *task)
1629 {
1630         struct rpc_rqst *req = task->tk_rqstp;
1631         kxdreproc_t     encode;
1632         __be32          *p;
1633
1634         dprint_status(task);
1635
1636         rpc_xdr_buf_init(&req->rq_snd_buf,
1637                          req->rq_buffer,
1638                          req->rq_callsize);
1639         rpc_xdr_buf_init(&req->rq_rcv_buf,
1640                          (char *)req->rq_buffer + req->rq_callsize,
1641                          req->rq_rcvsize);
1642
1643         p = rpc_encode_header(task);
1644         if (p == NULL) {
1645                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1646                 rpc_exit(task, -EIO);
1647                 return;
1648         }
1649
1650         encode = task->tk_msg.rpc_proc->p_encode;
1651         if (encode == NULL)
1652                 return;
1653
1654         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1655                         task->tk_msg.rpc_argp);
1656 }
1657
1658 /*
1659  * 4.   Get the server port number if not yet set
1660  */
1661 static void
1662 call_bind(struct rpc_task *task)
1663 {
1664         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1665
1666         dprint_status(task);
1667
1668         task->tk_action = call_connect;
1669         if (!xprt_bound(xprt)) {
1670                 task->tk_action = call_bind_status;
1671                 task->tk_timeout = xprt->bind_timeout;
1672                 xprt->ops->rpcbind(task);
1673         }
1674 }
1675
1676 /*
1677  * 4a.  Sort out bind result
1678  */
1679 static void
1680 call_bind_status(struct rpc_task *task)
1681 {
1682         int status = -EIO;
1683
1684         if (task->tk_status >= 0) {
1685                 dprint_status(task);
1686                 task->tk_status = 0;
1687                 task->tk_action = call_connect;
1688                 return;
1689         }
1690
1691         trace_rpc_bind_status(task);
1692         switch (task->tk_status) {
1693         case -ENOMEM:
1694                 dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1695                 rpc_delay(task, HZ >> 2);
1696                 goto retry_timeout;
1697         case -EACCES:
1698                 dprintk("RPC: %5u remote rpcbind: RPC program/version "
1699                                 "unavailable\n", task->tk_pid);
1700                 /* fail immediately if this is an RPC ping */
1701                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1702                         status = -EOPNOTSUPP;
1703                         break;
1704                 }
1705                 if (task->tk_rebind_retry == 0)
1706                         break;
1707                 task->tk_rebind_retry--;
1708                 rpc_delay(task, 3*HZ);
1709                 goto retry_timeout;
1710         case -ETIMEDOUT:
1711                 dprintk("RPC: %5u rpcbind request timed out\n",
1712                                 task->tk_pid);
1713                 goto retry_timeout;
1714         case -EPFNOSUPPORT:
1715                 /* server doesn't support any rpcbind version we know of */
1716                 dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1717                                 task->tk_pid);
1718                 break;
1719         case -EPROTONOSUPPORT:
1720                 dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1721                                 task->tk_pid);
1722                 task->tk_status = 0;
1723                 task->tk_action = call_bind;
1724                 return;
1725         case -ECONNREFUSED:             /* connection problems */
1726         case -ECONNRESET:
1727         case -ENOTCONN:
1728         case -EHOSTDOWN:
1729         case -EHOSTUNREACH:
1730         case -ENETUNREACH:
1731         case -EPIPE:
1732                 dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1733                                 task->tk_pid, task->tk_status);
1734                 if (!RPC_IS_SOFTCONN(task)) {
1735                         rpc_delay(task, 5*HZ);
1736                         goto retry_timeout;
1737                 }
1738                 status = task->tk_status;
1739                 break;
1740         default:
1741                 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1742                                 task->tk_pid, -task->tk_status);
1743         }
1744
1745         rpc_exit(task, status);
1746         return;
1747
1748 retry_timeout:
1749         task->tk_action = call_timeout;
1750 }
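
/*
 * In short, the rpcbind failure policy above is (sketch):
 *
 *      -ENOMEM, -ETIMEDOUT     back off if needed and fall back to
 *                              call_timeout
 *      -EACCES                 fail an RPC ping immediately, otherwise
 *                              retry up to tk_rebind_retry times with a
 *                              3 second delay
 *      -EPROTONOSUPPORT        go back to call_bind and try again
 *      -EPFNOSUPPORT           give up: no usable rpcbind service
 *      connection errors       5 second delay then call_timeout, unless
 *                              the task is SOFTCONN, in which case the
 *                              error is fatal
 *      anything else           rpc_exit() with -EIO
 */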
1751
1752 /*
1753  * 4b.  Connect to the RPC server
1754  */
1755 static void
1756 call_connect(struct rpc_task *task)
1757 {
1758         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1759
1760         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1761                         task->tk_pid, xprt,
1762                         (xprt_connected(xprt) ? "is" : "is not"));
1763
1764         task->tk_action = call_transmit;
1765         if (!xprt_connected(xprt)) {
1766                 task->tk_action = call_connect_status;
1767                 if (task->tk_status < 0)
1768                         return;
1769                 if (task->tk_flags & RPC_TASK_NOCONNECT) {
1770                         rpc_exit(task, -ENOTCONN);
1771                         return;
1772                 }
1773                 xprt_connect(task);
1774         }
1775 }
1776
1777 /*
1778  * 4c.  Sort out connect result
1779  */
1780 static void
1781 call_connect_status(struct rpc_task *task)
1782 {
1783         struct rpc_clnt *clnt = task->tk_client;
1784         int status = task->tk_status;
1785
1786         dprint_status(task);
1787
1788         trace_rpc_connect_status(task, status);
1789         task->tk_status = 0;
1790         switch (status) {
1791                 /* if soft mounted, test if we've timed out */
1792         case -ETIMEDOUT:
1793                 task->tk_action = call_timeout;
1794                 return;
1795         case -ECONNREFUSED:
1796         case -ECONNRESET:
1797         case -ENETUNREACH:
1798                 /* retry with existing socket, after a delay */
1799                 rpc_delay(task, 3*HZ);
1800                 if (RPC_IS_SOFTCONN(task))
1801                         break;
                /* fall through */
1802         case -EAGAIN:
1803                 task->tk_action = call_bind;
1804                 return;
1805         case 0:
1806                 clnt->cl_stats->netreconn++;
1807                 task->tk_action = call_transmit;
1808                 return;
1809         }
1810         rpc_exit(task, status);
1811 }
1812
1813 /*
1814  * 5.   Transmit the RPC request, and wait for reply
1815  */
1816 static void
1817 call_transmit(struct rpc_task *task)
1818 {
1819         int is_retrans = RPC_WAS_SENT(task);
1820
1821         dprint_status(task);
1822
1823         task->tk_action = call_status;
1824         if (task->tk_status < 0)
1825                 return;
1826         if (!xprt_prepare_transmit(task))
1827                 return;
1828         task->tk_action = call_transmit_status;
1829         /* Encode here so that rpcsec_gss can use correct sequence number. */
1830         if (rpc_task_need_encode(task)) {
1831                 rpc_xdr_encode(task);
1832                 /* Did the encode result in an error condition? */
1833                 if (task->tk_status != 0) {
1834                         /* Was the error nonfatal? */
1835                         if (task->tk_status == -EAGAIN)
1836                                 rpc_delay(task, HZ >> 4);
1837                         else
1838                                 rpc_exit(task, task->tk_status);
1839                         return;
1840                 }
1841         }
1842         xprt_transmit(task);
1843         if (task->tk_status < 0)
1844                 return;
1845         if (is_retrans)
1846                 task->tk_client->cl_stats->rpcretrans++;
1847         /*
1848          * On success, ensure that we call xprt_end_transmit() before sleeping
1849          * in order to allow access to the socket to other RPC requests.
1850          */
1851         call_transmit_status(task);
1852         if (rpc_reply_expected(task))
1853                 return;
1854         task->tk_action = rpc_exit_task;
1855         rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1856 }
1857
1858 /*
1859  * 5a.  Handle cleanup after a transmission
1860  */
1861 static void
1862 call_transmit_status(struct rpc_task *task)
1863 {
1864         task->tk_action = call_status;
1865
1866         /*
1867          * Common case: success.  Force the compiler to put this
1868          * test first.
1869          */
1870         if (task->tk_status == 0) {
1871                 xprt_end_transmit(task);
1872                 rpc_task_force_reencode(task);
1873                 return;
1874         }
1875
1876         switch (task->tk_status) {
1877         case -EAGAIN:
1878                 break;
1879         default:
1880                 dprint_status(task);
1881                 xprt_end_transmit(task);
1882                 rpc_task_force_reencode(task);
1883                 break;
1884                 /*
1885                  * Special cases: if we've been waiting on the
1886                  * socket's write_space() callback, or if the
1887                  * socket just returned a connection error,
1888                  * then hold onto the transport lock.
1889                  */
1890         case -ECONNREFUSED:
1891         case -EHOSTDOWN:
1892         case -EHOSTUNREACH:
1893         case -ENETUNREACH:
1894                 if (RPC_IS_SOFTCONN(task)) {
1895                         xprt_end_transmit(task);
1896                         rpc_exit(task, task->tk_status);
1897                         break;
1898                 }
                /* fall through */
1899         case -ECONNRESET:
1900         case -ENOTCONN:
1901         case -EPIPE:
1902                 rpc_task_force_reencode(task);
1903         }
1904 }
1905
1906 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1907 /*
1908  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1909  * addition, disconnect on connectivity errors.
1910  */
1911 static void
1912 call_bc_transmit(struct rpc_task *task)
1913 {
1914         struct rpc_rqst *req = task->tk_rqstp;
1915
1916         if (!xprt_prepare_transmit(task)) {
1917                 /*
1918                  * Could not reserve the transport. Try again after the
1919                  * transport is released.
1920                  */
1921                 task->tk_status = 0;
1922                 task->tk_action = call_bc_transmit;
1923                 return;
1924         }
1925
1926         task->tk_action = rpc_exit_task;
1927         if (task->tk_status < 0) {
1928                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1929                         "error: %d\n", task->tk_status);
1930                 return;
1931         }
1932
1933         xprt_transmit(task);
1934         xprt_end_transmit(task);
1935         dprint_status(task);
1936         switch (task->tk_status) {
1937         case 0:
1938                 /* Success */
1939                 break;
1940         case -EHOSTDOWN:
1941         case -EHOSTUNREACH:
1942         case -ENETUNREACH:
1943         case -ETIMEDOUT:
1944                 /*
1945                  * Problem reaching the server.  Disconnect and let the
1946                  * forechannel reestablish the connection.  The server will
1947                  * have to retransmit the backchannel request and we'll
1948                  * reprocess it.  Since these ops are idempotent, there's no
1949                  * need to cache our reply at this time.
1950                  */
1951                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1952                         "error: %d\n", task->tk_status);
1953                 xprt_conditional_disconnect(req->rq_xprt,
1954                         req->rq_connect_cookie);
1955                 break;
1956         default:
1957                 /*
1958                  * We were unable to reply and will have to drop the
1959                  * request.  The server should reconnect and retransmit.
1960                  */
1961                 WARN_ON_ONCE(task->tk_status == -EAGAIN);
1962                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1963                         "error: %d\n", task->tk_status);
1964                 break;
1965         }
1966         rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1967 }
1968 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1969
1970 /*
1971  * 6.   Sort out the RPC call status
1972  */
1973 static void
1974 call_status(struct rpc_task *task)
1975 {
1976         struct rpc_clnt *clnt = task->tk_client;
1977         struct rpc_rqst *req = task->tk_rqstp;
1978         int             status;
1979
1980         if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1981                 task->tk_status = req->rq_reply_bytes_recvd;
1982
1983         dprint_status(task);
1984
1985         status = task->tk_status;
1986         if (status >= 0) {
1987                 task->tk_action = call_decode;
1988                 return;
1989         }
1990
1991         trace_rpc_call_status(task);
1992         task->tk_status = 0;
1993         switch (status) {
1994         case -EHOSTDOWN:
1995         case -EHOSTUNREACH:
1996         case -ENETUNREACH:
1997                 /*
1998                  * Delay any retries for 3 seconds, then handle as if it
1999                  * were a timeout.
2000                  */
2001                 rpc_delay(task, 3*HZ);
2002         case -ETIMEDOUT:
2003                 task->tk_action = call_timeout;
2004                 if (!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
2005                     && task->tk_client->cl_discrtry)
2006                         xprt_conditional_disconnect(req->rq_xprt,
2007                                         req->rq_connect_cookie);
2008                 break;
2009         case -ECONNRESET:
2010         case -ECONNREFUSED:
2011                 rpc_force_rebind(clnt);
2012                 rpc_delay(task, 3*HZ);
                /* fall through */
2013         case -EPIPE:
2014         case -ENOTCONN:
2015                 task->tk_action = call_bind;
2016                 break;
2017         case -EAGAIN:
2018                 task->tk_action = call_transmit;
2019                 break;
2020         case -EIO:
2021                 /* shutdown or soft timeout */
2022                 rpc_exit(task, status);
2023                 break;
2024         default:
2025                 if (clnt->cl_chatty)
2026                         printk("%s: RPC call returned error %d\n",
2027                                clnt->cl_program->name, -status);
2028                 rpc_exit(task, status);
2029         }
2030 }
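
/*
 * Roughly: unreachable-host errors above are retried as timeouts after
 * a 3 second pause, connection errors force a rebind and go back
 * through call_bind, -EAGAIN retransmits, -EIO (shutdown or soft
 * timeout) ends the task, and any other error is reported and ends the
 * task as well.
 */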
2031
2032 /*
2033  * 6a.  Handle RPC timeout
2034  *      We do not release the request slot, so we keep using the
2035  *      same XID for all retransmits.
2036  */
2037 static void
2038 call_timeout(struct rpc_task *task)
2039 {
2040         struct rpc_clnt *clnt = task->tk_client;
2041
2042         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
2043                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
2044                 goto retry;
2045         }
2046
2047         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2048         task->tk_timeouts++;
2049
2050         if (RPC_IS_SOFTCONN(task)) {
2051                 rpc_exit(task, -ETIMEDOUT);
2052                 return;
2053         }
2054         if (RPC_IS_SOFT(task)) {
2055                 if (clnt->cl_chatty) {
2056                         rcu_read_lock();
2057                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
2058                                 clnt->cl_program->name,
2059                                 rcu_dereference(clnt->cl_xprt)->servername);
2060                         rcu_read_unlock();
2061                 }
2062                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2063                         rpc_exit(task, -ETIMEDOUT);
2064                 else
2065                         rpc_exit(task, -EIO);
2066                 return;
2067         }
2068
2069         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2070                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2071                 if (clnt->cl_chatty) {
2072                         rcu_read_lock();
2073                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
2074                                 clnt->cl_program->name,
2075                                 rcu_dereference(clnt->cl_xprt)->servername);
2076                         rcu_read_unlock();
2077                 }
2078         }
2079         rpc_force_rebind(clnt);
2080         /*
2081          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2082          * event? RFC2203 requires the server to drop all such requests.
2083          */
2084         rpcauth_invalcred(task);
2085
2086 retry:
2087         task->tk_action = call_bind;
2088         task->tk_status = 0;
2089 }
2090
2091 /*
2092  * 7.   Decode the RPC reply
2093  */
2094 static void
2095 call_decode(struct rpc_task *task)
2096 {
2097         struct rpc_clnt *clnt = task->tk_client;
2098         struct rpc_rqst *req = task->tk_rqstp;
2099         kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
2100         __be32          *p;
2101
2102         dprint_status(task);
2103
2104         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2105                 if (clnt->cl_chatty) {
2106                         rcu_read_lock();
2107                         printk(KERN_NOTICE "%s: server %s OK\n",
2108                                 clnt->cl_program->name,
2109                                 rcu_dereference(clnt->cl_xprt)->servername);
2110                         rcu_read_unlock();
2111                 }
2112                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2113         }
2114
2115         /*
2116          * Ensure that we see all writes made by xprt_complete_rqst()
2117          * before it changed req->rq_reply_bytes_recvd.
2118          */
2119         smp_rmb();
2120         req->rq_rcv_buf.len = req->rq_private_buf.len;
2121
2122         /* Check that the softirq receive buffer is valid */
2123         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2124                                 sizeof(req->rq_rcv_buf)) != 0);
2125
2126         if (req->rq_rcv_buf.len < 12) {
2127                 if (!RPC_IS_SOFT(task)) {
2128                         task->tk_action = call_bind;
2129                         goto out_retry;
2130                 }
2131                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2132                                 clnt->cl_program->name, task->tk_status);
2133                 task->tk_action = call_timeout;
2134                 goto out_retry;
2135         }
2136
2137         p = rpc_verify_header(task);
2138         if (IS_ERR(p)) {
2139                 if (p == ERR_PTR(-EAGAIN))
2140                         goto out_retry;
2141                 return;
2142         }
2143
2144         task->tk_action = rpc_exit_task;
2145
2146         if (decode) {
2147                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2148                                                       task->tk_msg.rpc_resp);
2149         }
2150         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2151                         task->tk_status);
2152         return;
2153 out_retry:
2154         task->tk_status = 0;
2155         /* Note: rpc_verify_header() may have freed the RPC slot */
2156         if (task->tk_rqstp == req) {
2157                 req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2158                 if (task->tk_client->cl_discrtry)
2159                         xprt_conditional_disconnect(req->rq_xprt,
2160                                         req->rq_connect_cookie);
2161         }
2162 }
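
/*
 * The 12-byte floor applied in call_decode() above corresponds to the
 * three 32-bit XDR words (xid, message type, reply status) that every
 * RPC reply carries before its body; anything shorter is treated as a
 * truncated reply and retried.
 */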
2163
2164 static __be32 *
2165 rpc_encode_header(struct rpc_task *task)
2166 {
2167         struct rpc_clnt *clnt = task->tk_client;
2168         struct rpc_rqst *req = task->tk_rqstp;
2169         __be32          *p = req->rq_svec[0].iov_base;
2170
2171         /* FIXME: check buffer size? */
2172
2173         p = xprt_skip_transport_header(req->rq_xprt, p);
2174         *p++ = req->rq_xid;             /* XID */
2175         *p++ = htonl(RPC_CALL);         /* CALL */
2176         *p++ = htonl(RPC_VERSION);      /* RPC version */
2177         *p++ = htonl(clnt->cl_prog);    /* program number */
2178         *p++ = htonl(clnt->cl_vers);    /* program version */
2179         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2180         p = rpcauth_marshcred(task, p);
2181         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2182         return p;
2183 }
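
/*
 * For reference, the call header built above is laid out on the wire
 * as one 32-bit XDR word per field (per RFC 5531), followed by the
 * variable-length credential and verifier added by rpcauth_marshcred():
 *
 *      xid | RPC_CALL | RPC_VERSION (2) | prog | vers | proc | cred | verf
 */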
2184
2185 static __be32 *
2186 rpc_verify_header(struct rpc_task *task)
2187 {
2188         struct rpc_clnt *clnt = task->tk_client;
2189         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2190         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2191         __be32  *p = iov->iov_base;
2192         u32 n;
2193         int error = -EACCES;
2194
2195         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2196                 /* RFC-1014 says that the representation of XDR data must be a
2197                  * multiple of four bytes
2198                  * - if it isn't, pointer subtraction in the NFS client may give
2199                  *   undefined results
2200                  */
2201                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
2202                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2203                        task->tk_rqstp->rq_rcv_buf.len);
2204                 error = -EIO;
2205                 goto out_err;
2206         }
2207         if ((len -= 3) < 0)
2208                 goto out_overflow;
2209
2210         p += 1; /* skip XID */
2211         if ((n = ntohl(*p++)) != RPC_REPLY) {
2212                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2213                         task->tk_pid, __func__, n);
2214                 error = -EIO;
2215                 goto out_garbage;
2216         }
2217
2218         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2219                 if (--len < 0)
2220                         goto out_overflow;
2221                 switch ((n = ntohl(*p++))) {
2222                 case RPC_AUTH_ERROR:
2223                         break;
2224                 case RPC_MISMATCH:
2225                         dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2226                                 task->tk_pid, __func__);
2227                         error = -EPROTONOSUPPORT;
2228                         goto out_err;
2229                 default:
2230                         dprintk("RPC: %5u %s: RPC call rejected, "
2231                                 "unknown error: %x\n",
2232                                 task->tk_pid, __func__, n);
2233                         error = -EIO;
2234                         goto out_err;
2235                 }
2236                 if (--len < 0)
2237                         goto out_overflow;
2238                 switch ((n = ntohl(*p++))) {
2239                 case RPC_AUTH_REJECTEDCRED:
2240                 case RPC_AUTH_REJECTEDVERF:
2241                 case RPCSEC_GSS_CREDPROBLEM:
2242                 case RPCSEC_GSS_CTXPROBLEM:
2243                         if (!task->tk_cred_retry)
2244                                 break;
2245                         task->tk_cred_retry--;
2246                         dprintk("RPC: %5u %s: retry stale creds\n",
2247                                         task->tk_pid, __func__);
2248                         rpcauth_invalcred(task);
2249                         /* Ensure we obtain a new XID! */
2250                         xprt_release(task);
2251                         task->tk_action = call_reserve;
2252                         goto out_retry;
2253                 case RPC_AUTH_BADCRED:
2254                 case RPC_AUTH_BADVERF:
2255                         /* possibly garbled cred/verf? */
2256                         if (!task->tk_garb_retry)
2257                                 break;
2258                         task->tk_garb_retry--;
2259                         dprintk("RPC: %5u %s: retry garbled creds\n",
2260                                         task->tk_pid, __func__);
2261                         task->tk_action = call_bind;
2262                         goto out_retry;
2263                 case RPC_AUTH_TOOWEAK:
2264                         rcu_read_lock();
2265                         printk(KERN_NOTICE "RPC: server %s requires stronger "
2266                                "authentication.\n",
2267                                rcu_dereference(clnt->cl_xprt)->servername);
2268                         rcu_read_unlock();
2269                         break;
2270                 default:
2271                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
2272                                         task->tk_pid, __func__, n);
2273                         error = -EIO;
2274                 }
2275                 dprintk("RPC: %5u %s: call rejected %d\n",
2276                                 task->tk_pid, __func__, n);
2277                 goto out_err;
2278         }
2279         p = rpcauth_checkverf(task, p);
2280         if (IS_ERR(p)) {
2281                 error = PTR_ERR(p);
2282                 dprintk("RPC: %5u %s: auth check failed with %d\n",
2283                                 task->tk_pid, __func__, error);
2284                 goto out_garbage;               /* bad verifier, retry */
2285         }
2286         len = p - (__be32 *)iov->iov_base - 1;
2287         if (len < 0)
2288                 goto out_overflow;
2289         switch ((n = ntohl(*p++))) {
2290         case RPC_SUCCESS:
2291                 return p;
2292         case RPC_PROG_UNAVAIL:
2293                 dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2294                                 "by server %s\n", task->tk_pid, __func__,
2295                                 (unsigned int)clnt->cl_prog,
2296                                 rcu_dereference(clnt->cl_xprt)->servername);
2297                 error = -EPFNOSUPPORT;
2298                 goto out_err;
2299         case RPC_PROG_MISMATCH:
2300                 dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2301                                 "by server %s\n", task->tk_pid, __func__,
2302                                 (unsigned int)clnt->cl_prog,
2303                                 (unsigned int)clnt->cl_vers,
2304                                 rcu_dereference(clnt->cl_xprt)->servername);
2305                 error = -EPROTONOSUPPORT;
2306                 goto out_err;
2307         case RPC_PROC_UNAVAIL:
2308                 dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2309                                 "version %u on server %s\n",
2310                                 task->tk_pid, __func__,
2311                                 rpc_proc_name(task),
2312                                 clnt->cl_prog, clnt->cl_vers,
2313                                 rcu_dereference(clnt->cl_xprt)->servername);
2314                 error = -EOPNOTSUPP;
2315                 goto out_err;
2316         case RPC_GARBAGE_ARGS:
2317                 dprintk("RPC: %5u %s: server saw garbage\n",
2318                                 task->tk_pid, __func__);
2319                 break;                  /* retry */
2320         default:
2321                 dprintk("RPC: %5u %s: server accept status: %x\n",
2322                                 task->tk_pid, __func__, n);
2323                 /* Also retry */
2324         }
2325
2326 out_garbage:
2327         clnt->cl_stats->rpcgarbage++;
2328         if (task->tk_garb_retry) {
2329                 task->tk_garb_retry--;
2330                 dprintk("RPC: %5u %s: retrying\n",
2331                                 task->tk_pid, __func__);
2332                 task->tk_action = call_bind;
2333 out_retry:
2334                 return ERR_PTR(-EAGAIN);
2335         }
2336 out_err:
2337         rpc_exit(task, error);
2338         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2339                         __func__, error);
2340         return ERR_PTR(error);
2341 out_overflow:
2342         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2343                         __func__);
2344         goto out_garbage;
2345 }
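
/*
 * For reference, the accepted reply walked above is laid out as
 * (per RFC 5531):
 *
 *      xid | RPC_REPLY | reply_stat | verifier | accept_stat | results
 *
 * while an RPC_MSG_DENIED reply_stat is instead followed by the
 * rejection and auth status words decoded in the switch above.
 */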
2346
2347 static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2348 {
2349 }
2350
2351 static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2352 {
2353         return 0;
2354 }
2355
2356 static struct rpc_procinfo rpcproc_null = {
2357         .p_encode = rpcproc_encode_null,
2358         .p_decode = rpcproc_decode_null,
2359 };
2360
2361 static int rpc_ping(struct rpc_clnt *clnt)
2362 {
2363         struct rpc_message msg = {
2364                 .rpc_proc = &rpcproc_null,
2365         };
2366         int err;
2367         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2368         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2369         put_rpccred(msg.rpc_cred);
2370         return err;
2371 }
2372
2373 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2374 {
2375         struct rpc_message msg = {
2376                 .rpc_proc = &rpcproc_null,
2377                 .rpc_cred = cred,
2378         };
2379         struct rpc_task_setup task_setup_data = {
2380                 .rpc_client = clnt,
2381                 .rpc_message = &msg,
2382                 .callback_ops = &rpc_default_ops,
2383                 .flags = flags,
2384         };
2385         return rpc_run_task(&task_setup_data);
2386 }
2387 EXPORT_SYMBOL_GPL(rpc_call_null);
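
/*
 * Example (sketch only, the calling context is hypothetical): firing an
 * asynchronous NULL call as a cheap liveness probe.  Passing a NULL cred
 * lets the client's auth flavour look one up for the current process.
 *
 *      struct rpc_task *task;
 *
 *      task = rpc_call_null(clnt, NULL, RPC_TASK_SOFT | RPC_TASK_ASYNC);
 *      if (IS_ERR(task))
 *              return PTR_ERR(task);
 *      rpc_put_task(task);
 */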
2388
2389 #ifdef RPC_DEBUG
2390 static void rpc_show_header(void)
2391 {
2392         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2393                 "-timeout ---ops--\n");
2394 }
2395
2396 static void rpc_show_task(const struct rpc_clnt *clnt,
2397                           const struct rpc_task *task)
2398 {
2399         const char *rpc_waitq = "none";
2400
2401         if (RPC_IS_QUEUED(task))
2402                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2403
2404         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2405                 task->tk_pid, task->tk_flags, task->tk_status,
2406                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2407                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2408                 task->tk_action, rpc_waitq);
2409 }
2410
2411 void rpc_show_tasks(struct net *net)
2412 {
2413         struct rpc_clnt *clnt;
2414         struct rpc_task *task;
2415         int header = 0;
2416         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2417
2418         spin_lock(&sn->rpc_client_lock);
2419         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2420                 spin_lock(&clnt->cl_lock);
2421                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2422                         if (!header) {
2423                                 rpc_show_header();
2424                                 header++;
2425                         }
2426                         rpc_show_task(clnt, task);
2427                 }
2428                 spin_unlock(&clnt->cl_lock);
2429         }
2430         spin_unlock(&sn->rpc_client_lock);
2431 }
2432 #endif