]> git.kernelconcepts.de Git - karo-tx-linux.git/blobdiff - net/tipc/socket.c
tipc: redesign connection-level flow control
[karo-tx-linux.git] / net / tipc / socket.c
index d37a9401e182f599532ee4d02b647e1ecaac4e20..12628890c2190b9cff66a12d2d2169c1192be761 100644 (file)
@@ -96,8 +96,11 @@ struct tipc_sock {
        uint conn_timeout;
        atomic_t dupl_rcvcnt;
        bool link_cong;
-       uint sent_unacked;
-       uint rcv_unacked;
+       u16 snt_unacked;
+       u16 snd_win;
+       u16 peer_caps;
+       u16 rcv_unacked;
+       u16 rcv_win;
        struct sockaddr_tipc remote;
        struct rhash_head node;
        struct rcu_head rcu;
@@ -227,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
        return container_of(sk, struct tipc_sock, sk);
 }
 
-static int tsk_conn_cong(struct tipc_sock *tsk)
+static bool tsk_conn_cong(struct tipc_sock *tsk)
 {
-       return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+       return tsk->snt_unacked >= tsk->snd_win;
+}
+
+/* tsk_blocks(): translate a buffer size in bytes to number of
+ * advertisable blocks, taking into account the ratio truesize(len)/len
+ * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
+ */
+static u16 tsk_adv_blocks(int len)
+{
+       return len / FLOWCTL_BLK_SZ / 4;
+}
+
+/* tsk_inc(): increment counter for sent or received data
+ * - If block based flow control is not supported by peer we
+ *   fall back to message based ditto, incrementing the counter
+ */
+static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
+{
+       if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+               return ((msglen / FLOWCTL_BLK_SZ) + 1);
+       return 1;
 }
 
 /**
@@ -377,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        sk->sk_write_space = tipc_write_space;
        sk->sk_destruct = tipc_sock_destruct;
        tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
-       tsk->sent_unacked = 0;
        atomic_set(&tsk->dupl_rcvcnt, 0);
 
+       /* Start out with safe limits until we receive an advertised window */
+       tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
+       tsk->rcv_win = tsk->snd_win;
+
        if (sock->state == SS_READY) {
                tsk_set_unreturnable(tsk, true);
                if (sock->type == SOCK_DGRAM)
@@ -775,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
        struct sock *sk = &tsk->sk;
        struct tipc_msg *hdr = buf_msg(skb);
        int mtyp = msg_type(hdr);
-       int conn_cong;
+       bool conn_cong;
 
        /* Ignore if connection cannot be validated: */
        if (!tsk_peer_msg(tsk, hdr))
@@ -789,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
                return;
        } else if (mtyp == CONN_ACK) {
                conn_cong = tsk_conn_cong(tsk);
-               tsk->sent_unacked -= msg_msgcnt(hdr);
+               tsk->snt_unacked -= msg_conn_ack(hdr);
+               if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+                       tsk->snd_win = msg_adv_win(hdr);
                if (conn_cong)
                        sk->sk_write_space(sk);
        } else if (mtyp != CONN_PROBE_REPLY) {
@@ -1020,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
        u32 dnode;
        uint mtu, send, sent = 0;
        struct iov_iter save;
+       int hlen = MIN_H_SIZE;
 
        /* Handle implied connection establishment */
        if (unlikely(dest)) {
                rc = __tipc_sendmsg(sock, m, dsz);
+               hlen = msg_hdr_sz(mhdr);
                if (dsz && (dsz == rc))
-                       tsk->sent_unacked = 1;
+                       tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
                return rc;
        }
        if (dsz > (uint)INT_MAX)
@@ -1054,7 +1084,7 @@ next:
                if (likely(!tsk_conn_cong(tsk))) {
                        rc = tipc_node_xmit(net, &pktchain, dnode, portid);
                        if (likely(!rc)) {
-                               tsk->sent_unacked++;
+                               tsk->snt_unacked += tsk_inc(tsk, send + hlen);
                                sent += send;
                                if (sent == dsz)
                                        return dsz;
@@ -1118,6 +1148,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
        sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
        tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
        tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+       tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+       if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+               return;
+
+       /* Fall back to message based flow control */
+       tsk->rcv_win = FLOWCTL_MSG_WIN;
+       tsk->snd_win = FLOWCTL_MSG_WIN;
 }
 
 /**
@@ -1214,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
        return 0;
 }
 
-static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
 {
        struct net *net = sock_net(&tsk->sk);
        struct sk_buff *skb = NULL;
@@ -1230,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
        if (!skb)
                return;
        msg = buf_msg(skb);
-       msg_set_msgcnt(msg, ack);
+       msg_set_conn_ack(msg, tsk->rcv_unacked);
+       tsk->rcv_unacked = 0;
+
+       /* Adjust to and advertize the correct window limit */
+       if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
+               tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
+               msg_set_adv_win(msg, tsk->rcv_win);
+       }
        tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
@@ -1288,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
        long timeo;
        unsigned int sz;
        u32 err;
-       int res;
+       int res, hlen;
 
        /* Catch invalid receive requests */
        if (unlikely(!buf_len))
@@ -1313,6 +1357,7 @@ restart:
        buf = skb_peek(&sk->sk_receive_queue);
        msg = buf_msg(buf);
        sz = msg_data_sz(msg);
+       hlen = msg_hdr_sz(msg);
        err = msg_errcode(msg);
 
        /* Discard an empty non-errored message & try again */
@@ -1335,7 +1380,7 @@ restart:
                        sz = buf_len;
                        m->msg_flags |= MSG_TRUNC;
                }
