]> git.kernelconcepts.de Git - karo-tx-linux.git/commitdiff
RAID5: batch adjacent full stripe write
author shli@kernel.org <shli@kernel.org>
Mon, 15 Dec 2014 01:57:03 +0000 (12:57 +1100)
committer NeilBrown <neilb@suse.de>
Tue, 21 Apr 2015 22:00:41 +0000 (08:00 +1000)
The stripe cache works in 4k units, so even adjacent full stripe writes are
handled 4k at a time. Ideally we should use a bigger size for adjacent full
stripe writes. A bigger stripe cache size means fewer stripes running in the
state machine, which reduces CPU overhead. A bigger size also results in bigger
IOs being dispatched to the underlying disks.

With this patch, we automatically batch adjacent full stripe writes
together. Such stripes are added to a batch list. Only the first stripe
of the list is put on handle_list and so runs handle_stripe(). Some steps
of handle_stripe() are extended to cover all stripes of the list, including
ops_run_io, ops_run_biodrain and so on. With this patch, we have fewer stripes
running in handle_stripe() and we send the IO of the whole stripe list together
to increase the IO size.

Stripes added to a batch list have some limitations. A batch list can only
include full stripe writes and can't cross a chunk boundary, to make sure the
stripes have the same parity disks. Stripes in a batch list must be in the same
state (no written, toread and so on). If a stripe is in a batch list, any new
read/write passed to add_stripe_bio is treated as an overlap conflict and
blocked until the batch list is handled. These limitations make sure the stripes
in a batch list are in exactly the same state throughout their life cycle.

I tested 160k randwrite on a RAID5 array with a 32k chunk size and 6 PCIe
SSDs. This patch improves performance by around 30%, and the IO size to the
underlying disks is exactly 32k. I also ran a 4k randwrite test on the same
array to make sure performance isn't changed by the patch.

Signed-off-by: Shaohua Li <shli@fusionio.com>
Signed-off-by: NeilBrown <neilb@suse.de>
drivers/md/raid5.c
drivers/md/raid5.h

index e801c6669c6d41a6884289a12d7c9b8aa7e3f17f..717189e742434a68d36e420b43e32183fff08324 100644 (file)
@@ -526,6 +526,7 @@ static void init_stripe(struct stripe_head *sh, sector_t sector, int previous)
        BUG_ON(atomic_read(&sh->count) != 0);
        BUG_ON(test_bit(STRIPE_HANDLE, &sh->state));
        BUG_ON(stripe_operations_active(sh));
+       BUG_ON(sh->batch_head);
 
        pr_debug("init_stripe called, stripe %llu\n",
                (unsigned long long)sector);
@@ -717,6 +718,124 @@ static bool is_full_stripe_write(struct stripe_head *sh)
        return sh->overwrite_disks == (sh->disks - sh->raid_conf->max_degraded);
 }
 
