tipc: convert node lock to rwlock

According to the node FSM a node in state SELF_UP_PEER_UP cannot
change state inside a lock context, except when a TUNNEL_PROTOCOL
(SYNCH or FAILOVER) packet arrives. However, the node's individual
links may still change state.

Since each link now is protected by its own spinlock, we finally have
the conditions in place to convert the node spinlock to an rwlock_t.
If the node state and arriving packet type are rigth, we can let the
link directly receive the packet under protection of its own spinlock
and the node lock in read mode. In all other cases we use the node
lock in write mode. This enables full concurrent execution between
parallel links during steady-state traffic situations, i.e., 99+ %
of the time.

This commit implements this change.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b5e895c..1dda46e 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1547,7 +1547,7 @@
 	*bearer_id = 0;
 	rcu_read_lock();
 	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
-		tipc_node_lock(n_ptr);
+		tipc_node_read_lock(n_ptr);
 		for (i = 0; i < MAX_BEARERS; i++) {
 			l_ptr = n_ptr->links[i].link;
 			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
@@ -1556,7 +1556,7 @@
 				break;
 			}
 		}
-		tipc_node_unlock(n_ptr);
+		tipc_node_read_unlock(n_ptr);
 		if (found_node)
 			break;
 	}
@@ -1658,7 +1658,7 @@
 	if (!node)
 		return -EINVAL;
 
-	tipc_node_lock(node);
+	tipc_node_read_lock(node);
 
 	link = node->links[bearer_id].link;
 	if (!link) {
@@ -1699,7 +1699,7 @@
 	}
 
 out:
-	tipc_node_unlock(node);
+	tipc_node_read_unlock(node);
 
 	return res;
 }
@@ -1898,10 +1898,10 @@
 
 		list_for_each_entry_continue_rcu(node, &tn->node_list,
 						 list) {
-			tipc_node_lock(node);
+			tipc_node_read_lock(node);
 			err = __tipc_nl_add_node_links(net, &msg, node,
 						       &prev_link);
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			if (err)
 				goto out;
 
@@ -1913,10 +1913,10 @@
 			goto out;
 
 		list_for_each_entry_rcu(node, &tn->node_list, list) {
-			tipc_node_lock(node);
+			tipc_node_read_lock(node);
 			err = __tipc_nl_add_node_links(net, &msg, node,
 						       &prev_link);
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			if (err)
 				goto out;
 
@@ -1967,16 +1967,16 @@
 		if (!node)
 			return -EINVAL;
 
-		tipc_node_lock(node);
+		tipc_node_read_lock(node);
 		link = node->links[bearer_id].link;
 		if (!link) {
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			nlmsg_free(msg.skb);
 			return -EINVAL;
 		}
 
 		err = __tipc_nl_add_link(net, &msg, link, 0);
-		tipc_node_unlock(node);
+		tipc_node_read_unlock(node);
 		if (err) {
 			nlmsg_free(msg.skb);
 			return err;
@@ -2021,18 +2021,18 @@
 	node = tipc_link_find_owner(net, link_name, &bearer_id);
 	if (!node)
 		return -EINVAL;
+
 	le = &node->links[bearer_id];
-	tipc_node_lock(node);
+	tipc_node_read_lock(node);
 	spin_lock_bh(&le->lock);
 	link = le->link;
 	if (!link) {
-		tipc_node_unlock(node);
+		spin_unlock_bh(&le->lock);
+		tipc_node_read_unlock(node);
 		return -EINVAL;
 	}
-
 	link_reset_statistics(link);
 	spin_unlock_bh(&le->lock);
-	tipc_node_unlock(node);
-
+	tipc_node_read_unlock(node);
 	return 0;
 }
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 572063a..47d5f84 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -141,10 +141,63 @@
 	return NULL;
 }
 
+void tipc_node_read_lock(struct tipc_node *n)
+{
+	read_lock_bh(&n->lock);
+}
+
+void tipc_node_read_unlock(struct tipc_node *n)
+{
+	read_unlock_bh(&n->lock);
+}
+
+static void tipc_node_write_lock(struct tipc_node *n)
+{
+	write_lock_bh(&n->lock);
+}
+
+static void tipc_node_write_unlock(struct tipc_node *n)
+{
+	struct net *net = n->net;
+	u32 addr = 0;
+	u32 flags = n->action_flags;
+	u32 link_id = 0;
+	struct list_head *publ_list;
+
+	if (likely(!flags)) {
+		write_unlock_bh(&n->lock);
+		return;
+	}
+
+	addr = n->addr;
+	link_id = n->link_id;
+	publ_list = &n->publ_list;
+
+	n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
+			     TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
+
+	write_unlock_bh(&n->lock);
+
+	if (flags & TIPC_NOTIFY_NODE_DOWN)
+		tipc_publ_notify(net, publ_list, addr);
+
+	if (flags & TIPC_NOTIFY_NODE_UP)
+		tipc_named_node_up(net, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_UP)
+		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
+				     TIPC_NODE_SCOPE, link_id, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_DOWN)
+		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
+				      link_id, addr);
+}
+
 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *n_ptr, *temp_node;
