]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - drivers/misc/mic/scif/scif_nodeqp.c
Merge remote-tracking branch 'char-misc/char-misc-next'
[karo-tx-linux.git] / drivers / misc / mic / scif / scif_nodeqp.c
1 /*
2  * Intel MIC Platform Software Stack (MPSS)
3  *
4  * Copyright(c) 2014 Intel Corporation.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License, version 2, as
8  * published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * General Public License for more details.
14  *
15  * Intel SCIF driver.
16  *
17  */
18 #include "../bus/scif_bus.h"
19 #include "scif_peer_bus.h"
20 #include "scif_main.h"
21 #include "scif_nodeqp.h"
22 #include "scif_map.h"
23
24 /*
25  ************************************************************************
26  * SCIF node Queue Pair (QP) setup flow:
27  *
28  * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
29  * 2) scif_setup_qp(..) allocates the local qp and calls
30  *      scif_setup_qp_connect(..) which allocates and maps the local
31  *      buffer for the inbound QP
32  * 3) The local node updates the device page with the DMA address of the QP
33  * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
34  *      the peer node has updated its QP DMA address
35  * 5) Once a valid non zero address is found in the QP DMA address field
36  *      in the device page, the local node maps the remote node's QP,
37  *      updates its outbound QP and sends a SCIF_INIT message to the peer
38  * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
39  *      half handler by calling scif_init(..)
40  * 7) scif_init(..) registers a new SCIF peer node by calling
41  *      scif_peer_register_device(..) which signifies the addition of a new
42  *      SCIF node
43  * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
44  *      remote nodes are online via scif_p2p_setup(..)
45  * 9) For P2P setup, the host maps the remote nodes' aperture and memory
46  *      bars and sends a SCIF_NODE_ADD message to both nodes
47  * 10) As part of scif_nodeadd, both nodes set up their local inbound
48  *      QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
49  * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
50  *      SCIF_NODE_ADD_ACK to the remote nodes
51  * 12) As part of scif_node_add_ack(..) the remote nodes update their
52  *      outbound QPs, make sure they can access memory on the remote node
53  *      and then add a new SCIF peer node by calling
54  *      scif_peer_register_device(..) which signifies the addition of a new
55  *      SCIF node.
56  * 13) The SCIF network is now established across all nodes.
57  *
58  ************************************************************************
59  * SCIF node QP teardown flow (initiated by non mgmt node):
60  *
61  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
62  * 2) The device page QP DMA address field is updated with 0x0
63  * 3) A non mgmt node now cleans up all local data structures and sends a
64  *      SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
65  * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
66  * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
67  *      peers and waits for a SCIF_NODE_REMOVE_ACK
68  * 6) As part of scif_node_remove(..) a remote node unregisters the peer
69  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
70  * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
71  *      it sends itself a node remove message whose handling cleans up local
72  *      data structures and unregisters the peer node from the SCIF network
73  * 8) The mgmt node sends a SCIF_EXIT_ACK
74  * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
75  *      completes the SCIF remove routine
76  * 10) The SCIF network is now torn down for the node initiating the
77  *      teardown sequence
78  *
79  ************************************************************************
80  * SCIF node QP teardown flow (initiated by mgmt node):
81  *
82  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
83  * 2) The device page QP DMA address field is updated with 0x0
84  * 3) The mgmt node calls scif_disconnect_node(..)
85  * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
86  *      and waits for a SCIF_NODE_REMOVE_ACK
87  * 5) As part of scif_node_remove(..) a remote node unregisters the peer
88  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
89  * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
90  *      it unregisters the peer node from the SCIF network
91  * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
92  * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
93  *      which would clean up local data structures for all SCIF nodes and
94  *      then send a SCIF_EXIT_ACK back to the mgmt node
95  * 9) Upon receipt of the SCIF_EXIT_ACK the the mgmt node sends itself a node
96  *      remove message whose handling cleans up local data structures and
97  *      destroys any P2P mappings.
98  * 10) The SCIF hardware device for which a remove callback was received is now
99  *      disconnected from the SCIF network.
100  */
101 /*
102  * Initializes "local" data structures for the QP. Allocates the QP
103  * ring buffer (rb) and initializes the "in bound" queue.
104  */
105 int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
106                           int local_size, struct scif_dev *scifdev)
107 {
108         void *local_q = qp->inbound_q.rb_base;
109         int err = 0;
110         u32 tmp_rd = 0;
111
112         spin_lock_init(&qp->send_lock);
113         spin_lock_init(&qp->recv_lock);
114
115         /* Allocate rb only if not already allocated */
116         if (!local_q) {
117                 local_q = kzalloc(local_size, GFP_KERNEL);
118                 if (!local_q) {
119                         err = -ENOMEM;
120                         return err;
121                 }
122         }
123
124         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
125         if (err)
126                 goto kfree;
127         /*
128          * To setup the inbound_q, the buffer lives locally, the read pointer
129          * is remote and the write pointer is local.
130          */
131         scif_rb_init(&qp->inbound_q,
132                      &tmp_rd,
133                      &qp->local_write,
134                      local_q, get_count_order(local_size));
135         /*
136          * The read pointer is NULL initially and it is unsafe to use the ring
137          * buffer til this changes!
138          */
139         qp->inbound_q.read_ptr = NULL;
140         err = scif_map_single(qp_offset, qp,
141                               scifdev, sizeof(struct scif_qp));
142         if (err)
143                 goto unmap;
144         qp->local_qp = *qp_offset;
145         return err;
146 unmap:
147         scif_unmap_single(qp->local_buf, scifdev, local_size);
148         qp->local_buf = 0;
149 kfree:
150         kfree(local_q);
151         return err;
152 }
153
154 /* When the other side has already done it's allocation, this is called */
155 int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
156                          dma_addr_t phys, int local_size,
157                          struct scif_dev *scifdev)
158 {
159         void *local_q;
160         void *remote_q;
161         struct scif_qp *remote_qp;
162         int remote_size;
163         int err = 0;
164
165         spin_lock_init(&qp->send_lock);
166         spin_lock_init(&qp->recv_lock);
167         /* Start by figuring out where we need to point */
168         remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
169         if (!remote_qp)
170                 return -EIO;
171         qp->remote_qp = remote_qp;
172         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
173                 err = -EIO;
174                 goto iounmap;
175         }
176         qp->remote_buf = remote_qp->local_buf;
177         remote_size = qp->remote_qp->inbound_q.size;
178         remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
179         if (!remote_q) {
180                 err = -EIO;
181                 goto iounmap;
182         }
183         qp->remote_qp->local_write = 0;
184         /*
185          * To setup the outbound_q, the buffer lives in remote memory,
186          * the read pointer is local, the write pointer is remote
187          */
188         scif_rb_init(&qp->outbound_q,
189                      &qp->local_read,
190                      &qp->remote_qp->local_write,
191                      remote_q,
192                      get_count_order(remote_size));
193         local_q = kzalloc(local_size, GFP_KERNEL);
194         if (!local_q) {
195                 err = -ENOMEM;
196                 goto iounmap_1;
197         }
198         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
199         if (err)
200                 goto kfree;
201         qp->remote_qp->local_read = 0;
202         /*
203          * To setup the inbound_q, the buffer lives locally, the read pointer
204          * is remote and the write pointer is local
205          */
206         scif_rb_init(&qp->inbound_q,
207                      &qp->remote_qp->local_read,
208                      &qp->local_write,
209                      local_q, get_count_order(local_size));
210         err = scif_map_single(qp_offset, qp, scifdev,
211                               sizeof(struct scif_qp));
212         if (err)
213                 goto unmap;
214         qp->local_qp = *qp_offset;
215         return err;
216 unmap:
217         scif_unmap_single(qp->local_buf, scifdev, local_size);
218         qp->local_buf = 0;
219 kfree:
220         kfree(local_q);
221 iounmap_1:
222         scif_iounmap(remote_q, remote_size, scifdev);
223         qp->outbound_q.rb_base = NULL;
224 iounmap:
225         scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
226         qp->remote_qp = NULL;
227         return err;
228 }
229
230 int scif_setup_qp_connect_response(struct scif_dev *scifdev,
231                                    struct scif_qp *qp, u64 payload)
232 {
233         int err = 0;
234         void *r_buf;
235         int remote_size;
236         phys_addr_t tmp_phys;
237
238         qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);
239
240         if (!qp->remote_qp) {
241                 err = -ENOMEM;
242                 goto error;
243         }
244
245         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
246                 dev_err(&scifdev->sdev->dev,
247                         "SCIFEP_MAGIC mismatch between self %d remote %d\n",
248                         scif_dev[scif_info.nodeid].node, scifdev->node);
249                 err = -ENODEV;
250                 goto error;
251         }
252
253         tmp_phys = qp->remote_qp->local_buf;
254         remote_size = qp->remote_qp->inbound_q.size;
255         r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);
256
257         if (!r_buf)
258                 return -EIO;
259
260         qp->local_read = 0;
261         scif_rb_init(&qp->outbound_q,
262                      &qp->local_read,
263                      &qp->remote_qp->local_write,
264                      r_buf,
265                      get_count_order(remote_size));
266         /*
267          * Because the node QP may already be processing an INIT message, set
268          * the read pointer so the cached read offset isn't lost
269          */
270         qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
271         /*
272          * resetup the inbound_q now that we know where the
273          * inbound_read really is.
274          */
275         scif_rb_init(&qp->inbound_q,
276                      &qp->remote_qp->local_read,
277                      &qp->local_write,
278                      qp->inbound_q.rb_base,
279                      get_count_order(qp->inbound_q.size));
280 error:
281         return err;
282 }
283
284 static __always_inline void
285 scif_send_msg_intr(struct scif_dev *scifdev)
286 {
287         struct scif_hw_dev *sdev = scifdev->sdev;
288
289         if (scifdev_is_p2p(scifdev))
290                 sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
291         else
292                 sdev->hw_ops->send_intr(sdev, scifdev->rdb);
293 }
294
295 int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
296 {
297         int err = 0;
298         struct scifmsg msg;
299
300         err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
301         if (!err) {
302                 /*
303                  * Now that everything is setup and mapped, we're ready
304                  * to tell the peer about our queue's location
305                  */
306                 msg.uop = SCIF_INIT;
307                 msg.dst.node = scifdev->node;
308                 err = scif_nodeqp_send(scifdev, &msg);
309         }
310         return err;
311 }
312
313 void scif_send_exit(struct scif_dev *scifdev)
314 {
315         struct scifmsg msg;
316         int ret;
317
318         scifdev->exit = OP_IN_PROGRESS;
319         msg.uop = SCIF_EXIT;
320         msg.src.node = scif_info.nodeid;
321         msg.dst.node = scifdev->node;
322         ret = scif_nodeqp_send(scifdev, &msg);
323         if (ret)
324                 goto done;
325         /* Wait for a SCIF_EXIT_ACK message */
326         wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
327                            SCIF_NODE_ALIVE_TIMEOUT);
328 done:
329         scifdev->exit = OP_IDLE;
330 }
331
332 int scif_setup_qp(struct scif_dev *scifdev)
333 {
334         int err = 0;
335         int local_size;
336         struct scif_qp *qp;
337
338         local_size = SCIF_NODE_QP_SIZE;
339
340         qp = kzalloc(sizeof(*qp), GFP_KERNEL);
341         if (!qp) {
342                 err = -ENOMEM;
343                 return err;
344         }
345         qp->magic = SCIFEP_MAGIC;
346         scifdev->qpairs = qp;
347         err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
348                                     local_size, scifdev);
349         if (err)
350                 goto free_qp;
351         /*
352          * We're as setup as we can be. The inbound_q is setup, w/o a usable
353          * outbound q.  When we get a message, the read_ptr will be updated,
354          * and we will pull the message.
355          */
356         return err;
357 free_qp:
358         kfree(scifdev->qpairs);
359         scifdev->qpairs = NULL;
360         return err;
361 }
362
363 static void scif_p2p_freesg(struct scatterlist *sg)
364 {
365         kfree(sg);
366 }
367
368 static struct scatterlist *
369 scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
370 {
371         struct scatterlist *sg;
372         struct page *page;
373         int i;
374
375         sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
376         if (!sg)
377                 return NULL;
378         sg_init_table(sg, page_cnt);
379         for (i = 0; i < page_cnt; i++) {
380                 page = pfn_to_page(pa >> PAGE_SHIFT);
381                 sg_set_page(&sg[i], page, page_size, 0);
382                 pa += page_size;
383         }
384         return sg;
385 }
386
387 /* Init p2p mappings required to access peerdev from scifdev */
388 static struct scif_p2p_info *
389 scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
390 {
391         struct scif_p2p_info *p2p;
392         int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
393         struct scif_hw_dev *psdev = peerdev->sdev;
394         struct scif_hw_dev *sdev = scifdev->sdev;
395
396         num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
397         num_aper_pages = psdev->aper->len >> PAGE_SHIFT;
398
399         p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
400         if (!p2p)
401                 return NULL;
402         p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
403                                                     PAGE_SIZE, num_mmio_pages);
404         if (!p2p->ppi_sg[SCIF_PPI_MMIO])
405                 goto free_p2p;
406         p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
407         sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
408         num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
409         p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
410                                                     1 << sg_page_shift,
411                                                     num_aper_chunks);
412         p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
413         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
414                          num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
415         if (err != num_mmio_pages)
416                 goto scif_p2p_free;
417         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
418                          num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
419         if (err != num_aper_chunks)
420                 goto dma_unmap;
421         p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
422         p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
423         p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
424         p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
425         p2p->ppi_peer_id = peerdev->node;
426         return p2p;
427 dma_unmap:
428         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
429                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
430 scif_p2p_free:
431         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
432         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
433 free_p2p:
434         kfree(p2p);
435         return NULL;
436 }
437
438 /* Uninitialize and release resources from a p2p mapping */
439 static void scif_deinit_p2p_info(struct scif_dev *scifdev,
440                                  struct scif_p2p_info *p2p)
441 {
442         struct scif_hw_dev *sdev = scifdev->sdev;
443
444         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
445                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
446         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
447                      p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
448         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
449         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
450         kfree(p2p);
451 }
452
453 /**
454  * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
455  * @dst: Destination node
456  *
457  * Connect the src and dst node by setting up the p2p connection
458  * between them. Management node here acts like a proxy.
459  */
460 static void scif_node_connect(struct scif_dev *scifdev, int dst)
461 {
462         struct scif_dev *dev_j = scifdev;
463         struct scif_dev *dev_i = NULL;
464         struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
465         struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
466         struct scif_p2p_info *p2p;
467         struct list_head *pos, *tmp;
468         struct scifmsg msg;
469         int err;
470         u64 tmppayload;
471
472         if (dst < 1 || dst > scif_info.maxid)
473                 return;
474
475         dev_i = &scif_dev[dst];
476
477         if (!_scifdev_alive(dev_i))
478                 return;
479         /*
480          * If the p2p connection is already setup or in the process of setting
481          * up then just ignore this request. The requested node will get
482          * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
483          */
484         if (!list_empty(&dev_i->p2p)) {
485                 list_for_each_safe(pos, tmp, &dev_i->p2p) {
486                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
487                         if (p2p->ppi_peer_id == dev_j->node)
488                                 return;
489                 }
490         }
491         p2p_ij = scif_init_p2p_info(dev_i, dev_j);
492         if (!p2p_ij)
493                 return;
494         p2p_ji = scif_init_p2p_info(dev_j, dev_i);
495         if (!p2p_ji) {
496                 scif_deinit_p2p_info(dev_i, p2p_ij);
497                 return;
498         }
499         list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
500         list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);
501
502         /*
503          * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
504          * as seen from dev_j
505          */
506         msg.uop = SCIF_NODE_ADD;
507         msg.src.node = dev_j->node;
508         msg.dst.node = dev_i->node;
509
510         msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
511         msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
512         msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
513         msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
514
515         err = scif_nodeqp_send(dev_i,  &msg);
516         if (err) {
517                 dev_err(&scifdev->sdev->dev,
518                         "%s %d error %d\n", __func__, __LINE__, err);
519                 return;
520         }
521
522         /* Same as above but to dev_j */
523         msg.uop = SCIF_NODE_ADD;
524         msg.src.node = dev_i->node;
525         msg.dst.node = dev_j->node;
526
527         tmppayload = msg.payload[0];
528         msg.payload[0] = msg.payload[2];
529         msg.payload[2] = tmppayload;
530         msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
531         msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
532
533         scif_nodeqp_send(dev_j, &msg);
534 }
535
536 static void scif_p2p_setup(void)
537 {
538         int i, j;
539
540         if (!scif_info.p2p_enable)
541                 return;
542
543         for (i = 1; i <= scif_info.maxid; i++)
544                 if (!_scifdev_alive(&scif_dev[i]))
545                         return;
546
547         for (i = 1; i <= scif_info.maxid; i++) {
548                 for (j = 1; j <= scif_info.maxid; j++) {
549                         struct scif_dev *scifdev = &scif_dev[i];
550
551                         if (i == j)
552                                 continue;
553                         scif_node_connect(scifdev, j);
554                 }
555         }
556 }
557
558 static char *message_types[] = {"BAD",
559                                 "INIT",
560                                 "EXIT",
561                                 "SCIF_EXIT_ACK",
562                                 "SCIF_NODE_ADD",
563                                 "SCIF_NODE_ADD_ACK",
564                                 "SCIF_NODE_ADD_NACK",
565                                 "REMOVE_NODE",
566                                 "REMOVE_NODE_ACK",
567                                 "CNCT_REQ",
568                                 "CNCT_GNT",
569                                 "CNCT_GNTACK",
570                                 "CNCT_GNTNACK",
571                                 "CNCT_REJ",
572                                 "DISCNCT",
573                                 "DISCNT_ACK",
574                                 "CLIENT_SENT",
575                                 "CLIENT_RCVD",
576                                 "SCIF_GET_NODE_INFO",
577                                 "REGISTER",
578                                 "REGISTER_ACK",
579                                 "REGISTER_NACK",
580                                 "UNREGISTER",
581                                 "UNREGISTER_ACK",
582                                 "UNREGISTER_NACK",
583                                 "ALLOC_REQ",
584                                 "ALLOC_GNT",
585                                 "ALLOC_REJ",
586                                 "FREE_PHYS",
587                                 "FREE_VIRT",
588                                 "MUNMAP",
589                                 "MARK",
590                                 "MARK_ACK",
591                                 "MARK_NACK",
592                                 "WAIT",
593                                 "WAIT_ACK",
594                                 "WAIT_NACK",
595                                 "SIGNAL_LOCAL",
596                                 "SIGNAL_REMOTE",
597                                 "SIG_ACK",
598                                 "SIG_NACK"};
599
600 static void
601 scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
602                      const char *label)
603 {
604         if (!scif_info.en_msg_log)
605                 return;
606         if (msg->uop > SCIF_MAX_MSG) {
607                 dev_err(&scifdev->sdev->dev,
608                         "%s: unknown msg type %d\n", label, msg->uop);
609                 return;
610         }
611         dev_info(&scifdev->sdev->dev,
612                  "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
613                  label, message_types[msg->uop], msg->src.node, msg->src.port,
614                  msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
615                  msg->payload[2], msg->payload[3]);
616 }
617
618 int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
619 {
620         struct scif_qp *qp = scifdev->qpairs;
621         int err = -ENOMEM, loop_cnt = 0;
622
623         scif_display_message(scifdev, msg, "Sent");
624         if (!qp) {
625                 err = -EINVAL;
626                 goto error;
627         }
628         spin_lock(&qp->send_lock);
629
630         while ((err = scif_rb_write(&qp->outbound_q,
631                                     msg, sizeof(struct scifmsg)))) {
632                 mdelay(1);
633 #define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
634                 if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
635                         err = -ENODEV;
636                         break;
637                 }
638         }
639         if (!err)
640                 scif_rb_commit(&qp->outbound_q);
641         spin_unlock(&qp->send_lock);
642         if (!err) {
643                 if (scifdev_self(scifdev))
644                         /*
645                          * For loopback we need to emulate an interrupt by
646                          * queuing work for the queue handling real node
647                          * Qp interrupts.
648                          */
649                         queue_work(scifdev->intr_wq, &scifdev->intr_bh);
650                 else
651                         scif_send_msg_intr(scifdev);
652         }
653 error:
654         if (err)
655                 dev_dbg(&scifdev->sdev->dev,
656                         "%s %d error %d uop %d\n",
657                          __func__, __LINE__, err, msg->uop);
658         return err;
659 }
660
661 /**
662  * scif_nodeqp_send - Send a message on the node queue pair
663  * @scifdev: Scif Device.
664  * @msg: The message to be sent.
665  */
666 int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
667 {
668         int err;
669         struct device *spdev = NULL;
670
671         if (msg->uop > SCIF_EXIT_ACK) {
672                 /* Dont send messages once the exit flow has begun */
673                 if (OP_IDLE != scifdev->exit)
674                         return -ENODEV;
675                 spdev = scif_get_peer_dev(scifdev);
676                 if (IS_ERR(spdev)) {
677                         err = PTR_ERR(spdev);
678                         return err;
679                 }
680         }
681         err = _scif_nodeqp_send(scifdev, msg);
682         if (msg->uop > SCIF_EXIT_ACK)
683                 scif_put_peer_dev(spdev);
684         return err;
685 }
686
687 /*
688  * scif_misc_handler:
689  *
690  * Work queue handler for servicing miscellaneous SCIF tasks.
691  * Examples include:
692  * 1) Remote fence requests.
693  * 2) Destruction of temporary registered windows
694  *    created during scif_vreadfrom()/scif_vwriteto().
695  * 3) Cleanup of zombie endpoints.
696  */
697 void scif_misc_handler(struct work_struct *work)
698 {
699         scif_rma_handle_remote_fences();
700         scif_rma_destroy_windows();
701         scif_rma_destroy_tcw_invalid();
702         scif_cleanup_zombie_epd();
703 }
704
705 /**
706  * scif_init() - Respond to SCIF_INIT interrupt message
707  * @scifdev:    Remote SCIF device node
708  * @msg:        Interrupt message
709  */
710 static __always_inline void
711 scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
712 {
713         /*
714          * Allow the thread waiting for device page updates for the peer QP DMA
715          * address to complete initializing the inbound_q.
716          */
717         flush_delayed_work(&scifdev->qp_dwork);
718
719         scif_peer_register_device(scifdev);
720
721         if (scif_is_mgmt_node()) {
722                 mutex_lock(&scif_info.conflock);
723                 scif_p2p_setup();
724                 mutex_unlock(&scif_info.conflock);
725         }
726 }
727
728 /**
729  * scif_exit() - Respond to SCIF_EXIT interrupt message
730  * @scifdev:    Remote SCIF device node
731  * @msg:        Interrupt message
732  *
733  * This function stops the SCIF interface for the node which sent
734  * the SCIF_EXIT message and starts waiting for that node to
735  * resetup the queue pair again.
736  */
737 static __always_inline void
738 scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
739 {
740         scifdev->exit_ack_pending = true;
741         if (scif_is_mgmt_node())
742                 scif_disconnect_node(scifdev->node, false);
743         else
744                 scif_stop(scifdev);
745         schedule_delayed_work(&scifdev->qp_dwork,
746                               msecs_to_jiffies(1000));
747 }
748
749 /**
750  * scif_exitack() - Respond to SCIF_EXIT_ACK interrupt message
751  * @scifdev:    Remote SCIF device node
752  * @msg:        Interrupt message
753  *
754  */
755 static __always_inline void
756 scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
757 {
758         scifdev->exit = OP_COMPLETED;
759         wake_up(&scif_info.exitwq);
760 }
761
762 /**
763  * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
764  * @scifdev:    Remote SCIF device node
765  * @msg:        Interrupt message
766  *
767  * When the mgmt node driver has finished initializing a MIC node queue pair it
768  * marks the node as online. It then looks for all currently online MIC cards
769  * and send a SCIF_NODE_ADD message to identify the ID of the new card for
770  * peer to peer initialization
771  *
772  * The local node allocates its incoming queue and sends its address in the
773  * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects"
774  * this message to the new node
775  */
776 static __always_inline void
777 scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
778 {
779         struct scif_dev *newdev;
780         dma_addr_t qp_offset;
781         int qp_connect;
782         struct scif_hw_dev *sdev;
783
784         dev_dbg(&scifdev->sdev->dev,
785                 "Scifdev %d:%d received NODE_ADD msg for node %d\n",
786                 scifdev->node, msg->dst.node, msg->src.node);
787         dev_dbg(&scifdev->sdev->dev,
788                 "Remote address for this node's aperture %llx\n",
789                 msg->payload[0]);
790         newdev = &scif_dev[msg->src.node];
791         newdev->node = msg->src.node;
792         newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
793         sdev = newdev->sdev;
794
795         if (scif_setup_intr_wq(newdev)) {
796                 dev_err(&scifdev->sdev->dev,
797                         "failed to setup interrupts for %d\n", msg->src.node);
798                 goto interrupt_setup_error;
799         }
800         newdev->mmio.va = ioremap_nocache(msg->payload[1], sdev->mmio->len);
801         if (!newdev->mmio.va) {
802                 dev_err(&scifdev->sdev->dev,
803                         "failed to map mmio for %d\n", msg->src.node);
804                 goto mmio_map_error;
805         }
806         newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
807         if (!newdev->qpairs)
808                 goto qp_alloc_error;
809         /*
810          * Set the base address of the remote node's memory since it gets
811          * added to qp_offset
812          */
813         newdev->base_addr = msg->payload[0];
814
815         qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
816                                            SCIF_NODE_QP_SIZE, newdev);
817         if (qp_connect) {
818                 dev_err(&scifdev->sdev->dev,
819                         "failed to setup qp_connect %d\n", qp_connect);
820                 goto qp_connect_error;
821         }
822
823         newdev->db = sdev->hw_ops->next_db(sdev);
824         newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
825                                                    "SCIF_INTR", newdev,
826                                                    newdev->db);
827         if (IS_ERR(newdev->cookie))
828                 goto qp_connect_error;
829         newdev->qpairs->magic = SCIFEP_MAGIC;
830         newdev->qpairs->qp_state = SCIF_QP_OFFLINE;
831
832         msg->uop = SCIF_NODE_ADD_ACK;
833         msg->dst.node = msg->src.node;
834         msg->src.node = scif_info.nodeid;
835         msg->payload[0] = qp_offset;
836         msg->payload[2] = newdev->db;
837         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
838         return;
839 qp_connect_error:
840         kfree(newdev->qpairs);
841         newdev->qpairs = NULL;
842 qp_alloc_error:
843         iounmap(newdev->mmio.va);
844         newdev->mmio.va = NULL;
845 mmio_map_error:
846 interrupt_setup_error:
847         dev_err(&scifdev->sdev->dev,
848                 "node add failed for node %d\n", msg->src.node);
849         msg->uop = SCIF_NODE_ADD_NACK;
850         msg->dst.node = msg->src.node;
851         msg->src.node = scif_info.nodeid;
852         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
853 }
854
855 void scif_poll_qp_state(struct work_struct *work)
856 {
857 #define SCIF_NODE_QP_RETRY 100
858 #define SCIF_NODE_QP_TIMEOUT 100
859         struct scif_dev *peerdev = container_of(work, struct scif_dev,
860                                                         p2p_dwork.work);
861         struct scif_qp *qp = &peerdev->qpairs[0];
862
863         if (qp->qp_state != SCIF_QP_ONLINE ||
864             qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
865                 if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
866                         dev_err(&peerdev->sdev->dev,
867                                 "Warning: QP check timeout with state %d\n",
868                                 qp->qp_state);
869                         goto timeout;
870                 }
871                 schedule_delayed_work(&peerdev->p2p_dwork,
872                                       msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
873                 return;
874         }
875         return;
876 timeout:
877         dev_err(&peerdev->sdev->dev,
878                 "%s %d remote node %d offline,  state = 0x%x\n",
879                 __func__, __LINE__, peerdev->node, qp->qp_state);
880         qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
881         scif_peer_unregister_device(peerdev);
882         scif_cleanup_scifdev(peerdev);
883 }
884
885 /**
886  * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
887  * @scifdev:    Remote SCIF device node
888  * @msg:        Interrupt message
889  *
890  * After a MIC node receives the SCIF_NODE_ADD_ACK message it send this
891  * message to the mgmt node to confirm the sequence is finished.
892  *
893  */
894 static __always_inline void
895 scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
896 {
897         struct scif_dev *peerdev;
898         struct scif_qp *qp;
899         struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
900
901         dev_dbg(&scifdev->sdev->dev,
902                 "Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
903                 scifdev->node, msg->src.node, msg->dst.node);
904         dev_dbg(&scifdev->sdev->dev,
905                 "payload %llx %llx %llx %llx\n", msg->payload[0],
906                 msg->payload[1], msg->payload[2], msg->payload[3]);
907         if (scif_is_mgmt_node()) {
908                 /*
909                  * the lock serializes with scif_qp_response_ack. The mgmt node
910                  * is forwarding the NODE_ADD_ACK message from src to dst we
911                  * need to make sure that the dst has already received a
912                  * NODE_ADD for src and setup its end of the qp to dst
913                  */
914                 mutex_lock(&scif_info.conflock);
915                 msg->payload[1] = scif_info.maxid;
916                 scif_nodeqp_send(dst_dev, msg);
917                 mutex_unlock(&scif_info.conflock);
918                 return;
919         }
920         peerdev = &scif_dev[msg->src.node];
921         peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
922         peerdev->node = msg->src.node;
923
924         qp = &peerdev->qpairs[0];
925
926         if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
927                                             msg->payload[0])))
928                 goto local_error;
929         peerdev->rdb = msg->payload[2];
930         qp->remote_qp->qp_state = SCIF_QP_ONLINE;
931
932         scif_peer_register_device(peerdev);
933
934         schedule_delayed_work(&peerdev->p2p_dwork, 0);
935         return;
936 local_error:
937         scif_cleanup_scifdev(peerdev);
938 }
939
940 /**
941  * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
942  * @msg:        Interrupt message
943  *
944  * SCIF_NODE_ADD failed, so inform the waiting wq.
945  */
946 static __always_inline void
947 scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
948 {
949         if (scif_is_mgmt_node()) {
950                 struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
951
952                 dev_dbg(&scifdev->sdev->dev,
953                         "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
954                 scif_nodeqp_send(dst_dev, msg);
955         }
956 }
957
958 /*
959  * scif_node_remove: Handle SCIF_NODE_REMOVE message
960  * @msg: Interrupt message
961  *
962  * Handle node removal.
963  */
964 static __always_inline void
965 scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
966 {
967         int node = msg->payload[0];
968         struct scif_dev *scdev = &scif_dev[node];
969
970         scdev->node_remove_ack_pending = true;
971         scif_handle_remove_node(node);
972 }
973
974 /*
975  * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
976  * @msg: Interrupt message
977  *
978  * The peer has acked a SCIF_NODE_REMOVE message.
979  */
980 static __always_inline void
981 scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
982 {
983         struct scif_dev *sdev = &scif_dev[msg->payload[0]];
984
985         atomic_inc(&sdev->disconn_rescnt);
986         wake_up(&sdev->disconn_wq);
987 }
988
989 /**
990  * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message
991  * @msg:        Interrupt message
992  *
993  * Retrieve node info i.e maxid and total from the mgmt node.
994  */
995 static __always_inline void
996 scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
997 {
998         if (scif_is_mgmt_node()) {
999                 swap(msg->dst.node, msg->src.node);
1000                 mutex_lock(&scif_info.conflock);
1001                 msg->payload[1] = scif_info.maxid;
1002                 msg->payload[2] = scif_info.total;
1003                 mutex_unlock(&scif_info.conflock);
1004                 scif_nodeqp_send(scifdev, msg);
1005         } else {
1006                 struct completion *node_info =
1007                         (struct completion *)msg->payload[3];
1008
1009                 mutex_lock(&scif_info.conflock);
1010                 scif_info.maxid = msg->payload[1];
1011                 scif_info.total = msg->payload[2];
1012                 complete_all(node_info);
1013                 mutex_unlock(&scif_info.conflock);
1014         }
1015 }
1016
1017 static void
1018 scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
1019 {
1020         /* Bogus Node Qp Message? */
1021         dev_err(&scifdev->sdev->dev,
1022                 "Unknown message 0x%xn scifdev->node 0x%x\n",
1023                 msg->uop, scifdev->node);
1024 }
1025
1026 static void (*scif_intr_func[SCIF_MAX_MSG + 1])
1027             (struct scif_dev *, struct scifmsg *msg) = {
1028         scif_msg_unknown,       /* Error */
1029         scif_init,              /* SCIF_INIT */
1030         scif_exit,              /* SCIF_EXIT */
1031         scif_exit_ack,          /* SCIF_EXIT_ACK */
1032         scif_node_add,          /* SCIF_NODE_ADD */
1033         scif_node_add_ack,      /* SCIF_NODE_ADD_ACK */
1034         scif_node_add_nack,     /* SCIF_NODE_ADD_NACK */
1035         scif_node_remove,       /* SCIF_NODE_REMOVE */
1036         scif_node_remove_ack,   /* SCIF_NODE_REMOVE_ACK */
1037         scif_cnctreq,           /* SCIF_CNCT_REQ */
1038         scif_cnctgnt,           /* SCIF_CNCT_GNT */
1039         scif_cnctgnt_ack,       /* SCIF_CNCT_GNTACK */
1040         scif_cnctgnt_nack,      /* SCIF_CNCT_GNTNACK */
1041         scif_cnctrej,           /* SCIF_CNCT_REJ */
1042         scif_discnct,           /* SCIF_DISCNCT */
1043         scif_discnt_ack,        /* SCIF_DISCNT_ACK */
1044         scif_clientsend,        /* SCIF_CLIENT_SENT */
1045         scif_clientrcvd,        /* SCIF_CLIENT_RCVD */
1046         scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
1047         scif_recv_reg,          /* SCIF_REGISTER */
1048         scif_recv_reg_ack,      /* SCIF_REGISTER_ACK */
1049         scif_recv_reg_nack,     /* SCIF_REGISTER_NACK */
1050         scif_recv_unreg,        /* SCIF_UNREGISTER */
1051         scif_recv_unreg_ack,    /* SCIF_UNREGISTER_ACK */
1052         scif_recv_unreg_nack,   /* SCIF_UNREGISTER_NACK */
1053         scif_alloc_req,         /* SCIF_ALLOC_REQ */
1054         scif_alloc_gnt_rej,     /* SCIF_ALLOC_GNT */
1055         scif_alloc_gnt_rej,     /* SCIF_ALLOC_REJ */
1056         scif_free_virt,         /* SCIF_FREE_VIRT */
1057         scif_recv_munmap,       /* SCIF_MUNMAP */
1058         scif_recv_mark,         /* SCIF_MARK */
1059         scif_recv_mark_resp,    /* SCIF_MARK_ACK */
1060         scif_recv_mark_resp,    /* SCIF_MARK_NACK */
1061         scif_recv_wait,         /* SCIF_WAIT */
1062         scif_recv_wait_resp,    /* SCIF_WAIT_ACK */
1063         scif_recv_wait_resp,    /* SCIF_WAIT_NACK */
1064         scif_recv_sig_local,    /* SCIF_SIG_LOCAL */
1065         scif_recv_sig_remote,   /* SCIF_SIG_REMOTE */
1066         scif_recv_sig_resp,     /* SCIF_SIG_ACK */
1067         scif_recv_sig_resp,     /* SCIF_SIG_NACK */
1068 };
1069
1070 /**
1071  * scif_nodeqp_msg_handler() - Common handler for node messages
1072  * @scifdev: Remote device to respond to
1073  * @qp: Remote memory pointer
1074  * @msg: The message to be handled.
1075  *
1076  * This routine calls the appropriate routine to handle a Node Qp
1077  * message receipt
1078  */
1079 static int scif_max_msg_id = SCIF_MAX_MSG;
1080
1081 static void
1082 scif_nodeqp_msg_handler(struct scif_dev *scifdev,
1083                         struct scif_qp *qp, struct scifmsg *msg)
1084 {
1085         scif_display_message(scifdev, msg, "Rcvd");
1086
1087         if (msg->uop > (u32)scif_max_msg_id) {
1088                 /* Bogus Node Qp Message? */
1089                 dev_err(&scifdev->sdev->dev,
1090                         "Unknown message 0x%xn scifdev->node 0x%x\n",
1091                         msg->uop, scifdev->node);
1092                 return;
1093         }
1094
1095         scif_intr_func[msg->uop](scifdev, msg);
1096 }
1097
1098 /**
1099  * scif_nodeqp_intrhandler() - Interrupt handler for node messages
1100  * @scifdev:    Remote device to respond to
1101  * @qp:         Remote memory pointer
1102  *
1103  * This routine is triggered by the interrupt mechanism.  It reads
1104  * messages from the node queue RB and calls the Node QP Message handling
1105  * routine.
1106  */
1107 void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
1108 {
1109         struct scifmsg msg;
1110         int read_size;
1111
1112         do {
1113                 read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
1114                 if (!read_size)
1115                         break;
1116                 scif_nodeqp_msg_handler(scifdev, qp, &msg);
1117                 /*
1118                  * The node queue pair is unmapped so skip the read pointer
1119                  * update after receipt of a SCIF_EXIT_ACK
1120                  */
1121                 if (SCIF_EXIT_ACK == msg.uop)
1122                         break;
1123                 scif_rb_update_read_ptr(&qp->inbound_q);
1124         } while (1);
1125 }
1126
1127 /**
1128  * scif_loopb_wq_handler - Loopback Workqueue Handler.
1129  * @work: loop back work
1130  *
1131  * This work queue routine is invoked by the loopback work queue handler.
1132  * It grabs the recv lock, dequeues any available messages from the head
1133  * of the loopback message list, calls the node QP message handler,
1134  * waits for it to return, then frees up this message and dequeues more
1135  * elements of the list if available.
1136  */
1137 static void scif_loopb_wq_handler(struct work_struct *unused)
1138 {
1139         struct scif_dev *scifdev = scif_info.loopb_dev;
1140         struct scif_qp *qp = scifdev->qpairs;
1141         struct scif_loopb_msg *msg;
1142
1143         do {
1144                 msg = NULL;
1145                 spin_lock(&qp->recv_lock);
1146                 if (!list_empty(&scif_info.loopb_recv_q)) {
1147                         msg = list_first_entry(&scif_info.loopb_recv_q,
1148                                                struct scif_loopb_msg,
1149                                                list);
1150                         list_del(&msg->list);
1151                 }
1152                 spin_unlock(&qp->recv_lock);
1153
1154                 if (msg) {
1155                         scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
1156                         kfree(msg);
1157                 }
1158         } while (msg);
1159 }
1160
1161 /**
1162  * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
1163  * @scifdev: SCIF device
1164  * @qp: Queue pair.
1165  *
1166  * This work queue routine is triggered when a loopback message is received.
1167  *
1168  * We need special handling for receiving Node Qp messages on a loopback SCIF
1169  * device via two workqueues for receiving messages.
1170  *
1171  * The reason we need the extra workqueue which is not required with *normal*
1172  * non-loopback SCIF devices is the potential classic deadlock described below:
1173  *
1174  * Thread A tries to send a message on a loopback SCIF device and blocks since
1175  * there is no space in the RB while it has the send_lock held or another
1176  * lock called lock X for example.
1177  *
1178  * Thread B: The Loopback Node QP message receive workqueue receives the message
1179  * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries
1180  * to grab the send lock again or lock X and deadlocks with Thread A. The RB
1181  * cannot be drained any further due to this classic deadlock.
1182  *
1183  * In order to avoid deadlocks as mentioned above we have an extra level of
1184  * indirection achieved by having two workqueues.
1185  * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
1186  * messages from the Node QP RB, adds them to a list and queues work for the
1187  * second workqueue.
1188  *
1189  * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
1190  * messages from the list, handles them, frees up the memory and dequeues
1191  * more elements from the list if possible.
1192  */
1193 int
1194 scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
1195 {
1196         int read_size;
1197         struct scif_loopb_msg *msg;
1198
1199         do {
1200                 msg = kmalloc(sizeof(*msg), GFP_KERNEL);
1201                 if (!msg)
1202                         return -ENOMEM;
1203                 read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
1204                                              sizeof(struct scifmsg));
1205                 if (read_size != sizeof(struct scifmsg)) {
1206                         kfree(msg);
1207                         scif_rb_update_read_ptr(&qp->inbound_q);
1208                         break;
1209                 }
1210                 spin_lock(&qp->recv_lock);
1211                 list_add_tail(&msg->list, &scif_info.loopb_recv_q);
1212                 spin_unlock(&qp->recv_lock);
1213                 queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
1214                 scif_rb_update_read_ptr(&qp->inbound_q);
1215         } while (read_size == sizeof(struct scifmsg));
1216         return read_size;
1217 }
1218
1219 /**
1220  * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
1221  * @scifdev: SCIF device
1222  *
1223  * Sets up the required loopback workqueues, queue pairs and ring buffers
1224  */
1225 int scif_setup_loopback_qp(struct scif_dev *scifdev)
1226 {
1227         int err = 0;
1228         void *local_q;
1229         struct scif_qp *qp;
1230
1231         err = scif_setup_intr_wq(scifdev);
1232         if (err)
1233                 goto exit;
1234         INIT_LIST_HEAD(&scif_info.loopb_recv_q);
1235         snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
1236                  "SCIF LOOPB %d", scifdev->node);
1237         scif_info.loopb_wq =
1238                 alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
1239         if (!scif_info.loopb_wq) {
1240                 err = -ENOMEM;
1241                 goto destroy_intr;
1242         }
1243         INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
1244         /* Allocate Self Qpair */
1245         scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
1246         if (!scifdev->qpairs) {
1247                 err = -ENOMEM;
1248                 goto destroy_loopb_wq;
1249         }
1250
1251         qp = scifdev->qpairs;
1252         qp->magic = SCIFEP_MAGIC;
1253         spin_lock_init(&qp->send_lock);
1254         spin_lock_init(&qp->recv_lock);
1255
1256         local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
1257         if (!local_q) {
1258                 err = -ENOMEM;
1259                 goto free_qpairs;
1260         }
1261         /*
1262          * For loopback the inbound_q and outbound_q are essentially the same
1263          * since the Node sends a message on the loopback interface to the
1264          * outbound_q which is then received on the inbound_q.
1265          */
1266         scif_rb_init(&qp->outbound_q,
1267                      &qp->local_read,
1268                      &qp->local_write,
1269                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1270
1271         scif_rb_init(&qp->inbound_q,
1272                      &qp->local_read,
1273                      &qp->local_write,
1274                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1275         scif_info.nodeid = scifdev->node;
1276
1277         scif_peer_register_device(scifdev);
1278
1279         scif_info.loopb_dev = scifdev;
1280         return err;
1281 free_qpairs:
1282         kfree(scifdev->qpairs);
1283 destroy_loopb_wq:
1284         destroy_workqueue(scif_info.loopb_wq);
1285 destroy_intr:
1286         scif_destroy_intr_wq(scifdev);
1287 exit:
1288         return err;
1289 }
1290
1291 /**
1292  * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
1293  * @scifdev: SCIF device
1294  *
1295  * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
1296  */
1297 int scif_destroy_loopback_qp(struct scif_dev *scifdev)
1298 {
1299         scif_peer_unregister_device(scifdev);
1300         destroy_workqueue(scif_info.loopb_wq);
1301         scif_destroy_intr_wq(scifdev);
1302         kfree(scifdev->qpairs->outbound_q.rb_base);
1303         kfree(scifdev->qpairs);
1304         scifdev->sdev = NULL;
1305         scif_info.loopb_dev = NULL;
1306         return 0;
1307 }
1308
1309 void scif_destroy_p2p(struct scif_dev *scifdev)
1310 {
1311         struct scif_dev *peer_dev;
1312         struct scif_p2p_info *p2p;
1313         struct list_head *pos, *tmp;
1314         int bd;
1315
1316         mutex_lock(&scif_info.conflock);
1317         /* Free P2P mappings in the given node for all its peer nodes */
1318         list_for_each_safe(pos, tmp, &scifdev->p2p) {
1319                 p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1320                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
1321                              p2p->sg_nentries[SCIF_PPI_MMIO],
1322                              DMA_BIDIRECTIONAL);
1323                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
1324                              p2p->sg_nentries[SCIF_PPI_APER],
1325                              DMA_BIDIRECTIONAL);
1326                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1327                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1328                 list_del(pos);
1329                 kfree(p2p);
1330         }
1331
1332         /* Free P2P mapping created in the peer nodes for the given node */
1333         for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
1334                 peer_dev = &scif_dev[bd];
1335                 list_for_each_safe(pos, tmp, &peer_dev->p2p) {
1336                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1337                         if (p2p->ppi_peer_id == scifdev->node) {
1338                                 dma_unmap_sg(&peer_dev->sdev->dev,
1339                                              p2p->ppi_sg[SCIF_PPI_MMIO],
1340                                              p2p->sg_nentries[SCIF_PPI_MMIO],
1341                                              DMA_BIDIRECTIONAL);
1342                                 dma_unmap_sg(&peer_dev->sdev->dev,
1343                                              p2p->ppi_sg[SCIF_PPI_APER],
1344                                              p2p->sg_nentries[SCIF_PPI_APER],
1345                                              DMA_BIDIRECTIONAL);
1346                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1347                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1348                                 list_del(pos);
1349                                 kfree(p2p);
1350                         }
1351                 }
1352         }
1353         mutex_unlock(&scif_info.conflock);
1354 }