+static void lock_two_stripes(struct stripe_head *sh1, struct stripe_head *sh2)
+{
+       local_irq_disable();
+       if (sh1 > sh2) {
+               spin_lock(&sh2->stripe_lock);
+               spin_lock_nested(&sh1->stripe_lock, 1);
+       } else {
+               spin_lock(&sh1->stripe_lock);
+               spin_lock_nested(&sh2->stripe_lock, 1);
+       }
+}
+
+static void unlock_two_stripes(struct stripe_head *sh1, struct stripe_head *sh2)
+{
+       spin_unlock(&sh1->stripe_lock);
+       spin_unlock(&sh2->stripe_lock);
+       local_irq_enable();
+}
+
+/* Only freshly new full stripe normal write stripe can be added to a batch list */
+static bool stripe_can_batch(struct stripe_head *sh)
+{
+       return test_bit(STRIPE_BATCH_READY, &sh->state) &&
+               is_full_stripe_write(sh);
+}
+
+/* we only do back search */
+static void stripe_add_to_batch_list(struct r5conf *conf, struct stripe_head *sh)
+{
+       struct stripe_head *head;
+       sector_t head_sector, tmp_sec;
+       int hash;
+       int dd_idx;
+
+       if (!stripe_can_batch(sh))
+               return;
+       /* Don't cross chunks, so stripe pd_idx/qd_idx is the same */
+       tmp_sec = sh->sector;
+       if (!sector_div(tmp_sec, conf->chunk_sectors))
+               return;
+       head_sector = sh->sector - STRIPE_SECTORS;
+
+       hash = stripe_hash_locks_hash(head_sector);
+       spin_lock_irq(conf->hash_locks + hash);
+       head = __find_stripe(conf, head_sector, conf->generation);
+       if (head && !atomic_inc_not_zero(&head->count)) {
+               spin_lock(&conf->device_lock);
+               if (!atomic_read(&head->count)) {
+                       if (!test_bit(STRIPE_HANDLE, &head->state))
+                               atomic_inc(&conf->active_stripes);
+                       BUG_ON(list_empty(&head->lru) &&
+                              !test_bit(STRIPE_EXPANDING, &head->state));
+                       list_del_init(&head->lru);
+                       if (head->group) {
+                               head->group->stripes_cnt--;
+                               head->group = NULL;
+                       }
+               }
+               atomic_inc(&head->count);
+               spin_unlock(&conf->device_lock);
+       }
+       spin_unlock_irq(conf->hash_locks + hash);
+
+       if (!head)
+               return;
+       if (!stripe_can_batch(head))
+               goto out;
+
+       lock_two_stripes(head, sh);
+       /* clear_batch_ready clear the flag */
+       if (!stripe_can_batch(head) || !stripe_can_batch(sh))
+               goto unlock_out;
+
+       if (sh->batch_head)
+               goto unlock_out;
+
+       dd_idx = 0;
+       while (dd_idx == sh->pd_idx || dd_idx == sh->qd_idx)
+               dd_idx++;
+       if (head->dev[dd_idx].towrite->bi_rw != sh->dev[dd_idx].towrite->bi_rw)
+               goto unlock_out;
+
+       if (head->batch_head) {
+               spin_lock(&head->batch_head->batch_lock);
+               /* This batch list is already running */
+               if (!stripe_can_batch(head)) {
+                       spin_unlock(&head->batch_head->batch_lock);
+                       goto unlock_out;
+               }
+
+               /*
+                * at this point, head's BATCH_READY could be cleared, but we
+                * can still add the stripe to batch list
+                */
+               list_add(&sh->batch_list, &head->batch_list);
+               spin_unlock(&head->batch_head->batch_lock);
+
+               sh->batch_head = head->batch_head;
+       } else {
+               head->batch_head = head;
+               sh->batch_head = head->batch_head;
+               spin_lock(&head->batch_lock);
+               list_add_tail(&sh->batch_list, &head->batch_list);
+               spin_unlock(&head->batch_lock);
+       }
+
+       if (test_and_clear_bit(STRIPE_PREREAD_ACTIVE, &sh->state))
+               if (atomic_dec_return(&conf->preread_active_stripes)
+                   < IO_THRESHOLD)
+                       md_wakeup_thread(conf->mddev->thread);
+
+       atomic_inc(&sh->count);
+unlock_out:
+       unlock_two_stripes(head, sh);
+out:
+       release_stripe(head);
+}
+
 /* Determine if 'data_offset' or 'new_data_offset' should be used
  * in this stripe_head.
  */
