Currently, newly allocated nodes in xas_split_alloc() are initialized
immediately upon allocation, before the node's role, shift, and offset
within the splitting hierarchy are determined.

Move the call to __xas_init_node_for_split() from xas_split_alloc() to
xas_split(). This removes any dependence on "entry" during node
allocation, in preparation for extending the xa_split* functions to
support multi-level splits, where intermediate nodes need not have
their slots initialized.

Signed-off-by: Ackerley Tng
---
After moving the call to __xas_init_node_for_split() from
xas_split_alloc() to xas_split(), void *entry is unused and no longer
needs to be passed to xas_split_alloc(). I would like to get feedback
on moving initialization into xas_split() before doing a full
refactoring to remove void *entry as a parameter to xas_split_alloc().

 lib/xarray.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/xarray.c b/lib/xarray.c
index edb877c2d51c..636edcf014f1 100644
--- a/lib/xarray.c
+++ b/lib/xarray.c
@@ -1060,7 +1060,6 @@ void xas_split_alloc(struct xa_state *xas, void *entry, unsigned int order,
 		if (!node)
 			goto nomem;
 
-		__xas_init_node_for_split(xas, node, entry);
 		RCU_INIT_POINTER(node->parent, xas->xa_alloc);
 		xas->xa_alloc = node;
 	} while (sibs-- > 0);
@@ -1103,6 +1102,7 @@ void xas_split(struct xa_state *xas, void *entry, unsigned int order)
 			struct xa_node *child = xas->xa_alloc;
 
 			xas->xa_alloc = rcu_dereference_raw(child->parent);
+			__xas_init_node_for_split(xas, child, entry);
 			child->shift = node->shift - XA_CHUNK_SHIFT;
 			child->offset = offset;
 			child->count = XA_CHUNK_SIZE;
-- 
2.52.0.rc1.455.g30608eb744-goog


xas_split_alloc() is limited in its ability to handle splits of large
entries, specifically those requiring the XArray's height to increase
by more than one level: it contains a WARN_ON for such cases and only
allocates nodes for a single level of the tree.

Introduce a new helper function, __xas_alloc_nodes(), to centralize the
node allocation logic. Update xas_split_alloc() to determine the total
number of nodes required across all new levels, then use
__xas_alloc_nodes() to allocate them.

This removes the previous limitation and allows xas_split_alloc() to
allocate enough nodes to support splitting arbitrarily large entries.

Signed-off-by: Ackerley Tng
---
 lib/xarray.c | 52 ++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 36 insertions(+), 16 deletions(-)

diff --git a/lib/xarray.c b/lib/xarray.c
index 636edcf014f1..b7c44a75bb03 100644
--- a/lib/xarray.c
+++ b/lib/xarray.c
@@ -1028,6 +1028,27 @@ static void __xas_init_node_for_split(struct xa_state *xas,
 	}
 }
 
+static void __xas_alloc_nodes(struct xa_state *xas, unsigned int num_nodes, gfp_t gfp)
+{
+	struct xa_node *node;
+	unsigned int i;
+
+	for (i = 0; i < num_nodes; ++i) {
+		node = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
+		if (!node)
+			goto nomem;
+
+		RCU_INIT_POINTER(node->parent, xas->xa_alloc);
+		xas->xa_alloc = node;
+	}
+
+	return;
+
+nomem:
+	xas_destroy(xas);
+	xas_set_err(xas, -ENOMEM);
+}
+
 /**
  * xas_split_alloc() - Allocate memory for splitting an entry.
  * @xas: XArray operation state.
@@ -1046,28 +1067,27 @@ void xas_split_alloc(struct xa_state *xas, void *entry, unsigned int order,
 		gfp_t gfp)
 {
 	unsigned int sibs = (1 << (order % XA_CHUNK_SHIFT)) - 1;
+	unsigned int shift = order - (order % XA_CHUNK_SHIFT);
+	unsigned int level_nodes;
+	unsigned int nodes = 0;
 
-	/* XXX: no support for splitting really large entries yet */
-	if (WARN_ON(xas->xa_shift + 2 * XA_CHUNK_SHIFT <= order))
-		goto nomem;
-	if (xas->xa_shift + XA_CHUNK_SHIFT > order)
+	if (shift <= xas->xa_shift)
 		return;
 
-	do {
-		struct xa_node *node;
+	shift -= XA_CHUNK_SHIFT;
 
-		node = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
-		if (!node)
-			goto nomem;
+	level_nodes = sibs + 1;
+	for (;;) {
+		nodes += level_nodes;
 
-		RCU_INIT_POINTER(node->parent, xas->xa_alloc);
-		xas->xa_alloc = node;
-	} while (sibs-- > 0);
+		if (shift == xas->xa_shift)
+			break;
 
-	return;
-nomem:
-	xas_destroy(xas);
-	xas_set_err(xas, -ENOMEM);
+		nodes *= XA_CHUNK_SIZE;
+		shift -= XA_CHUNK_SHIFT;
+	}
+
+	__xas_alloc_nodes(xas, nodes, gfp);
 }
 EXPORT_SYMBOL_GPL(xas_split_alloc);
-- 
2.52.0.rc1.455.g30608eb744-goog


xas_split() currently supports splitting an entry into multiple
siblings within the current node, or creating child nodes one level
below. It does not handle cases where the requested split order
requires wiring up multiple levels of new intermediate nodes to form a
deeper subtree, which prevents xas_split() from splitting arbitrarily
large entries.

Extend xas_split() to build subtrees of XArray nodes and then set these
subtrees as children of the node where the split is requested.

Signed-off-by: Ackerley Tng
---
I had a recursive implementation which I believe was easier to
understand, but I went with an iterative implementation using a stack
because I was concerned about stack depth in the kernel. Let me know if
the recursive implementation is preferred!

Feedback is always appreciated, and I'd specifically like feedback on:

+ RCU-related handling
+ Handling of xas_update() calls
+ Use of node_set_marks() - can/should that be refactored to be
  node-focused, rather than in some cases also updating the child?
+ Can/should xas_split_alloc() read entry using xas_load(), rather than
  rely on void *entry passed as an argument?
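For anyone reviewing the allocation and wiring below, here is a rough
worked example of the node counts the updated xas_split_alloc() arrives
at, assuming XA_CHUNK_SHIFT == 6 and a split all the way down to order
0 (xas->xa_shift == 0). This is only an illustration, not part of the
patch:

	/*
	 * Splitting an order-18 entry: sibs = 0, shift starts at 18 and is
	 * stepped down one level at a time, multiplying the running total
	 * by XA_CHUNK_SIZE at each step:
	 *
	 *	shift 12: nodes = 1
	 *	shift  6: nodes = 1 * 64 + 1  = 65
	 *	shift  0: nodes = 65 * 64 + 1 = 4161
	 *
	 * i.e. 1 + 64 + 4096 nodes for the three new levels.
	 *
	 * Splitting an order-15 entry: sibs = 7 (the entry occupies 8 slots
	 * of a shift-12 node), so nodes = 8, then 8 * 64 + 8 = 520 for the
	 * two new levels.
	 */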
 lib/xarray.c | 158 +++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 121 insertions(+), 37 deletions(-)

diff --git a/lib/xarray.c b/lib/xarray.c
index b7c44a75bb03..6fdace2e73df 100644
--- a/lib/xarray.c
+++ b/lib/xarray.c
@@ -1105,52 +1105,136 @@ EXPORT_SYMBOL_GPL(xas_split_alloc);
 void xas_split(struct xa_state *xas, void *entry, unsigned int order)
 {
 	unsigned int sibs = (1 << (order % XA_CHUNK_SHIFT)) - 1;
-	unsigned int offset, marks;
+	struct xa_node *node_to_split;
+	unsigned int offset_to_split;
+	struct xa_node *stack;
 	struct xa_node *node;
-	void *curr = xas_load(xas);
-	int values = 0;
+	unsigned int offset;
+	unsigned int marks;
+	unsigned int i;
+	void *sibling;
 
-	node = xas->xa_node;
-	if (xas_top(node))
+	xas_load(xas);
+	node_to_split = xas->xa_node;
+	if (xas_top(node_to_split))
 		return;
 
-	marks = node_get_marks(node, xas->xa_offset);
+	marks = node_get_marks(node_to_split, xas->xa_offset);
 
-	offset = xas->xa_offset + sibs;
-	do {
-		if (xas->xa_shift < node->shift) {
-			struct xa_node *child = xas->xa_alloc;
-
-			xas->xa_alloc = rcu_dereference_raw(child->parent);
-			__xas_init_node_for_split(xas, child, entry);
-			child->shift = node->shift - XA_CHUNK_SHIFT;
-			child->offset = offset;
-			child->count = XA_CHUNK_SIZE;
-			child->nr_values = xa_is_value(entry) ?
-					XA_CHUNK_SIZE : 0;
-			RCU_INIT_POINTER(child->parent, node);
-			node_set_marks(node, offset, child, xas->xa_sibs,
-					marks);
-			rcu_assign_pointer(node->slots[offset],
-					xa_mk_node(child));
-			if (xa_is_value(curr))
-				values--;
-			xas_update(xas, child);
+	/* Horizontal split: just fill in values in existing node. */
+	if (node_to_split->shift == xas->xa_shift) {
+		offset = xas->xa_offset;
+
+		for (i = offset; i < offset + sibs + 1; i++) {
+			if ((i & xas->xa_sibs) == 0) {
+				node_set_marks(node_to_split, i, NULL, 0, marks);
+				rcu_assign_pointer(node_to_split->slots[i], entry);
+
+				sibling = xa_mk_sibling(i);
+			} else {
+				rcu_assign_pointer(node_to_split->slots[i], sibling);
+			}
+		}
+
+		xas_update(xas, node_to_split);
+		return;
+	}
+
+	/*
+	 * Vertical split: build tree bottom-up, so that on any RCU lookup (on
+	 * the original tree), the tree is consistent.
+	 */
+	offset_to_split = get_offset(xas->xa_index, node_to_split);
+	stack = NULL;
+	offset = 0;
+	for (;;) {
+		unsigned int next_offset;
+
+		if (stack &&
+		    stack->shift == node_to_split->shift - XA_CHUNK_SHIFT &&
+		    stack->offset == offset_to_split + sibs)
+			break;
+
+		if (stack && stack->offset == XA_CHUNK_SIZE - 1) {
+			unsigned int child_shift;
+
+			node = xas->xa_alloc;
+			xas->xa_alloc = rcu_dereference_raw(node->parent);
+
+			child_shift = stack->shift;
+			for (i = 0; i < XA_CHUNK_SIZE; i++) {
+				struct xa_node *child = stack;
+
+				stack = child->parent;
+
+				node_set_marks(node, child->offset, NULL, 0, marks);
+
+				RCU_INIT_POINTER(child->parent, node);
+				RCU_INIT_POINTER(node->slots[child->offset], xa_mk_node(child));
+			}
+
+			node->array = xas->xa;
+			node->count = XA_CHUNK_SIZE;
+			node->nr_values = 0;
+			node->shift = child_shift + XA_CHUNK_SHIFT;
+			node->offset = 0;
+			if (node->shift == node_to_split->shift - XA_CHUNK_SHIFT)
+				node->offset = offset_to_split;
+			if (stack && stack->shift == node->shift)
+				node->offset = stack->offset + 1;
+
+			next_offset = 0;
+
+			xas_update(xas, node);
 		} else {
-			unsigned int canon = offset - xas->xa_sibs;
+			node = xas->xa_alloc;
+			xas->xa_alloc = rcu_dereference_raw(node->parent);
 
-			node_set_marks(node, canon, NULL, 0, marks);
-			rcu_assign_pointer(node->slots[canon], entry);
-			while (offset > canon)
-				rcu_assign_pointer(node->slots[offset--],
-						xa_mk_sibling(canon));
-			values += (xa_is_value(entry) - xa_is_value(curr)) *
-					(xas->xa_sibs + 1);
+			for (i = 0; i < XA_CHUNK_SIZE; i++) {
+				if ((i & xas->xa_sibs) == 0) {
+					node_set_marks(node, i, NULL, 0, marks);
+					RCU_INIT_POINTER(node->slots[i], entry);
+
+					sibling = xa_mk_sibling(i);
+				} else {
+					RCU_INIT_POINTER(node->slots[i], sibling);
+				}
+			}
+
+			node->array = xas->xa;
+			node->count = XA_CHUNK_SIZE;
+			node->nr_values = xa_is_value(entry) ? XA_CHUNK_SIZE : 0;
+			node->shift = xas->xa_shift;
+			node->offset = offset;
+			if (node->shift == node_to_split->shift - XA_CHUNK_SHIFT)
+				node->offset = offset_to_split;
+			if (stack && stack->shift == node->shift)
+				node->offset = stack->offset + 1;
+
+			next_offset = offset + 1;
 		}
-	} while (offset-- > xas->xa_offset);
-	node->nr_values += values;
-	xas_update(xas, node);
+		node->parent = stack;
+		stack = node;
+
+		offset = next_offset;
+	}
+
+	/* Combine all the new nodes on the stack into node_to_split. */
+	for (i = 0; i < sibs + 1; i++) {
+		node = stack;
+		stack = node->parent;
+
+		node_set_marks(node_to_split, node->offset, NULL, 0, marks);
+
+		rcu_assign_pointer(node->parent, node_to_split);
+		rcu_assign_pointer(node_to_split->slots[node->offset], xa_mk_node(node));
+	}
+
+	WARN_ON(stack);
+
+	node_to_split->nr_values -= xa_is_value(entry) ? sibs + 1 : 0;
+	xas_update(xas, node_to_split);
 }
 EXPORT_SYMBOL_GPL(xas_split);
-- 
2.52.0.rc1.455.g30608eb744-goog


Expand the range of order values for check_split_1() from
2 * XA_CHUNK_SHIFT to 4 * XA_CHUNK_SHIFT to test splitting beyond 2
levels.

Separate the loops for the check_split_1() and check_split_2() calls,
since xas_try_split() does not yet support splitting beyond 2 levels.
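For context, check_split_1() does roughly the following for each
(index, order, new_order) combination - paraphrased here, see
lib/test_xarray.c for the exact helper:

	XA_STATE_ORDER(xas, xa, index, new_order);

	/* Store a single entry covering 1 << order indices, then split it. */
	xa_store_order(xa, index, order, xa, GFP_KERNEL);

	xas_split_alloc(&xas, xa, order, GFP_KERNEL);
	xas_lock(&xas);
	xas_split(&xas, xa, order);
	xas_unlock(&xas);

With XA_CHUNK_SHIFT == 6, raising the order bound to 4 * XA_CHUNK_SHIFT
exercises splits that need up to three new levels of nodes below the
node holding the entry.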
Signed-off-by: Ackerley Tng
---
 lib/test_xarray.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/test_xarray.c b/lib/test_xarray.c
index 42626fb4dc0e..fbdf647e4ef8 100644
--- a/lib/test_xarray.c
+++ b/lib/test_xarray.c
@@ -1905,12 +1905,16 @@ static noinline void check_split(struct xarray *xa)
 
 	XA_BUG_ON(xa, !xa_empty(xa));
 
-	for (order = 1; order < 2 * XA_CHUNK_SHIFT; order++) {
+	for (order = 1; order < 4 * XA_CHUNK_SHIFT; order++) {
 		for (new_order = 0; new_order < order; new_order++) {
 			check_split_1(xa, 0, order, new_order);
 			check_split_1(xa, 1UL << order, order, new_order);
 			check_split_1(xa, 3UL << order, order, new_order);
+		}
+	}
+	for (order = 1; order < 2 * XA_CHUNK_SHIFT; order++) {
+		for (new_order = 0; new_order < order; new_order++) {
 			check_split_2(xa, 0, order, new_order);
 			check_split_2(xa, 1UL << order, order, new_order);
 			check_split_2(xa, 3UL << order, order, new_order);
-- 
2.52.0.rc1.455.g30608eb744-goog
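Not part of the series, but to illustrate the intended end state: a
caller wanting to split an arbitrarily large entry would follow roughly
the same pattern mm/filemap.c already uses for single-level splits,
allocating outside the lock and splitting under it. mapping, index,
old_order, and new_order are placeholders in this sketch:

	XA_STATE_ORDER(xas, &mapping->i_pages, index, new_order);
	void *old = xa_load(xas.xa, index);

	/* Pre-allocate all nodes needed for the split, outside the lock. */
	xas_split_alloc(&xas, old, old_order, GFP_KERNEL);
	if (!xas_error(&xas)) {
		xas_lock_irq(&xas);
		xas_split(&xas, old, old_order);
		xas_unlock_irq(&xas);
	}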