This file does most of the work of transmitting outgoing messages. It is also responsible for copying data from user space into skbs. Signed-off-by: John Ousterhout --- Changes for v10: * Revise sparse annotations to eliminate __context__ definition * Remove log messages after alloc errors Changes for v9: * Use new homa_clock abstraction layer * Various name improvements (e.g. use "alloc" instead of "new" for functions that allocate memory) * Eliminate sizeof32 define: use sizeof instead Changes for v7: * Implement accounting for bytes in tx skbs * Rename UNKNOWN packet type to RPC_UNKNOWN * Use new RPC reference counts; eliminates need for RCU * Remove locker argument from locking functions * Use u64 and __u64 properly * Fix incorrect skb check in homa_message_out_fill --- net/homa/homa_impl.h | 13 + net/homa/homa_outgoing.c | 570 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 583 insertions(+) create mode 100644 net/homa/homa_outgoing.c diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index 6dd24383efa2..a03ee6ca3403 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -379,12 +379,25 @@ static inline bool homa_make_header_avl(struct sk_buff *skb) extern unsigned int homa_net_id; +int homa_fill_data_interleaved(struct homa_rpc *rpc, + struct sk_buff *skb, struct iov_iter *iter); +int homa_message_out_fill(struct homa_rpc *rpc, + struct iov_iter *iter, int xmit); +void homa_message_out_init(struct homa_rpc *rpc, int length); void homa_rpc_handoff(struct homa_rpc *rpc); +struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, + struct iov_iter *iter, int offset, + int length, int max_seg_data); int homa_xmit_control(enum homa_packet_type type, void *contents, size_t length, struct homa_rpc *rpc); +int __homa_xmit_control(void *contents, size_t length, + struct homa_peer *peer, struct homa_sock *hsk); void homa_xmit_data(struct homa_rpc *rpc, bool force); +void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk); int homa_message_in_init(struct homa_rpc *rpc, int unsched); +void homa_resend_data(struct homa_rpc *rpc, int start, int end); +void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc); /** * homa_net_from_net() - Return the struct homa_net associated with a particular diff --git a/net/homa/homa_outgoing.c b/net/homa/homa_outgoing.c new file mode 100644 index 000000000000..b56872608aa2 --- /dev/null +++ b/net/homa/homa_outgoing.c @@ -0,0 +1,570 @@ +// SPDX-License-Identifier: BSD-2-Clause + +/* This file contains functions related to the sender side of message + * transmission. It also contains utility functions for sending packets. + */ + +#include "homa_impl.h" +#include "homa_pacer.h" +#include "homa_peer.h" +#include "homa_rpc.h" +#include "homa_wire.h" +#include "homa_stub.h" + +/** + * homa_message_out_init() - Initialize rpc->msgout. + * @rpc: RPC whose output message should be initialized. Must be + * locked by caller. + * @length: Number of bytes that will eventually be in rpc->msgout. + */ +void homa_message_out_init(struct homa_rpc *rpc, int length) + __must_hold(rpc->bucket->lock) +{ + memset(&rpc->msgout, 0, sizeof(rpc->msgout)); + rpc->msgout.length = length; + rpc->msgout.next_xmit = &rpc->msgout.packets; + rpc->msgout.init_time = homa_clock(); +} + +/** + * homa_fill_data_interleaved() - This function is invoked to fill in the + * part of a data packet after the initial header, when GSO is being used. + * homa_seg_hdrs must be interleaved with the data to provide the correct + * offset for each segment. + * @rpc: RPC whose output message is being created. Must be + * locked by caller. + * @skb: The packet being filled. The initial homa_data_hdr was + * created and initialized by the caller and the + * homa_skb_info has been filled in with the packet geometry. + * @iter: Describes location(s) of (remaining) message data in user + * space. + * Return: Either a negative errno or 0 (for success). + */ +int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb, + struct iov_iter *iter) + __must_hold(rpc->bucket->lock) +{ + struct homa_skb_info *homa_info = homa_get_skb_info(skb); + int seg_length = homa_info->seg_length; + int bytes_left = homa_info->data_bytes; + int offset = homa_info->offset; + int err; + + /* Each iteration of the following loop adds info for one packet, + * which includes a homa_seg_hdr followed by the data for that + * segment. The first homa_seg_hdr was already added by the caller. + */ + while (1) { + struct homa_seg_hdr seg; + + if (bytes_left < seg_length) + seg_length = bytes_left; + err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter, + seg_length); + if (err != 0) + return err; + bytes_left -= seg_length; + offset += seg_length; + + if (bytes_left == 0) + break; + + seg.offset = htonl(offset); + err = homa_skb_append_to_frag(rpc->hsk->homa, skb, &seg, + sizeof(seg)); + if (err != 0) + return err; + } + return 0; +} + +/** + * homa_tx_data_pkt_alloc() - Allocate a new sk_buff and fill it with an + * outgoing Homa data packet. The resulting packet will be a GSO packet + * that will eventually be segmented by the NIC. + * @rpc: RPC that packet will belong to (msgout must have been + * initialized). Must be locked by caller. + * @iter: Describes location(s) of (remaining) message data in user + * space. + * @offset: Offset in the message of the first byte of data in this + * packet. + * @length: How many bytes of data to include in the skb. Caller must + * ensure that this amount of data isn't too much for a + * well-formed GSO packet, and that iter has at least this + * much data. + * @max_seg_data: Maximum number of bytes of message data that can go in + * a single segment of the GSO packet. + * Return: A pointer to the new packet, or a negative errno. + */ +struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, + struct iov_iter *iter, int offset, + int length, int max_seg_data) + __must_hold(rpc->bucket->lock) +{ + struct homa_skb_info *homa_info; + struct homa_data_hdr *h; + struct sk_buff *skb; + int err, gso_size; + u64 segs; + + segs = length + max_seg_data - 1; + do_div(segs, max_seg_data); + + /* Initialize the overall skb. */ + skb = homa_skb_alloc_tx(sizeof(struct homa_data_hdr) + length + + (segs - 1) * sizeof(struct homa_seg_hdr)); + if (!skb) + return ERR_PTR(-ENOMEM); + + /* Fill in the Homa header (which will be replicated in every + * network packet by GSO). + */ + h = (struct homa_data_hdr *)skb_put(skb, sizeof(struct homa_data_hdr)); + h->common.sport = htons(rpc->hsk->port); + h->common.dport = htons(rpc->dport); + h->common.sequence = htonl(offset); + h->common.type = DATA; + homa_set_doff(h, sizeof(struct homa_data_hdr)); + h->common.checksum = 0; + h->common.sender_id = cpu_to_be64(rpc->id); + h->message_length = htonl(rpc->msgout.length); + h->ack.client_id = 0; + homa_peer_get_acks(rpc->peer, 1, &h->ack); + h->retransmit = 0; + h->seg.offset = htonl(offset); + + homa_info = homa_get_skb_info(skb); + homa_info->next_skb = NULL; + homa_info->wire_bytes = length + segs * (sizeof(struct homa_data_hdr) + + rpc->hsk->ip_header_length + HOMA_ETH_OVERHEAD); + homa_info->data_bytes = length; + homa_info->seg_length = max_seg_data; + homa_info->offset = offset; + + if (segs > 1) { + homa_set_doff(h, sizeof(struct homa_data_hdr) - + sizeof(struct homa_seg_hdr)); + gso_size = max_seg_data + sizeof(struct homa_seg_hdr); + err = homa_fill_data_interleaved(rpc, skb, iter); + } else { + gso_size = max_seg_data; + err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter, + length); + } + if (err) + goto error; + + if (segs > 1) { + skb_shinfo(skb)->gso_segs = segs; + skb_shinfo(skb)->gso_size = gso_size; + + /* It's unclear what gso_type should be used to force software + * GSO; the value below seems to work... + */ + skb_shinfo(skb)->gso_type = + rpc->hsk->homa->gso_force_software ? 0xd : SKB_GSO_TCPV6; + } + return skb; + +error: + homa_skb_free_tx(rpc->hsk->homa, skb); + return ERR_PTR(err); +} + +/** + * homa_message_out_fill() - Initializes information for sending a message + * for an RPC (either request or response); copies the message data from + * user space and (possibly) begins transmitting the message. + * @rpc: RPC for which to send message; this function must not + * previously have been called for the RPC. Must be locked. The RPC + * will be unlocked while copying data, but will be locked again + * before returning. + * @iter: Describes location(s) of message data in user space. + * @xmit: Nonzero means this method should start transmitting packets; + * transmission will be overlapped with copying from user space. + * Zero means the caller will initiate transmission after this + * function returns. + * + * Return: 0 for success, or a negative errno for failure. It is possible + * for the RPC to be freed while this function is active. If that + * happens, copying will cease, -EINVAL will be returned, and + * rpc->state will be RPC_DEAD. + */ +int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) + __must_hold(rpc->bucket->lock) +{ + /* Geometry information for packets: + * mtu: largest size for an on-the-wire packet (including + * all headers through IP header, but not Ethernet + * header). + * max_seg_data: largest amount of Homa message data that fits + * in an on-the-wire packet (after segmentation). + * max_gso_data: largest amount of Homa message data that fits + * in a GSO packet (before segmentation). + */ + int mtu, max_seg_data, max_gso_data; + + struct sk_buff **last_link; + struct dst_entry *dst; + u64 segs_per_gso; + int overlap_xmit; + + /* Bytes of the message that haven't yet been copied into skbs. */ + int bytes_left; + + int gso_size; + int err; + + homa_rpc_hold(rpc); + if (unlikely(iter->count > HOMA_MAX_MESSAGE_LENGTH || + iter->count == 0)) { + err = -EINVAL; + goto error; + } + homa_message_out_init(rpc, iter->count); + + /* Compute the geometry of packets. */ + dst = homa_get_dst(rpc->peer, rpc->hsk); + mtu = dst_mtu(dst); + max_seg_data = mtu - rpc->hsk->ip_header_length + - sizeof(struct homa_data_hdr); + gso_size = dst->dev->gso_max_size; + if (gso_size > rpc->hsk->homa->max_gso_size) + gso_size = rpc->hsk->homa->max_gso_size; + dst_release(dst); + + /* Round gso_size down to an even # of mtus. */ + segs_per_gso = gso_size - rpc->hsk->ip_header_length - + sizeof(struct homa_data_hdr) + + sizeof(struct homa_seg_hdr); + do_div(segs_per_gso, max_seg_data + + sizeof(struct homa_seg_hdr)); + if (segs_per_gso == 0) + segs_per_gso = 1; + max_gso_data = segs_per_gso * max_seg_data; + + overlap_xmit = rpc->msgout.length > 2 * max_gso_data; + homa_skb_stash_pages(rpc->hsk->homa, rpc->msgout.length); + + /* Each iteration of the loop below creates one GSO packet. */ + last_link = &rpc->msgout.packets; + for (bytes_left = rpc->msgout.length; bytes_left > 0; ) { + int skb_data_bytes, offset; + struct sk_buff *skb; + + homa_rpc_unlock(rpc); + skb_data_bytes = max_gso_data; + offset = rpc->msgout.length - bytes_left; + if (skb_data_bytes > bytes_left) + skb_data_bytes = bytes_left; + skb = homa_tx_data_pkt_alloc(rpc, iter, offset, skb_data_bytes, + max_seg_data); + if (IS_ERR(skb)) { + err = PTR_ERR(skb); + homa_rpc_lock(rpc); + goto error; + } + bytes_left -= skb_data_bytes; + + homa_rpc_lock(rpc); + if (rpc->state == RPC_DEAD) { + /* RPC was freed while we were copying. */ + err = -EINVAL; + homa_skb_free_tx(rpc->hsk->homa, skb); + goto error; + } + *last_link = skb; + last_link = &(homa_get_skb_info(skb)->next_skb); + *last_link = NULL; + rpc->msgout.num_skbs++; + rpc->msgout.skb_memory += skb->truesize; + rpc->msgout.copied_from_user = rpc->msgout.length - bytes_left; + if (overlap_xmit && list_empty(&rpc->throttled_links) && + xmit) + homa_pacer_manage_rpc(rpc); + } + refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc); + homa_rpc_put(rpc); + if (!overlap_xmit && xmit) + homa_xmit_data(rpc, false); + return 0; + +error: + refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc); + homa_rpc_put(rpc); + return err; +} + +/** + * homa_xmit_control() - Send a control packet to the other end of an RPC. + * @type: Packet type, such as DATA. + * @contents: Address of buffer containing the contents of the packet. + * Only information after the common header must be valid; + * the common header will be filled in by this function. + * @length: Length of @contents (including the common header). + * @rpc: The packet will go to the socket that handles the other end + * of this RPC. Addressing info for the packet, including all of + * the fields of homa_common_hdr except type, will be set from this. + * Caller must hold either the lock or a reference. + * + * Return: Either zero (for success), or a negative errno value if there + * was a problem. + */ +int homa_xmit_control(enum homa_packet_type type, void *contents, + size_t length, struct homa_rpc *rpc) +{ + struct homa_common_hdr *h = contents; + + h->type = type; + h->sport = htons(rpc->hsk->port); + h->dport = htons(rpc->dport); + h->sender_id = cpu_to_be64(rpc->id); + return __homa_xmit_control(contents, length, rpc->peer, rpc->hsk); +} + +/** + * __homa_xmit_control() - Lower-level version of homa_xmit_control: sends + * a control packet. + * @contents: Address of buffer containing the contents of the packet. + * The caller must have filled in all of the information, + * including the common header. + * @length: Length of @contents. + * @peer: Destination to which the packet will be sent. + * @hsk: Socket via which the packet will be sent. + * + * Return: Either zero (for success), or a negative errno value if there + * was a problem. + */ +int __homa_xmit_control(void *contents, size_t length, struct homa_peer *peer, + struct homa_sock *hsk) +{ + struct homa_common_hdr *h; + struct sk_buff *skb; + int extra_bytes; + int result; + + skb = homa_skb_alloc_tx(HOMA_MAX_HEADER); + if (unlikely(!skb)) + return -ENOBUFS; + skb_dst_set(skb, homa_get_dst(peer, hsk)); + + h = skb_put(skb, length); + memcpy(h, contents, length); + extra_bytes = HOMA_MIN_PKT_LENGTH - length; + if (extra_bytes > 0) + memset(skb_put(skb, extra_bytes), 0, extra_bytes); + skb->ooo_okay = 1; + skb_get(skb); + if (hsk->inet.sk.sk_family == AF_INET6) + result = ip6_xmit(&hsk->inet.sk, skb, &peer->flow.u.ip6, 0, + NULL, 0, 0); + else + result = ip_queue_xmit(&hsk->inet.sk, skb, &peer->flow); + if (unlikely(result != 0)) { + /* It appears that ip*_xmit frees skbuffs after + * errors; the following code is to raise an alert if + * this isn't actually the case. The extra skb_get above + * and kfree_skb call below are needed to do the check + * accurately (otherwise the buffer could be freed and + * its memory used for some other purpose, resulting in + * a bogus "reference count"). + */ + if (refcount_read(&skb->users) > 1) { + if (hsk->inet.sk.sk_family == AF_INET6) + pr_notice("ip6_xmit didn't free Homa control packet (type %d) after error %d\n", + h->type, result); + else + pr_notice("ip_queue_xmit didn't free Homa control packet (type %d) after error %d\n", + h->type, result); + } + } + kfree_skb(skb); + return result; +} + +/** + * homa_xmit_unknown() - Send an RPC_UNKNOWN packet to a peer. + * @skb: Buffer containing an incoming packet; identifies the peer to + * which the RPC_UNKNOWN packet should be sent. + * @hsk: Socket that should be used to send the RPC_UNKNOWN packet. + */ +void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk) +{ + struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data; + struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + struct homa_rpc_unknown_hdr unknown; + struct homa_peer *peer; + + unknown.common.sport = h->dport; + unknown.common.dport = h->sport; + unknown.common.type = RPC_UNKNOWN; + unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id)); + peer = homa_peer_get(hsk, &saddr); + if (!IS_ERR(peer)) + __homa_xmit_control(&unknown, sizeof(unknown), peer, hsk); + homa_peer_release(peer); +} + +/** + * homa_xmit_data() - If an RPC has outbound data packets that are permitted + * to be transmitted according to the scheduling mechanism, arrange for + * them to be sent (some may be sent immediately; others may be sent + * later by the pacer thread). + * @rpc: RPC to check for transmittable packets. Must be locked by + * caller. Note: this function will release the RPC lock while + * passing packets through the RPC stack, then reacquire it + * before returning. It is possible that the RPC gets freed + * when the lock isn't held, in which case the state will + * be RPC_DEAD on return. + * @force: True means send at least one packet, even if the NIC queue + * is too long. False means that zero packets may be sent, if + * the NIC queue is sufficiently long. + */ +void homa_xmit_data(struct homa_rpc *rpc, bool force) + __must_hold(rpc->bucket->lock) +{ + struct homa *homa = rpc->hsk->homa; + + homa_rpc_hold(rpc); + while (*rpc->msgout.next_xmit) { + struct sk_buff *skb = *rpc->msgout.next_xmit; + + if ((rpc->msgout.length - rpc->msgout.next_xmit_offset) + >= homa->pacer->throttle_min_bytes) { + if (!homa_pacer_check_nic_q(homa->pacer, skb, force)) { + homa_pacer_manage_rpc(rpc); + break; + } + } + + rpc->msgout.next_xmit = &(homa_get_skb_info(skb)->next_skb); + rpc->msgout.next_xmit_offset += + homa_get_skb_info(skb)->data_bytes; + + homa_rpc_hold(rpc); + homa_rpc_unlock(rpc); + skb_get(skb); + __homa_xmit_data(skb, rpc); + force = false; + homa_rpc_lock(rpc); + homa_rpc_put(rpc); + if (rpc->state == RPC_DEAD) + break; + } + homa_rpc_put(rpc); +} + +/** + * __homa_xmit_data() - Handles packet transmission stuff that is common + * to homa_xmit_data and homa_resend_data. + * @skb: Packet to be sent. The packet will be freed after transmission + * (and also if errors prevented transmission). + * @rpc: Information about the RPC that the packet belongs to. + */ +void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc) +{ + skb_dst_set(skb, homa_get_dst(rpc->peer, rpc->hsk)); + + skb->ooo_okay = 1; + skb->ip_summed = CHECKSUM_PARTIAL; + skb->csum_start = skb_transport_header(skb) - skb->head; + skb->csum_offset = offsetof(struct homa_common_hdr, checksum); + if (rpc->hsk->inet.sk.sk_family == AF_INET6) + ip6_xmit(&rpc->hsk->inet.sk, skb, &rpc->peer->flow.u.ip6, + 0, NULL, 0, 0); + else + ip_queue_xmit(&rpc->hsk->inet.sk, skb, &rpc->peer->flow); +} + +/** + * homa_resend_data() - This function is invoked as part of handling RESEND + * requests. It retransmits the packet(s) containing a given range of bytes + * from a message. + * @rpc: RPC for which data should be resent. + * @start: Offset within @rpc->msgout of the first byte to retransmit. + * @end: Offset within @rpc->msgout of the byte just after the last one + * to retransmit. + */ +void homa_resend_data(struct homa_rpc *rpc, int start, int end) + __must_hold(rpc->bucket->lock) +{ + struct homa_skb_info *homa_info; + struct sk_buff *skb; + + if (end <= start) + return; + + /* Each iteration of this loop checks one packet in the message + * to see if it contains segments that need to be retransmitted. + */ + for (skb = rpc->msgout.packets; skb; skb = homa_info->next_skb) { + int seg_offset, offset, seg_length, data_left; + struct homa_data_hdr *h; + + homa_info = homa_get_skb_info(skb); + offset = homa_info->offset; + if (offset >= end) + break; + if (start >= (offset + homa_info->data_bytes)) + continue; + + offset = homa_info->offset; + seg_offset = sizeof(struct homa_data_hdr); + data_left = homa_info->data_bytes; + if (skb_shinfo(skb)->gso_segs <= 1) { + seg_length = data_left; + } else { + seg_length = homa_info->seg_length; + h = (struct homa_data_hdr *)skb_transport_header(skb); + } + for ( ; data_left > 0; data_left -= seg_length, + offset += seg_length, + seg_offset += skb_shinfo(skb)->gso_size) { + struct homa_skb_info *new_homa_info; + struct sk_buff *new_skb; + int err; + + if (seg_length > data_left) + seg_length = data_left; + + if (end <= offset) + goto resend_done; + if ((offset + seg_length) <= start) + continue; + + /* This segment must be retransmitted. */ + new_skb = homa_skb_alloc_tx(sizeof(struct homa_data_hdr) + + seg_length); + if (unlikely(!new_skb)) + goto resend_done; + h = __skb_put_data(new_skb, skb_transport_header(skb), + sizeof(struct homa_data_hdr)); + h->common.sequence = htonl(offset); + h->seg.offset = htonl(offset); + h->retransmit = 1; + err = homa_skb_append_from_skb(rpc->hsk->homa, new_skb, + skb, seg_offset, + seg_length); + if (err != 0) { + pr_err("%s got error %d from homa_skb_append_from_skb\n", + __func__, err); + kfree_skb(new_skb); + goto resend_done; + } + + new_homa_info = homa_get_skb_info(new_skb); + new_homa_info->wire_bytes = rpc->hsk->ip_header_length + + sizeof(struct homa_data_hdr) + + seg_length + HOMA_ETH_OVERHEAD; + new_homa_info->data_bytes = seg_length; + new_homa_info->seg_length = seg_length; + new_homa_info->offset = offset; + homa_pacer_check_nic_q(rpc->hsk->homa->pacer, new_skb, + true); + __homa_xmit_data(new_skb, rpc); + } + } + +resend_done: + return; +} -- 2.43.0