+	int i;
 
 	spin_lock_bh(&tn->node_list_lock);
 	n_ptr = tipc_node_find(net, addr);
@@ -159,7 +212,7 @@
 	n_ptr->net = net;
 	n_ptr->capabilities = capabilities;
 	kref_init(&n_ptr->kref);
-	spin_lock_init(&n_ptr->lock);
+	rwlock_init(&n_ptr->lock);
 	INIT_HLIST_NODE(&n_ptr->hash);
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->publ_list);
@@ -168,6 +221,8 @@
 	skb_queue_head_init(&n_ptr->bc_entry.inputq1);
 	__skb_queue_head_init(&n_ptr->bc_entry.arrvq);
 	skb_queue_head_init(&n_ptr->bc_entry.inputq2);
+	for (i = 0; i < MAX_BEARERS; i++)
+		spin_lock_init(&n_ptr->links[i].lock);
 	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
 		if (n_ptr->addr < temp_node->addr)
@@ -246,9 +301,9 @@
 		pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
 		return;
 	}
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	list_add_tail(subscr, &n->publ_list);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_node_put(n);
 }
 
@@ -264,9 +319,9 @@
 		pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
 		return;
 	}
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	list_del_init(subscr);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_node_put(n);
 }
 
@@ -293,9 +348,9 @@
 	conn->port = port;
 	conn->peer_port = peer_port;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_add_tail(&conn->list, &node->conn_sks);
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 exit:
 	tipc_node_put(node);
 	return err;
@@ -313,14 +368,14 @@
 	if (!node)
 		return;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
 		if (port != conn->port)
 			continue;
 		list_del(&conn->list);
 		kfree(conn);
 	}
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 	tipc_node_put(node);
 }
 
@@ -337,7 +392,7 @@
 	__skb_queue_head_init(&xmitq);
 
 	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		le = &n->links[bearer_id];
 		spin_lock_bh(&le->lock);
 		if (le->link) {
@@ -346,7 +401,7 @@
 			rc = tipc_link_timeout(le->link, &xmitq);
 		}
 		spin_unlock_bh(&le->lock);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
 		if (rc & TIPC_LINK_DOWN_EVT)
 			tipc_node_link_down(n, bearer_id, false);
@@ -425,9 +480,9 @@
 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
 			      struct sk_buff_head *xmitq)
 {
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	__tipc_node_link_up(n, bearer_id, xmitq);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 }
 
 /**
@@ -516,7 +571,7 @@
 
 	__skb_queue_head_init(&xmitq);
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	if (!tipc_link_is_establishing(l)) {
 		__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
 		if (delete) {
@@ -528,7 +583,7 @@
 		/* Defuse pending tipc_node_link_up() */
 		tipc_link_fsm_evt(l, LINK_RESET_EVT);
 	}
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
 	tipc_sk_rcv(n->net, &le->inputq);
 }
