How to upsert multiple rows atomically?

How do you update multiple rows in one go, making sure each update is atomic and either all succeed or all rollback? My use case: updating stock for an item across several warehouses.
stock {
item_id
warehouse_id
quantity
}
stock {
item_id
warehouse_id
quantity
}
Each update has a different increment/decrement value, and I need to make sure the final quantity never goes below 0. Bulk update seems to set the same value for all rows, but what I need is per-row updates that happen safely based on the current db value, all inside a transaction. Also, what’s the best way to handle cases where the row doesn’t exist yet? I’m guessing I’d need to use some form of upsert to insert the initial value and still keep everything atomic.
9 Replies
ZachDaniel
ZachDaniel4w ago
A bulk create with upsert is what you're looking for 👍
Joan Gavelán
Joan GavelánOP4w ago
Hi Zach. Thanks for the reply in such busy moments. I'm trying this:
create :manage_stock do
argument :item_id, :uuid, allow_nil?: false
argument :warehouse_id, :uuid, allow_nil?: false
argument :delta, :decimal, allow_nil?: false

upsert? true
upsert_fields [:quantity]

change set_attribute(:quantity, arg(:delta))
change atomic_update(:quantity, expr(quantity + ^arg(:delta)))

change manage_relationship(:item_id, :item, type: :append)
change manage_relationship(:warehouse_id, :warehouse, type: :append)
end
create :manage_stock do
argument :item_id, :uuid, allow_nil?: false
argument :warehouse_id, :uuid, allow_nil?: false
argument :delta, :decimal, allow_nil?: false

upsert? true
upsert_fields [:quantity]

change set_attribute(:quantity, arg(:delta))
change atomic_update(:quantity, expr(quantity + ^arg(:delta)))

change manage_relationship(:item_id, :item, type: :append)
change manage_relationship(:warehouse_id, :warehouse, type: :append)
end
Following the example from https://hexdocs.pm/ash/create-actions.html#atomic-updates. My goal is the typical upsert, insert if doesn't exist, update if it does (based on current db quantity, not in-memory data to avoid race conditions) The action above is working, but requires me to remove my greater_than: -1 constraint
attribute :quantity, :decimal do
allow_nil? false
public? true
constraints greater_than: -1
end
attribute :quantity, :decimal do
allow_nil? false
public? true
constraints greater_than: -1
end
I get the following when passing a negative delta (which is valid for updates)
iex(26)> Inventory.manage_stock(stock, scope: s)
{:error,
%Ash.Error.Invalid{
bread_crumbs: ["Error returned from: MyApp.Inventory.Stock.manage_stock"],
changeset: "#Changeset<>",
errors: [
%Ash.Error.Changes.InvalidAttribute{
field: :quantity,
message: "must be more than %{greater_than}",
private_vars: nil,
value: Decimal.new("-10"),
has_value?: true,
splode: Ash.Error,
bread_crumbs: ["Error returned from: MyApp.Inventory.Stock.manage_stock"],
vars: [greater_than: Decimal.new("-1")],
path: [],
stacktrace: #Splode.Stacktrace<>,
class: :invalid
}
]
}}
iex(26)> Inventory.manage_stock(stock, scope: s)
{:error,
%Ash.Error.Invalid{
bread_crumbs: ["Error returned from: MyApp.Inventory.Stock.manage_stock"],
changeset: "#Changeset<>",
errors: [
%Ash.Error.Changes.InvalidAttribute{
field: :quantity,
message: "must be more than %{greater_than}",
private_vars: nil,
value: Decimal.new("-10"),
has_value?: true,
splode: Ash.Error,
bread_crumbs: ["Error returned from: MyApp.Inventory.Stock.manage_stock"],
vars: [greater_than: Decimal.new("-1")],
path: [],
stacktrace: #Splode.Stacktrace<>,
class: :invalid
}
]
}}
Indeed I want to validate when creating a new stock that the value is not negative, but this approach is rejecting my delta right away, even if the pk I'm passing indicates the stock already exists, so no need to create one, what could I do in this scenario?
ZachDaniel
ZachDaniel3w ago
attribute constraints don't apply only on create they apply always
Joan Gavelán
Joan GavelánOP3w ago
Mm what can I do then? Need to validate the quantity does not go below 0, allow negative delta, and apply them only if the quantity already exists and q - delta is not less than 0. If the stock for such product doesn't even exist, I should reject such delta right away Tried this:
defmodule Lamashka.Inventory.Validations.StockQuantityNotNegative do
use Ash.Resource.Validation

@message "quantity cannot go below 0"

@impl true
def validate(changeset, _opts, _context) do
by_value = Ash.Changeset.get_argument(changeset, :by)
current_quantity = changeset.data.quantity || Decimal.new(0)
new_quantity = Decimal.add(current_quantity, by_value)

if Decimal.compare(new_quantity, 0) == :lt do
{:error, field: :quantity, message: @message}
else
:ok
end
end

