1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "socket.h"
41 #include "name_distr.h"
42 #include "discover.h"
43 #include "config.h"
44
45 #include <linux/pkt_sched.h>
46
47 /*
48  * Error message prefixes
49  */
50 static const char *link_co_err = "Link changeover error, ";
51 static const char *link_rst_msg = "Resetting link ";
52 static const char *link_unk_evt = "Unknown link event ";
53
54 /*
55  * Out-of-range value for link session numbers
56  */
57 #define INVALID_SESSION 0x10000
58
59 /*
60  * Link state events:
61  */
62 #define  STARTING_EVT    856384768      /* link processing trigger */
63 #define  TRAFFIC_MSG_EVT 560815u        /* rx'd traffic message */
64 #define  TIMEOUT_EVT     560817u        /* link timer expired */
65
66 /*
67  * The following two 'message types' are really just implementation
68  * data conveniently stored in the message header.
69  * They must not be considered part of the protocol.
70  */
71 #define OPEN_MSG   0
72 #define CLOSED_MSG 1
73
74 /*
75  * State value stored in 'exp_msg_count'
76  */
77 #define START_CHANGEOVER 100000u
78
79 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
80                                        struct sk_buff *buf);
81 static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
82 static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
83                                  struct sk_buff **buf);
84 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
85 static void link_state_event(struct tipc_link *l_ptr, u32 event);
86 static void link_reset_statistics(struct tipc_link *l_ptr);
87 static void link_print(struct tipc_link *l_ptr, const char *str);
88 static void tipc_link_sync_xmit(struct tipc_link *l);
89 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
90 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
91 static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
92
93 /*
94  *  Simple link routines
95  */
96 static unsigned int align(unsigned int i)
97 {
98         return (i + 3) & ~3u;
99 }
100
101 static void link_init_max_pkt(struct tipc_link *l_ptr)
102 {
103         struct tipc_bearer *b_ptr;
104         u32 max_pkt;
105
106         rcu_read_lock();
107         b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
108         if (!b_ptr) {
109                 rcu_read_unlock();
110                 return;
111         }
112         max_pkt = (b_ptr->mtu & ~3);
113         rcu_read_unlock();
114
115         if (max_pkt > MAX_MSG_SIZE)
116                 max_pkt = MAX_MSG_SIZE;
117
118         l_ptr->max_pkt_target = max_pkt;
119         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
120                 l_ptr->max_pkt = l_ptr->max_pkt_target;
121         else
122                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
123
124         l_ptr->max_pkt_probes = 0;
125 }
126
127 static u32 link_next_sent(struct tipc_link *l_ptr)
128 {
129         if (l_ptr->next_out)
130                 return buf_seqno(l_ptr->next_out);
131         return mod(l_ptr->next_out_no);
132 }
133
134 static u32 link_last_sent(struct tipc_link *l_ptr)
135 {
136         return mod(link_next_sent(l_ptr) - 1);
137 }
138
139 /*
140  *  Simple non-static link routines (i.e. referenced outside this file)
141  */
142 int tipc_link_is_up(struct tipc_link *l_ptr)
143 {
144         if (!l_ptr)
145                 return 0;
146         return link_working_working(l_ptr) || link_working_unknown(l_ptr);
147 }
148
149 int tipc_link_is_active(struct tipc_link *l_ptr)
150 {
151         return  (l_ptr->owner->active_links[0] == l_ptr) ||
152                 (l_ptr->owner->active_links[1] == l_ptr);
153 }
154
155 /**
156  * link_timeout - handle expiration of link timer
157  * @l_ptr: pointer to link
158  */
159 static void link_timeout(struct tipc_link *l_ptr)
160 {
161         tipc_node_lock(l_ptr->owner);
162
163         /* update counters used in statistical profiling of send traffic */
164         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
165         l_ptr->stats.queue_sz_counts++;
166
167         if (l_ptr->first_out) {
168                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
169                 u32 length = msg_size(msg);
170
171                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
172                     (msg_type(msg) == FIRST_FRAGMENT)) {
173                         length = msg_size(msg_get_wrapped(msg));
174                 }
175                 if (length) {
176                         l_ptr->stats.msg_lengths_total += length;
177                         l_ptr->stats.msg_length_counts++;
178                         if (length <= 64)
179                                 l_ptr->stats.msg_length_profile[0]++;
180                         else if (length <= 256)
181                                 l_ptr->stats.msg_length_profile[1]++;
182                         else if (length <= 1024)
183                                 l_ptr->stats.msg_length_profile[2]++;
184                         else if (length <= 4096)
185                                 l_ptr->stats.msg_length_profile[3]++;
186                         else if (length <= 16384)
187                                 l_ptr->stats.msg_length_profile[4]++;
188                         else if (length <= 32768)
189                                 l_ptr->stats.msg_length_profile[5]++;
190                         else
191                                 l_ptr->stats.msg_length_profile[6]++;
192                 }
193         }
194
195         /* do all other link processing performed on a periodic basis */
196
197         link_state_event(l_ptr, TIMEOUT_EVT);
198
199         if (l_ptr->next_out)
200                 tipc_link_push_queue(l_ptr);
201
202         tipc_node_unlock(l_ptr->owner);
203 }
204
205 static void link_set_timer(struct tipc_link *l_ptr, u32 time)
206 {
207         k_start_timer(&l_ptr->timer, time);
208 }
209
210 /**
211  * tipc_link_create - create a new link
212  * @n_ptr: pointer to associated node
213  * @b_ptr: pointer to associated bearer
214  * @media_addr: media address to use when sending messages over link
215  *
216  * Returns pointer to link, or NULL if link creation fails.
217  */
218 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
219                                    struct tipc_bearer *b_ptr,
220                                    const struct tipc_media_addr *media_addr)
221 {
222         struct tipc_link *l_ptr;
223         struct tipc_msg *msg;
224         char *if_name;
225         char addr_string[16];
226         u32 peer = n_ptr->addr;
227
228         if (n_ptr->link_cnt >= 2) {
229                 tipc_addr_string_fill(addr_string, n_ptr->addr);
230                 pr_err("Attempt to establish third link to %s\n", addr_string);
231                 return NULL;
232         }
233
234         if (n_ptr->links[b_ptr->identity]) {
235                 tipc_addr_string_fill(addr_string, n_ptr->addr);
236                 pr_err("Attempt to establish second link on <%s> to %s\n",
237                        b_ptr->name, addr_string);
238                 return NULL;
239         }
240
241         l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
242         if (!l_ptr) {
243                 pr_warn("Link creation failed, no memory\n");
244                 return NULL;
245         }
246
247         l_ptr->addr = peer;
248         if_name = strchr(b_ptr->name, ':') + 1;
249         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
250                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
251                 tipc_node(tipc_own_addr),
252                 if_name,
253                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
254                 /* note: peer i/f name is updated by reset/activate message */
255         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
256         l_ptr->owner = n_ptr;
257         l_ptr->checkpoint = 1;
258         l_ptr->peer_session = INVALID_SESSION;
259         l_ptr->bearer_id = b_ptr->identity;
260         link_set_supervision_props(l_ptr, b_ptr->tolerance);
261         l_ptr->state = RESET_UNKNOWN;
262
263         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
264         msg = l_ptr->pmsg;
265         tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
266         msg_set_size(msg, sizeof(l_ptr->proto_msg));
267         msg_set_session(msg, (tipc_random & 0xffff));
268         msg_set_bearer_id(msg, b_ptr->identity);
269         strcpy((char *)msg_data(msg), if_name);
270
271         l_ptr->priority = b_ptr->priority;
272         tipc_link_set_queue_limits(l_ptr, b_ptr->window);
273
274         l_ptr->net_plane = b_ptr->net_plane;
275         link_init_max_pkt(l_ptr);
276
277         l_ptr->next_out_no = 1;
278         INIT_LIST_HEAD(&l_ptr->waiting_ports);
279
280         link_reset_statistics(l_ptr);
281
282         tipc_node_attach_link(n_ptr, l_ptr);
283
284         k_init_timer(&l_ptr->timer, (Handler)link_timeout,
285                      (unsigned long)l_ptr);
286
287         link_state_event(l_ptr, STARTING_EVT);
288
289         return l_ptr;
290 }
291
292 void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
293 {
294         struct tipc_link *l_ptr;
295         struct tipc_node *n_ptr;
296
297         rcu_read_lock();
298         list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
299                 tipc_node_lock(n_ptr);
300                 l_ptr = n_ptr->links[bearer_id];
301                 if (l_ptr) {
302                         tipc_link_reset(l_ptr);
303                         if (shutting_down || !tipc_node_is_up(n_ptr)) {
304                                 tipc_node_detach_link(l_ptr->owner, l_ptr);
305                                 tipc_link_reset_fragments(l_ptr);
306                                 tipc_node_unlock(n_ptr);
307
308                                 /* Nobody else can access this link now: */
309                                 del_timer_sync(&l_ptr->timer);
310                                 kfree(l_ptr);
311                         } else {
312                                 /* Detach/delete when failover is finished: */
313                                 l_ptr->flags |= LINK_STOPPED;
314                                 tipc_node_unlock(n_ptr);
315                                 del_timer_sync(&l_ptr->timer);
316                         }
317                         continue;
318                 }
319                 tipc_node_unlock(n_ptr);
320         }
321         rcu_read_unlock();
322 }
323
324 /**
325  * link_schedule_port - schedule port for deferred sending
326  * @l_ptr: pointer to link
327  * @origport: reference to sending port
328  * @sz: amount of data to be sent
329  *
330  * Schedules port for renewed sending of messages after link congestion
331  * has abated.
332  */
333 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
334 {
335         struct tipc_port *p_ptr;
336         struct tipc_sock *tsk;
337
338         spin_lock_bh(&tipc_port_list_lock);
339         p_ptr = tipc_port_lock(origport);
340         if (p_ptr) {
341                 if (!list_empty(&p_ptr->wait_list))
342                         goto exit;
343                 tsk = tipc_port_to_sock(p_ptr);
344                 tsk->link_cong = 1;
345                 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
346                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
347                 l_ptr->stats.link_congs++;
348 exit:
349                 tipc_port_unlock(p_ptr);
350         }
351         spin_unlock_bh(&tipc_port_list_lock);
352         return -ELINKCONG;
353 }
354
355 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
356 {
357         struct tipc_port *p_ptr;
358         struct tipc_sock *tsk;
359         struct tipc_port *temp_p_ptr;
360         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
361
362         if (all)
363                 win = 100000;
364         if (win <= 0)
365                 return;
366         if (!spin_trylock_bh(&tipc_port_list_lock))
367                 return;
368         if (link_congested(l_ptr))
369                 goto exit;
370         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
371                                  wait_list) {
372                 if (win <= 0)
373                         break;
374                 tsk = tipc_port_to_sock(p_ptr);
375                 list_del_init(&p_ptr->wait_list);
376                 spin_lock_bh(p_ptr->lock);
377                 tsk->link_cong = 0;
378                 tipc_sock_wakeup(tsk);
379                 win -= p_ptr->waiting_pkts;
380                 spin_unlock_bh(p_ptr->lock);
381         }
382
383 exit:
384         spin_unlock_bh(&tipc_port_list_lock);
385 }
386
387 /**
388  * link_release_outqueue - purge link's outbound message queue
389  * @l_ptr: pointer to link
390  */
391 static void link_release_outqueue(struct tipc_link *l_ptr)
392 {
393         kfree_skb_list(l_ptr->first_out);
394         l_ptr->first_out = NULL;
395         l_ptr->out_queue_size = 0;
396 }
397
398 /**
399  * tipc_link_reset_fragments - purge link's inbound message fragments queue
400  * @l_ptr: pointer to link
401  */
402 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
403 {
404         kfree_skb(l_ptr->reasm_buf);
405         l_ptr->reasm_buf = NULL;
406 }
407
408 /**
409  * tipc_link_purge_queues - purge all pkt queues associated with link
410  * @l_ptr: pointer to link
411  */
412 void tipc_link_purge_queues(struct tipc_link *l_ptr)
413 {
414         kfree_skb_list(l_ptr->oldest_deferred_in);
415         kfree_skb_list(l_ptr->first_out);
416         tipc_link_reset_fragments(l_ptr);
417         kfree_skb(l_ptr->proto_msg_queue);
418         l_ptr->proto_msg_queue = NULL;
419 }
420
421 void tipc_link_reset(struct tipc_link *l_ptr)
422 {
423         u32 prev_state = l_ptr->state;
424         u32 checkpoint = l_ptr->next_in_no;
425         int was_active_link = tipc_link_is_active(l_ptr);
426
427         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
428
429         /* Link is down, accept any session */
430         l_ptr->peer_session = INVALID_SESSION;
431
432         /* Prepare for max packet size negotiation */
433         link_init_max_pkt(l_ptr);
434
435         l_ptr->state = RESET_UNKNOWN;
436
437         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
438                 return;
439
440         tipc_node_link_down(l_ptr->owner, l_ptr);
441         tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr);
442
443         if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
444                 l_ptr->reset_checkpoint = checkpoint;
445                 l_ptr->exp_msg_count = START_CHANGEOVER;
446         }
447
448         /* Clean up all queues: */
449         link_release_outqueue(l_ptr);
450         kfree_skb(l_ptr->proto_msg_queue);
451         l_ptr->proto_msg_queue = NULL;
452         kfree_skb_list(l_ptr->oldest_deferred_in);
453         if (!list_empty(&l_ptr->waiting_ports))
454                 tipc_link_wakeup_ports(l_ptr, 1);
455
456         l_ptr->retransm_queue_head = 0;
457         l_ptr->retransm_queue_size = 0;
458         l_ptr->last_out = NULL;
459         l_ptr->first_out = NULL;
460         l_ptr->next_out = NULL;
461         l_ptr->unacked_window = 0;
462         l_ptr->checkpoint = 1;
463         l_ptr->next_out_no = 1;
464         l_ptr->deferred_inqueue_sz = 0;
465         l_ptr->oldest_deferred_in = NULL;
466         l_ptr->newest_deferred_in = NULL;
467         l_ptr->fsm_msg_cnt = 0;
468         l_ptr->stale_count = 0;
469         link_reset_statistics(l_ptr);
470 }
471
472 void tipc_link_reset_list(unsigned int bearer_id)
473 {
474         struct tipc_link *l_ptr;
475         struct tipc_node *n_ptr;
476
477         rcu_read_lock();
478         list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
479                 tipc_node_lock(n_ptr);
480                 l_ptr = n_ptr->links[bearer_id];
481                 if (l_ptr)
482                         tipc_link_reset(l_ptr);
483                 tipc_node_unlock(n_ptr);
484         }
485         rcu_read_unlock();
486 }
487
488 static void link_activate(struct tipc_link *l_ptr)
489 {
490         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
491         tipc_node_link_up(l_ptr->owner, l_ptr);
492         tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr);
493 }
494
495 /**
496  * link_state_event - link finite state machine
497  * @l_ptr: pointer to link
498  * @event: state machine event to process
499  */
500 static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
501 {
502         struct tipc_link *other;
503         u32 cont_intv = l_ptr->continuity_interval;
504
505         if (l_ptr->flags & LINK_STOPPED)
506                 return;
507
508         if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
509                 return;         /* Not yet. */
510
511         /* Check whether changeover is going on */
512         if (l_ptr->exp_msg_count) {
513                 if (event == TIMEOUT_EVT)
514                         link_set_timer(l_ptr, cont_intv);
515                 return;
516         }
517
518         switch (l_ptr->state) {
519         case WORKING_WORKING:
520                 switch (event) {
521                 case TRAFFIC_MSG_EVT:
522                 case ACTIVATE_MSG:
523                         break;
524                 case TIMEOUT_EVT:
525                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
526                                 l_ptr->checkpoint = l_ptr->next_in_no;
527                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
528                                         tipc_link_proto_xmit(l_ptr, STATE_MSG,
529                                                              0, 0, 0, 0, 0);
530                                         l_ptr->fsm_msg_cnt++;
531                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
532                                         tipc_link_proto_xmit(l_ptr, STATE_MSG,
533                                                              1, 0, 0, 0, 0);
534                                         l_ptr->fsm_msg_cnt++;
535                                 }
536                                 link_set_timer(l_ptr, cont_intv);
537                                 break;
538                         }
539                         l_ptr->state = WORKING_UNKNOWN;
540                         l_ptr->fsm_msg_cnt = 0;
541                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
542                         l_ptr->fsm_msg_cnt++;
543                         link_set_timer(l_ptr, cont_intv / 4);
544                         break;
545                 case RESET_MSG:
546                         pr_info("%s<%s>, requested by peer\n", link_rst_msg,
547                                 l_ptr->name);
548                         tipc_link_reset(l_ptr);
549                         l_ptr->state = RESET_RESET;
550                         l_ptr->fsm_msg_cnt = 0;
551                         tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
552                                              0, 0, 0, 0, 0);
553                         l_ptr->fsm_msg_cnt++;
554                         link_set_timer(l_ptr, cont_intv);
555                         break;
556                 default:
557                         pr_err("%s%u in WW state\n", link_unk_evt, event);
558                 }
559                 break;
560         case WORKING_UNKNOWN:
561                 switch (event) {
562                 case TRAFFIC_MSG_EVT:
563                 case ACTIVATE_MSG:
564                         l_ptr->state = WORKING_WORKING;
565                         l_ptr->fsm_msg_cnt = 0;
566                         link_set_timer(l_ptr, cont_intv);
567                         break;
568                 case RESET_MSG:
569                         pr_info("%s<%s>, requested by peer while probing\n",
570                                 link_rst_msg, l_ptr->name);
571                         tipc_link_reset(l_ptr);
572                         l_ptr->state = RESET_RESET;
573                         l_ptr->fsm_msg_cnt = 0;
574                         tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
575                                              0, 0, 0, 0, 0);
576                         l_ptr->fsm_msg_cnt++;
577                         link_set_timer(l_ptr, cont_intv);
578                         break;
579                 case TIMEOUT_EVT:
580                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
581                                 l_ptr->state = WORKING_WORKING;
582                                 l_ptr->fsm_msg_cnt = 0;
583                                 l_ptr->checkpoint = l_ptr->next_in_no;
584                                 if (tipc_bclink_acks_missing(l_ptr->owner)) {
585                                         tipc_link_proto_xmit(l_ptr, STATE_MSG,
586                                                              0, 0, 0, 0, 0);
587                                         l_ptr->fsm_msg_cnt++;
588                                 }
589                                 link_set_timer(l_ptr, cont_intv);
590                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
591                                 tipc_link_proto_xmit(l_ptr, STATE_MSG,
592                                                      1, 0, 0, 0, 0);
593                                 l_ptr->fsm_msg_cnt++;
594                                 link_set_timer(l_ptr, cont_intv / 4);
595                         } else {        /* Link has failed */
596                                 pr_warn("%s<%s>, peer not responding\n",
597                                         link_rst_msg, l_ptr->name);
598                                 tipc_link_reset(l_ptr);
599                                 l_ptr->state = RESET_UNKNOWN;
600                                 l_ptr->fsm_msg_cnt = 0;
601                                 tipc_link_proto_xmit(l_ptr, RESET_MSG,
602                                                      0, 0, 0, 0, 0);
603                                 l_ptr->fsm_msg_cnt++;
604                                 link_set_timer(l_ptr, cont_intv);
605                         }
606                         break;
607                 default:
608                         pr_err("%s%u in WU state\n", link_unk_evt, event);
609                 }
610                 break;
611         case RESET_UNKNOWN:
612                 switch (event) {
613                 case TRAFFIC_MSG_EVT:
614                         break;
615                 case ACTIVATE_MSG:
616                         other = l_ptr->owner->active_links[0];
617                         if (other && link_working_unknown(other))
618                                 break;
619                         l_ptr->state = WORKING_WORKING;
620                         l_ptr->fsm_msg_cnt = 0;
621                         link_activate(l_ptr);
622                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
623                         l_ptr->fsm_msg_cnt++;
624                         if (l_ptr->owner->working_links == 1)
625                                 tipc_link_sync_xmit(l_ptr);
626                         link_set_timer(l_ptr, cont_intv);
627                         break;
628                 case RESET_MSG:
629                         l_ptr->state = RESET_RESET;
630                         l_ptr->fsm_msg_cnt = 0;
631                         tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
632                                              1, 0, 0, 0, 0);
633                         l_ptr->fsm_msg_cnt++;
634                         link_set_timer(l_ptr, cont_intv);
635                         break;
636                 case STARTING_EVT:
637                         l_ptr->flags |= LINK_STARTED;
638                         /* fall through */
639                 case TIMEOUT_EVT:
640                         tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
641                         l_ptr->fsm_msg_cnt++;
642                         link_set_timer(l_ptr, cont_intv);
643                         break;
644                 default:
645                         pr_err("%s%u in RU state\n", link_unk_evt, event);
646                 }
647                 break;
648         case RESET_RESET:
649                 switch (event) {
650                 case TRAFFIC_MSG_EVT:
651                 case ACTIVATE_MSG:
652                         other = l_ptr->owner->active_links[0];
653                         if (other && link_working_unknown(other))
654                                 break;
655                         l_ptr->state = WORKING_WORKING;
656                         l_ptr->fsm_msg_cnt = 0;
657                         link_activate(l_ptr);
658                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
659                         l_ptr->fsm_msg_cnt++;
660                         if (l_ptr->owner->working_links == 1)
661                                 tipc_link_sync_xmit(l_ptr);
662                         link_set_timer(l_ptr, cont_intv);
663                         break;
664                 case RESET_MSG:
665                         break;
666                 case TIMEOUT_EVT:
667                         tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
668                                              0, 0, 0, 0, 0);
669                         l_ptr->fsm_msg_cnt++;
670                         link_set_timer(l_ptr, cont_intv);
671                         break;
672                 default:
673                         pr_err("%s%u in RR state\n", link_unk_evt, event);
674                 }
675                 break;
676         default:
677                 pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
678         }
679 }
680
681 /* tipc_link_cong: determine return value and how to treat the
682  * sent buffer during link congestion.
683  * - For plain, errorless user data messages we keep the buffer and
684  *   return -ELINKCONG.
685  * - For all other messages we discard the buffer and return -EHOSTUNREACH.
686  * - For TIPC internal messages we also reset the link.
687  */
688 static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
689 {
690         struct tipc_msg *msg = buf_msg(buf);
691         uint psz = msg_size(msg);
692         uint imp = tipc_msg_tot_importance(msg);
693         u32 oport = msg_tot_origport(msg);
694
695         if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
696                 if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
697                         link_schedule_port(link, oport, psz);
698                         return -ELINKCONG;
699                 }
700         } else {
701                 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
702                 tipc_link_reset(link);
703         }
704         kfree_skb_list(buf);
705         return -EHOSTUNREACH;
706 }
707
708 /**
709  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
710  * @link: link to use
711  * @buf: chain of buffers containing message
712  * Consumes the buffer chain, except when returning -ELINKCONG
713  * Returns 0 on success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
714  * user data messages) or -EHOSTUNREACH (all other messages/senders)
715  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
716  * to act on the return value, since they may need to do more send attempts.
717  */
718 int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
719 {
720         struct tipc_msg *msg = buf_msg(buf);
721         uint psz = msg_size(msg);
722         uint qsz = link->out_queue_size;
723         uint sndlim = link->queue_limit[0];
724         uint imp = tipc_msg_tot_importance(msg);
725         uint mtu = link->max_pkt;
726         uint ack = mod(link->next_in_no - 1);
727         uint seqno = link->next_out_no;
728         uint bc_last_in = link->owner->bclink.last_in;
729         struct tipc_media_addr *addr = &link->media_addr;
730         struct sk_buff *next = buf->next;
731
732         /* Match queue limits against msg importance: */
733         if (unlikely(qsz >= link->queue_limit[imp]))
734                 return tipc_link_cong(link, buf);
735
736         /* Has valid packet limit been used ? */
737         if (unlikely(psz > mtu)) {
738                 kfree_skb_list(buf);
739                 return -EMSGSIZE;
740         }
741
742         /* Prepare each packet for sending, and add to outqueue: */
743         while (buf) {
744                 next = buf->next;
745                 msg = buf_msg(buf);
746                 msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
747                 msg_set_bcast_ack(msg, bc_last_in);
748
749                 if (!link->first_out) {
750                         link->first_out = buf;
751                 } else if (qsz < sndlim) {
752                         link->last_out->next = buf;
753                 } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
754                         link->stats.sent_bundled++;
755                         buf = next;
756                         next = buf->next;
757                         continue;
758                 } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
759                         link->stats.sent_bundled++;
760                         link->stats.sent_bundles++;
761                         link->last_out->next = buf;
762                         if (!link->next_out)
763                                 link->next_out = buf;
764                 } else {
765                         link->last_out->next = buf;
766                         if (!link->next_out)
767                                 link->next_out = buf;
768                 }
769
770                 /* Send packet if possible: */
771                 if (likely(++qsz <= sndlim)) {
772                         tipc_bearer_send(link->bearer_id, buf, addr);
773                         link->next_out = next;
774                         link->unacked_window = 0;
775                 }
776                 seqno++;
777                 link->last_out = buf;
778                 buf = next;
779         }
780         link->next_out_no = seqno;
781         link->out_queue_size = qsz;
782         return 0;
783 }
784
785 /**
786  * tipc_link_xmit() is the general link level function for message sending
787  * @buf: chain of buffers containing message
789  * @dnode: address of destination node
790  * @selector: a number used for deterministic link selection
791  * Consumes the buffer chain, except when returning -ELINKCONG
792  * Returns 0 on success, otherwise errno: -ELINKCONG, -EHOSTUNREACH, -EMSGSIZE
793  */
794 int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
795 {
796         struct tipc_link *link = NULL;
797         struct tipc_node *node;
798         int rc = -EHOSTUNREACH;
799
800         node = tipc_node_find(dnode);
801         if (node) {
802                 tipc_node_lock(node);
803                 link = node->active_links[selector & 1];
804                 if (link)
805                         rc = __tipc_link_xmit(link, buf);
806                 tipc_node_unlock(node);
807         }
808
809         if (link)
810                 return rc;
811
812         if (likely(in_own_node(dnode)))
813                 return tipc_sk_rcv(buf);
814
815         kfree_skb_list(buf);
816         return rc;
817 }
818
819 /*
820  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
821  *
822  * Give a newly added peer node the sequence number where it should
823  * start receiving and acking broadcast packets.
824  *
825  * Called with node locked
826  */
827 static void tipc_link_sync_xmit(struct tipc_link *link)
828 {
829         struct sk_buff *buf;
830         struct tipc_msg *msg;
831
832         buf = tipc_buf_acquire(INT_H_SIZE);
833         if (!buf)
834                 return;
835
836         msg = buf_msg(buf);
837         tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
838         msg_set_last_bcast(msg, link->owner->bclink.acked);
839         __tipc_link_xmit(link, buf);
840 }
841
842 /*
843  * tipc_link_sync_rcv - synchronize broadcast link endpoints.
844  * Receive the sequence number where we should start receiving and
845  * acking broadcast packets from a newly added peer node, and open
846  * up for reception of such packets.
847  *
848  * Called with node locked
849  */
850 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
851 {
852         struct tipc_msg *msg = buf_msg(buf);
853
854         n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
855         n->bclink.recv_permitted = true;
856         kfree_skb(buf);
857 }
858
859 /*
860  * tipc_link_push_packet: Push one unsent packet to the media
861  */
862 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
863 {
864         struct sk_buff *buf = l_ptr->first_out;
865         u32 r_q_size = l_ptr->retransm_queue_size;
866         u32 r_q_head = l_ptr->retransm_queue_head;
867
868         /* Step to position where retransmission failed, if any,    */
869         /* consider that buffers may have been released in meantime */
870         if (r_q_size && buf) {
871                 u32 last = lesser(mod(r_q_head + r_q_size),
872                                   link_last_sent(l_ptr));
873                 u32 first = buf_seqno(buf);
874
875                 while (buf && less(first, r_q_head)) {
876                         first = mod(first + 1);
877                         buf = buf->next;
878                 }
879                 l_ptr->retransm_queue_head = r_q_head = first;
880                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
881         }
882
883         /* Continue retransmission now, if there is anything: */
884         if (r_q_size && buf) {
885                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
886                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
887                 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
888                 l_ptr->retransm_queue_head = mod(++r_q_head);
889                 l_ptr->retransm_queue_size = --r_q_size;
890                 l_ptr->stats.retransmitted++;
891                 return 0;
892         }
893
894         /* Send deferred protocol message, if any: */
895         buf = l_ptr->proto_msg_queue;
896         if (buf) {
897                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
898                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
899                 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
900                 l_ptr->unacked_window = 0;
901                 kfree_skb(buf);
902                 l_ptr->proto_msg_queue = NULL;
903                 return 0;
904         }
905
906         /* Send one deferred data message, if send window not full: */
907         buf = l_ptr->next_out;
908         if (buf) {
909                 struct tipc_msg *msg = buf_msg(buf);
910                 u32 next = msg_seqno(msg);
911                 u32 first = buf_seqno(l_ptr->first_out);
912
913                 if (mod(next - first) < l_ptr->queue_limit[0]) {
914                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
915                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
916                         tipc_bearer_send(l_ptr->bearer_id, buf,
917                                          &l_ptr->media_addr);
918                         if (msg_user(msg) == MSG_BUNDLER)
919                                 msg_set_type(msg, BUNDLE_CLOSED);
920                         l_ptr->next_out = buf->next;
921                         return 0;
922                 }
923         }
924         return 1;
925 }
926
927 /*
928  * tipc_link_push_queue(): push out the unsent messages of a link where
929  *               congestion has abated. Node is locked
930  */
931 void tipc_link_push_queue(struct tipc_link *l_ptr)
932 {
933         u32 res;
934
935         do {
936                 res = tipc_link_push_packet(l_ptr);
937         } while (!res);
938 }
939
940 void tipc_link_reset_all(struct tipc_node *node)
941 {
942         char addr_string[16];
943         u32 i;
944
945         tipc_node_lock(node);
946
947         pr_warn("Resetting all links to %s\n",
948                 tipc_addr_string_fill(addr_string, node->addr));
949
950         for (i = 0; i < MAX_BEARERS; i++) {
951                 if (node->links[i]) {
952                         link_print(node->links[i], "Resetting link\n");
953                         tipc_link_reset(node->links[i]);
954                 }
955         }
956
957         tipc_node_unlock(node);
958 }
959
960 static void link_retransmit_failure(struct tipc_link *l_ptr,
961                                     struct sk_buff *buf)
962 {
963         struct tipc_msg *msg = buf_msg(buf);
964
965         pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
966
967         if (l_ptr->addr) {
968                 /* Handle failure on standard link */
969                 link_print(l_ptr, "Resetting link\n");
970                 tipc_link_reset(l_ptr);
971
972         } else {
973                 /* Handle failure on broadcast link */
974                 struct tipc_node *n_ptr;
975                 char addr_string[16];
976
977                 pr_info("Msg seq number: %u,  ", msg_seqno(msg));
978                 pr_cont("Outstanding acks: %lu\n",
979                         (unsigned long) TIPC_SKB_CB(buf)->handle);
980
981                 n_ptr = tipc_bclink_retransmit_to();
982                 tipc_node_lock(n_ptr);
983
984                 tipc_addr_string_fill(addr_string, n_ptr->addr);
985                 pr_info("Broadcast link info for %s\n", addr_string);
986                 pr_info("Reception permitted: %d,  Acked: %u\n",
987                         n_ptr->bclink.recv_permitted,
988                         n_ptr->bclink.acked);
989                 pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
990                         n_ptr->bclink.last_in,
991                         n_ptr->bclink.oos_state,
992                         n_ptr->bclink.last_sent);
993
994                 tipc_node_unlock(n_ptr);
995
996                 tipc_bclink_set_flags(TIPC_BCLINK_RESET);
997                 l_ptr->stale_count = 0;
998         }
999 }
1000
1001 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1002                           u32 retransmits)
1003 {
1004         struct tipc_msg *msg;
1005
1006         if (!buf)
1007                 return;
1008
1009         msg = buf_msg(buf);
1010
1011         /* Detect repeated retransmit failures */
1012         if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1013                 if (++l_ptr->stale_count > 100) {
1014                         link_retransmit_failure(l_ptr, buf);
1015                         return;
1016                 }
1017         } else {
1018                 l_ptr->last_retransmitted = msg_seqno(msg);
1019                 l_ptr->stale_count = 1;
1020         }
1021
1022         while (retransmits && (buf != l_ptr->next_out) && buf) {
1023                 msg = buf_msg(buf);
1024                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1025                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1026                 tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
1027                 buf = buf->next;
1028                 retransmits--;
1029                 l_ptr->stats.retransmitted++;
1030         }
1031
1032         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1033 }
1034
1035 /**
1036  * link_insert_deferred_queue - insert deferred messages back into receive chain
1037  */
1038 static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
1039                                                   struct sk_buff *buf)
1040 {
1041         u32 seq_no;
1042
1043         if (l_ptr->oldest_deferred_in == NULL)
1044                 return buf;
1045
1046         seq_no = buf_seqno(l_ptr->oldest_deferred_in);
1047         if (seq_no == mod(l_ptr->next_in_no)) {
1048                 l_ptr->newest_deferred_in->next = buf;
1049                 buf = l_ptr->oldest_deferred_in;
1050                 l_ptr->oldest_deferred_in = NULL;
1051                 l_ptr->deferred_inqueue_sz = 0;
1052         }
1053         return buf;
1054 }
1055
1056 /**
1057  * link_recv_buf_validate - validate basic format of received message
1058  *
1059  * This routine ensures a TIPC message has an acceptable header, and at least
1060  * as much data as the header indicates it should.  The routine also ensures
1061  * that the entire message header is stored in the main fragment of the message
1062  * buffer, to simplify future access to message header fields.
1063  *
1064  * Note: Having extra info present in the message header or data areas is OK.
1065  * TIPC will ignore the excess, under the assumption that it is optional info
1066  * introduced by a later release of the protocol.
1067  */
1068 static int link_recv_buf_validate(struct sk_buff *buf)
1069 {
1070         static u32 min_data_hdr_size[8] = {
1071                 SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1072                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1073                 };
1074
1075         struct tipc_msg *msg;
1076         u32 tipc_hdr[2];
1077         u32 size;
1078         u32 hdr_size;
1079         u32 min_hdr_size;
1080
1081         /* If this packet comes from the defer queue, the skb has already
1082          * been validated
1083          */
1084         if (unlikely(TIPC_SKB_CB(buf)->deferred))
1085                 return 1;
1086
1087         if (unlikely(buf->len < MIN_H_SIZE))
1088                 return 0;
1089
1090         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1091         if (msg == NULL)
1092                 return 0;
1093
1094         if (unlikely(msg_version(msg) != TIPC_VERSION))
1095                 return 0;
1096
1097         size = msg_size(msg);
1098         hdr_size = msg_hdr_sz(msg);
1099         min_hdr_size = msg_isdata(msg) ?
1100                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1101
1102         if (unlikely((hdr_size < min_hdr_size) ||
1103                      (size < hdr_size) ||
1104                      (buf->len < size) ||
1105                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1106                 return 0;
1107
1108         return pskb_may_pull(buf, hdr_size);
1109 }
1110
1111 /**
1112  * tipc_rcv - process TIPC packets/messages arriving from off-node
1113  * @head: pointer to message buffer chain
1114  * @b_ptr: pointer to bearer the message arrived on
1115  *
1116  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1117  * structure (i.e. cannot be NULL), but bearer can be inactive.
1118  */
1119 void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1120 {
1121         while (head) {
1122                 struct tipc_node *n_ptr;
1123                 struct tipc_link *l_ptr;
1124                 struct sk_buff *crs;
1125                 struct sk_buff *buf = head;
1126                 struct tipc_msg *msg;
1127                 u32 seq_no;
1128                 u32 ackd;
1129                 u32 released = 0;
1130
1131                 head = head->next;
1132                 buf->next = NULL;
1133
1134                 /* Ensure message is well-formed */
1135                 if (unlikely(!link_recv_buf_validate(buf)))
1136                         goto discard;
1137
1138                 /* Ensure message data is a single contiguous unit */
1139                 if (unlikely(skb_linearize(buf)))
1140                         goto discard;
1141
1142                 /* Handle arrival of a non-unicast link message */
1143                 msg = buf_msg(buf);
1144
1145                 if (unlikely(msg_non_seq(msg))) {
1146                         if (msg_user(msg) ==  LINK_CONFIG)
1147                                 tipc_disc_rcv(buf, b_ptr);
1148                         else
1149                                 tipc_bclink_rcv(buf);
1150                         continue;
1151                 }
1152
1153                 /* Discard unicast link messages destined for another node */
1154                 if (unlikely(!msg_short(msg) &&
1155                              (msg_destnode(msg) != tipc_own_addr)))
1156                         goto discard;
1157
1158                 /* Locate neighboring node that sent message */
1159                 n_ptr = tipc_node_find(msg_prevnode(msg));
1160                 if (unlikely(!n_ptr))
1161                         goto discard;
1162                 tipc_node_lock(n_ptr);
1163
1164                 /* Locate unicast link endpoint that should handle message */
1165                 l_ptr = n_ptr->links[b_ptr->identity];
1166                 if (unlikely(!l_ptr))
1167                         goto unlock_discard;
1168
1169                 /* Verify that communication with node is currently allowed */
1170                 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
1171                     msg_user(msg) == LINK_PROTOCOL &&
1172                     (msg_type(msg) == RESET_MSG ||
1173                     msg_type(msg) == ACTIVATE_MSG) &&
1174                     !msg_redundant_link(msg))
1175                         n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1176
1177                 if (tipc_node_blocked(n_ptr))
1178                         goto unlock_discard;
1179
1180                 /* Validate message sequence number info */
1181                 seq_no = msg_seqno(msg);
1182                 ackd = msg_ack(msg);
1183
1184                 /* Release acked messages */
1185                 if (n_ptr->bclink.recv_permitted)
1186                         tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1187
1188                 crs = l_ptr->first_out;
1189                 while ((crs != l_ptr->next_out) &&
1190                        less_eq(buf_seqno(crs), ackd)) {
1191                         struct sk_buff *next = crs->next;
1192                         kfree_skb(crs);
1193                         crs = next;
1194                         released++;
1195                 }
1196                 if (released) {
1197                         l_ptr->first_out = crs;
1198                         l_ptr->out_queue_size -= released;
1199                 }
1200
1201                 /* Try sending any messages link endpoint has pending */
1202                 if (unlikely(l_ptr->next_out))
1203                         tipc_link_push_queue(l_ptr);
1204
1205                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1206                         tipc_link_wakeup_ports(l_ptr, 0);
1207
1208                 /* Process the incoming packet */
1209                 if (unlikely(!link_working_working(l_ptr))) {
1210                         if (msg_user(msg) == LINK_PROTOCOL) {
1211                                 tipc_link_proto_rcv(l_ptr, buf);
1212                                 head = link_insert_deferred_queue(l_ptr, head);
1213                                 tipc_node_unlock(n_ptr);
1214                                 continue;
1215                         }
1216
1217                         /* Traffic message. Conditionally activate link */
1218                         link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1219
1220                         if (link_working_working(l_ptr)) {
1221                                 /* Re-insert buffer in front of queue */
1222                                 buf->next = head;
1223                                 head = buf;
1224                                 tipc_node_unlock(n_ptr);
1225                                 continue;
1226                         }
1227                         goto unlock_discard;
1228                 }
1229
1230                 /* Link is now in state WORKING_WORKING */
1231                 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1232                         link_handle_out_of_seq_msg(l_ptr, buf);
1233                         head = link_insert_deferred_queue(l_ptr, head);
1234                         tipc_node_unlock(n_ptr);
1235                         continue;
1236                 }
1237                 l_ptr->next_in_no++;
1238                 if (unlikely(l_ptr->oldest_deferred_in))
1239                         head = link_insert_deferred_queue(l_ptr, head);
1240
1241                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1242                         l_ptr->stats.sent_acks++;
1243                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1244                 }
1245
1246                 if (tipc_link_prepare_input(l_ptr, &buf)) {
1247                         tipc_node_unlock(n_ptr);
1248                         continue;
1249                 }
1250                 tipc_node_unlock(n_ptr);
1251                 msg = buf_msg(buf);
1252                 if (tipc_link_input(l_ptr, buf) != 0)
1253                         goto discard;
1254                 continue;
1255 unlock_discard:
1256                 tipc_node_unlock(n_ptr);
1257 discard:
1258                 kfree_skb(buf);
1259         }
1260 }
1261
1262 /**
1263  * tipc_link_prepare_input - process TIPC link messages
1264  *
1265  * returns nonzero if the message was consumed
1266  *
1267  * Node lock must be held
1268  */
1269 static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
1270 {
1271         struct tipc_node *n;
1272         struct tipc_msg *msg;
1273         int res = -EINVAL;
1274
1275         n = l->owner;
1276         msg = buf_msg(*buf);
1277         switch (msg_user(msg)) {
1278         case CHANGEOVER_PROTOCOL:
1279                 if (tipc_link_tunnel_rcv(n, buf))
1280                         res = 0;
1281                 break;
1282         case MSG_FRAGMENTER:
1283                 l->stats.recv_fragments++;
1284                 if (tipc_buf_append(&l->reasm_buf, buf)) {
1285                         l->stats.recv_fragmented++;
1286                         res = 0;
1287                 } else if (!l->reasm_buf) {
1288                         tipc_link_reset(l);
1289                 }
1290                 break;
1291         case MSG_BUNDLER:
1292                 l->stats.recv_bundles++;
1293                 l->stats.recv_bundled += msg_msgcnt(msg);
1294                 res = 0;
1295                 break;
1296         case NAME_DISTRIBUTOR:
1297                 n->bclink.recv_permitted = true;
1298                 res = 0;
1299                 break;
1300         case BCAST_PROTOCOL:
1301                 tipc_link_sync_rcv(n, *buf);
1302                 break;
1303         default:
1304                 res = 0;
1305         }
1306         return res;
1307 }
1308 /**
1309  * tipc_link_input - Deliver message to higher layers
1310  */
1311 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
1312 {
1313         struct tipc_msg *msg = buf_msg(buf);
1314         int res = 0;
1315
1316         switch (msg_user(msg)) {
1317         case TIPC_LOW_IMPORTANCE:
1318         case TIPC_MEDIUM_IMPORTANCE:
1319         case TIPC_HIGH_IMPORTANCE:
1320         case TIPC_CRITICAL_IMPORTANCE:
1321         case CONN_MANAGER:
1322                 tipc_sk_rcv(buf);
1323                 break;
1324         case NAME_DISTRIBUTOR:
1325                 tipc_named_rcv(buf);
1326                 break;
1327         case MSG_BUNDLER:
1328                 tipc_link_bundle_rcv(buf);
1329                 break;
1330         default:
1331                 res = -EINVAL;
1332         }
1333         return res;
1334 }
1335
1336 /**
1337  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1338  *
1339  * Returns increase in queue length (i.e. 0 or 1)
1340  */
1341 u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
1342                         struct sk_buff *buf)
1343 {
1344         struct sk_buff *queue_buf;
1345         struct sk_buff **prev;
1346         u32 seq_no = buf_seqno(buf);
1347
1348         buf->next = NULL;
1349
1350         /* Empty queue ? */
1351         if (*head == NULL) {
1352                 *head = *tail = buf;
1353                 return 1;
1354         }
1355
1356         /* Last ? */
1357         if (less(buf_seqno(*tail), seq_no)) {
1358                 (*tail)->next = buf;
1359                 *tail = buf;
1360                 return 1;
1361         }
1362
1363         /* Locate insertion point in queue, then insert; discard if duplicate */
1364         prev = head;
1365         queue_buf = *head;
1366         for (;;) {
1367                 u32 curr_seqno = buf_seqno(queue_buf);
1368
1369                 if (seq_no == curr_seqno) {
1370                         kfree_skb(buf);
1371                         return 0;
1372                 }
1373
1374                 if (less(seq_no, curr_seqno))
1375                         break;
1376
1377                 prev = &queue_buf->next;
1378                 queue_buf = queue_buf->next;
1379         }
1380
1381         buf->next = queue_buf;
1382         *prev = buf;
1383         return 1;
1384 }
1385
1386 /*
1387  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1388  */
1389 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1390                                        struct sk_buff *buf)
1391 {
1392         u32 seq_no = buf_seqno(buf);
1393
1394         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1395                 tipc_link_proto_rcv(l_ptr, buf);
1396                 return;
1397         }
1398
1399         /* Record OOS packet arrival (force mismatch on next timeout) */
1400         l_ptr->checkpoint--;
1401
1402         /*
1403          * Discard packet if a duplicate; otherwise add it to deferred queue
1404          * and notify peer of gap as per protocol specification
1405          */
1406         if (less(seq_no, mod(l_ptr->next_in_no))) {
1407                 l_ptr->stats.duplicates++;
1408                 kfree_skb(buf);
1409                 return;
1410         }
1411
1412         if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1413                                 &l_ptr->newest_deferred_in, buf)) {
1414                 l_ptr->deferred_inqueue_sz++;
1415                 l_ptr->stats.deferred_recv++;
1416                 TIPC_SKB_CB(buf)->deferred = true;
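                     /* Notify the peer of the gap, but at most once per 16
                      * deferred packets, to limit protocol traffic
                      */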
1417                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1418                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1419         } else
1420                 l_ptr->stats.duplicates++;
1421 }
1422
1423 /*
1424  * Send a protocol message (RESET, ACTIVATE or STATE) to the peer endpoint.
1425  */
1426 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1427                           u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1428 {
1429         struct sk_buff *buf = NULL;
1430         struct tipc_msg *msg = l_ptr->pmsg;
1431         u32 msg_size = sizeof(l_ptr->proto_msg);
1432         int r_flag;
1433
1434         /* Discard any previous message that was deferred due to congestion */
1435         if (l_ptr->proto_msg_queue) {
1436                 kfree_skb(l_ptr->proto_msg_queue);
1437                 l_ptr->proto_msg_queue = NULL;
1438         }
1439
1440         /* Don't send protocol message during link changeover */
1441         if (l_ptr->exp_msg_count)
1442                 return;
1443
1444         /* Abort non-RESET send if communication with node is prohibited */
1445         if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1446                 return;
1447
1448         /* Create protocol message with "out-of-sequence" sequence number */
1449         msg_set_type(msg, msg_typ);
1450         msg_set_net_plane(msg, l_ptr->net_plane);
1451         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1452         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1453
1454         if (msg_typ == STATE_MSG) {
1455                 u32 next_sent = mod(l_ptr->next_out_no);
1456
1457                 if (!tipc_link_is_up(l_ptr))
1458                         return;
1459                 if (l_ptr->next_out)
1460                         next_sent = buf_seqno(l_ptr->next_out);
1461                 msg_set_next_sent(msg, next_sent);
1462                 if (l_ptr->oldest_deferred_in) {
1463                         u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
1464                         gap = mod(rec - mod(l_ptr->next_in_no));
1465                 }
1466                 msg_set_seq_gap(msg, gap);
1467                 if (gap)
1468                         l_ptr->stats.sent_nacks++;
1469                 msg_set_link_tolerance(msg, tolerance);
1470                 msg_set_linkprio(msg, priority);
1471                 msg_set_max_pkt(msg, ack_mtu);
1472                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1473                 msg_set_probe(msg, probe_msg != 0);
1474                 if (probe_msg) {
1475                         u32 mtu = l_ptr->max_pkt;
1476
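                             /* MTU discovery: probe with a size halfway between
                              * the confirmed MTU and the target, rounded to a
                              * 4-byte boundary; after 10 unconfirmed probes the
                              * target is lowered to just below the last probe
                              * size and the search continues from there.
                              */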
1477                         if ((mtu < l_ptr->max_pkt_target) &&
1478                             link_working_working(l_ptr) &&
1479                             l_ptr->fsm_msg_cnt) {
1480                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1481                                 if (l_ptr->max_pkt_probes == 10) {
1482                                         l_ptr->max_pkt_target = (msg_size - 4);
1483                                         l_ptr->max_pkt_probes = 0;
1484                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1485                                 }
1486                                 l_ptr->max_pkt_probes++;
1487                         }
1488
1489                         l_ptr->stats.sent_probes++;
1490                 }
1491                 l_ptr->stats.sent_states++;
1492         } else {                /* RESET_MSG or ACTIVATE_MSG */
1493                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1494                 msg_set_seq_gap(msg, 0);
1495                 msg_set_next_sent(msg, 1);
1496                 msg_set_probe(msg, 0);
1497                 msg_set_link_tolerance(msg, l_ptr->tolerance);
1498                 msg_set_linkprio(msg, l_ptr->priority);
1499                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1500         }
1501
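             /* Tell the peer whether a working link exists besides this one */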
1502         r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1503         msg_set_redundant_link(msg, r_flag);
1504         msg_set_linkprio(msg, l_ptr->priority);
1505         msg_set_size(msg, msg_size);
1506
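             /* Offset the seqno by half the sequence space so the message is
              * always treated as out-of-sequence by the receiver (see above)
              */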
1507         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1508
1509         buf = tipc_buf_acquire(msg_size);
1510         if (!buf)
1511                 return;
1512
1513         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1514         buf->priority = TC_PRIO_CONTROL;
1515
1516         tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
1517         l_ptr->unacked_window = 0;
1518         kfree_skb(buf);
1519 }
1520
1521 /*
1522  * Receive protocol message.
1523  * Note that the network plane id propagates through the network, and may
1524  * change at any time. The node with the lowest address rules.
1525  */
1526 static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1527 {
1528         u32 rec_gap = 0;
1529         u32 max_pkt_info;
1530         u32 max_pkt_ack;
1531         u32 msg_tol;
1532         struct tipc_msg *msg = buf_msg(buf);
1533
1534         /* Discard protocol message during link changeover */
1535         if (l_ptr->exp_msg_count)
1536                 goto exit;
1537
1538         if (l_ptr->net_plane != msg_net_plane(msg))
1539                 if (tipc_own_addr > msg_prevnode(msg))
1540                         l_ptr->net_plane = msg_net_plane(msg);
1541
1542         switch (msg_type(msg)) {
1543
1544         case RESET_MSG:
1545                 if (!link_working_unknown(l_ptr) &&
1546                     (l_ptr->peer_session != INVALID_SESSION)) {
1547                         if (less_eq(msg_session(msg), l_ptr->peer_session))
1548                                 break; /* duplicate or old reset: ignore */
1549                 }
1550
1551                 if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1552                                 link_working_unknown(l_ptr))) {
1553                         /*
1554                          * peer has lost contact -- don't allow peer's links
1555                          * to reactivate before we recognize loss & clean up
1556                          */
1557                         l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
1558                 }
1559
1560                 link_state_event(l_ptr, RESET_MSG);
1561
1562                 /* fall thru' */
1563         case ACTIVATE_MSG:
1564                 /* Update link settings according to the other endpoint's values */
1565                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
1566
1567                 msg_tol = msg_link_tolerance(msg);
1568                 if (msg_tol > l_ptr->tolerance)
1569                         link_set_supervision_props(l_ptr, msg_tol);
1570
1571                 if (msg_linkprio(msg) > l_ptr->priority)
1572                         l_ptr->priority = msg_linkprio(msg);
1573
1574                 max_pkt_info = msg_max_pkt(msg);
1575                 if (max_pkt_info) {
1576                         if (max_pkt_info < l_ptr->max_pkt_target)
1577                                 l_ptr->max_pkt_target = max_pkt_info;
1578                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
1579                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
1580                 } else {
1581                         l_ptr->max_pkt = l_ptr->max_pkt_target;
1582                 }
1583
1584                 /* Synchronize broadcast link info, if not done previously */
1585                 if (!tipc_node_is_up(l_ptr->owner)) {
1586                         l_ptr->owner->bclink.last_sent =
1587                                 l_ptr->owner->bclink.last_in =
1588                                 msg_last_bcast(msg);
1589                         l_ptr->owner->bclink.oos_state = 0;
1590                 }
1591
1592                 l_ptr->peer_session = msg_session(msg);
1593                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
1594
1595                 if (msg_type(msg) == ACTIVATE_MSG)
1596                         link_state_event(l_ptr, ACTIVATE_MSG);
1597                 break;
1598         case STATE_MSG:
1599
1600                 msg_tol = msg_link_tolerance(msg);
1601                 if (msg_tol)
1602                         link_set_supervision_props(l_ptr, msg_tol);
1603
1604                 if (msg_linkprio(msg) &&
1605                     (msg_linkprio(msg) != l_ptr->priority)) {
1606                         pr_warn("%s<%s>, priority change %u->%u\n",
1607                                 link_rst_msg, l_ptr->name, l_ptr->priority,
1608                                 msg_linkprio(msg));
1609                         l_ptr->priority = msg_linkprio(msg);
1610                         tipc_link_reset(l_ptr); /* Reset so the new priority takes effect */
1611                         break;
1612                 }
1613
1614                 /* Record reception; force mismatch at next timeout: */
1615                 l_ptr->checkpoint--;
1616
1617                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1618                 l_ptr->stats.recv_states++;
1619                 if (link_reset_unknown(l_ptr))
1620                         break;
1621
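                     /* Peer has sent packets we have not yet received; note the
                      * gap so it can be nacked in the STATE message sent below
                      */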
1622                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
1623                         rec_gap = mod(msg_next_sent(msg) -
1624                                       mod(l_ptr->next_in_no));
1625                 }
1626
1627                 max_pkt_ack = msg_max_pkt(msg);
1628                 if (max_pkt_ack > l_ptr->max_pkt) {
1629                         l_ptr->max_pkt = max_pkt_ack;
1630                         l_ptr->max_pkt_probes = 0;
1631                 }
1632
1633                 max_pkt_ack = 0;
1634                 if (msg_probe(msg)) {
1635                         l_ptr->stats.recv_probes++;
1636                         if (msg_size(msg) > sizeof(l_ptr->proto_msg))
1637                                 max_pkt_ack = msg_size(msg);
1638                 }
1639
1640                 /* Update bcast link state before retransmits, to reduce loss risk */
1641                 if (l_ptr->owner->bclink.recv_permitted)
1642                         tipc_bclink_update_link_state(l_ptr->owner,
1643                                                       msg_last_bcast(msg));
1644
1645                 if (rec_gap || (msg_probe(msg))) {
1646                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
1647                                              0, max_pkt_ack);
1648                 }
1649                 if (msg_seq_gap(msg)) {
1650                         l_ptr->stats.recv_nacks++;
1651                         tipc_link_retransmit(l_ptr, l_ptr->first_out,
1652                                              msg_seq_gap(msg));
1653                 }
1654                 break;
1655         }
1656 exit:
1657         kfree_skb(buf);
1658 }
1659
1660
1661 /* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
1662  * a different bearer. Owner node is locked.
1663  */
1664 static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1665                                   struct tipc_msg *tunnel_hdr,
1666                                   struct tipc_msg *msg,
1667                                   u32 selector)
1668 {
1669         struct tipc_link *tunnel;
1670         struct sk_buff *buf;
1671         u32 length = msg_size(msg);
1672
1673         tunnel = l_ptr->owner->active_links[selector & 1];
1674         if (!tipc_link_is_up(tunnel)) {
1675                 pr_warn("%stunnel link no longer available\n", link_co_err);
1676                 return;
1677         }
1678         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1679         buf = tipc_buf_acquire(length + INT_H_SIZE);
1680         if (!buf) {
1681                 pr_warn("%sunable to send tunnel msg\n", link_co_err);
1682                 return;
1683         }
1684         skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
1685         skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
1686         __tipc_link_xmit(tunnel, buf);
1687 }
1688
1689
1690 /* tipc_link_failover_send_queue(): A link has gone down, but a second
1691  * link is still active. We can do failover. Tunnel the failing link's
1692  * whole send queue via the remaining link. This way, we don't lose
1693  * any packets, and sequence order is preserved for subsequent traffic
1694  * sent over the remaining link. Owner node is locked.
1695  */
1696 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1697 {
1698         u32 msgcount = l_ptr->out_queue_size;
1699         struct sk_buff *crs = l_ptr->first_out;
1700         struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1701         struct tipc_msg tunnel_hdr;
1702         int split_bundles;
1703
1704         if (!tunnel)
1705                 return;
1706
1707         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1708                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
1709         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1710         msg_set_msgcnt(&tunnel_hdr, msgcount);
1711
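             /* Even an empty send queue must be announced, so the peer learns
              * that no tunneled packets are to be expected
              */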
1712         if (!l_ptr->first_out) {
1713                 struct sk_buff *buf;
1714
1715                 buf = tipc_buf_acquire(INT_H_SIZE);
1716                 if (buf) {
1717                         skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
1718                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
1719                         __tipc_link_xmit(tunnel, buf);
1720                 } else {
1721                         pr_warn("%sunable to send changeover msg\n",
1722                                 link_co_err);
1723                 }
1724                 return;
1725         }
1726
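             /* Bundles must be split if the two active links differ, so that
              * each bundled message is tunneled via the link matching its own
              * selector
              */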
1727         split_bundles = (l_ptr->owner->active_links[0] !=
1728                          l_ptr->owner->active_links[1]);
1729
1730         while (crs) {
1731                 struct tipc_msg *msg = buf_msg(crs);
1732
1733                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1734                         struct tipc_msg *m = msg_get_wrapped(msg);
1735                         unchar *pos = (unchar *)m;
1736
1737                         msgcount = msg_msgcnt(msg);
1738                         while (msgcount--) {
1739                                 msg_set_seqno(m, msg_seqno(msg));
1740                                 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1741                                                       msg_link_selector(m));
1742                                 pos += align(msg_size(m));
1743                                 m = (struct tipc_msg *)pos;
1744                         }
1745                 } else {
1746                         tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1747                                               msg_link_selector(msg));
1748                 }
1749                 crs = crs->next;
1750         }
1751 }
1752
1753 /* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1754  * duplicate of the first link's send queue via the new link. This way, we
1755  * are guaranteed that currently queued packets from a socket are delivered
1756  * before future traffic from the same socket, even if this is using the
1757  * new link. The last arriving copy of each duplicate packet is dropped at
1758  * the receiving end by the regular protocol check, so packet cardinality
1759  * and sequence order is preserved per sender/receiver socket pair.
1760  * Owner node is locked.
1761  */
1762 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1763                               struct tipc_link *tunnel)
1764 {
1765         struct sk_buff *iter;
1766         struct tipc_msg tunnel_hdr;
1767
1768         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1769                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
1770         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
1771         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1772         iter = l_ptr->first_out;
1773         while (iter) {
1774                 struct sk_buff *outbuf;
1775                 struct tipc_msg *msg = buf_msg(iter);
1776                 u32 length = msg_size(msg);
1777
1778                 if (msg_user(msg) == MSG_BUNDLER)
1779                         msg_set_type(msg, CLOSED_MSG);
1780                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
1781                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1782                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1783                 outbuf = tipc_buf_acquire(length + INT_H_SIZE);
1784                 if (outbuf == NULL) {
1785                         pr_warn("%sunable to send duplicate msg\n",
1786                                 link_co_err);
1787                         return;
1788                 }
1789                 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
1790                 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
1791                                                length);
1792                 __tipc_link_xmit(tunnel, outbuf);
1793                 if (!tipc_link_is_up(l_ptr))
1794                         return;
1795                 iter = iter->next;
1796         }
1797 }
1798
1799 /**
1800  * buf_extract - extracts embedded TIPC message from another message
1801  * @skb: encapsulating message buffer
1802  * @from_pos: offset to extract from
1803  *
1804  * Returns a new message buffer containing the embedded message (or NULL
1805  * on allocation failure); the encapsulating message is left unchanged.
1806  */
1807 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1808 {
1809         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
1810         u32 size = msg_size(msg);
1811         struct sk_buff *eb;
1812
1813         eb = tipc_buf_acquire(size);
1814         if (eb)
1815                 skb_copy_to_linear_data(eb, msg, size);
1816         return eb;
1817 }
1818
1819
1820
1821 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1822  * Owner node is locked.
1823  */
1824 static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
1825                               struct sk_buff *t_buf)
1826 {
1827         struct sk_buff *buf;
1828
1829         if (!tipc_link_is_up(l_ptr))
1830                 return;
1831
1832         buf = buf_extract(t_buf, INT_H_SIZE);
1833         if (buf == NULL) {
1834                 pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
1835                 return;
1836         }
1837
1838         /* Add buffer to deferred queue, if applicable: */
1839         link_handle_out_of_seq_msg(l_ptr, buf);
1840 }
1841
1842 /*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
1843  *  Owner node is locked.
1844  */
1845 static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
1846                                               struct sk_buff *t_buf)
1847 {
1848         struct tipc_msg *t_msg = buf_msg(t_buf);
1849         struct sk_buff *buf = NULL;
1850         struct tipc_msg *msg;
1851
1852         if (tipc_link_is_up(l_ptr))
1853                 tipc_link_reset(l_ptr);
1854
1855         /* First failover packet? */
1856         if (l_ptr->exp_msg_count == START_CHANGEOVER)
1857                 l_ptr->exp_msg_count = msg_msgcnt(t_msg);
1858
1859         /* Should there be an inner packet? */
1860         if (l_ptr->exp_msg_count) {
1861                 l_ptr->exp_msg_count--;
1862                 buf = buf_extract(t_buf, INT_H_SIZE);
1863                 if (buf == NULL) {
1864                         pr_warn("%sno inner failover pkt\n", link_co_err);
1865                         goto exit;
1866                 }
1867                 msg = buf_msg(buf);
1868
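                     /* Drop inner packets that were already received before
                      * the failed link was reset
                      */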
1869                 if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
1870                         kfree_skb(buf);
1871                         buf = NULL;
1872                         goto exit;
1873                 }
1874                 if (msg_user(msg) == MSG_FRAGMENTER) {
1875                         l_ptr->stats.recv_fragments++;
1876                         tipc_buf_append(&l_ptr->reasm_buf, &buf);
1877                 }
1878         }
1879 exit:
1880         if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
1881                 tipc_node_detach_link(l_ptr->owner, l_ptr);
1882                 kfree(l_ptr);
1883         }
1884         return buf;
1885 }
1886
1887 /*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
1888  *  via other link as result of a failover (ORIGINAL_MSG) or
1889  *  a new active link (DUPLICATE_MSG). Failover packets are
1890  *  returned to the active link for delivery upwards.
1891  *  Owner node is locked.
1892  */
1893 static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
1894                                 struct sk_buff **buf)
1895 {
1896         struct sk_buff *t_buf = *buf;
1897         struct tipc_link *l_ptr;
1898         struct tipc_msg *t_msg = buf_msg(t_buf);
1899         u32 bearer_id = msg_bearer_id(t_msg);
1900
1901         *buf = NULL;
1902
1903         if (bearer_id >= MAX_BEARERS)
1904                 goto exit;
1905
1906         l_ptr = n_ptr->links[bearer_id];
1907         if (!l_ptr)
1908                 goto exit;
1909
1910         if (msg_type(t_msg) == DUPLICATE_MSG)
1911                 tipc_link_dup_rcv(l_ptr, t_buf);
1912         else if (msg_type(t_msg) == ORIGINAL_MSG)
1913                 *buf = tipc_link_failover_rcv(l_ptr, t_buf);
1914         else
1915                 pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1916 exit:
1917         kfree_skb(t_buf);
1918         return *buf != NULL;
1919 }
1920
1921 /*
1922  *  Bundler functionality:
1923  */
1924 void tipc_link_bundle_rcv(struct sk_buff *buf)
1925 {
1926         u32 msgcount = msg_msgcnt(buf_msg(buf));
1927         u32 pos = INT_H_SIZE;
1928         struct sk_buff *obuf;
1929         struct tipc_msg *omsg;
1930
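             /* Extract each bundled message in turn and dispatch it as if it
              * had arrived unbundled
              */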
1931         while (msgcount--) {
1932                 obuf = buf_extract(buf, pos);
1933                 if (obuf == NULL) {
1934                         pr_warn("Link unable to unbundle message(s)\n");
1935                         break;
1936                 }
1937                 omsg = buf_msg(obuf);
1938                 pos += align(msg_size(omsg));
1939                 if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
1940                         tipc_sk_rcv(obuf);
1941                 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1942                         tipc_named_rcv(obuf);
1943                 } else {
1944                         pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
1945                         kfree_skb(obuf);
1946                 }
1947         }
1948         kfree_skb(buf);
1949 }
1950
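 /*
  * link_set_supervision_props - derive supervision timing from link tolerance
  *
  * The continuity (probe) interval is a quarter of the tolerance, capped at
  * 500 ms; abort_limit bounds how many such intervals may pass without
  * contact before the link gives up.
  */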
1951 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
1952 {
1953         if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
1954                 return;
1955
1956         l_ptr->tolerance = tolerance;
1957         l_ptr->continuity_interval =
1958                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
1959         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
1960 }
1961
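 /*
  * tipc_link_set_queue_limits - set per-user send queue limits
  *
  * Limits for locally originated data messages scale with the configured
  * window and grow with message importance; transiting and internal traffic
  * get fixed limits.
  */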
1962 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
1963 {
1964         /* Data messages from this node, inclusive FIRST_FRAGM */
1965         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
1966         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
1967         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
1968         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
1969         /* Transiting data messages, inclusive FIRST_FRAGM */
1970         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
1971         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
1972         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
1973         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
1974         l_ptr->queue_limit[CONN_MANAGER] = 1200;
1975         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
1976         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
1977         /* FRAGMENT and LAST_FRAGMENT packets */
1978         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
1979 }
1980
1981 /* tipc_link_find_owner - locate owner node of link by link's name
1982  * @name: pointer to link name string
1983  * @bearer_id: pointer to index in 'node->links' array where the link was found.
1984  *
1985  * Returns pointer to node owning the link, or NULL if no matching link is found.
1986  */
1987 static struct tipc_node *tipc_link_find_owner(const char *link_name,
1988                                               unsigned int *bearer_id)
1989 {
1990         struct tipc_link *l_ptr;
1991         struct tipc_node *n_ptr;
1992         struct tipc_node *found_node = NULL;
1993         int i;
1994
1995         *bearer_id = 0;
1996         rcu_read_lock();
1997         list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
1998                 tipc_node_lock(n_ptr);
1999                 for (i = 0; i < MAX_BEARERS; i++) {
2000                         l_ptr = n_ptr->links[i];
2001                         if (l_ptr && !strcmp(l_ptr->name, link_name)) {
2002                                 *bearer_id = i;
2003                                 found_node = n_ptr;
2004                                 break;
2005                         }
2006                 }
2007                 tipc_node_unlock(n_ptr);
2008                 if (found_node)
2009                         break;
2010         }
2011         rcu_read_unlock();
2012
2013         return found_node;
2014 }
2015
2016 /**
2017  * link_value_is_valid -- validate proposed link tolerance/priority/window
2018  *
2019  * @cmd: value type (TIPC_CMD_SET_LINK_*)
2020  * @new_value: the new value
2021  *
2022  * Returns 1 if value is within range, 0 if not.
2023  */
2024 static int link_value_is_valid(u16 cmd, u32 new_value)
2025 {
2026         switch (cmd) {
2027         case TIPC_CMD_SET_LINK_TOL:
2028                 return (new_value >= TIPC_MIN_LINK_TOL) &&
2029                         (new_value <= TIPC_MAX_LINK_TOL);
2030         case TIPC_CMD_SET_LINK_PRI:
2031                 return (new_value <= TIPC_MAX_LINK_PRI);
2032         case TIPC_CMD_SET_LINK_WINDOW:
2033                 return (new_value >= TIPC_MIN_LINK_WIN) &&
2034                         (new_value <= TIPC_MAX_LINK_WIN);
2035         }
2036         return 0;
2037 }
2038
2039 /**
2040  * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
2041  * @name: ptr to link, bearer, or media name
2042  * @new_value: new value of link, bearer, or media setting
2043  * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
2044  *
2045  * Caller must hold RTNL lock to ensure link/bearer/media is not deleted.
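  * The name is matched against links first, then bearers, then media.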
2046  *
2047  * Returns 0 if value updated and negative value on error.
2048  */
2049 static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
2050 {
2051         struct tipc_node *node;
2052         struct tipc_link *l_ptr;
2053         struct tipc_bearer *b_ptr;
2054         struct tipc_media *m_ptr;
2055         int bearer_id;
2056         int res = 0;
2057
2058         node = tipc_link_find_owner(name, &bearer_id);
2059         if (node) {
2060                 tipc_node_lock(node);
2061                 l_ptr = node->links[bearer_id];
2062
2063                 if (l_ptr) {
2064                         switch (cmd) {
2065                         case TIPC_CMD_SET_LINK_TOL:
2066                                 link_set_supervision_props(l_ptr, new_value);
2067                                 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2068                                                      new_value, 0, 0);
2069                                 break;
2070                         case TIPC_CMD_SET_LINK_PRI:
2071                                 l_ptr->priority = new_value;
2072                                 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2073                                                      0, new_value, 0);
2074                                 break;
2075                         case TIPC_CMD_SET_LINK_WINDOW:
2076                                 tipc_link_set_queue_limits(l_ptr, new_value);
2077                                 break;
2078                         default:
2079                                 res = -EINVAL;
2080                                 break;
2081                         }
2082                 }
2083                 tipc_node_unlock(node);
2084                 return res;
2085         }
2086
2087         b_ptr = tipc_bearer_find(name);
2088         if (b_ptr) {
2089                 switch (cmd) {
2090                 case TIPC_CMD_SET_LINK_TOL:
2091                         b_ptr->tolerance = new_value;
2092                         break;
2093                 case TIPC_CMD_SET_LINK_PRI:
2094                         b_ptr->priority = new_value;
2095                         break;
2096                 case TIPC_CMD_SET_LINK_WINDOW:
2097                         b_ptr->window = new_value;
2098                         break;
2099                 default:
2100                         res = -EINVAL;
2101                         break;
2102                 }
2103                 return res;
2104         }
2105
2106         m_ptr = tipc_media_find(name);
2107         if (!m_ptr)
2108                 return -ENODEV;
2109         switch (cmd) {
2110         case TIPC_CMD_SET_LINK_TOL:
2111                 m_ptr->tolerance = new_value;
2112                 break;
2113         case TIPC_CMD_SET_LINK_PRI:
2114                 m_ptr->priority = new_value;
2115                 break;
2116         case TIPC_CMD_SET_LINK_WINDOW:
2117                 m_ptr->window = new_value;
2118                 break;
2119         default:
2120                 res = -EINVAL;
2121                 break;
2122         }
2123         return res;
2124 }
2125
2126 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2127                                      u16 cmd)
2128 {
2129         struct tipc_link_config *args;
2130         u32 new_value;
2131         int res;
2132
2133         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2134                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2135
2136         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2137         new_value = ntohl(args->value);
2138
2139         if (!link_value_is_valid(cmd, new_value))
2140                 return tipc_cfg_reply_error_string(
2141                         "cannot change, value invalid");
2142
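         /* Only the queue window can be changed on the broadcast link */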
2143         if (!strcmp(args->name, tipc_bclink_name)) {
2144                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2145                     (tipc_bclink_set_queue_limits(new_value) == 0))
2146                         return tipc_cfg_reply_none();
2147                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2148                                                    " (cannot change setting on broadcast link)");
2149         }
2150
2151         res = link_cmd_set_value(args->name, new_value, cmd);
2152         if (res)
2153                 return tipc_cfg_reply_error_string("cannot change link setting");
2154
2155         return tipc_cfg_reply_none();
2156 }
2157
2158 /**
2159  * link_reset_statistics - reset link statistics
2160  * @l_ptr: pointer to link
2161  */
2162 static void link_reset_statistics(struct tipc_link *l_ptr)
2163 {
2164         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2165         l_ptr->stats.sent_info = l_ptr->next_out_no;
2166         l_ptr->stats.recv_info = l_ptr->next_in_no;
2167 }
2168
2169 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2170 {
2171         char *link_name;
2172         struct tipc_link *l_ptr;
2173         struct tipc_node *node;
2174         unsigned int bearer_id;
2175
2176         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2177                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2178
2179         link_name = (char *)TLV_DATA(req_tlv_area);
2180         if (!strcmp(link_name, tipc_bclink_name)) {
2181                 if (tipc_bclink_reset_stats())
2182                         return tipc_cfg_reply_error_string("link not found");
2183                 return tipc_cfg_reply_none();
2184         }
2185         node = tipc_link_find_owner(link_name, &bearer_id);
2186         if (!node)
2187                 return tipc_cfg_reply_error_string("link not found");
2188
2189         tipc_node_lock(node);
2190         l_ptr = node->links[bearer_id];
2191         if (!l_ptr) {
2192                 tipc_node_unlock(node);
2193                 return tipc_cfg_reply_error_string("link not found");
2194         }
2195         link_reset_statistics(l_ptr);
2196         tipc_node_unlock(node);
2197         return tipc_cfg_reply_none();
2198 }
2199
2200 /**
2201  * percent - convert count to a percentage of total (rounded to nearest)
2202  */
2203 static u32 percent(u32 count, u32 total)
2204 {
2205         return (count * 100 + (total / 2)) / total;
2206 }
2207
2208 /**
2209  * tipc_link_stats - print link statistics
2210  * @name: link name
2211  * @buf: print buffer area
2212  * @buf_size: size of print buffer area
2213  *
2214  * Returns length of print buffer data string (or 0 if error)
2215  */
2216 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2217 {
2218         struct tipc_link *l;
2219         struct tipc_stats *s;
2220         struct tipc_node *node;
2221         char *status;
2222         u32 profile_total = 0;
2223         unsigned int bearer_id;
2224         int ret;
2225
2226         if (!strcmp(name, tipc_bclink_name))
2227                 return tipc_bclink_stats(buf, buf_size);
2228
2229         node = tipc_link_find_owner(name, &bearer_id);
2230         if (!node)
2231                 return 0;
2232
2233         tipc_node_lock(node);
2234
2235         l = node->links[bearer_id];
2236         if (!l) {
2237                 tipc_node_unlock(node);
2238                 return 0;
2239         }
2240
2241         s = &l->stats;
2242
2243         if (tipc_link_is_active(l))
2244                 status = "ACTIVE";
2245         else if (tipc_link_is_up(l))
2246                 status = "STANDBY";
2247         else
2248                 status = "DEFUNCT";
2249
2250         ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
2251                             "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2252                             "  Window:%u packets\n",
2253                             l->name, status, l->max_pkt, l->priority,
2254                             l->tolerance, l->queue_limit[0]);
2255
2256         ret += tipc_snprintf(buf + ret, buf_size - ret,
2257                              "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2258                              l->next_in_no - s->recv_info, s->recv_fragments,
2259                              s->recv_fragmented, s->recv_bundles,
2260                              s->recv_bundled);
2261
2262         ret += tipc_snprintf(buf + ret, buf_size - ret,
2263                              "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2264                              l->next_out_no - s->sent_info, s->sent_fragments,
2265                              s->sent_fragmented, s->sent_bundles,
2266                              s->sent_bundled);
2267
2268         profile_total = s->msg_length_counts;
2269         if (!profile_total)
2270                 profile_total = 1;
2271
2272         ret += tipc_snprintf(buf + ret, buf_size - ret,
2273                              "  TX profile sample:%u packets  average:%u octets\n"
2274                              "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2275                              "-16384:%u%% -32768:%u%% -66000:%u%%\n",
2276                              s->msg_length_counts,
2277                              s->msg_lengths_total / profile_total,
2278                              percent(s->msg_length_profile[0], profile_total),
2279                              percent(s->msg_length_profile[1], profile_total),
2280                              percent(s->msg_length_profile[2], profile_total),
2281                              percent(s->msg_length_profile[3], profile_total),
2282                              percent(s->msg_length_profile[4], profile_total),
2283                              percent(s->msg_length_profile[5], profile_total),
2284                              percent(s->msg_length_profile[6], profile_total));
2285
2286         ret += tipc_snprintf(buf + ret, buf_size - ret,
2287                              "  RX states:%u probes:%u naks:%u defs:%u"
2288                              " dups:%u\n", s->recv_states, s->recv_probes,
2289                              s->recv_nacks, s->deferred_recv, s->duplicates);
2290
2291         ret += tipc_snprintf(buf + ret, buf_size - ret,
2292                              "  TX states:%u probes:%u naks:%u acks:%u"
2293                              " dups:%u\n", s->sent_states, s->sent_probes,
2294                              s->sent_nacks, s->sent_acks, s->retransmitted);
2295
2296         ret += tipc_snprintf(buf + ret, buf_size - ret,
2297                              "  Congestion link:%u  Send queue"
2298                              " max:%u avg:%u\n", s->link_congs,
2299                              s->max_queue_sz, s->queue_sz_counts ?
2300                              (s->accu_queue_sz / s->queue_sz_counts) : 0);
2301
2302         tipc_node_unlock(node);
2303         return ret;
2304 }
2305
2306 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2307 {
2308         struct sk_buff *buf;
2309         struct tlv_desc *rep_tlv;
2310         int str_len;
2311         int pb_len;
2312         char *pb;
2313
2314         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2315                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2316
2317         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2318         if (!buf)
2319                 return NULL;
2320
2321         rep_tlv = (struct tlv_desc *)buf->data;
2322         pb = TLV_DATA(rep_tlv);
2323         pb_len = ULTRA_STRING_MAX_LEN;
2324         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2325                                   pb, pb_len);
2326         if (!str_len) {
2327                 kfree_skb(buf);
2328                 return tipc_cfg_reply_error_string("link not found");
2329         }
2330         str_len += 1;   /* for "\0" */
2331         skb_put(buf, TLV_SPACE(str_len));
2332         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2333
2334         return buf;
2335 }
2336
2337 /**
2338  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2339  * @dest: network address of destination node
2340  * @selector: used to select from set of active links
2341  *
2342  * If no active link can be found, uses default maximum packet size.
2343  */
2344 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2345 {
2346         struct tipc_node *n_ptr;
2347         struct tipc_link *l_ptr;
2348         u32 res = MAX_PKT_DEFAULT;
2349
2350         if (dest == tipc_own_addr)
2351                 return MAX_MSG_SIZE;
2352
2353         n_ptr = tipc_node_find(dest);
2354         if (n_ptr) {
2355                 tipc_node_lock(n_ptr);
2356                 l_ptr = n_ptr->active_links[selector & 1];
2357                 if (l_ptr)
2358                         res = l_ptr->max_pkt;
2359                 tipc_node_unlock(n_ptr);
2360         }
2361         return res;
2362 }
2363
2364 static void link_print(struct tipc_link *l_ptr, const char *str)
2365 {
2366         struct tipc_bearer *b_ptr;
2367
2368         rcu_read_lock();
2369         b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
2370         if (b_ptr)
2371                 pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
2372         rcu_read_unlock();
2373
2374         if (link_working_unknown(l_ptr))
2375                 pr_cont(":WU\n");
2376         else if (link_reset_reset(l_ptr))
2377                 pr_cont(":RR\n");
2378         else if (link_reset_unknown(l_ptr))
2379                 pr_cont(":RU\n");
2380         else if (link_working_working(l_ptr))
2381                 pr_cont(":WW\n");
2382         else
2383                 pr_cont("\n");
2384 }