@@ -561,7 +616,7 @@
 	if (!n)
 		return;
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 
 	le = &n->links[b->identity];
 
@@ -656,7 +711,6 @@
 		if (n->state == NODE_FAILINGOVER)
 			tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
 		le->link = l;
-		spin_lock_init(&le->lock);
 		n->link_cnt++;
 		tipc_node_calculate_timer(n, l);
 		if (n->link_cnt == 1)
@@ -665,7 +719,7 @@
 	}
 	memcpy(&le->maddr, maddr, sizeof(*maddr));
 exit:
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	if (reset && !tipc_link_is_reset(l))
 		tipc_node_link_down(n, b->identity, false);
 	tipc_node_put(n);
@@ -873,24 +927,6 @@
 	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
 }
 
-bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
-{
-	int state = n->state;
-
-	if (likely(state == SELF_UP_PEER_UP))
-		return true;
-
-	if (state == SELF_LEAVING_PEER_DOWN)
-		return false;
-
-	if (state == SELF_DOWN_PEER_LEAVING) {
-		if (msg_peer_node_is_up(hdr))
-			return false;
-	}
-
-	return true;
-}
-
 static void node_lost_contact(struct tipc_node *n,
 			      struct sk_buff_head *inputq)
 {
@@ -952,56 +988,18 @@
 	if (bearer_id >= MAX_BEARERS)
 		goto exit;
 
-	tipc_node_lock(node);
+	tipc_node_read_lock(node);
 	link = node->links[bearer_id].link;
 	if (link) {
 		strncpy(linkname, link->name, len);
 		err = 0;
 	}
 exit:
-	tipc_node_unlock(node);
+	tipc_node_read_unlock(node);
 	tipc_node_put(node);
 	return err;
 }
 
-void tipc_node_unlock(struct tipc_node *node)
-{
-	struct net *net = node->net;
-	u32 addr = 0;
-	u32 flags = node->action_flags;
-	u32 link_id = 0;
-	struct list_head *publ_list;
-
-	if (likely(!flags)) {
-		spin_unlock_bh(&node->lock);
-		return;
-	}
-
-	addr = node->addr;
-	link_id = node->link_id;
-	publ_list = &node->publ_list;
-
-	node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
-				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
-
-	spin_unlock_bh(&node->lock);
-
-	if (flags & TIPC_NOTIFY_NODE_DOWN)
-		tipc_publ_notify(net, publ_list, addr);
-
-	if (flags & TIPC_NOTIFY_NODE_UP)
-		tipc_named_node_up(net, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_UP)
-		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
-				     TIPC_NODE_SCOPE, link_id, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_DOWN)
-		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
-				      link_id, addr);
-
-}
-
 /* Caller should hold node lock for the passed node */
 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 {
@@ -1048,40 +1046,38 @@
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 		   u32 dnode, int selector)
 {
-	struct tipc_link_entry *le;
+	struct tipc_link_entry *le = NULL;
 	struct tipc_node *n;
 	struct sk_buff_head xmitq;
-	struct tipc_media_addr *maddr = NULL;
 	int bearer_id = -1;
 	int rc = -EHOSTUNREACH;
 
 	__skb_queue_head_init(&xmitq);
 	n = tipc_node_find(net, dnode);
 	if (likely(n)) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		bearer_id = n->active_links[selector & 1];
 		if (bearer_id >= 0) {
 			le = &n->links[bearer_id];
-			maddr = &le->maddr;
 			spin_lock_bh(&le->lock);
-			if (likely(le->link))
-				rc = tipc_link_xmit(le->link, list, &xmitq);
+			rc = tipc_link_xmit(le->link, list, &xmitq);
 			spin_unlock_bh(&le->lock);
 		}
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
+		if (likely(!skb_queue_empty(&xmitq))) {
+			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+			return 0;
+		}
 		if (unlikely(rc == -ENOBUFS))
 			tipc_node_link_down(n, bearer_id, false);
 		tipc_node_put(n);
+		return rc;
 	}
