Note: for man pages, see the Homa Wiki at:
https://homa-transport.atlassian.net/wiki/spaces/HOMA/overview

Signed-off-by: John Ousterhout
---
Changes for v16:
* Implement HOMAIOCINFO ioctl.

Changes for v14:
* Add "WITH Linux-syscall-note" SPDX license note

Changes for v11:
* Add explicit padding to struct homa_recvmsg_args to fix problems
  compiling on 32-bit machines.

Changes for v9:
* Eliminate use of _Static_assert
* Remove declarations related to now-defunct homa_api.c

Changes for v7:
* Add HOMA_SENDMSG_NONBLOCKING flag for sendmsg
* API changes for new mechanism for waiting for incoming messages
* Add setsockopt SO_HOMA_SERVER (enable incoming requests)
* Use u64 and __u64 properly
---
 MAINTAINERS               |   6 +
 include/uapi/linux/homa.h | 300 ++++++++++++++++++++++++++++++++++++++
 net/Kconfig               |   1 +
 net/Makefile              |   1 +
 4 files changed, 308 insertions(+)
 create mode 100644 include/uapi/linux/homa.h

diff --git a/MAINTAINERS b/MAINTAINERS
index 97d958c945e4..9dd7506b502e 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -11185,6 +11185,12 @@ F: lib/test_hmm*
 F: mm/hmm*
 F: tools/testing/selftests/mm/*hmm*
 
+HOMA TRANSPORT PROTOCOL
+M: John Ousterhout
+S: Maintained
+F: net/homa/
+F: include/uapi/linux/homa.h
+
 HONEYWELL HSC030PA PRESSURE SENSOR SERIES IIO DRIVER
 M: Petre Rodan
 L: linux-iio@vger.kernel.org
diff --git a/include/uapi/linux/homa.h b/include/uapi/linux/homa.h
new file mode 100644
index 000000000000..77e89f538258
--- /dev/null
+++ b/include/uapi/linux/homa.h
@@ -0,0 +1,300 @@
+/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ WITH Linux-syscall-note */
+
+/* This file defines the kernel call interface for the Homa
+ * transport protocol.
+ */
+
+#ifndef _UAPI_LINUX_HOMA_H
+#define _UAPI_LINUX_HOMA_H
+
+#include <linux/types.h>
+#ifndef __KERNEL__
+#include <netinet/in.h>
+#include <sys/socket.h>
+#endif
+
+/* IANA-assigned Internet Protocol number for Homa. */
+#define IPPROTO_HOMA 146
+
+/**
+ * define HOMA_MAX_MESSAGE_LENGTH - Maximum bytes of payload in a Homa
+ * request or response message.
+ */
+#define HOMA_MAX_MESSAGE_LENGTH 1000000
+
+/**
+ * define HOMA_BPAGE_SIZE - Number of bytes in pages used for receive
+ * buffers. Must be a power of two.
+ */
+#define HOMA_BPAGE_SIZE (1 << HOMA_BPAGE_SHIFT)
+#define HOMA_BPAGE_SHIFT 16
+
+/**
+ * define HOMA_MAX_BPAGES - The largest number of bpages that will be required
+ * to store an incoming message.
+ */
+#define HOMA_MAX_BPAGES ((HOMA_MAX_MESSAGE_LENGTH + HOMA_BPAGE_SIZE - 1) >> \
+		HOMA_BPAGE_SHIFT)
+
+/**
+ * define HOMA_MIN_DEFAULT_PORT - The 16-bit port space is divided into
+ * two nonoverlapping regions. Ports 1-32767 are reserved exclusively
+ * for well-defined server ports. The remaining ports are used for client
+ * ports; these are allocated automatically by Homa. Port 0 is reserved.
+ */
+#define HOMA_MIN_DEFAULT_PORT 0x8000
+
+/**
+ * struct homa_sendmsg_args - Provides information needed by Homa's
+ * sendmsg; passed to sendmsg using the msg_control field.
+ */
+struct homa_sendmsg_args {
+	/**
+	 * @id: (in/out) An initial value of 0 means a new request is
+	 * being sent; nonzero means the message is a reply to the given
+	 * id. If the message is a request, then the value is modified to
+	 * hold the id of the new RPC.
+	 */
+	__u64 id;
+
+	/**
+	 * @completion_cookie: (in) Used only for request messages; will be
+	 * returned by recvmsg when the RPC completes. Typically used to
+	 * locate app-specific info about the RPC.
+	 */
+	__u64 completion_cookie;
+
+	/**
+	 * @flags: (in) OR-ed combination of bits that control the operation.
+	 * See below for values.
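+	 *
+	 * As a hypothetical userspace sketch (not part of this patch;
+	 * "fd", "dest", "request", and "request_length" are illustrative
+	 * names), a new request is issued by passing this struct via
+	 * msg_control, and @id then reads back the id of the new RPC:
+	 *
+	 *	struct homa_sendmsg_args args = {0};
+	 *	struct iovec iov = {request, request_length};
+	 *	struct msghdr hdr = {0};
+	 *
+	 *	hdr.msg_name = &dest;
+	 *	hdr.msg_namelen = sizeof(dest);
+	 *	hdr.msg_iov = &iov;
+	 *	hdr.msg_iovlen = 1;
+	 *	hdr.msg_control = &args;
+	 *	hdr.msg_controllen = sizeof(args);
+	 *	if (sendmsg(fd, &hdr, 0) >= 0)
+	 *		id_of_new_rpc = args.id;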
+ */ + __u32 flags; + + /** @reserved: Not currently used, must be 0. */ + __u32 reserved; +}; + +/* Flag bits for homa_sendmsg_args.flags (see man page for documentation): + */ +#define HOMA_SENDMSG_PRIVATE 0x01 +#define HOMA_SENDMSG_VALID_FLAGS 0x01 + +/** + * struct homa_recvmsg_args - Provides information needed by Homa's + * recvmsg; passed to recvmsg using the msg_control field. + */ +struct homa_recvmsg_args { + /** + * @id: (in/out) Initial value is 0 to wait for any shared RPC; + * nonzero means wait for that specific (private) RPC. Returns + * the id of the RPC received. + */ + __u64 id; + + /** + * @completion_cookie: (out) If the incoming message is a response, + * this will return the completion cookie specified when the + * request was sent. For requests this will always be zero. + */ + __u64 completion_cookie; + + /** + * @num_bpages: (in/out) Number of valid entries in @bpage_offsets. + * Passes in bpages from previous messages that can now be + * recycled; returns bpages from the new message. + */ + __u32 num_bpages; + + /** @reserved: Not currently used, must be 0. */ + __u32 reserved; + + /** + * @bpage_offsets: (in/out) Each entry is an offset into the buffer + * region for the socket pool. When returned from recvmsg, the + * offsets indicate where fragments of the new message are stored. All + * entries but the last refer to full buffer pages (HOMA_BPAGE_SIZE + * bytes) and are bpage-aligned. The last entry may refer to a bpage + * fragment and is not necessarily aligned. The application now owns + * these bpages and must eventually return them to Homa, using + * bpage_offsets in a future recvmsg invocation. + */ + __u32 bpage_offsets[HOMA_MAX_BPAGES]; +}; + +/** define SO_HOMA_RCVBUF: setsockopt option for specifying buffer region. */ +#define SO_HOMA_RCVBUF 10 + +/** + * define SO_HOMA_SERVER: setsockopt option for specifying whether a + * socket will act as server. + */ +#define SO_HOMA_SERVER 11 + +/** struct homa_rcvbuf_args - setsockopt argument for SO_HOMA_RCVBUF. */ +struct homa_rcvbuf_args { + /** @start: Address of first byte of buffer region in user space. */ + __u64 start; + + /** @length: Total number of bytes available at @start. */ + size_t length; +}; + +/* Meanings of the bits in Homa's flag word, which can be set using + * "sysctl /net/homa/flags". + */ + +/** + * define HOMA_FLAG_DONT_THROTTLE - disable the output throttling mechanism + * (always send all packets immediately). + */ +#define HOMA_FLAG_DONT_THROTTLE 2 + +/** + * struct homa_rpc_info - Used by HOMAIOCINFO to return information about + * a specific RPC. + */ +struct homa_rpc_info { + /** + * @id: Identifier for the RPC, unique among all RPCs sent by the + * client node. If the low-order bit is 1, this node is the server + * for the RPC; 0 means we are the client. + */ + __u64 id; + + /** @peer: Address of the peer socket for this RPC. */ + union { + struct sockaddr_storage storage; + struct sockaddr_in in4; + struct sockaddr_in6 in6; + } peer; + + /** + * @completion_cookie: For client-side RPCs this gives the completion + * cookie specified when the RPC was initiated. For server-side RPCs + * this is zero. + */ + __u64 completion_cookie; + + /** + * @tx_length: Length of the outgoing message in bytes, or -1 if + * the sendmsg hasn't yet been called. + */ + __s32 tx_length; + + /** + * @tx_sent: Number of bytes of the outgoing message that have been + * transmitted at least once. 
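+	 * (Presumably transmission of the message is complete once
+	 * @tx_sent equals @tx_length.)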
+ */ + __u32 tx_sent; + + /** + * @tx_granted: Number of bytes of the outgoing message that the + * receiver has authorized us to transmit (includes unscheduled + * bytes). + */ + __u32 tx_granted; + + /** @reserved: Reserved for future use. */ + __u32 reserved; + + /** + * @rx_length: Length of the incoming message, in bytes. -1 means + * the length is not yet known (this is a client-side RPC and + * no packets have been received). + */ + __s32 rx_length; + + /** + * @rx_remaining: Number of bytes in the incoming message that have + * not yet been received. + */ + __u32 rx_remaining; + + /** + * @rx_gaps: The number of gaps in the incoming message. A gap is + * a range of bytes that have not been received yet, but bytes after + * the gap have been received. + */ + __u32 rx_gaps; + + /** + * @rx_gap_bytes: The total number of bytes in gaps in the incoming + * message. + */ + __u32 rx_gap_bytes; + + /** + * @rx_granted: The number of bytes in the message that the sender + * is authorized to transmit (includes unscheduled bytes). + */ + __u32 rx_granted; + + /** + * @flags: Various single-bit values associated with the RPC: + * HOMA_RPC_BUF_STALL: The incoming message is currently stalled + * because there is insufficient receiver buffer + * space. + * HOMA_RPC_PRIVATE: The RPC has been created as "private"; set + * only on the client side. + * HOMA_RPC_RX_READY: The incoming message is complete and has + * been queued waiting for a thread to call + * recvmsg. + * HOMA_RPC_RX_COPY: There are packets that have been received, + * whose data has not yet been copied from + * packet buffers to user space. + */ + __u16 flags; +#define HOMA_RPC_BUF_STALL 1 +#define HOMA_RPC_PRIVATE 2 +#define HOMA_RPC_RX_READY 4 +#define HOMA_RPC_RX_COPY 8 +}; + +/** + * struct homa_info - In/out argument passed to HOMAIOCINFO. Fields labeled + * as "in" must be set by the application; other fields are returned to the + * application from the kernel. + */ +struct homa_info { + /** + * @rpc_info: (in) Address of memory region in which to store + * information about individual RPCs. + */ + struct homa_rpc_info *rpc_info; + + /** + * @rpc_info_length: (in) Number of bytes of storage available at + * rpc_info. + */ + size_t rpc_info_length; + + /** + * @bpool_avail_bytes: Number of bytes in the buffer pool for incoming + * messages that is currently available for new messages. + */ + __u64 bpool_avail_bytes; + + /** @port: Port number handled by this socket. */ + __u32 port; + + /** + * @num_rpcs: Total number of active RPCs (both server and client) for + * this socket. The number stored at @rpc_info will be less than this + * if @rpc_info_length is too small. + */ + __u32 num_rpcs; + + /** + * @error_msg: Provides additional information about the last error + * returned by a Homa-related kernel call such as sendmsg, recvmsg, + * or ioctl. Not updated for some obvious return values such as EINTR + * or EWOULDBLOCK. 
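+	 *
+	 * As a hypothetical usage sketch (local variable names are
+	 * illustrative, not part of this patch), an application might
+	 * fetch socket status with the HOMAIOCINFO ioctl defined below:
+	 *
+	 *	struct homa_rpc_info rpcs[10];
+	 *	struct homa_info info = {0};
+	 *
+	 *	info.rpc_info = rpcs;
+	 *	info.rpc_info_length = sizeof(rpcs);
+	 *	if (ioctl(fd, HOMAIOCINFO, &info) != 0)
+	 *		fprintf(stderr, "HOMAIOCINFO failed: %s\n",
+	 *			info.error_msg);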
+ */
+#define HOMA_ERROR_MSG_SIZE 100
+	char error_msg[HOMA_ERROR_MSG_SIZE];
+};
+
+/* I/O control calls on Homa sockets. */
+#define HOMAIOCINFO _IOWR('h', 1, struct homa_info)
+
+#endif /* _UAPI_LINUX_HOMA_H */
diff --git a/net/Kconfig b/net/Kconfig
index d5865cf19799..92972ff2a78d 100644
--- a/net/Kconfig
+++ b/net/Kconfig
@@ -250,6 +250,7 @@ source "net/bridge/netfilter/Kconfig"
 endif # if NETFILTER
 source "net/sctp/Kconfig"
+source "net/homa/Kconfig"
 source "net/rds/Kconfig"
 source "net/tipc/Kconfig"
 source "net/atm/Kconfig"
diff --git a/net/Makefile b/net/Makefile
index aac960c41db6..71f740e0dc34 100644
--- a/net/Makefile
+++ b/net/Makefile
@@ -43,6 +43,7 @@ ifneq ($(CONFIG_VLAN_8021Q),)
 obj-y += 8021q/
 endif
 obj-$(CONFIG_IP_SCTP) += sctp/
+obj-$(CONFIG_HOMA) += homa/
 obj-$(CONFIG_RDS) += rds/
 obj-$(CONFIG_WIRELESS) += wireless/
 obj-$(CONFIG_MAC80211) += mac80211/
-- 
2.43.0

This file defines the on-the-wire packet formats for Homa.

Signed-off-by: John Ousterhout
---
Changes for v11:
* Rework the mechanism for waking up RPCs that stalled waiting for
  buffer pool space

Changes for v10:
* Replace __u16 with u16, __u8 with u8, etc.
* Refactor resend mechanism

Changes for v9:
* Eliminate use of _Static_assert
* Various name improvements (e.g. use "alloc" instead of "new" for
  functions that allocate memory, replace BOGUS in enum
  homa_packet_type with MAX_OP)
* Remove HOMA_IPV6_HEADER_LENGTH and similar defs, use sizeof(ipv6hdr)
  instead

Changes for v7:
* Rename UNKNOWN packet type to RPC_UNKNOWN
* Use u64 and __u64 properly
---
 net/homa/homa_wire.h | 348 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 348 insertions(+)
 create mode 100644 net/homa/homa_wire.h

diff --git a/net/homa/homa_wire.h b/net/homa/homa_wire.h
new file mode 100644
index 000000000000..8ce349ad3c1b
--- /dev/null
+++ b/net/homa/homa_wire.h
@@ -0,0 +1,348 @@
+/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */
+
+/* This file defines the on-the-wire format of Homa packets. */
+
+#ifndef _HOMA_WIRE_H
+#define _HOMA_WIRE_H
+
+#include <linux/skbuff.h>
+#include <net/tcp.h>
+
+/* Defines the possible types of Homa packets.
+ *
+ * See the xxx_header structs below for more information about each type.
+ */
+enum homa_packet_type {
+	DATA        = 0x10,
+	RESEND      = 0x12,
+	RPC_UNKNOWN = 0x13,
+	BUSY        = 0x14,
+	NEED_ACK    = 0x17,
+	ACK         = 0x18,
+	MAX_OP      = 0x18,
+	/* If you add a new type here, you must also do the following:
+	 * 1. Change MAX_OP so it is the highest valid opcode
+	 * 2. Add support for the new opcode in homa_print_packet,
+	 *    homa_print_packet_short, homa_symbol_for_type, and
+	 *    mock_skb_alloc.
+	 * 3. Add the header length to header_lengths in homa_plumbing.c.
+	 */
+};
+
+/**
+ * define HOMA_SKB_EXTRA - How many bytes of additional space to allow at the
+ * beginning of each sk_buff, before the Homa header. This includes room for
+ * either an IPV4 or IPV6 header, Ethernet header, VLAN header, etc. This is
+ * a bit of an overestimate, since it also includes space for a TCP header.
+ */
+#define HOMA_SKB_EXTRA MAX_TCP_HEADER
+
+/**
+ * define HOMA_ETH_FRAME_OVERHEAD - Additional overhead bytes for each
+ * Ethernet packet that are not included in the packet header (preamble,
+ * start frame delimiter, CRC, and inter-packet gap).
+ */
+#define HOMA_ETH_FRAME_OVERHEAD 24
+
+/**
+ * define HOMA_ETH_OVERHEAD - Number of bytes per Ethernet packet for Ethernet
+ * header, CRC, preamble, and inter-packet gap.
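+ * (Presumably the constant 18 in the definition below covers the
+ * 14-byte Ethernet header plus a 4-byte VLAN tag, with the preamble,
+ * start frame delimiter, CRC, and inter-packet gap contributed by
+ * HOMA_ETH_FRAME_OVERHEAD: 18 + 24 = 42 bytes per packet in all.)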
+ */ +#define HOMA_ETH_OVERHEAD (18 + HOMA_ETH_FRAME_OVERHEAD) + +/** + * define HOMA_MIN_PKT_LENGTH - Every Homa packet must be padded to at least + * this length to meet Ethernet frame size limitations. This number includes + * Homa headers and data, but not IP or Ethernet headers. + */ +#define HOMA_MIN_PKT_LENGTH 26 + +/** + * define HOMA_MAX_HEADER - Number of bytes in the largest Homa header. + */ +#define HOMA_MAX_HEADER 90 + +/** + * struct homa_common_hdr - Wire format for the first bytes in every Homa + * packet. This must (mostly) match the format of a TCP header to enable + * Homa packets to actually be transmitted as TCP packets (and thereby + * take advantage of TSO and other features). + */ +struct homa_common_hdr { + /** + * @sport: Port on source machine from which packet was sent. + * Must be in the same position as in a TCP header. + */ + __be16 sport; + + /** + * @dport: Port on destination that is to receive packet. Must be + * in the same position as in a TCP header. + */ + __be16 dport; + + /** + * @sequence: corresponds to the sequence number field in TCP headers; + * used in DATA packets to hold the offset in the message of the first + * byte of data. However, when TSO is used without TCP hijacking, this + * value will only be correct in the first segment of a GSO packet. + */ + __be32 sequence; + + /** + * @ack: Corresponds to the high-order bits of the acknowledgment + * field in TCP headers; not used by Homa. + */ + char ack[3]; + + /** + * @type: Homa packet type (one of the values of the homa_packet_type + * enum). Corresponds to the low-order byte of the ack in TCP. + */ + u8 type; + + /** + * @doff: High order 4 bits correspond to the Data Offset field of a + * TCP header. In DATA packets they hold the number of 4-byte chunks + * in a homa_data_hdr; used by TSO to determine where the replicated + * header portion ends. For other packets the offset is always 5 + * (standard TCP header length); other values may cause some NICs + * (such as Intel E810-C) to drop outgoing packets when TCP hijacking + * is enabled. The low-order bits are always 0. + */ + u8 doff; + + /** @reserved1: Not used (corresponds to TCP flags). */ + u8 reserved1; + + /** + * @window: Corresponds to the window field in TCP headers. Not used + * by HOMA. + */ + __be16 window; + + /** + * @checksum: Not used by Homa, but must occupy the same bytes as + * the checksum in a TCP header (TSO may modify this?). + */ + __be16 checksum; + + /** @reserved2: Not used (corresponds to TCP urgent field). */ + __be16 reserved2; + + /** + * @sender_id: the identifier of this RPC as used on the sender (i.e., + * if the low-order bit is set, then the sender is the server for + * this RPC). + */ + __be64 sender_id; +} __packed; + +/** + * struct homa_ack - Identifies an RPC that can be safely deleted by its + * server. After sending the response for an RPC, the server must retain its + * state for the RPC until it knows that the client has successfully + * received the entire response. An ack indicates this. Clients will + * piggyback acks on future data packets, but if a client doesn't send + * any data to the server, the server will eventually request an ack + * explicitly with a NEED_ACK packet, in which case the client will + * return an explicit ACK. + */ +struct homa_ack { + /** + * @client_id: The client's identifier for the RPC. 0 means this ack + * is invalid. + */ + __be64 client_id; + + /** @server_port: The server-side port for the RPC. 
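+	 * (The client's port is presumably not needed here because it
+	 * can be recovered from the source port of the packet that
+	 * carries the ack.)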
*/ + __be16 server_port; +} __packed; + +/* struct homa_data_hdr - Contains data for part or all of a Homa message. + * An incoming packet consists of a homa_data_hdr followed by message data. + * An outgoing packet can have this simple format as well, or it can be + * structured as a GSO packet with the following format: + * + * |-----------------------| + * | | + * | data_header | + * | | + * |---------------------- | + * | | + * | | + * | segment data | + * | | + * | | + * |-----------------------| + * | seg_header | + * |-----------------------| + * | | + * | | + * | segment data | + * | | + * | | + * |-----------------------| + * | seg_header | + * |-----------------------| + * | | + * | | + * | segment data | + * | | + * | | + * |-----------------------| + * + * TSO will not adjust @homa_common_hdr.sequence in the segments, so Homa + * sprinkles correct offsets (in homa_seg_hdrs) throughout the segment data; + * TSO/GSO will include a different homa_seg_hdr in each generated packet. + */ + +struct homa_seg_hdr { + /** + * @offset: Offset within message of the first byte of data in + * this segment. + */ + __be32 offset; +} __packed; + +struct homa_data_hdr { + struct homa_common_hdr common; + + /** @message_length: Total #bytes in the message. */ + __be32 message_length; + + __be32 reserved1; + + /** @ack: If the @client_id field of this is nonzero, provides info + * about an RPC that the recipient can now safely free. Note: in + * TSO packets this will get duplicated in each of the segments; + * in order to avoid repeated attempts to ack the same RPC, + * homa_gro_receive will clear this field in all segments but the + * first. + */ + struct homa_ack ack; + + __be16 reserved2; + + /** + * @retransmit: 1 means this packet was sent in response to a RESEND + * (it has already been sent previously). + */ + u8 retransmit; + + char pad[3]; + + /** @seg: First of possibly many segments. */ + struct homa_seg_hdr seg; +} __packed; + +/** + * homa_data_len() - Returns the total number of bytes in a DATA packet + * after the homa_data_hdr. Note: if the packet is a GSO packet, the result + * may include metadata as well as packet data. + * @skb: Incoming data packet + * Return: see above + */ +static inline int homa_data_len(struct sk_buff *skb) +{ + return skb->len - skb_transport_offset(skb) - + sizeof(struct homa_data_hdr); +} + +/** + * struct homa_resend_hdr - Wire format for RESEND packets. + * + * A RESEND is sent by the receiver when it believes that message data may + * have been lost in transmission (or if it is concerned that the sender may + * have crashed). The receiver should resend the specified portion of the + * message, even if it already sent it previously. + */ +struct homa_resend_hdr { + /** @common: Fields common to all packet types. */ + struct homa_common_hdr common; + + /** + * @offset: Offset within the message of the first byte of data that + * should be retransmitted. + */ + __be32 offset; + + /** + * @length: Number of bytes of data to retransmit. -1 means no data + * has been received for the message, so everything sent previously + * should be retransmitted. + */ + __be32 length; + +} __packed; + +/** + * struct homa_rpc_unknown_hdr - Wire format for RPC_UNKNOWN packets. + * + * An RPC_UNKNOWN packet is sent by either server or client when it receives a + * packet for an RPC that is unknown to it. 
When a client receives an
+ * RPC_UNKNOWN packet it will typically restart the RPC from the beginning;
+ * when a server receives an RPC_UNKNOWN packet it will typically discard its
+ * state for the RPC.
+ */
+struct homa_rpc_unknown_hdr {
+	/** @common: Fields common to all packet types. */
+	struct homa_common_hdr common;
+} __packed;
+
+/**
+ * struct homa_busy_hdr - Wire format for BUSY packets.
+ *
+ * These packets tell the recipient that the sender is still alive (even if
+ * it isn't sending data expected by the recipient).
+ */
+struct homa_busy_hdr {
+	/** @common: Fields common to all packet types. */
+	struct homa_common_hdr common;
+} __packed;
+
+/**
+ * struct homa_need_ack_hdr - Wire format for NEED_ACK packets.
+ *
+ * These packets ask the recipient (a client) to return an ACK message if
+ * the packet's RPC is no longer active.
+ */
+struct homa_need_ack_hdr {
+	/** @common: Fields common to all packet types. */
+	struct homa_common_hdr common;
+} __packed;
+
+/**
+ * struct homa_ack_hdr - Wire format for ACK packets.
+ *
+ * These packets are sent from a client to a server to indicate that
+ * a set of RPCs is no longer active on the client, so the server can
+ * free any state it may have for them.
+ */
+struct homa_ack_hdr {
+	/** @common: Fields common to all packet types. */
+	struct homa_common_hdr common;
+
+	/** @num_acks: Number of (leading) elements in @acks that are valid. */
+	__be16 num_acks;
+
+#define HOMA_MAX_ACKS_PER_PKT 5
+	/** @acks: Info about RPCs that are no longer active. */
+	struct homa_ack acks[HOMA_MAX_ACKS_PER_PKT];
+} __packed;
+
+/**
+ * homa_local_id(): given an RPC identifier from an input packet (which
+ * is network-encoded), return the decoded id we should use for that
+ * RPC on this machine.
+ * @sender_id: RPC id from an incoming packet, such as h->common.sender_id
+ * Return: see above
+ */
+static inline u64 homa_local_id(__be64 sender_id)
+{
+	/* If the client bit was set on the sender side, it needs to be
+	 * removed here, and conversely.
+	 */
+	return be64_to_cpu(sender_id) ^ 1;
+}
+
+#endif /* _HOMA_WIRE_H */
-- 
2.43.0

homa_impl.h defines "struct homa", which contains overall information
about the Homa transport, plus various odds and ends that are used
throughout the Homa implementation.

homa_stub.h is a temporary header file that provides stubs for
facilities that have been omitted from this first patch series. This
file will go away once Homa is fully upstreamed.

Signed-off-by: John Ousterhout
---
Changes for v16:
* Remove various fields and functions:
  * net field in struct homa_net
  * bytes_left, next_sibling, and last_sibling fields in struct
    homa_skb_info
  * is_homa_pkt()
  * homa_from_skb()
  * homa_net_from_skb()
  * homa_usecs_to_cycles()
* Rename homa_net_from_net to homa_net
* Use consume_skb and kfree_skb_reason instead of kfree_skb

Changes for v12:
* Use tsc_khz instead of cpu_khz
* Make is_homa_pkt work properly with IPv6 (it only worked for IPv4)

Changes for v11:
* Move link_mbps variable from struct homa_pacer back to struct homa.

Changes for v10:
* Eliminate __context__ definition
* Replace __u16 with u16, __u8 with u8, etc.
* Refactor resend mechanism

Changes for v9:
* Move information from sync.txt into comments in homa_impl.h
* Add limits on number of active peer structs
* Introduce homa_net objects; there is now a single global struct homa
  shared by all network namespaces, with one homa_net per network
  namespace with netns-specific information.
* Introduce homa_clock as an abstraction layer for the fine-grain clock.
* Various name improvements (e.g. use "alloc" instead of "new" for functions that allocate memory) * Eliminate sizeof32 definition Changes for v8: * Pull out pacer-related fields into separate struct homa_pacer in homa_pacer.h Changes for v7: * Make Homa a per-net subsystem * Track tx buffer memory usage * Refactor waiting mechanism for incoming packets: simplify wait criteria and use standard Linux mechanisms for waiting * Remove "lock_slow" functions, which don't add functionality in this patch series * Rename homa_rpc_free to homa_rpc_end * Add homa_make_header_avl function * Use u64 and __u64 properly --- net/homa/homa_impl.h | 514 +++++++++++++++++++++++++++++++++++++++++++ net/homa/homa_stub.h | 91 ++++++++ 2 files changed, 605 insertions(+) create mode 100644 net/homa/homa_impl.h create mode 100644 net/homa/homa_stub.h diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h new file mode 100644 index 000000000000..512122ce5ae3 --- /dev/null +++ b/net/homa/homa_impl.h @@ -0,0 +1,514 @@ +/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */ + +/* This file contains definitions that are shared across the files + * that implement Homa for Linux. + */ + +#ifndef _HOMA_IMPL_H +#define _HOMA_IMPL_H + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "homa_wire.h" + +/* Forward declarations. */ +struct homa; +struct homa_peer; +struct homa_rpc; +struct homa_sock; + +/** + * union sockaddr_in_union - Holds either an IPv4 or IPv6 address (smaller + * and easier to use than sockaddr_storage). + */ +union sockaddr_in_union { + /** @sa: Used to access as a generic sockaddr. */ + struct sockaddr sa; + + /** @in4: Used to access as IPv4 socket. */ + struct sockaddr_in in4; + + /** @in6: Used to access as IPv6 socket. */ + struct sockaddr_in6 in6; +}; + +/** + * struct homa - Stores overall information about the Homa transport, which + * is shared across all Homa sockets and all network namespaces. + */ +struct homa { + /** + * @next_outgoing_id: Id to use for next outgoing RPC request. + * This is always even: it's used only to generate client-side ids. + * Accessed without locks. Note: RPC ids are unique within a + * single client machine. + */ + atomic64_t next_outgoing_id; + + /** + * @peertab: Info about all the other hosts we have communicated with; + * includes peers from all network namespaces. + */ + struct homa_peertab *peertab; + + /** + * @socktab: Information about all open sockets. Dynamically + * allocated; must be kfreed. + */ + struct homa_socktab *socktab; + + /** @max_numa: Highest NUMA node id in use by any core. */ + int max_numa; + + /** + * @link_mbps: The raw bandwidth of the network uplink, in + * units of 1e06 bits per second. Set externally via sysctl. + */ + int link_mbps; + + /** + * @resend_ticks: When an RPC's @silent_ticks reaches this value, + * start sending RESEND requests. + */ + int resend_ticks; + + /** + * @resend_interval: minimum number of homa timer ticks between + * RESENDs for the same RPC. + */ + int resend_interval; + + /** + * @timeout_ticks: abort an RPC if its silent_ticks reaches this value. + */ + int timeout_ticks; + + /** + * @timeout_resends: Assume that a server is dead if it has not + * responded after this many RESENDs have been sent to it. 
+ */
+	int timeout_resends;
+
+	/**
+	 * @request_ack_ticks: How many timer ticks we'll wait for the
+	 * client to ack an RPC before explicitly requesting an ack.
+	 * Set externally via sysctl.
+	 */
+	int request_ack_ticks;
+
+	/**
+	 * @reap_limit: Maximum number of packet buffers to free in a
+	 * single call to homa_rpc_reap.
+	 */
+	int reap_limit;
+
+	/**
+	 * @dead_buffs_limit: If the number of packet buffers in dead but
+	 * not yet reaped RPCs is less than this number, then Homa reaps
+	 * RPCs in a way that minimizes impact on performance but may permit
+	 * dead RPCs to accumulate. If the number of dead packet buffers
+	 * exceeds this value, then Homa switches to a more aggressive approach
+	 * to reaping RPCs. Set externally via sysctl.
+	 */
+	int dead_buffs_limit;
+
+	/**
+	 * @max_dead_buffs: The largest aggregate number of packet buffers
+	 * in dead (but not yet reaped) RPCs that has existed so far in a
+	 * single socket. Readable via sysctl, and may be reset via sysctl
+	 * to begin recalculating.
+	 */
+	int max_dead_buffs;
+
+	/**
+	 * @max_gso_size: Maximum number of bytes that will be included
+	 * in a single output packet that Homa passes to Linux. Can be set
+	 * externally via sysctl to lower the limit already enforced by Linux.
+	 */
+	int max_gso_size;
+
+	/**
+	 * @gso_force_software: A non-zero value will cause Homa to perform
+	 * segmentation in software using GSO; zero means ask the NIC to
+	 * perform TSO. Set externally via sysctl.
+	 */
+	int gso_force_software;
+
+	/**
+	 * @wmem_max: Limit on the value of sk_sndbuf for any socket. Set
+	 * externally via sysctl.
+	 */
+	int wmem_max;
+
+	/**
+	 * @timer_ticks: number of times that homa_timer has been invoked
+	 * (may wrap around, which is safe).
+	 */
+	u32 timer_ticks;
+
+	/**
+	 * @flags: a collection of bits that can be set using sysctl
+	 * to trigger various behaviors.
+	 */
+	int flags;
+
+	/**
+	 * @bpage_lease_usecs: how long a core can own a bpage (microseconds)
+	 * before its ownership can be revoked to reclaim the page.
+	 */
+	int bpage_lease_usecs;
+
+	/**
+	 * @bpage_lease_cycles: same as bpage_lease_usecs except in
+	 * homa_clock() units.
+	 */
+	int bpage_lease_cycles;
+
+	/**
+	 * @next_id: Set via sysctl; causes next_outgoing_id to be set to
+	 * this value; always reads as zero. Typically used while debugging to
+	 * ensure that different nodes use different ranges of ids.
+	 */
+	int next_id;
+
+	/**
+	 * @destroyed: True means that this structure is being destroyed
+	 * so everyone should clean up.
+	 */
+	bool destroyed;
+
+};
+
+/**
+ * struct homa_net - Contains Homa information that is specific to a
+ * particular network namespace.
+ */
+struct homa_net {
+	/** @homa: Global Homa information. */
+	struct homa *homa;
+
+	/**
+	 * @prev_default_port: The most recent port number assigned from
+	 * the range of default ports.
+	 */
+	u16 prev_default_port;
+
+	/**
+	 * @num_peers: The total number of struct homa_peer structs that
+	 * exist for this namespace. Managed by homa_peer.c under the
+	 * peertab lock.
+	 */
+	int num_peers;
+};
+
+/**
+ * struct homa_skb_info - Additional information needed by Homa for each
+ * outbound DATA packet. Space is allocated for this at the very end of the
+ * linear part of the skb.
+ */
+struct homa_skb_info {
+	/** @next_skb: used to link together outgoing skb's for a message. */
+	struct sk_buff *next_skb;
+
+	/**
+	 * @wire_bytes: total number of bytes of network bandwidth that
+	 * will be consumed by this packet.
This includes everything, + * including additional headers added by GSO, IP header, Ethernet + * header, CRC, preamble, and inter-packet gap. + */ + int wire_bytes; + + /** + * @data_bytes: total bytes of message data across all of the + * segments in this packet. + */ + int data_bytes; + + /** @seg_length: maximum number of data bytes in each GSO segment. */ + int seg_length; + + /** + * @offset: offset within the message of the first byte of data in + * this packet. + */ + int offset; + + /** @rpc: RPC that this packet belongs to. */ + void *rpc; +}; + +/** + * homa_get_skb_info() - Return the address of Homa's private information + * for an sk_buff. + * @skb: Socket buffer whose info is needed. + * Return: address of Homa's private information for @skb. + */ +static inline struct homa_skb_info *homa_get_skb_info(struct sk_buff *skb) +{ + return (struct homa_skb_info *)(skb_end_pointer(skb)) - 1; +} + +/** + * homa_set_doff() - Fills in the doff TCP header field for a Homa packet. + * @h: Packet header whose doff field is to be set. + * @size: Size of the "header", bytes (must be a multiple of 4). This + * information is used only for TSO; it's the number of bytes + * that should be replicated in each segment. The bytes after + * this will be distributed among segments. + */ +static inline void homa_set_doff(struct homa_data_hdr *h, int size) +{ + /* Drop the 2 low-order bits from size and set the 4 high-order + * bits of doff from what's left. + */ + h->common.doff = size << 2; +} + +/** skb_is_ipv6() - Return true if the packet is encapsulated with IPv6, + * false otherwise (presumably it's IPv4). + */ +static inline bool skb_is_ipv6(const struct sk_buff *skb) +{ + return ipv6_hdr(skb)->version == 6; +} + +/** + * ipv6_to_ipv4() - Given an IPv6 address produced by ipv4_to_ipv6, return + * the original IPv4 address (in network byte order). + * @ip6: IPv6 address; assumed to be a mapped IPv4 address. + * Return: IPv4 address stored in @ip6. + */ +static inline __be32 ipv6_to_ipv4(const struct in6_addr ip6) +{ + return ip6.in6_u.u6_addr32[3]; +} + +/** + * canonical_ipv6_addr() - Convert a socket address to the "standard" + * form used in Homa, which is always an IPv6 address; if the original address + * was IPv4, convert it to an IPv4-mapped IPv6 address. + * @addr: Address to canonicalize (if NULL, "any" is returned). + * Return: IPv6 address corresponding to @addr. + */ +static inline struct in6_addr canonical_ipv6_addr(const union sockaddr_in_union + *addr) +{ + struct in6_addr mapped; + + if (addr) { + if (addr->sa.sa_family == AF_INET6) + return addr->in6.sin6_addr; + ipv6_addr_set_v4mapped(addr->in4.sin_addr.s_addr, &mapped); + return mapped; + } + return in6addr_any; +} + +/** + * skb_canonical_ipv6_saddr() - Given a packet buffer, return its source + * address in the "standard" form used in Homa, which is always an IPv6 + * address; if the original address was IPv4, convert it to an IPv4-mapped + * IPv6 address. + * @skb: The source address will be extracted from this packet buffer. + * Return: IPv6 address for @skb's source machine. 
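+ * (An IPv4 source address a.b.c.d comes back in the v4-mapped form
+ * ::ffff:a.b.c.d.)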
+ */
+static inline struct in6_addr skb_canonical_ipv6_saddr(struct sk_buff *skb)
+{
+	struct in6_addr mapped;
+
+	if (skb_is_ipv6(skb))
+		return ipv6_hdr(skb)->saddr;
+	ipv6_addr_set_v4mapped(ip_hdr(skb)->saddr, &mapped);
+	return mapped;
+}
+
+/**
+ * homa_make_header_avl() - Invokes pskb_may_pull to make sure that all the
+ * Homa header information for a packet is in the linear part of the skb
+ * where it can be addressed using skb_transport_header.
+ * @skb: Packet for which header is needed.
+ * Return: The result of pskb_may_pull (true for success)
+ */
+static inline bool homa_make_header_avl(struct sk_buff *skb)
+{
+	int pull_length;
+
+	pull_length = skb_transport_header(skb) - skb->data + HOMA_MAX_HEADER;
+	if (pull_length > skb->len)
+		pull_length = skb->len;
+	return pskb_may_pull(skb, pull_length);
+}
+
+extern unsigned int homa_net_id;
+
+int homa_ioc_info(struct socket *sock, unsigned long arg);
+
+/**
+ * homa_net() - Return the struct homa_net associated with a particular
+ * struct net.
+ * @net: Get the Homa data for this net namespace.
+ * Return: see above.
+ */
+static inline struct homa_net *homa_net(struct net *net)
+{
+	return (struct homa_net *)net_generic(net, homa_net_id);
+}
+
+/**
+ * homa_clock() - Return a fine-grain clock value that is monotonic and
+ * consistent across cores.
+ * Return: see above.
+ */
+static inline u64 homa_clock(void)
+{
+	/* This function exists to make it easy to switch time sources
+	 * if/when new or better sources become available.
+	 */
+	return ktime_get_ns();
+}
+
+/**
+ * homa_clock_khz() - Return the frequency of the values returned by
+ * homa_clock, in units of KHz.
+ * Return: see above.
+ */
+static inline u64 homa_clock_khz(void)
+{
+	return 1000000;
+}
+
+/**
+ * homa_ns_to_cycles() - Convert from units of nanoseconds to units of
+ * homa_clock().
+ * @ns: A time measurement in nanoseconds
+ * Return: The time in homa_clock() units corresponding to @ns.
+ */
+static inline u64 homa_ns_to_cycles(u64 ns)
+{
+	u64 tmp;
+
+	tmp = ns * homa_clock_khz();
+	do_div(tmp, 1000000);
+	return tmp;
+}
+
+/* Homa Locking Strategy:
+ *
+ * (Note: this documentation is referenced in several other places in the
+ * Homa code)
+ *
+ * In the Linux TCP/IP stack the primary locking mechanism is a sleep-lock
+ * per socket. However, per-socket locks aren't adequate for Homa, because
+ * sockets are "larger" in Homa. In TCP, a socket corresponds to a single
+ * connection between two peers; an application can have hundreds or
+ * thousands of sockets open at once, so per-socket locks leave lots of
+ * opportunities for concurrency. With Homa, a single socket can be used for
+ * communicating with any number of peers, so there will typically be just
+ * one socket per thread. As a result, a single Homa socket must support many
+ * concurrent RPCs efficiently, and a per-socket lock would create a bottleneck
+ * (Homa tried this approach initially).
+ *
+ * Thus, the primary locks used in Homa are spinlocks at RPC granularity. This
+ * allows operations on different RPCs for the same socket to proceed
+ * concurrently. Homa also has socket locks (which are spinlocks different
+ * from the official socket sleep-locks) but these are used much less
+ * frequently than RPC locks.
+ *
+ * Lock Ordering:
+ *
+ * There are several other locks in Homa besides RPC locks, all of which
+ * are spinlocks. When multiple locks are held, they must be acquired in a
+ * consistent order to prevent deadlock. Here are the rules for Homa:
+ * 1. Except for RPC and socket locks, all locks should be considered
+ *    "leaf" locks: don't acquire other locks while holding them.
+ * 2. The lock order is:
+ *    * RPC lock
+ *    * Socket lock
+ *    * Other lock
+ *
+ * It may seem surprising that RPC locks are acquired *before* socket locks,
+ * but this is essential for high performance. Homa has been designed so that
+ * many common operations (such as processing input packets) can be performed
+ * while holding only an RPC lock; this allows operations on different RPCs
+ * to proceed in parallel. Only a few operations, such as handing off an
+ * incoming message to a waiting thread, require the socket lock. If socket
+ * locks had to be acquired first, any operation that might eventually need
+ * the socket lock would have to acquire it before the RPC lock, which would
+ * severely restrict concurrency.
+ *
+ * Socket Shutdown:
+ *
+ * It is possible for socket shutdown to begin while operations are underway
+ * that hold RPC locks but not the socket lock. For example, a new RPC
+ * creation might be underway when a socket is shut down. The RPC creation
+ * will eventually acquire the socket lock and add the new RPC to those
+ * for the socket; it would be very bad if this were to happen after
+ * homa_sock_shutdown has deleted all RPCs for the socket.
+ * In general, any operation that acquires a socket lock must check
+ * hsk->shutdown after acquiring the lock and abort if hsk->shutdown is set.
+ *
+ * Spinlock Implications:
+ *
+ * Homa uses spinlocks exclusively; this is needed because locks typically
+ * need to be acquired at atomic level, such as in SoftIRQ code.
+ *
+ * Operations that can block, such as memory allocation and copying data
+ * to/from user space, are not permitted while holding spinlocks (spinlocks
+ * disable interrupts, so the holder must not block). This results in awkward
+ * code in several places to move restricted operations outside locked
+ * regions. Such code typically looks like this:
+ * - Acquire a reference on an object such as an RPC, in order to prevent
+ *   the object from being deleted.
+ * - Release the object's lock.
+ * - Perform the restricted operation.
+ * - Re-acquire the lock.
+ * - Release the reference.
+ * It is possible that the object may have been modified by some other party
+ * while it was unlocked, so additional checks may be needed after reacquiring
+ * the lock. As one example, an RPC may have been terminated, in which case
+ * any operation in progress on that RPC should be aborted after reacquiring
+ * the lock.
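+ *
+ * A sketch of this pattern (illustrative only; the hold/put and
+ * termination-check names below are placeholders, not functions defined
+ * in this patch):
+ *
+ *	homa_rpc_hold(rpc);		(acquire reference)
+ *	homa_rpc_unlock(rpc);
+ *	... blocking operation, such as copying to user space ...
+ *	homa_rpc_lock(rpc);
+ *	homa_rpc_put(rpc);		(release reference)
+ *	if (rpc_has_been_terminated(rpc))
+ *		... abort the operation ...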
+ *
+ * Lists of RPCs:
+ *
+ * There are a few places where Homa needs to process all of the RPCs
+ * associated with a socket, such as the timer. Such code must first lock
+ * the socket (to protect access to the link pointers) then lock
+ * individual RPCs on the list. However, this violates the rules for locking
+ * order. It isn't safe to unlock the socket before locking the individual
+ * RPCs, because RPCs could be deleted and their memory recycled between the
+ * unlock of the socket lock and the lock of the RPC; this could result in
+ * corruption. Homa uses two different approaches to handle this situation:
+ * 1. Use ``homa_protect_rpcs`` to prevent RPC reaping for a socket. RPCs can
+ *    still be terminated, but their memory won't go away until
+ *    homa_unprotect_rpcs is invoked. This allows the socket lock to be
+ *    released before acquiring RPC locks; after acquiring each RPC lock,
+ *    the RPC must be checked to see if it has been terminated; if so,
+ *    skip it.
+ * 2. Use ``spin_trylock_bh`` to acquire the RPC lock while still holding the
+ *    socket lock. If this fails, then release the socket lock and retry
+ *    both the socket lock and the RPC lock. Of course, the state of both
+ *    socket and RPC could change before the locks are finally acquired.
+ */
+
+#endif /* _HOMA_IMPL_H */
diff --git a/net/homa/homa_stub.h b/net/homa/homa_stub.h
new file mode 100644
index 000000000000..502cd93de89b
--- /dev/null
+++ b/net/homa/homa_stub.h
@@ -0,0 +1,91 @@
+/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */
+
+/* This file contains stripped-down replacements for facilities that
+ * have been temporarily removed from Homa during the Linux upstreaming
+ * process. By the time upstreaming is complete this file will
+ * have gone away.
+ */
+
+#ifndef _HOMA_STUB_H
+#define _HOMA_STUB_H
+
+#include "homa_impl.h"
+
+static inline int homa_skb_init(struct homa *homa)
+{
+	return 0;
+}
+
+static inline void homa_skb_cleanup(struct homa *homa)
+{}
+
+static inline void homa_skb_release_pages(struct homa *homa)
+{}
+
+static inline int homa_skb_append_from_iter(struct homa *homa,
+					    struct sk_buff *skb,
+					    struct iov_iter *iter, int length)
+{
+	char *dst = skb_put(skb, length);
+
+	if (copy_from_iter(dst, length, iter) != length)
+		return -EFAULT;
+	return 0;
+}
+
+static inline int homa_skb_append_to_frag(struct homa *homa,
+					  struct sk_buff *skb, void *buf,
+					  int length)
+{
+	char *dst = skb_put(skb, length);
+
+	memcpy(dst, buf, length);
+	return 0;
+}
+
+static inline int homa_skb_append_from_skb(struct homa *homa,
+					   struct sk_buff *dst_skb,
+					   struct sk_buff *src_skb,
+					   int offset, int length)
+{
+	return homa_skb_append_to_frag(homa, dst_skb,
+				       skb_transport_header(src_skb) + offset,
+				       length);
+}
+
+static inline void homa_skb_free_tx(struct homa *homa, struct sk_buff *skb)
+{
+	consume_skb(skb);
+}
+
+static inline void homa_skb_free_many_tx(struct homa *homa,
+					 struct sk_buff **skbs, int count)
+{
+	int i;
+
+	for (i = 0; i < count; i++)
+		consume_skb(skbs[i]);
+}
+
+static inline void homa_skb_get(struct sk_buff *skb, void *dest, int offset,
+				int length)
+{
+	memcpy(dest, skb_transport_header(skb) + offset, length);
+}
+
+static inline struct sk_buff *homa_skb_alloc_tx(int length)
+{
+	struct sk_buff *skb;
+
+	skb = alloc_skb(HOMA_SKB_EXTRA + sizeof(struct homa_skb_info) + length,
+			GFP_ATOMIC);
+	if (likely(skb)) {
+		skb_reserve(skb, HOMA_SKB_EXTRA);
+		skb_reset_transport_header(skb);
+	}
+	return skb;
+}
+
+static inline void homa_skb_stash_pages(struct homa *homa, int length)
+{}
+
+#endif /* _HOMA_STUB_H */
-- 
2.43.0

These files implement Homa's mechanism for managing application-level
buffer space for incoming messages. This mechanism is needed to allow
Homa to copy data out to user space in parallel with receiving packets;
it was discussed in a talk at NetDev 0x17.

Signed-off-by: John Ousterhout
---
Changes for v16:
* Add homa_pool_avail_bytes() for new HOMAIOCINFO ioctl

Changes for v11:
* Rework the mechanism for waking up RPCs that stalled waiting for
  buffer pool space

Changes for v10:
* Fix minor syntactic issues such as reverse xmas tree

Changes for v9:
* Eliminate use of _Static_assert
* Use new homa_clock abstraction layer.
* Allow memory to be allocated without GFP_ATOMIC
* Various name improvements (e.g. use "alloc" instead of "new" for
  functions that allocate memory)
use "alloc" instead of "new" for functions that allocate memory) * Remove sync.txt, move its contents into comments (mostly in homa_impl.h) Changes for v8: * Refactor homa_pool APIs (move allocation/deallocation into homa_pool.c, move locking responsibility out) Changes for v7: * Use u64 and __u64 properly * Eliminate extraneous use of RCU * Refactor pool->cores to use percpu variable * Use smp_processor_id instead of raw_smp_processor_id --- net/homa/homa_pool.c | 507 +++++++++++++++++++++++++++++++++++++++++++ net/homa/homa_pool.h | 137 ++++++++++++ 2 files changed, 644 insertions(+) create mode 100644 net/homa/homa_pool.c create mode 100644 net/homa/homa_pool.h diff --git a/net/homa/homa_pool.c b/net/homa/homa_pool.c new file mode 100644 index 000000000000..556d898b22cf --- /dev/null +++ b/net/homa/homa_pool.c @@ -0,0 +1,507 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +#include "homa_impl.h" +#include "homa_pool.h" + +/* This file contains functions that manage user-space buffer pools. */ + +/* Pools must always have at least this many bpages (no particular + * reasoning behind this value). + */ +#define MIN_POOL_SIZE 2 + +/* Used when determining how many bpages to consider for allocation. */ +#define MIN_EXTRA 4 + +/** + * set_bpages_needed() - Set the bpages_needed field of @pool based + * on the length of the first RPC that's waiting for buffer space. + * The caller must own the lock for @pool->hsk. + * @pool: Pool to update. + */ +static void set_bpages_needed(struct homa_pool *pool) +{ + struct homa_rpc *rpc = list_first_entry(&pool->hsk->waiting_for_bufs, + struct homa_rpc, buf_links); + + pool->bpages_needed = (rpc->msgin.length + HOMA_BPAGE_SIZE - 1) >> + HOMA_BPAGE_SHIFT; +} + +/** + * homa_pool_alloc() - Allocate and initialize a new homa_pool (it will have + * no region associated with it until homa_pool_set_region is invoked). + * @hsk: Socket the pool will be associated with. + * Return: A pointer to the new pool or a negative errno. + */ +struct homa_pool *homa_pool_alloc(struct homa_sock *hsk) +{ + struct homa_pool *pool; + + pool = kzalloc(sizeof(*pool), GFP_KERNEL); + if (!pool) + return ERR_PTR(-ENOMEM); + pool->hsk = hsk; + return pool; +} + +/** + * homa_pool_set_region() - Associate a region of memory with a pool. + * @hsk: Socket whose pool the region will be associated with. + * Must not be locked, and the pool must not currently + * have a region associated with it. + * @region: First byte of the memory region for the pool, allocated + * by the application; must be page-aligned. + * @region_size: Total number of bytes available at @buf_region. + * Return: Either zero (for success) or a negative errno for failure. + */ +int homa_pool_set_region(struct homa_sock *hsk, void __user *region, + u64 region_size) +{ + struct homa_pool_core __percpu *cores; + struct homa_bpage *descriptors; + int i, result, num_bpages; + struct homa_pool *pool; + + if (((uintptr_t)region) & ~PAGE_MASK) + return -EINVAL; + + /* Allocate memory before locking the socket, so we can allocate + * without GFP_ATOMIC. 
+	 */
+	num_bpages = region_size >> HOMA_BPAGE_SHIFT;
+	if (num_bpages < MIN_POOL_SIZE)
+		return -EINVAL;
+	descriptors = kmalloc_array(num_bpages, sizeof(struct homa_bpage),
+				    GFP_KERNEL | __GFP_ZERO);
+	if (!descriptors)
+		return -ENOMEM;
+	cores = alloc_percpu_gfp(struct homa_pool_core, __GFP_ZERO);
+	if (!cores) {
+		result = -ENOMEM;
+		goto error;
+	}
+
+	homa_sock_lock(hsk);
+	pool = hsk->buffer_pool;
+	if (pool->region) {
+		result = -EINVAL;
+		homa_sock_unlock(hsk);
+		goto error;
+	}
+
+	pool->region = (char __user *)region;
+	pool->num_bpages = num_bpages;
+	pool->descriptors = descriptors;
+	atomic_set(&pool->free_bpages, pool->num_bpages);
+	pool->bpages_needed = INT_MAX;
+	pool->cores = cores;
+	pool->check_waiting_invoked = 0;
+
+	for (i = 0; i < pool->num_bpages; i++) {
+		struct homa_bpage *bp = &pool->descriptors[i];
+
+		spin_lock_init(&bp->lock);
+		bp->owner = -1;
+	}
+
+	homa_sock_unlock(hsk);
+	return 0;
+
+error:
+	kfree(descriptors);
+	free_percpu(cores);
+	return result;
+}
+
+/**
+ * homa_pool_free() - Destructor for homa_pool. After this method
+ * returns, the object should not be used (it will be freed here).
+ * @pool: Pool to destroy.
+ */
+void homa_pool_free(struct homa_pool *pool)
+{
+	if (pool->region) {
+		kfree(pool->descriptors);
+		free_percpu(pool->cores);
+		pool->region = NULL;
+	}
+	kfree(pool);
+}
+
+/**
+ * homa_pool_get_rcvbuf() - Return information needed to handle getsockopt
+ * for SO_HOMA_RCVBUF.
+ * @pool: Pool for which information is needed.
+ * @args: Store info here.
+ */
+void homa_pool_get_rcvbuf(struct homa_pool *pool,
+			  struct homa_rcvbuf_args *args)
+{
+	args->start = (uintptr_t)pool->region;
+	args->length = pool->num_bpages << HOMA_BPAGE_SHIFT;
+}
+
+/**
+ * homa_bpage_available() - Check whether a bpage is available for use.
+ * @bpage: Bpage to check
+ * @now: Current time (homa_clock() units)
+ * Return: True if the bpage is free or if it can be stolen, otherwise
+ * false.
+ */
+bool homa_bpage_available(struct homa_bpage *bpage, u64 now)
+{
+	int ref_count = atomic_read(&bpage->refs);
+
+	return ref_count == 0 || (ref_count == 1 && bpage->owner >= 0 &&
+				  bpage->expiration <= now);
+}
+
+/**
+ * homa_pool_get_pages() - Allocate one or more full pages from the pool.
+ * @pool:      Pool from which to allocate pages
+ * @num_pages: Number of pages needed
+ * @pages:     The indices of the allocated pages are stored here; caller
+ *             must ensure this array is big enough. Reference counts have
+ *             been set to 1 on all of these pages (or 2 if set_owner
+ *             was specified).
+ * @set_owner: If nonzero, the current core is marked as owner of all
+ *             of the allocated pages (and the expiration time is also
+ *             set). Otherwise the pages are left unowned.
+ * Return: 0 for success, -1 if there wasn't enough free space in the pool.
+ */
+int homa_pool_get_pages(struct homa_pool *pool, int num_pages, u32 *pages,
+			int set_owner)
+{
+	int core_num = smp_processor_id();
+	struct homa_pool_core *core;
+	u64 now = homa_clock();
+	int alloced = 0;
+	int limit = 0;
+
+	core = this_cpu_ptr(pool->cores);
+	if (atomic_sub_return(num_pages, &pool->free_bpages) < 0) {
+		atomic_add(num_pages, &pool->free_bpages);
+		return -1;
+	}
+
+	/* Once we get to this point we know we will be able to find
+	 * enough free pages; now we just have to find them.
+	 */
+	while (alloced != num_pages) {
+		struct homa_bpage *bpage;
+		int cur;
+
+		/* If we don't need to use all of the bpages in the pool,
+		 * then try to use only the ones with low indexes.
+		 * This will reduce the cache footprint for the pool by
+		 * reusing a few bpages over and over. Specifically this code
+		 * will not consider any candidate page whose index is >=
+		 * limit. Limit is chosen to make sure there are a reasonable
+		 * number of free pages in the range, so we won't have to
+		 * check a huge number of pages.
+		 */
+		if (limit == 0) {
+			int extra;
+
+			limit = pool->num_bpages -
+				atomic_read(&pool->free_bpages);
+			extra = limit >> 2;
+			limit += (extra < MIN_EXTRA) ? MIN_EXTRA : extra;
+			if (limit > pool->num_bpages)
+				limit = pool->num_bpages;
+		}
+
+		cur = core->next_candidate;
+		core->next_candidate++;
+		if (cur >= limit) {
+			core->next_candidate = 0;
+
+			/* Must recompute the limit for each new loop through
+			 * the bpage array: we may need to consider a larger
+			 * range of pages because of concurrent allocations.
+			 */
+			limit = 0;
+			continue;
+		}
+		bpage = &pool->descriptors[cur];
+
+		/* Figure out whether this candidate is free (or can be
+		 * stolen). Do a quick check without locking the page, and
+		 * if the page looks promising, then lock it and check again
+		 * (must check again in case someone else snuck in and
+		 * grabbed the page).
+		 */
+		if (!homa_bpage_available(bpage, now))
+			continue;
+		if (!spin_trylock_bh(&bpage->lock))
+			/* Rather than wait for a locked page to become free,
+			 * just go on to the next page. If the page is locked,
+			 * it probably won't turn out to be available anyway.
+			 */
+			continue;
+		if (!homa_bpage_available(bpage, now)) {
+			spin_unlock_bh(&bpage->lock);
+			continue;
+		}
+		if (bpage->owner >= 0)
+			atomic_inc(&pool->free_bpages);
+		if (set_owner) {
+			atomic_set(&bpage->refs, 2);
+			bpage->owner = core_num;
+			bpage->expiration = now +
+					pool->hsk->homa->bpage_lease_cycles;
+		} else {
+			atomic_set(&bpage->refs, 1);
+			bpage->owner = -1;
+		}
+		spin_unlock_bh(&bpage->lock);
+		pages[alloced] = cur;
+		alloced++;
+	}
+	return 0;
+}
+
+/**
+ * homa_pool_alloc_msg() - Allocate buffer space for an incoming message.
+ * @rpc: RPC that needs space allocated for its incoming message (space must
+ *       not already have been allocated). The fields @msgin->num_bpages
+ *       and @msgin->bpage_offsets are filled in. Must be locked by caller.
+ * Return: The return value is normally 0, which means either buffer space
+ * was allocated or the @rpc was queued on @hsk->waiting_for_bufs. If a
+ * fatal error occurred, such as no buffer pool present, then a negative
+ * errno is returned.
+ */
+int homa_pool_alloc_msg(struct homa_rpc *rpc)
+	__must_hold(rpc->bucket->lock)
+{
+	struct homa_pool *pool = rpc->hsk->buffer_pool;
+	int full_pages, partial, i, core_id;
+	struct homa_pool_core *core;
+	u32 pages[HOMA_MAX_BPAGES];
+	struct homa_bpage *bpage;
+	struct homa_rpc *other;
+
+	if (!pool->region)
+		return -ENOMEM;
+
+	/* First allocate any full bpages that are needed. */
+	full_pages = rpc->msgin.length >> HOMA_BPAGE_SHIFT;
+	if (unlikely(full_pages)) {
+		if (homa_pool_get_pages(pool, full_pages, pages, 0) != 0)
+			goto out_of_space;
+		for (i = 0; i < full_pages; i++)
+			rpc->msgin.bpage_offsets[i] = pages[i] <<
+					HOMA_BPAGE_SHIFT;
+	}
+	rpc->msgin.num_bpages = full_pages;
+
+	/* The last chunk may be less than a full bpage; for this we use
+	 * the bpage that we own (and reuse it for multiple messages).
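+	 * For example, with 64 KB bpages a 200000-byte message uses three
+	 * full bpages (196608 bytes) plus a 3392-byte partial allocation
+	 * from the core's owned bpage.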
+ */ + partial = rpc->msgin.length & (HOMA_BPAGE_SIZE - 1); + if (unlikely(partial == 0)) + goto success; + core_id = smp_processor_id(); + core = this_cpu_ptr(pool->cores); + bpage = &pool->descriptors[core->page_hint]; + spin_lock_bh(&bpage->lock); + if (bpage->owner != core_id) { + spin_unlock_bh(&bpage->lock); + goto new_page; + } + if ((core->allocated + partial) > HOMA_BPAGE_SIZE) { + if (atomic_read(&bpage->refs) == 1) { + /* Bpage is totally free, so we can reuse it. */ + core->allocated = 0; + } else { + bpage->owner = -1; + + /* We know the reference count can't reach zero here + * because of check above, so we won't have to decrement + * pool->free_bpages. + */ + atomic_dec_return(&bpage->refs); + spin_unlock_bh(&bpage->lock); + goto new_page; + } + } + bpage->expiration = homa_clock() + + pool->hsk->homa->bpage_lease_cycles; + atomic_inc(&bpage->refs); + spin_unlock_bh(&bpage->lock); + goto allocate_partial; + + /* Can't use the current page; get another one. */ +new_page: + if (homa_pool_get_pages(pool, 1, pages, 1) != 0) { + homa_pool_release_buffers(pool, rpc->msgin.num_bpages, + rpc->msgin.bpage_offsets); + rpc->msgin.num_bpages = 0; + goto out_of_space; + } + core->page_hint = pages[0]; + core->allocated = 0; + +allocate_partial: + rpc->msgin.bpage_offsets[rpc->msgin.num_bpages] = core->allocated + + (core->page_hint << HOMA_BPAGE_SHIFT); + rpc->msgin.num_bpages++; + core->allocated += partial; + +success: + return 0; + + /* We get here if there wasn't enough buffer space for this + * message; add the RPC to hsk->waiting_for_bufs. The list is sorted + * by RPC length in order to implement SRPT. + */ +out_of_space: + homa_sock_lock(pool->hsk); + list_for_each_entry(other, &pool->hsk->waiting_for_bufs, buf_links) { + if (other->msgin.length > rpc->msgin.length) { + list_add_tail(&rpc->buf_links, &other->buf_links); + goto queued; + } + } + list_add_tail(&rpc->buf_links, &pool->hsk->waiting_for_bufs); + +queued: + set_bpages_needed(pool); + homa_sock_unlock(pool->hsk); + return 0; +} + +/** + * homa_pool_get_buffer() - Given an RPC, figure out where to store incoming + * message data. + * @rpc: RPC for which incoming message data is being processed; its + * msgin must be properly initialized and buffer space must have + * been allocated for the message. + * @offset: Offset within @rpc's incoming message. + * @available: Will be filled in with the number of bytes of space available + * at the returned address (could be zero if offset is + * (erroneously) past the end of the message). + * Return: The application's virtual address for buffer space corresponding + * to @offset in the incoming message for @rpc. + */ +void __user *homa_pool_get_buffer(struct homa_rpc *rpc, int offset, + int *available) +{ + int bpage_index, bpage_offset; + + bpage_index = offset >> HOMA_BPAGE_SHIFT; + if (offset >= rpc->msgin.length) { + WARN_ONCE(true, "%s got offset %d >= message length %d\n", + __func__, offset, rpc->msgin.length); + *available = 0; + return NULL; + } + bpage_offset = offset & (HOMA_BPAGE_SIZE - 1); + *available = (bpage_index < (rpc->msgin.num_bpages - 1)) + ? HOMA_BPAGE_SIZE - bpage_offset + : rpc->msgin.length - offset; + return rpc->hsk->buffer_pool->region + + rpc->msgin.bpage_offsets[bpage_index] + bpage_offset; +} + +/** + * homa_pool_release_buffers() - Release buffer space so that it can be + * reused. + * @pool: Pool that the buffer space belongs to. Doesn't need to + * be locked. + * @num_buffers: How many buffers to release. 
+ * @buffers: Points to @num_buffers values, each of which is an offset + * from the start of the pool to the buffer to be released. + * Return: 0 for success, otherwise a negative errno. + */ +int homa_pool_release_buffers(struct homa_pool *pool, int num_buffers, + u32 *buffers) +{ + int result = 0; + int i; + + if (!pool->region) + return result; + for (i = 0; i < num_buffers; i++) { + u32 bpage_index = buffers[i] >> HOMA_BPAGE_SHIFT; + struct homa_bpage *bpage = &pool->descriptors[bpage_index]; + + if (bpage_index < pool->num_bpages) { + if (atomic_dec_return(&bpage->refs) == 0) + atomic_inc(&pool->free_bpages); + } else { + result = -EINVAL; + } + } + return result; +} + +/** + * homa_pool_check_waiting() - Checks to see if there are enough free + * bpages to wake up any RPCs that were blocked. Whenever + * homa_pool_release_buffers is invoked, this function must be invoked later, + * at a point when the caller holds no locks (homa_pool_release_buffers may + * be invoked with locks held, so it can't safely invoke this function). + * This is regrettably tricky, but I can't think of a better solution. + * @pool: Information about the buffer pool. + */ +void homa_pool_check_waiting(struct homa_pool *pool) +{ + if (!pool->region) + return; + while (atomic_read(&pool->free_bpages) >= pool->bpages_needed) { + struct homa_rpc *rpc; + + homa_sock_lock(pool->hsk); + if (list_empty(&pool->hsk->waiting_for_bufs)) { + pool->bpages_needed = INT_MAX; + homa_sock_unlock(pool->hsk); + break; + } + rpc = list_first_entry(&pool->hsk->waiting_for_bufs, + struct homa_rpc, buf_links); + if (!homa_rpc_try_lock(rpc)) { + /* Can't just spin on the RPC lock because we're + * holding the socket lock and the lock order is + * rpc-then-socket (see "Homa Locking Strategy" in + * homa_impl.h). Instead, release the socket lock + * and try the entire operation again. + */ + homa_sock_unlock(pool->hsk); + continue; + } + list_del_init(&rpc->buf_links); + if (list_empty(&pool->hsk->waiting_for_bufs)) + pool->bpages_needed = INT_MAX; + else + set_bpages_needed(pool); + homa_sock_unlock(pool->hsk); + homa_pool_alloc_msg(rpc); + homa_rpc_unlock(rpc); + } +} + +/** + * homa_pool_avail_bytes() - Return a count of the number of bytes currently + * unused and available for allocation in a pool. + * @pool: Pool of interest. + * Return: See above. + */ +u64 homa_pool_avail_bytes(struct homa_pool *pool) +{ + struct homa_pool_core *core; + u64 avail; + int cpu; + + if (!pool->region) + return 0; + avail = atomic_read(&pool->free_bpages); + avail *= HOMA_BPAGE_SIZE; + for (cpu = 0; cpu < nr_cpu_ids; cpu++) { + core = per_cpu_ptr(pool->cores, cpu); + if (pool->descriptors[core->page_hint].owner == cpu) + avail += HOMA_BPAGE_SIZE - core->allocated; + } + return avail; +} diff --git a/net/homa/homa_pool.h b/net/homa/homa_pool.h new file mode 100644 index 000000000000..1f5455665c5d --- /dev/null +++ b/net/homa/homa_pool.h @@ -0,0 +1,137 @@ +/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */ + +/* This file contains definitions used to manage user-space buffer pools. + */ + +#ifndef _HOMA_POOL_H +#define _HOMA_POOL_H + +#include + +#include "homa_rpc.h" + +/** + * struct homa_bpage - Contains information about a single page in + * a buffer pool. + */ +struct homa_bpage { + /** @lock: to synchronize shared access. */ + spinlock_t lock; + + /** + * @refs: Counts number of distinct uses of this + * bpage (1 tick for each message that is using + * this page, plus an additional tick if the @owner + * field is set). 
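+	 * For example, a bpage that is owned by a core and in use by
+	 * one in-flight message has @refs == 2; this matches the
+	 * atomic_set(&bpage->refs, 2) that homa_pool_get_pages performs
+	 * when it hands out an owned page.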
+	 */
+	atomic_t refs;
+
+	/**
+	 * @owner: kernel core that currently owns this page
+	 * (< 0 if none).
+	 */
+	int owner;
+
+	/**
+	 * @expiration: homa_clock() time after which it's OK to steal this
+	 * page from its current owner (if @refs is 1).
+	 */
+	u64 expiration;
+} ____cacheline_aligned_in_smp;
+
+/**
+ * struct homa_pool_core - Holds core-specific data for a homa_pool (a bpage
+ * out of which that core is allocating small chunks).
+ */
+struct homa_pool_core {
+	/**
+	 * @page_hint: Index of bpage in pool->descriptors,
+	 * which may be owned by this core. If so, we'll use it
+	 * for allocating partial pages.
+	 */
+	int page_hint;
+
+	/**
+	 * @allocated: if the page given by @page_hint is
+	 * owned by this core, this variable gives the number of
+	 * (initial) bytes that have already been allocated
+	 * from the page.
+	 */
+	int allocated;
+
+	/**
+	 * @next_candidate: when searching for free bpages,
+	 * check this index next.
+	 */
+	int next_candidate;
+};
+
+/**
+ * struct homa_pool - Describes a pool of buffer space for incoming
+ * messages for a particular socket; managed by homa_pool.c. The pool is
+ * divided up into "bpages", which are a multiple of the hardware page size.
+ * A bpage may be owned by a particular core so that it can more efficiently
+ * allocate space for small messages.
+ */
+struct homa_pool {
+	/**
+	 * @hsk: the socket that this pool belongs to.
+	 */
+	struct homa_sock *hsk;
+
+	/**
+	 * @region: beginning of the pool's region (in the app's virtual
+	 * memory). Divided into bpages. NULL means the pool hasn't yet
+	 * been initialized.
+	 */
+	char __user *region;
+
+	/** @num_bpages: total number of bpages in the pool. */
+	int num_bpages;
+
+	/** @descriptors: kmalloced area containing one entry for each bpage. */
+	struct homa_bpage *descriptors;
+
+	/**
+	 * @free_bpages: the number of pages still available for allocation
+	 * by homa_pool_get_pages. This equals the number of pages with zero
+	 * reference counts, minus the number of pages that have been claimed
+	 * by homa_pool_get_pages but not yet allocated.
+	 */
+	atomic_t free_bpages;
+
+	/**
+	 * @bpages_needed: the number of free bpages required to satisfy the
+	 * needs of the first RPC on @hsk->waiting_for_bufs, or INT_MAX if
+	 * that queue is empty.
+	 */
+	int bpages_needed;
+
+	/** @cores: core-specific info; dynamically allocated. */
+	struct homa_pool_core __percpu *cores;
+
+	/**
+	 * @check_waiting_invoked: incremented during unit tests when
+	 * homa_pool_check_waiting is invoked.
+	 */
+	int check_waiting_invoked;
+};
+
+bool homa_bpage_available(struct homa_bpage *bpage, u64 now);
+struct homa_pool *homa_pool_alloc(struct homa_sock *hsk);
+int homa_pool_alloc_msg(struct homa_rpc *rpc);
+u64 homa_pool_avail_bytes(struct homa_pool *pool);
+void homa_pool_check_waiting(struct homa_pool *pool);
+void homa_pool_free(struct homa_pool *pool);
+void __user *homa_pool_get_buffer(struct homa_rpc *rpc, int offset,
+				  int *available);
+int homa_pool_get_pages(struct homa_pool *pool, int num_pages,
+			u32 *pages, int leave_locked);
+void homa_pool_get_rcvbuf(struct homa_pool *pool,
+			  struct homa_rcvbuf_args *args);
+int homa_pool_release_buffers(struct homa_pool *pool,
+			      int num_buffers, u32 *buffers);
+int homa_pool_set_region(struct homa_sock *hsk, void __user *region,
+			 u64 region_size);
+
+#endif /* _HOMA_POOL_H */
-- 
2.43.0

Homa needs to keep a small amount of information for each peer that it
has communicated with. These files define that state and provide
functions for storing and accessing it.
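For orientation, a typical lookup/release sequence (an illustrative
sketch only; hsk and addr stand in for a caller's socket and IPv6
address, and homa_peer.c contains the real call sites) looks like:

	struct homa_peer *peer;

	peer = homa_peer_get(hsk, &addr);	/* takes a reference */
	if (IS_ERR(peer))
		return PTR_ERR(peer);
	/* ... use the peer, e.g. homa_get_dst(peer, hsk) ... */
	homa_peer_release(peer);		/* drops the reference */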
Signed-off-by: John Ousterhout --- Changes for v16: * Clean up and simplify reference counting mechanism (use refcount_t instead of atomic_t, eliminate dead_peers mechanism) * Fix synchronization bugs in homa_dst_refresh (use RCU properly) * Remove addr field of struct homa_peer * Create separate header file for murmurhash hash function Changes for v11: * Clean up sparse annotations Changes for v10: * Use kzalloc instead of __GFP_ZERO * Remove log messages after alloc errors * Fix issues found by sparse, xmastree.py, etc. * Add missing initialization for peertab->lock Changes for v9: * Add support for homa_net objects * Implement limits on the number of active homa_peer objects. This includes adding reference counts in homa_peers and adding code to release peers where there are too many. * Switch to using rhashtable to store homa_peers; the table is shared across all network namespaces, though individual peers are namespace- specific * Invoke dst->ops->check in addition to checking the obsolete flag * Various name improvements * Remove the homa_peertab_gc_dsts mechanism, which is unnecessary Changes for v7: * Remove homa_peertab_get_peers * Remove "lock_slow" functions, which don't add functionality in this patch * Remove unused fields from homa_peer structs * Use u64 and __u64 properly * Add lock annotations * Refactor homa_peertab_get_peers * Use __GFP_ZERO in kmalloc calls --- net/homa/homa_peer.c | 571 +++++++++++++++++++++++++++++++++++++++++ net/homa/homa_peer.h | 312 ++++++++++++++++++++++ net/homa/murmurhash3.h | 44 ++++ 3 files changed, 927 insertions(+) create mode 100644 net/homa/homa_peer.c create mode 100644 net/homa/homa_peer.h create mode 100644 net/homa/murmurhash3.h diff --git a/net/homa/homa_peer.c b/net/homa/homa_peer.c new file mode 100644 index 000000000000..5b47284ef6e4 --- /dev/null +++ b/net/homa/homa_peer.c @@ -0,0 +1,571 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file provides functions related to homa_peer and homa_peertab + * objects. + */ + +#include "homa_impl.h" +#include "homa_peer.h" +#include "homa_rpc.h" +#include "murmurhash3.h" + +static const struct rhashtable_params ht_params = { + .key_len = sizeof(struct homa_peer_key), + .key_offset = offsetof(struct homa_peer, ht_key), + .head_offset = offsetof(struct homa_peer, ht_linkage), + .nelem_hint = 10000, + .hashfn = murmurhash3, + .obj_cmpfn = homa_peer_compare +}; + +/** + * homa_peer_alloc_peertab() - Allocate and initialize a homa_peertab. + * + * Return: A pointer to the new homa_peertab, or ERR_PTR(-errno) if there + * was a problem. + */ +struct homa_peertab *homa_peer_alloc_peertab(void) +{ + struct homa_peertab *peertab; + int err; + + peertab = kzalloc(sizeof(*peertab), GFP_KERNEL); + if (!peertab) + return ERR_PTR(-ENOMEM); + + spin_lock_init(&peertab->lock); + err = rhashtable_init(&peertab->ht, &ht_params); + if (err) { + kfree(peertab); + return ERR_PTR(err); + } + peertab->ht_valid = true; + rhashtable_walk_enter(&peertab->ht, &peertab->ht_iter); + peertab->gc_threshold = 5000; + peertab->net_max = 10000; + peertab->idle_secs_min = 10; + peertab->idle_secs_max = 120; + + homa_peer_update_sysctl_deps(peertab); + return peertab; +} + +/** + * homa_peer_free_net() - Garbage collect all of the peer information + * associated with a particular network namespace. + * @hnet: Network namespace whose peers should be freed. There must not + * be any active sockets or RPCs for this namespace. 
+ */ +void homa_peer_free_net(struct homa_net *hnet) +{ + struct homa_peertab *peertab = hnet->homa->peertab; + struct rhashtable_iter iter; + struct homa_peer *peer; + + spin_lock_bh(&peertab->lock); + peertab->gc_stop_count++; + spin_unlock_bh(&peertab->lock); + + rhashtable_walk_enter(&peertab->ht, &iter); + rhashtable_walk_start(&iter); + while (1) { + peer = rhashtable_walk_next(&iter); + if (!peer) + break; + if (IS_ERR(peer)) + continue; + if (peer->ht_key.hnet != hnet) + continue; + if (rhashtable_remove_fast(&peertab->ht, &peer->ht_linkage, + ht_params) == 0) { + homa_peer_release(peer); + hnet->num_peers--; + peertab->num_peers--; + } + } + rhashtable_walk_stop(&iter); + rhashtable_walk_exit(&iter); + WARN(hnet->num_peers != 0, "%s ended up with hnet->num_peers %d", + __func__, hnet->num_peers); + + spin_lock_bh(&peertab->lock); + peertab->gc_stop_count--; + spin_unlock_bh(&peertab->lock); +} + +/** + * homa_peer_release_fn() - This function is invoked for each entry in + * the peer hash table by the rhashtable code when the table is being + * deleted. It frees its argument. + * @object: homa_peer to free. + * @dummy: Not used. + */ +void homa_peer_release_fn(void *object, void *dummy) +{ + struct homa_peer *peer = object; + + homa_peer_release(peer); +} + +/** + * homa_peer_free_peertab() - Destructor for homa_peertabs. + * @peertab: The table to destroy. + */ +void homa_peer_free_peertab(struct homa_peertab *peertab) +{ + spin_lock_bh(&peertab->lock); + peertab->gc_stop_count++; + spin_unlock_bh(&peertab->lock); + + if (peertab->ht_valid) { + rhashtable_walk_exit(&peertab->ht_iter); + rhashtable_free_and_destroy(&peertab->ht, homa_peer_release_fn, + NULL); + } + kfree(peertab); +} + +/** + * homa_peer_prefer_evict() - Given two peers, determine which one is + * a better candidate for eviction. + * @peertab: Overall information used to manage peers. + * @peer1: First peer. + * @peer2: Second peer. + * Return: True if @peer1 is a better candidate for eviction than @peer2. + */ +int homa_peer_prefer_evict(struct homa_peertab *peertab, + struct homa_peer *peer1, + struct homa_peer *peer2) +{ + /* Prefer a peer whose homa-net is over its limit; if both are either + * over or under, then prefer the peer with the shortest idle time. + */ + if (peer1->ht_key.hnet->num_peers > peertab->net_max) { + if (peer2->ht_key.hnet->num_peers <= peertab->net_max) + return true; + else + return peer1->access_jiffies < peer2->access_jiffies; + } + if (peer2->ht_key.hnet->num_peers > peertab->net_max) + return false; + else + return peer1->access_jiffies < peer2->access_jiffies; +} + +/** + * homa_peer_pick_victims() - Select a few peers that can be freed. + * @peertab: Choose peers that are stored here. + * @victims: Return addresses of victims here. + * @max_victims: Limit on how many victims to choose (and size of @victims + * array). + * Return: The number of peers stored in @victims; may be zero. + */ +int homa_peer_pick_victims(struct homa_peertab *peertab, + struct homa_peer *victims[], int max_victims) +{ + struct homa_peer *peer; + int num_victims = 0; + int to_scan; + int i, idle; + + /* Scan 2 peers for every potential victim and keep the "best" + * peers for removal. + */ + rhashtable_walk_start(&peertab->ht_iter); + for (to_scan = 2 * max_victims; to_scan > 0; to_scan--) { + peer = rhashtable_walk_next(&peertab->ht_iter); + if (!peer) { + /* Reached the end of the table; restart at + * the beginning. 
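+			 * To restart the scan, the iterator must be
+			 * torn down (walk_stop/walk_exit) and then
+			 * re-created (walk_enter/walk_start), which is
+			 * what happens below.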
+ */ + rhashtable_walk_stop(&peertab->ht_iter); + rhashtable_walk_exit(&peertab->ht_iter); + rhashtable_walk_enter(&peertab->ht, &peertab->ht_iter); + rhashtable_walk_start(&peertab->ht_iter); + peer = rhashtable_walk_next(&peertab->ht_iter); + if (!peer) + break; + } + if (IS_ERR(peer)) { + /* rhashtable decided to restart the search at the + * beginning. + */ + peer = rhashtable_walk_next(&peertab->ht_iter); + if (!peer || IS_ERR(peer)) + break; + } + + /* Has this peer been idle long enough to be candidate for + * eviction? + */ + idle = jiffies - peer->access_jiffies; + if (idle < peertab->idle_jiffies_min) + continue; + if (idle < peertab->idle_jiffies_max && + peer->ht_key.hnet->num_peers <= peertab->net_max) + continue; + + /* Sort the candidate into the existing list of victims. */ + for (i = 0; i < num_victims; i++) { + if (peer == victims[i]) { + /* This can happen if there aren't very many + * peers and we wrapped around in the hash + * table. + */ + peer = NULL; + break; + } + if (homa_peer_prefer_evict(peertab, peer, victims[i])) { + struct homa_peer *tmp; + + tmp = victims[i]; + victims[i] = peer; + peer = tmp; + } + } + + if (num_victims < max_victims && peer) { + victims[num_victims] = peer; + num_victims++; + } + } + rhashtable_walk_stop(&peertab->ht_iter); + return num_victims; +} + +/** + * homa_peer_gc() - This function is invoked by Homa at regular intervals; + * its job is to ensure that the number of peers stays within limits. + * If the number grows too large, it selectively deletes peers to get + * back under the limit. + * @peertab: Structure whose peers should be considered for garbage + * collection. + */ +void homa_peer_gc(struct homa_peertab *peertab) +{ +#define EVICT_BATCH_SIZE 5 + struct homa_peer *victims[EVICT_BATCH_SIZE]; + int num_victims; + int i; + + spin_lock_bh(&peertab->lock); + if (peertab->gc_stop_count != 0) + goto done; + if (peertab->num_peers < peertab->gc_threshold) + goto done; + num_victims = homa_peer_pick_victims(peertab, victims, + EVICT_BATCH_SIZE); + if (num_victims == 0) + goto done; + + for (i = 0; i < num_victims; i++) { + struct homa_peer *peer = victims[i]; + + if (rhashtable_remove_fast(&peertab->ht, &peer->ht_linkage, + ht_params) == 0) { + homa_peer_release(peer); + peertab->num_peers--; + peer->ht_key.hnet->num_peers--; + } + } +done: + spin_unlock_bh(&peertab->lock); +} + +/** + * homa_peer_alloc() - Allocate and initialize a new homa_peer object. + * @hsk: Socket for which the peer will be used. + * @addr: Address of the desired host: IPv4 addresses are represented + * as IPv4-mapped IPv6 addresses. + * Return: The peer associated with @addr, or a negative errno if an + * error occurred. On a successful return the reference count + * will be incremented for the returned peer. Sets hsk->error_msg + * on errors. 
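+ *
+ * Errors are returned ERR_PTR-encoded, so callers check the result
+ * along these lines (homa_peer_get below is the real call site):
+ *
+ *	peer = homa_peer_alloc(hsk, addr);
+ *	if (IS_ERR(peer))
+ *		return PTR_ERR(peer);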
+ */ +struct homa_peer *homa_peer_alloc(struct homa_sock *hsk, + const struct in6_addr *addr) +{ + struct homa_peer *peer; + int status; + + peer = kzalloc(sizeof(*peer), GFP_ATOMIC); + if (!peer) { + hsk->error_msg = "couldn't allocate memory for homa_peer"; + return (struct homa_peer *)ERR_PTR(-ENOMEM); + } + peer->ht_key.addr = *addr; + peer->ht_key.hnet = hsk->hnet; + refcount_set(&peer->refs, 1); + peer->access_jiffies = jiffies; + spin_lock_init(&peer->lock); + peer->current_ticks = -1; + + status = homa_peer_reset_dst(peer, hsk); + if (status != 0) { + hsk->error_msg = "couldn't find route for peer"; + kfree(peer); + return ERR_PTR(status); + } + return peer; +} + +/** + * homa_peer_free() - Release any resources in a peer and free the homa_peer + * struct. Invoked by the RCU mechanism via homa_peer_release. + * @head: Pointer to the rcu_head field of the peer to free. + */ +void homa_peer_free(struct rcu_head *head) +{ + struct homa_peer *peer; + + peer = container_of(head, struct homa_peer, rcu_head); + dst_release(rcu_dereference(peer->dst)); + kfree(peer); +} + +/** + * homa_peer_get() - Returns the peer associated with a given host; creates + * a new homa_peer if one doesn't already exist. + * @hsk: Socket where the peer will be used. + * @addr: Address of the desired host: IPv4 addresses are represented + * as IPv4-mapped IPv6 addresses. + * + * Return: The peer associated with @addr, or a negative errno if an + * error occurred. On a successful return the reference count + * will be incremented for the returned peer. The caller must + * eventually call homa_peer_release to release the reference. + */ +struct homa_peer *homa_peer_get(struct homa_sock *hsk, + const struct in6_addr *addr) +{ + struct homa_peertab *peertab = hsk->homa->peertab; + struct homa_peer *peer, *other; + struct homa_peer_key key; + + key.addr = *addr; + key.hnet = hsk->hnet; + rcu_read_lock(); + peer = rhashtable_lookup(&peertab->ht, &key, ht_params); + if (peer) { + homa_peer_hold(peer); + peer->access_jiffies = jiffies; + rcu_read_unlock(); + return peer; + } + + /* No existing entry, so we have to create a new one. */ + peer = homa_peer_alloc(hsk, addr); + if (IS_ERR(peer)) { + rcu_read_unlock(); + return peer; + } + spin_lock_bh(&peertab->lock); + other = rhashtable_lookup_get_insert_fast(&peertab->ht, + &peer->ht_linkage, ht_params); + if (IS_ERR(other)) { + /* Couldn't insert; return the error info. */ + homa_peer_release(peer); + peer = other; + } else if (other) { + /* Someone else already created the desired peer; use that + * one instead of ours. + */ + homa_peer_release(peer); + homa_peer_hold(other); + peer = other; + peer->access_jiffies = jiffies; + } else { + homa_peer_hold(peer); + peertab->num_peers++; + key.hnet->num_peers++; + } + spin_unlock_bh(&peertab->lock); + rcu_read_unlock(); + return peer; +} + +/** + * homa_get_dst() - Returns destination information associated with a peer, + * updating it if the cached information is stale. + * @peer: Peer whose destination information is desired. + * @hsk: Homa socket with which the dst will be used; needed by lower-level + * code to recreate the dst. + * Return: Up-to-date destination for peer; a reference has been taken + * on this dst_entry, which the caller must eventually release. 
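+ *
+ * Hypothetical caller sketch (not taken from this patch): the dst is
+ * typically attached to an outgoing skb, which consumes the reference:
+ *
+ *	dst = homa_get_dst(peer, hsk);
+ *	skb_dst_set(skb, dst);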
+ */ +struct dst_entry *homa_get_dst(struct homa_peer *peer, struct homa_sock *hsk) +{ + struct dst_entry *dst; + int pass; + + rcu_read_lock(); + for (pass = 0; ; pass++) { + do { + /* This loop repeats only if we happen to fetch + * the dst right when it is being reset. + */ + dst = rcu_dereference(peer->dst); + } while (!dst_hold_safe(dst)); + + /* After the first pass it's OK to return an obsolete dst + * (we're basically giving up; continuing could result in + * an infinite loop if homa_dst_refresh can't create a new dst). + */ + if (dst_check(dst, peer->dst_cookie) || pass > 0) + break; + dst_release(dst); + homa_peer_reset_dst(peer, hsk); + } + rcu_read_unlock(); + return dst; +} + +/** + * homa_peer_reset_dst() - Find an appropriate dst_entry for a peer and + * store it in the peer's dst field. If the field is already set, the + * current value is assumed to be stale and will be discarded if a new + * dst_entry can be created. + * @peer: The peer whose dst field should be reset. + * @hsk: Socket that will be used for sending packets. + * Return: Zero for success, or a negative errno if there was an error + * (in which case the existing value for the dst field is left + * in place). + */ +int homa_peer_reset_dst(struct homa_peer *peer, struct homa_sock *hsk) +{ + struct dst_entry *dst; + int result = 0; + + homa_peer_lock(peer); + memset(&peer->flow, 0, sizeof(peer->flow)); + if (hsk->sock.sk_family == AF_INET) { + struct rtable *rt; + + flowi4_init_output(&peer->flow.u.ip4, hsk->sock.sk_bound_dev_if, + hsk->sock.sk_mark, hsk->inet.tos, + RT_SCOPE_UNIVERSE, hsk->sock.sk_protocol, 0, + ipv6_to_ipv4(peer->addr), + hsk->inet.inet_saddr, 0, 0, + hsk->sock.sk_uid); + security_sk_classify_flow(&hsk->sock, + &peer->flow.u.__fl_common); + rt = ip_route_output_flow(sock_net(&hsk->sock), + &peer->flow.u.ip4, &hsk->sock); + if (IS_ERR(rt)) { + result = PTR_ERR(rt); + goto done; + } + dst = &rt->dst; + peer->dst_cookie = 0; + } else { + peer->flow.u.ip6.flowi6_oif = hsk->sock.sk_bound_dev_if; + peer->flow.u.ip6.flowi6_iif = LOOPBACK_IFINDEX; + peer->flow.u.ip6.flowi6_mark = hsk->sock.sk_mark; + peer->flow.u.ip6.flowi6_scope = RT_SCOPE_UNIVERSE; + peer->flow.u.ip6.flowi6_proto = hsk->sock.sk_protocol; + peer->flow.u.ip6.flowi6_flags = 0; + peer->flow.u.ip6.flowi6_secid = 0; + peer->flow.u.ip6.flowi6_tun_key.tun_id = 0; + peer->flow.u.ip6.flowi6_uid = hsk->sock.sk_uid; + peer->flow.u.ip6.daddr = peer->addr; + peer->flow.u.ip6.saddr = hsk->inet.pinet6->saddr; + peer->flow.u.ip6.fl6_dport = 0; + peer->flow.u.ip6.fl6_sport = 0; + peer->flow.u.ip6.mp_hash = 0; + peer->flow.u.ip6.__fl_common.flowic_tos = hsk->inet.tos; + peer->flow.u.ip6.flowlabel = ip6_make_flowinfo(hsk->inet.tos, + 0); + security_sk_classify_flow(&hsk->sock, + &peer->flow.u.__fl_common); + dst = ip6_dst_lookup_flow(sock_net(&hsk->sock), &hsk->sock, + &peer->flow.u.ip6, NULL); + if (IS_ERR(dst)) { + result = PTR_ERR(dst); + goto done; + } + peer->dst_cookie = rt6_get_cookie(dst_rt6_info(dst)); + } + + /* From the standpoint of homa_get_dst, peer->dst is not updated + * atomically with peer->dst_cookie, which means homa_get_dst could + * use a new cookie with an old dest. Fortunately, this is benign; at + * worst, it might cause an obsolete dst to be reused (resulting in + * a lost packet) or a valid dst to be replaced (resulting in + * unnecessary work). 
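+	 *
+	 * rcu_replace_pointer() below publishes the new dst and returns
+	 * the old one, whose reference is then dropped; dst_release()
+	 * accepts NULL, which covers the very first assignment.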
+ */ + dst_release(rcu_replace_pointer(peer->dst, dst, true)); + +done: + homa_peer_unlock(peer); + return result; +} + +/** + * homa_peer_add_ack() - Add a given RPC to the list of unacked + * RPCs for its server. Once this method has been invoked, it's safe + * to delete the RPC, since it will eventually be acked to the server. + * @rpc: Client RPC that has now completed. + */ +void homa_peer_add_ack(struct homa_rpc *rpc) +{ + struct homa_peer *peer = rpc->peer; + struct homa_ack_hdr ack; + + homa_peer_lock(peer); + if (peer->num_acks < HOMA_MAX_ACKS_PER_PKT) { + peer->acks[peer->num_acks].client_id = cpu_to_be64(rpc->id); + peer->acks[peer->num_acks].server_port = htons(rpc->dport); + peer->num_acks++; + homa_peer_unlock(peer); + return; + } + + /* The peer has filled up; send an ACK message to empty it. The + * RPC in the message header will also be considered ACKed. + */ + memcpy(ack.acks, peer->acks, sizeof(peer->acks)); + ack.num_acks = htons(peer->num_acks); + peer->num_acks = 0; + homa_peer_unlock(peer); + homa_xmit_control(ACK, &ack, sizeof(ack), rpc); +} + +/** + * homa_peer_get_acks() - Copy acks out of a peer, and remove them from the + * peer. + * @peer: Peer to check for possible unacked RPCs. + * @count: Maximum number of acks to return. + * @dst: The acks are copied to this location. + * + * Return: The number of acks extracted from the peer (<= count). + */ +int homa_peer_get_acks(struct homa_peer *peer, int count, struct homa_ack *dst) +{ + /* Don't waste time acquiring the lock if there are no ids available. */ + if (peer->num_acks == 0) + return 0; + + homa_peer_lock(peer); + + if (count > peer->num_acks) + count = peer->num_acks; + memcpy(dst, &peer->acks[peer->num_acks - count], + count * sizeof(peer->acks[0])); + peer->num_acks -= count; + + homa_peer_unlock(peer); + return count; +} + +/** + * homa_peer_update_sysctl_deps() - Update any peertab fields that depend + * on values set by sysctl. This function is invoked anytime a peer sysctl + * value is updated. + * @peertab: Struct to update. + */ +void homa_peer_update_sysctl_deps(struct homa_peertab *peertab) +{ + peertab->idle_jiffies_min = peertab->idle_secs_min * HZ; + peertab->idle_jiffies_max = peertab->idle_secs_max * HZ; +} + diff --git a/net/homa/homa_peer.h b/net/homa/homa_peer.h new file mode 100644 index 000000000000..5e145be2ab1a --- /dev/null +++ b/net/homa/homa_peer.h @@ -0,0 +1,312 @@ +/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */ + +/* This file contains definitions related to managing peers (homa_peer + * and homa_peertab). + */ + +#ifndef _HOMA_PEER_H +#define _HOMA_PEER_H + +#include "homa_wire.h" +#include "homa_sock.h" + +#include + +struct homa_rpc; + +/** + * struct homa_peertab - Stores homa_peer objects, indexed by IPv6 + * address. + */ +struct homa_peertab { + /** + * @lock: Used to synchronize updates to @ht as well as other + * operations on this object. + */ + spinlock_t lock; + + /** @ht: Hash table that stores all struct peers. */ + struct rhashtable ht; + + /** @ht_iter: Used to scan ht to find peers to garbage collect. */ + struct rhashtable_iter ht_iter; + + /** @num_peers: Total number of peers currently in @ht. */ + int num_peers; + + /** + * @ht_valid: True means ht and ht_iter have been initialized and must + * eventually be destroyed. + */ + bool ht_valid; + + /** @rcu_head: Holds state of a pending call_rcu invocation. 
+	 */
+	struct rcu_head rcu_head;
+
+	/**
+	 * @gc_stop_count: Nonzero means that peer garbage collection
+	 * should not be performed (conflicting state changes are underway).
+	 */
+	int gc_stop_count;
+
+	/**
+	 * @gc_threshold: If @num_peers is less than this, don't bother
+	 * doing any peer garbage collection. Set externally via sysctl.
+	 */
+	int gc_threshold;
+
+	/**
+	 * @net_max: If the number of peers for a homa_net exceeds this number,
+	 * work aggressively to reclaim peers for that homa_net. Set
+	 * externally via sysctl.
+	 */
+	int net_max;
+
+	/**
+	 * @idle_secs_min: A peer will not be considered for garbage collection
+	 * under any circumstances if it has been idle less than this many
+	 * seconds. Set externally via sysctl.
+	 */
+	int idle_secs_min;
+
+	/**
+	 * @idle_jiffies_min: Same as idle_secs_min except in units
+	 * of jiffies.
+	 */
+	unsigned long idle_jiffies_min;
+
+	/**
+	 * @idle_secs_max: A peer that has been idle for less than
+	 * this many seconds will not be considered for garbage collection
+	 * unless its homa_net has more than @net_max peers. Set
+	 * externally via sysctl.
+	 */
+	int idle_secs_max;
+
+	/**
+	 * @idle_jiffies_max: Same as idle_secs_max except in units
+	 * of jiffies.
+	 */
+	unsigned long idle_jiffies_max;
+};
+
+/**
+ * struct homa_peer_key - Used to look up homa_peer structs in an rhashtable.
+ */
+struct homa_peer_key {
+	/**
+	 * @addr: Address of the desired host. IPv4 addresses are represented
+	 * with IPv4-mapped IPv6 addresses. Must be the first variable in
+	 * the struct, because of union in homa_peer.
+	 */
+	struct in6_addr addr;
+
+	/** @hnet: The network namespace in which this peer is valid. */
+	struct homa_net *hnet;
+};
+
+/**
+ * struct homa_peer - One of these objects exists for each machine that we
+ * have communicated with (either as client or server).
+ */
+struct homa_peer {
+	union {
+		/**
+		 * @addr: IPv6 address for the machine (IPv4 addresses are
+		 * stored as IPv4-mapped IPv6 addresses).
+		 */
+		struct in6_addr addr;
+
+		/** @ht_key: The hash table key for this peer in peertab->ht. */
+		struct homa_peer_key ht_key;
+	};
+
+	/**
+	 * @refs: Number of outstanding references to this peer. Includes
+	 * one reference for the entry in peertab->ht, plus one for each
+	 * unmatched call to homa_peer_hold; the peer gets freed when
+	 * this value becomes zero.
+	 */
+	refcount_t refs;
+
+	/**
+	 * @access_jiffies: Time in jiffies of most recent access to this
+	 * peer.
+	 */
+	unsigned long access_jiffies;
+
+	/**
+	 * @ht_linkage: Used by the rhashtable implementation to link this
+	 * peer into peertab->ht.
+	 */
+	struct rhash_head ht_linkage;
+
+	/**
+	 * @lock: used to synchronize access to fields in this struct, such
+	 * as @num_acks, @acks, @dst, and @dst_cookie.
+	 */
+	spinlock_t lock ____cacheline_aligned_in_smp;
+
+	/**
+	 * @num_acks: the number of (initial) entries in @acks that
+	 * currently hold valid information.
+	 */
+	int num_acks;
+
+	/**
+	 * @acks: info about client RPCs whose results have been completely
+	 * received.
+	 */
+	struct homa_ack acks[HOMA_MAX_ACKS_PER_PKT];
+
+	/**
+	 * @dst: Used to route packets to this peer; this object owns a
+	 * reference that must eventually be released.
+	 */
+	struct dst_entry __rcu *dst;
+
+	/**
+	 * @dst_cookie: Used to check whether dst is still valid. This is
+	 * accessed without synchronization, which is racy, but the worst
+	 * that can happen is using an obsolete dst.
+	 */
+	u32 dst_cookie;
+
+	/**
+	 * @flow: Addressing info used to create @dst and also required
+	 * when transmitting packets.
+	 */
+	struct flowi flow;
+
+	/**
+	 * @outstanding_resends: the number of resend requests we have
+	 * sent to this server (spaced @homa.resend_interval apart) since
+	 * we received a packet from this peer.
+	 */
+	int outstanding_resends;
+
+	/**
+	 * @most_recent_resend: @homa->timer_ticks when the most recent
+	 * resend was sent to this peer.
+	 */
+	int most_recent_resend;
+
+	/**
+	 * @least_recent_rpc: of all the RPCs for this peer scanned at
+	 * @current_ticks, this is the RPC whose @resend_timer_ticks
+	 * is farthest in the past.
+	 */
+	struct homa_rpc *least_recent_rpc;
+
+	/**
+	 * @least_recent_ticks: the @resend_timer_ticks value for
+	 * @least_recent_rpc.
+	 */
+	u32 least_recent_ticks;
+
+	/**
+	 * @current_ticks: the value of @homa->timer_ticks the last time
+	 * that @least_recent_rpc and @least_recent_ticks were computed.
+	 * Used to detect the start of a new homa_timer pass.
+	 */
+	u32 current_ticks;
+
+	/**
+	 * @resend_rpc: the value of @least_recent_rpc computed in the
+	 * previous homa_timer pass. This RPC will be issued a RESEND
+	 * in the current pass, if it still needs one.
+	 */
+	struct homa_rpc *resend_rpc;
+
+	/** @rcu_head: Holds state of a pending call_rcu invocation. */
+	struct rcu_head rcu_head;
+};
+
+void homa_dst_refresh(struct homa_peertab *peertab,
+		      struct homa_peer *peer, struct homa_sock *hsk);
+struct dst_entry
+	*homa_get_dst(struct homa_peer *peer, struct homa_sock *hsk);
+void homa_peer_add_ack(struct homa_rpc *rpc);
+struct homa_peer
+	*homa_peer_alloc(struct homa_sock *hsk, const struct in6_addr *addr);
+struct homa_peertab
+	*homa_peer_alloc_peertab(void);
+int homa_peer_dointvec(const struct ctl_table *table, int write,
+		       void *buffer, size_t *lenp, loff_t *ppos);
+void homa_peer_free(struct rcu_head *head);
+void homa_peer_free_net(struct homa_net *hnet);
+void homa_peer_free_peertab(struct homa_peertab *peertab);
+void homa_peer_gc(struct homa_peertab *peertab);
+struct homa_peer
+	*homa_peer_get(struct homa_sock *hsk, const struct in6_addr *addr);
+int homa_peer_get_acks(struct homa_peer *peer, int count,
+		       struct homa_ack *dst);
+int homa_peer_pick_victims(struct homa_peertab *peertab,
+			   struct homa_peer *victims[], int max_victims);
+int homa_peer_prefer_evict(struct homa_peertab *peertab,
+			   struct homa_peer *peer1,
+			   struct homa_peer *peer2);
+void homa_peer_release_fn(void *object, void *dummy);
+int homa_peer_reset_dst(struct homa_peer *peer, struct homa_sock *hsk);
+void homa_peer_update_sysctl_deps(struct homa_peertab *peertab);
+
+/**
+ * homa_peer_lock() - Acquire the lock for a peer.
+ * @peer: Peer to lock.
+ */
+static inline void homa_peer_lock(struct homa_peer *peer)
+	__acquires(peer->lock)
+{
+	spin_lock_bh(&peer->lock);
+}
+
+/**
+ * homa_peer_unlock() - Release the lock for a peer.
+ * @peer: Peer to unlock.
+ */
+static inline void homa_peer_unlock(struct homa_peer *peer)
+	__releases(peer->lock)
+{
+	spin_unlock_bh(&peer->lock);
+}
+
+/**
+ * homa_peer_hold() - Increment the reference count on a peer, which will
+ * prevent it from being freed until homa_peer_release() is called.
+ * @peer: Object on which to take a reference.
+ */
+static inline void homa_peer_hold(struct homa_peer *peer)
+{
+	refcount_inc(&peer->refs);
+}
+
+/**
+ * homa_peer_release() - Release a reference on a peer (cancels the effect of
+ * a previous call to homa_peer_hold). If the reference count becomes zero
+ * then the peer may be deleted at any time.
+ * @peer: Object to release.
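+ *
+ * Note: freeing is deferred through call_rcu (see homa_peer_free), so
+ * a reader that found the peer inside an RCU read-side critical
+ * section may safely finish using it even if the count drops to zero
+ * meanwhile.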
+ */ +static inline void homa_peer_release(struct homa_peer *peer) +{ + if (refcount_dec_and_test(&peer->refs)) + call_rcu(&peer->rcu_head, homa_peer_free); +} + +/** + * homa_peer_compare() - Comparison function for entries in @peertab->ht. + * @arg: Contains one of the keys to compare. + * @obj: homa_peer object containing the other key to compare. + * Return: 0 means the keys match, 1 means mismatch. + */ +static inline int homa_peer_compare(struct rhashtable_compare_arg *arg, + const void *obj) +{ + const struct homa_peer_key *key = arg->key; + const struct homa_peer *peer = obj; + + return !(ipv6_addr_equal(&key->addr, &peer->ht_key.addr) && + peer->ht_key.hnet == key->hnet); +} + +#endif /* _HOMA_PEER_H */ diff --git a/net/homa/murmurhash3.h b/net/homa/murmurhash3.h new file mode 100644 index 000000000000..1ed1f0b67a93 --- /dev/null +++ b/net/homa/murmurhash3.h @@ -0,0 +1,44 @@ +/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */ + +/* This file contains a limited implementation of MurmurHash3; it is + * used for rhashtables instead of the default jhash because it is + * faster (25 ns. vs. 40 ns as of May 2025) + */ + +/** + * murmurhash3() - Hash function. + * @data: Pointer to key for which a hash is desired. + * @len: Length of the key; must be a multiple of 4. + * @seed: Seed for the hash. + * Return: A 32-bit hash value for the given key. + */ +static inline u32 murmurhash3(const void *data, u32 len, u32 seed) +{ + const u32 c1 = 0xcc9e2d51; + const u32 c2 = 0x1b873593; + const u32 *key = data; + u32 h = seed; + + len = len >> 2; + for (size_t i = 0; i < len; i++) { + u32 k = key[i]; + + k *= c1; + k = (k << 15) | (k >> (32 - 15)); + k *= c2; + + h ^= k; + h = (h << 13) | (h >> (32 - 13)); + h = h * 5 + 0xe6546b64; + } + + /* Total number of input bytes */ + h ^= len * 4; + + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + return h; +} -- 2.43.0 These files provide functions for managing the state that Homa keeps for each open Homa socket. Signed-off-by: John Ousterhout --- Changes for v16: * Add error_msg field to struct homa_sock (for HOMAIOCINFO) * Acquire RCU read lock in homa_sock_wakeup_wmem for safety * Refactor homa_sock_init to reduce time in atomic context Changes for v11: * Clean up sparse annotations Changes for v10: * Revise sparse annotations to eliminate __context__ definition * Replace __u16 with u16, __u8 with u8, etc. * Use the destroy function from struct proto properly (fixes races in socket cleanup) Changes for v9: * Add support for homa_net objects; there is now a single socket table shared across all network namespaces * Set SOCK_RCU_FREE in homa_sock_init, not homa_sock_shutdown * Various name improvements (e.g. use "alloc" instead of "new" for functions that allocate memory) Changes for v8: * Update for new homa_pool APIs Changes for v7: * Refactor homa_sock_start_scan etc. (take a reference on the socket, so homa_socktab::active_scans and struct homa_socktab_links are no longer needed; encapsulate RCU usage entirely in homa_sock.c). 
* Add functions for tx memory accounting * Refactor waiting mechanism for incoming messages * Add hsk->is_server, setsockopt SO_HOMA_SERVER * Remove "lock_slow" functions, which don't add functionality in this patch series * Remove locker argument from locking functions * Use u64 and __u64 properly * Take a reference to the socket in homa_sock_find --- net/homa/homa_sock.c | 448 +++++++++++++++++++++++++++++++++++++++++++ net/homa/homa_sock.h | 424 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 872 insertions(+) create mode 100644 net/homa/homa_sock.c create mode 100644 net/homa/homa_sock.h diff --git a/net/homa/homa_sock.c b/net/homa/homa_sock.c new file mode 100644 index 000000000000..758906e975a4 --- /dev/null +++ b/net/homa/homa_sock.c @@ -0,0 +1,448 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file manages homa_sock and homa_socktab objects. */ + +#include "homa_impl.h" +#include "homa_interest.h" +#include "homa_peer.h" +#include "homa_pool.h" + +/** + * homa_socktab_init() - Constructor for homa_socktabs. + * @socktab: The object to initialize; previous contents are discarded. + */ +void homa_socktab_init(struct homa_socktab *socktab) +{ + int i; + + spin_lock_init(&socktab->write_lock); + for (i = 0; i < HOMA_SOCKTAB_BUCKETS; i++) + INIT_HLIST_HEAD(&socktab->buckets[i]); +} + +/** + * homa_socktab_destroy() - Destructor for homa_socktabs: deletes all + * existing sockets. + * @socktab: The object to destroy. + * @hnet: If non-NULL, only sockets for this namespace are deleted. + */ +void homa_socktab_destroy(struct homa_socktab *socktab, struct homa_net *hnet) +{ + struct homa_socktab_scan scan; + struct homa_sock *hsk; + + for (hsk = homa_socktab_start_scan(socktab, &scan); hsk; + hsk = homa_socktab_next(&scan)) { + if (hnet && hnet != hsk->hnet) + continue; + + /* In actual use there should be no sockets left when this + * function is invoked, so the code below will never be + * invoked. However, it is useful during unit tests. + */ + homa_sock_shutdown(hsk); + homa_sock_destroy(&hsk->sock); + } + homa_socktab_end_scan(&scan); +} + +/** + * homa_socktab_start_scan() - Begin an iteration over all of the sockets + * in a socktab. + * @socktab: Socktab to scan. + * @scan: Will hold the current state of the scan; any existing + * contents are discarded. The caller must eventually pass this + * to homa_socktab_end_scan. + * + * Return: The first socket in the table, or NULL if the table is + * empty. If non-NULL, a reference is held on the socket to + * prevent its deletion. + * + * Each call to homa_socktab_next will return the next socket in the table. + * All sockets that are present in the table at the time this function is + * invoked will eventually be returned, as long as they are not removed + * from the table. It is safe to remove sockets from the table while the + * scan is in progress. If a socket is removed from the table during the scan, + * it may or may not be returned by homa_socktab_next. New entries added + * during the scan may or may not be returned. + */ +struct homa_sock *homa_socktab_start_scan(struct homa_socktab *socktab, + struct homa_socktab_scan *scan) +{ + scan->socktab = socktab; + scan->hsk = NULL; + scan->current_bucket = -1; + + return homa_socktab_next(scan); +} + +/** + * homa_socktab_next() - Return the next socket in an iteration over a socktab. + * @scan: State of the scan. + * + * Return: The next socket in the table, or NULL if the iteration has + * returned all of the sockets in the table. 
If non-NULL, a + * reference is held on the socket to prevent its deletion. + * Sockets are not returned in any particular order. It's + * possible that the returned socket has been destroyed. + */ +struct homa_sock *homa_socktab_next(struct homa_socktab_scan *scan) +{ + struct hlist_head *bucket; + struct hlist_node *next; + + rcu_read_lock(); + if (scan->hsk) { + sock_put(&scan->hsk->sock); + next = rcu_dereference(hlist_next_rcu(&scan->hsk->socktab_links)); + if (next) + goto success; + } + for (scan->current_bucket++; + scan->current_bucket < HOMA_SOCKTAB_BUCKETS; + scan->current_bucket++) { + bucket = &scan->socktab->buckets[scan->current_bucket]; + next = rcu_dereference(hlist_first_rcu(bucket)); + if (next) + goto success; + } + scan->hsk = NULL; + rcu_read_unlock(); + return NULL; + +success: + scan->hsk = hlist_entry(next, struct homa_sock, socktab_links); + sock_hold(&scan->hsk->sock); + rcu_read_unlock(); + return scan->hsk; +} + +/** + * homa_socktab_end_scan() - Must be invoked on completion of each scan + * to clean up state associated with the scan. + * @scan: State of the scan. + */ +void homa_socktab_end_scan(struct homa_socktab_scan *scan) +{ + if (scan->hsk) { + sock_put(&scan->hsk->sock); + scan->hsk = NULL; + } +} + +/** + * homa_sock_init() - Constructor for homa_sock objects. This function + * initializes only the parts of the socket that are owned by Homa. + * @hsk: Object to initialize. The Homa-specific parts must have been + * initialized to zeroes by the caller. + * + * Return: 0 for success, otherwise a negative errno. + */ +int homa_sock_init(struct homa_sock *hsk) +{ + struct homa_pool *buffer_pool; + struct homa_socktab *socktab; + struct homa_sock *other; + struct homa_net *hnet; + struct homa *homa; + int starting_port; + int result = 0; + int i; + + hnet = (struct homa_net *)net_generic(sock_net(&hsk->sock), + homa_net_id); + homa = hnet->homa; + socktab = homa->socktab; + + /* Initialize fields outside the Homa part. */ + hsk->sock.sk_sndbuf = homa->wmem_max; + sock_set_flag(&hsk->inet.sk, SOCK_RCU_FREE); + + /* Do things requiring memory allocation before locking the socket, + * so that GFP_ATOMIC is not needed. + */ + buffer_pool = homa_pool_alloc(hsk); + if (IS_ERR(buffer_pool)) + return PTR_ERR(buffer_pool); + + /* Initialize Homa-specific fields. We can initialize everything + * except the port and hash table links without acquiring the + * socket lock. + */ + hsk->homa = homa; + hsk->hnet = hnet; + hsk->buffer_pool = buffer_pool; + hsk->inet.inet_num = hsk->port; + hsk->inet.inet_sport = htons(hsk->port); + + hsk->is_server = false; + hsk->shutdown = false; + hsk->ip_header_length = (hsk->inet.sk.sk_family == AF_INET) ? + sizeof(struct iphdr) : sizeof(struct ipv6hdr); + spin_lock_init(&hsk->lock); + atomic_set(&hsk->protect_count, 0); + INIT_LIST_HEAD(&hsk->active_rpcs); + INIT_LIST_HEAD(&hsk->dead_rpcs); + hsk->dead_skbs = 0; + INIT_LIST_HEAD(&hsk->waiting_for_bufs); + INIT_LIST_HEAD(&hsk->ready_rpcs); + INIT_LIST_HEAD(&hsk->interests); + for (i = 0; i < HOMA_CLIENT_RPC_BUCKETS; i++) { + struct homa_rpc_bucket *bucket = &hsk->client_rpc_buckets[i]; + + spin_lock_init(&bucket->lock); + bucket->id = i; + INIT_HLIST_HEAD(&bucket->rpcs); + } + for (i = 0; i < HOMA_SERVER_RPC_BUCKETS; i++) { + struct homa_rpc_bucket *bucket = &hsk->server_rpc_buckets[i]; + + spin_lock_init(&bucket->lock); + bucket->id = i + 1000000; + INIT_HLIST_HEAD(&bucket->rpcs); + } + + /* Pick a default port. 
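+	 * Automatically assigned ports lie in the upper half of the
+	 * port space (>= HOMA_MIN_DEFAULT_PORT); hnet->prev_default_port
+	 * remembers where the previous search left off.
+	 *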
Must keep the socktab locked from now + * until the new socket is added to the socktab, to ensure that + * no other socket chooses the same port. + */ + spin_lock_bh(&socktab->write_lock); + starting_port = hnet->prev_default_port; + while (1) { + hnet->prev_default_port++; + if (hnet->prev_default_port < HOMA_MIN_DEFAULT_PORT) + hnet->prev_default_port = HOMA_MIN_DEFAULT_PORT; + other = homa_sock_find(hnet, hnet->prev_default_port); + if (!other) + break; + sock_put(&other->sock); + if (hnet->prev_default_port == starting_port) { + spin_unlock_bh(&socktab->write_lock); + hsk->shutdown = true; + hsk->homa = NULL; + result = -EADDRNOTAVAIL; + goto error; + } + spin_unlock_bh(&socktab->write_lock); + cond_resched(); + spin_lock_bh(&socktab->write_lock); + } + hsk->port = hnet->prev_default_port; + hlist_add_head_rcu(&hsk->socktab_links, + &socktab->buckets[homa_socktab_bucket(hnet, + hsk->port)]); + spin_unlock_bh(&socktab->write_lock); + return result; + +error: + homa_pool_free(buffer_pool); + return result; +} + +/* + * homa_sock_unlink() - Unlinks a socket from its socktab and does + * related cleanups. Once this method returns, the socket will not be + * discoverable through the socktab. + * @hsk: Socket to unlink. + */ +void homa_sock_unlink(struct homa_sock *hsk) +{ + struct homa_socktab *socktab = hsk->homa->socktab; + + spin_lock_bh(&socktab->write_lock); + hlist_del_rcu(&hsk->socktab_links); + spin_unlock_bh(&socktab->write_lock); +} + +/** + * homa_sock_shutdown() - Disable a socket so that it can no longer + * be used for either sending or receiving messages. Any system calls + * currently waiting to send or receive messages will be aborted. This + * function will terminate any existing use of the socket, but it does + * not free up socket resources: that happens in homa_sock_destroy. + * @hsk: Socket to shut down. + */ +void homa_sock_shutdown(struct homa_sock *hsk) +{ + struct homa_interest *interest; + struct homa_rpc *rpc; + + homa_sock_lock(hsk); + if (hsk->shutdown || !hsk->homa) { + homa_sock_unlock(hsk); + return; + } + + /* The order of cleanup is very important, because there could be + * active operations that hold RPC locks but not the socket lock. + * 1. Set @shutdown; this ensures that no new RPCs will be created for + * this socket (though some creations might already be in progress). + * 2. Remove the socket from its socktab: this ensures that + * incoming packets for the socket will be dropped. + * 3. Go through all of the RPCs and delete them; this will + * synchronize with any operations in progress. + * 4. Perform other socket cleanup: at this point we know that + * there will be no concurrent activities on individual RPCs. + * 5. Don't delete the buffer pool until after all of the RPCs + * have been reaped. + * See "Homa Locking Strategy" in homa_impl.h for additional information + * about locking. 
+ */ + hsk->shutdown = true; + homa_sock_unlink(hsk); + homa_sock_unlock(hsk); + + rcu_read_lock(); + list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) { + homa_rpc_lock(rpc); + homa_rpc_end(rpc); + homa_rpc_unlock(rpc); + } + rcu_read_unlock(); + + homa_sock_lock(hsk); + while (!list_empty(&hsk->interests)) { + interest = list_first_entry(&hsk->interests, + struct homa_interest, links); + list_del_init(&interest->links); + atomic_set_release(&interest->ready, 1); + wake_up(&interest->wait_queue); + } + homa_sock_unlock(hsk); +} + +/** + * homa_sock_destroy() - Release all of the internal resources associated + * with a socket; is invoked at time when that is safe (i.e., all references + * on the socket have been dropped). + * @sk: Socket to destroy. + */ +void homa_sock_destroy(struct sock *sk) +{ + struct homa_sock *hsk = homa_sk(sk); + + if (!hsk->homa) + return; + + while (!list_empty(&hsk->dead_rpcs)) + homa_rpc_reap(hsk, true); + + WARN_ON_ONCE(refcount_read(&hsk->sock.sk_wmem_alloc) != 1); + + if (hsk->buffer_pool) { + homa_pool_free(hsk->buffer_pool); + hsk->buffer_pool = NULL; + } +} + +/** + * homa_sock_bind() - Associates a server port with a socket; if there + * was a previous server port assignment for @hsk, it is abandoned. + * @hnet: Network namespace with which port is associated. + * @hsk: Homa socket. + * @port: Desired server port for @hsk. If 0, then this call + * becomes a no-op: the socket will continue to use + * its randomly assigned client port. + * + * Return: 0 for success, otherwise a negative errno. If an error is + * returned, hsk->error_msg is set. + */ +int homa_sock_bind(struct homa_net *hnet, struct homa_sock *hsk, + u16 port) +{ + struct homa_socktab *socktab = hnet->homa->socktab; + struct homa_sock *owner; + int result = 0; + + if (port == 0) + return result; + if (port >= HOMA_MIN_DEFAULT_PORT) { + hsk->error_msg = "port number invalid: in the automatically assigned range"; + return -EINVAL; + } + homa_sock_lock(hsk); + spin_lock_bh(&socktab->write_lock); + if (hsk->shutdown) { + hsk->error_msg = "socket has been shut down"; + result = -ESHUTDOWN; + goto done; + } + + owner = homa_sock_find(hnet, port); + if (owner) { + sock_put(&owner->sock); + if (owner != hsk) { + hsk->error_msg = "requested port number is already in use"; + result = -EADDRINUSE; + } + goto done; + } + hlist_del_rcu(&hsk->socktab_links); + hsk->port = port; + hsk->inet.inet_num = port; + hsk->inet.inet_sport = htons(hsk->port); + hlist_add_head_rcu(&hsk->socktab_links, + &socktab->buckets[homa_socktab_bucket(hnet, port)]); + hsk->is_server = true; +done: + spin_unlock_bh(&socktab->write_lock); + homa_sock_unlock(hsk); + return result; +} + +/** + * homa_sock_find() - Returns the socket associated with a given port. + * @hnet: Network namespace where the socket will be used. + * @port: The port of interest. + * Return: The socket that owns @port, or NULL if none. If non-NULL + * then this method has taken a reference on the socket and + * the caller must call sock_put to release it. 
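+ *
+ * Illustrative use (mirrors the port-allocation check in
+ * homa_sock_init):
+ *
+ *	hsk = homa_sock_find(hnet, port);
+ *	if (hsk) {
+ *		... examine the socket ...
+ *		sock_put(&hsk->sock);
+ *	}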
+ */
+struct homa_sock *homa_sock_find(struct homa_net *hnet, u16 port)
+{
+	int bucket = homa_socktab_bucket(hnet, port);
+	struct homa_sock *result = NULL;
+	struct homa_sock *hsk;
+
+	rcu_read_lock();
+	hlist_for_each_entry_rcu(hsk, &hnet->homa->socktab->buckets[bucket],
+				 socktab_links) {
+		if (hsk->port == port && hsk->hnet == hnet) {
+			result = hsk;
+			sock_hold(&hsk->sock);
+			break;
+		}
+	}
+	rcu_read_unlock();
+	return result;
+}
+
+/**
+ * homa_sock_wait_wmem() - Block the thread until @hsk's usage of tx
+ * packet memory drops below the socket's limit.
+ * @hsk: Socket of interest.
+ * @nonblocking: If there's not enough memory, return -EWOULDBLOCK instead
+ * of blocking.
+ * Return: 0 for success, otherwise a negative errno.
+ */
+int homa_sock_wait_wmem(struct homa_sock *hsk, int nonblocking)
+{
+	long timeo = hsk->sock.sk_sndtimeo;
+	int result;
+
+	/* Note: we can't use sock_wait_for_wmem because that function
+	 * is not available to modules (as of August 2025 it's static).
+	 */
+
+	if (nonblocking)
+		timeo = 0;
+	set_bit(SOCK_NOSPACE, &hsk->sock.sk_socket->flags);
+	result = wait_event_interruptible_timeout(*sk_sleep(&hsk->sock),
+						  homa_sock_wmem_avl(hsk) ||
+						  hsk->shutdown,
+						  timeo);
+	if (signal_pending(current))
+		return -EINTR;
+	if (result == 0)
+		return -EWOULDBLOCK;
+	return 0;
+}
diff --git a/net/homa/homa_sock.h b/net/homa/homa_sock.h
new file mode 100644
index 000000000000..143cb92c4bdf
--- /dev/null
+++ b/net/homa/homa_sock.h
@@ -0,0 +1,424 @@
+/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */
+
+/* This file defines structs and other things related to Homa sockets. */
+
+#ifndef _HOMA_SOCK_H
+#define _HOMA_SOCK_H
+
+/* Forward declarations. */
+struct homa;
+struct homa_pool;
+
+/* Number of hash buckets in a homa_socktab. Must be a power of 2. */
+#define HOMA_SOCKTAB_BUCKET_BITS 10
+#define HOMA_SOCKTAB_BUCKETS BIT(HOMA_SOCKTAB_BUCKET_BITS)
+
+/**
+ * struct homa_socktab - A hash table that maps from port numbers (either
+ * client or server) to homa_sock objects.
+ *
+ * This table is managed exclusively by homa_sock.c, using RCU to
+ * minimize synchronization during lookups.
+ */
+struct homa_socktab {
+	/**
+	 * @write_lock: Controls all modifications to this object; not needed
+	 * for socket lookups (RCU is used instead). Also used to
+	 * synchronize port allocation.
+	 */
+	spinlock_t write_lock;
+
+	/**
+	 * @buckets: Heads of chains for hash table buckets. Chains
+	 * consist of homa_sock objects.
+	 */
+	struct hlist_head buckets[HOMA_SOCKTAB_BUCKETS];
+};
+
+/**
+ * struct homa_socktab_scan - Records the state of an iteration over all
+ * the entries in a homa_socktab, in a way that is safe against concurrent
+ * reclamation of sockets.
+ */
+struct homa_socktab_scan {
+	/** @socktab: The table that is being scanned. */
+	struct homa_socktab *socktab;
+
+	/**
+	 * @hsk: Points to the current socket in the iteration, or NULL if
+	 * we're at the beginning or end of the iteration. If non-NULL then
+	 * we are holding a reference to this socket.
+	 */
+	struct homa_sock *hsk;
+
+	/**
+	 * @current_bucket: The index of the bucket in socktab->buckets
+	 * currently being scanned (-1 if @hsk == NULL).
+	 */
+	int current_bucket;
+};
+
+/**
+ * struct homa_rpc_bucket - One bucket in a hash table of RPCs.
+ */
+struct homa_rpc_bucket {
+	/**
+	 * @lock: serves as a lock both for this bucket (e.g., when
+	 * adding and removing RPCs) and also for all of the RPCs in
+	 * the bucket. Must be held whenever looking up an RPC in
+	 * this bucket or manipulating an RPC in the bucket. This approach
+	 * has the following properties:
+	 * 1. An RPC can be looked up and locked (a common operation) with
+	 *    a single lock acquisition.
+	 * 2. Looking up and locking are atomic: there is no window of
+	 *    vulnerability where someone else could delete an RPC after
+	 *    it has been looked up and before it has been locked.
+	 * 3. The lookup mechanism does not use RCU. This is important because
+	 *    RPCs are created rapidly and typically live only a few tens of
+	 *    microseconds. As of May 2025 RCU introduces a lag of about
+	 *    25 ms before objects can be deleted; for RPCs this would result
+	 *    in hundreds or thousands of RPCs accumulating before RCU allows
+	 *    them to be deleted.
+	 * This approach has the disadvantage that RPCs within a bucket share
+	 * locks and thus may not be able to work concurrently, but there are
+	 * enough buckets in the table to make such collisions rare.
+	 *
+	 * See "Homa Locking Strategy" in homa_impl.h for more info about
+	 * locking.
+	 */
+	spinlock_t lock;
+
+	/**
+	 * @id: identifier for this bucket, used in error messages etc.
+	 * It's the index of the bucket within its hash table bucket
+	 * array, with an additional offset to separate server and
+	 * client RPCs.
+	 */
+	int id;
+
+	/** @rpcs: list of RPCs that hash to this bucket. */
+	struct hlist_head rpcs;
+};
+
+/**
+ * define HOMA_CLIENT_RPC_BUCKETS - Number of buckets in hash tables for
+ * client RPCs. Must be a power of 2.
+ */
+#define HOMA_CLIENT_RPC_BUCKETS 1024
+
+/**
+ * define HOMA_SERVER_RPC_BUCKETS - Number of buckets in hash tables for
+ * server RPCs. Must be a power of 2.
+ */
+#define HOMA_SERVER_RPC_BUCKETS 1024
+
+/**
+ * struct homa_sock - Information about an open socket.
+ */
+struct homa_sock {
+	/* Info for other network layers. Note: IPv6 info (struct ipv6_pinfo)
+	 * comes at the very end of the struct, *after* Homa's data, if this
+	 * socket uses IPv6.
+	 */
+	union {
+		/** @sock: generic socket data; must be the first field. */
+		struct sock sock;
+
+		/**
+		 * @inet: generic Internet socket data; must also be the
+		 * first field (contains sock as its first member).
+		 */
+		struct inet_sock inet;
+	};
+
+	/**
+	 * @homa: Overall state about the Homa implementation. NULL
+	 * means this socket was never initialized or has been deleted.
+	 */
+	struct homa *homa;
+
+	/**
+	 * @hnet: Overall state specific to the network namespace for
+	 * this socket.
+	 */
+	struct homa_net *hnet;
+
+	/**
+	 * @buffer_pool: used to allocate buffer space for incoming messages.
+	 * Storage is dynamically allocated.
+	 */
+	struct homa_pool *buffer_pool;
+
+	/**
+	 * @port: Port number: identifies this socket uniquely among all
+	 * those on this node.
+	 */
+	u16 port;
+
+	/**
+	 * @is_server: True means that this socket can act as both client
+	 * and server; false means the socket is client-only.
+	 */
+	bool is_server;
+
+	/**
+	 * @shutdown: True means the socket is no longer usable (either
+	 * shutdown has already been invoked, or the socket was never
+	 * properly initialized). Note: can't use the SOCK_DEAD flag for
+	 * this because that flag doesn't get set until much later in the
+	 * process of closing a socket.
+	 */
+	bool shutdown;
+
+	/**
+	 * @ip_header_length: Length of IP headers for this socket (depends
+	 * on IPv4 vs. IPv6).
+	 */
+	int ip_header_length;
+
+	/** @socktab_links: Links this socket into a homa_socktab bucket.
*/ + struct hlist_node socktab_links; + + /** + * @error_msg: Static string giving human-readable information about + * the reason for the last error returned by a Homa kernel call. + * Applications can fetch this with the HOMAIOCINFO ioctl to figure + * out why a call failed. + */ + char *error_msg; + + /* Information above is (almost) never modified; start a new + * cache line below for info that is modified frequently. + */ + + /** + * @lock: Must be held when modifying fields such as interests + * and lists of RPCs. This lock is used in place of sk->sk_lock + * because it's used differently (it's always used as a simple + * spin lock). See "Homa Locking Strategy" in homa_impl.h + * for more on Homa's synchronization strategy. + */ + spinlock_t lock ____cacheline_aligned_in_smp; + + /** + * @protect_count: counts the number of calls to homa_protect_rpcs + * for which there have not yet been calls to homa_unprotect_rpcs. + */ + atomic_t protect_count; + + /** + * @active_rpcs: List of all existing RPCs related to this socket, + * including both client and server RPCs. This list isn't strictly + * needed, since RPCs are already in one of the hash tables below, + * but it's more efficient for homa_timer to have this list + * (so it doesn't have to scan large numbers of hash buckets). + * The list is sorted, with the oldest RPC first. Manipulate with + * RCU so timer can access without locking. + */ + struct list_head active_rpcs; + + /** + * @dead_rpcs: Contains RPCs for which homa_rpc_end has been + * called, but which have not yet been reaped by homa_rpc_reap. + */ + struct list_head dead_rpcs; + + /** @dead_skbs: Total number of socket buffers in RPCs on dead_rpcs. */ + int dead_skbs; + + /** + * @waiting_for_bufs: Contains RPCs that are blocked because there + * wasn't enough space in the buffer pool region for their incoming + * messages. Sorted in increasing order of message length. + */ + struct list_head waiting_for_bufs; + + /** + * @ready_rpcs: List of all RPCs that are ready for attention from + * an application thread. + */ + struct list_head ready_rpcs; + + /** + * @interests: List of threads that are currently waiting for + * incoming messages via homa_wait_shared. + */ + struct list_head interests; + + /** + * @client_rpc_buckets: Hash table for fast lookup of client RPCs. + * Modifications are synchronized with bucket locks, not + * the socket lock. + */ + struct homa_rpc_bucket client_rpc_buckets[HOMA_CLIENT_RPC_BUCKETS]; + + /** + * @server_rpc_buckets: Hash table for fast lookup of server RPCs. + * Modifications are synchronized with bucket locks, not + * the socket lock. + */ + struct homa_rpc_bucket server_rpc_buckets[HOMA_SERVER_RPC_BUCKETS]; +}; + +/** + * struct homa_v6_sock - For IPv6, additional IPv6-specific information + * is present in the socket struct after Homa-specific information. + */ +struct homa_v6_sock { + /** @homa: All socket info except for IPv6-specific stuff. */ + struct homa_sock homa; + + /** @inet6: Socket info specific to IPv6. 
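+ * (This mirrors the layout convention used by other transports,
+ * e.g. tcp6_sock, which places struct ipv6_pinfo after the
+ * protocol-specific state.)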
+ */
+ struct ipv6_pinfo inet6;
+};
+
+int homa_sock_bind(struct homa_net *hnet, struct homa_sock *hsk,
+ u16 port);
+void homa_sock_destroy(struct sock *sk);
+struct homa_sock *homa_sock_find(struct homa_net *hnet, u16 port);
+int homa_sock_init(struct homa_sock *hsk);
+void homa_sock_shutdown(struct homa_sock *hsk);
+void homa_sock_unlink(struct homa_sock *hsk);
+int homa_sock_wait_wmem(struct homa_sock *hsk, int nonblocking);
+void homa_socktab_destroy(struct homa_socktab *socktab,
+ struct homa_net *hnet);
+void homa_socktab_end_scan(struct homa_socktab_scan *scan);
+void homa_socktab_init(struct homa_socktab *socktab);
+struct homa_sock *homa_socktab_next(struct homa_socktab_scan *scan);
+struct homa_sock *homa_socktab_start_scan(struct homa_socktab *socktab,
+ struct homa_socktab_scan *scan);
+
+/**
+ * homa_sock_lock() - Acquire the lock for a socket.
+ * @hsk: Socket to lock.
+ */
+static inline void homa_sock_lock(struct homa_sock *hsk)
+ __acquires(hsk->lock)
+{
+ spin_lock_bh(&hsk->lock);
+}
+
+/**
+ * homa_sock_unlock() - Release the lock for a socket.
+ * @hsk: Socket to unlock.
+ */
+static inline void homa_sock_unlock(struct homa_sock *hsk)
+ __releases(hsk->lock)
+{
+ spin_unlock_bh(&hsk->lock);
+}
+
+/**
+ * homa_socktab_bucket() - Compute the bucket number in a homa_socktab
+ * that will contain a particular socket.
+ * @hnet: Network namespace of the desired socket.
+ * @port: Port number of the socket.
+ *
+ * Return: The index of the bucket in which a socket matching @hnet and
+ * @port will be found (if it exists).
+ */
+static inline int homa_socktab_bucket(struct homa_net *hnet, u16 port)
+{
+ return hash_32((uintptr_t)hnet ^ port, HOMA_SOCKTAB_BUCKET_BITS);
+}
+
+/**
+ * homa_client_rpc_bucket() - Find the bucket containing a given
+ * client RPC.
+ * @hsk: Socket associated with the RPC.
+ * @id: Id of the desired RPC.
+ *
+ * Return: The bucket in which this RPC will appear, if the RPC exists.
+ */
+static inline struct homa_rpc_bucket
+ *homa_client_rpc_bucket(struct homa_sock *hsk, u64 id)
+{
+ /* We can use a really simple hash function here because RPC ids
+ * are allocated sequentially.
+ */
+ return &hsk->client_rpc_buckets[(id >> 1) &
+ (HOMA_CLIENT_RPC_BUCKETS - 1)];
+}
+
+/**
+ * homa_server_rpc_bucket() - Find the bucket containing a given
+ * server RPC.
+ * @hsk: Socket associated with the RPC.
+ * @id: Id of the desired RPC.
+ *
+ * Return: The bucket in which this RPC will appear, if the RPC exists.
+ */
+static inline struct homa_rpc_bucket
+ *homa_server_rpc_bucket(struct homa_sock *hsk, u64 id)
+{
+ /* Each client allocates RPC ids sequentially, so they will
+ * naturally distribute themselves across the hash space.
+ * Thus we can use the id directly as hash.
+ */
+ return &hsk->server_rpc_buckets[(id >> 1)
+ & (HOMA_SERVER_RPC_BUCKETS - 1)];
+}
+
+/**
+ * homa_bucket_lock() - Acquire the lock for an RPC hash table bucket.
+ * @bucket: Bucket to lock.
+ * @id: Id of the RPC on whose behalf the bucket is being locked.
+ * Used only for metrics.
+ */
+static inline void homa_bucket_lock(struct homa_rpc_bucket *bucket, u64 id)
+ __acquires(bucket->lock)
+{
+ spin_lock_bh(&bucket->lock);
+}
+
+/**
+ * homa_bucket_unlock() - Release the lock for an RPC hash table bucket.
+ * @bucket: Bucket to unlock.
+ * @id: ID of the RPC that was using the lock.
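+ * (Not needed for the unlock itself; accepted for symmetry with
+ * homa_bucket_lock.)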
+ */ +static inline void homa_bucket_unlock(struct homa_rpc_bucket *bucket, u64 id) + __releases(bucket->lock) +{ + spin_unlock_bh(&bucket->lock); +} + +static inline struct homa_sock *homa_sk(const struct sock *sk) +{ + return (struct homa_sock *)sk; +} + +/** + * homa_sock_wmem_avl() - Returns true if the socket is within its limit + * for output memory usage. False means that no new messages should be sent + * until memory is freed. + * @hsk: Socket of interest. + * Return: See above. + */ +static inline bool homa_sock_wmem_avl(struct homa_sock *hsk) +{ + return refcount_read(&hsk->sock.sk_wmem_alloc) < hsk->sock.sk_sndbuf; +} + +/** + * homa_sock_wakeup_wmem() - Invoked when tx packet memory has been freed; + * if memory usage is below the limit and there are tasks waiting for memory, + * wake them up. + * @hsk: Socket of interest. + */ +static inline void homa_sock_wakeup_wmem(struct homa_sock *hsk) +{ + /* Note: can't use sk_stream_write_space for this functionality + * because it uses a different test to determine whether enough + * memory is available. + */ + if (test_bit(SOCK_NOSPACE, &hsk->sock.sk_socket->flags) && + homa_sock_wmem_avl(hsk)) { + clear_bit(SOCK_NOSPACE, &hsk->sock.sk_socket->flags); + rcu_read_lock(); + wake_up_interruptible_poll(sk_sleep(&hsk->sock), EPOLLOUT); + rcu_read_unlock(); + } +} + +#endif /* _HOMA_SOCK_H */ -- 2.43.0 These files implement the homa_interest struct, which is used to wait for incoming messages. Signed-off-by: John Ousterhout --- Changes for v14: * Fix race in homa_wait_shared (an RPC could get lost if it became ready at the same time that homa_interest_wait returned with an error) * Remove nonblocking parameter from homa_interest_wait (handle this elsewhere) Changes for v11: * Clean up sparse annotations Changes for v10: none Changes for v9: * Remove unused field homa_interest->core --- net/homa/homa_interest.c | 114 +++++++++++++++++++++++++++++++++++++++ net/homa/homa_interest.h | 93 ++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 net/homa/homa_interest.c create mode 100644 net/homa/homa_interest.h diff --git a/net/homa/homa_interest.c b/net/homa/homa_interest.c new file mode 100644 index 000000000000..6daeedd21309 --- /dev/null +++ b/net/homa/homa_interest.c @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file contains functions for managing homa_interest structs. */ + +#include "homa_impl.h" +#include "homa_interest.h" +#include "homa_rpc.h" +#include "homa_sock.h" + +/** + * homa_interest_init_shared() - Initialize an interest and queue it up on + * a socket. + * @interest: Interest to initialize + * @hsk: Socket on which the interests should be queued. Must be locked + * by caller. + */ +void homa_interest_init_shared(struct homa_interest *interest, + struct homa_sock *hsk) + __must_hold(hsk->lock) +{ + interest->rpc = NULL; + atomic_set(&interest->ready, 0); + interest->blocked = 0; + init_waitqueue_head(&interest->wait_queue); + interest->hsk = hsk; + list_add(&interest->links, &hsk->interests); +} + +/** + * homa_interest_init_private() - Initialize an interest that will wait + * on a particular (private) RPC, and link it to that RPC. + * @interest: Interest to initialize. + * @rpc: RPC to associate with the interest. Must be private, and + * caller must have locked it. + * + * Return: 0 for success, otherwise a negative errno. 
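+ *
+ * A minimal caller sketch (hypothetical, error handling elided; the
+ * real wait path also handles nonblocking mode and shared interests):
+ *
+ *     struct homa_interest interest;
+ *     int err;
+ *
+ *     homa_rpc_lock(rpc);
+ *     err = homa_interest_init_private(&interest, rpc);
+ *     homa_rpc_unlock(rpc);
+ *     if (err == 0) {
+ *             err = homa_interest_wait(&interest);
+ *             homa_rpc_lock(rpc);
+ *             homa_interest_unlink_private(&interest);
+ *             homa_rpc_unlock(rpc);
+ *     }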
+ */
+int homa_interest_init_private(struct homa_interest *interest,
+ struct homa_rpc *rpc)
+ __must_hold(rpc->bucket->lock)
+{
+ if (rpc->private_interest)
+ return -EINVAL;
+
+ interest->rpc = rpc;
+ atomic_set(&interest->ready, 0);
+ interest->blocked = 0;
+ init_waitqueue_head(&interest->wait_queue);
+ interest->hsk = rpc->hsk;
+ rpc->private_interest = interest;
+ return 0;
+}
+
+/**
+ * homa_interest_wait() - Wait for an interest to have an actionable RPC,
+ * or for an error to occur.
+ * @interest: Interest to wait for; must previously have been initialized
+ * and linked to a socket or RPC. On return, the interest
+ * will have been unlinked if its ready flag is set; otherwise
+ * it may still be linked.
+ *
+ * Return: 0 for success (the ready flag is set in the interest), or -EINTR
+ * if the thread received an interrupt.
+ */
+int homa_interest_wait(struct homa_interest *interest)
+{
+ struct homa_sock *hsk = interest->hsk;
+ int result = 0;
+ int iteration;
+ int wait_err;
+
+ interest->blocked = 0;
+
+ /* This loop iterates in order to poll and/or reap dead RPCs. */
+ for (iteration = 0; ; iteration++) {
+ if (iteration != 0)
+ /* Give NAPI/SoftIRQ tasks a chance to run. */
+ schedule();
+
+ if (atomic_read_acquire(&interest->ready) != 0)
+ goto done;
+
+ /* See if we can clean up dead RPCs while waiting. */
+ if (homa_rpc_reap(hsk, false) != 0)
+ continue;
+
+ break;
+ }
+
+ interest->blocked = 1;
+ wait_err = wait_event_interruptible_exclusive(interest->wait_queue,
+ atomic_read_acquire(&interest->ready) != 0);
+ if (wait_err == -ERESTARTSYS)
+ result = -EINTR;
+
+done:
+ return result;
+}
+
+/**
+ * homa_interest_notify_private() - If a thread is waiting on the private
+ * interest for an RPC, wake it up.
+ * @rpc: RPC that may (potentially) have a private interest. Must be
+ * locked by the caller.
+ */
+void homa_interest_notify_private(struct homa_rpc *rpc)
+ __must_hold(rpc->bucket->lock)
+{
+ if (rpc->private_interest) {
+ atomic_set_release(&rpc->private_interest->ready, 1);
+ wake_up(&rpc->private_interest->wait_queue);
+ }
+}
+
diff --git a/net/homa/homa_interest.h b/net/homa/homa_interest.h
new file mode 100644
index 000000000000..d9f932960fd8
--- /dev/null
+++ b/net/homa/homa_interest.h
@@ -0,0 +1,93 @@
+/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */
+
+/* This file defines struct homa_interest and related functions. */
+
+#ifndef _HOMA_INTEREST_H
+#define _HOMA_INTEREST_H
+
+#include "homa_rpc.h"
+#include "homa_sock.h"
+
+/**
+ * struct homa_interest - Holds info that allows applications to wait for
+ * incoming RPC messages. An interest can be either private, in which case
+ * the application is waiting for a single specific RPC response and the
+ * interest is referenced by an rpc->private_interest, or shared, in which
+ * case the application is waiting for any incoming message that isn't
+ * private and the interest is present on hsk->interests.
+ */
+struct homa_interest {
+ /**
+ * @rpc: If ready is set, then this holds an RPC that needs
+ * attention, or NULL if this is a shared interest and hsk has
+ * been shutdown. If ready is not set, this will be NULL if the
+ * interest is shared; if it's private, it holds the RPC the
+ * interest is associated with. If non-NULL, a reference has been
+ * taken on the RPC.
+ */
+ struct homa_rpc *rpc;
+
+ /**
+ * @ready: Nonzero means the interest is ready for attention: either
+ * there is an RPC that needs attention or @hsk has been shutdown.
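+ * Set with atomic_set_release() by the notifier and read with
+ * atomic_read_acquire() by the waiter, so the waiter's reads of
+ * @rpc are ordered after the notifier's writes.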
+ */ + atomic_t ready; + + /** + * @blocked: Zero means a handoff was received without the thread + * needing to block; nonzero means the thread blocked. + */ + int blocked; + + /** + * @wait_queue: Used to block the thread while waiting (will never + * have more than one queued thread). + */ + struct wait_queue_head wait_queue; + + /** @hsk: Socket that the interest is associated with. */ + struct homa_sock *hsk; + + /** + * @links: If the interest is shared, used to link this object into + * @hsk->interests. + */ + struct list_head links; +}; + +/** + * homa_interest_unlink_shared() - Remove an interest from the list for a + * socket. Note: this can race with homa_rpc_handoff, so on return it's + * possible that the interest is ready. + * @interest: Interest to remove. Must have been initialized with + * homa_interest_init_shared. + */ +static inline void homa_interest_unlink_shared(struct homa_interest *interest) + __must_hold(hsk->lock) +{ + list_del_init(&interest->links); +} + +/** + * homa_interest_unlink_private() - Detach a private interest from its + * RPC. Note: this can race with homa_rpc_handoff, so on return it's + * possible that the interest is ready. + * @interest: Interest to remove. Must have been initialized with + * homa_interest_init_private. Its RPC must be locked by + * the caller. + */ +static inline void homa_interest_unlink_private(struct homa_interest *interest) + __must_hold(interest->rpc->bucket->lock) +{ + if (interest == interest->rpc->private_interest) + interest->rpc->private_interest = NULL; +} + +void homa_interest_init_shared(struct homa_interest *interest, + struct homa_sock *hsk); +int homa_interest_init_private(struct homa_interest *interest, + struct homa_rpc *rpc); +void homa_interest_notify_private(struct homa_rpc *rpc); +int homa_interest_wait(struct homa_interest *interest); + +#endif /* _HOMA_INTEREST_H */ -- 2.43.0 These files provide basic functions for managing remote procedure calls, which are the fundamental entities managed by Homa. Each RPC consists of a request message from a client to a server, followed by a response message returned from the server to the client. Signed-off-by: John Ousterhout --- Changes for v16: * Retain retransmitted packets until homa_rpc_reap (to ensure that RPCs don't get reaped with retransmitted packets still in the tx pipeline) * Fix deadlock over hsk->protect_count in homa_rpc_reap * Fix bugs in wmem management * Use set_bit and clear_bit for flag bits * Use refcount_t instead of atomic_t for reference counts * Replace inline code with homa_rpc_lock_preempt function * Reduce stack usage in homa_rpc_reap * Use consume_skb and kfree_skb_reason instead of kfree_skb * Add homa_rpc_get_info() for use in HOMAIOCINFO * Set hsk->error_msg Changes for v14: * Add msgout.first_not_tx field needed by homa_rpc_tx_end function (better abstraction) Changes for v11: * Cleanup and simplify use of RPC reference counts. * Rework the mechanism for waking up RPCs that stalled waiting for buffer pool space. Changes for v10: * Replace __u16 with u16, __u8 with u8, etc. * Improve documentation * Revise sparse annotations to eliminate __context__ definition * Use kzalloc instead of __GFP_ZERO * Fix issues from xmastree, sparse, etc. Changes for v9: * Eliminate reap.txt; move its contents into code as a comment in homa_rpc_reap * Various name improvements (e.g. 
use "alloc" instead of "new" for functions that allocate memory) * Add support for homa_net objects * Use new homa_clock abstraction layer Changes for v8: * Updates to reflect pacer refactoring Changes for v7: * Implement accounting for bytes in tx skbs * Fix potential races related to homa->active_rpcs * Refactor waiting mechanism for incoming packets: simplify wait criteria and use standard Linux mechanisms for waiting * Add reference counting for RPCs (homa_rpc_hold, homa_rpc_put) * Remove locker argument from locking functions * Rename homa_rpc_free to homa_rpc_end * Use u64 and __u64 properly * Use __skb_queue_purge instead of skb_queue_purge * Use __GFP_ZERO in kmalloc calls * Eliminate spurious RCU usage --- net/homa/homa_impl.h | 6 + net/homa/homa_rpc.c | 712 +++++++++++++++++++++++++++++++++++++++++++ net/homa/homa_rpc.h | 532 ++++++++++++++++++++++++++++++++ 3 files changed, 1250 insertions(+) create mode 100644 net/homa/homa_rpc.c create mode 100644 net/homa/homa_rpc.h diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index 512122ce5ae3..0be394534ad5 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -363,6 +363,12 @@ static inline bool homa_make_header_avl(struct sk_buff *skb) extern unsigned int homa_net_id; int homa_ioc_info(struct socket *sock, unsigned long arg); +void homa_rpc_handoff(struct homa_rpc *rpc); +int homa_xmit_control(enum homa_packet_type type, void *contents, + size_t length, struct homa_rpc *rpc); + +int homa_message_in_init(struct homa_rpc *rpc, int unsched); +void homa_xmit_data(struct homa_rpc *rpc); /** * homa_net() - Return the struct homa_net associated with a particular diff --git a/net/homa/homa_rpc.c b/net/homa/homa_rpc.c new file mode 100644 index 000000000000..2a9a995588ae --- /dev/null +++ b/net/homa/homa_rpc.c @@ -0,0 +1,712 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file contains functions for managing homa_rpc structs. */ + +#include "homa_impl.h" +#include "homa_interest.h" +#include "homa_peer.h" +#include "homa_pool.h" + +#include "homa_stub.h" + +/** + * homa_rpc_alloc_client() - Allocate and initialize a client RPC (one that + * is used to issue an outgoing request). Doesn't send any packets. Invoked + * with no locks held. + * @hsk: Socket to which the RPC belongs. + * @dest: Address of host (ip and port) to which the RPC will be sent. + * + * Return: A printer to the newly allocated object, or a negative + * errno if an error occurred. The RPC will be locked; the + * caller must eventually unlock it. Sets hsk->error_msg on errors. + */ +struct homa_rpc *homa_rpc_alloc_client(struct homa_sock *hsk, + const union sockaddr_in_union *dest) + __cond_acquires(crpc->bucket->lock) +{ + struct in6_addr dest_addr_as_ipv6 = canonical_ipv6_addr(dest); + struct homa_rpc_bucket *bucket; + struct homa_rpc *crpc; + int err; + + crpc = kzalloc(sizeof(*crpc), GFP_KERNEL); + if (unlikely(!crpc)) { + hsk->error_msg = "couldn't allocate memory for client RPC"; + return ERR_PTR(-ENOMEM); + } + + /* Initialize fields that don't require the socket lock. 
*/ + crpc->hsk = hsk; + crpc->id = atomic64_fetch_add(2, &hsk->homa->next_outgoing_id); + bucket = homa_client_rpc_bucket(hsk, crpc->id); + crpc->bucket = bucket; + crpc->state = RPC_OUTGOING; + refcount_set(&crpc->refs, 1); + crpc->peer = homa_peer_get(hsk, &dest_addr_as_ipv6); + if (IS_ERR(crpc->peer)) { + err = PTR_ERR(crpc->peer); + crpc->peer = NULL; + goto error; + } + crpc->dport = ntohs(dest->in6.sin6_port); + crpc->msgin.length = -1; + crpc->msgout.length = -1; + INIT_LIST_HEAD(&crpc->ready_links); + INIT_LIST_HEAD(&crpc->buf_links); + INIT_LIST_HEAD(&crpc->dead_links); + INIT_LIST_HEAD(&crpc->throttled_links); + crpc->resend_timer_ticks = hsk->homa->timer_ticks; + crpc->magic = HOMA_RPC_MAGIC; + crpc->start_time = homa_clock(); + + /* Initialize fields that require locking. This allows the most + * expensive work, such as copying in the message from user space, + * to be performed without holding locks. Also, can't hold spin + * locks while doing things that could block, such as memory allocation. + */ + homa_bucket_lock(bucket, crpc->id); + homa_sock_lock(hsk); + if (hsk->shutdown) { + homa_sock_unlock(hsk); + homa_rpc_unlock(crpc); + hsk->error_msg = "socket has been shut down"; + err = -ESHUTDOWN; + goto error; + } + hlist_add_head(&crpc->hash_links, &bucket->rpcs); + rcu_read_lock(); + list_add_tail_rcu(&crpc->active_links, &hsk->active_rpcs); + rcu_read_unlock(); + homa_sock_unlock(hsk); + + return crpc; + +error: + if (crpc->peer) + homa_peer_release(crpc->peer); + kfree(crpc); + return ERR_PTR(err); +} + +/** + * homa_rpc_alloc_server() - Allocate and initialize a server RPC (one that is + * used to manage an incoming request). If appropriate, the RPC will also + * be handed off (we do it here, while we have the socket locked, to avoid + * acquiring the socket lock a second time later for the handoff). + * @hsk: Socket that owns this RPC. + * @source: IP address (network byte order) of the RPC's client. + * @h: Header for the first data packet received for this RPC; used + * to initialize the RPC. + * @created: Will be set to 1 if a new RPC was created and 0 if an + * existing RPC was found. + * + * Return: A pointer to a new RPC, which is locked, or a negative errno + * if an error occurred. If there is already an RPC corresponding + * to h, then it is returned instead of creating a new RPC. + */ +struct homa_rpc *homa_rpc_alloc_server(struct homa_sock *hsk, + const struct in6_addr *source, + struct homa_data_hdr *h, int *created) + __cond_acquires(srpc->bucket->lock) +{ + u64 id = homa_local_id(h->common.sender_id); + struct homa_rpc_bucket *bucket; + struct homa_rpc *srpc = NULL; + int err; + + if (!hsk->buffer_pool) + return ERR_PTR(-ENOMEM); + + /* Lock the bucket, and make sure no-one else has already created + * the desired RPC. + */ + bucket = homa_server_rpc_bucket(hsk, id); + homa_bucket_lock(bucket, id); + hlist_for_each_entry(srpc, &bucket->rpcs, hash_links) { + if (srpc->id == id && + srpc->dport == ntohs(h->common.sport) && + ipv6_addr_equal(&srpc->peer->addr, source)) { + /* RPC already exists; just return it instead + * of creating a new RPC. + */ + *created = 0; + return srpc; + } + } + + /* Initialize fields that don't require the socket lock. 
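+ * (GFP_ATOMIC rather than GFP_KERNEL: the bucket lock is already
+ * held at this point, so the allocation must not sleep.)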
+ */
+ srpc = kzalloc(sizeof(*srpc), GFP_ATOMIC);
+ if (!srpc) {
+ err = -ENOMEM;
+ goto error;
+ }
+ srpc->hsk = hsk;
+ srpc->bucket = bucket;
+ srpc->state = RPC_INCOMING;
+ refcount_set(&srpc->refs, 1);
+ srpc->peer = homa_peer_get(hsk, source);
+ if (IS_ERR(srpc->peer)) {
+ err = PTR_ERR(srpc->peer);
+ srpc->peer = NULL;
+ goto error;
+ }
+ srpc->dport = ntohs(h->common.sport);
+ srpc->id = id;
+ srpc->msgin.length = -1;
+ srpc->msgout.length = -1;
+ INIT_LIST_HEAD(&srpc->ready_links);
+ INIT_LIST_HEAD(&srpc->buf_links);
+ INIT_LIST_HEAD(&srpc->dead_links);
+ INIT_LIST_HEAD(&srpc->throttled_links);
+ srpc->resend_timer_ticks = hsk->homa->timer_ticks;
+ srpc->magic = HOMA_RPC_MAGIC;
+ srpc->start_time = homa_clock();
+ err = homa_message_in_init(srpc, ntohl(h->message_length));
+ if (err != 0)
+ goto error;
+
+ /* Initialize fields that require socket to be locked. */
+ homa_sock_lock(hsk);
+ if (hsk->shutdown) {
+ homa_sock_unlock(hsk);
+ err = -ESHUTDOWN;
+ goto error;
+ }
+ hlist_add_head(&srpc->hash_links, &bucket->rpcs);
+ list_add_tail_rcu(&srpc->active_links, &hsk->active_rpcs);
+ homa_sock_unlock(hsk);
+ if (ntohl(h->seg.offset) == 0 && srpc->msgin.num_bpages > 0) {
+ set_bit(RPC_PKTS_READY, &srpc->flags);
+ homa_rpc_handoff(srpc);
+ }
+ *created = 1;
+ return srpc;
+
+error:
+ homa_bucket_unlock(bucket, id);
+ if (srpc && srpc->peer)
+ homa_peer_release(srpc->peer);
+ kfree(srpc);
+ return ERR_PTR(err);
+}
+
+/**
+ * homa_rpc_acked() - This function is invoked when an ack is received
+ * for an RPC; if the RPC still exists, it is freed.
+ * @hsk: Socket on which the ack was received. May or may not correspond
+ * to the RPC, but can sometimes be used to avoid a socket lookup.
+ * @saddr: Source address from which the ack was received (the client
+ * node for the RPC)
+ * @ack: Information about an RPC from @saddr that may now be deleted
+ * safely.
+ */
+void homa_rpc_acked(struct homa_sock *hsk, const struct in6_addr *saddr,
+ struct homa_ack *ack)
+{
+ u16 server_port = ntohs(ack->server_port);
+ u64 id = homa_local_id(ack->client_id);
+ struct homa_sock *hsk2 = hsk;
+ struct homa_rpc *rpc;
+
+ if (hsk->port != server_port) {
+ /* Without RCU, sockets other than hsk can be deleted
+ * out from under us.
+ */
+ hsk2 = homa_sock_find(hsk->hnet, server_port);
+ if (!hsk2)
+ return;
+ }
+ rpc = homa_rpc_find_server(hsk2, saddr, id);
+ if (rpc) {
+ homa_rpc_end(rpc);
+ homa_rpc_unlock(rpc); /* Locked by homa_rpc_find_server. */
+ }
+ if (hsk->port != server_port)
+ sock_put(&hsk2->sock);
+}
+
+/**
+ * homa_rpc_end() - Stop all activity on an RPC and begin the process of
+ * releasing its resources; this process will continue in the background
+ * until homa_rpc_reap eventually completes it.
+ * @rpc: Structure to clean up, or NULL. Must be locked. Its socket must
+ * not be locked. Once this function returns the caller should not
+ * use the RPC except to unlock it.
+ */
+void homa_rpc_end(struct homa_rpc *rpc)
+ __must_hold(rpc->bucket->lock)
+{
+ /* The goal for this function is to make the RPC inaccessible,
+ * so that no other code will ever access it again. However, don't
+ * actually release resources or tear down the internal structure
+ * of the RPC; leave that to homa_rpc_reap, which runs later. There
+ * are two reasons for this. First, releasing resources may be
+ * expensive, so we don't want to keep the caller waiting; homa_rpc_reap
+ * will run in situations where there is time to spare.
Second, there + * may be other code that currently has pointers to this RPC but + * temporarily released the lock (e.g. to copy data to/from user space). + * It isn't safe to clean up until that code has finished its work and + * released any pointers to the RPC (homa_rpc_reap will ensure that + * this has happened). So, this function should only make changes + * needed to make the RPC inaccessible. + */ + if (!rpc || rpc->state == RPC_DEAD) + return; + rpc->state = RPC_DEAD; + rpc->error = -EINVAL; + + /* Unlink from all lists, so no-one will ever find this RPC again. */ + homa_sock_lock(rpc->hsk); + __hlist_del(&rpc->hash_links); + list_del_rcu(&rpc->active_links); + list_add_tail(&rpc->dead_links, &rpc->hsk->dead_rpcs); + __list_del_entry(&rpc->ready_links); + __list_del_entry(&rpc->buf_links); + homa_interest_notify_private(rpc); + + if (rpc->msgin.length >= 0) { + rpc->hsk->dead_skbs += skb_queue_len(&rpc->msgin.packets); + while (1) { + struct homa_gap *gap; + + gap = list_first_entry_or_null(&rpc->msgin.gaps, + struct homa_gap, links); + if (!gap) + break; + list_del(&gap->links); + kfree(gap); + } + } + rpc->hsk->dead_skbs += rpc->msgout.num_skbs; + if (rpc->hsk->dead_skbs > rpc->hsk->homa->max_dead_buffs) + /* This update isn't thread-safe; it's just a + * statistic so it's OK if updates occasionally get + * missed. + */ + rpc->hsk->homa->max_dead_buffs = rpc->hsk->dead_skbs; + + homa_sock_unlock(rpc->hsk); +} + +/** + * homa_rpc_abort() - Terminate an RPC. + * @rpc: RPC to be terminated. Must be locked by caller. + * @error: A negative errno value indicating the error that caused the abort. + * If this is a client RPC, the error will be returned to the + * application; if it's a server RPC, the error is ignored and + * we just free the RPC. + */ +void homa_rpc_abort(struct homa_rpc *rpc, int error) + __must_hold(rpc->bucket->lock) +{ + if (!homa_is_client(rpc->id)) { + homa_rpc_end(rpc); + return; + } + rpc->error = error; + homa_rpc_handoff(rpc); +} + +/** + * homa_abort_rpcs() - Abort all RPCs to/from a particular peer. + * @homa: Overall data about the Homa protocol implementation. + * @addr: Address (network order) of the destination whose RPCs are + * to be aborted. + * @port: If nonzero, then RPCs will only be aborted if they were + * targeted at this server port. + * @error: Negative errno value indicating the reason for the abort. + */ +void homa_abort_rpcs(struct homa *homa, const struct in6_addr *addr, + int port, int error) +{ + struct homa_socktab_scan scan; + struct homa_sock *hsk; + struct homa_rpc *rpc; + + for (hsk = homa_socktab_start_scan(homa->socktab, &scan); hsk; + hsk = homa_socktab_next(&scan)) { + /* Skip the (expensive) lock acquisition if there's no + * work to do. + */ + if (list_empty(&hsk->active_rpcs)) + continue; + if (!homa_protect_rpcs(hsk)) + continue; + rcu_read_lock(); + list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) { + if (!ipv6_addr_equal(&rpc->peer->addr, addr)) + continue; + if (port && rpc->dport != port) + continue; + homa_rpc_lock(rpc); + homa_rpc_abort(rpc, error); + homa_rpc_unlock(rpc); + } + rcu_read_unlock(); + homa_unprotect_rpcs(hsk); + } + homa_socktab_end_scan(&scan); +} + +/** + * homa_rpc_reap() - Invoked to release resources associated with dead + * RPCs for a given socket. + * @hsk: Homa socket that may contain dead RPCs. Must not be locked by the + * caller; this function will lock and release. + * @reap_all: False means do a small chunk of work; there may still be + * unreaped RPCs on return. 
+ * True means reap all dead RPCs for
+ * hsk. Will busy-wait if reaping has been disabled for some RPCs.
+ *
+ * Return: A return value of 0 means that we ran out of work to do; calling
+ * again will do no work (there could be unreaped RPCs, but if so,
+ * they cannot currently be reaped). A value greater than zero means
+ * there is still more reaping work to be done.
+ */
+int homa_rpc_reap(struct homa_sock *hsk, bool reap_all)
+{
+ /* RPC Reaping Strategy:
+ *
+ * (Note: there are references to this comment elsewhere in the
+ * Homa code)
+ *
+ * Most of the cost of reaping comes from freeing sk_buffs; this can be
+ * quite expensive for RPCs with long messages.
+ *
+ * The natural time to reap is when homa_rpc_end is invoked to
+ * terminate an RPC, but this doesn't work for two reasons. First,
+ * there may be outstanding references to the RPC; it cannot be reaped
+ * until all of those references have been released. Second, reaping
+ * is potentially expensive and RPC termination could occur in
+ * homa_softirq when there are short messages waiting to be processed.
+ * Taking time to reap a long RPC could result in significant delays
+ * for subsequent short RPCs.
+ *
+ * Thus Homa doesn't reap immediately in homa_rpc_end. Instead, dead
+ * RPCs are queued up and reaping occurs in this function, which is
+ * invoked later when it is less likely to impact latency. The
+ * challenge is to do this so that (a) we don't allow large numbers of
+ * dead RPCs to accumulate and (b) we minimize the impact of reaping
+ * on latency.
+ *
+ * The primary place where homa_rpc_reap is invoked is when threads
+ * are waiting for incoming messages. The thread has nothing else to
+ * do (it may even be polling for input), so reaping can be performed
+ * with no latency impact on the application. However, if a machine
+ * is overloaded then it may never wait, so this mechanism isn't always
+ * sufficient.
+ *
+ * Homa now reaps in two other places, if reaping while waiting for
+ * messages isn't adequate:
+ * 1. If too many dead skbs accumulate, then homa_timer will call
+ * homa_rpc_reap.
+ * 2. If the timer thread cannot keep up with all the reaping to be
+ * done then as a last resort homa_dispatch_pkts will reap in small
+ * increments (a few sk_buffs or RPCs) for every incoming batch
+ * of packets. This is undesirable because it will impact Homa's
+ * performance.
+ *
+ * During the introduction of homa_pools for managing input
+ * buffers, freeing of packets for incoming messages was moved to
+ * homa_copy_to_user under the assumption that this code wouldn't be
+ * on the critical path. However, there is evidence that with
+ * fast networks (e.g. 100 Gbps) copying to user space is the
+ * bottleneck for incoming messages, and packet freeing takes about
+ * 20-25% of the total time in homa_copy_to_user. So, it may eventually
+ * be desirable to move packet freeing out of homa_copy_to_user.
+ */
+#define BATCH_MAX 10
+ struct homa_rpc *rpcs[BATCH_MAX];
+ struct sk_buff *skbs[BATCH_MAX];
+ int num_skbs, num_rpcs;
+ bool checked_all_rpcs;
+ struct homa_rpc *rpc;
+ struct homa_rpc *tmp;
+ int i, batch_size;
+ int skbs_to_reap;
+ int rx_frees;
+
+ /* Each iteration through the following loop will reap
+ * BATCH_MAX skbs.
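+ * (or fewer: when @reap_all is false the batch is also capped by
+ * the remaining reap budget in skbs_to_reap).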
+ */ + skbs_to_reap = hsk->homa->reap_limit; + checked_all_rpcs = list_empty(&hsk->dead_rpcs); + while (!checked_all_rpcs) { + batch_size = BATCH_MAX; + if (!reap_all) { + if (skbs_to_reap <= 0) + break; + if (batch_size > skbs_to_reap) + batch_size = skbs_to_reap; + skbs_to_reap -= batch_size; + } + num_skbs = 0; + num_rpcs = 0; + rx_frees = 0; + + homa_sock_lock(hsk); + if (atomic_read(&hsk->protect_count)) { + homa_sock_unlock(hsk); + return 0; + } + + /* Collect buffers and freeable RPCs. */ + list_for_each_entry_safe(rpc, tmp, &hsk->dead_rpcs, + dead_links) { + int refs; + + /* Make sure that all outstanding uses of the RPC have + * completed. We can read the reference count safely + * only when we're holding the lock. Note: it isn't + * safe to block while locking the RPC here, since we + * hold the socket lock. + */ + if (homa_rpc_try_lock(rpc)) { + refs = refcount_read(&rpc->refs); + homa_rpc_unlock(rpc); + } else { + refs = 2; + } + if (refs > 1) + continue; + + /* For Tx sk_buffs, collect them here but defer + * freeing until after releasing the socket lock. + */ + if (rpc->msgout.length >= 0) { + while (1) { + struct sk_buff *skb; + + skb = rpc->msgout.to_free; + if (!skb) { + skb = rpc->msgout.packets; + if (!skb) + break; + rpc->msgout.to_free = skb; + rpc->msgout.packets = NULL; + } + + /* Don't reap RPC if anyone besides + * us has a reference to the skb. + */ + if (refcount_read(&skb->users) > 1) + goto next_rpc; + skbs[num_skbs] = skb; + rpc->msgout.to_free = + homa_get_skb_info(skb)->next_skb; + num_skbs++; + rpc->msgout.num_skbs--; + if (num_skbs >= batch_size) + goto release; + } + } + + /* In the normal case rx sk_buffs will already have been + * freed before we got here. Thus it's OK to free + * immediately in rare situations where there are + * buffers left. + */ + if (rpc->msgin.length >= 0 && + !skb_queue_empty_lockless(&rpc->msgin.packets)) { + rx_frees += skb_queue_len(&rpc->msgin.packets); + __skb_queue_purge_reason(&rpc->msgin.packets, + SKB_CONSUMED); + } + + /* If we get here, it means all packets have been + * removed from the RPC. + */ + rpcs[num_rpcs] = rpc; + num_rpcs++; + list_del(&rpc->dead_links); + WARN_ON(refcount_sub_and_test(rpc->msgout.skb_memory, + &hsk->sock.sk_wmem_alloc)); + if (num_rpcs >= batch_size) + goto release; + +next_rpc: + continue; + } + checked_all_rpcs = true; + + /* Free all of the collected resources; release the socket + * lock while doing this. + */ +release: + hsk->dead_skbs -= num_skbs + rx_frees; + homa_sock_unlock(hsk); + homa_skb_free_many_tx(hsk->homa, skbs, num_skbs); + for (i = 0; i < num_rpcs; i++) { + rpc = rpcs[i]; + + if (unlikely(rpc->msgin.num_bpages)) + homa_pool_release_buffers(rpc->hsk->buffer_pool, + rpc->msgin.num_bpages, + rpc->msgin.bpage_offsets); + if (rpc->msgin.length >= 0) { + while (1) { + struct homa_gap *gap; + + gap = list_first_entry_or_null( + &rpc->msgin.gaps, + struct homa_gap, + links); + if (!gap) + break; + list_del(&gap->links); + kfree(gap); + } + } + if (rpc->peer) { + homa_peer_release(rpc->peer); + rpc->peer = NULL; + } + rpc->state = 0; + rpc->magic = 0; + kfree(rpc); + } + homa_sock_wakeup_wmem(hsk); + } + homa_pool_check_waiting(hsk->buffer_pool); + return !checked_all_rpcs; +} + +/** + * homa_abort_sock_rpcs() - Abort all outgoing (client-side) RPCs on a given + * socket. + * @hsk: Socket whose RPCs should be aborted. + * @error: Zero means that the aborted RPCs should be freed immediately. 
+ * A nonzero value means that the RPCs should be marked + * complete, so that they can be returned to the application; + * this value (a negative errno) will be returned from + * recvmsg. + */ +void homa_abort_sock_rpcs(struct homa_sock *hsk, int error) +{ + struct homa_rpc *rpc; + + if (list_empty(&hsk->active_rpcs)) + return; + if (!homa_protect_rpcs(hsk)) + return; + rcu_read_lock(); + list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) { + if (!homa_is_client(rpc->id)) + continue; + homa_rpc_lock(rpc); + if (rpc->state == RPC_DEAD) { + homa_rpc_unlock(rpc); + continue; + } + if (error) + homa_rpc_abort(rpc, error); + else + homa_rpc_end(rpc); + homa_rpc_unlock(rpc); + } + rcu_read_unlock(); + homa_unprotect_rpcs(hsk); +} + +/** + * homa_rpc_find_client() - Locate client-side information about the RPC that + * a packet belongs to, if there is any. Thread-safe without socket lock. + * @hsk: Socket via which packet was received. + * @id: Unique identifier for the RPC. + * + * Return: A pointer to the homa_rpc for this id, or NULL if none. + * The RPC will be locked; the caller must eventually unlock it + * by invoking homa_rpc_unlock. + */ +struct homa_rpc *homa_rpc_find_client(struct homa_sock *hsk, u64 id) + __cond_acquires(crpc->bucket->lock) +{ + struct homa_rpc_bucket *bucket = homa_client_rpc_bucket(hsk, id); + struct homa_rpc *crpc; + + homa_bucket_lock(bucket, id); + hlist_for_each_entry(crpc, &bucket->rpcs, hash_links) { + if (crpc->id == id) + return crpc; + } + homa_bucket_unlock(bucket, id); + return NULL; +} + +/** + * homa_rpc_find_server() - Locate server-side information about the RPC that + * a packet belongs to, if there is any. Thread-safe without socket lock. + * @hsk: Socket via which packet was received. + * @saddr: Address from which the packet was sent. + * @id: Unique identifier for the RPC (must have server bit set). + * + * Return: A pointer to the homa_rpc matching the arguments, or NULL + * if none. The RPC will be locked; the caller must eventually + * unlock it by invoking homa_rpc_unlock. + */ +struct homa_rpc *homa_rpc_find_server(struct homa_sock *hsk, + const struct in6_addr *saddr, u64 id) + __cond_acquires(srpc->bucket->lock) +{ + struct homa_rpc_bucket *bucket = homa_server_rpc_bucket(hsk, id); + struct homa_rpc *srpc; + + homa_bucket_lock(bucket, id); + hlist_for_each_entry(srpc, &bucket->rpcs, hash_links) { + if (srpc->id == id && ipv6_addr_equal(&srpc->peer->addr, saddr)) + return srpc; + } + homa_bucket_unlock(bucket, id); + return NULL; +} + +/** + * homa_rpc_get_info() - Extract information from an RPC for returning to + * an application via the HOMAIOCINFO ioctl. + * @rpc: RPC for which information is desired. + * @info: Structure in which to store the information. 
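+ *
+ * Assumption (not stated in this patch): the caller has stabilized
+ * @rpc, e.g. by holding its bucket lock, while the fields are read.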
+ */ +void homa_rpc_get_info(struct homa_rpc *rpc, struct homa_rpc_info *info) +{ + struct homa_gap *gap; + + memset(info, 0, sizeof(*info)); + info->id = rpc->id; + if (rpc->hsk->inet.sk.sk_family == AF_INET6) { + info->peer.in6.sin6_family = AF_INET6; + info->peer.in6.sin6_addr = rpc->peer->addr; + info->peer.in6.sin6_port = htons(rpc->dport); + } else { + info->peer.in6.sin6_family = AF_INET; + info->peer.in4.sin_addr.s_addr = ipv6_to_ipv4(rpc->peer->addr); + info->peer.in4.sin_port = htons(rpc->dport); + } + info->completion_cookie = rpc->completion_cookie; + if (rpc->msgout.length >= 0) { + info->tx_length = rpc->msgout.length; + info->tx_sent = rpc->msgout.next_xmit_offset; + info->tx_granted = rpc->msgout.length; + } else { + info->tx_length = -1; + } + if (rpc->msgin.length >= 0) { + info->rx_length = rpc->msgin.length; + info->rx_remaining = rpc->msgin.bytes_remaining; + list_for_each_entry(gap, &rpc->msgin.gaps, links) { + info->rx_gaps++; + info->rx_gap_bytes += gap->end - gap->start; + } + info->rx_granted = rpc->msgin.length; + if (skb_queue_len(&rpc->msgin.packets) > 0) + info->flags |= HOMA_RPC_RX_COPY; + } else { + info->rx_length = -1; + } + if (!list_empty(&rpc->buf_links)) + info->flags |= HOMA_RPC_BUF_STALL; + if (!list_empty(&rpc->ready_links) && + rpc->msgin.bytes_remaining == 0 && + skb_queue_len(&rpc->msgin.packets) == 0) + info->flags |= HOMA_RPC_RX_READY; + if (rpc->flags & RPC_PRIVATE) + info->flags |= HOMA_RPC_PRIVATE; +} diff --git a/net/homa/homa_rpc.h b/net/homa/homa_rpc.h new file mode 100644 index 000000000000..3591bd32c5c2 --- /dev/null +++ b/net/homa/homa_rpc.h @@ -0,0 +1,532 @@ +/* SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ */ + +/* This file defines homa_rpc and related structs. */ + +#ifndef _HOMA_RPC_H +#define _HOMA_RPC_H + +#include +#include +#include + +#include "homa_sock.h" +#include "homa_wire.h" + +/* Forward references. */ +struct homa_ack; + +/** + * struct homa_message_out - Describes a message (either request or response) + * for which this machine is the sender. + */ +struct homa_message_out { + /** + * @length: Total bytes in message (excluding headers). A value + * less than 0 means this structure is uninitialized and therefore + * not in use (all other fields will be zero in this case). + */ + int length; + + /** @num_skbs: Total number of buffers currently in @to_free. */ + int num_skbs; + + /** + * @skb_memory: Total number of bytes of memory occupied by + * the sk_buffs for this message. + */ + int skb_memory; + + /** + * @copied_from_user: Number of bytes of the message that have + * been copied from user space into skbs in @packets. + */ + int copied_from_user; + + /** + * @packets: Singly-linked list of all packets in message, linked + * using homa_skb_info->next_skb. The list is in order of offset in + * the message (offset 0 first); each sk_buff can potentially contain + * multiple data_segments, which will be split into separate packets + * by GSO. This list grows gradually as data is copied in from user + * space, so it may not be complete. + */ + struct sk_buff *packets; + + /** + * @next_xmit: Pointer to pointer to next packet to transmit (will + * either refer to @packets or homa_skb_info->next_skb for some skb + * in @packets). + */ + struct sk_buff **next_xmit; + + /** + * @next_xmit_offset: All bytes in the message, up to but not + * including this one, have been passed to ip_queue_xmit or + * ip6_xmit. 
+ */ + int next_xmit_offset; + + /** + * @first_not_tx: All packets in @packets preceding this one have + * been confirmed to have been transmitted by the NIC (the driver + * has released its reference). NULL means all packets are known to + * have been transmitted. Used by homa_rpc_tx_end. + */ + struct sk_buff *first_not_tx; + + /** + * @to_free: Singly-linked list of packets that must be freed by + * homa_rpc_reap. Initially holds retransmitted packets, but + * eventually includes the packets in @packets. homa_rpc_reap uses + * this list to ensure that all tx packets have been freed by the + * IP stack before it frees the homa_rpc (otherwise homa_qdisc might + * try to access the RPC via a packet's homa_skb_info). Note: I + * considered using skb->destructor to release a reference on the RPC, + * but this does not appear to be reliable because (a) skb->destructor + * may be overwritten and (b) it may be called before the skb has + * cleared the tx pipeline (via skb_orphan?). Also, need to retain + * @packets in case they are needed for retransmission. + */ + struct sk_buff *to_free; + + /** + * @init_time: homa_clock() time when this structure was initialized. + * Used to find the oldest outgoing message. + */ + u64 init_time; +}; + +/** + * struct homa_gap - Represents a range of bytes within a message that have + * not yet been received. + */ +struct homa_gap { + /** @start: offset of first byte in this gap. */ + int start; + + /** @end: offset of byte just after last one in this gap. */ + int end; + + /** + * @time: homa_clock() time when the gap was first detected. + * As of 7/2024 this isn't used for anything. + */ + u64 time; + + /** @links: for linking into list in homa_message_in. */ + struct list_head links; +}; + +/** + * struct homa_message_in - Holds the state of a message received by + * this machine; used for both requests and responses. + */ +struct homa_message_in { + /** + * @length: Payload size in bytes. -1 means this structure is + * uninitialized and therefore not in use. + */ + int length; + + /** + * @packets: DATA packets for this message that have been received but + * not yet copied to user space (ordered by increasing offset). The + * lock in this structure is not used (the RPC lock is used instead). + */ + struct sk_buff_head packets; + + /** + * @recv_end: Offset of the byte just after the highest one that + * has been received so far. + */ + int recv_end; + + /** + * @gaps: List of homa_gaps describing all of the bytes with + * offsets less than @recv_end that have not yet been received. + */ + struct list_head gaps; + + /** + * @bytes_remaining: Amount of data for this message that has + * not yet been received; will determine the message's priority. + */ + int bytes_remaining; + + /** + * @num_bpages: The number of entries in @bpage_offsets used for this + * message (0 means buffers not allocated yet). + */ + u32 num_bpages; + + /** + * @bpage_offsets: Describes buffer space allocated for this message. + * Each entry is an offset from the start of the buffer region. + * All but the last pointer refer to areas of size HOMA_BPAGE_SIZE. + */ + u32 bpage_offsets[HOMA_MAX_BPAGES]; + +}; + +/** + * struct homa_rpc - One of these structures exists for each active + * RPC. The same structure is used to manage both outgoing RPCs on + * clients and incoming RPCs on servers. + */ +struct homa_rpc { + /** @hsk: Socket that owns the RPC. 
*/ + struct homa_sock *hsk; + + /** + * @bucket: Pointer to the bucket in hsk->client_rpc_buckets or + * hsk->server_rpc_buckets where this RPC is linked. Used primarily + * for locking the RPC (which is done by locking its bucket). + */ + struct homa_rpc_bucket *bucket; + + /** + * @state: The current state of this RPC: + * + * @RPC_OUTGOING: The RPC is waiting for @msgout to be transmitted + * to the peer. + * @RPC_INCOMING: The RPC is waiting for data @msgin to be received + * from the peer; at least one packet has already + * been received. + * @RPC_IN_SERVICE: Used only for server RPCs: the request message + * has been read from the socket, but the response + * message has not yet been presented to the kernel. + * @RPC_DEAD: RPC has been deleted and is waiting to be + * reaped. In some cases, information in the RPC + * structure may be accessed in this state. + * + * Client RPCs pass through states in the following order: + * RPC_OUTGOING, RPC_INCOMING, RPC_DEAD. + * + * Server RPCs pass through states in the following order: + * RPC_INCOMING, RPC_IN_SERVICE, RPC_OUTGOING, RPC_DEAD. + */ + enum { + RPC_OUTGOING = 5, + RPC_INCOMING = 6, + RPC_IN_SERVICE = 8, + RPC_DEAD = 9 + } state; + + /** + * @flags: Additional state information: an OR'ed combination of + * various single-bit flags. See below for definitions. Must be + * manipulated with atomic operations because some of the manipulations + * occur without holding the RPC lock. + */ + unsigned long flags; + + /* Valid bit numbers for @flags: + * RPC_PKTS_READY - The RPC has input packets ready to be + * copied to user space. + * APP_NEEDS_LOCK - Means that code in the application thread + * needs the RPC lock (e.g. so it can start + * copying data to user space) so others + * (e.g. SoftIRQ processing) should relinquish + * the lock ASAP. Without this, SoftIRQ can + * lock out the application for a long time, + * preventing data copies to user space from + * starting (and they limit throughput at + * high network speeds). + * RPC_PRIVATE - This RPC will be waited on in "private" mode, + * where the app explicitly requests the + * response from this particular RPC. + */ +#define RPC_PKTS_READY 0 +#define APP_NEEDS_LOCK 1 +#define RPC_PRIVATE 2 + + /** + * @refs: Number of references to this RPC, including one for each + * unmatched call to homa_rpc_hold plus one for the socket's reference + * in either active_rpcs or dead_rpcs. + */ + refcount_t refs; + + /** + * @peer: Information about the other machine (the server, if + * this is a client RPC, or the client, if this is a server RPC). + * If non-NULL then we own a reference on the object. + */ + struct homa_peer *peer; + + /** @dport: Port number on @peer that will handle packets. */ + u16 dport; + + /** + * @id: Unique identifier for the RPC among all those issued + * from its port. The low-order bit indicates whether we are + * server (1) or client (0) for this RPC. + */ + u64 id; + + /** + * @completion_cookie: Only used on clients. Contains identifying + * information about the RPC provided by the application; returned to + * the application with the RPC's result. + */ + u64 completion_cookie; + + /** + * @error: Only used on clients. If nonzero, then the RPC has + * failed and the value is a negative errno that describes the + * problem. + */ + int error; + + /** + * @msgin: Information about the message we receive for this RPC + * (for server RPCs this is the request, for client RPCs this is the + * response). 
+ */ + struct homa_message_in msgin; + + /** + * @msgout: Information about the message we send for this RPC + * (for client RPCs this is the request, for server RPCs this is the + * response). + */ + struct homa_message_out msgout; + + /** + * @hash_links: Used to link this object into a hash bucket for + * either @hsk->client_rpc_buckets (for a client RPC), or + * @hsk->server_rpc_buckets (for a server RPC). + */ + struct hlist_node hash_links; + + /** + * @ready_links: Used to link this object into @hsk->ready_rpcs. + */ + struct list_head ready_links; + + /** + * @buf_links: Used to link this RPC into @hsk->waiting_for_bufs. + * If the RPC isn't on @hsk->waiting_for_bufs, this is an empty + * list pointing to itself. + */ + struct list_head buf_links; + + /** + * @active_links: For linking this object into @hsk->active_rpcs. + * The next field will be LIST_POISON1 if this RPC hasn't yet been + * linked into @hsk->active_rpcs. Access with RCU. + */ + struct list_head active_links; + + /** @dead_links: For linking this object into @hsk->dead_rpcs. */ + struct list_head dead_links; + + /** + * @private_interest: If there is a thread waiting for this RPC in + * homa_wait_private, then this points to that thread's interest. + */ + struct homa_interest *private_interest; + + /** + * @throttled_links: Used to link this RPC into + * homa->pacer.throttled_rpcs. If this RPC isn't in + * homa->pacer.throttled_rpcs, this is an empty + * list pointing to itself. + */ + struct list_head throttled_links; + + /** + * @silent_ticks: Number of times homa_timer has been invoked + * since the last time a packet indicating progress was received + * for this RPC, so we don't need to send a resend for a while. + */ + int silent_ticks; + + /** + * @resend_timer_ticks: Value of homa->timer_ticks the last time + * we sent a RESEND for this RPC. + */ + u32 resend_timer_ticks; + + /** + * @done_timer_ticks: The value of homa->timer_ticks the first + * time we noticed that this (server) RPC is done (all response + * packets have been transmitted), so we're ready for an ack. + * Zero means we haven't reached that point yet. + */ + u32 done_timer_ticks; + + /** + * @magic: when the RPC is alive, this holds a distinct value that + * is unlikely to occur naturally. The value is cleared when the + * RPC is reaped, so we can detect accidental use of an RPC after + * it has been reaped. + */ +#define HOMA_RPC_MAGIC 0xdeadbeef + int magic; + + /** + * @start_time: homa_clock() time when this RPC was created. Used + * occasionally for testing. + */ + u64 start_time; +}; + +void homa_abort_rpcs(struct homa *homa, const struct in6_addr *addr, + int port, int error); +void homa_abort_sock_rpcs(struct homa_sock *hsk, int error); +void homa_rpc_abort(struct homa_rpc *crpc, int error); +void homa_rpc_acked(struct homa_sock *hsk, const struct in6_addr *saddr, + struct homa_ack *ack); +struct homa_rpc + *homa_rpc_alloc_client(struct homa_sock *hsk, + const union sockaddr_in_union *dest); +struct homa_rpc + *homa_rpc_alloc_server(struct homa_sock *hsk, + const struct in6_addr *source, + struct homa_data_hdr *h, int *created); +void homa_rpc_end(struct homa_rpc *rpc); +struct homa_rpc + *homa_rpc_find_client(struct homa_sock *hsk, u64 id); +struct homa_rpc + *homa_rpc_find_server(struct homa_sock *hsk, + const struct in6_addr *saddr, u64 id); +void homa_rpc_get_info(struct homa_rpc *rpc, struct homa_rpc_info *info); +int homa_rpc_reap(struct homa_sock *hsk, bool reap_all); + +/** + * homa_rpc_lock() - Acquire the lock for an RPC. 
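+ * (An RPC's lock is its bucket's lock, shared with all other RPCs
+ * that hash to the same bucket; see struct homa_rpc_bucket.)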
+ * @rpc: RPC to lock.
+ */
+static inline void homa_rpc_lock(struct homa_rpc *rpc)
+ __acquires(rpc->bucket->lock)
+{
+ homa_bucket_lock(rpc->bucket, rpc->id);
+}
+
+/**
+ * homa_rpc_try_lock() - Acquire the lock for an RPC if it is available.
+ * @rpc: RPC to lock.
+ * Return: Nonzero if lock was successfully acquired, zero if it is
+ * currently owned by someone else.
+ */
+static inline int homa_rpc_try_lock(struct homa_rpc *rpc)
+ __cond_acquires(rpc->bucket->lock)
+{
+ if (!spin_trylock_bh(&rpc->bucket->lock))
+ return 0;
+ return 1;
+}
+
+/**
+ * homa_rpc_lock_preempt() - Same as homa_rpc_lock, except sets the
+ * APP_NEEDS_LOCK flag while waiting to encourage the existing lock
+ * owner to relinquish the lock.
+ * @rpc: RPC to lock.
+ */
+static inline void homa_rpc_lock_preempt(struct homa_rpc *rpc)
+ __acquires(rpc->bucket->lock)
+{
+ set_bit(APP_NEEDS_LOCK, &rpc->flags);
+ homa_bucket_lock(rpc->bucket, rpc->id);
+ clear_bit(APP_NEEDS_LOCK, &rpc->flags);
+}
+
+/**
+ * homa_rpc_unlock() - Release the lock for an RPC.
+ * @rpc: RPC to unlock.
+ */
+static inline void homa_rpc_unlock(struct homa_rpc *rpc)
+ __releases(rpc->bucket->lock)
+{
+ homa_bucket_unlock(rpc->bucket, rpc->id);
+}
+
+/**
+ * homa_protect_rpcs() - Ensures that no RPCs will be reaped for a given
+ * socket until homa_unprotect_rpcs is called. Typically used by functions
+ * that want to scan the active RPCs for a socket without holding the socket
+ * lock. Multiple calls to this function may be in effect at once. See
+ * "Homa Locking Strategy" in homa_impl.h for more info on why this function
+ * is needed.
+ * @hsk: Socket whose RPCs should be protected. Must not be locked
+ * by the caller; will be locked here.
+ *
+ * Return: 1 for success, 0 if the socket has been shutdown, in which
+ * case its RPCs cannot be protected.
+ */
+static inline int homa_protect_rpcs(struct homa_sock *hsk)
+{
+ int result;
+
+ homa_sock_lock(hsk);
+ result = !hsk->shutdown;
+ if (result)
+ atomic_inc(&hsk->protect_count);
+ homa_sock_unlock(hsk);
+ return result;
+}
+
+/**
+ * homa_unprotect_rpcs() - Cancel the effect of a previous call to
+ * homa_protect_rpcs(), so that RPCs can once again be reaped.
+ * @hsk: Socket whose RPCs should be unprotected.
+ */
+static inline void homa_unprotect_rpcs(struct homa_sock *hsk)
+{
+ atomic_dec(&hsk->protect_count);
+}
+
+/**
+ * homa_rpc_hold() - Increment the reference count on an RPC, which will
+ * prevent it from being freed until homa_rpc_put() is called. References
+ * are taken in two situations:
+ * 1. An RPC is going to be manipulated by a collection of functions. In
+ * this case the top-most function that identifies the RPC takes the
+ * reference; any function that receives an RPC as an argument can
+ * assume that a reference has been taken on the RPC by some higher
+ * function on the call stack.
+ * 2. A pointer to an RPC is stored in an object for use later, such as
+ * an interest. A reference must be held as long as the pointer remains
+ * accessible in the object.
+ * @rpc: RPC on which to take a reference.
+ */
+static inline void homa_rpc_hold(struct homa_rpc *rpc)
+{
+ refcount_inc(&rpc->refs);
+}
+
+/**
+ * homa_rpc_put() - Release a reference on an RPC (cancels the effect of
+ * a previous call to homa_rpc_hold).
+ * @rpc: RPC to release.
+ */
+static inline void homa_rpc_put(struct homa_rpc *rpc)
+{
+ refcount_dec(&rpc->refs);
+}
+
+/**
+ * homa_is_client(): returns true if we are the client for a particular RPC,
+ * false if we are the server.
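+ * (Client ids are allocated in increments of 2 with bit 0 clear; the
+ * server's id for the same RPC has bit 0 set, via homa_local_id.)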
+ * @id: Id of the RPC in question. + * Return: true if we are the client for RPC id, false otherwise + */ +static inline bool homa_is_client(u64 id) +{ + return (id & 1) == 0; +} + +/** + * homa_rpc_needs_attention() - Returns true if @rpc has failed or if + * its incoming message is ready for attention by an application thread + * (e.g., packets are ready to copy to user space). + * @rpc: RPC to check. + * Return: See above + */ +static inline bool homa_rpc_needs_attention(struct homa_rpc *rpc) +{ + return (rpc->error != 0 || test_bit(RPC_PKTS_READY, &rpc->flags)); +} + +#endif /* _HOMA_RPC_H */ -- 2.43.0 This file does most of the work of transmitting outgoing messages. It is also responsible for copying data from user space into skbs. Signed-off-by: John Ousterhout --- Changes for v16: * Set hsk->error_msg (for HOMAIOCINFO) * Refactor pipelining mechanism in homa_message_out_fill * Retain retransmitted packets until homa_rpc_reap (to ensure that RPCs don't get reaped with retransmitted packets still in the tx pipeline) Changes for v14: * Implement homa_rpc_tx_end function Changes for v13: * Fix bug in homa_resend_data: wasn't fully initializing new skb. * Fix bug in homa_tx_data_pkt_alloc: wasn't allocating enough space in the new skb. Changes for v12: * Move RPC_DEAD check in homa_xmit_data to eliminate window for more complete coverage. Changes for v11: * Cleanup and simplify use of RPC reference counts. Changes for v10: * Revise sparse annotations to eliminate __context__ definition * Remove log messages after alloc errors Changes for v9: * Use new homa_clock abstraction layer * Various name improvements (e.g. use "alloc" instead of "new" for functions that allocate memory) * Eliminate sizeof32 define: use sizeof instead Changes for v7: * Implement accounting for bytes in tx skbs * Rename UNKNOWN packet type to RPC_UNKNOWN * Use new RPC reference counts; eliminates need for RCU * Remove locker argument from locking functions * Use u64 and __u64 properly * Fix incorrect skb check in homa_message_out_fill --- net/homa/homa_impl.h | 14 + net/homa/homa_outgoing.c | 566 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 580 insertions(+) create mode 100644 net/homa/homa_outgoing.c diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index 0be394534ad5..c7570d1f85b2 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -362,13 +362,27 @@ static inline bool homa_make_header_avl(struct sk_buff *skb) extern unsigned int homa_net_id; +int homa_fill_data_interleaved(struct homa_rpc *rpc, + struct sk_buff *skb, struct iov_iter *iter); int homa_ioc_info(struct socket *sock, unsigned long arg); +int homa_message_out_fill(struct homa_rpc *rpc, + struct iov_iter *iter, int xmit); +void homa_message_out_init(struct homa_rpc *rpc, int length); void homa_rpc_handoff(struct homa_rpc *rpc); +int homa_rpc_tx_end(struct homa_rpc *rpc); +struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, + struct iov_iter *iter, int offset, + int length, int max_seg_data); int homa_xmit_control(enum homa_packet_type type, void *contents, size_t length, struct homa_rpc *rpc); +int __homa_xmit_control(void *contents, size_t length, + struct homa_peer *peer, struct homa_sock *hsk); +void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk); int homa_message_in_init(struct homa_rpc *rpc, int unsched); +void homa_resend_data(struct homa_rpc *rpc, int start, int end); void homa_xmit_data(struct homa_rpc *rpc); +void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc); /** * 
homa_net() - Return the struct homa_net associated with a particular diff --git a/net/homa/homa_outgoing.c b/net/homa/homa_outgoing.c new file mode 100644 index 000000000000..d349739eccb1 --- /dev/null +++ b/net/homa/homa_outgoing.c @@ -0,0 +1,566 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file contains functions related to the sender side of message + * transmission. It also contains utility functions for sending packets. + */ + +#include "homa_impl.h" +#include "homa_peer.h" +#include "homa_rpc.h" +#include "homa_wire.h" + +#include "homa_stub.h" + +/** + * homa_message_out_init() - Initialize rpc->msgout. + * @rpc: RPC whose output message should be initialized. Must be + * locked by caller. + * @length: Number of bytes that will eventually be in rpc->msgout. + */ +void homa_message_out_init(struct homa_rpc *rpc, int length) + __must_hold(rpc->bucket->lock) +{ + memset(&rpc->msgout, 0, sizeof(rpc->msgout)); + rpc->msgout.length = length; + rpc->msgout.next_xmit = &rpc->msgout.packets; + rpc->msgout.init_time = homa_clock(); +} + +/** + * homa_fill_data_interleaved() - This function is invoked to fill in the + * part of a data packet after the initial header, when GSO is being used. + * homa_seg_hdrs must be interleaved with the data to provide the correct + * offset for each segment. + * @rpc: RPC whose output message is being created. Must be + * locked by caller. + * @skb: The packet being filled. The initial homa_data_hdr was + * created and initialized by the caller and the + * homa_skb_info has been filled in with the packet geometry. + * @iter: Describes location(s) of (remaining) message data in user + * space. + * Return: Either a negative errno or 0 (for success). + */ +int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb, + struct iov_iter *iter) + __must_hold(rpc->bucket->lock) +{ + struct homa_skb_info *homa_info = homa_get_skb_info(skb); + int seg_length = homa_info->seg_length; + int bytes_left = homa_info->data_bytes; + int offset = homa_info->offset; + int err; + + /* Each iteration of the following loop adds info for one packet, + * which includes a homa_seg_hdr followed by the data for that + * segment. The first homa_seg_hdr was already added by the caller. + */ + while (1) { + struct homa_seg_hdr seg; + + if (bytes_left < seg_length) + seg_length = bytes_left; + err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter, + seg_length); + if (err != 0) + return err; + bytes_left -= seg_length; + offset += seg_length; + + if (bytes_left == 0) + break; + + seg.offset = htonl(offset); + err = homa_skb_append_to_frag(rpc->hsk->homa, skb, &seg, + sizeof(seg)); + if (err != 0) + return err; + } + return 0; +} + +/** + * homa_tx_data_pkt_alloc() - Allocate a new sk_buff and fill it with an + * outgoing Homa data packet. The resulting packet will be a GSO packet + * that will eventually be segmented by the NIC. + * @rpc: RPC that packet will belong to (msgout must have been + * initialized). Must be locked by caller. + * @iter: Describes location(s) of (remaining) message data in user + * space. + * @offset: Offset in the message of the first byte of data in this + * packet. + * @length: How many bytes of data to include in the skb. Caller must + * ensure that this amount of data isn't too much for a + * well-formed GSO packet, and that iter has at least this + * much data. + * @max_seg_data: Maximum number of bytes of message data that can go in + * a single segment of the GSO packet. 
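+ * For example (illustrative numbers only): with @length = 4000 and + * @max_seg_data = 1400, the packet will hold three segments of 1400, + * 1400 and 1200 data bytes; the second and third are preceded by + * interleaved homa_seg_hdrs added by homa_fill_data_interleaved.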
+ * Return: A pointer to the new packet, or a negative errno. Sets + * rpc->hsk->error_msg on errors. + */ +struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, + struct iov_iter *iter, int offset, + int length, int max_seg_data) + __must_hold(rpc->bucket->lock) +{ + struct homa_skb_info *homa_info; + struct homa_data_hdr *h; + struct sk_buff *skb; + int err, gso_size; + u64 segs; + + segs = length + max_seg_data - 1; + do_div(segs, max_seg_data); + + /* Initialize the overall skb. */ + skb = homa_skb_alloc_tx(sizeof(struct homa_data_hdr) + length + + (segs - 1) * sizeof(struct homa_seg_hdr)); + if (!skb) { + rpc->hsk->error_msg = "couldn't allocate sk_buff for outgoing message"; + return ERR_PTR(-ENOMEM); + } + + /* Fill in the Homa header (which will be replicated in every + * network packet by GSO). + */ + h = (struct homa_data_hdr *)skb_put(skb, sizeof(struct homa_data_hdr)); + h->common.sport = htons(rpc->hsk->port); + h->common.dport = htons(rpc->dport); + h->common.sequence = htonl(offset); + h->common.type = DATA; + homa_set_doff(h, sizeof(struct homa_data_hdr)); + h->common.checksum = 0; + h->common.sender_id = cpu_to_be64(rpc->id); + h->message_length = htonl(rpc->msgout.length); + h->ack.client_id = 0; + homa_peer_get_acks(rpc->peer, 1, &h->ack); + h->retransmit = 0; + h->seg.offset = htonl(offset); + + homa_info = homa_get_skb_info(skb); + homa_info->next_skb = NULL; + homa_info->wire_bytes = length + segs * (sizeof(struct homa_data_hdr) + + rpc->hsk->ip_header_length + HOMA_ETH_OVERHEAD); + homa_info->data_bytes = length; + homa_info->seg_length = max_seg_data; + homa_info->offset = offset; + homa_info->rpc = rpc; + + if (segs > 1) { + homa_set_doff(h, sizeof(struct homa_data_hdr) - + sizeof(struct homa_seg_hdr)); + gso_size = max_seg_data + sizeof(struct homa_seg_hdr); + err = homa_fill_data_interleaved(rpc, skb, iter); + } else { + gso_size = max_seg_data; + err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter, + length); + } + if (err) { + rpc->hsk->error_msg = "couldn't copy message body into packet buffers"; + goto error; + } + + if (segs > 1) { + skb_shinfo(skb)->gso_segs = segs; + skb_shinfo(skb)->gso_size = gso_size; + + /* It's unclear what gso_type should be used to force software + * GSO; the value below seems to work... + */ + skb_shinfo(skb)->gso_type = + rpc->hsk->homa->gso_force_software ? 0xd : SKB_GSO_TCPV6; + } + return skb; + +error: + homa_skb_free_tx(rpc->hsk->homa, skb); + return ERR_PTR(err); +} + +/** + * homa_message_out_fill() - Initializes information for sending a message + * for an RPC (either request or response); copies the message data from + * user space and (possibly) begins transmitting the message. + * @rpc: RPC for which to send message; this function must not + * previously have been called for the RPC. Must be locked. The RPC + * will be unlocked while copying data, but will be locked again + * before returning. + * @iter: Describes location(s) of message data in user space. + * @xmit: Nonzero means this method should start transmitting packets; + * transmission will be overlapped with copying from user space. + * Zero means the caller will initiate transmission after this + * function returns. + * + * Return: 0 for success, or a negative errno for failure. It is possible + * for the RPC to be freed while this function is active. If that + * happens, copying will cease, -EINVAL will be returned, and + * rpc->state will be RPC_DEAD. Sets rpc->hsk->error_msg on errors. 
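+ * + * Note: the RPC lock is dropped while each packet is filled from user + * space and reacquired afterwards; this is why the RPC can become + * RPC_DEAD partway through this function.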
+ */ +int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit) + __must_hold(rpc->bucket->lock) +{ + /* Geometry information for packets: + * mtu: largest size for an on-the-wire packet (including + * all headers through IP header, but not Ethernet + * header). + * max_seg_data: largest amount of Homa message data that fits + * in an on-the-wire packet (after segmentation). + * max_gso_data: largest amount of Homa message data that fits + * in a GSO packet (before segmentation). + */ + int mtu, max_seg_data, max_gso_data; + struct sk_buff **last_link; + struct dst_entry *dst; + u64 segs_per_gso; + /* Bytes of the message that haven't yet been copied into skbs. */ + int bytes_left; + int gso_size; + int err; + + if (unlikely(iter->count > HOMA_MAX_MESSAGE_LENGTH || + iter->count == 0)) { + rpc->hsk->error_msg = "message length exceeded HOMA_MAX_MESSAGE_LENGTH"; + err = -EINVAL; + goto error; + } + homa_message_out_init(rpc, iter->count); + + /* Compute the geometry of packets. */ + dst = homa_get_dst(rpc->peer, rpc->hsk); + mtu = dst_mtu(dst); + max_seg_data = mtu - rpc->hsk->ip_header_length + - sizeof(struct homa_data_hdr); + gso_size = dst->dev->gso_max_size; + if (gso_size > rpc->hsk->homa->max_gso_size) + gso_size = rpc->hsk->homa->max_gso_size; + dst_release(dst); + + /* Round gso_size down to an even # of mtus. */ + segs_per_gso = gso_size - rpc->hsk->ip_header_length - + sizeof(struct homa_data_hdr) + + sizeof(struct homa_seg_hdr); + do_div(segs_per_gso, max_seg_data + + sizeof(struct homa_seg_hdr)); + if (segs_per_gso == 0) + segs_per_gso = 1; + max_gso_data = segs_per_gso * max_seg_data; + + homa_skb_stash_pages(rpc->hsk->homa, rpc->msgout.length); + + /* Each iteration of the loop below creates one GSO packet. */ + last_link = &rpc->msgout.packets; + for (bytes_left = rpc->msgout.length; bytes_left > 0; ) { + int skb_data_bytes, offset; + struct sk_buff *skb; + + homa_rpc_unlock(rpc); + skb_data_bytes = max_gso_data; + offset = rpc->msgout.length - bytes_left; + if (skb_data_bytes > bytes_left) + skb_data_bytes = bytes_left; + skb = homa_tx_data_pkt_alloc(rpc, iter, offset, skb_data_bytes, + max_seg_data); + if (IS_ERR(skb)) { + err = PTR_ERR(skb); + homa_rpc_lock(rpc); + goto error; + } + bytes_left -= skb_data_bytes; + + homa_rpc_lock(rpc); + if (rpc->state == RPC_DEAD) { + /* RPC was freed while we were copying. */ + rpc->hsk->error_msg = "rpc deleted while creating outgoing message"; + err = -EINVAL; + homa_skb_free_tx(rpc->hsk->homa, skb); + goto error; + } + *last_link = skb; + last_link = &(homa_get_skb_info(skb)->next_skb); + *last_link = NULL; + rpc->msgout.num_skbs++; + rpc->msgout.skb_memory += skb->truesize; + rpc->msgout.copied_from_user = rpc->msgout.length - bytes_left; + rpc->msgout.first_not_tx = rpc->msgout.packets; + } + refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc); + if (xmit) + homa_xmit_data(rpc); + return 0; + +error: + refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc); + return err; +} + +/** + * homa_xmit_control() - Send a control packet to the other end of an RPC. + * @type: Packet type, such as DATA. + * @contents: Address of buffer containing the contents of the packet. + * Only information after the common header must be valid; + * the common header will be filled in by this function. + * @length: Length of @contents (including the common header). + * @rpc: The packet will go to the socket that handles the other end + * of this RPC. 
Addressing info for the packet, including all of + * the fields of homa_common_hdr except type, will be set from this. + * Caller must hold either the lock or a reference. + * + * Return: Either zero (for success), or a negative errno value if there + * was a problem. + */ +int homa_xmit_control(enum homa_packet_type type, void *contents, + size_t length, struct homa_rpc *rpc) +{ + struct homa_common_hdr *h = contents; + + h->type = type; + h->sport = htons(rpc->hsk->port); + h->dport = htons(rpc->dport); + h->sender_id = cpu_to_be64(rpc->id); + return __homa_xmit_control(contents, length, rpc->peer, rpc->hsk); +} + +/** + * __homa_xmit_control() - Lower-level version of homa_xmit_control: sends + * a control packet. + * @contents: Address of buffer containing the contents of the packet. + * The caller must have filled in all of the information, + * including the common header. + * @length: Length of @contents. + * @peer: Destination to which the packet will be sent. + * @hsk: Socket via which the packet will be sent. + * + * Return: Either zero (for success), or a negative errno value if there + * was a problem. + */ +int __homa_xmit_control(void *contents, size_t length, struct homa_peer *peer, + struct homa_sock *hsk) +{ + struct homa_common_hdr *h; + struct sk_buff *skb; + int extra_bytes; + int result; + + skb = homa_skb_alloc_tx(HOMA_MAX_HEADER); + if (unlikely(!skb)) + return -ENOBUFS; + skb_dst_set(skb, homa_get_dst(peer, hsk)); + + h = skb_put(skb, length); + memcpy(h, contents, length); + extra_bytes = HOMA_MIN_PKT_LENGTH - length; + if (extra_bytes > 0) + memset(skb_put(skb, extra_bytes), 0, extra_bytes); + skb->ooo_okay = 1; + if (hsk->inet.sk.sk_family == AF_INET6) + result = ip6_xmit(&hsk->inet.sk, skb, &peer->flow.u.ip6, 0, + NULL, 0, 0); + else + result = ip_queue_xmit(&hsk->inet.sk, skb, &peer->flow); + return result; +} + +/** + * homa_xmit_unknown() - Send an RPC_UNKNOWN packet to a peer. + * @skb: Buffer containing an incoming packet; identifies the peer to + * which the RPC_UNKNOWN packet should be sent. + * @hsk: Socket that should be used to send the RPC_UNKNOWN packet. + */ +void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk) +{ + struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data; + struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + struct homa_rpc_unknown_hdr unknown; + struct homa_peer *peer; + + unknown.common.sport = h->dport; + unknown.common.dport = h->sport; + unknown.common.type = RPC_UNKNOWN; + unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id)); + peer = homa_peer_get(hsk, &saddr); + if (!IS_ERR(peer)) + __homa_xmit_control(&unknown, sizeof(unknown), peer, hsk); + homa_peer_release(peer); +} + +/** + * homa_xmit_data() - If an RPC has outbound data packets that are permitted + * to be transmitted according to the scheduling mechanism, arrange for + * them to be sent. + * @rpc: RPC to check for transmittable packets. Must be locked by + * caller. Note: this function will release the RPC lock while + * passing packets through the RPC stack, then reacquire it + * before returning. It is possible that the RPC gets terminated + * when the lock isn't held, in which case the state will + * be RPC_DEAD on return. 
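+ * + * Note: each packet is handed to the IP stack with an extra reference + * (skb_get), so the skb remains on rpc->msgout.packets for possible + * retransmission even after the stack releases its own reference.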
+ */ +void homa_xmit_data(struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + int length; + + while (*rpc->msgout.next_xmit && rpc->state != RPC_DEAD) { + struct sk_buff *skb = *rpc->msgout.next_xmit; + + rpc->msgout.next_xmit = &(homa_get_skb_info(skb)->next_skb); + length = homa_get_skb_info(skb)->data_bytes; + rpc->msgout.next_xmit_offset += length; + + homa_rpc_unlock(rpc); + skb_get(skb); + __homa_xmit_data(skb, rpc); + homa_rpc_lock(rpc); + } +} + +/** + * __homa_xmit_data() - Handles packet transmission stuff that is common + * to homa_xmit_data and homa_resend_data. + * @skb: Packet to be sent. The packet will be freed after transmission + * (and also if errors prevented transmission). + * @rpc: Information about the RPC that the packet belongs to. + */ +void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc) +{ + skb_dst_set(skb, homa_get_dst(rpc->peer, rpc->hsk)); + + skb->ooo_okay = 1; + skb->ip_summed = CHECKSUM_PARTIAL; + skb->csum_start = skb_transport_header(skb) - skb->head; + skb->csum_offset = offsetof(struct homa_common_hdr, checksum); + if (rpc->hsk->inet.sk.sk_family == AF_INET6) + ip6_xmit(&rpc->hsk->inet.sk, skb, &rpc->peer->flow.u.ip6, + 0, NULL, 0, 0); + else + ip_queue_xmit(&rpc->hsk->inet.sk, skb, &rpc->peer->flow); +} + +/** + * homa_resend_data() - This function is invoked as part of handling RESEND + * requests. It retransmits the packet(s) containing a given range of bytes + * from a message. + * @rpc: RPC for which data should be resent. + * @start: Offset within @rpc->msgout of the first byte to retransmit. + * @end: Offset within @rpc->msgout of the byte just after the last one + * to retransmit. + */ +void homa_resend_data(struct homa_rpc *rpc, int start, int end) + __must_hold(rpc->bucket->lock) +{ + struct homa_skb_info *homa_info; + struct sk_buff *skb; + + if (end <= start) + return; + + /* Each iteration of this loop checks one packet in the message + * to see if it contains segments that need to be retransmitted. + */ + for (skb = rpc->msgout.packets; skb; skb = homa_info->next_skb) { + int seg_offset, offset, seg_length, data_left; + struct homa_data_hdr *h; + + homa_info = homa_get_skb_info(skb); + offset = homa_info->offset; + if (offset >= end) + break; + if (start >= (offset + homa_info->data_bytes)) + continue; + + offset = homa_info->offset; + seg_offset = sizeof(struct homa_data_hdr); + data_left = homa_info->data_bytes; + if (skb_shinfo(skb)->gso_segs <= 1) { + seg_length = data_left; + } else { + seg_length = homa_info->seg_length; + h = (struct homa_data_hdr *)skb_transport_header(skb); + } + for ( ; data_left > 0; data_left -= seg_length, + offset += seg_length, + seg_offset += skb_shinfo(skb)->gso_size) { + struct homa_skb_info *new_homa_info; + struct sk_buff *new_skb; + int err; + + if (seg_length > data_left) + seg_length = data_left; + + if (end <= offset) + goto resend_done; + if ((offset + seg_length) <= start) + continue; + + /* This segment must be retransmitted. 
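+ * Rather than resending the original GSO skb, build a fresh + * single-segment skb for just this range and chain it onto + * rpc->msgout.to_free, so it isn't released until the RPC is + * reaped.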
*/ + new_skb = homa_skb_alloc_tx(sizeof(struct homa_data_hdr) + + seg_length); + if (unlikely(!new_skb)) + goto resend_done; + h = __skb_put_data(new_skb, skb_transport_header(skb), + sizeof(struct homa_data_hdr)); + h->common.sequence = htonl(offset); + h->seg.offset = htonl(offset); + h->retransmit = 1; + err = homa_skb_append_from_skb(rpc->hsk->homa, new_skb, + skb, seg_offset, + seg_length); + if (err != 0) { + pr_err("%s got error %d from homa_skb_append_from_skb\n", + __func__, err); + kfree_skb(new_skb); + goto resend_done; + } + + new_homa_info = homa_get_skb_info(new_skb); + new_homa_info->next_skb = rpc->msgout.to_free; + new_homa_info->wire_bytes = rpc->hsk->ip_header_length + + sizeof(struct homa_data_hdr) + + seg_length + HOMA_ETH_OVERHEAD; + new_homa_info->data_bytes = seg_length; + new_homa_info->seg_length = seg_length; + new_homa_info->offset = offset; + new_homa_info->rpc = rpc; + + rpc->msgout.to_free = new_skb; + rpc->msgout.num_skbs++; + skb_get(new_skb); + __homa_xmit_data(new_skb, rpc); + } + } + +resend_done: + return; +} + +/** + * homa_rpc_tx_end() - Return the offset of the first byte in an + * RPC's outgoing message that has not yet been fully transmitted. + * "Fully transmitted" means the message has been transmitted by the + * NIC and the skb has been released by the driver. This is different from + * rpc->msgout.next_xmit_offset, which computes the first offset that + * hasn't yet been passed to the IP stack. + * @rpc: RPC to check + * Return: See above. If the message has been fully transmitted then + * rpc->msgout.length is returned. + */ +int homa_rpc_tx_end(struct homa_rpc *rpc) +{ + struct sk_buff *skb = rpc->msgout.first_not_tx; + + while (skb) { + struct homa_skb_info *homa_info = homa_get_skb_info(skb); + + /* next_xmit_offset tells us whether the packet has been + * passed to the IP stack. Checking the reference count tells + * us whether the packet has been released by the driver + * (which only happens after notification from the NIC that + * transmission is complete). + */ + if (homa_info->offset >= rpc->msgout.next_xmit_offset || + refcount_read(&skb->users) > 1) + return homa_info->offset; + skb = homa_info->next_skb; + rpc->msgout.first_not_tx = skb; + } + return rpc->msgout.length; +} -- 2.43.0 This file contains functions for constructing and destructing homa structs. Signed-off-by: John Ousterhout --- Changes for v16: * Use cpu_relax when spinning Changes for v11: * Move link_mbps variable from struct homa_pacer back to struct homa Changes for v10: * Remove log messages after alloc errors Changes for v9: * Add support for homa_net objects * Use new homa_clock abstraction layer * Various name improvements (e.g. 
use "alloc" instead of "new" for functions that allocate memory) Changes for v8: * Accommodate homa_pacer refactoring Changes for v7: * Make Homa a pernet subsystem * Add support for tx memory accounting * Remove "lock_slow" functions, which don't add functionality in this patch series * Use u64 and __u64 properly --- net/homa/homa_impl.h | 6 +++ net/homa/homa_utils.c | 110 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 net/homa/homa_utils.c diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index c7570d1f85b2..db8af88865a2 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -362,14 +362,20 @@ static inline bool homa_make_header_avl(struct sk_buff *skb) extern unsigned int homa_net_id; +void homa_destroy(struct homa *homa); int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb, struct iov_iter *iter); +int homa_init(struct homa *homa); int homa_ioc_info(struct socket *sock, unsigned long arg); int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit); void homa_message_out_init(struct homa_rpc *rpc, int length); +void homa_net_destroy(struct homa_net *hnet); +int homa_net_init(struct homa_net *hnet, struct net *net, + struct homa *homa); void homa_rpc_handoff(struct homa_rpc *rpc); int homa_rpc_tx_end(struct homa_rpc *rpc); +void homa_spin(int ns); struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, struct iov_iter *iter, int offset, int length, int max_seg_data); diff --git a/net/homa/homa_utils.c b/net/homa/homa_utils.c new file mode 100644 index 000000000000..df3845fb9417 --- /dev/null +++ b/net/homa/homa_utils.c @@ -0,0 +1,110 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file contains miscellaneous utility functions for Homa, such + * as initializing and destroying homa structs. + */ + +#include "homa_impl.h" +#include "homa_peer.h" +#include "homa_rpc.h" + +#include "homa_stub.h" + +/** + * homa_init() - Constructor for homa objects. + * @homa: Object to initialize. + * + * Return: 0 on success, or a negative errno if there was an error. Even + * if an error occurs, it is safe (and necessary) to call + * homa_destroy at some point. + */ +int homa_init(struct homa *homa) +{ + int err; + + memset(homa, 0, sizeof(*homa)); + + atomic64_set(&homa->next_outgoing_id, 2); + homa->link_mbps = 25000; + homa->peertab = homa_peer_alloc_peertab(); + if (IS_ERR(homa->peertab)) { + err = PTR_ERR(homa->peertab); + homa->peertab = NULL; + return err; + } + homa->socktab = kmalloc(sizeof(*homa->socktab), GFP_KERNEL); + if (!homa->socktab) + return -ENOMEM; + homa_socktab_init(homa->socktab); + + /* Wild guesses to initialize configuration values... */ + homa->resend_ticks = 5; + homa->resend_interval = 5; + homa->timeout_ticks = 100; + homa->timeout_resends = 5; + homa->request_ack_ticks = 2; + homa->reap_limit = 10; + homa->dead_buffs_limit = 5000; + homa->max_gso_size = 10000; + homa->wmem_max = 100000000; + homa->bpage_lease_usecs = 10000; + return 0; +} + +/** + * homa_destroy() - Destructor for homa objects. + * @homa: Object to destroy. It is safe if this object has already + * been previously destroyed. + */ +void homa_destroy(struct homa *homa) +{ + /* The order of the following cleanups matters! 
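+ * In particular, the socket table is destroyed before the peer + * table, since sockets may still hold references to peers.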
*/ + if (homa->socktab) { + homa_socktab_destroy(homa->socktab, NULL); + kfree(homa->socktab); + homa->socktab = NULL; + } + if (homa->peertab) { + homa_peer_free_peertab(homa->peertab); + homa->peertab = NULL; + } +} + +/** + * homa_net_init() - Initialize a new struct homa_net as a per-net subsystem. + * @hnet: Struct to initialize. + * @net: The network namespace the struct will be associated with. + * @homa: The main Homa data structure to use for the net. + * Return: 0 on success, otherwise a negative errno. + */ +int homa_net_init(struct homa_net *hnet, struct net *net, struct homa *homa) +{ + memset(hnet, 0, sizeof(*hnet)); + hnet->homa = homa; + hnet->prev_default_port = HOMA_MIN_DEFAULT_PORT - 1; + return 0; +} + +/** + * homa_net_destroy() - Release any resources associated with a homa_net. + * @hnet: Object to destroy; must not be used again after this function + * returns. + */ +void homa_net_destroy(struct homa_net *hnet) +{ + homa_socktab_destroy(hnet->homa->socktab, hnet); + homa_peer_free_net(hnet); +} + +/** + * homa_spin() - Delay (without sleeping) for a given time interval. + * @ns: How long to delay (in nanoseconds). + */ +void homa_spin(int ns) +{ + u64 end; + + end = homa_clock() + homa_ns_to_cycles(ns); + while (homa_clock() < end) + cpu_relax(); +} -- 2.43.0 This file contains most of the code for handling incoming packets, including top-level dispatching code plus specific handlers for each packet type. It also contains code for dispatching fully-received messages to waiting application threads. Signed-off-by: John Ousterhout --- Changes for v16: * Use kfree_skb_reason and consume_skb instead of kfree_skb * Use set_bit/clear_bit/test_bit * Simplify handling of acks; eliminate acks and num_acks variables in homa_dispatch_pkts * Check for wmem in homa_dispatch_pkts (fixes deadlock over wmem) * Use new homa_lock_preempt() function Changes for v14: * Use new homa_rpc_tx_end function * Fix race in homa_wait_shared (an RPC could get lost if it became ready at the same time that homa_interest_wait returned with an error) * Handle nonblocking behavior here, rather than in homa_interest.c * Change API for homa_wait_private to distinguish errors in an RPC from errors that prevented the wait operation from completing. Changes for v11: * Cleanup and simplify use of RPC reference counts. * Cleanup sparse annotations. * Rework the mechanism for waking up RPCs that stalled waiting for buffer pool space. Changes for v10: * Revise sparse annotations to eliminate __context__ definition * Refactor resend mechanism (new function homa_request_retrans replaces homa_gap_retry) * Remove log messages after alloc errors * Fix socket cleanup race Changes for v9: * Add support for homa_net objects * Use new homa_clock abstraction layer * Various name improvements (e.g.
use "alloc" instead of "new" for functions that allocate memory) Changes for v7: * API change for homa_rpc_handoff * Refactor waiting mechanism for incoming packets: simplify wait criteria and use standard Linux mechanisms for waiting, use new homa_interest struct * Reject unauthorized incoming request messages * Improve documentation for code that spins (and reduce spin length) * Use RPC reference counts, eliminate RPC_HANDING_OFF flag * Replace erroneous use of "safe" list iteration with "rcu" version * Remove locker argument from locking functions * Check incoming messages against HOMA_MAX_MESSAGE_LENGTH * Use u64 and __u64 properly --- net/homa/homa_impl.h | 15 + net/homa/homa_incoming.c | 887 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 902 insertions(+) create mode 100644 net/homa/homa_incoming.c diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index db8af88865a2..cba755d3ea2c 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -362,23 +362,38 @@ static inline bool homa_make_header_avl(struct sk_buff *skb) extern unsigned int homa_net_id; +void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, + struct homa_rpc *rpc); +void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb); +int homa_copy_to_user(struct homa_rpc *rpc); +void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc); void homa_destroy(struct homa *homa); +void homa_dispatch_pkts(struct sk_buff *skb); int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb, struct iov_iter *iter); +struct homa_gap *homa_gap_alloc(struct list_head *next, int start, int end); int homa_init(struct homa *homa); int homa_ioc_info(struct socket *sock, unsigned long arg); int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit); void homa_message_out_init(struct homa_rpc *rpc, int length); +void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, + struct homa_rpc *rpc); void homa_net_destroy(struct homa_net *hnet); int homa_net_init(struct homa_net *hnet, struct net *net, struct homa *homa); +void homa_request_retrans(struct homa_rpc *rpc); +void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc, + struct homa_sock *hsk); void homa_rpc_handoff(struct homa_rpc *rpc); int homa_rpc_tx_end(struct homa_rpc *rpc); void homa_spin(int ns); struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, struct iov_iter *iter, int offset, int length, int max_seg_data); +void homa_rpc_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc); +int homa_wait_private(struct homa_rpc *rpc, int nonblocking); +struct homa_rpc *homa_wait_shared(struct homa_sock *hsk, int nonblocking); int homa_xmit_control(enum homa_packet_type type, void *contents, size_t length, struct homa_rpc *rpc); int __homa_xmit_control(void *contents, size_t length, diff --git a/net/homa/homa_incoming.c b/net/homa/homa_incoming.c new file mode 100644 index 000000000000..2aa625dfe973 --- /dev/null +++ b/net/homa/homa_incoming.c @@ -0,0 +1,887 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file contains functions that handle incoming Homa messages. */ + +#include "homa_impl.h" +#include "homa_interest.h" +#include "homa_peer.h" +#include "homa_pool.h" + +/** + * homa_message_in_init() - Constructor for homa_message_in. + * @rpc: RPC whose msgin structure should be initialized. The + * msgin struct is assumed to be zeroes. + * @length: Total number of bytes in message. 
+ * Return: Zero for successful initialization, or a negative errno + * if rpc->msgin could not be initialized. + */ +int homa_message_in_init(struct homa_rpc *rpc, int length) + __must_hold(rpc->bucket->lock) +{ + int err; + + if (length > HOMA_MAX_MESSAGE_LENGTH) + return -EINVAL; + + rpc->msgin.length = length; + __skb_queue_head_init(&rpc->msgin.packets); + INIT_LIST_HEAD(&rpc->msgin.gaps); + rpc->msgin.bytes_remaining = length; + err = homa_pool_alloc_msg(rpc); + if (err != 0) { + rpc->msgin.length = -1; + return err; + } + return 0; +} + +/** + * homa_gap_alloc() - Allocate a new gap and add it to a gap list. + * @next: Add the new gap just before this list element. + * @start: Offset of first byte covered by the gap. + * @end: Offset of byte just after the last one covered by the gap. + * Return: Pointer to the new gap, or NULL if memory couldn't be allocated + * for the gap object. + */ +struct homa_gap *homa_gap_alloc(struct list_head *next, int start, int end) +{ + struct homa_gap *gap; + + gap = kmalloc(sizeof(*gap), GFP_ATOMIC); + if (!gap) + return NULL; + gap->start = start; + gap->end = end; + gap->time = homa_clock(); + list_add_tail(&gap->links, next); + return gap; +} + +/** + * homa_request_retrans() - The function is invoked when it appears that + * data packets for a message have been lost. It issues RESEND requests + * as appropriate and may modify the state of the RPC. + * @rpc: RPC for which incoming data is delinquent; must be locked by + * caller. + */ +void homa_request_retrans(struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + struct homa_resend_hdr resend; + struct homa_gap *gap; + int offset, length; + + if (rpc->msgin.length >= 0) { + /* Issue RESENDS for any gaps in incoming data. */ + list_for_each_entry(gap, &rpc->msgin.gaps, links) { + resend.offset = htonl(gap->start); + resend.length = htonl(gap->end - gap->start); + homa_xmit_control(RESEND, &resend, sizeof(resend), rpc); + } + + /* Issue a RESEND for any granted data after the last gap. */ + offset = rpc->msgin.recv_end; + length = rpc->msgin.length - rpc->msgin.recv_end; + if (length <= 0) + return; + } else { + /* No data has been received for the RPC. Ask the sender to + * resend everything it has sent so far. + */ + offset = 0; + length = -1; + } + + resend.offset = htonl(offset); + resend.length = htonl(length); + homa_xmit_control(RESEND, &resend, sizeof(resend), rpc); +} + +/** + * homa_add_packet() - Add an incoming packet to the contents of a + * partially received message. + * @rpc: Add the packet to the msgin for this RPC. + * @skb: The new packet. This function takes ownership of the packet + * (the packet will either be freed or added to rpc->msgin.packets). + */ +void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb) + __must_hold(rpc->bucket->lock) +{ + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; + struct homa_gap *gap, *dummy, *gap2; + int start = ntohl(h->seg.offset); + int length = homa_data_len(skb); + enum skb_drop_reason reason; + int end = start + length; + + if ((start + length) > rpc->msgin.length) { + reason = SKB_DROP_REASON_PKT_TOO_BIG; + goto discard; + } + + if (start == rpc->msgin.recv_end) { + /* Common case: packet is sequential. */ + rpc->msgin.recv_end += length; + goto keep; + } + + if (start > rpc->msgin.recv_end) { + /* Packet creates a new gap. 
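+ * For example, if recv_end is 3000 and this packet starts at + * offset 5000, a gap covering bytes 3000-4999 is recorded and + * recv_end advances to the end of this packet.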
*/ + if (!homa_gap_alloc(&rpc->msgin.gaps, + rpc->msgin.recv_end, start)) { + reason = SKB_DROP_REASON_NOMEM; + goto discard; + } + rpc->msgin.recv_end = end; + goto keep; + } + + /* Must now check to see if the packet fills in part or all of + * an existing gap. + */ + list_for_each_entry_safe(gap, dummy, &rpc->msgin.gaps, links) { + /* Is packet at the start of this gap? */ + if (start <= gap->start) { + if (end <= gap->start) + continue; + if (start < gap->start) { + reason = SKB_DROP_REASON_DUP_FRAG; + goto discard; + } + if (end > gap->end) { + reason = SKB_DROP_REASON_DUP_FRAG; + goto discard; + } + gap->start = end; + if (gap->start >= gap->end) { + list_del(&gap->links); + kfree(gap); + } + goto keep; + } + + /* Is packet at the end of this gap? BTW, at this point we know + * the packet can't cover the entire gap. + */ + if (end >= gap->end) { + if (start >= gap->end) + continue; + if (end > gap->end) { + reason = SKB_DROP_REASON_DUP_FRAG; + goto discard; + } + gap->end = start; + goto keep; + } + + /* Packet is in the middle of the gap; must split the gap. */ + gap2 = homa_gap_alloc(&gap->links, gap->start, start); + if (!gap2) { + reason = SKB_DROP_REASON_NOMEM; + goto discard; + } + gap2->time = gap->time; + gap->start = end; + goto keep; + } + +discard: + kfree_skb_reason(skb, reason); + return; + +keep: + __skb_queue_tail(&rpc->msgin.packets, skb); + rpc->msgin.bytes_remaining -= length; +} + +/** + * homa_copy_to_user() - Copy as much data as possible from incoming + * packet buffers to buffers in user space. + * @rpc: RPC for which data should be copied. Must be locked by caller. + * Return: Zero for success or a negative errno if there is an error. + * It is possible for the RPC to be freed while this function + * executes (it releases and reacquires the RPC lock). If that + * happens, -EINVAL will be returned and the state of @rpc + * will be RPC_DEAD. Clears the RPC_PKTS_READY bit in @rpc->flags + * if all available packets have been copied out. + */ +int homa_copy_to_user(struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ +#define MAX_SKBS 20 + struct sk_buff *skbs[MAX_SKBS]; + int error = 0; + int n = 0; /* Number of filled entries in skbs. */ + int i; + + /* Tricky note: we can't hold the RPC lock while we're actually + * copying to user space, because (a) it's illegal to hold a spinlock + * while copying to user space and (b) we'd like for homa_softirq + * to add more packets to the RPC while we're copying these out. + * So, collect a bunch of packets to copy, then release the lock, + * copy them, and reacquire the lock. + */ + while (true) { + struct sk_buff *skb; + + if (rpc->state == RPC_DEAD) { + error = -EINVAL; + break; + } + + skb = __skb_dequeue(&rpc->msgin.packets); + if (skb) { + skbs[n] = skb; + n++; + if (n < MAX_SKBS) + continue; + } + if (n == 0) { + clear_bit(RPC_PKTS_READY, &rpc->flags); + break; + } + + /* At this point we've collected a batch of packets (or + * run out of packets); copy any available packets out to + * user space. + */ + homa_rpc_unlock(rpc); + + /* Each iteration of this loop copies out one skb. */ + for (i = 0; i < n; i++) { + struct homa_data_hdr *h = (struct homa_data_hdr *) + skbs[i]->data; + int pkt_length = homa_data_len(skbs[i]); + int offset = ntohl(h->seg.offset); + int buf_bytes, chunk_size; + struct iov_iter iter; + int copied = 0; + char __user *dst; + + /* Each iteration of this loop copies to one + * user buffer. 
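+ * A single packet may take several iterations here, because + * homa_pool_get_buffer can return fewer bytes (buf_bytes) than + * remain in the packet, e.g. when the packet's data straddles a + * bpage boundary in the buffer region.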
+ */ + while (copied < pkt_length) { + chunk_size = pkt_length - copied; + dst = homa_pool_get_buffer(rpc, offset + copied, + &buf_bytes); + if (buf_bytes < chunk_size) { + if (buf_bytes == 0) { + /* skb has data beyond message + * end? + */ + break; + } + chunk_size = buf_bytes; + } + error = import_ubuf(READ, dst, chunk_size, + &iter); + if (error) + goto free_skbs; + error = skb_copy_datagram_iter(skbs[i], + sizeof(*h) + + copied, &iter, + chunk_size); + if (error) + goto free_skbs; + copied += chunk_size; + } + } + +free_skbs: + for (i = 0; i < n; i++) + consume_skb(skbs[i]); + n = 0; + homa_rpc_lock_preempt(rpc); + if (error) + break; + } + return error; +} + +/** + * homa_dispatch_pkts() - Top-level function that processes a batch of packets, + * all related to the same RPC. + * @skb: First packet in the batch, linked through skb->next. + */ +void homa_dispatch_pkts(struct sk_buff *skb) +{ +#define MAX_ACKS 10 + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; + u64 id = homa_local_id(h->common.sender_id); + int dport = ntohs(h->common.dport); + struct homa_rpc *rpc = NULL; + struct homa_sock *hsk; + struct homa_net *hnet; + struct sk_buff *next; + + /* Find the appropriate socket.*/ + hnet = homa_net(dev_net(skb->dev)); + hsk = homa_sock_find(hnet, dport); + if (!hsk || (!homa_is_client(id) && !hsk->is_server)) { + if (skb_is_ipv6(skb)) + icmp6_send(skb, ICMPV6_DEST_UNREACH, + ICMPV6_PORT_UNREACH, 0, NULL, IP6CB(skb)); + else + icmp_send(skb, ICMP_DEST_UNREACH, + ICMP_PORT_UNREACH, 0); + while (skb) { + next = skb->next; + kfree_skb(skb); + skb = next; + } + if (hsk) + sock_put(&hsk->sock); + return; + } + + /* Each iteration through the following loop processes one packet. */ + for (; skb; skb = next) { + h = (struct homa_data_hdr *)skb->data; + next = skb->next; + + /* Relinquish the RPC lock temporarily if it's needed + * elsewhere. + */ + if (rpc) { + if (test_bit(APP_NEEDS_LOCK, &rpc->flags)) { + homa_rpc_unlock(rpc); + + /* This short spin is needed to ensure that the + * other thread gets the lock before this thread + * grabs it again below (the need for this + * was confirmed experimentally in 2/2025; + * without it, the handoff fails 20-25% of the + * time). Furthermore, the call to homa_spin + * seems to allow the other thread to acquire + * the lock more quickly. + */ + homa_spin(100); + homa_rpc_lock(rpc); + } + } + + /* If we don't already have an RPC, find it, lock it, + * and create a reference on it. + */ + if (!rpc) { + if (!homa_is_client(id)) { + /* We are the server for this RPC. */ + if (h->common.type == DATA) { + int created; + + /* Create a new RPC if one doesn't + * already exist. 
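+ * (@created reports whether homa_rpc_alloc_server allocated a + * fresh RPC or found an existing one for this id.)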
*/ + rpc = homa_rpc_alloc_server(hsk, &saddr, + h, + &created); + if (IS_ERR(rpc)) { + rpc = NULL; + goto discard; + } + } else { + rpc = homa_rpc_find_server(hsk, &saddr, + id); + } + } else { + rpc = homa_rpc_find_client(hsk, id); + } + if (rpc) + homa_rpc_hold(rpc); + } + if (unlikely(!rpc)) { + if (h->common.type != NEED_ACK && + h->common.type != ACK && + h->common.type != RESEND) + goto discard; + } else { + if (h->common.type == DATA || + h->common.type == BUSY) + rpc->silent_ticks = 0; + rpc->peer->outstanding_resends = 0; + } + + switch (h->common.type) { + case DATA: + homa_data_pkt(skb, rpc); + break; + case RESEND: + homa_resend_pkt(skb, rpc, hsk); + break; + case RPC_UNKNOWN: + homa_rpc_unknown_pkt(skb, rpc); + break; + case BUSY: + /* Nothing to do for these packets except reset + * silent_ticks, which happened above. + */ + goto discard; + case NEED_ACK: + homa_need_ack_pkt(skb, hsk, rpc); + break; + case ACK: + homa_ack_pkt(skb, hsk, rpc); + break; + default: + goto discard; + } + continue; + +discard: + kfree_skb(skb); + } + if (rpc) { + homa_rpc_put(rpc); + homa_rpc_unlock(rpc); + } + + /* We need to reap dead RPCs here under two conditions: + * 1. The socket has hit its limit on tx buffer space and threads are + * blocked waiting for skbs to be released. + * 2. A large number of dead RPCs have accumulated, and it seems + * that the reaper isn't keeping up when invoked only at + * "convenient" times (see "RPC Reaping Strategy" in homa_rpc_reap + * code for details). + */ + if (hsk->dead_skbs > 0) { + int waiting_for_wmem = test_bit(SOCK_NOSPACE, + &hsk->sock.sk_socket->flags); + if (waiting_for_wmem || + hsk->dead_skbs >= 2 * hsk->homa->dead_buffs_limit) + homa_rpc_reap(hsk, waiting_for_wmem); + } + sock_put(&hsk->sock); +} + +/** + * homa_data_pkt() - Handler for incoming DATA packets + * @skb: Incoming packet; size known to be large enough for the header. + * This function now owns the packet. + * @rpc: Information about the RPC corresponding to this packet. + * Must be locked by the caller. + */ +void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data; + + if (h->ack.client_id) { + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + + homa_rpc_unlock(rpc); + homa_rpc_acked(rpc->hsk, &saddr, &h->ack); + homa_rpc_lock(rpc); + if (rpc->state == RPC_DEAD) + goto discard; + } + + if (rpc->state != RPC_INCOMING && homa_is_client(rpc->id)) { + if (unlikely(rpc->state != RPC_OUTGOING)) + goto discard; + rpc->state = RPC_INCOMING; + if (homa_message_in_init(rpc, ntohl(h->message_length)) != 0) + goto discard; + } else if (rpc->state != RPC_INCOMING) { + /* Must be server; note that homa_rpc_alloc_server already + * initialized msgin and allocated buffers. + */ + if (unlikely(rpc->msgin.length >= 0)) + goto discard; + } + + if (rpc->msgin.num_bpages == 0) + /* Drop packets that arrive when we can't allocate buffer + * space. If we keep them around, packet buffer usage can + * exceed available cache space, resulting in poor + * performance. + */ + goto discard; + + homa_add_packet(rpc, skb); + + if (skb_queue_len(&rpc->msgin.packets) != 0 && + !test_bit(RPC_PKTS_READY, &rpc->flags)) { + set_bit(RPC_PKTS_READY, &rpc->flags); + homa_rpc_handoff(rpc); + } + + return; + +discard: + kfree_skb(skb); +} + +/** + * homa_resend_pkt() - Handler for incoming RESEND packets + * @skb: Incoming packet; size already verified large enough for header. + * This function now owns the packet.
+ * @rpc: Information about the RPC corresponding to this packet; must + * be locked by caller, but may be NULL if there is no RPC matching + * this packet + * @hsk: Socket on which the packet was received. + */ +void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc, + struct homa_sock *hsk) + __must_hold(rpc->bucket->lock) +{ + struct homa_resend_hdr *h = (struct homa_resend_hdr *)skb->data; + int offset = ntohl(h->offset); + int length = ntohl(h->length); + int end = offset + length; + struct homa_busy_hdr busy; + int tx_end; + + if (!rpc) { + homa_xmit_unknown(skb, hsk); + goto done; + } + + tx_end = homa_rpc_tx_end(rpc); + if (!homa_is_client(rpc->id) && rpc->state != RPC_OUTGOING) { + /* We are the server for this RPC and don't yet have a + * response message, so send BUSY to keep the client + * waiting. + */ + homa_xmit_control(BUSY, &busy, sizeof(busy), rpc); + goto done; + } + + if (length == -1) + end = tx_end; + + homa_resend_data(rpc, offset, (end > tx_end) ? tx_end : end); + + if (offset >= tx_end) { + /* We have chosen not to transmit any of the requested data; + * send BUSY so the receiver knows we are alive. + */ + homa_xmit_control(BUSY, &busy, sizeof(busy), rpc); + goto done; + } + +done: + consume_skb(skb); +} + +/** + * homa_rpc_unknown_pkt() - Handler for incoming RPC_UNKNOWN packets. + * @skb: Incoming packet; size known to be large enough for the header. + * This function now owns the packet. + * @rpc: Information about the RPC corresponding to this packet. Must + * be locked by caller. + */ +void homa_rpc_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + if (homa_is_client(rpc->id)) { + if (rpc->state == RPC_OUTGOING) { + int tx_end = homa_rpc_tx_end(rpc); + + /* It appears that everything we've already transmitted + * has been lost; retransmit it. + */ + homa_resend_data(rpc, 0, tx_end); + goto done; + } + } else { + homa_rpc_end(rpc); + } +done: + consume_skb(skb); +} + +/** + * homa_need_ack_pkt() - Handler for incoming NEED_ACK packets + * @skb: Incoming packet; size already verified large enough for header. + * This function now owns the packet. + * @hsk: Socket on which the packet was received. + * @rpc: The RPC named in the packet header, or NULL if no such + * RPC exists. The RPC has been locked by the caller. + */ +void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, + struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data; + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + u64 id = homa_local_id(h->sender_id); + struct homa_ack_hdr ack; + struct homa_peer *peer; + + /* Don't ack if it's not safe for the peer to purge its state + * for this RPC (the RPC still exists and we haven't received + * the entire response), or if we can't find peer info. + */ + if (rpc && (rpc->state != RPC_INCOMING || + rpc->msgin.bytes_remaining)) { + homa_request_retrans(rpc); + goto done; + } else { + peer = homa_peer_get(hsk, &saddr); + if (IS_ERR(peer)) + goto done; + } + + /* Send an ACK for this RPC. At the same time, include all of the + * other acks available for the peer. Note: can't use rpc below, + * since it may be NULL. 
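+ * Piggybacking acks this way lets a single ACK packet retire up + * to HOMA_MAX_ACKS_PER_PKT additional RPCs on the peer.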
+ */ + ack.common.type = ACK; + ack.common.sport = h->dport; + ack.common.dport = h->sport; + ack.common.sender_id = cpu_to_be64(id); + ack.num_acks = htons(homa_peer_get_acks(peer, + HOMA_MAX_ACKS_PER_PKT, + ack.acks)); + __homa_xmit_control(&ack, sizeof(ack), peer, hsk); + homa_peer_release(peer); + +done: + consume_skb(skb); +} + +/** + * homa_ack_pkt() - Handler for incoming ACK packets + * @skb: Incoming packet; size already verified large enough for header. + * This function now owns the packet. + * @hsk: Socket on which the packet was received. + * @rpc: The RPC named in the packet header, or NULL if no such + * RPC exists. The RPC will be dead on return. + */ +void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, + struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb); + struct homa_ack_hdr *h = (struct homa_ack_hdr *)skb->data; + int i, count; + + if (rpc) + homa_rpc_end(rpc); + + count = ntohs(h->num_acks); + if (count > 0) { + if (rpc) { + /* Must temporarily release rpc's lock because + * homa_rpc_acked needs to acquire RPC locks. + */ + homa_rpc_unlock(rpc); + for (i = 0; i < count; i++) + homa_rpc_acked(hsk, &saddr, &h->acks[i]); + homa_rpc_lock(rpc); + } else { + for (i = 0; i < count; i++) + homa_rpc_acked(hsk, &saddr, &h->acks[i]); + } + } + consume_skb(skb); +} + +/** + * homa_wait_private() - Waits until the response has been received for + * a specific RPC or the RPC has failed with an error. + * @rpc: RPC to wait for; an error will be returned if the RPC is + * not a client RPC or not private. Must be locked by caller. + * @nonblocking: Nonzero means return immediately if @rpc not ready. + * Return: 0 means that @rpc is ready for attention: either its response + * has been received or it has an unrecoverable error such as + * ETIMEDOUT (in rpc->error). Nonzero means some other error + * (such as EINTR or EINVAL) occurred before @rpc became ready + * for attention; in this case the return value is a negative + * errno. + */ +int homa_wait_private(struct homa_rpc *rpc, int nonblocking) + __must_hold(rpc->bucket->lock) +{ + struct homa_interest interest; + int result; + + if (!test_bit(RPC_PRIVATE, &rpc->flags)) + return -EINVAL; + + /* Each iteration through this loop waits until rpc needs attention + * in some way (e.g. packets have arrived), then deals with that need + * (e.g. copy to user space). It may take many iterations until the + * RPC is ready for the application. + */ + while (1) { + result = 0; + if (!rpc->error) + rpc->error = homa_copy_to_user(rpc); + if (rpc->error) + break; + if (rpc->msgin.length >= 0 && + rpc->msgin.bytes_remaining == 0 && + skb_queue_len(&rpc->msgin.packets) == 0) + break; + + if (nonblocking) { + result = -EAGAIN; + break; + } + + result = homa_interest_init_private(&interest, rpc); + if (result != 0) + break; + + homa_rpc_unlock(rpc); + result = homa_interest_wait(&interest); + + homa_rpc_lock_preempt(rpc); + homa_interest_unlink_private(&interest); + + /* Abort on error, but if the interest actually got ready + * in the meantime then ignore the error (loop back around + * to process the RPC). + */ + if (result != 0 && atomic_read(&interest.ready) == 0) + break; + } + + return result; +}
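+ +/* Note for callers (an illustrative sketch, not a definitive API + * contract; locking elided, the RPC must be locked as described above): + * a nonzero return from homa_wait_private means the wait itself failed + * and the RPC may still complete later, while a zero return with + * rpc->error set means the RPC itself has failed: + * + *	err = homa_wait_private(rpc, 0); + *	if (err != 0) + *		return err;          wait failed (e.g. -EINTR) + *	if (rpc->error != 0) + *		return rpc->error;   RPC failed (e.g. -ETIMEDOUT) + *	the response is now available in rpc->msgin + */ + +/** + * homa_wait_shared() - Wait for the completion of any non-private + * incoming message on a socket. + * @hsk: Socket on which to wait. Must not be locked. + * @nonblocking: Nonzero means return immediately if no RPC is ready.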
+ * + * Return: Pointer to an RPC with a complete incoming message or nonzero + * error field, or a negative errno (usually -EINTR). If an RPC + * is returned it will be locked and referenced; the caller + * must release the lock and the reference. + */ +struct homa_rpc *homa_wait_shared(struct homa_sock *hsk, int nonblocking) + __cond_acquires(rpc->bucket->lock) +{ + struct homa_interest interest; + struct homa_rpc *rpc; + int result; + + INIT_LIST_HEAD(&interest.links); + init_waitqueue_head(&interest.wait_queue); + /* Each iteration through this loop waits until an RPC needs attention + * in some way (e.g. packets have arrived), then deals with that need + * (e.g. copy to user space). It may take many iterations until an + * RPC is ready for the application. + */ + while (1) { + homa_sock_lock(hsk); + if (hsk->shutdown) { + rpc = ERR_PTR(-ESHUTDOWN); + homa_sock_unlock(hsk); + goto done; + } + if (!list_empty(&hsk->ready_rpcs)) { + rpc = list_first_entry(&hsk->ready_rpcs, + struct homa_rpc, + ready_links); + homa_rpc_hold(rpc); + list_del_init(&rpc->ready_links); + if (!list_empty(&hsk->ready_rpcs)) { + /* There are still more RPCs available, so + * let Linux know. + */ + hsk->sock.sk_data_ready(&hsk->sock); + } + homa_sock_unlock(hsk); + } else if (nonblocking) { + rpc = ERR_PTR(-EAGAIN); + homa_sock_unlock(hsk); + + /* This is a good time to clean up dead RPCs. */ + homa_rpc_reap(hsk, false); + goto done; + } else { + homa_interest_init_shared(&interest, hsk); + homa_sock_unlock(hsk); + result = homa_interest_wait(&interest); + + if (result != 0) { + int ready; + + /* homa_interest_wait returned an error, so we + * have to do two things. First, unlink the + * interest from the socket. Second, check to + * see if in the meantime the interest received + * a handoff. If so, ignore the error. Very + * important to hold the socket lock while + * checking, in order to eliminate races with + * homa_rpc_handoff. + */ + homa_sock_lock(hsk); + homa_interest_unlink_shared(&interest); + ready = atomic_read(&interest.ready); + homa_sock_unlock(hsk); + if (ready == 0) { + rpc = ERR_PTR(result); + goto done; + } + } + + rpc = interest.rpc; + if (!rpc) { + rpc = ERR_PTR(-ESHUTDOWN); + goto done; + } + } + + homa_rpc_lock_preempt(rpc); + if (!rpc->error) + rpc->error = homa_copy_to_user(rpc); + if (rpc->error) { + if (rpc->state != RPC_DEAD) + break; + } else if (rpc->msgin.bytes_remaining == 0 && + skb_queue_len(&rpc->msgin.packets) == 0) + break; + homa_rpc_put(rpc); + homa_rpc_unlock(rpc); + } + +done: + return rpc; +} + +/** + * homa_rpc_handoff() - This function is called when the input message for + * an RPC is ready for attention from a user thread. It notifies a waiting + * reader and/or queues the RPC, as appropriate. + * @rpc: RPC to hand off; must be locked. + */ +void homa_rpc_handoff(struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + struct homa_sock *hsk = rpc->hsk; + struct homa_interest *interest; + + if (test_bit(RPC_PRIVATE, &rpc->flags)) { + homa_interest_notify_private(rpc); + return; + } + + /* Shared RPC; if there is a waiting thread, hand off the RPC; + * otherwise enqueue it.
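+ * Handing off directly to a waiting interest avoids a trip through + * ready_rpcs and a redundant sk_data_ready callback; the queue is + * used only when no thread is currently waiting.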
+ */ + homa_sock_lock(hsk); + if (hsk->shutdown) { + homa_sock_unlock(hsk); + return; + } + if (!list_empty(&hsk->interests)) { + interest = list_first_entry(&hsk->interests, + struct homa_interest, links); + list_del_init(&interest->links); + interest->rpc = rpc; + homa_rpc_hold(rpc); + atomic_set_release(&interest->ready, 1); + wake_up(&interest->wait_queue); + } else if (list_empty(&rpc->ready_links)) { + list_add_tail(&rpc->ready_links, &hsk->ready_rpcs); + hsk->sock.sk_data_ready(&hsk->sock); + } + homa_sock_unlock(hsk); +} + -- 2.43.0 This file contains code that wakes up periodically to check for missing data, initiate retransmissions, and declare peer nodes "dead". Signed-off-by: John Ousterhout --- Changes for v14: * Use new homa_rpc_tx_end function Changes for v11: * Cleanup sparse annotations. Changes for v10: * Refactor resend mechanism Changes for v9: * Reflect changes in socket and peer management * Minor name changes for clarity Changes for v7: * Interface changes to homa_sock_start_scan etc. * Remove locker argument from locking functions * Use u64 and __u64 properly --- net/homa/homa_impl.h | 3 + net/homa/homa_timer.c | 136 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 net/homa/homa_timer.c diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index cba755d3ea2c..c1d1519c9e38 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -388,6 +388,9 @@ void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc, void homa_rpc_handoff(struct homa_rpc *rpc); int homa_rpc_tx_end(struct homa_rpc *rpc); void homa_spin(int ns); +void homa_timer(struct homa *homa); +void homa_timer_check_rpc(struct homa_rpc *rpc); +int homa_timer_main(void *transport); struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, struct iov_iter *iter, int offset, int length, int max_seg_data); diff --git a/net/homa/homa_timer.c b/net/homa/homa_timer.c new file mode 100644 index 000000000000..dcfdcc06c8ab --- /dev/null +++ b/net/homa/homa_timer.c @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file handles timing-related functions for Homa, such as retries + * and timeouts. + */ + +#include "homa_impl.h" +#include "homa_peer.h" +#include "homa_rpc.h" +#include "homa_stub.h" + +/** + * homa_timer_check_rpc() - Invoked for each RPC during each timer pass; does + * most of the work of checking for time-related actions such as sending + * resends, aborting RPCs for which there is no response, and sending + * requests for acks. It is separate from homa_timer because homa_timer + * got too long and deeply indented. + * @rpc: RPC to check; must be locked by the caller. + */ +void homa_timer_check_rpc(struct homa_rpc *rpc) + __must_hold(rpc->bucket->lock) +{ + struct homa *homa = rpc->hsk->homa; + int tx_end = homa_rpc_tx_end(rpc); + + /* See if we need to request an ack for this RPC. */ + if (!homa_is_client(rpc->id) && rpc->state == RPC_OUTGOING && + tx_end == rpc->msgout.length) { + if (rpc->done_timer_ticks == 0) { + rpc->done_timer_ticks = homa->timer_ticks; + } else { + /* >= comparison that handles tick wrap-around. */ + if ((rpc->done_timer_ticks + homa->request_ack_ticks + - 1 - homa->timer_ticks) & 1 << 31) { + struct homa_need_ack_hdr h; + + homa_xmit_control(NEED_ACK, &h, sizeof(h), rpc); + } + } + } + + if (rpc->state == RPC_INCOMING) { + if (rpc->msgin.num_bpages == 0) { + /* Waiting for buffer space, so no problem. 
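+ * The delay is on our side (the application hasn't made buffer + * space available), so reset silent_ticks rather than timing the + * RPC out or asking the peer to resend.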
*/ + rpc->silent_ticks = 0; + return; + } + } else if (!homa_is_client(rpc->id)) { + /* We're the server and we've received the input message; + * no need to worry about retries. + */ + rpc->silent_ticks = 0; + return; + } + + if (rpc->state == RPC_OUTGOING) { + if (tx_end < rpc->msgout.length) { + /* There are granted bytes that we haven't transmitted, + * so no need to be concerned; the ball is in our court. + */ + rpc->silent_ticks = 0; + return; + } + } + + if (rpc->silent_ticks < homa->resend_ticks) + return; + if (rpc->silent_ticks >= homa->timeout_ticks) { + homa_rpc_abort(rpc, -ETIMEDOUT); + return; + } + if (((rpc->silent_ticks - homa->resend_ticks) % homa->resend_interval) + == 0) + homa_request_retrans(rpc); +} + +/** + * homa_timer() - This function is invoked at regular intervals ("ticks") + * to implement retries and aborts for Homa. + * @homa: Overall data about the Homa protocol implementation. + */ +void homa_timer(struct homa *homa) +{ + struct homa_socktab_scan scan; + struct homa_sock *hsk; + struct homa_rpc *rpc; + int rpc_count = 0; + + homa->timer_ticks++; + + /* Scan all existing RPCs in all sockets. */ + for (hsk = homa_socktab_start_scan(homa->socktab, &scan); + hsk; hsk = homa_socktab_next(&scan)) { + while (hsk->dead_skbs >= homa->dead_buffs_limit) + /* If we get here, it means that Homa isn't keeping + * up with RPC reaping, so we'll help out. See + * "RPC Reaping Strategy" in homa_rpc_reap code for + * details. + */ + if (homa_rpc_reap(hsk, false) == 0) + break; + + if (list_empty(&hsk->active_rpcs) || hsk->shutdown) + continue; + + if (!homa_protect_rpcs(hsk)) + continue; + rcu_read_lock(); + list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) { + homa_rpc_lock(rpc); + if (rpc->state == RPC_IN_SERVICE) { + rpc->silent_ticks = 0; + homa_rpc_unlock(rpc); + continue; + } + rpc->silent_ticks++; + homa_timer_check_rpc(rpc); + homa_rpc_unlock(rpc); + rpc_count++; + if (rpc_count >= 10) { + /* Give other kernel threads a chance to run + * on this core. + */ + rcu_read_unlock(); + schedule(); + rcu_read_lock(); + rpc_count = 0; + } + } + rcu_read_unlock(); + homa_unprotect_rpcs(hsk); + } + homa_socktab_end_scan(&scan); + homa_skb_release_pages(homa); + homa_peer_gc(homa->peertab); +} -- 2.43.0 homa_plumbing.c contains functions that connect Homa to the rest of the Linux kernel, such as dispatch tables used by Linux and the top-level functions that Linux invokes from those dispatch tables. Signed-off-by: John Ousterhout --- Changes for v16: * Implement HOMAIOCINFO ioctl * Set hsk->error_msg (for HOMAIOCINFO) * Use consume_skb and kfree_skb_reason instead of kfree_skb * Use set_bit, clear_bit, etc. for flag bits * Remove global_homa variable Changes for v13: * Fix bug in is_homa_pkt: didn't properly handle packets where the network header hadn't yet been set. Changes for v12: * Fix deadlock in homa_recvmsg (homa_rpc_reap was invoked while holding an RPC lock). Changes for v11: * Move link_mbps variable from struct homa_pacer back to struct homa. * Clean up error handing in homa_load. * Cleanup and simplify use of RPC reference counts. * Add explicit padding to struct homa_recvmsg_args to fix problems compiling on 32-bit machines. Changes for v10: * Use the destroy function from struct proto properly (fixes bugs in socket cleanup) * Fix issues from sparse, xmastree, etc. * Replace __u16 with u16, __u8 with u8, etc. Changes for v9: * Add support for homa_net objects * Various name improvements (e.g. 
use "alloc" instead of "new" for functions that allocate memory) * Add BUILD_BUG_ON statements to replace _Static_asserts removed from header files * Remove unnecessary/unused functions such as homa_get_port, homa_disconnect, and homa_backlog_rcv. Changes for v8: * Accommodate homa_pacer and homa_pool refactorings Changes for v7: * Remove extraneous code * Make Homa a pernet subsystem * Block Homa senders if insufficient tx buffer memory * Check for missing buffer pool in homa_recvmsg * Refactor waiting mechanism for incoming packets: simplify wait criteria and use standard Linux mechanisms for waiting * Implement SO_HOMA_SERVER option for setsockopt * Rename UNKNOWN packet type to RPC_UNKNOWN * Remove locker argument from locking functions * Use u64 and __u64 properly * Use new homa_make_header_avl function --- net/homa/homa_impl.h | 27 + net/homa/homa_plumbing.c | 1254 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 1281 insertions(+) create mode 100644 net/homa/homa_plumbing.c diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h index c1d1519c9e38..fee0cc800d58 100644 --- a/net/homa/homa_impl.h +++ b/net/homa/homa_impl.h @@ -365,28 +365,53 @@ extern unsigned int homa_net_id; void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, struct homa_rpc *rpc); void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb); +int homa_bind(struct socket *sk, struct sockaddr *addr, + int addr_len); +void homa_close(struct sock *sock, long timeout); int homa_copy_to_user(struct homa_rpc *rpc); void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc); void homa_destroy(struct homa *homa); void homa_dispatch_pkts(struct sk_buff *skb); +int homa_err_handler_v4(struct sk_buff *skb, u32 info); +int homa_err_handler_v6(struct sk_buff *skb, + struct inet6_skb_parm *opt, u8 type, u8 code, + int offset, __be32 info); int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb, struct iov_iter *iter); struct homa_gap *homa_gap_alloc(struct list_head *next, int start, int end); +int homa_getsockopt(struct sock *sk, int level, int optname, + char __user *optval, int __user *optlen); +int homa_hash(struct sock *sk); +enum hrtimer_restart homa_hrtimer(struct hrtimer *timer); int homa_init(struct homa *homa); int homa_ioc_info(struct socket *sock, unsigned long arg); +int homa_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg); +int homa_load(void); int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit); void homa_message_out_init(struct homa_rpc *rpc, int length); void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk, struct homa_rpc *rpc); void homa_net_destroy(struct homa_net *hnet); +void homa_net_exit(struct net *net); int homa_net_init(struct homa_net *hnet, struct net *net, struct homa *homa); +int homa_net_start(struct net *net); +__poll_t homa_poll(struct file *file, struct socket *sock, + struct poll_table_struct *wait); +int homa_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, + int flags, int *addr_len); void homa_request_retrans(struct homa_rpc *rpc); void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc, struct homa_sock *hsk); void homa_rpc_handoff(struct homa_rpc *rpc); int homa_rpc_tx_end(struct homa_rpc *rpc); +int homa_sendmsg(struct sock *sk, struct msghdr *msg, size_t len); +int homa_setsockopt(struct sock *sk, int level, int optname, + sockptr_t optval, unsigned int optlen); +int homa_shutdown(struct socket *sock, int how); +int homa_socket(struct sock *sk); +int homa_softirq(struct 
sk_buff *skb); void homa_spin(int ns); void homa_timer(struct homa *homa); void homa_timer_check_rpc(struct homa_rpc *rpc); @@ -394,7 +419,9 @@ int homa_timer_main(void *transport); struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc, struct iov_iter *iter, int offset, int length, int max_seg_data); +void homa_unhash(struct sock *sk); void homa_rpc_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc); +void homa_unload(void); int homa_wait_private(struct homa_rpc *rpc, int nonblocking); struct homa_rpc *homa_wait_shared(struct homa_sock *hsk, int nonblocking); int homa_xmit_control(enum homa_packet_type type, void *contents, diff --git a/net/homa/homa_plumbing.c b/net/homa/homa_plumbing.c new file mode 100644 index 000000000000..32bfa3d7e130 --- /dev/null +++ b/net/homa/homa_plumbing.c @@ -0,0 +1,1254 @@ +// SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+ + +/* This file consists mostly of "glue" that hooks Homa into the rest of + * the Linux kernel. The guts of the protocol are in other files. + */ + +#include "homa_impl.h" +#include "homa_peer.h" +#include "homa_pool.h" + +/* Identifier for retrieving Homa-specific data for a struct net. */ +unsigned int homa_net_id; + +/* This structure defines functions that allow Homa to be used as a + * pernet subsystem. + */ +static struct pernet_operations homa_net_ops = { + .init = homa_net_start, + .exit = homa_net_exit, + .id = &homa_net_id, + .size = sizeof(struct homa_net) +}; + +/* Global data for Homa. Avoid referencing directly except when there is + * no alternative (instead, use a homa pointer stored in a struct or + * passed via a parameter). This allows overriding during unit tests. + */ +static struct homa homa_data; + +/* This structure defines functions that handle various operations on + * Homa sockets. These functions are relatively generic: they are called + * to implement top-level system calls. Many of these operations can + * be implemented by PF_INET6 functions that are independent of the + * Homa protocol. + */ +static const struct proto_ops homa_proto_ops = { + .family = PF_INET, + .owner = THIS_MODULE, + .release = inet_release, + .bind = homa_bind, + .connect = inet_dgram_connect, + .socketpair = sock_no_socketpair, + .accept = sock_no_accept, + .getname = inet_getname, + .poll = homa_poll, + .ioctl = homa_ioctl, + .listen = sock_no_listen, + .shutdown = homa_shutdown, + .setsockopt = sock_common_setsockopt, + .getsockopt = sock_common_getsockopt, + .sendmsg = inet_sendmsg, + .recvmsg = inet_recvmsg, + .mmap = sock_no_mmap, + .set_peek_off = sk_set_peek_off, +}; + +static const struct proto_ops homav6_proto_ops = { + .family = PF_INET6, + .owner = THIS_MODULE, + .release = inet6_release, + .bind = homa_bind, + .connect = inet_dgram_connect, + .socketpair = sock_no_socketpair, + .accept = sock_no_accept, + .getname = inet6_getname, + .poll = homa_poll, + .ioctl = homa_ioctl, + .listen = sock_no_listen, + .shutdown = homa_shutdown, + .setsockopt = sock_common_setsockopt, + .getsockopt = sock_common_getsockopt, + .sendmsg = inet_sendmsg, + .recvmsg = inet_recvmsg, + .mmap = sock_no_mmap, + .set_peek_off = sk_set_peek_off, +}; + +/* This structure also defines functions that handle various operations + * on Homa sockets. However, these functions are lower-level than those + * in homa_proto_ops: they are specific to the PF_INET or PF_INET6 + * protocol family, and in many cases they are invoked by functions in + * homa_proto_ops. Most of these functions have Homa-specific implementations. 
+ */ +static struct proto homa_prot = { + .name = "HOMA", + .owner = THIS_MODULE, + .close = homa_close, + .connect = ip4_datagram_connect, + .init = homa_socket, + .destroy = homa_sock_destroy, + .setsockopt = homa_setsockopt, + .getsockopt = homa_getsockopt, + .sendmsg = homa_sendmsg, + .recvmsg = homa_recvmsg, + .hash = homa_hash, + .unhash = homa_unhash, + .obj_size = sizeof(struct homa_sock), + .no_autobind = 1, +}; + +static struct proto homav6_prot = { + .name = "HOMAv6", + .owner = THIS_MODULE, + .close = homa_close, + .connect = ip6_datagram_connect, + .init = homa_socket, + .destroy = homa_sock_destroy, + .setsockopt = homa_setsockopt, + .getsockopt = homa_getsockopt, + .sendmsg = homa_sendmsg, + .recvmsg = homa_recvmsg, + .hash = homa_hash, + .unhash = homa_unhash, + .obj_size = sizeof(struct homa_v6_sock), + .ipv6_pinfo_offset = offsetof(struct homa_v6_sock, inet6), + .no_autobind = 1, +}; + +/* Top-level structure describing the Homa protocol. */ +static struct inet_protosw homa_protosw = { + .type = SOCK_DGRAM, + .protocol = IPPROTO_HOMA, + .prot = &homa_prot, + .ops = &homa_proto_ops, + .flags = INET_PROTOSW_REUSE, +}; + +static struct inet_protosw homav6_protosw = { + .type = SOCK_DGRAM, + .protocol = IPPROTO_HOMA, + .prot = &homav6_prot, + .ops = &homav6_proto_ops, + .flags = INET_PROTOSW_REUSE, +}; + +/* This structure is used by IP to deliver incoming Homa packets to us. */ +static struct net_protocol homa_protocol = { + .handler = homa_softirq, + .err_handler = homa_err_handler_v4, + .no_policy = 1, +}; + +static struct inet6_protocol homav6_protocol = { + .handler = homa_softirq, + .err_handler = homa_err_handler_v6, + .flags = INET6_PROTO_NOPOLICY | INET6_PROTO_FINAL, +}; + +/* Sizes of the headers for each Homa packet type, in bytes. */ +static u16 header_lengths[] = { + sizeof(struct homa_data_hdr), + 0, + sizeof(struct homa_resend_hdr), + sizeof(struct homa_rpc_unknown_hdr), + sizeof(struct homa_busy_hdr), + 0, + 0, + sizeof(struct homa_need_ack_hdr), + sizeof(struct homa_ack_hdr) +}; + +/* Thread that runs timer code to detect lost packets and crashed peers. */ +static struct task_struct *timer_kthread; +static DECLARE_COMPLETION(timer_thread_done); + +/* Used to wakeup timer_kthread at regular intervals. */ +static struct hrtimer hrtimer; + +/* Nonzero is an indication to the timer thread that it should exit. */ +static int timer_thread_exit; + +/** + * homa_load() - invoked when this module is loaded into the Linux kernel + * Return: 0 on success, otherwise a negative errno. + */ +int __init homa_load(void) +{ + struct homa *homa = &homa_data; + bool init_protocol6 = false; + bool init_protosw6 = false; + bool init_protocol = false; + bool init_protosw = false; + bool init_net_ops = false; + bool init_proto6 = false; + bool init_proto = false; + bool init_homa = false; + int status; + + /* Compile-time validations that no packet header is longer + * than HOMA_MAX_HEADER. + */ + BUILD_BUG_ON(sizeof(struct homa_data_hdr) > HOMA_MAX_HEADER); + BUILD_BUG_ON(sizeof(struct homa_resend_hdr) > HOMA_MAX_HEADER); + BUILD_BUG_ON(sizeof(struct homa_rpc_unknown_hdr) > HOMA_MAX_HEADER); + BUILD_BUG_ON(sizeof(struct homa_busy_hdr) > HOMA_MAX_HEADER); + BUILD_BUG_ON(sizeof(struct homa_need_ack_hdr) > HOMA_MAX_HEADER); + BUILD_BUG_ON(sizeof(struct homa_ack_hdr) > HOMA_MAX_HEADER); + + /* Extra constraints on data packets: + * - Ensure minimum header length so Homa doesn't have to worry about + * padding data packets. 
+ * - Make sure data packet headers are a multiple of 4 bytes (needed + * for TCP/TSO compatibility). + */ + BUILD_BUG_ON(sizeof(struct homa_data_hdr) < HOMA_MIN_PKT_LENGTH); + BUILD_BUG_ON((sizeof(struct homa_data_hdr) - + sizeof(struct homa_seg_hdr)) & 0x3); + + /* Detect size changes in uAPI structs. */ + BUILD_BUG_ON(sizeof(struct homa_sendmsg_args) != 24); + BUILD_BUG_ON(sizeof(struct homa_recvmsg_args) != 88); + + status = homa_init(homa); + if (status) + goto error; + init_homa = true; + + status = proto_register(&homa_prot, 1); + if (status != 0) { + pr_err("proto_register failed for homa_prot: %d\n", status); + goto error; + } + init_proto = true; + + status = proto_register(&homav6_prot, 1); + if (status != 0) { + pr_err("proto_register failed for homav6_prot: %d\n", status); + goto error; + } + init_proto6 = true; + + inet_register_protosw(&homa_protosw); + init_protosw = true; + + status = inet6_register_protosw(&homav6_protosw); + if (status != 0) { + pr_err("inet6_register_protosw failed in %s: %d\n", __func__, + status); + goto error; + } + init_protosw6 = true; + + status = inet_add_protocol(&homa_protocol, IPPROTO_HOMA); + if (status != 0) { + pr_err("inet_add_protocol failed in %s: %d\n", __func__, + status); + goto error; + } + init_protocol = true; + + status = inet6_add_protocol(&homav6_protocol, IPPROTO_HOMA); + if (status != 0) { + pr_err("inet6_add_protocol failed in %s: %d\n", __func__, + status); + goto error; + } + init_protocol6 = true; + + status = register_pernet_subsys(&homa_net_ops); + if (status != 0) { + pr_err("Homa got error from register_pernet_subsys: %d\n", + status); + goto error; + } + init_net_ops = true; + + timer_kthread = kthread_run(homa_timer_main, homa, "homa_timer"); + if (IS_ERR(timer_kthread)) { + status = PTR_ERR(timer_kthread); + pr_err("couldn't create Homa timer thread: error %d\n", + status); + timer_kthread = NULL; + goto error; + } + + return 0; + +error: + if (timer_kthread) { + timer_thread_exit = 1; + wake_up_process(timer_kthread); + wait_for_completion(&timer_thread_done); + } + if (init_net_ops) + unregister_pernet_subsys(&homa_net_ops); + if (init_homa) + homa_destroy(homa); + if (init_protocol) + inet_del_protocol(&homa_protocol, IPPROTO_HOMA); + if (init_protocol6) + inet6_del_protocol(&homav6_protocol, IPPROTO_HOMA); + if (init_protosw) + inet_unregister_protosw(&homa_protosw); + if (init_protosw6) + inet6_unregister_protosw(&homav6_protosw); + if (init_proto) + proto_unregister(&homa_prot); + if (init_proto6) + proto_unregister(&homav6_prot); + return status; +} + +/** + * homa_unload() - invoked when this module is unloaded from the Linux kernel. + */ +void __exit homa_unload(void) +{ + struct homa *homa = &homa_data; + + pr_notice("Homa module unloading\n"); + + unregister_pernet_subsys(&homa_net_ops); + inet_del_protocol(&homa_protocol, IPPROTO_HOMA); + inet_unregister_protosw(&homa_protosw); + inet6_del_protocol(&homav6_protocol, IPPROTO_HOMA); + inet6_unregister_protosw(&homav6_protosw); + proto_unregister(&homa_prot); + proto_unregister(&homav6_prot); + homa_destroy(homa); +} + +module_init(homa_load); +module_exit(homa_unload); + +/** + * homa_net_start() - Initialize Homa for a new network namespace. + * @net: The net that Homa will be associated with. + * Return: 0 on success, otherwise a negative errno. 
+ */
+int homa_net_start(struct net *net)
+{
+	pr_notice("Homa attaching to net namespace\n");
+	return homa_net_init(homa_net(net), net, &homa_data);
+}
+
+/**
+ * homa_net_exit() - Perform Homa cleanup needed when a network namespace
+ * is destroyed.
+ * @net: The net from which Homa should be removed.
+ */
+void homa_net_exit(struct net *net)
+{
+	pr_notice("Homa detaching from net namespace\n");
+	homa_net_destroy(homa_net(net));
+}
+
+/**
+ * homa_bind() - Implements the bind system call for Homa sockets: associates
+ * a well-known service port with a socket. Unlike other AF_INET6 protocols,
+ * there is no need to invoke this system call for sockets that are only
+ * used as clients.
+ * @sock:     Socket on which the system call was invoked.
+ * @addr:     Contains the desired port number.
+ * @addr_len: Number of bytes in @addr.
+ * Return: 0 on success, otherwise a negative errno. Sets hsk->error_msg
+ *         on errors.
+ */
+int homa_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
+{
+	union sockaddr_in_union *addr_in = (union sockaddr_in_union *)addr;
+	struct homa_sock *hsk = homa_sk(sock->sk);
+	int port = 0;
+
+	if (unlikely(addr->sa_family != sock->sk->sk_family)) {
+		hsk->error_msg = "address family in bind address didn't match socket";
+		return -EAFNOSUPPORT;
+	}
+	if (addr_in->in6.sin6_family == AF_INET6) {
+		if (addr_len < sizeof(struct sockaddr_in6)) {
+			hsk->error_msg = "ipv6 address too short";
+			return -EINVAL;
+		}
+		port = ntohs(addr_in->in6.sin6_port);
+	} else if (addr_in->in4.sin_family == AF_INET) {
+		if (addr_len < sizeof(struct sockaddr_in)) {
+			hsk->error_msg = "ipv4 address too short";
+			return -EINVAL;
+		}
+		port = ntohs(addr_in->in4.sin_port);
+	}
+	return homa_sock_bind(hsk->hnet, hsk, port);
+}
+
+/**
+ * homa_close() - Invoked when the close system call is invoked on a
+ * Homa socket.
+ * @sk:      Socket being closed
+ * @timeout: ??
+ */
+void homa_close(struct sock *sk, long timeout)
+{
+	struct homa_sock *hsk = homa_sk(sk);
+
+	homa_sock_shutdown(hsk);
+	sk_common_release(sk);
+}
+
+/**
+ * homa_shutdown() - Implements the shutdown system call for Homa sockets.
+ * @sock: Socket to shut down.
+ * @how:  Ignored: for other sockets, can independently shut down
+ *        sending and receiving, but for Homa any shutdown will
+ *        shut down everything.
+ *
+ * Return: 0 on success, otherwise a negative errno.
+ */
+int homa_shutdown(struct socket *sock, int how)
+{
+	homa_sock_shutdown(homa_sk(sock->sk));
+	return 0;
+}
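
To make the user-space contract concrete, a server might create and bind
a Homa socket as sketched below. This is an illustrative sketch only, not
code from this series; the port number 500 is arbitrary (any value in the
well-known range 1-32767 works):

#include <linux/homa.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	struct sockaddr_in6 addr;
	int fd;

	/* Homa sockets are datagram sockets with protocol IPPROTO_HOMA. */
	fd = socket(AF_INET6, SOCK_DGRAM, IPPROTO_HOMA);
	if (fd < 0) {
		perror("socket(IPPROTO_HOMA)");
		return 1;
	}

	/* Only servers need to bind; client ports are assigned by Homa. */
	memset(&addr, 0, sizeof(addr));
	addr.sin6_family = AF_INET6;
	addr.sin6_addr = in6addr_any;
	addr.sin6_port = htons(500);
	if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
		perror("bind");
		close(fd);
		return 1;
	}

	/* ... use the socket ... */
	close(fd);
	return 0;
}
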
+/**
+ * homa_ioc_info() - The top-level function that implements the
+ * HOMAIOCINFO ioctl for Homa sockets.
+ * @sock: Socket for this request.
+ * @arg:  The address in user space of the argument to ioctl, which
+ *        is a homa_info struct.
+ *
+ * Return: 0 on success, otherwise a negative errno. Sets hsk->error_msg
+ *         on errors.
+ */
+int homa_ioc_info(struct socket *sock, unsigned long arg)
+{
+	struct homa_sock *hsk = homa_sk(sock->sk);
+	struct homa_rpc_info rinfo;
+	struct homa_info hinfo;
+	struct homa_rpc *rpc;
+	int bytes_avl;
+	char *dst;
+
+	if (unlikely(copy_from_user(&hinfo, (void __user *)arg,
+				    sizeof(hinfo)))) {
+		hsk->error_msg = "invalid address for homa_info";
+		return -EFAULT;
+	}
+
+	if (!homa_protect_rpcs(hsk)) {
+		hsk->error_msg = "socket has been shut down";
+		return -ESHUTDOWN;
+	}
+	hinfo.bpool_avail_bytes = homa_pool_avail_bytes(hsk->buffer_pool);
+	hinfo.port = hsk->port;
+	dst = (char *)hinfo.rpc_info;
+	bytes_avl = hinfo.rpc_info_length;
+	hinfo.num_rpcs = 0;
+	list_for_each_entry_rcu(rpc, &hsk->active_rpcs, active_links) {
+		homa_rpc_lock(rpc);
+		if (rpc->state == RPC_DEAD) {
+			homa_rpc_unlock(rpc);
+			continue;
+		}
+		homa_rpc_get_info(rpc, &rinfo);
+		homa_rpc_unlock(rpc);
+		if (dst && bytes_avl >= sizeof(rinfo)) {
+			if (copy_to_user((void __user *)dst, &rinfo,
+					 sizeof(rinfo))) {
+				homa_unprotect_rpcs(hsk);
+				hsk->error_msg = "couldn't copy homa_rpc_info to user space: invalid or read-only address?";
+				return -EFAULT;
+			}
+			dst += sizeof(rinfo);
+			bytes_avl -= sizeof(rinfo);
+		}
+		hinfo.num_rpcs++;
+	}
+	homa_unprotect_rpcs(hsk);
+
+	if (hsk->error_msg)
+		snprintf(hinfo.error_msg, HOMA_ERROR_MSG_SIZE, "%s",
+			 hsk->error_msg);
+	else
+		hinfo.error_msg[0] = 0;
+
+	if (copy_to_user((void __user *)arg, &hinfo, sizeof(hinfo))) {
+		hsk->error_msg = "couldn't copy homa_info to user space: read-only address?";
+		return -EFAULT;
+	}
+	return 0;
+}
+
+/**
+ * homa_ioctl() - Implements the ioctl system call for Homa sockets.
+ * @sock: Socket on which the system call was invoked.
+ * @cmd:  Identifier for a particular ioctl operation.
+ * @arg:  Operation-specific argument; typically the address of a block
+ *        of data in user address space.
+ *
+ * Return: 0 on success, otherwise a negative errno. Sets hsk->error_msg
+ *         on errors.
+ */
+int homa_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
+{
+	if (cmd == HOMAIOCINFO)
+		return homa_ioc_info(sock, arg);
+	homa_sk(sock->sk)->error_msg = "ioctl opcode isn't supported by Homa";
+	return -EINVAL;
+}
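
For illustration, user space could query a socket's state through this
ioctl roughly as follows. This is a hedged sketch: the field names are
taken from homa_ioc_info() above, but struct homa_info's exact types and
layout are defined in include/uapi/linux/homa.h, and the
print_homa_state() helper is hypothetical (it assumes rpc_info holds a
user-space address, as the kernel code above suggests):

#include <linux/homa.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>

/* Hypothetical helper: print a one-line summary of a Homa socket. */
static void print_homa_state(int fd)
{
	struct homa_rpc_info rpcs[16];
	struct homa_info hinfo;

	memset(&hinfo, 0, sizeof(hinfo));
	/* Optionally ask for per-RPC details as well. */
	hinfo.rpc_info = (__u64)(uintptr_t)rpcs;
	hinfo.rpc_info_length = sizeof(rpcs);
	if (ioctl(fd, HOMAIOCINFO, &hinfo) < 0) {
		perror("HOMAIOCINFO");
		return;
	}
	printf("port %u, %u active RPCs, %llu buffer bytes available\n",
	       (unsigned int)hinfo.port, (unsigned int)hinfo.num_rpcs,
	       (unsigned long long)hinfo.bpool_avail_bytes);
	if (hinfo.error_msg[0] != 0)
		printf("last error: %s\n", hinfo.error_msg);
}
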
+
+/**
+ * homa_socket() - Implements the socket(2) system call for Homa sockets.
+ * @sk: Socket on which the system call was invoked. The non-Homa
+ *      parts have already been initialized.
+ *
+ * Return: 0 on success, otherwise a negative errno.
+ */
+int homa_socket(struct sock *sk)
+{
+	struct homa_sock *hsk = homa_sk(sk);
+	int result;
+
+	result = homa_sock_init(hsk);
+	if (result != 0) {
+		homa_sock_shutdown(hsk);
+		homa_sock_destroy(&hsk->sock);
+	}
+	return result;
+}
+
+/**
+ * homa_setsockopt() - Implements the setsockopt system call for Homa sockets.
+ * @sk:      Socket on which the system call was invoked.
+ * @level:   Level at which the operation should be handled; will always
+ *           be IPPROTO_HOMA.
+ * @optname: Identifies a particular setsockopt operation.
+ * @optval:  Address in user space of information about the option.
+ * @optlen:  Number of bytes of data at @optval.
+ * Return: 0 on success, otherwise a negative errno. Sets hsk->error_msg
+ *         on errors.
+ */
+int homa_setsockopt(struct sock *sk, int level, int optname,
+		    sockptr_t optval, unsigned int optlen)
+{
+	struct homa_sock *hsk = homa_sk(sk);
+	int ret;
+
+	if (level != IPPROTO_HOMA) {
+		hsk->error_msg = "homa_setsockopt invoked with level not IPPROTO_HOMA";
+		return -ENOPROTOOPT;
+	}
+
+	if (optname == SO_HOMA_RCVBUF) {
+		struct homa_rcvbuf_args args;
+
+		if (optlen != sizeof(struct homa_rcvbuf_args)) {
+			hsk->error_msg = "invalid optlen argument: must be sizeof(struct homa_rcvbuf_args)";
+			return -EINVAL;
+		}
+
+		if (copy_from_sockptr(&args, optval, optlen)) {
+			hsk->error_msg = "invalid address for homa_rcvbuf_args";
+			return -EFAULT;
+		}
+
+		/* Do a trivial test to make sure we can at least write the
+		 * first page of the region.
+		 */
+		if (copy_to_user(u64_to_user_ptr(args.start), &args,
+				 sizeof(args))) {
+			hsk->error_msg = "receive buffer region is not writable";
+			return -EFAULT;
+		}
+
+		ret = homa_pool_set_region(hsk, u64_to_user_ptr(args.start),
+					   args.length);
+	} else if (optname == SO_HOMA_SERVER) {
+		int arg;
+
+		if (optlen != sizeof(arg)) {
+			hsk->error_msg = "invalid optlen argument: must be sizeof(int)";
+			return -EINVAL;
+		}
+
+		if (copy_from_sockptr(&arg, optval, optlen)) {
+			hsk->error_msg = "invalid address for SO_HOMA_SERVER value";
+			return -EFAULT;
+		}
+
+		if (arg)
+			hsk->is_server = true;
+		else
+			hsk->is_server = false;
+		ret = 0;
+	} else {
+		hsk->error_msg = "setsockopt option not supported by Homa";
+		ret = -ENOPROTOOPT;
+	}
+	return ret;
+}
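
Taken together, a server would typically configure a new socket with both
options before accepting requests. A minimal sketch (not part of this
series; the 1000-bpage region size is an arbitrary choice for
illustration):

#include <linux/homa.h>
#include <stdint.h>
#include <sys/mman.h>
#include <sys/socket.h>

/* Give @fd a receive buffer region and mark it as a server socket.
 * Returns 0 on success, -1 on failure.
 */
static int setup_server_socket(int fd)
{
	struct homa_rcvbuf_args args;
	size_t length = 1000 * HOMA_BPAGE_SIZE;
	void *region;
	int one = 1;

	/* The buffer region is plain user memory; Homa delivers incoming
	 * messages into bpage-sized chunks of this region.
	 */
	region = mmap(NULL, length, PROT_READ | PROT_WRITE,
		      MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
	if (region == MAP_FAILED)
		return -1;
	args.start = (__u64)(uintptr_t)region;
	args.length = length;
	if (setsockopt(fd, IPPROTO_HOMA, SO_HOMA_RCVBUF, &args,
		       sizeof(args)) < 0)
		return -1;

	/* Enable incoming requests on this socket. */
	if (setsockopt(fd, IPPROTO_HOMA, SO_HOMA_SERVER, &one,
		       sizeof(one)) < 0)
		return -1;
	return 0;
}
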
+
+/**
+ * homa_getsockopt() - Implements the getsockopt system call for Homa sockets.
+ * @sk:      Socket on which the system call was invoked.
+ * @level:   Selects level in the network stack to handle the request;
+ *           must be IPPROTO_HOMA.
+ * @optname: Identifies a particular getsockopt operation.
+ * @optval:  Address in user space where the option's value should be stored.
+ * @optlen:  Number of bytes available at optval; will be overwritten with
+ *           actual number of bytes stored.
+ * Return: 0 on success, otherwise a negative errno. Sets hsk->error_msg
+ *         on errors.
+ */
+int homa_getsockopt(struct sock *sk, int level, int optname,
+		    char __user *optval, int __user *optlen)
+{
+	struct homa_sock *hsk = homa_sk(sk);
+	struct homa_rcvbuf_args rcvbuf_args;
+	int is_server;
+	void *result;
+	int len;
+
+	if (copy_from_sockptr(&len, USER_SOCKPTR(optlen), sizeof(int))) {
+		hsk->error_msg = "invalid address for optlen argument to getsockopt";
+		return -EFAULT;
+	}
+
+	if (level != IPPROTO_HOMA) {
+		hsk->error_msg = "homa_getsockopt invoked with level not IPPROTO_HOMA";
+		return -ENOPROTOOPT;
+	}
+	if (optname == SO_HOMA_RCVBUF) {
+		if (len < sizeof(rcvbuf_args)) {
+			hsk->error_msg = "invalid optlen argument: must be sizeof(struct homa_rcvbuf_args)";
+			return -EINVAL;
+		}
+
+		homa_sock_lock(hsk);
+		homa_pool_get_rcvbuf(hsk->buffer_pool, &rcvbuf_args);
+		homa_sock_unlock(hsk);
+		len = sizeof(rcvbuf_args);
+		result = &rcvbuf_args;
+	} else if (optname == SO_HOMA_SERVER) {
+		if (len < sizeof(is_server)) {
+			hsk->error_msg = "invalid optlen argument: must be sizeof(int)";
+			return -EINVAL;
+		}
+
+		is_server = hsk->is_server;
+		len = sizeof(is_server);
+		result = &is_server;
+	} else {
+		hsk->error_msg = "getsockopt option not supported by Homa";
+		return -ENOPROTOOPT;
+	}
+
+	if (copy_to_sockptr(USER_SOCKPTR(optlen), &len, sizeof(int))) {
+		hsk->error_msg = "couldn't update optlen argument to getsockopt: read-only?";
+		return -EFAULT;
+	}
+
+	if (copy_to_sockptr(USER_SOCKPTR(optval), result, len)) {
+		hsk->error_msg = "couldn't update optval argument to getsockopt: read-only?";
+		return -EFAULT;
+	}
+
+	return 0;
+}
+
+/**
+ * homa_sendmsg() - Send a request or response message on a Homa socket.
+ * @sk:     Socket on which the system call was invoked.
+ * @msg:    Structure describing the message to send; the msg_control
+ *          field points to additional information.
+ * @length: Number of bytes of the message.
+ * Return: 0 on success, otherwise a negative errno. Sets hsk->error_msg
+ *         on errors.
+ */
+int homa_sendmsg(struct sock *sk, struct msghdr *msg, size_t length)
+{
+	struct homa_sock *hsk = homa_sk(sk);
+	struct homa_sendmsg_args args;
+	union sockaddr_in_union *addr;
+	struct homa_rpc *rpc = NULL;
+	int result = 0;
+
+	addr = (union sockaddr_in_union *)msg->msg_name;
+	if (!addr) {
+		hsk->error_msg = "no msg_name passed to sendmsg";
+		result = -EINVAL;
+		goto error;
+	}
+
+	if (unlikely(!msg->msg_control_is_user)) {
+		hsk->error_msg = "msg_control argument for sendmsg isn't in user space";
+		result = -EINVAL;
+		goto error;
+	}
+	if (unlikely(copy_from_user(&args, (void __user *)msg->msg_control,
+				    sizeof(args)))) {
+		hsk->error_msg = "invalid address for msg_control argument to sendmsg";
+		result = -EFAULT;
+		goto error;
+	}
+	if (args.flags & ~HOMA_SENDMSG_VALID_FLAGS ||
+	    args.reserved != 0) {
+		hsk->error_msg = "reserved fields in homa_sendmsg_args must be zero";
+		result = -EINVAL;
+		goto error;
+	}
+
+	if (!homa_sock_wmem_avl(hsk)) {
+		result = homa_sock_wait_wmem(hsk,
+					     msg->msg_flags & MSG_DONTWAIT);
+		if (result != 0)
+			goto error;
+	}
+
+	if (addr->sa.sa_family != sk->sk_family) {
+		hsk->error_msg = "address family in sendmsg address must match the socket";
+		result = -EAFNOSUPPORT;
+		goto error;
+	}
+	if (msg->msg_namelen < sizeof(struct sockaddr_in) ||
+	    (msg->msg_namelen < sizeof(struct sockaddr_in6) &&
+	     addr->in6.sin6_family == AF_INET6)) {
+		hsk->error_msg = "msg_namelen too short";
+		result = -EINVAL;
+		goto error;
+	}
+
+	if (!args.id) {
+		/* This is a request message.
+		 */
+		rpc = homa_rpc_alloc_client(hsk, addr);
+		if (IS_ERR(rpc)) {
+			result = PTR_ERR(rpc);
+			rpc = NULL;
+			goto error;
+		}
+		homa_rpc_hold(rpc);
+		if (args.flags & HOMA_SENDMSG_PRIVATE)
+			set_bit(RPC_PRIVATE, &rpc->flags);
+		rpc->completion_cookie = args.completion_cookie;
+		result = homa_message_out_fill(rpc, &msg->msg_iter, 1);
+		if (result)
+			goto error;
+		args.id = rpc->id;
+		homa_rpc_unlock(rpc); /* Locked by homa_rpc_alloc_client. */
+
+		if (unlikely(copy_to_user((void __user *)msg->msg_control,
+					  &args, sizeof(args)))) {
+			homa_rpc_lock(rpc);
+			hsk->error_msg = "couldn't update homa_sendmsg_args argument to sendmsg: read-only?";
+			result = -EFAULT;
+			goto error;
+		}
+		homa_rpc_put(rpc);
+	} else {
+		/* This is a response message. */
+		struct in6_addr canonical_dest;
+
+		if (args.completion_cookie != 0) {
+			hsk->error_msg = "completion_cookie must be zero when sending responses";
+			result = -EINVAL;
+			goto error;
+		}
+		canonical_dest = canonical_ipv6_addr(addr);
+
+		rpc = homa_rpc_find_server(hsk, &canonical_dest, args.id);
+		if (!rpc) {
+			/* Return without an error if the RPC doesn't exist;
+			 * this could be totally valid (e.g. client is
+			 * no longer interested in it).
+			 */
+			return 0;
+		}
+		homa_rpc_hold(rpc);
+		if (rpc->error) {
+			hsk->error_msg = "RPC has failed, so can't send response";
+			result = rpc->error;
+			goto error;
+		}
+		if (rpc->state != RPC_IN_SERVICE) {
+			hsk->error_msg = "RPC is not in a state where a response can be sent";
+			result = -EINVAL;
+			goto error_dont_end_rpc;
+		}
+		rpc->state = RPC_OUTGOING;
+
+		result = homa_message_out_fill(rpc, &msg->msg_iter, 1);
+		if (result && rpc->state != RPC_DEAD)
+			goto error;
+		homa_rpc_put(rpc);
+		homa_rpc_unlock(rpc); /* Locked by homa_rpc_find_server. */
+	}
+	return 0;
+
+error:
+	if (rpc)
+		homa_rpc_end(rpc);
+
+error_dont_end_rpc:
+	if (rpc) {
+		homa_rpc_put(rpc);
+
+		/* Locked by homa_rpc_find_server or homa_rpc_alloc_client. */
+		homa_rpc_unlock(rpc);
+	}
+	return result;
+}
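
From user space, sending a request boils down to passing a
homa_sendmsg_args struct via msg_control; the kernel fills in the id of
the new RPC on return. A hedged sketch (send_request() is a hypothetical
helper, and using the request buffer's address as the completion cookie
is just one convention):

#include <linux/homa.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Send one request message; returns the new RPC's id, or 0 on error. */
static __u64 send_request(int fd, const struct sockaddr_in6 *dest,
			  void *request, size_t length)
{
	struct iovec vec = { .iov_base = request, .iov_len = length };
	struct homa_sendmsg_args args;
	struct msghdr hdr;

	memset(&args, 0, sizeof(args));	/* id == 0 means "new request". */
	args.completion_cookie = (__u64)(uintptr_t)request;

	memset(&hdr, 0, sizeof(hdr));
	hdr.msg_name = (void *)dest;
	hdr.msg_namelen = sizeof(*dest);
	hdr.msg_iov = &vec;
	hdr.msg_iovlen = 1;
	hdr.msg_control = &args;
	hdr.msg_controllen = sizeof(args);
	if (sendmsg(fd, &hdr, 0) < 0)
		return 0;
	return args.id;	/* Filled in by the kernel. */
}
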
+
+/**
+ * homa_recvmsg() - Receive a message from a Homa socket.
+ * @sk:       Socket on which the system call was invoked.
+ * @msg:      Controlling information for the receive.
+ * @len:      Total bytes of space available in msg->msg_iov; not used.
+ * @flags:    Flags from system call; only MSG_DONTWAIT is used.
+ * @addr_len: Store the length of the sender address here.
+ * Return: The length of the message on success, otherwise a negative
+ *         errno. Sets hsk->error_msg on errors.
+ */
+int homa_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
+		 int *addr_len)
+{
+	struct homa_sock *hsk = homa_sk(sk);
+	struct homa_recvmsg_args control;
+	struct homa_rpc *rpc = NULL;
+	int nonblocking;
+	int result;
+
+	if (unlikely(!msg->msg_control)) {
+		/* This test isn't strictly necessary, but it provides a
+		 * hook for testing kernel call times.
+		 */
+		hsk->error_msg = "no msg_control passed to recvmsg";
+		return -EINVAL;
+	}
+	if (msg->msg_controllen != sizeof(control)) {
+		hsk->error_msg = "invalid msg_controllen in recvmsg";
+		return -EINVAL;
+	}
+	if (unlikely(copy_from_user(&control, (void __user *)msg->msg_control,
+				    sizeof(control)))) {
+		hsk->error_msg = "invalid address for msg_control argument to recvmsg";
+		return -EFAULT;
+	}
+	control.completion_cookie = 0;
+
+	if (control.num_bpages > HOMA_MAX_BPAGES) {
+		hsk->error_msg = "num_bpages exceeds HOMA_MAX_BPAGES";
+		result = -EINVAL;
+		goto done;
+	}
+	if (control.reserved != 0) {
+		hsk->error_msg = "reserved fields in homa_recvmsg_args must be zero";
+		result = -EINVAL;
+		goto done;
+	}
+	if (!hsk->buffer_pool) {
+		hsk->error_msg = "SO_HOMA_RCVBUF socket option has not been set";
+		result = -EINVAL;
+		goto done;
+	}
+	result = homa_pool_release_buffers(hsk->buffer_pool, control.num_bpages,
+					   control.bpage_offsets);
+	control.num_bpages = 0;
+	if (result != 0) {
+		hsk->error_msg = "error while releasing buffer pages";
+		goto done;
+	}
+
+	nonblocking = flags & MSG_DONTWAIT;
+	if (control.id != 0) {
+		rpc = homa_rpc_find_client(hsk, control.id); /* Locks RPC. */
+		if (!rpc) {
+			hsk->error_msg = "invalid RPC id passed to recvmsg";
+			result = -EINVAL;
+			goto done;
+		}
+		homa_rpc_hold(rpc);
+		result = homa_wait_private(rpc, nonblocking);
+		if (result != 0) {
+			hsk->error_msg = "error while waiting for private RPC to complete";
+			control.id = 0;
+			goto done;
+		}
+	} else {
+		rpc = homa_wait_shared(hsk, nonblocking);
+		if (IS_ERR(rpc)) {
+			/* If we get here, it means there was an error that
+			 * prevented us from finding an RPC to return. Errors
+			 * in the RPC itself are handled below.
+			 */
+			hsk->error_msg = "error while waiting for shared RPC to complete";
+			result = PTR_ERR(rpc);
+			rpc = NULL;
+			goto done;
+		}
+	}
+	if (rpc->error) {
+		hsk->error_msg = "RPC failed";
+		result = rpc->error;
+	} else {
+		result = rpc->msgin.length;
+	}
+
+	/* Collect result information. */
+	control.id = rpc->id;
+	control.completion_cookie = rpc->completion_cookie;
+	if (likely(rpc->msgin.length >= 0)) {
+		control.num_bpages = rpc->msgin.num_bpages;
+		memcpy(control.bpage_offsets, rpc->msgin.bpage_offsets,
+		       sizeof(rpc->msgin.bpage_offsets));
+	}
+	if (sk->sk_family == AF_INET6) {
+		struct sockaddr_in6 *in6 = msg->msg_name;
+
+		in6->sin6_family = AF_INET6;
+		in6->sin6_port = htons(rpc->dport);
+		in6->sin6_addr = rpc->peer->addr;
+		*addr_len = sizeof(*in6);
+	} else {
+		struct sockaddr_in *in4 = msg->msg_name;
+
+		in4->sin_family = AF_INET;
+		in4->sin_port = htons(rpc->dport);
+		in4->sin_addr.s_addr = ipv6_to_ipv4(rpc->peer->addr);
+		*addr_len = sizeof(*in4);
+	}
+
+	/* This indicates that the application now owns the buffers, so
+	 * we won't free them in homa_rpc_end.
+	 */
+	rpc->msgin.num_bpages = 0;
+
+	if (homa_is_client(rpc->id)) {
+		homa_peer_add_ack(rpc);
+		homa_rpc_end(rpc);
+	} else {
+		if (result < 0)
+			homa_rpc_end(rpc);
+		else
+			rpc->state = RPC_IN_SERVICE;
+	}
+
+done:
+	/* Note: must release the RPC lock before calling homa_rpc_reap
+	 * or copying results to user space.
+	 */
+	if (rpc) {
+		homa_rpc_put(rpc);
+
+		/* Locked by homa_rpc_find_client or homa_wait_shared. */
+		homa_rpc_unlock(rpc);
+	}
+
+	if (test_bit(SOCK_NOSPACE, &hsk->sock.sk_socket->flags)) {
+		/* There are tasks waiting for tx memory, so reap
+		 * immediately.
+		 */
+		homa_rpc_reap(hsk, true);
+	}
+
+	if (unlikely(copy_to_user((__force void __user *)msg->msg_control,
+				  &control, sizeof(control)))) {
+		hsk->error_msg = "couldn't update homa_recvmsg_args argument to recvmsg: read-only?";
+		result = -EFAULT;
+	}
+
+	return result;
+}
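
The receive side mirrors sendmsg: a homa_recvmsg_args struct goes in
msg_control, and the same struct both describes the new message's bpages
and recycles the previous message's bpages. A sketch (recv_any() is a
hypothetical helper; @control must be zeroed before its first use):

#include <linux/homa.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Receive the next shared message on @fd; returns the message length,
 * or -1 on error.
 */
static ssize_t recv_any(int fd, struct homa_recvmsg_args *control,
			struct sockaddr_in6 *src)
{
	struct msghdr hdr;

	control->id = 0;	/* 0 means "wait for any shared RPC". */
	memset(&hdr, 0, sizeof(hdr));
	hdr.msg_name = src;
	hdr.msg_namelen = sizeof(*src);
	hdr.msg_control = control;
	hdr.msg_controllen = sizeof(*control);

	/* On entry, num_bpages/bpage_offsets give back the previous
	 * message's buffers; on success they describe where in the
	 * SO_HOMA_RCVBUF region the new message's fragments live.
	 */
	return recvmsg(fd, &hdr, 0);
}
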
+ */ + homa_rpc_reap(hsk, true); + } + + if (unlikely(copy_to_user((__force void __user *)msg->msg_control, + &control, sizeof(control)))) { + hsk->error_msg = "couldn't update homa_recvmsg_args argument to recvmsg: read-only?"; + result = -EFAULT; + } + + return result; +} + +/** + * homa_hash() - Not needed for Homa. + * @sk: Socket for the operation + * Return: ?? + */ +int homa_hash(struct sock *sk) +{ + return 0; +} + +/** + * homa_unhash() - Not needed for Homa. + * @sk: Socket for the operation + */ +void homa_unhash(struct sock *sk) +{ +} + +/** + * homa_softirq() - This function is invoked at SoftIRQ level to handle + * incoming packets. + * @skb: The incoming packet. + * Return: Always 0 + */ +int homa_softirq(struct sk_buff *skb) +{ + struct sk_buff *packets, *other_pkts, *next; + struct sk_buff **prev_link, **other_link; + enum skb_drop_reason reason; + struct homa_common_hdr *h; + int header_offset; + + /* skb may actually contain many distinct packets, linked through + * skb_shinfo(skb)->frag_list by the Homa GRO mechanism. Make a + * pass through the list to process all of the short packets, + * leaving the longer packets in the list. Also, perform various + * prep/cleanup/error checking functions. + */ + skb->next = skb_shinfo(skb)->frag_list; + skb_shinfo(skb)->frag_list = NULL; + packets = skb; + prev_link = &packets; + for (skb = packets; skb; skb = next) { + next = skb->next; + + /* Make the header available at skb->data, even if the packet + * is fragmented. One complication: it's possible that the IP + * header hasn't yet been removed (this happens for GRO packets + * on the frag_list, since they aren't handled explicitly by IP. + */ + if (!homa_make_header_avl(skb)) { + reason = SKB_DROP_REASON_HDR_TRUNC; + goto discard; + } + header_offset = skb_transport_header(skb) - skb->data; + if (header_offset) + __skb_pull(skb, header_offset); + + /* Reject packets that are too short or have bogus types. */ + h = (struct homa_common_hdr *)skb->data; + if (unlikely(skb->len < sizeof(struct homa_common_hdr) || + h->type < DATA || h->type > MAX_OP || + skb->len < header_lengths[h->type - DATA])) { + reason = SKB_DROP_REASON_PKT_TOO_SMALL; + goto discard; + } + + /* Process the packet now if it is a control packet or + * if it contains an entire short message. + */ + if (h->type != DATA || ntohl(((struct homa_data_hdr *)h) + ->message_length) < 1400) { + *prev_link = skb->next; + skb->next = NULL; + homa_dispatch_pkts(skb); + } else { + prev_link = &skb->next; + } + continue; + +discard: + *prev_link = skb->next; + kfree_skb_reason(skb, reason); + } + + /* Now process the longer packets. Each iteration of this loop + * collects all of the packets for a particular RPC and dispatches + * them (batching the packets for an RPC allows more efficient + * generation of grants). 
+	 */
+	while (packets) {
+		struct in6_addr saddr, saddr2;
+		struct homa_common_hdr *h2;
+		struct sk_buff *skb2;
+
+		skb = packets;
+		prev_link = &skb->next;
+		saddr = skb_canonical_ipv6_saddr(skb);
+		other_pkts = NULL;
+		other_link = &other_pkts;
+		h = (struct homa_common_hdr *)skb->data;
+		for (skb2 = skb->next; skb2; skb2 = next) {
+			next = skb2->next;
+			h2 = (struct homa_common_hdr *)skb2->data;
+			if (h2->sender_id == h->sender_id) {
+				saddr2 = skb_canonical_ipv6_saddr(skb2);
+				if (ipv6_addr_equal(&saddr, &saddr2)) {
+					*prev_link = skb2;
+					prev_link = &skb2->next;
+					continue;
+				}
+			}
+			*other_link = skb2;
+			other_link = &skb2->next;
+		}
+		*prev_link = NULL;
+		*other_link = NULL;
+		homa_dispatch_pkts(packets);
+		packets = other_pkts;
+	}
+
+	return 0;
+}
+
+/**
+ * homa_err_handler_v4() - Invoked by IP to handle an incoming error
+ * packet, such as ICMP UNREACHABLE.
+ * @skb:  The incoming packet; skb->data points to the byte just after
+ *        the ICMP header (the first byte of the embedded packet IP header).
+ * @info: Information about the error that occurred?
+ *
+ * Return: zero, or a negative errno if the error couldn't be handled here.
+ */
+int homa_err_handler_v4(struct sk_buff *skb, u32 info)
+{
+	struct homa *homa = homa_net(dev_net(skb->dev))->homa;
+	const struct icmphdr *icmp = icmp_hdr(skb);
+	struct in6_addr daddr;
+	int type = icmp->type;
+	int code = icmp->code;
+	struct iphdr *iph;
+	int error = 0;
+	int port = 0;
+
+	iph = (struct iphdr *)(skb->data);
+	ipv6_addr_set_v4mapped(iph->daddr, &daddr);
+	if (type == ICMP_DEST_UNREACH && code == ICMP_PORT_UNREACH) {
+		struct homa_common_hdr *h = (struct homa_common_hdr *)(skb->data +
+				iph->ihl * 4);
+
+		port = ntohs(h->dport);
+		error = -ENOTCONN;
+	} else if (type == ICMP_DEST_UNREACH) {
+		if (code == ICMP_PROT_UNREACH)
+			error = -EPROTONOSUPPORT;
+		else
+			error = -EHOSTUNREACH;
+	} else {
+		pr_notice("%s invoked with info %x, ICMP type %d, ICMP code %d\n",
+			  __func__, info, type, code);
+	}
+	if (error != 0)
+		homa_abort_rpcs(homa, &daddr, port, error);
+	return 0;
+}
+
+/**
+ * homa_err_handler_v6() - Invoked by IP to handle an incoming error
+ * packet, such as ICMP UNREACHABLE.
+ * @skb:    The incoming packet; skb->data points to the byte just after
+ *          the ICMP header (the first byte of the embedded packet IP header).
+ * @opt:    Not used.
+ * @type:   Type of ICMP packet.
+ * @code:   Additional information about the error.
+ * @offset: Not used.
+ * @info:   Information about the error that occurred?
+ *
+ * Return: zero, or a negative errno if the error couldn't be handled here.
+ */
+int homa_err_handler_v6(struct sk_buff *skb, struct inet6_skb_parm *opt,
+			u8 type, u8 code, int offset, __be32 info)
+{
+	const struct ipv6hdr *iph = (const struct ipv6hdr *)skb->data;
+	struct homa *homa = homa_net(dev_net(skb->dev))->homa;
+	int error = 0;
+	int port = 0;
+
+	if (type == ICMPV6_DEST_UNREACH && code == ICMPV6_PORT_UNREACH) {
+		const struct homa_common_hdr *h;
+
+		h = (struct homa_common_hdr *)(skb->data + sizeof(*iph));
+		port = ntohs(h->dport);
+		error = -ENOTCONN;
+	} else if (type == ICMPV6_DEST_UNREACH && code == ICMPV6_ADDR_UNREACH) {
+		error = -EHOSTUNREACH;
+	} else if (type == ICMPV6_PARAMPROB && code == ICMPV6_UNK_NEXTHDR) {
+		error = -EPROTONOSUPPORT;
+	}
+	if (error != 0)
+		homa_abort_rpcs(homa, &iph->daddr, port, error);
+	return 0;
+}
+
+/**
+ * homa_poll() - Invoked by Linux as part of implementing select, poll,
+ * epoll, etc.
+ * @file: Open file that is participating in a poll, select, etc.
+ * @sock: A Homa socket, associated with @file.
+ * @wait: This table will be registered with the socket, so that it
+ *        is notified when the socket's ready state changes.
+ *
+ * Return: A mask of bits such as EPOLLIN, which indicate the current
+ *         state of the socket.
+ */
+__poll_t homa_poll(struct file *file, struct socket *sock,
+		   struct poll_table_struct *wait)
+{
+	struct homa_sock *hsk = homa_sk(sock->sk);
+	__poll_t mask;
+
+	mask = 0;
+	sock_poll_wait(file, sock, wait);
+	if (homa_sock_wmem_avl(hsk))
+		mask |= EPOLLOUT | EPOLLWRNORM;
+	else
+		set_bit(SOCK_NOSPACE, &hsk->sock.sk_socket->flags);
+
+	if (hsk->shutdown)
+		mask |= EPOLLIN;
+
+	if (!list_empty(&hsk->ready_rpcs))
+		mask |= EPOLLIN | EPOLLRDNORM;
+	return mask;
+}
+
+/**
+ * homa_hrtimer() - This function is invoked by the hrtimer mechanism to
+ * wake up the timer thread. Runs at IRQ level.
+ * @timer: The timer that triggered; not used.
+ *
+ * Return: Always HRTIMER_NORESTART.
+ */
+enum hrtimer_restart homa_hrtimer(struct hrtimer *timer)
+{
+	wake_up_process(timer_kthread);
+	return HRTIMER_NORESTART;
+}
+
+/**
+ * homa_timer_main() - Top-level function for the timer thread.
+ * @transport: Pointer to struct homa.
+ *
+ * Return: Always 0.
+ */
+int homa_timer_main(void *transport)
+{
+	struct homa *homa = (struct homa *)transport;
+	ktime_t tick_interval;
+	u64 nsec;
+
+	hrtimer_setup(&hrtimer, homa_hrtimer, CLOCK_MONOTONIC,
+		      HRTIMER_MODE_REL);
+	nsec = 1000000;	/* 1 ms */
+	tick_interval = ns_to_ktime(nsec);
+	while (1) {
+		set_current_state(TASK_UNINTERRUPTIBLE);
+		if (!timer_thread_exit) {
+			hrtimer_start(&hrtimer, tick_interval,
+				      HRTIMER_MODE_REL);
+			schedule();
+		}
+		__set_current_state(TASK_RUNNING);
+		if (timer_thread_exit)
+			break;
+		homa_timer(homa);
+	}
+	hrtimer_cancel(&hrtimer);
+	kthread_complete_and_exit(&timer_thread_done, 0);
+	return 0;
+}
+
+MODULE_LICENSE("Dual BSD/GPL");
+MODULE_AUTHOR("John Ousterhout ");
+MODULE_DESCRIPTION("Homa transport protocol");
+MODULE_VERSION("1.0");
+
+/* Arrange for this module to be loaded automatically when a Homa socket is
+ * opened. Apparently symbols don't work in the macros below, so must use
+ * numeric values for IPPROTO_HOMA (146) and SOCK_DGRAM (2).
+ */
+MODULE_ALIAS_NET_PF_PROTO_TYPE(PF_INET, 146, 2);
+MODULE_ALIAS_NET_PF_PROTO_TYPE(PF_INET6, 146, 2);
-- 
2.43.0
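
Since homa_poll() reports EPOLLIN once ready_rpcs is non-empty, Homa
sockets compose with epoll like any other socket. A short illustrative
sketch (not part of this series; wait_for_message() is hypothetical):

#include <string.h>
#include <sys/epoll.h>

/* Block until @homa_fd has an incoming message ready; afterwards a
 * recvmsg (even with MSG_DONTWAIT) should find a ready RPC.
 */
static int wait_for_message(int epfd, int homa_fd)
{
	struct epoll_event ev;

	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;
	ev.data.fd = homa_fd;
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, homa_fd, &ev) < 0)
		return -1;
	if (epoll_wait(epfd, &ev, 1, -1) < 1)
		return -1;
	return (ev.events & EPOLLIN) ? 0 : -1;
}
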
Before this commit the Homa code is "inert": it won't be compiled in
kernel builds. This commit adds Homa's Makefile and Kconfig, and also
links Homa into net/Makefile and net/Kconfig, so that Homa will be
built during kernel builds if enabled (it is disabled by default).

Signed-off-by: John Ousterhout
---
 net/homa/Kconfig  | 21 +++++++++++++++++++++
 net/homa/Makefile | 11 +++++++++++
 2 files changed, 32 insertions(+)
 create mode 100644 net/homa/Kconfig
 create mode 100644 net/homa/Makefile

diff --git a/net/homa/Kconfig b/net/homa/Kconfig
new file mode 100644
index 000000000000..16fec3fd52ba
--- /dev/null
+++ b/net/homa/Kconfig
@@ -0,0 +1,21 @@
+# SPDX-License-Identifier: BSD-2-Clause or GPL-2.0+
+#
+# Homa transport protocol
+#
+
+menuconfig HOMA
+	tristate "The Homa transport protocol"
+	depends on INET
+	depends on IPV6
+
+	help
+	  Homa is a network transport protocol for communication within
+	  a datacenter. It provides significantly lower latency than TCP,
+	  particularly for workloads containing a mixture of large and small
+	  messages operating at high network utilization. At present, Homa
+	  has been only partially upstreamed; this version provides bare-bones
+	  functionality but is not performant. For more information see the
+	  homa(7) man page or check out the Homa Wiki at
+	  https://homa-transport.atlassian.net/wiki/spaces/HOMA/overview.
+
+	  If unsure, say N.

diff --git a/net/homa/Makefile b/net/homa/Makefile
new file mode 100644
index 000000000000..57f051d44c6b
--- /dev/null
+++ b/net/homa/Makefile
@@ -0,0 +1,11 @@
+obj-$(CONFIG_HOMA) := homa.o
+homa-y := homa_incoming.o \
+	homa_interest.o \
+	homa_outgoing.o \
+	homa_peer.o \
+	homa_plumbing.o \
+	homa_pool.o \
+	homa_rpc.o \
+	homa_sock.o \
+	homa_timer.o \
+	homa_utils.o
-- 
2.43.0