Nested flow steps possibly return differently when using `apply`?

Sorry to bother you again with flow issues. This step:
def run(input, _opts, _context) do
  Logger.info("Creating resource #{input.resource}")

  created_resource =
    input.resource
    |> Changeset.for_create(
      :create,
      input.attributes |> Map.merge(additional_attributes(input))
    )
    |> MmsBiztalk.create()

  case created_resource do
    {:ok, resource_record} ->
      resource_record |> input.resource.to_syncing!() |> input.resource.to_synced()

    # NOTE this skips the invalid records allowing all the valid ones to
    # be saved, one error in the flow will rollback everything
    {:error, error} ->
      Logger.warn(error)
      {:ok, nil}
  end
end
correctly returns `{:ok, result}` with the updated status on `resource_record`, while this:
def run(input, _opts, _context) do
  Logger.info("Updating resource #{input.resource}")

  resource_to_update =
    input.resource
    |> Ash.Query.for_read(String.to_atom("by_#{input.update_key}"), %{
      input.update_key => input.attributes[input.update_key]
    })
    |> MmsBiztalk.read!()
    |> List.first()

  updated_resource =
    resource_to_update
    |> Changeset.for_update(
      :update,
      input.attributes
    )
    |> MmsBiztalk.update()

  case updated_resource do
    {:ok, resource_record} ->
      transitioned_status_name = "to_#{input.data_type}_synced" |> String.to_atom()
      apply(input.resource, transitioned_status_name, [resource_record])

    # NOTE this skips the invalid records allowing all the valid ones to
    # be saved, one error in the flow will rollback everything
    {:error, error} ->
      Logger.warn(error)
      {:ok, nil}
  end
end
returns `{:ok, nil}` even though there is no error and `resource_record` is updated with the right status.
17 Replies
ZachDaniel•3y ago
When you say it returns `{:ok, nil}`, what do you mean? Do you mean that `apply(input.resource, transitioned_status_name, [resource_record])` returns `{:ok, nil}`? Or that it returns the right value, but somehow the custom step returns something else?
tommasop#2001OP•3y ago
It returns the right value, but somehow the custom step returns nil as the Ash flow result.
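For reference, `apply/3` on its own does not alter return values: a dynamic call gives exactly what the direct call would. A minimal sketch in plain Elixir, where `DemoResource` and `to_products_quantities_synced/1` are hypothetical stand-ins for the real resource and its transition function (they are not taken from the thread's codebase):

```elixir
defmodule DemoResource do
  # Hypothetical stand-in for a state-transition function on a resource.
  def to_products_quantities_synced(record) do
    {:ok, Map.put(record, :status, :synced)}
  end
end

record = %{sku_code: "ABC", status: :syncing}
data_type = "products_quantities"

# Build the function name at runtime. String.to_existing_atom/1 avoids
# creating new atoms from external input; it raises if the atom is unknown.
fun = String.to_existing_atom("to_#{data_type}_synced")

direct = DemoResource.to_products_quantities_synced(record)
dynamic = apply(DemoResource, fun, [record])

# Both calls return the same tagged tuple.
true = direct == dynamic
IO.inspect(dynamic)
```

If the direct and dynamic calls agree like this, the difference seen in the flow result has to come from somewhere other than `apply`.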
ZachDaniel•3y ago
🤔 can I see the flow where you call the step?
tommasop#2001OP•3y ago
yep
flow do
  api MmsBiztalk

  argument :resource, :term do
    allow_nil? false
  end

  argument :data_type, :string do
    allow_nil? false
  end

  argument :resources_attributes, :term do
    allow_nil? false
  end

  argument :update_key, :atom do
    default nil
  end

  returns :update_resources
end

steps do
  transaction :create_data_in_and_map_resources, MmsBiztalk.DataIn do
    create :create_data_in, MmsBiztalk.DataIn, :create do
      input %{data_type: arg(:data_type)}
    end

    map :cycle_resources, arg(:resources_attributes) do
      branch :creation_bulk, expr(is_nil(^arg(:update_key))) do
        custom :create_resource, MmsBiztalk.Flows.Steps.CreateResource do
          input %{
            attributes: element(:cycle_resources),
            resource: arg(:resource),
            data_in_id: path(result(:create_data_in), [:id])
          }
        end
      end

      branch :update_bulk, expr(not is_nil(^arg(:update_key))) do
        custom :update_resource, MmsBiztalk.Flows.Steps.UpdateResource do
          input %{
            attributes: element(:cycle_resources),
            resource: arg(:resource),
            data_type: arg(:data_type),
            data_in_id: path(result(:create_data_in), [:id]),
            update_key: arg(:update_key)
          }
        end
      end
    end
  end
end
ZachDaniel•3y ago
🤔 What happens if you do `returns :create_data_in_and_map_resources`?
tommasop#2001OP•3y ago
I have this test:
test "data in and Product are created if valid" do
  before_data_in =
    MmsBiztalk.DataIn
    |> Ash.Query.for_read(:read)
    |> MmsBiztalk.read!()
    |> length()

  %Ash.Flow.Result{valid?: true, result: generated_product} =
    MmsBiztalk.Flows.EsolverToBiztalk.BulkPull.run(MmsBiztalk.Product, "products_quantities", [custom_params_for(:product, %{}, [:data_in_id, :category_id])])

  after_data_in =
    MmsBiztalk.DataIn
    |> Ash.Query.for_read(:read)
    |> MmsBiztalk.read!()

  assert length(after_data_in) == before_data_in + 1
  assert length(generated_product) == 1
  assert hd(generated_product).status == :synced
end
and `generated_product` is nil, and this test:
test "data in and Product Quantity are created if valid" do
  %{product: product} = setup_data_in_and_product("not_needed")

  before_data_in =
    MmsBiztalk.DataIn
    |> Ash.Query.for_read(:read)
    |> MmsBiztalk.read!()
    |> length()

  %Ash.Flow.Result{valid?: true, result: quantity} =
    MmsBiztalk.Flows.EsolverToBiztalk.BulkPull.run(MmsBiztalk.Product, "products_quantities", [%{sku_code: product.sku_code, quantity: 14.0}], %{update_key: :sku_code})
  require IEx; IEx.pry

  after_data_in =
    MmsBiztalk.DataIn
    |> Ash.Query.for_read(:read)
    |> MmsBiztalk.read!()

  assert length(after_data_in) == before_data_in + 1
  assert length(quantity) == 1
  assert hd(quantity).status == :synced
end
and `quantity` is nil.
tommasop#2001OP•3y ago
Yes, same result.
ZachDaniel•3y ago
🤔 okay, could you construct a flow with the same structure, but with simple custom steps? Ideally something I could test against
tommasop#2001OP•3y ago
I will try to make a test directly in Ash and make a PR
ZachDaniel•3y ago
Looks awesome, thanks!
Okay, so I've fixed some issues that I found, but the primary issue is that you can't use `returns` with a single step inside of a `map`. You'd need to set the output step of the map (which defaults to the last step, so not strictly necessary in your case).
What I ended up doing after merging your tests:
custom :return_value,
       fn value, _ ->
         {:ok, value}
       end,
       input: expr(^result(:creation_bulk) || ^result(:update_bulk))
That way it returns either the created or the updated user.
So I need to write a validator for step returns.
Pushed: main now has a verifier that will error if a return is invalid.
tommasop#2001OP•3y ago
Thanks Zach.
So if I have a map over 10 elements with only a create step inside, what will it return by default: the last created element or all 10 created elements?
ZachDaniel•3y ago
It will return the results of the last step of the map in a list
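As a rough mental model only (plain Elixir, not how Ash.Flow is implemented), a map step behaves like `Enum.map/2`: each element runs through the inner steps and the last step's result is collected into a list. `MapModel` and `create_step/1` are hypothetical names used for illustration:

```elixir
defmodule MapModel do
  # Hypothetical inner step: "creates" a record from its attributes.
  def create_step(attrs), do: Map.put(attrs, :status, :synced)

  # Runs the inner step once per element and collects every result.
  def run(elements), do: Enum.map(elements, &create_step/1)
end

results = MapModel.run(for n <- 1..10, do: %{id: n})

# Ten elements in, ten results out (not just the last created one).
10 = length(results)
true = Enum.all?(results, &(&1.status == :synced))
```

Under this model, a map over 10 elements with a single create step yields a list of all 10 created records.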
tommasop#2001OP•3y ago
It seems that if I add the `custom :return_value` step, the flow returns `%Ash.Flow.Result{result: {:ok, actual_result}}` instead of `%Ash.Flow.Result{result: actual_result}`. I need to change your code to this:
custom :return_value,
       fn value, _ ->
         value
       end,
       input: expr(^result(:creation_bulk) || ^result(:update_bulk))
to make the tests pass. Is that correct?
ZachDaniel•3y ago
Hm... I don't think so. Is the custom step inside `creation_bulk` or `update_bulk` returning `{:ok, {:ok, ...}}` or something like that? I think that is what that means.
tommasop#2001OP•3y ago
OK, will double check that.
@Zach Daniel you were obviously right 🙂
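The double-wrapped result can be reproduced in plain Elixir. The sketch below assumes, as the thread implies, that the flow runner strips one `{:ok, _}` layer from each custom step's return value. `StepModel` and all of its functions are hypothetical stand-ins for that behavior, not Ash's actual code:

```elixir
defmodule StepModel do
  # Hypothetical model of the runner: strip exactly one {:ok, _} layer.
  def unwrap({:ok, value}), do: value

  # An inner step that accidentally wraps its result twice.
  def double_wrapped_step(record), do: {:ok, {:ok, record}}

  # An inner step that wraps its result once, as intended.
  def single_wrapped_step(record), do: {:ok, record}

  # The :return_value step from the thread: re-wraps whatever it receives.
  def return_value(value), do: {:ok, value}
end

record = %{status: :synced}

# Double-wrapped inner step: one extra {:ok, _} survives to the end.
leaked =
  record
  |> StepModel.double_wrapped_step()
  |> StepModel.unwrap()
  |> StepModel.return_value()
  |> StepModel.unwrap()

true = leaked == {:ok, record}

# Single-wrapped inner step: the final result is the bare record.
clean =
  record
  |> StepModel.single_wrapped_step()
  |> StepModel.unwrap()
  |> StepModel.return_value()
  |> StepModel.unwrap()

true = clean == record
```

In this model, dropping the `{:ok, ...}` from `:return_value` would only mask the extra layer; returning a single `{:ok, record}` from the inner step is what removes it.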