From: Håkon Bugge

Change the return code from rds_send_xmit() when it is unable to
acquire the RDS_IN_XMIT lock-bit from -ENOMEM to -EBUSY. This avoids
re-queuing of the rds_send_worker() when someone else is actually
executing rds_send_xmit().

Performance is improved by 2% running rds-stress with the following
parameters: "-t 16 -d 32 -q 64 -a 64 -o". The test was run five times,
each time running for one minute, and the arithmetic average of the tx
IOPS was used as the performance metric.

Send lock contention was reduced by 6.5% and the ib_tx_ring_full
condition was more than doubled, indicating better ability to send.

Signed-off-by: Håkon Bugge
Signed-off-by: Allison Henderson
---
 net/rds/send.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/rds/send.c b/net/rds/send.c
index 3e3d028bc21e..747e348f48ba 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -158,7 +158,7 @@ int rds_send_xmit(struct rds_conn_path *cp)
 	 */
 	if (!acquire_in_xmit(cp)) {
 		rds_stats_inc(s_send_lock_contention);
-		ret = -ENOMEM;
+		ret = -EBUSY;
 		goto out;
 	}
 
@@ -1375,7 +1375,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	rds_stats_inc(s_send_queued);
 
 	ret = rds_send_xmit(cpath);
-	if (ret == -ENOMEM || ret == -EAGAIN) {
+	if (ret == -ENOMEM || ret == -EAGAIN || ret == -EBUSY) {
 		ret = 0;
 		rcu_read_lock();
 		if (rds_destroy_pending(cpath->cp_conn))
-- 
2.43.0

From: Gerd Rausch

RDS connections carry a state "rds_conn_path::cp_state",
and transitions from one state to another are conditional
upon an expected state: "rds_conn_path_transition".

There is one exception to this conditionality, which is
"RDS_CONN_ERROR" that can be enforced by "rds_conn_path_drop"
regardless of what state the connection is currently in.

But as soon as a connection enters state "RDS_CONN_ERROR",
the connection handling code expects it to go through the
shutdown-path.
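
The conditional transitions described above can be sketched in user-space C. This is only a minimal sketch under stated assumptions, not the kernel code: the state is held in a C11 atomic_int, the conditional transition is modelled as a compare-and-exchange, and the forced error state as a plain store. The enum values and every name with a "sketch_" prefix are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the RDS connection states. */
enum sketch_state {
	SKETCH_DOWN,
	SKETCH_CONNECTING,
	SKETCH_UP,
	SKETCH_ERROR,
	SKETCH_DISCONNECTING,
};

/* Hypothetical stand-in for a connection path and its state word. */
struct sketch_path {
	atomic_int state;
};

/* Conditional transition: succeeds only if the path is in "expected". */
static bool sketch_transition(struct sketch_path *p, int expected, int next)
{
	return atomic_compare_exchange_strong(&p->state, &expected, next);
}

/* The one unconditional transition: force the error state. */
static void sketch_drop(struct sketch_path *p)
{
	atomic_store(&p->state, SKETCH_ERROR);
}

/* Shutdown entry: only UP or ERROR may move on to DISCONNECTING. */
static bool sketch_begin_shutdown(struct sketch_path *p)
{
	return sketch_transition(p, SKETCH_UP, SKETCH_DISCONNECTING) ||
	       sketch_transition(p, SKETCH_ERROR, SKETCH_DISCONNECTING);
}
```

In this model a path that has been dropped can only make progress through sketch_begin_shutdown(); any transition that expects a different state fails and leaves the state untouched.
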

The RDS/TCP multipath changes added a shortcut out of
"RDS_CONN_ERROR" straight back to "RDS_CONN_CONNECTING"
via "rds_tcp_accept_one_path" (e.g. after "rds_tcp_state_change").

A subsequent "rds_tcp_reset_callbacks" can then transition
the state to "RDS_CONN_RESETTING" with a shutdown-worker queued.

That'll trip up "rds_conn_init_shutdown", which was
never adjusted to handle "RDS_CONN_RESETTING" and subsequently
drops the connection with the dreaded "DR_INV_CONN_STATE",
which leaves "RDS_SHUTDOWN_WORK_QUEUED" on forever.

So we do two things here:

a) Don't shortcut "RDS_CONN_ERROR", but take the longer
   path through the shutdown code.

b) Add "RDS_CONN_RESETTING" to the expected states in
   "rds_conn_init_shutdown" so that we won't error out
   and get stuck, if we ever hit weird state transitions
   like this again.

Fixes: 9c79440e2c5e ("RDS: TCP: fix race windows in send-path quiescence by rds_tcp_accept_one()")
Signed-off-by: Gerd Rausch
Signed-off-by: Allison Henderson
---
 net/rds/connection.c | 2 ++
 net/rds/tcp_listen.c | 5 -----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index e920c685e4f2..4a9d80d56f56 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -395,6 +395,8 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 		if (!rds_conn_path_transition(cp, RDS_CONN_UP,
 					      RDS_CONN_DISCONNECTING) &&
 		    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
+					      RDS_CONN_DISCONNECTING) &&
+		    !rds_conn_path_transition(cp, RDS_CONN_RESETTING,
 					      RDS_CONN_DISCONNECTING)) {
 			rds_conn_path_error(cp,
 					    "shutdown called in state %d\n",
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 820d3e20de19..27b6107ddc28 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -59,9 +59,6 @@ void rds_tcp_keepalive(struct socket *sock)
  * socket and force a reconneect from smaller -> larger ip addr. The reason
  * we special case cp_index 0 is to allow the rds probe ping itself to itself
  * get through efficiently.
- * Since reconnects are only initiated from the node with the numerically
- * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side
- * by moving them to CONNECTING in this function.
  */
 static
 struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
@@ -86,8 +83,6 @@ struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
 		struct rds_conn_path *cp = &conn->c_path[i];
 
 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
-					     RDS_CONN_CONNECTING) ||
-		    rds_conn_path_transition(cp, RDS_CONN_ERROR,
 					     RDS_CONN_CONNECTING)) {
 			return cp->cp_transport_data;
 		}
-- 
2.43.0

From: Gerd Rausch

RDS/TCP differs from RDS/RDMA in that message acknowledgment
is done based on TCP sequence numbers:
As soon as the last byte of a message has been acknowledged
by the TCP stack of a peer, "rds_tcp_write_space()" goes on
to discard prior messages from the send queue.

That is fine as long as the receiver never throws any messages
away.

Unfortunately, that is *not* the case since the introduction of MPRDS:
commit 1a0e100fb2c96 "RDS: TCP: Enable multipath RDS for TCP"

A new function "rds_tcp_accept_one_path" was introduced,
which is entitled to return "NULL" if no connection path is currently
available.

Unfortunately, this happens after the "->accept()" call,
and the new socket often already contains messages,
since the peer already transitioned to "RDS_CONN_UP" on account of
"TCP_ESTABLISHED".

That's also the case after this [1]:
commit 1a0e100fb2c96 "RDS: TCP: Force every connection to be initiated by numerically smaller IP address"

which tried to address the situation of pending data by only
transitioning connections from a smaller IP address to "RDS_CONN_UP".

But even in those cases, and in particular if the "RDS_EXTHDR_NPATHS"
handshake has not occurred yet, and therefore we're working with
"c_npaths <= 1", "c_path[0]" may be in a state distinct from
"RDS_CONN_DOWN", and therefore all messages on the just accepted
socket will be tossed away.

This fix changes "rds_tcp_accept_one":

* If connected from a peer with a larger IP address, the new socket
  will continue to get closed right away.
  With commit [1] above, there should not be any messages
  in the socket receive buffer, since the peer never transitioned
  to "RDS_CONN_UP".
  Therefore it should be okay to not make any efforts to dispatch
  the socket receive buffer.

* If connected from a peer with a smaller IP address,
  we call "rds_tcp_accept_one_path" to find a free slot/"path".
  If found, business goes on as usual.
  If none was found, we save/stash the newly accepted socket
  into "rds_tcp_accepted_sock", in order to not lose any
  messages that may have arrived already.
  We then return from "rds_tcp_accept_one" with "-ENOBUFS".
  Later on, when a slot/"path" does become available again
  (e.g. state transitioned to "RDS_CONN_DOWN",
   or HS extension header was received with "c_npaths > 1"),
  we call "rds_tcp_conn_slots_available", which simply re-issues
  a "rds_tcp_accept_one" worker-callback and picks
  up the new socket from "rds_tcp_accepted_sock", thereby
  continuing where it left off with "-ENOBUFS" last time.
  Since a new slot has become available, those messages
  won't be lost, since processing proceeds as if that slot
  had been available the first time around.
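
The stash-and-retry flow described in the second bullet can be sketched in user-space C. This is a minimal sketch under stated assumptions, not the kernel implementation: sockets are modelled as plain integers (0 meaning "none"), there is a single connection slot, locking is omitted, and every name with a "sketch_" prefix is hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

/* Hypothetical per-namespace state for the sketch. */
struct sketch_net {
	int pending_sock;   /* connection waiting on the listen socket, 0 if none */
	int accepted_sock;  /* stashed socket from an earlier attempt, 0 if none */
	bool slot_free;     /* true if a connection slot ("path") is available */
	int attached_sock;  /* socket bound to the slot, 0 if none */
};

/* Models "rds_tcp_accept_one": returns 0 on success, -EAGAIN if there is
 * nothing to accept, or -ENOBUFS if the socket had to be stashed.
 */
static int sketch_accept_one(struct sketch_net *n)
{
	/* Pick up a previously stashed socket first. */
	int sock = n->accepted_sock;

	n->accepted_sock = 0;
	if (!sock) {
		/* Nothing stashed: take the next pending connection. */
		sock = n->pending_sock;
		n->pending_sock = 0;
		if (!sock)
			return -EAGAIN;
	}

	if (!n->slot_free) {
		/* No free slot: keep the socket, and whatever it has
		 * already received, for a later attempt.
		 */
		n->accepted_sock = sock;
		return -ENOBUFS;
	}

	n->slot_free = false;
	n->attached_sock = sock;
	return 0;
}

/* Models the accept worker: keep accepting until an attempt fails. */
static void sketch_accept_worker(struct sketch_net *n)
{
	while (sketch_accept_one(n) == 0)
		;
}
```

In this model, once a slot frees up the caller sets slot_free and runs sketch_accept_worker(), which stands in for the re-issued worker-callback: the stashed socket is attached as if the slot had been free on the first attempt.
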

Signed-off-by: Gerd Rausch
Signed-off-by: Jack Vogel
Signed-off-by: Allison Henderson
---
 net/rds/connection.c |   3 ++
 net/rds/rds.h        |  65 +++++++++++++----------
 net/rds/recv.c       |   4 ++
 net/rds/tcp.c        |  27 ++++------
 net/rds/tcp.h        |  22 +++++++-
 net/rds/tcp_listen.c | 123 ++++++++++++++++++++++++++++++++-----------
 6 files changed, 168 insertions(+), 76 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index 4a9d80d56f56..3f26a67f3180 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -449,6 +449,9 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 	} else {
 		rcu_read_unlock();
 	}
+
+	if (conn->c_trans->conn_slots_available)
+		conn->c_trans->conn_slots_available(conn);
 }
 
 /* destroy a single rds_conn_path. rds_conn_destroy() iterates over
diff --git a/net/rds/rds.h b/net/rds/rds.h
index b35afa2658cc..e989af1d1dc0 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -506,33 +506,6 @@ struct rds_notifier {
  */
 #define RDS_TRANS_LOOP 3
 
-/**
- * struct rds_transport - transport specific behavioural hooks
- *
- * @xmit: .xmit is called by rds_send_xmit() to tell the transport to send
- * part of a message. The caller serializes on the send_sem so this
- * doesn't need to be reentrant for a given conn. The header must be
- * sent before the data payload. .xmit must be prepared to send a
- * message with no data payload. .xmit should return the number of
- * bytes that were sent down the connection, including header bytes.
- * Returning 0 tells the caller that it doesn't need to perform any
- * additional work now. This is usually the case when the transport has
- * filled the sending queue for its connection and will handle
- * triggering the rds thread to continue the send when space becomes
- * available. Returning -EAGAIN tells the caller to retry the send
- * immediately. Returning -ENOMEM tells the caller to retry the send at
- * some point in the future.
- *
- * @conn_shutdown: conn_shutdown stops traffic on the given connection. Once
- * it returns the connection can not call rds_recv_incoming().
- * This will only be called once after conn_connect returns
- * non-zero success and will The caller serializes this with
- * the send and connecting paths (xmit_* and conn_*). The
- * transport is responsible for other serialization, including
- * rds_recv_incoming(). This is called in process context but
- * should try hard not to block.
- */
-
 struct rds_transport {
 	char t_name[TRANSNAMSIZ];
 	struct list_head t_item;
@@ -545,10 +518,48 @@ struct rds_transport {
 			   __u32 scope_id);
 	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
 	void (*conn_free)(void *data);
+
+	/*
+	 * conn_slots_available is invoked when a previously unavailable connection slot
+	 * becomes available again. rds_tcp_accept_one may return -ENOBUFS if it
+	 * cannot find an available slot, and then stashes the new socket in
+	 * "rds_tcp_accepted_sock". This function re-issues `rds_tcp_accept_one`,
+	 * which picks up the stashed socket and continues where it left off with
+	 * "-ENOBUFS" last time. This ensures messages received on the new socket are
+	 * not discarded when no connection path was available at the time.
+	 */
+	void (*conn_slots_available)(struct rds_connection *conn);
 	int (*conn_path_connect)(struct rds_conn_path *cp);
+
+	/*
+	 * conn_shutdown stops traffic on the given connection. Once
+	 * it returns the connection can not call rds_recv_incoming().
+	 * This will only be called once after conn_connect returns
+	 * non-zero success and will The caller serializes this with
+	 * the send and connecting paths (xmit_* and conn_*). The
+	 * transport is responsible for other serialization, including
+	 * rds_recv_incoming(). This is called in process context but
+	 * should try hard not to block.
+	 */
 	void (*conn_path_shutdown)(struct rds_conn_path *conn);
 	void (*xmit_path_prepare)(struct rds_conn_path *cp);
 	void (*xmit_path_complete)(struct rds_conn_path *cp);
+
+	/*
+	 * .xmit is called by rds_send_xmit() to tell the transport to send
+	 * part of a message. The caller serializes on the send_sem so this
+	 * doesn't need to be reentrant for a given conn. The header must be
+	 * sent before the data payload. .xmit must be prepared to send a
+	 * message with no data payload. .xmit should return the number of
+	 * bytes that were sent down the connection, including header bytes.
+	 * Returning 0 tells the caller that it doesn't need to perform any
+	 * additional work now. This is usually the case when the transport has
+	 * filled the sending queue for its connection and will handle
+	 * triggering the rds thread to continue the send when space becomes
+	 * available. Returning -EAGAIN tells the caller to retry the send
+	 * immediately. Returning -ENOMEM tells the caller to retry the send at
+	 * some point in the future.
+	 */
 	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
 		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 66205d6924bf..66680f652e74 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -230,6 +230,10 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 	conn->c_npaths = max_t(int, conn->c_npaths, 1);
 	conn->c_ping_triggered = 0;
 	rds_conn_peer_gen_update(conn, new_peer_gen_num);
+
+	if (conn->c_npaths > 1 &&
+	    conn->c_trans->conn_slots_available)
+		conn->c_trans->conn_slots_available(conn);
 }
 
 /* rds_start_mprds() will synchronously start multiple paths when appropriate.
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 3cc2f303bf78..31e7425e2da9 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -213,6 +213,8 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp)
 		sock->sk->sk_data_ready = sock->sk->sk_user_data;
 
 	tc->t_sock = sock;
+	if (!tc->t_rtn)
+		tc->t_rtn = net_generic(sock_net(sock->sk), rds_tcp_netid);
 	tc->t_cpath = cp;
 	tc->t_orig_data_ready = sock->sk->sk_data_ready;
 	tc->t_orig_write_space = sock->sk->sk_write_space;
@@ -378,6 +380,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 		}
 		mutex_init(&tc->t_conn_path_lock);
 		tc->t_sock = NULL;
+		tc->t_rtn = NULL;
 		tc->t_tinc = NULL;
 		tc->t_tinc_hdr_rem = sizeof(struct rds_header);
 		tc->t_tinc_data_rem = 0;
@@ -458,6 +461,7 @@ struct rds_transport rds_tcp_transport = {
 	.recv_path = rds_tcp_recv_path,
 	.conn_alloc = rds_tcp_conn_alloc,
 	.conn_free = rds_tcp_conn_free,
+	.conn_slots_available = rds_tcp_conn_slots_available,
 	.conn_path_connect = rds_tcp_conn_path_connect,
 	.conn_path_shutdown = rds_tcp_conn_path_shutdown,
 	.inc_copy_to_user = rds_tcp_inc_copy_to_user,
@@ -473,17 +477,7 @@ struct rds_transport rds_tcp_transport = {
 	.t_unloading = rds_tcp_is_unloading,
 };
 
-static unsigned int rds_tcp_netid;
-
-/* per-network namespace private data for this module */
-struct rds_tcp_net {
-	struct socket *rds_tcp_listen_sock;
-	struct work_struct rds_tcp_accept_w;
-	struct ctl_table_header *rds_tcp_sysctl;
-	struct ctl_table *ctl_table;
-	int sndbuf_size;
-	int rcvbuf_size;
-};
+int rds_tcp_netid;
 
 /* All module specific customizations to the RDS-TCP socket should be done in
  * rds_tcp_tune() and applied after socket creation.
@@ -526,15 +520,12 @@ static void rds_tcp_accept_worker(struct work_struct *work)
 					       struct rds_tcp_net,
 					       rds_tcp_accept_w);
 
-	while (rds_tcp_accept_one(rtn->rds_tcp_listen_sock) == 0)
+	while (rds_tcp_accept_one(rtn) == 0)
 		cond_resched();
 }
 
-void rds_tcp_accept_work(struct sock *sk)
+void rds_tcp_accept_work(struct rds_tcp_net *rtn)
 {
-	struct net *net = sock_net(sk);
-	struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
-
 	queue_work(rds_wq, &rtn->rds_tcp_accept_w);
 }
 
@@ -546,6 +537,8 @@ static __net_init int rds_tcp_init_net(struct net *net)
 
 	memset(rtn, 0, sizeof(*rtn));
 
+	mutex_init(&rtn->rds_tcp_accept_lock);
+
 	/* {snd, rcv}buf_size default to 0, which implies we let the
 	 * stack pick the value, and permit auto-tuning of buffer size.
 	 */
@@ -609,6 +602,8 @@ static void rds_tcp_kill_sock(struct net *net)
 
 	rtn->rds_tcp_listen_sock = NULL;
 	rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
+	if (rtn->rds_tcp_accepted_sock)
+		sock_release(rtn->rds_tcp_accepted_sock);
 	spin_lock_irq(&rds_tcp_conn_lock);
 	list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
 		struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
index 053aa7da87ef..2000f4acd57a 100644
--- a/net/rds/tcp.h
+++ b/net/rds/tcp.h
@@ -4,6 +4,21 @@
 
 #define RDS_TCP_PORT 16385
 
+/* per-network namespace private data for this module */
+struct rds_tcp_net {
+	/* serialize "rds_tcp_accept_one" with "rds_tcp_accept_lock"
+	 * to protect "rds_tcp_accepted_sock"
+	 */
+	struct mutex rds_tcp_accept_lock;
+	struct socket *rds_tcp_listen_sock;
+	struct socket *rds_tcp_accepted_sock;
+	struct work_struct rds_tcp_accept_w;
+	struct ctl_table_header *rds_tcp_sysctl;
+	struct ctl_table *ctl_table;
+	int sndbuf_size;
+	int rcvbuf_size;
+};
+
 struct rds_tcp_incoming {
 	struct rds_incoming ti_inc;
 	struct sk_buff_head ti_skb_list;
@@ -19,6 +34,7 @@ struct rds_tcp_connection {
 	 */
 	struct mutex t_conn_path_lock;
 	struct socket *t_sock;
+	struct rds_tcp_net *t_rtn;
 	void *t_orig_write_space;
 	void *t_orig_data_ready;
 	void *t_orig_state_change;
@@ -49,6 +65,7 @@ struct rds_tcp_statistics {
 };
 
 /* tcp.c */
+extern int rds_tcp_netid;
 bool rds_tcp_tune(struct socket *sock);
 void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp);
 void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp);
@@ -57,7 +74,7 @@ void rds_tcp_restore_callbacks(struct socket *sock,
 u32 rds_tcp_write_seq(struct rds_tcp_connection *tc);
 u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
 extern struct rds_transport rds_tcp_transport;
-void rds_tcp_accept_work(struct sock *sk);
+void rds_tcp_accept_work(struct rds_tcp_net *rtn);
 int rds_tcp_laddr_check(struct net *net, const struct in6_addr *addr,
 			__u32 scope_id);
 /* tcp_connect.c */
@@ -69,7 +86,8 @@ void rds_tcp_state_change(struct sock *sk);
 struct socket *rds_tcp_listen_init(struct net *net, bool isv6);
 void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor);
 void rds_tcp_listen_data_ready(struct sock *sk);
-int rds_tcp_accept_one(struct socket *sock);
+void rds_tcp_conn_slots_available(struct rds_connection *conn);
+int rds_tcp_accept_one(struct rds_tcp_net *rtn);
 void rds_tcp_keepalive(struct socket *sock);
 void *rds_tcp_listen_sock_def_readable(struct net *net);
 
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 27b6107ddc28..7433f2b46a98 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -35,6 +35,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include "rds.h"
 #include "tcp.h"
@@ -66,32 +68,46 @@ struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
 	int i;
 	int npaths = max_t(int, 1, conn->c_npaths);
 
-	/* for mprds, all paths MUST be initiated by the peer
-	 * with the smaller address.
-	 */
-	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) >= 0) {
-		/* Make sure we initiate at least one path if this
-		 * has not already been done; rds_start_mprds() will
-		 * take care of additional paths, if necessary.
-		 */
-		if (npaths == 1)
-			rds_conn_path_connect_if_down(&conn->c_path[0]);
-		return NULL;
-	}
-
 	for (i = 0; i < npaths; i++) {
 		struct rds_conn_path *cp = &conn->c_path[i];
 
 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
-					     RDS_CONN_CONNECTING)) {
+					     RDS_CONN_CONNECTING))
 			return cp->cp_transport_data;
-		}
 	}
 	return NULL;
 }
 
-int rds_tcp_accept_one(struct socket *sock)
+void rds_tcp_conn_slots_available(struct rds_connection *conn)
 {
+	struct rds_tcp_connection *tc;
+	struct rds_tcp_net *rtn;
+
+	if (rds_destroy_pending(conn))
+		return;
+
+	tc = conn->c_path->cp_transport_data;
+	rtn = tc->t_rtn;
+	if (!rtn)
+		return;
+
+	/* As soon as a connection went down,
+	 * it is safe to schedule a "rds_tcp_accept_one"
+	 * attempt even if there are no connections pending:
+	 * Function "rds_tcp_accept_one" won't block
+	 * but simply return -EAGAIN in that case.
+	 *
+	 * Doing so is necessary to address the case where an
+	 * incoming connection on "rds_tcp_listen_sock" is ready
+	 * to be accepted prior to a free slot being available:
+	 * the -ENOBUFS case in "rds_tcp_accept_one".
+	 */
+	rds_tcp_accept_work(rtn);
+}
+
+int rds_tcp_accept_one(struct rds_tcp_net *rtn)
+{
+	struct socket *listen_sock = rtn->rds_tcp_listen_sock;
 	struct socket *new_sock = NULL;
 	struct rds_connection *conn;
 	int ret;
@@ -105,17 +121,23 @@ int rds_tcp_accept_one(struct socket *sock)
 #endif
 	int dev_if = 0;
 
-	if (!sock) /* module unload or netns delete in progress */
+	if (!listen_sock) /* module unload or netns delete in progress */
 		return -ENETUNREACH;
 
-	ret = kernel_accept(sock, &new_sock, O_NONBLOCK);
-	if (ret)
-		return ret;
+	mutex_lock(&rtn->rds_tcp_accept_lock);
+	new_sock = rtn->rds_tcp_accepted_sock;
+	rtn->rds_tcp_accepted_sock = NULL;
 
-	rds_tcp_keepalive(new_sock);
-	if (!rds_tcp_tune(new_sock)) {
-		ret = -EINVAL;
-		goto out;
+	if (!new_sock) {
+		ret = kernel_accept(listen_sock, &new_sock, O_NONBLOCK);
+		if (ret)
+			goto out;
+
+		rds_tcp_keepalive(new_sock);
+		if (!rds_tcp_tune(new_sock)) {
+			ret = -EINVAL;
+			goto out;
+		}
 	}
 
 	inet = inet_sk(new_sock->sk);
@@ -130,7 +152,7 @@ int rds_tcp_accept_one(struct socket *sock)
 	peer_addr = &daddr;
 #endif
 	rdsdebug("accepted family %d tcp %pI6c:%u -> %pI6c:%u\n",
-		 sock->sk->sk_family,
+		 listen_sock->sk->sk_family,
 		 my_addr, ntohs(inet->inet_sport),
 		 peer_addr, ntohs(inet->inet_dport));
 
@@ -150,13 +172,13 @@ int rds_tcp_accept_one(struct socket *sock)
 	}
 #endif
 
-	if (!rds_tcp_laddr_check(sock_net(sock->sk), peer_addr, dev_if)) {
+	if (!rds_tcp_laddr_check(sock_net(listen_sock->sk), peer_addr, dev_if)) {
 		/* local address connection is only allowed via loopback */
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
 
-	conn = rds_conn_create(sock_net(sock->sk),
+	conn = rds_conn_create(sock_net(listen_sock->sk),
 			       my_addr, peer_addr,
 			       &rds_tcp_transport, 0, GFP_KERNEL, dev_if);
 
@@ -169,15 +191,51 @@ int rds_tcp_accept_one(struct socket *sock)
 	 * If the client reboots, this conn will need to be cleaned up.
 	 * rds_tcp_state_change() will do that cleanup
 	 */
-	rs_tcp = rds_tcp_accept_one_path(conn);
-	if (!rs_tcp)
+	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) < 0) {
+		/* Try to obtain a free connection slot.
+		 * If unsuccessful, we need to preserve "new_sock"
+		 * that we just accepted, since its "sk_receive_queue"
+		 * may contain messages already that have been acknowledged
+		 * to and discarded by the sender.
+		 * We must not throw those away!
+		 */
+		rs_tcp = rds_tcp_accept_one_path(conn);
+		if (!rs_tcp) {
+			/* It's okay to stash "new_sock", since
+			 * "rds_tcp_conn_slots_available" triggers "rds_tcp_accept_one"
+			 * again as soon as one of the connection slots
+			 * becomes available again
+			 */
+			rtn->rds_tcp_accepted_sock = new_sock;
+			new_sock = NULL;
+			ret = -ENOBUFS;
+			goto out;
+		}
+	} else {
+		/* This connection request came from a peer with
+		 * a larger address.
+		 * Function "rds_tcp_state_change" makes sure
+		 * that the connection doesn't transition
+		 * to state "RDS_CONN_UP", and therefore
+		 * we should not have received any messages
+		 * on this socket yet.
+		 * This is the only case where it's okay to
+		 * not dequeue messages from "sk_receive_queue".
+		 */
+		if (conn->c_npaths <= 1)
+			rds_conn_path_connect_if_down(&conn->c_path[0]);
+		rs_tcp = NULL;
 		goto rst_nsk;
+	}
+
 	mutex_lock(&rs_tcp->t_conn_path_lock);
 	cp = rs_tcp->t_cpath;
 	conn_state = rds_conn_path_state(cp);
 	WARN_ON(conn_state == RDS_CONN_UP);
-	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR)
+	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR) {
+		rds_conn_path_drop(cp, 0);
 		goto rst_nsk;
+	}
 	if (rs_tcp->t_sock) {
 		/* Duelling SYN has been handled in rds_tcp_accept_one() */
 		rds_tcp_reset_callbacks(new_sock, cp);
@@ -207,6 +265,9 @@ int rds_tcp_accept_one(struct socket *sock)
 		mutex_unlock(&rs_tcp->t_conn_path_lock);
 	if (new_sock)
 		sock_release(new_sock);
+
+	mutex_unlock(&rtn->rds_tcp_accept_lock);
+
 	return ret;
 }
 
@@ -234,7 +295,7 @@ void rds_tcp_listen_data_ready(struct sock *sk)
 	 * the listen socket is being torn down.
 	 */
 	if (sk->sk_state == TCP_LISTEN)
-		rds_tcp_accept_work(sk);
+		rds_tcp_accept_work(net_generic(sock_net(sk), rds_tcp_netid));
 	else
 		ready = rds_tcp_listen_sock_def_readable(sock_net(sk));
 
-- 
2.43.0
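
As a closing illustration for the first patch in this series, the lock-bit handling in "rds_send_xmit()" can be sketched in user-space C. This is a minimal sketch under stated assumptions, not the kernel code: the RDS_IN_XMIT bit is modelled with a C11 atomic_flag, the transmit work is reduced to a counter, and every name with a "sketch_" prefix is hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for a connection path with its in-xmit lock-bit. */
struct sketch_xmit_path {
	atomic_flag in_xmit;
	int sent;  /* number of transmit passes that actually ran */
};

/* Try to take the lock-bit; true if this caller now owns it. */
static bool sketch_acquire_in_xmit(struct sketch_xmit_path *p)
{
	return !atomic_flag_test_and_set_explicit(&p->in_xmit,
						  memory_order_acquire);
}

static void sketch_release_in_xmit(struct sketch_xmit_path *p)
{
	atomic_flag_clear_explicit(&p->in_xmit, memory_order_release);
}

/* Models the entry of the transmit function after the patch:
 * contention on the lock-bit is reported as -EBUSY, not -ENOMEM.
 */
static int sketch_send_xmit(struct sketch_xmit_path *p)
{
	if (!sketch_acquire_in_xmit(p))
		return -EBUSY;  /* someone else is already transmitting */

	p->sent++;
	sketch_release_in_xmit(p);
	return 0;
}

/* Models the caller in the sendmsg path: these three codes are
 * treated as non-fatal and are not reported to the sender.
 */
static int sketch_sendmsg(struct sketch_xmit_path *p)
{
	int ret = sketch_send_xmit(p);

	if (ret == -ENOMEM || ret == -EAGAIN || ret == -EBUSY)
		ret = 0;
	return ret;
}
```

Keeping -EBUSY distinct from -ENOMEM lets a caller tell "another context is already sending" apart from "retry later", which is the distinction the first patch relies on.
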