Add conditions to upsert

Is there any way to add conditionals when doing an upsert in a create action? For example, I have this resource:
# Ash resource stored in the "blibs" Postgres table through AshPostgres.
# Its primary `:create` action is an upsert.
defmodule Pacman.Markets.Blibs do
use Ash.Resource, data_layer: AshPostgres.DataLayer

# Exposes the `:create` action as a function through the Pacman.Markets API.
code_interface do
define_for Pacman.Markets

define :create
end

attributes do
# Primary key; a UUID is generated when the caller does not supply one.
attribute :id, :uuid do
default &Ash.UUID.generate/0
primary_key? true
allow_nil? false
end

attribute :name, :string do
allow_nil? false

# Same limit as the {:varchar, 255} migration type declared below.
constraints max_length: 255
end

# Version marker for the row; the thread wants upserts to apply only when the
# incoming value is greater than the stored one.
attribute :transaction_time, :integer, allow_nil?: false

attribute :deleted?, :boolean, allow_nil?: false, default: false

# Presumably adds the inserted_at/updated_at columns seen in the generated SQL,
# exposed as public attributes — confirm against the Ash docs.
timestamps(private?: false)
end

postgres do
table "blibs"

repo Pacman.Repo

# Overrides the column types used for these attributes in migrations.
migration_types transaction_time: :bigint, name: {:varchar, 255}
end

actions do
defaults [:read, :destroy]

create :create do
primary? true

# Creating with an existing id updates that row (ON CONFLICT (id) DO UPDATE);
# no condition can be attached to the update here.
upsert? true
end
end
end
# Ash resource stored in the "blibs" Postgres table through AshPostgres.
# Its primary `:create` action is an upsert.
defmodule Pacman.Markets.Blibs do
use Ash.Resource, data_layer: AshPostgres.DataLayer

# Exposes the `:create` action as a function through the Pacman.Markets API.
code_interface do
define_for Pacman.Markets

define :create
end

attributes do
# Primary key; a UUID is generated when the caller does not supply one.
attribute :id, :uuid do
default &Ash.UUID.generate/0
primary_key? true
allow_nil? false
end

attribute :name, :string do
allow_nil? false

# Same limit as the {:varchar, 255} migration type declared below.
constraints max_length: 255
end

# Version marker for the row; the thread wants upserts to apply only when the
# incoming value is greater than the stored one.
attribute :transaction_time, :integer, allow_nil?: false

attribute :deleted?, :boolean, allow_nil?: false, default: false

# Presumably adds the inserted_at/updated_at columns seen in the generated SQL,
# exposed as public attributes — confirm against the Ash docs.
timestamps(private?: false)
end

postgres do
table "blibs"

repo Pacman.Repo

# Overrides the column types used for these attributes in migrations.
migration_types transaction_time: :bigint, name: {:varchar, 255}
end

actions do
defaults [:read, :destroy]

create :create do
primary? true