-	if (likely(!skb_queue_empty(&xmitq))) {
-		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
-		return 0;
-	}
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-	return rc;
+
+	if (unlikely(!in_own_node(net, dnode)))
+		return rc;
+	tipc_sk_rcv(net, list);
+	return 0;
 }
 
 /* tipc_node_xmit_skb(): send single buffer to destination
@@ -1171,9 +1167,9 @@
 
 	/* Broadcast ACKs are sent on a unicast link */
 	if (rc & TIPC_LINK_SND_BC_ACK) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		tipc_link_build_ack_msg(le->link, &xmitq);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 	}
 
 	if (!skb_queue_empty(&xmitq))
@@ -1229,7 +1225,7 @@
 		}
 	}
 
-	/* Update node accesibility if applicable */
+	/* Check and update node accesibility if applicable */
 	if (state == SELF_UP_PEER_COMING) {
 		if (!tipc_link_is_up(l))
 			return true;
@@ -1245,6 +1241,9 @@
 		return true;
 	}
 
+	if (state == SELF_LEAVING_PEER_DOWN)
+		return false;
+
 	/* Ignore duplicate packets */
 	if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
 		return true;
@@ -1361,21 +1360,29 @@
 	else if (unlikely(n->bc_entry.link->acked != bc_ack))
 		tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
-	tipc_node_lock(n);
-
-	/* Is reception permitted at the moment ? */
-	if (!tipc_node_filter_pkt(n, hdr))
-		goto unlock;
-
-	/* Check and if necessary update node state */
-	if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
+	/* Receive packet directly if conditions permit */
+	tipc_node_read_lock(n);
+	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
 		spin_lock_bh(&le->lock);
-		rc = tipc_link_rcv(le->link, skb, &xmitq);
+		if (le->link) {
+			rc = tipc_link_rcv(le->link, skb, &xmitq);
+			skb = NULL;
+		}
 		spin_unlock_bh(&le->lock);
-		skb = NULL;
 	}
-unlock:
-	tipc_node_unlock(n);
+	tipc_node_read_unlock(n);
+
+	/* Check/update node state before receiving */
+	if (unlikely(skb)) {
+		tipc_node_write_lock(n);
+		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+			if (le->link) {
+				rc = tipc_link_rcv(le->link, skb, &xmitq);
+				skb = NULL;
+			}
+		}
+		tipc_node_write_unlock(n);
+	}
 
 	if (unlikely(rc & TIPC_LINK_UP_EVT))
 		tipc_node_link_up(n, bearer_id, &xmitq);
@@ -1440,15 +1447,15 @@
 				continue;
 		}
 
-		tipc_node_lock(node);
+		tipc_node_read_lock(node);
 		err = __tipc_nl_add_node(&msg, node);
 		if (err) {
 			last_addr = node->addr;
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			goto out;
 		}
 
-		tipc_node_unlock(node);
+		tipc_node_read_unlock(node);
 	}
 	done = 1;
 out:
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 8784907..651a1581 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -109,7 +109,7 @@
 struct tipc_node {
 	u32 addr;
 	struct kref kref;
-	spinlock_t lock;
+	rwlock_t lock;
 	struct net *net;
 	struct hlist_node hash;
 	int active_links[2];
@@ -145,7 +145,8 @@
 bool tipc_node_is_up(struct tipc_node *n);
 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
 			   char *linkname, size_t len);
-void tipc_node_unlock(struct tipc_node *node);
+void tipc_node_read_lock(struct tipc_node *n);
+void tipc_node_read_unlock(struct tipc_node *node);
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 		   int selector);
 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
@@ -157,11 +158,6 @@
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
 
-static inline void tipc_node_lock(struct tipc_node *node)
-{
-	spin_lock_bh(&node->lock);
-}
-
 static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
 {
 	int bearer_id = n->active_links[sel & 1];