-               res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
+               res = skb_copy_datagram_msg(buf, hlen, m, sz);
                if (res)
                        goto exit;
                res = sz;
@@ -1347,15 +1392,15 @@ restart:
                        res = -ECONNRESET;
        }
 
-       /* Consume received message (optional) */
-       if (likely(!(flags & MSG_PEEK))) {
-               if ((sock->state != SS_READY) &&
-                   (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
-                       tipc_sk_send_ack(tsk, tsk->rcv_unacked);
-                       tsk->rcv_unacked = 0;
-               }
-               tsk_advance_rx_queue(sk);
+       if (unlikely(flags & MSG_PEEK))
+               goto exit;
+
+       if (likely(sock->state != SS_READY)) {
+               tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+               if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+                       tipc_sk_send_ack(tsk);
        }
+       tsk_advance_rx_queue(sk);
 exit:
        release_sock(sk);
        return res;
@@ -1384,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
        int sz_to_copy, target, needed;
        int sz_copied = 0;
        u32 err;
-       int res = 0;
+       int res = 0, hlen;
 
        /* Catch invalid receive attempts */
        if (unlikely(!buf_len))
@@ -1410,6 +1455,7 @@ restart:
        buf = skb_peek(&sk->sk_receive_queue);
        msg = buf_msg(buf);
        sz = msg_data_sz(msg);
+       hlen = msg_hdr_sz(msg);
        err = msg_errcode(msg);
 
        /* Discard an empty non-errored message & try again */
@@ -1434,8 +1480,7 @@ restart:
                needed = (buf_len - sz_copied);
                sz_to_copy = (sz <= needed) ? sz : needed;
 
-               res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
-                                           m, sz_to_copy);
+               res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
                if (res)
                        goto exit;
 
@@ -1457,20 +1502,18 @@ restart:
                        res = -ECONNRESET;
        }
 
-       /* Consume received message (optional) */
-       if (likely(!(flags & MSG_PEEK))) {
-               if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
-                       tipc_sk_send_ack(tsk, tsk->rcv_unacked);
-                       tsk->rcv_unacked = 0;
-               }
-               tsk_advance_rx_queue(sk);
-       }
+       if (unlikely(flags & MSG_PEEK))
+               goto exit;
+
+       tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+       if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+               tipc_sk_send_ack(tsk);
+       tsk_advance_rx_queue(sk);
 
        /* Loop around if more data is required */
        if ((sz_copied < buf_len) &&    /* didn't get all requested data */
            (!skb_queue_empty(&sk->sk_receive_queue) ||
            (sz_copied < target)) &&    /* and more is ready or required */
-           (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
            (!err))                     /* and haven't reached a FIN */
                goto restart;
 
@@ -1602,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 /**
  * rcvbuf_limit - get proper overload limit of socket receive queue
  * @sk: socket
- * @buf: message
+ * @skb: message
  *
- * For all connection oriented messages, irrespective of importance,
- * the default overload value (i.e. 67MB) is set as limit.
+ * For connection oriented messages, irrespective of importance,
+ * default queue limit is 2 MB.
  *
- * For all connectionless messages, by default new queue limits are
- * as belows:
+ * For connectionless messages, queue limits are based on message
+ * importance as follows:
  *
- * TIPC_LOW_IMPORTANCE       (4 MB)
- * TIPC_MEDIUM_IMPORTANCE    (8 MB)
- * TIPC_HIGH_IMPORTANCE      (16 MB)
- * TIPC_CRITICAL_IMPORTANCE  (32 MB)
+ * TIPC_LOW_IMPORTANCE       (2 MB)
+ * TIPC_MEDIUM_IMPORTANCE    (4 MB)
+ * TIPC_HIGH_IMPORTANCE      (8 MB)
+ * TIPC_CRITICAL_IMPORTANCE  (16 MB)
  *
  * Returns overload limit according to corresponding message importance
  */
-static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
+static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
 {
-       struct tipc_msg *msg = buf_msg(buf);
+       struct tipc_sock *tsk = tipc_sk(sk);
+       struct tipc_msg *hdr = buf_msg(skb);
+
+       if (unlikely(!msg_connected(hdr)))
+               return sk->sk_rcvbuf << msg_importance(hdr);
 
-       if (msg_connected(msg))
-               return sysctl_tipc_rmem[2];
+       if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+               return sk->sk_rcvbuf;
 
-       return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
-               msg_importance(msg);
+       return FLOWCTL_MSG_LIM;
 }
 
 /**