@@ -747,6 +866,7 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
 {
        struct r5conf *conf = sh->raid_conf;
        int i, disks = sh->disks;
+       struct stripe_head *head_sh = sh;
 
        might_sleep();
 
@@ -755,6 +875,8 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
                int replace_only = 0;
                struct bio *bi, *rbi;
                struct md_rdev *rdev, *rrdev = NULL;
+
+               sh = head_sh;
                if (test_and_clear_bit(R5_Wantwrite, &sh->dev[i].flags)) {
                        if (test_and_clear_bit(R5_WantFUA, &sh->dev[i].flags))
                                rw = WRITE_FUA;
@@ -773,6 +895,7 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
                if (test_and_clear_bit(R5_SyncIO, &sh->dev[i].flags))
                        rw |= REQ_SYNC;
 
+again:
                bi = &sh->dev[i].req;
                rbi = &sh->dev[i].rreq; /* For writing to replacement */
 
@@ -791,7 +914,7 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
                                /* We raced and saw duplicates */
                                rrdev = NULL;
                } else {
-                       if (test_bit(R5_ReadRepl, &sh->dev[i].flags) && rrdev)
+                       if (test_bit(R5_ReadRepl, &head_sh->dev[i].flags) && rrdev)
                                rdev = rrdev;
                        rrdev = NULL;
                }
@@ -862,13 +985,15 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
                                __func__, (unsigned long long)sh->sector,
                                bi->bi_rw, i);
                        atomic_inc(&sh->count);
+                       if (sh != head_sh)
+                               atomic_inc(&head_sh->count);
                        if (use_new_offset(conf, sh))
                                bi->bi_iter.bi_sector = (sh->sector
                                                 + rdev->new_data_offset);
                        else
                                bi->bi_iter.bi_sector = (sh->sector
                                                 + rdev->data_offset);
-                       if (test_bit(R5_ReadNoMerge, &sh->dev[i].flags))
+                       if (test_bit(R5_ReadNoMerge, &head_sh->dev[i].flags))
                                bi->bi_rw |= REQ_NOMERGE;
 
                        if (test_bit(R5_SkipCopy, &sh->dev[i].flags))
@@ -912,6 +1037,8 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
                                __func__, (unsigned long long)sh->sector,
                                rbi->bi_rw, i);
                        atomic_inc(&sh->count);
+                       if (sh != head_sh)
+                               atomic_inc(&head_sh->count);
                        if (use_new_offset(conf, sh))
                                rbi->bi_iter.bi_sector = (sh->sector
                                                  + rrdev->new_data_offset);
@@ -945,6 +1072,13 @@ static void ops_run_io(struct stripe_head *sh, struct stripe_head_state *s)
                        clear_bit(R5_LOCKED, &sh->dev[i].flags);
                        set_bit(STRIPE_HANDLE, &sh->state);
                }
+
+               if (!head_sh->batch_head)
+                       continue;
+               sh = list_first_entry(&sh->batch_list, struct stripe_head,
+                                     batch_list);
+               if (sh != head_sh)
+                       goto again;
        }
 }
 
@@ -1060,6 +1194,7 @@ static void ops_run_biofill(struct stripe_head *sh)
        struct async_submit_ctl submit;
        int i;
 
+       BUG_ON(sh->batch_head);
        pr_debug("%s: stripe %llu\n", __func__,
                (unsigned long long)sh->sector);
 
@@ -1148,6 +1283,8 @@ ops_run_compute5(struct stripe_head *sh, struct raid5_percpu *percpu)
        struct async_submit_ctl submit;
        int i;
 
+       BUG_ON(sh->batch_head);
+
        pr_debug("%s: stripe %llu block: %d\n",
                __func__, (unsigned long long)sh->sector, target);
        BUG_ON(!test_bit(R5_Wantcompute, &tgt->flags));
@@ -1214,6 +1351,7 @@ ops_run_compute6_1(struct stripe_head *sh, struct raid5_percpu *percpu)
        int i;
        int count;
 
+       BUG_ON(sh->batch_head);
        if (sh->ops.target < 0)
                target = sh->ops.target2;
        else if (sh->ops.target2 < 0)
@@ -1272,6 +1410,7 @@ ops_run_compute6_2(struct stripe_head *sh, struct raid5_percpu *percpu)
        struct page **blocks = to_addr_page(percpu, 0);
        struct async_submit_ctl submit;
 
+       BUG_ON(sh->batch_head);
        pr_debug("%s: stripe %llu block1: %d block2: %d\n",
                 __func__, (unsigned long long)sh->sector, target, target2);
        BUG_ON(target < 0 || target2 < 0);
