Integrate Oban to a flow

So, currently, I have this flow:
defmodule MyFlow do
alias Marketplace.Markets.Property.Offer

use Ash.Flow

flow do
api Marketplace.Markets

argument :price, :decimal, allow_nil?: false
argument :payment_type, :atom, allow_nil?: false
argument :property_id, :uuid, allow_nil?: false

returns :make_offer

steps do
transaction :multi_make_offer, Offer do
read :make_own_offers_old, Offer, :make_own_offers_old do
not_found_error? false

input %{property_id: arg(:property_id)}
end

create :make_offer, Offer, :create do
input %{
price: arg(:price),
payment_type: arg(:payment_type),
property_id: arg(:property_id)
}

end
end
end
end
defmodule MyFlow do
alias Marketplace.Markets.Property.Offer

use Ash.Flow

flow do
api Marketplace.Markets

argument :price, :decimal, allow_nil?: false
argument :payment_type, :atom, allow_nil?: false
argument :property_id, :uuid, allow_nil?: false

returns :make_offer

steps do
transaction :multi_make_offer, Offer do
read :make_own_offers_old, Offer, :make_own_offers_old do
not_found_error? false

input %{property_id: arg(:property_id)}
end

create :make_offer, Offer, :create do
input %{
price: arg(:price),
payment_type: arg(:payment_type),
property_id: arg(:property_id)
}

end
end
end
end
As you can see, this flow will first update some rows in the database and then create a new offer, it is inside a transaction block, so nothing should be written to the database if something fails. Now I want to add a new step to send e-mails, I want to do that with Oban, meaning that I will add a new job to the Oban queue in the database, and then Oban will handle it. Of course, I want that the Oban database insertion is also part of the transaction so I can make sure that the job will only run if the rest of the Flow works fine. In my other project which doesn't use Ash I would do that with an Ecto.Multi:
{:ok, _} =
Multi.new()
...
|> Oban.insert(:for_user, Email.new(%{type: :user_deletion_email, user: user}))
|> Repo.transaction()
{:ok, _} =
Multi.new()
...
|> Oban.insert(:for_user, Email.new(%{type: :user_deletion_email, user: user}))
|> Repo.transaction()
Can I just call Oban.insert! directly inside a custom step in the flow and Ash will handle the rest (regarding making sure this call is inside a transaction), or should I do something else to make this work?
3 Replies
Blibs
BlibsOP3y ago
Btw, this is not just a question about if this will work or not, I could easily test myself after I finish adding Oban to my project. The question is more about if this is the best way (creating a custom step) to do it with Ash or if there is a better way like, for example, fetching the Ecto.Multi inside the step to use it.
ZachDaniel
ZachDaniel3y ago
I think inserting it inside the transaction step is the way to go 👍 assuming oban is using the same repo
Blibs
BlibsOP3y ago
I will try that and reopen this if there is some issue 👍

Did you find this page helpful?