# Creating with an existing id updates that row (ON CONFLICT (id) DO UPDATE);
# no condition can be attached to the update here.
upsert? true
end
end
end
9 Replies
Blibs
BlibsOP2y ago
If I create a new instance with the same id, it will do an upsert, with a query more or less like this:
INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;
INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;
The thing is, I want to add a condition to the upsert part: the update should only be applied if the transaction_time that I'm writing is bigger than the one already in the DB. In other words, I want to change the query to this one:
INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
WHERE blibs.transaction_time < 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;
INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
WHERE blibs.transaction_time < 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;
Looking at the Ash documentation, I couldn't find any option to add conditionals to my create action, is there a way for me to achieve this?
Blibs
BlibsOP2y ago
Just for completeness, in this forum post it shows how it is done with Ecto https://elixirforum.com/t/upsert-conditional-update-e-g-update-only-when-existing-data-is-outdated/55503/2
Elixir Programming Language Forum
Upsert conditional update e.g update only when existing data is out...
Found the solution (ref. Handling Upsert stale error) turn on_conflict into Ecto.Query use stale_error_field to prevent upsert stale error For example: def upsert(%Post{} = post) do on_conflict = from Post, where: [public: false], update: [set: [title: "second"]] post |> Repo.insert( conflict_target...
ZachDaniel
ZachDaniel2y ago
We don't have that option currently; you'll need to build a manual action. Can you open a feature request on Ash?
Blibs
BlibsOP2y ago
Yep, I will create it tomorrow when I'm back at work. Do you mind giving me some tips on how to proceed with the manual action? I never understood how exactly I'm supposed to insert the changeset into the DB with a manual create action. My guess is that there is some way to manually apply all the checks from the resource attributes to the changeset and, if there is no error, retrieve the final attribute map from it, insert it using Ecto's Repo.insert directly, and then somehow convert the result (especially in case of an error) back into an error equivalent to the one Ash.Changeset would give. I'm just not sure what the best approach is, and I can't recall an example of something like that in the documentation.
ZachDaniel
ZachDaniel2y ago
# Sketch of a manual create action: build the record from the changeset, then
# persist it with hand-written Ecto code instead of the data layer's upsert.
create :your_upsert do
upsert? true
manual fn changeset, _ ->
# Both result shapes of apply_attributes/1 are matched below.
case Ash.Changeset.apply_attributes(changeset) do
# `upsert_struct_with_ecto` and `...` are placeholders for your own Ecto call.
{:ok, record} -> upsert_struct_with_ecto(record, ...)
{:error, error} -> {:error, error}
end
end
end
# Sketch of a manual create action: build the record from the changeset, then
# persist it with hand-written Ecto code instead of the data layer's upsert.
create :your_upsert do
upsert? true
manual fn changeset, _ ->
# Both result shapes of apply_attributes/1 are matched below.
case Ash.Changeset.apply_attributes(changeset) do
# `upsert_struct_with_ecto` and `...` are placeholders for your own Ecto call.
{:ok, record} -> upsert_struct_with_ecto(record, ...)
{:error, error} -> {:error, error}
end
end
end
Blibs
BlibsOP2y ago
GitHub
Add support for creating upsert conditionals · Issue #667 · ash-pro...
Is your feature request related to a problem? Please describe. Yes, some times you want to also add conditionals when running upsert in create actions. For example, let's say that, during an up...
Blibs
BlibsOP2y ago
This is my current workaround:
# Manual create action that performs the conditional upsert directly through
# Ecto: an existing row is only updated when its stored transaction_time is
# lower than the incoming one.
#
# Returns the result of `Pacman.Repo.insert/2` unchanged: `{:ok, record}` on
# success, or `{:error, %Ecto.Changeset{}}` carrying a stale error on
# `:transaction_time` when the row exists but the condition is not met.
create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        # Columns to overwrite on conflict. `:inserted_at` is excluded so that
        # updating an existing row keeps its original creation timestamp.
        update_fields =
          record
          |> Map.take(__MODULE__.__schema__(:fields))
          |> Map.delete(:inserted_at)
          |> Enum.to_list()

        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time,
            update: [set: ^update_fields]

        Pacman.Repo.insert(record,
          conflict_target: [:id],
          on_conflict: on_conflict,
          # When the WHERE clause filters the update out, Ecto reports the
          # insert as stale on this field instead of raising.
          stale_error_field: :transaction_time,
          returning: true
        )

      {:error, error} ->
        {:error, error}
    end
  end
end
# Manual create action that performs the conditional upsert directly through
# Ecto: an existing row is only updated when its stored transaction_time is
# lower than the incoming one.
#
# Returns the result of `Pacman.Repo.insert/2` unchanged: `{:ok, record}` on
# success, or `{:error, %Ecto.Changeset{}}` carrying a stale error on
# `:transaction_time` when the row exists but the condition is not met.
create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        # Columns to overwrite on conflict. `:inserted_at` is excluded so that
        # updating an existing row keeps its original creation timestamp.
        update_fields =
          record
          |> Map.take(__MODULE__.__schema__(:fields))
          |> Map.delete(:inserted_at)
          |> Enum.to_list()

        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time,
            update: [set: ^update_fields]

        Pacman.Repo.insert(record,
          conflict_target: [:id],
          on_conflict: on_conflict,
          # When the WHERE clause filters the update out, Ecto reports the
          # insert as stale on this field instead of raising.
          stale_error_field: :transaction_time,
          returning: true
        )

      {:error, error} ->
        {:error, error}
    end
  end
end
It is super ugly, but seems to work fine. The only thing that I haven't figured out yet is how to transform the Ecto changeset back into an Ash changeset when there are errors. For example, if the upsert condition fails, it will return this changeset:
#Ecto.Changeset<action: :insert, changes: %{id: "9dd0fe5c-3fe4-4036-bb50-b189169b4028", name: "b", transaction_time: 6, deleted?: false, inserted_at: ~U[2023-08-01 14:23:46.238721Z], updated_at: ~U[2023-08-01 14:23:46.238721Z]}, errors: [transaction_time: {"is stale", [stale: true]}], data: #Pacman.Markets.Blibs<>, valid?: false>
#Ecto.Changeset<action: :insert, changes: %{id: "9dd0fe5c-3fe4-4036-bb50-b189169b4028", name: "b", transaction_time: 6, deleted?: false, inserted_at: ~U[2023-08-01 14:23:46.238721Z], updated_at: ~U[2023-08-01 14:23:46.238721Z]}, errors: [transaction_time: {"is stale", [stale: true]}], data: #Pacman.Markets.Blibs<>, valid?: false>
Do we have some helper function to convert an Ecto.Changeset error into an Ash.Changeset error? In other words, I want to get the errors from Ecto.Changeset ([transaction_time: {"is stale", [stale: true]}]) and convert into a meaningful error inside Ash.Changeset. I tried just passing the Ecto.Changeset errors field to Ash.Changeset using the add_error function, that kinda works but it gives me a more "generic" error:
%Ash.Error.Changes.InvalidChanges{
fields: [],
message: nil,
validation: nil,
changeset: nil,
query: nil,
error_context: [],
vars: [transaction_time: {"is stale", [stale: true]}],
path: [],
stacktrace: #Stacktrace<>,
class: :invalid
}
%Ash.Error.Changes.InvalidChanges{
fields: [],
message: nil,
validation: nil,
changeset: nil,
query: nil,
error_context: [],
vars: [transaction_time: {"is stale", [stale: true]}],
path: [],
stacktrace: #Stacktrace<>,
class: :invalid
}
I was wondering if there is some "parser" that would fill all the error fields correctly
ZachDaniel
ZachDaniel2y ago
There isn't one exposed, but what AshPostgres does looks something like this:
# Normalizes the result of an Ecto write: successes pass through untouched,
# while a failed changeset has each of its errors converted with to_ash_error/1.
defp handle_errors({:ok, _} = success), do: success

defp handle_errors({:error, %Ecto.Changeset{errors: ecto_errors}}) do
  ash_errors = for ecto_error <- ecto_errors, do: to_ash_error(ecto_error)
  {:error, ash_errors}
end

# Builds an Ash invalid-attribute error from one `{field, {message, vars}}`
# entry of an Ecto changeset's error list.
defp to_ash_error({attribute, {text, meta}}) do
  opts = [field: attribute, message: text, private_vars: meta]
  Ash.Error.Changes.InvalidAttribute.exception(opts)
end
# Normalizes the result of an Ecto write: successes pass through untouched,
# while a failed changeset has each of its errors converted with to_ash_error/1.
defp handle_errors({:ok, _} = success), do: success

defp handle_errors({:error, %Ecto.Changeset{errors: ecto_errors}}) do
  ash_errors = for ecto_error <- ecto_errors, do: to_ash_error(ecto_error)
  {:error, ash_errors}
end

# Builds an Ash invalid-attribute error from one `{field, {message, vars}}`
# entry of an Ecto changeset's error list.
defp to_ash_error({attribute, {text, meta}}) do
  opts = [field: attribute, message: text, private_vars: meta]
  Ash.Error.Changes.InvalidAttribute.exception(opts)
end
Blibs
BlibsOP2y ago
Ah, thanks! Based on that, my final workaround is the following:
# Manual create action implementing a conditional upsert through Ecto: an
# existing row is only updated when it is not deleted and its stored
# transaction_time is lower than the incoming one.
#
# Returns `{:ok, record}` on insert/update. When the row exists but the
# condition is not met, returns `{:error, changeset}` with an
# "upsert condition not met" error added to the Ash changeset. Any other Ecto
# changeset errors are converted to Ash invalid-attribute errors.
create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        # Columns to overwrite on conflict. `:inserted_at` is excluded so that
        # updating an existing row keeps its original creation timestamp.
        update_fields =
          record
          |> Map.take(__MODULE__.__schema__(:fields))
          |> Map.delete(:inserted_at)
          |> Enum.to_list()

        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time and not r.deleted?,
            update: [set: ^update_fields]

        result =
          Pacman.Repo.insert(record,
            conflict_target: [:id],
            on_conflict: on_conflict,
            # `:upsert` is not a real attribute; it only tags the stale error
            # so it can be recognised below.
            stale_error_field: :upsert,
            stale_error_message: "upsert condition not met",
            returning: true
          )

        case result do
          # The WHERE clause filtered the update out.
          {:error, %Ecto.Changeset{errors: [upsert: {message, [stale: true]}]}} ->
            error =
              Ash.Error.Unknown.UnknownError.exception(
                field: :upsert,
                error: message,
                changeset: changeset
              )

            {:error, Ash.Changeset.add_error(changeset, error)}

          # Any other Ecto failure: hand Ash errors back rather than a raw
          # Ecto changeset.
          {:error, %Ecto.Changeset{errors: errors}} ->
            ash_errors =
              Enum.map(errors, fn {field, {message, vars}} ->
                Ash.Error.Changes.InvalidAttribute.exception(
                  field: field,
                  message: message,
                  private_vars: vars
                )
              end)

            {:error, ash_errors}

          other ->
            other
        end

      {:error, error} ->
        {:error, error}
    end
  end
end
# Manual create action implementing a conditional upsert through Ecto: an
# existing row is only updated when it is not deleted and its stored
# transaction_time is lower than the incoming one.
#
# Returns `{:ok, record}` on insert/update. When the row exists but the
# condition is not met, returns `{:error, changeset}` with an
# "upsert condition not met" error added to the Ash changeset. Any other Ecto
# changeset errors are converted to Ash invalid-attribute errors.
create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        # Columns to overwrite on conflict. `:inserted_at` is excluded so that
        # updating an existing row keeps its original creation timestamp.
        update_fields =
          record
          |> Map.take(__MODULE__.__schema__(:fields))
          |> Map.delete(:inserted_at)
          |> Enum.to_list()

        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time and not r.deleted?,
            update: [set: ^update_fields]

        result =
          Pacman.Repo.insert(record,
            conflict_target: [:id],
            on_conflict: on_conflict,
            # `:upsert` is not a real attribute; it only tags the stale error
            # so it can be recognised below.
            stale_error_field: :upsert,
            stale_error_message: "upsert condition not met",
            returning: true
          )

        case result do
          # The WHERE clause filtered the update out.
          {:error, %Ecto.Changeset{errors: [upsert: {message, [stale: true]}]}} ->
            error =
              Ash.Error.Unknown.UnknownError.exception(
                field: :upsert,
                error: message,
                changeset: changeset
              )

            {:error, Ash.Changeset.add_error(changeset, error)}

          # Any other Ecto failure: hand Ash errors back rather than a raw
          # Ecto changeset.
          {:error, %Ecto.Changeset{errors: errors}} ->
            ash_errors =
              Enum.map(errors, fn {field, {message, vars}} ->
                Ash.Error.Changes.InvalidAttribute.exception(
                  field: field,
                  message: message,
                  private_vars: vars
                )
              end)

            {:error, ash_errors}

          other ->
            other
        end

      {:error, error} ->
        {:error, error}
    end
  end
end

Did you find this page helpful?