1 /*
2  * linux/fs/nfs/direct.c
3  *
4  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
5  *
6  * High-performance uncached I/O for the Linux NFS client
7  *
8  * There are important applications whose performance or correctness
9  * depends on uncached access to file data.  Database clusters
10  * (multiple copies of the same instance running on separate hosts)
11  * implement their own cache coherency protocol that subsumes file
12  * system cache protocols.  Applications that process datasets
13  * considerably larger than the client's memory do not always benefit
14  * from a local cache.  A streaming video server, for instance, has no
15  * need to cache the contents of a file.
16  *
17  * When an application requests uncached I/O, all read and write requests
18  * are made directly to the server; data stored or fetched via these
19  * requests is not cached in the Linux page cache.  The client does not
20  * correct unaligned requests from applications.  All requested bytes are
21  * held on permanent storage before a direct write system call returns to
22  * an application.
23  *
24  * Solaris implements an uncached I/O facility called directio() that
25  * is used for backups and sequential I/O to very large files.  Solaris
26  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
27  * an undocumented mount option.
28  *
29  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
30  * help from Andrew Morton.
31  *
32  * 18 Dec 2001  Initial implementation for 2.4  --cel
33  * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
34  * 08 Jun 2003  Port to 2.5 APIs  --cel
35  * 31 Mar 2004  Handle direct I/O without VFS support  --cel
36  * 15 Sep 2004  Parallel async reads  --cel
37  * 04 May 2005  support O_DIRECT with aio  --cel
38  *
39  */
40
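/*
 * Illustration, not part of the original source: a minimal userspace
 * sketch of how an application typically asks for this uncached path.
 * The path name, alignment and transfer size below are made-up example
 * values; the only point is that O_DIRECT sends the read straight to
 * the server and leaves nothing behind in the local page cache.
 *
 *	#define _GNU_SOURCE
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	int fd = open("/mnt/nfs/data", O_RDONLY | O_DIRECT);
 *	void *buf;
 *	posix_memalign(&buf, 4096, 1 << 20);
 *	pread(fd, buf, 1 << 20, 0);
 *
 * The corresponding direct write does not return until the requested
 * bytes are on the server's stable storage, as described above.
 */
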
41 #include <linux/errno.h>
42 #include <linux/sched.h>
43 #include <linux/kernel.h>
44 #include <linux/file.h>
45 #include <linux/pagemap.h>
46 #include <linux/kref.h>
47 #include <linux/slab.h>
48 #include <linux/task_io_accounting_ops.h>
49 #include <linux/module.h>
50
51 #include <linux/nfs_fs.h>
52 #include <linux/nfs_page.h>
53 #include <linux/sunrpc/clnt.h>
54
55 #include <asm/uaccess.h>
56 #include <linux/atomic.h>
57
58 #include "internal.h"
59 #include "iostat.h"
60 #include "pnfs.h"
61
62 #define NFSDBG_FACILITY         NFSDBG_VFS
63
64 static struct kmem_cache *nfs_direct_cachep;
65
66 /*
67  * This represents a set of asynchronous requests that we're waiting on
68  */
69 struct nfs_direct_req {
70         struct kref             kref;           /* release manager */
71
72         /* I/O parameters */
73         struct nfs_open_context *ctx;           /* file open context info */
74         struct nfs_lock_context *l_ctx;         /* Lock context info */
75         struct kiocb *          iocb;           /* controlling i/o request */
76         struct inode *          inode;          /* target file of i/o */
77
78         /* completion state */
79         atomic_t                io_count;       /* i/os we're waiting for */
80         spinlock_t              lock;           /* protect completion state */
81         ssize_t                 count,          /* bytes actually processed */
82                                 bytes_left,     /* bytes left to be sent */
83                                 error;          /* any reported error */
84         struct completion       completion;     /* wait for i/o completion */
85
86         /* commit state */
87         struct nfs_mds_commit_info mds_cinfo;   /* Storage for cinfo */
88         struct pnfs_ds_commit_info ds_cinfo;    /* Storage for cinfo */
89         struct work_struct      work;
90         int                     flags;
91 #define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
92 #define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
93         struct nfs_writeverf    verf;           /* unstable write verifier */
94 };
95
96 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
97 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops;
98 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
99 static void nfs_direct_write_schedule_work(struct work_struct *work);
100
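/*
 * dreq->io_count tracks the number of outstanding operations on a
 * direct request.  The scheduling code takes one extra reference before
 * it starts queueing pages so that completion cannot run while requests
 * are still being dispatched; put_dreq() returns non-zero for whoever
 * drops the last reference, and that caller must complete the dreq.
 */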
101 static inline void get_dreq(struct nfs_direct_req *dreq)
102 {
103         atomic_inc(&dreq->io_count);
104 }
105
106 static inline int put_dreq(struct nfs_direct_req *dreq)
107 {
108         return atomic_dec_and_test(&dreq->io_count);
109 }
110
111 /**
112  * nfs_direct_IO - NFS address space operation for direct I/O
113  * @rw: direction (read or write)
114  * @iocb: target I/O control block
115  * @iov: array of vectors that define I/O buffer
116  * @pos: offset in file to begin the operation
117  * @nr_segs: size of iovec array
118  *
119  * The presence of this routine in the address space ops vector means
120  * the NFS client supports direct I/O. However, for most direct I/O, we
121  * shunt off direct read and write requests before the VFS gets them,
122  * so this method is only ever called for swap.
123  */
124 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
125 {
126 #ifndef CONFIG_NFS_SWAP
127         dprintk("NFS: nfs_direct_IO (%pD) off/no(%Ld/%lu) EINVAL\n",
128                         iocb->ki_filp, (long long) pos, nr_segs);
129
130         return -EINVAL;
131 #else
132         VM_BUG_ON(iocb->ki_nbytes != PAGE_SIZE);
133
134         if (rw == READ || rw == KERNEL_READ)
135                 return nfs_file_direct_read(iocb, iov, nr_segs, pos,
136                                 rw == READ ? true : false);
137         return nfs_file_direct_write(iocb, iov, nr_segs, pos,
138                                 rw == WRITE ? true : false);
139 #endif /* CONFIG_NFS_SWAP */
140 }
141
142 static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
143 {
144         unsigned int i;
145         for (i = 0; i < npages; i++)
146                 page_cache_release(pages[i]);
147 }
148
149 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
150                               struct nfs_direct_req *dreq)
151 {
152         cinfo->lock = &dreq->lock;
153         cinfo->mds = &dreq->mds_cinfo;
154         cinfo->ds = &dreq->ds_cinfo;
155         cinfo->dreq = dreq;
156         cinfo->completion_ops = &nfs_direct_commit_completion_ops;
157 }
158
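/*
 * One nfs_direct_req is allocated per direct read or write call.  The
 * kref starts out at two: one reference belongs to the I/O completion
 * path and is dropped by nfs_direct_complete(), the other belongs to
 * the issuing system call and is dropped once the result has been
 * collected (or the wait was abandoned).
 */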
159 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
160 {
161         struct nfs_direct_req *dreq;
162
163         dreq = kmem_cache_zalloc(nfs_direct_cachep, GFP_KERNEL);
164         if (!dreq)
165                 return NULL;
166
167         kref_init(&dreq->kref);
168         kref_get(&dreq->kref);
169         init_completion(&dreq->completion);
170         INIT_LIST_HEAD(&dreq->mds_cinfo.list);
171         INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
172         spin_lock_init(&dreq->lock);
173
174         return dreq;
175 }
176
177 static void nfs_direct_req_free(struct kref *kref)
178 {
179         struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
180
181         if (dreq->l_ctx != NULL)
182                 nfs_put_lock_context(dreq->l_ctx);
183         if (dreq->ctx != NULL)
184                 put_nfs_open_context(dreq->ctx);
185         kmem_cache_free(nfs_direct_cachep, dreq);
186 }
187
188 static void nfs_direct_req_release(struct nfs_direct_req *dreq)
189 {
190         kref_put(&dreq->kref, nfs_direct_req_free);
191 }
192
193 ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq)
194 {
195         return dreq->bytes_left;
196 }
197 EXPORT_SYMBOL_GPL(nfs_dreq_bytes_left);
198
199 /*
200  * Collects and returns the final error value/byte-count.
201  */
202 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
203 {
204         ssize_t result = -EIOCBQUEUED;
205
206         /* Async requests don't wait here */
207         if (dreq->iocb)
208                 goto out;
209
210         result = wait_for_completion_killable(&dreq->completion);
211
212         if (!result)
213                 result = dreq->error;
214         if (!result)
215                 result = dreq->count;
216
217 out:
218         return (ssize_t) result;
219 }
220
221 /*
222  * Synchronous I/O uses a stack-allocated iocb, so we cannot assume the
223  * iocb is still valid here; only async requests (dreq->iocb set) use aio_complete().
224  */
225 static void nfs_direct_complete(struct nfs_direct_req *dreq)
226 {
227         if (dreq->iocb) {
228                 long res = (long) dreq->error;
229                 if (!res)
230                         res = (long) dreq->count;
231                 aio_complete(dreq->iocb, res, 0);
232         }
233         complete_all(&dreq->completion);
234
235         nfs_direct_req_release(dreq);
236 }
237
238 static void nfs_direct_readpage_release(struct nfs_page *req)
239 {
240         dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
241                 req->wb_context->dentry->d_inode->i_sb->s_id,
242                 (long long)NFS_FILEID(req->wb_context->dentry->d_inode),
243                 req->wb_bytes,
244                 (long long)req_offset(req));
245         nfs_release_request(req);
246 }
247
248 static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
249 {
250         unsigned long bytes = 0;
251         struct nfs_direct_req *dreq = hdr->dreq;
252
253         if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
254                 goto out_put;
255
256         spin_lock(&dreq->lock);
257         if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
258                 dreq->error = hdr->error;
259         else
260                 dreq->count += hdr->good_bytes;
261         spin_unlock(&dreq->lock);
262
263         while (!list_empty(&hdr->pages)) {
264                 struct nfs_page *req = nfs_list_entry(hdr->pages.next);
265                 struct page *page = req->wb_page;
266
267                 if (!PageCompound(page) && bytes < hdr->good_bytes)
268                         set_page_dirty(page);
269                 bytes += req->wb_bytes;
270                 nfs_list_remove_request(req);
271                 nfs_direct_readpage_release(req);
272         }
273 out_put:
274         if (put_dreq(dreq))
275                 nfs_direct_complete(dreq);
276         hdr->release(hdr);
277 }
278
279 static void nfs_read_sync_pgio_error(struct list_head *head)
280 {
281         struct nfs_page *req;
282
283         while (!list_empty(head)) {
284                 req = nfs_list_entry(head->next);
285                 nfs_list_remove_request(req);
286                 nfs_release_request(req);
287         }
288 }
289
290 static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
291 {
292         get_dreq(hdr->dreq);
293 }
294
295 static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
296         .error_cleanup = nfs_read_sync_pgio_error,
297         .init_hdr = nfs_direct_pgio_init,
298         .completion = nfs_direct_read_completion,
299 };
300
301 /*
302  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
303  * operation.  If allocating the page array, get_user_pages(), or
304  * nfs_create_request() fails, bail and stop sending more reads.  Read
305  * length accounting is handled by nfs_direct_read_completion().  Otherwise, if
306  * no requests have been sent, just return an error.
307  */
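/*
 * Worked example with made-up numbers: with PAGE_SIZE = 4096, rsize =
 * 32768, a user buffer that starts 512 bytes into a page (pgbase = 512)
 * and 100000 bytes still to read, one pass of the loop below pins
 * nfs_page_array_len(512, 32768) = 9 pages and creates 9 nfs_page
 * requests: 3584 bytes from the first page, seven full 4096-byte pages,
 * then a final 512-byte request.  The counters then advance by 32768
 * bytes and the loop runs again for the remainder of the segment.
 */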
308 static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
309                                                 const struct iovec *iov,
310                                                 loff_t pos, bool uio)
311 {
312         struct nfs_direct_req *dreq = desc->pg_dreq;
313         struct nfs_open_context *ctx = dreq->ctx;
314         struct inode *inode = ctx->dentry->d_inode;
315         unsigned long user_addr = (unsigned long)iov->iov_base;
316         size_t count = iov->iov_len;
317         size_t rsize = NFS_SERVER(inode)->rsize;
318         unsigned int pgbase;
319         int result;
320         ssize_t started = 0;
321         struct page **pagevec = NULL;
322         unsigned int npages;
323
324         do {
325                 size_t bytes;
326                 int i;
327
328                 pgbase = user_addr & ~PAGE_MASK;
329                 bytes = min(max_t(size_t, rsize, PAGE_SIZE), count);
330
331                 result = -ENOMEM;
332                 npages = nfs_page_array_len(pgbase, bytes);
333                 if (!pagevec)
334                         pagevec = kmalloc(npages * sizeof(struct page *),
335                                           GFP_KERNEL);
336                 if (!pagevec)
337                         break;
338                 if (uio) {
339                         down_read(&current->mm->mmap_sem);
340                         result = get_user_pages(current, current->mm, user_addr,
341                                         npages, 1, 0, pagevec, NULL);
342                         up_read(&current->mm->mmap_sem);
343                         if (result < 0)
344                                 break;
345                 } else {
346                         WARN_ON(npages != 1);
347                         result = get_kernel_page(user_addr, 1, pagevec);
348                         if (WARN_ON(result != 1))
349                                 break;
350                 }
351
352                 if ((unsigned)result < npages) {
353                         bytes = result * PAGE_SIZE;
354                         if (bytes <= pgbase) {
355                                 nfs_direct_release_pages(pagevec, result);
356                                 break;
357                         }
358                         bytes -= pgbase;
359                         npages = result;
360                 }
361
362                 for (i = 0; i < npages; i++) {
363                         struct nfs_page *req;
364                         unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
365                         /* XXX do we need to do the eof zeroing found in async_filler? */
366                         req = nfs_create_request(dreq->ctx, dreq->inode,
367                                                  pagevec[i],
368                                                  pgbase, req_len);
369                         if (IS_ERR(req)) {
370                                 result = PTR_ERR(req);
371                                 break;
372                         }
373                         req->wb_index = pos >> PAGE_SHIFT;
374                         req->wb_offset = pos & ~PAGE_MASK;
375                         if (!nfs_pageio_add_request(desc, req)) {
376                                 result = desc->pg_error;
377                                 nfs_release_request(req);
378                                 break;
379                         }
380                         pgbase = 0;
381                         bytes -= req_len;
382                         started += req_len;
383                         user_addr += req_len;
384                         pos += req_len;
385                         count -= req_len;
386                         dreq->bytes_left -= req_len;
387                 }
388                 /* The nfs_page requests now hold references to these pages */
389                 nfs_direct_release_pages(pagevec, npages);
390         } while (count != 0 && result >= 0);
391
392         kfree(pagevec);
393
394         if (started)
395                 return started;
396         return result < 0 ? (ssize_t) result : -EFAULT;
397 }
398
399 static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
400                                               const struct iovec *iov,
401                                               unsigned long nr_segs,
402                                               loff_t pos, bool uio)
403 {
404         struct nfs_pageio_descriptor desc;
405         ssize_t result = -EINVAL;
406         size_t requested_bytes = 0;
407         unsigned long seg;
408
409         NFS_PROTO(dreq->inode)->read_pageio_init(&desc, dreq->inode,
410                              &nfs_direct_read_completion_ops);
411         get_dreq(dreq);
412         desc.pg_dreq = dreq;
413
414         for (seg = 0; seg < nr_segs; seg++) {
415                 const struct iovec *vec = &iov[seg];
416                 result = nfs_direct_read_schedule_segment(&desc, vec, pos, uio);
417                 if (result < 0)
418                         break;
419                 requested_bytes += result;
420                 if ((size_t)result < vec->iov_len)
421                         break;
422                 pos += vec->iov_len;
423         }
424
425         nfs_pageio_complete(&desc);
426
427         /*
428          * If no bytes were started, return the error, and let the
429          * generic layer handle the completion.
430          */
431         if (requested_bytes == 0) {
432                 nfs_direct_req_release(dreq);
433                 return result < 0 ? result : -EIO;
434         }
435
436         if (put_dreq(dreq))
437                 nfs_direct_complete(dreq);
438         return 0;
439 }
440
441 static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
442                                unsigned long nr_segs, loff_t pos, bool uio)
443 {
444         ssize_t result = -ENOMEM;
445         struct inode *inode = iocb->ki_filp->f_mapping->host;
446         struct nfs_direct_req *dreq;
447         struct nfs_lock_context *l_ctx;
448
449         dreq = nfs_direct_req_alloc();
450         if (dreq == NULL)
451                 goto out;
452
453         dreq->inode = inode;
454         dreq->bytes_left = iov_length(iov, nr_segs);
455         dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
456         l_ctx = nfs_get_lock_context(dreq->ctx);
457         if (IS_ERR(l_ctx)) {
458                 result = PTR_ERR(l_ctx);
459                 goto out_release;
460         }
461         dreq->l_ctx = l_ctx;
462         if (!is_sync_kiocb(iocb))
463                 dreq->iocb = iocb;
464
465         NFS_I(inode)->read_io += iov_length(iov, nr_segs);
466         result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos, uio);
467         if (!result)
468                 result = nfs_direct_wait(dreq);
469 out_release:
470         nfs_direct_req_release(dreq);
471 out:
472         return result;
473 }
474
475 static void nfs_inode_dio_write_done(struct inode *inode)
476 {
477         nfs_zap_mapping(inode, inode->i_mapping);
478         inode_dio_done(inode);
479 }
480
481 #if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
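/*
 * Resend data that can no longer simply be committed (the commit failed,
 * the server's write verifier changed, or the writes were marked for
 * rescheduling).  Every request parked on a commit list -- the MDS list
 * as well as any pNFS data-server buckets -- is pulled back and re-issued
 * as a stable (FLUSH_STABLE) write; requests that cannot be queued again
 * cause the whole direct request to fail with -EIO.
 */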
482 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
483 {
484         struct nfs_pageio_descriptor desc;
485         struct nfs_page *req, *tmp;
486         LIST_HEAD(reqs);
487         struct nfs_commit_info cinfo;
488         LIST_HEAD(failed);
489
490         nfs_init_cinfo_from_dreq(&cinfo, dreq);
491         pnfs_recover_commit_reqs(dreq->inode, &reqs, &cinfo);
492         spin_lock(cinfo.lock);
493         nfs_scan_commit_list(&cinfo.mds->list, &reqs, &cinfo, 0);
494         spin_unlock(cinfo.lock);
495
496         dreq->count = 0;
497         get_dreq(dreq);
498
499         NFS_PROTO(dreq->inode)->write_pageio_init(&desc, dreq->inode, FLUSH_STABLE,
500                               &nfs_direct_write_completion_ops);
501         desc.pg_dreq = dreq;
502
503         list_for_each_entry_safe(req, tmp, &reqs, wb_list) {
504                 if (!nfs_pageio_add_request(&desc, req)) {
505                         nfs_list_remove_request(req);
506                         nfs_list_add_request(req, &failed);
507                         spin_lock(cinfo.lock);
508                         dreq->flags = 0;
509                         dreq->error = -EIO;
510                         spin_unlock(cinfo.lock);
511                 }
512                 nfs_release_request(req);
513         }
514         nfs_pageio_complete(&desc);
515
516         while (!list_empty(&failed)) {
517                 req = nfs_list_entry(failed.next);
518                 nfs_list_remove_request(req);
519                 nfs_unlock_and_release_request(req);
520         }
521
522         if (put_dreq(dreq))
523                 nfs_direct_write_complete(dreq, dreq->inode);
524 }
525
526 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
527 {
528         struct nfs_direct_req *dreq = data->dreq;
529         struct nfs_commit_info cinfo;
530         struct nfs_page *req;
531         int status = data->task.tk_status;
532
533         nfs_init_cinfo_from_dreq(&cinfo, dreq);
534         if (status < 0) {
535                 dprintk("NFS: %5u commit failed with error %d.\n",
536                         data->task.tk_pid, status);
537                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
538         } else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
539                 dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
540                 dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
541         }
542
543         dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
544         while (!list_empty(&data->pages)) {
545                 req = nfs_list_entry(data->pages.next);
546                 nfs_list_remove_request(req);
547                 if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
548                         /* Note the rewrite will go through mds */
549                         nfs_mark_request_commit(req, NULL, &cinfo);
550                 } else
551                         nfs_release_request(req);
552                 nfs_unlock_and_release_request(req);
553         }
554
555         if (atomic_dec_and_test(&cinfo.mds->rpcs_out))
556                 nfs_direct_write_complete(dreq, data->inode);
557 }
558
559 static void nfs_direct_error_cleanup(struct nfs_inode *nfsi)
560 {
561         /* There is no lock to clear */
562 }
563
564 static const struct nfs_commit_completion_ops nfs_direct_commit_completion_ops = {
565         .completion = nfs_direct_commit_complete,
566         .error_cleanup = nfs_direct_error_cleanup,
567 };
568
569 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
570 {
571         int res;
572         struct nfs_commit_info cinfo;
573         LIST_HEAD(mds_list);
574
575         nfs_init_cinfo_from_dreq(&cinfo, dreq);
576         nfs_scan_commit(dreq->inode, &mds_list, &cinfo);
577         res = nfs_generic_commit_list(dreq->inode, &mds_list, 0, &cinfo);
578         if (res < 0) /* res == -ENOMEM */
579                 nfs_direct_write_reschedule(dreq);
580 }
581
582 static void nfs_direct_write_schedule_work(struct work_struct *work)
583 {
584         struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
585         int flags = dreq->flags;
586
587         dreq->flags = 0;
588         switch (flags) {
589                 case NFS_ODIRECT_DO_COMMIT:
590                         nfs_direct_commit_schedule(dreq);
591                         break;
592                 case NFS_ODIRECT_RESCHED_WRITES:
593                         nfs_direct_write_reschedule(dreq);
594                         break;
595                 default:
596                         nfs_inode_dio_write_done(dreq->inode);
597                         nfs_direct_complete(dreq);
598         }
599 }
600
601 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
602 {
603         schedule_work(&dreq->work); /* Calls nfs_direct_write_schedule_work */
604 }
605
606 #else
607 static void nfs_direct_write_schedule_work(struct work_struct *work)
608 {
609 }
610
611 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
612 {
613         nfs_inode_dio_write_done(inode);
614         nfs_direct_complete(dreq);
615 }
616 #endif
617
618 /*
619  * NB: Return the value of the first error return code.  Subsequent
620  *     errors after the first one are ignored.
621  */
622 /*
623  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
624  * operation.  If allocating the page array, get_user_pages(), or
625  * nfs_create_request() fails, bail and stop sending more writes.  Write
626  * length accounting is handled by nfs_direct_write_completion().  Otherwise, if
627  * no requests have been sent, just return an error.
628  */
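/*
 * Unlike the read path, each request is locked with nfs_lock_request()
 * before it is queued, since an unstable write may later have to be
 * committed or rewritten; the matching unlocks happen in the completion
 * and commit paths via nfs_unlock_and_release_request().
 */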
629 static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *desc,
630                                                  const struct iovec *iov,
631                                                  loff_t pos, bool uio)
632 {
633         struct nfs_direct_req *dreq = desc->pg_dreq;
634         struct nfs_open_context *ctx = dreq->ctx;
635         struct inode *inode = ctx->dentry->d_inode;
636         unsigned long user_addr = (unsigned long)iov->iov_base;
637         size_t count = iov->iov_len;
638         size_t wsize = NFS_SERVER(inode)->wsize;
639         unsigned int pgbase;
640         int result;
641         ssize_t started = 0;
642         struct page **pagevec = NULL;
643         unsigned int npages;
644
645         do {
646                 size_t bytes;
647                 int i;
648
649                 pgbase = user_addr & ~PAGE_MASK;
650                 bytes = min(max_t(size_t, wsize, PAGE_SIZE), count);
651
652                 result = -ENOMEM;
653                 npages = nfs_page_array_len(pgbase, bytes);
654                 if (!pagevec)
655                         pagevec = kmalloc(npages * sizeof(struct page *), GFP_KERNEL);
656                 if (!pagevec)
657                         break;
658
659                 if (uio) {
660                         down_read(&current->mm->mmap_sem);
661                         result = get_user_pages(current, current->mm, user_addr,
662                                                 npages, 0, 0, pagevec, NULL);
663                         up_read(&current->mm->mmap_sem);
664                         if (result < 0)
665                                 break;
666                 } else {
667                         WARN_ON(npages != 1);
668                         result = get_kernel_page(user_addr, 0, pagevec);
669                         if (WARN_ON(result != 1))
670                                 break;
671                 }
672
673                 if ((unsigned)result < npages) {
674                         bytes = result * PAGE_SIZE;
675                         if (bytes <= pgbase) {
676                                 nfs_direct_release_pages(pagevec, result);
677                                 break;
678                         }
679                         bytes -= pgbase;
680                         npages = result;
681                 }
682
683                 for (i = 0; i < npages; i++) {
684                         struct nfs_page *req;
685                         unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
686
687                         req = nfs_create_request(dreq->ctx, dreq->inode,
688                                                  pagevec[i],
689                                                  pgbase, req_len);
690                         if (IS_ERR(req)) {
691                                 result = PTR_ERR(req);
692                                 break;
693                         }
694                         nfs_lock_request(req);
695                         req->wb_index = pos >> PAGE_SHIFT;
696                         req->wb_offset = pos & ~PAGE_MASK;
697                         if (!nfs_pageio_add_request(desc, req)) {
698                                 result = desc->pg_error;
699                                 nfs_unlock_and_release_request(req);
700                                 break;
701                         }
702                         pgbase = 0;
703                         bytes -= req_len;
704                         started += req_len;
705                         user_addr += req_len;
706                         pos += req_len;
707                         count -= req_len;
708                         dreq->bytes_left -= req_len;
709                 }
710                 /* The nfs_page requests now hold references to these pages */
711                 nfs_direct_release_pages(pagevec, npages);
712         } while (count != 0 && result >= 0);
713
714         kfree(pagevec);
715
716         if (started)
717                 return started;
718         return result < 0 ? (ssize_t) result : -EFAULT;
719 }
720
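/*
 * Per-RPC write completion.  A stable reply just adds to the byte count.
 * The first unstable reply records the server's write verifier and arms
 * NFS_ODIRECT_DO_COMMIT; if a later unstable reply carries a different
 * verifier the server may have rebooted, so the writes are switched to
 * NFS_ODIRECT_RESCHED_WRITES and will be rewritten instead of committed.
 * Requests that still need a commit or a rewrite are put on the commit
 * list, everything else is released here.
 */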
721 static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
722 {
723         struct nfs_direct_req *dreq = hdr->dreq;
724         struct nfs_commit_info cinfo;
725         int bit = -1;
726         struct nfs_page *req = nfs_list_entry(hdr->pages.next);
727
728         if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
729                 goto out_put;
730
731         nfs_init_cinfo_from_dreq(&cinfo, dreq);
732
733         spin_lock(&dreq->lock);
734
735         if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
736                 dreq->flags = 0;
737                 dreq->error = hdr->error;
738         }
739         if (dreq->error != 0)
740                 bit = NFS_IOHDR_ERROR;
741         else {
742                 dreq->count += hdr->good_bytes;
743                 if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags)) {
744                         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
745                         bit = NFS_IOHDR_NEED_RESCHED;
746                 } else if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
747                         if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
748                                 bit = NFS_IOHDR_NEED_RESCHED;
749                         else if (dreq->flags == 0) {
750                                 memcpy(&dreq->verf, hdr->verf,
751                                        sizeof(dreq->verf));
752                                 bit = NFS_IOHDR_NEED_COMMIT;
753                                 dreq->flags = NFS_ODIRECT_DO_COMMIT;
754                         } else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
755                                 if (memcmp(&dreq->verf, hdr->verf, sizeof(dreq->verf))) {
756                                         dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
757                                         bit = NFS_IOHDR_NEED_RESCHED;
758                                 } else
759                                         bit = NFS_IOHDR_NEED_COMMIT;
760                         }
761                 }
762         }
763         spin_unlock(&dreq->lock);
764
765         while (!list_empty(&hdr->pages)) {
766                 req = nfs_list_entry(hdr->pages.next);
767                 nfs_list_remove_request(req);
768                 switch (bit) {
769                 case NFS_IOHDR_NEED_RESCHED:
770                 case NFS_IOHDR_NEED_COMMIT:
771                         kref_get(&req->wb_kref);
772                         nfs_mark_request_commit(req, hdr->lseg, &cinfo);
773                 }
774                 nfs_unlock_and_release_request(req);
775         }
776
777 out_put:
778         if (put_dreq(dreq))
779                 nfs_direct_write_complete(dreq, hdr->inode);
780         hdr->release(hdr);
781 }
782
783 static void nfs_write_sync_pgio_error(struct list_head *head)
784 {
785         struct nfs_page *req;
786
787         while (!list_empty(head)) {
788                 req = nfs_list_entry(head->next);
789                 nfs_list_remove_request(req);
790                 nfs_unlock_and_release_request(req);
791         }
792 }
793
794 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops = {
795         .error_cleanup = nfs_write_sync_pgio_error,
796         .init_hdr = nfs_direct_pgio_init,
797         .completion = nfs_direct_write_completion,
798 };
799
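/*
 * Dispatch the whole iovec as direct WRITEs.  The extra dreq reference
 * and the raised i_dio_count keep completion and truncation at bay
 * while requests are still being queued; both are dropped straight away
 * on the error path if nothing could be dispatched, otherwise they are
 * released from the nfs_direct_write_complete() path once the last
 * write (and any commit) has finished.
 */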
800 static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
801                                                const struct iovec *iov,
802                                                unsigned long nr_segs,
803                                                loff_t pos, bool uio)
804 {
805         struct nfs_pageio_descriptor desc;
806         struct inode *inode = dreq->inode;
807         ssize_t result = 0;
808         size_t requested_bytes = 0;
809         unsigned long seg;
810
811         NFS_PROTO(inode)->write_pageio_init(&desc, inode, FLUSH_COND_STABLE,
812                               &nfs_direct_write_completion_ops);
813         desc.pg_dreq = dreq;
814         get_dreq(dreq);
815         atomic_inc(&inode->i_dio_count);
816
817         NFS_I(dreq->inode)->write_io += iov_length(iov, nr_segs);
818         for (seg = 0; seg < nr_segs; seg++) {
819                 const struct iovec *vec = &iov[seg];
820                 result = nfs_direct_write_schedule_segment(&desc, vec, pos, uio);
821                 if (result < 0)
822                         break;
823                 requested_bytes += result;
824                 if ((size_t)result < vec->iov_len)
825                         break;
826                 pos += vec->iov_len;
827         }
828         nfs_pageio_complete(&desc);
829
830         /*
831          * If no bytes were started, return the error, and let the
832          * generic layer handle the completion.
833          */
834         if (requested_bytes == 0) {
835                 inode_dio_done(inode);
836                 nfs_direct_req_release(dreq);
837                 return result < 0 ? result : -EIO;
838         }
839
840         if (put_dreq(dreq))
841                 nfs_direct_write_complete(dreq, dreq->inode);
842         return 0;
843 }
844
845 static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
846                                 unsigned long nr_segs, loff_t pos,
847                                 size_t count, bool uio)
848 {
849         ssize_t result = -ENOMEM;
850         struct inode *inode = iocb->ki_filp->f_mapping->host;
851         struct nfs_direct_req *dreq;
852         struct nfs_lock_context *l_ctx;
853
854         dreq = nfs_direct_req_alloc();
855         if (!dreq)
856                 goto out;
857
858         dreq->inode = inode;
859         dreq->bytes_left = count;
860         dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
861         l_ctx = nfs_get_lock_context(dreq->ctx);
862         if (IS_ERR(l_ctx)) {
863                 result = PTR_ERR(l_ctx);
864                 goto out_release;
865         }
866         dreq->l_ctx = l_ctx;
867         if (!is_sync_kiocb(iocb))
868                 dreq->iocb = iocb;
869
870         result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, uio);
871         if (!result)
872                 result = nfs_direct_wait(dreq);
873 out_release:
874         nfs_direct_req_release(dreq);
875 out:
876         return result;
877 }
878
879 /**
880  * nfs_file_direct_read - file direct read operation for NFS files
881  * @iocb: target I/O control block
882  * @iov: vector of user buffers into which to read data
883  * @nr_segs: size of iov vector
884  * @pos: byte offset in file where reading starts
885  *
886  * We use this function for direct reads instead of calling
887  * generic_file_aio_read() in order to avoid gfar's check to see if
888  * the request starts before the end of the file.  For that check
889  * to work, we must generate a GETATTR before each direct read, and
890  * even then there is a window between the GETATTR and the subsequent
891  * READ where the file size could change.  Our preference is simply
892  * to do all reads the application wants, and the server will take
893  * care of managing the end of file boundary.
894  *
895  * This function also eliminates unnecessarily updating the file's
896  * atime locally, as the NFS server sets the file's atime, and this
897  * client must read the updated atime from the server back into its
898  * cache.
899  */
900 ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
901                                 unsigned long nr_segs, loff_t pos, bool uio)
902 {
903         ssize_t retval = -EINVAL;
904         struct file *file = iocb->ki_filp;
905         struct address_space *mapping = file->f_mapping;
906         size_t count;
907
908         count = iov_length(iov, nr_segs);
909         nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
910
911         dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
912                 file, count, (long long) pos);
913
914         retval = 0;
915         if (!count)
916                 goto out;
917
918         retval = nfs_sync_mapping(mapping);
919         if (retval)
920                 goto out;
921
922         task_io_account_read(count);
923
924         retval = nfs_direct_read(iocb, iov, nr_segs, pos, uio);
925         if (retval > 0)
926                 iocb->ki_pos = pos + retval;
927
928 out:
929         return retval;
930 }
931
932 /**
933  * nfs_file_direct_write - file direct write operation for NFS files
934  * @iocb: target I/O control block
935  * @iov: vector of user buffers from which to write data
936  * @nr_segs: size of iov vector
937  * @pos: byte offset in file where writing starts
938  *
939  * We use this function for direct writes instead of calling
940  * generic_file_aio_write() in order to avoid taking the inode
941  * semaphore and updating the i_size.  The NFS server will set
942  * the new i_size and this client must read the updated size
943  * back into its cache.  We let the server do generic write
944  * parameter checking and report problems.
945  *
946  * We eliminate local atime updates, see direct read above.
947  *
948  * We avoid unnecessary page cache invalidations for normal cached
949  * readers of this file.
950  *
951  * Note that O_APPEND is not supported for NFS direct writes, as there
952  * is no atomic O_APPEND write facility in the NFS protocol.
953  */
954 ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
955                                 unsigned long nr_segs, loff_t pos, bool uio)
956 {
957         ssize_t retval = -EINVAL;
958         struct file *file = iocb->ki_filp;
959         struct address_space *mapping = file->f_mapping;
960         size_t count;
961
962         count = iov_length(iov, nr_segs);
963         nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
964
965         dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
966                 file, count, (long long) pos);
967
968         retval = generic_write_checks(file, &pos, &count, 0);
969         if (retval)
970                 goto out;
971
972         retval = -EINVAL;
973         if ((ssize_t) count < 0)
974                 goto out;
975         retval = 0;
976         if (!count)
977                 goto out;
978
979         retval = nfs_sync_mapping(mapping);
980         if (retval)
981                 goto out;
982
983         task_io_account_write(count);
984
985         retval = nfs_direct_write(iocb, iov, nr_segs, pos, count, uio);
986         if (retval > 0) {
987                 struct inode *inode = mapping->host;
988
989                 iocb->ki_pos = pos + retval;
990                 spin_lock(&inode->i_lock);
991                 if (i_size_read(inode) < iocb->ki_pos)
992                         i_size_write(inode, iocb->ki_pos);
993                 spin_unlock(&inode->i_lock);
994         }
995 out:
996         return retval;
997 }
998
999 /**
1000  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
1001  *
1002  */
1003 int __init nfs_init_directcache(void)
1004 {
1005         nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
1006                                                 sizeof(struct nfs_direct_req),
1007                                                 0, (SLAB_RECLAIM_ACCOUNT|
1008                                                         SLAB_MEM_SPREAD),
1009                                                 NULL);
1010         if (nfs_direct_cachep == NULL)
1011                 return -ENOMEM;
1012
1013         return 0;
1014 }
1015
1016 /**
1017  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
1018  *
1019  */
1020 void nfs_destroy_directcache(void)
1021 {
1022         kmem_cache_destroy(nfs_direct_cachep);
1023 }