@@ -1384,6 +1523,7 @@ ops_run_prexor(struct stripe_head *sh, struct raid5_percpu *percpu,
        /* existing parity data subtracted */
        struct page *xor_dest = xor_srcs[count++] = sh->dev[pd_idx].page;
 
+       BUG_ON(sh->batch_head);
        pr_debug("%s: stripe %llu\n", __func__,
                (unsigned long long)sh->sector);
 
@@ -1406,17 +1546,21 @@ ops_run_biodrain(struct stripe_head *sh, struct dma_async_tx_descriptor *tx)
 {
        int disks = sh->disks;
        int i;
+       struct stripe_head *head_sh = sh;
 
        pr_debug("%s: stripe %llu\n", __func__,
                (unsigned long long)sh->sector);
 
        for (i = disks; i--; ) {
-               struct r5dev *dev = &sh->dev[i];
+               struct r5dev *dev;
                struct bio *chosen;
 
-               if (test_and_clear_bit(R5_Wantdrain, &dev->flags)) {
+               sh = head_sh;
+               if (test_and_clear_bit(R5_Wantdrain, &head_sh->dev[i].flags)) {
                        struct bio *wbi;
 
+again:
+                       dev = &sh->dev[i];
                        spin_lock_irq(&sh->stripe_lock);
                        chosen = dev->towrite;
                        dev->towrite = NULL;
@@ -1445,6 +1589,15 @@ ops_run_biodrain(struct stripe_head *sh, struct dma_async_tx_descriptor *tx)
                                }
                                wbi = r5_next_bio(wbi, dev->sector);
                        }
+
+                       if (head_sh->batch_head) {
+                               sh = list_first_entry(&sh->batch_list,
+                                                     struct stripe_head,
+                                                     batch_list);
+                               if (sh == head_sh)
+                                       continue;
+                               goto again;
+                       }
                }
        }
 
@@ -1500,12 +1653,15 @@ ops_run_reconstruct5(struct stripe_head *sh, struct raid5_percpu *percpu,
                     struct dma_async_tx_descriptor *tx)
 {
        int disks = sh->disks;
-       struct page **xor_srcs = to_addr_page(percpu, 0);
+       struct page **xor_srcs;
        struct async_submit_ctl submit;
-       int count = 0, pd_idx = sh->pd_idx, i;
+       int count, pd_idx = sh->pd_idx, i;
        struct page *xor_dest;
        int prexor = 0;
        unsigned long flags;
+       int j = 0;
+       struct stripe_head *head_sh = sh;
+       int last_stripe;
 
        pr_debug("%s: stripe %llu\n", __func__,
                (unsigned long long)sh->sector);
@@ -1522,15 +1678,18 @@ ops_run_reconstruct5(struct stripe_head *sh, struct raid5_percpu *percpu,
                ops_complete_reconstruct(sh);
                return;
        }
+again:
+       count = 0;
+       xor_srcs = to_addr_page(percpu, j);
        /* check if prexor is active which means only process blocks
         * that are part of a read-modify-write (written)
         */
-       if (sh->reconstruct_state == reconstruct_state_prexor_drain_run) {
+       if (head_sh->reconstruct_state == reconstruct_state_prexor_drain_run) {
                prexor = 1;
                xor_dest = xor_srcs[count++] = sh->dev[pd_idx].page;
                for (i = disks; i--; ) {
                        struct r5dev *dev = &sh->dev[i];
-                       if (dev->written)
+                       if (head_sh->dev[i].written)
                                xor_srcs[count++] = dev->page;
                }
        } else {
@@ -1547,17 +1706,32 @@ ops_run_reconstruct5(struct stripe_head *sh, struct raid5_percpu *percpu,
         * set ASYNC_TX_XOR_DROP_DST and ASYNC_TX_XOR_ZERO_DST
         * for the synchronous xor case
         */
-       flags = ASYNC_TX_ACK |
-               (prexor ? ASYNC_TX_XOR_DROP_DST : ASYNC_TX_XOR_ZERO_DST);
-
-       atomic_inc(&sh->count);
+       last_stripe = !head_sh->batch_head ||
+               list_first_entry(&sh->batch_list,
+                                struct stripe_head, batch_list) == head_sh;
+       if (last_stripe) {
+               flags = ASYNC_TX_ACK |
+                       (prexor ? ASYNC_TX_XOR_DROP_DST : ASYNC_TX_XOR_ZERO_DST);
+
+               atomic_inc(&head_sh->count);
+               init_async_submit(&submit, flags, tx, ops_complete_reconstruct, head_sh,
+                                 to_addr_conv(sh, percpu, j));
+       } else {
+               flags = prexor ? ASYNC_TX_XOR_DROP_DST : ASYNC_TX_XOR_ZERO_DST;
+               init_async_submit(&submit, flags, tx, NULL, NULL,
+                                 to_addr_conv(sh, percpu, j));
+       }
 
-       init_async_submit(&submit, flags, tx, ops_complete_reconstruct, sh,
-                         to_addr_conv(sh, percpu, 0));
        if (unlikely(count == 1))
                tx = async_memcpy(xor_dest, xor_srcs[0], 0, 0, STRIPE_SIZE, &submit);
        else
                tx = async_xor(xor_dest, xor_srcs, 0, count, STRIPE_SIZE, &submit);
+       if (!last_stripe) {
+               j++;
+               sh = list_first_entry(&sh->batch_list, struct stripe_head,
+                                     batch_list);
+               goto again;
+       }
 }
 
 static void
@@ -1565,8 +1739,10 @@ ops_run_reconstruct6(struct stripe_head *sh, struct raid5_percpu *percpu,
                     struct dma_async_tx_descriptor *tx)
 {
        struct async_submit_ctl submit;
-       struct page **blocks = to_addr_page(percpu, 0);
-       int count, i;
+       struct page **blocks;
+       int count, i, j = 0;
+       struct stripe_head *head_sh = sh;
+       int last_stripe;
 
        pr_debug("%s: stripe %llu\n", __func__, (unsigned long long)sh->sector);
 
@@ -1584,13 +1760,27 @@ ops_run_reconstruct6(struct stripe_head *sh, struct raid5_percpu *percpu,
                return;
        }
 
+again:
+       blocks = to_addr_page(percpu, j);
        count = set_syndrome_sources(blocks, sh);
-
-       atomic_inc(&sh->count);
-
-       init_async_submit(&submit, ASYNC_TX_ACK, tx, ops_complete_reconstruct,
-                         sh, to_addr_conv(sh, percpu, 0));
+       last_stripe = !head_sh->batch_head ||
+               list_first_entry(&sh->batch_list,
+                                struct stripe_head, batch_list) == head_sh;
+
+       if (last_stripe) {
+               atomic_inc(&head_sh->count);
+               init_async_submit(&submit, ASYNC_TX_ACK, tx, ops_complete_reconstruct,
+                                 head_sh, to_addr_conv(sh, percpu, j));
+       } else
+               init_async_submit(&submit, 0, tx, NULL, NULL,
+                                 to_addr_conv(sh, percpu, j));
        async_gen_syndrome(blocks, 0, count+2, STRIPE_SIZE,  &submit);
+       if (!last_stripe) {
+               j++;
+               sh = list_first_entry(&sh->batch_list, struct stripe_head,
+                                     batch_list);
+               goto again;
+       }
 }
 
 static void ops_complete_check(void *stripe_head_ref)
@@ -1620,6 +1810,7 @@ static void ops_run_check_p(struct stripe_head *sh, struct raid5_percpu *percpu)
        pr_debug("%s: stripe %llu\n", __func__,
                (unsigned long long)sh->sector);
 
+       BUG_ON(sh->batch_head);
        count = 0;
        xor_dest = sh->dev[pd_idx].page;
        xor_srcs[count++] = xor_dest;
@@ -1648,6 +1839,7 @@ static void ops_run_check_pq(struct stripe_head *sh, struct raid5_percpu *percpu
        pr_debug("%s: stripe %llu checkp: %d\n", __func__,
                (unsigned long long)sh->sector, checkp);
 
+       BUG_ON(sh->batch_head);
        count = set_syndrome_sources(srcs, sh);
        if (!checkp)
                srcs[count] = NULL;
@@ -1715,7 +1907,7 @@ static void raid_run_ops(struct stripe_head *sh, unsigned long ops_request)
                        BUG();
        }
 
-       if (overlap_clear)
+       if (overlap_clear && !sh->batch_head)
                for (i = disks; i--; ) {
                        struct r5dev *dev = &sh->dev[i];
                        if (test_and_clear_bit(R5_Overlap, &dev->flags))
@@ -1745,6 +1937,10 @@ static int grow_one_stripe(struct r5conf *conf, int hash)
        atomic_set(&sh->count, 1);
        atomic_inc(&conf->active_stripes);
        INIT_LIST_HEAD(&sh->lru);
+
+       spin_lock_init(&sh->batch_lock);
+       INIT_LIST_HEAD(&sh->batch_list);
+       sh->batch_head = NULL;
        release_stripe(sh);
        return 1;
 }
@@ -2188,6 +2384,9 @@ static void raid5_end_write_request(struct bio *bi, int error)
                clear_bit(R5_LOCKED, &sh->dev[i].flags);
        set_bit(STRIPE_HANDLE, &sh->state);
        release_stripe(sh);
+
+       if (sh->batch_head && sh != sh->batch_head)
+               release_stripe(sh->batch_head);
 }
 
 static sector_t compute_blocknr(struct stripe_head *sh, int i, int previous);
@@ -2674,6 +2873,9 @@ static int add_stripe_bio(struct stripe_head *sh, struct bio *bi, int dd_idx,
         * protect it.
         */
        spin_lock_irq(&sh->stripe_lock);
+       /* Don't allow new IO added to stripes in batch list */
+       if (sh->batch_head)
+               goto overlap;
        if (forwrite) {
                bip = &sh->dev[dd_idx].towrite;
                if (*bip == NULL)
@@ -2723,6 +2925,9 @@ static int add_stripe_bio(struct stripe_head *sh, struct bio *bi, int dd_idx,
                sh->bm_seq = conf->seq_flush+1;
                set_bit(STRIPE_BIT_DELAY, &sh->state);
        }
+
+       if (stripe_can_batch(sh))
+               stripe_add_to_batch_list(conf, sh);
        return 1;
 
  overlap:
@@ -2755,6 +2960,7 @@ handle_failed_stripe(struct r5conf *conf, struct stripe_head *sh,
                                struct bio **return_bi)
 {
        int i;
+       BUG_ON(sh->batch_head);
        for (i = disks; i--; ) {
                struct bio *bi;
                int bitmap_end = 0;
@@ -2870,6 +3076,7 @@ handle_failed_sync(struct r5conf *conf, struct stripe_head *sh,
        int abort = 0;
        int i;
 
+       BUG_ON(sh->batch_head);
        clear_bit(STRIPE_SYNCING, &sh->state);
        if (test_and_clear_bit(R5_Overlap, &sh->dev[sh->pd_idx].flags))
                wake_up(&conf->wait_for_overlap);
@@ -3100,6 +3307,7 @@ static void handle_stripe_fill(struct stripe_head *sh,
 {
        int i;
 
+       BUG_ON(sh->batch_head);
        /* look for blocks to read/compute, skip this if a compute
         * is already in flight, or if the stripe contents are in the
         * midst of changing due to a write
@@ -3123,6 +3331,9 @@ static void handle_stripe_clean_event(struct r5conf *conf,
        int i;
        struct r5dev *dev;
        int discard_pending = 0;
+       struct stripe_head *head_sh = sh;
+       bool do_endio = false;
+       int wakeup_nr = 0;
 
        for (i = disks; i--; )
                if (sh->dev[i].written) {
@@ -3138,8 +3349,11 @@ static void handle_stripe_clean_event(struct r5conf *conf,
                                        clear_bit(R5_UPTODATE, &dev->flags);
                                if (test_and_clear_bit(R5_SkipCopy, &dev->flags)) {
                                        WARN_ON(test_bit(R5_UPTODATE, &dev->flags));
-                                       dev->page = dev->orig_page;
                                }
+                               do_endio = true;
+
+returnbi:
+                               dev->page = dev->orig_page;
                                wbi = dev->written;
                                dev->written = NULL;
                                while (wbi && wbi->bi_iter.bi_sector <
@@ -3156,6 +3370,17 @@ static void handle_stripe_clean_event(struct r5conf *conf,
                                                STRIPE_SECTORS,
                                         !test_bit(STRIPE_DEGRADED, &sh->state),
                                                0);
+                               if (head_sh->batch_head) {
+                                       sh = list_first_entry(&sh->batch_list,
+                                                             struct stripe_head,
+                                                             batch_list);
+                                       if (sh != head_sh) {
+                                               dev = &sh->dev[i];
+                                               goto returnbi;
+                                       }
+                               }
+                               sh = head_sh;
+                               dev = &sh->dev[i];
                        } else if (test_bit(R5_Discard, &dev->flags))
                                discard_pending = 1;
                        WARN_ON(test_bit(R5_SkipCopy, &dev->flags));
@@ -3177,8 +3402,17 @@ static void handle_stripe_clean_event(struct r5conf *conf,
                 * will be reinitialized
                 */
                spin_lock_irq(&conf->device_lock);
+unhash:
                remove_hash(sh);
+               if (head_sh->batch_head) {
+                       sh = list_first_entry(&sh->batch_list,
+                                             struct stripe_head, batch_list);
+                       if (sh != head_sh)
+                                       goto unhash;
+               }
                spin_unlock_irq(&conf->device_lock);
+               sh = head_sh;
+
                if (test_bit(STRIPE_SYNC_REQUESTED, &sh->state))
                        set_bit(STRIPE_HANDLE, &sh->state);
 
@@ -3187,6 +3421,39 @@ static void handle_stripe_clean_event(struct r5conf *conf,
        if (test_and_clear_bit(STRIPE_FULL_WRITE, &sh->state))
                if (atomic_dec_and_test(&conf->pending_full_writes))
                        md_wakeup_thread(conf->mddev->thread);
+
+       if (!head_sh->batch_head || !do_endio)
+               return;
+       for (i = 0; i < head_sh->disks; i++) {
+               if (test_and_clear_bit(R5_Overlap, &head_sh->dev[i].flags))
+                       wakeup_nr++;
+       }
+       while (!list_empty(&head_sh->batch_list)) {
+               int i;
+               sh = list_first_entry(&head_sh->batch_list,
+                                     struct stripe_head, batch_list);
+               list_del_init(&sh->batch_list);
+
+               sh->state = head_sh->state & (~((1 << STRIPE_ACTIVE) |
+                       (1 << STRIPE_PREREAD_ACTIVE)));
+               sh->check_state = head_sh->check_state;
+               sh->reconstruct_state = head_sh->reconstruct_state;
+               for (i = 0; i < sh->disks; i++) {
+                       if (test_and_clear_bit(R5_Overlap, &sh->dev[i].flags))
+                               wakeup_nr++;
+                       sh->dev[i].flags = head_sh->dev[i].flags;
+               }
+
+               spin_lock_irq(&sh->stripe_lock);
+               sh->batch_head = NULL;
+               spin_unlock_irq(&sh->stripe_lock);
+               release_stripe(sh);
+       }
+
+       spin_lock_irq(&head_sh->stripe_lock);
+       head_sh->batch_head = NULL;
+       spin_unlock_irq(&head_sh->stripe_lock);
+       wake_up_nr(&conf->wait_for_overlap, wakeup_nr);
 }
 
 static void handle_stripe_dirtying(struct r5conf *conf,
@@ -3326,6 +3593,7 @@ static void handle_parity_checks5(struct r5conf *conf, struct stripe_head *sh,
 {
        struct r5dev *dev = NULL;
 
+       BUG_ON(sh->batch_head);
        set_bit(STRIPE_HANDLE, &sh->state);
 
        switch (sh->check_state) {
@@ -3416,6 +3684,7 @@ static void handle_parity_checks6(struct r5conf *conf, struct stripe_head *sh,
        int qd_idx = sh->qd_idx;
        struct r5dev *dev;
 
+       BUG_ON(sh->batch_head);
        set_bit(STRIPE_HANDLE, &sh->state);
 
        BUG_ON(s->failed > 2);
@@ -3579,6 +3848,7 @@ static void handle_stripe_expansion(struct r5conf *conf, struct stripe_head *sh)
         * copy some of them into a target stripe for expand.
         */
        struct dma_async_tx_descriptor *tx = NULL;
+       BUG_ON(sh->batch_head);
        clear_bit(STRIPE_EXPAND_SOURCE, &sh->state);
        for (i = 0; i < sh->disks; i++)
                if (i != sh->pd_idx && i != sh->qd_idx) {
@@ -3822,6 +4092,38 @@ static void analyse_stripe(struct stripe_head *sh, struct stripe_head_state *s)
        rcu_read_unlock();
 }
 
+static int clear_batch_ready(struct stripe_head *sh)
+{
+       struct stripe_head *tmp;
+       if (!test_and_clear_bit(STRIPE_BATCH_READY, &sh->state))
+               return 0;
+       spin_lock(&sh->stripe_lock);
+       if (!sh->batch_head) {
+               spin_unlock(&sh->stripe_lock);
+               return 0;
+       }
+
+       /*
+        * this stripe could be added to a batch list before we check
+        * BATCH_READY, skips it
+        */
+       if (sh->batch_head != sh) {
+               spin_unlock(&sh->stripe_lock);
+               return 1;
+       }
+       spin_lock(&sh->batch_lock);
+       list_for_each_entry(tmp, &sh->batch_list, batch_list)
+               clear_bit(STRIPE_BATCH_READY, &tmp->state);
+       spin_unlock(&sh->batch_lock);
+       spin_unlock(&sh->stripe_lock);
+
+       /*
+        * BATCH_READY is cleared, no new stripes can be added.
+        * batch_list can be accessed without lock
+        */
+       return 0;
+}
+
 static void handle_stripe(struct stripe_head *sh)
 {
        struct stripe_head_state s;
@@ -3839,7 +4141,11 @@ static void handle_stripe(struct stripe_head *sh)
                return;
        }
 
-       clear_bit(STRIPE_BATCH_READY, &sh->state);
+       if (clear_batch_ready(sh) ) {
+               clear_bit_unlock(STRIPE_ACTIVE, &sh->state);
+               return;
+       }
+
        if (test_bit(STRIPE_SYNC_REQUESTED, &sh->state)) {
                spin_lock(&sh->stripe_lock);
                /* Cannot process 'sync' concurrently with 'discard' */
@@ -4824,7 +5130,8 @@ static void make_request(struct mddev *mddev, struct bio * bi)
                        }
                        set_bit(STRIPE_HANDLE, &sh->state);
                        clear_bit(STRIPE_DELAYED, &sh->state);
-                       if ((bi->bi_rw & REQ_SYNC) &&
+                       if ((!sh->batch_head || sh == sh->batch_head) &&
+                           (bi->bi_rw & REQ_SYNC) &&
                            !test_and_set_bit(STRIPE_PREREAD_ACTIVE, &sh->state))
                                atomic_inc(&conf->preread_active_stripes);
                        release_stripe_plug(mddev, sh);
index 4cc1a48127c77b08d7c63179aa94ce1b93831e2b..c8d0004dca8f15f4449ff9042300fcaed091d51b 100644 (file)
@@ -219,6 +219,10 @@ struct stripe_head {
        spinlock_t              stripe_lock;
        int                     cpu;
        struct r5worker_group   *group;
+
+       struct stripe_head      *batch_head; /* protected by stripe lock */
+       spinlock_t              batch_lock; /* only header's lock is useful */
+       struct list_head        batch_list; /* protected by head's batch lock*/
        /**
         * struct stripe_operations
         * @target - STRIPE_OP_COMPUTE_BLK target