@impl true
def atomic(_changeset, _opts, _context) do
{:atomic, [:quantity], expr(quantity + arg(:by) < 0),
expr(
error(^InvalidAttribute, %{
field: :quantity,
value: atomic_ref(:quantity),
message: @message
})
)}
end
end
defmodule Lamashka.Inventory.Validations.StockQuantityNotNegative do
use Ash.Resource.Validation

@message "quantity cannot go below 0"

@impl true
def validate(changeset, _opts, _context) do
by_value = Ash.Changeset.get_argument(changeset, :by)
current_quantity = changeset.data.quantity || Decimal.new(0)
new_quantity = Decimal.add(current_quantity, by_value)

if Decimal.compare(new_quantity, 0) == :lt do
{:error, field: :quantity, message: @message}
else
:ok
end
end

@impl true
def atomic(_changeset, _opts, _context) do
{:atomic, [:quantity], expr(quantity + arg(:by) < 0),
expr(
error(^InvalidAttribute, %{
field: :quantity,
value: atomic_ref(:quantity),
message: @message
})
)}
end
end
But it rejects negative deltas immediately, because of this line I guess:
change set_attribute(:quantity, arg(:delta))
change set_attribute(:quantity, arg(:delta))
ZachDaniel
ZachDaniel3w ago
Yeah I think the difficulty here is that an upsert treats these things as possibly being able to create a record and it sounds like you're doing something that could theoretically create something with a negative value I'm actually not sure if we can reasonably support it currently. Basically what you need is something like atomic_update but that is only evaluated on create To allow for doing something like atomic_set(:field, expr(if(^arg(:delta) <= -1, error(...), ^arg(:delta)))) but we don't support it currently. So you will probably need to write a generic action and call Ecto yourself for this
Joan Gavelán
Joan GavelánOP3w ago
Thank you for the clarification. And yeah, that is the approach I'm currently taking and I've managed to make it work:
defmodule MyApp.Inventory.Actions.ManageStock do
use Ash.Resource.Actions.Implementation

alias MyApp.Inventory
alias MyApp.Inventory.Stock

require Ash.Query

@impl true
def run(input, _opts, context) do
stock_values = input.arguments.stock

Enum.each(stock_values, &manage(&1, context))

:ok
end

defp manage(stock, context) do
{quantity, composite_pk} = Map.pop!(stock, :quantity)
%{item_id: item_id, warehouse_id: warehouse_id} = composite_pk

if stock_exists?(item_id, warehouse_id, actor: context.actor) do
Inventory.update_stock!(composite_pk, %{delta: quantity}, actor: context.actor)
else
Inventory.create_stock!(stock, actor: context.actor, tenant: context.tenant)
end
end

defp stock_exists?(item_id, warehouse_id, opts \\ []) do
Stock
|> Ash.Query.filter(item_id == ^item_id and warehouse_id == ^warehouse_id)
|> Ash.exists?(opts)
end
end
defmodule MyApp.Inventory.Actions.ManageStock do
use Ash.Resource.Actions.Implementation

alias MyApp.Inventory
alias MyApp.Inventory.Stock

require Ash.Query

@impl true
def run(input, _opts, context) do
stock_values = input.arguments.stock

Enum.each(stock_values, &manage(&1, context))

:ok
end

defp manage(stock, context) do
{quantity, composite_pk} = Map.pop!(stock, :quantity)
%{item_id: item_id, warehouse_id: warehouse_id} = composite_pk

if stock_exists?(item_id, warehouse_id, actor: context.actor) do
Inventory.update_stock!(composite_pk, %{delta: quantity}, actor: context.actor)
else
Inventory.create_stock!(stock, actor: context.actor, tenant: context.tenant)
end
end

defp stock_exists?(item_id, warehouse_id, opts \\ []) do
Stock
|> Ash.Query.filter(item_id == ^item_id and warehouse_id == ^warehouse_id)
|> Ash.exists?(opts)
end
end
My doubt is if the actions called within this generic action will run in a single transaction?
ZachDaniel
ZachDaniel3w ago
You can set transaction? true on the action and then they will And consider taking some kind of lock to avoid race conditions
Joan Gavelán
Joan GavelánOP3w ago
Thank you for the warning, implemented a change using the builtin helper get_and_lock_for_update/1 for the update action. Please let me know if there's anything else I should consider.
update :update_stock do
argument :delta, :decimal, allow_nil?: false
change get_and_lock_for_update()
change atomic_update(:quantity, expr(quantity + ^arg(:delta)))
end
update :update_stock do
argument :delta, :decimal, allow_nil?: false
change get_and_lock_for_update()
change atomic_update(:quantity, expr(quantity + ^arg(:delta)))
end
Another question I have: how should I return errors in this scenario? As you can see, I used bang functions mainly to make sure everything was working. Now that it does, I’d like to improve the implementation by returning meaningful errors that AshPhoenix can properly map to Inertia errors using the assign_errors/1 helper. What would you recommend?
ZachDaniel
ZachDaniel3w ago
You'd use Ash.Changeset.add_error Using bang functions is fine if you just want to abort and have the transaction rolled back

Did you find this page helpful?