tipc: let broadcast transmission use new link transmit function
This commit simplifies the broadcast link transmission function by
leveraging the previous changes to the link transmission function and
to the life cycle of the broadcast transmission link.
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b7bd21..08d64e7 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,6 +112,11 @@
return tipc_net(net)->bcbase;
}
+static struct tipc_link *tipc_bc_sndlink(struct net *net)
+{
+ return tipc_net(net)->bcl;
+}
+
/**
* tipc_nmap_equal - test for equality of node maps
*/
@@ -121,6 +126,7 @@
return !memcmp(nm_a, nm_b, sizeof(*nm_a));
}
+static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff);
@@ -148,14 +154,14 @@
return MAX_PKT_DEFAULT_MCAST;
}
-static u32 bcbuf_acks(struct sk_buff *buf)
+static u16 bcbuf_acks(struct sk_buff *skb)
{
- return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
+ return TIPC_SKB_CB(skb)->ackers;
}
-static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
+static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
{
- TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
+ TIPC_SKB_CB(buf)->ackers = ackers;
}
static void bcbuf_decr_acks(struct sk_buff *buf)
@@ -166,9 +172,10 @@
void tipc_bclink_add_node(struct net *net, u32 addr)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
-
+ struct tipc_link *l = tipc_bc_sndlink(net);
tipc_bclink_lock(net);
tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+ tipc_link_add_bc_peer(l);
tipc_bclink_unlock(net);
}
@@ -178,6 +185,7 @@
tipc_bclink_lock(net);
tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+ tn->bcl->ackers--;
/* Last node? => reset backlog queue */
if (!tn->bcbase->bcast_nodes.count)
@@ -295,7 +303,6 @@
if (unlikely(!n_ptr->bclink.recv_permitted))
return;
-
tipc_bclink_lock(net);
/* Bail out if tx queue is empty (no clean up is required) */
@@ -324,13 +331,11 @@
less_eq(acked, n_ptr->bclink.acked))
goto exit;
}
-
/* Skip over packets that node has previously acknowledged */
skb_queue_walk(&tn->bcl->transmq, skb) {
if (more(buf_seqno(skb), n_ptr->bclink.acked))
break;
}
-
/* Update packets that node is now acknowledging */
skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
if (more(buf_seqno(skb), acked))
@@ -367,6 +372,7 @@
struct sk_buff *buf;
struct net *net = n_ptr->net;
struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_link *bcl = tn->bcl;
/* Ignore "stale" link state info */
if (less_eq(last_sent, n_ptr->bclink.last_in))
@@ -375,6 +381,10 @@
/* Update link synchronization state; quit if in sync */
bclink_update_last_sent(n_ptr, last_sent);
+ /* This is a good location for statistical profiling */
+ bcl->stats.queue_sz_counts++;
+ bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
+
if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
return;
@@ -468,52 +478,35 @@
*/
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- struct tipc_link *bcl = tn->bcl;
- struct tipc_bc_base *bclink = tn->bcbase;
+ struct tipc_link *l = tipc_bc_sndlink(net);
+ struct sk_buff_head xmitq, inputq, rcvq;
int rc = 0;
- int bc = 0;
- struct sk_buff *skb;
- struct sk_buff_head arrvq;
- struct sk_buff_head inputq;
- /* Prepare clone of message for local node */
- skb = tipc_msg_reassemble(list);
- if (unlikely(!skb))
+ __skb_queue_head_init(&rcvq);
+ __skb_queue_head_init(&xmitq);
+ skb_queue_head_init(&inputq);
+
+ /* Prepare message clone for local node */
+ if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
return -EHOSTUNREACH;
- /* Broadcast to all nodes */
- if (likely(bclink)) {
- tipc_bclink_lock(net);
- if (likely(bclink->bcast_nodes.count)) {
- rc = __tipc_link_xmit(net, bcl, list);
- if (likely(!rc)) {
- u32 len = skb_queue_len(&bcl->transmq);
+ tipc_bcast_lock(net);
+ if (tipc_link_bc_peers(l))
+ rc = tipc_link_xmit(l, list, &xmitq);
+ bclink_set_last_sent(net);
+ tipc_bcast_unlock(net);
- bclink_set_last_sent(net);
- bcl->stats.queue_sz_counts++;
- bcl->stats.accu_queue_sz += len;
- }
- bc = 1;
- }
- tipc_bclink_unlock(net);
- }
-
- if (unlikely(!bc))
- __skb_queue_purge(list);
-
+ /* Don't send to local node if adding to link failed */
if (unlikely(rc)) {
- kfree_skb(skb);
+ __skb_queue_purge(&rcvq);
return rc;
}
- /* Deliver message clone */
- __skb_queue_head_init(&arrvq);
- skb_queue_head_init(&inputq);
- __skb_queue_tail(&arrvq, skb);
- tipc_sk_mcast_rcv(net, &arrvq, &inputq);
- return rc;
+ /* Broadcast to all nodes, including local node */
+ tipc_bcbearer_xmit(net, &xmitq);
+ tipc_sk_mcast_rcv(net, &rcvq, &inputq);
+ __skb_queue_purge(list);
+ return 0;
}
-
/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
@@ -564,7 +557,6 @@
node = tipc_node_find(net, msg_prevnode(msg));
if (unlikely(!node))
goto exit;
-
tipc_node_lock(node);
if (unlikely(!node->bclink.recv_permitted))
goto unlock;
@@ -589,7 +581,6 @@
tipc_node_put(node);
goto exit;
}
-
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
@@ -778,6 +769,19 @@
return 0;
}
+static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
+{
+ struct sk_buff *skb, *tmp;
+
+ skb_queue_walk_safe(xmitq, skb, tmp) {
+ __skb_dequeue(xmitq);
+ tipc_bcbearer_send(net, skb, NULL, NULL);
+
+ /* Free our buffer here until cloning is removed from tipc_l2_send_msg(): */
+ kfree_skb(skb);
+ }
+}
+
/**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/