Recursive Flows

Hi, I'm trying to write a flow that runs itself when a condition is met.
defmodule Demo.Inventory.Flows.Search do
  @moduledoc """

  """

  use Ash.Flow, otp_app: :demo

  flow do
    api(Demo.Support)

    argument(:size, :integer)

    argument(:token, :string)

    argument(:query, :struct)

    argument(:check_in, :date) do
      allow_nil? false
    end

    argument(:check_out, :date) do
      allow_nil? false
    end

    argument(:occupancies, {:array, Demo.Inventory.Resources.Types.Occupancy}) do
      allow_nil? false
    end

    returns(:merge_results)
  end

  steps do
    custom :get_hotels, Demo.Inventory.Flows.Steps.GetHotels do
      input(%{
        check_in: arg(:check_in),
        check_out: arg(:check_out),
        occupancies: arg(:occupancies),
        page: [limit: arg(:size), after: arg(:token)]
      })
    end

    custom :filter_unavailable_hotels, Demo.Inventory.Flows.Steps.FilterUnavailableHotels do
      input %{
        hotels: result(:get_hotels)
      }
    end

    custom :need_more_hotels?, Demo.Inventory.Flows.Steps.NeedMoreHotels do
      input %{
        keyset: result(:get_hotels),
        available_hotels: result(:filter_unavailable_hotels),
        needed_size: arg(:size)
      }
    end

    branch :maybe_get_more_hotels, result(:need_more_hotels?) do
      output :get_next_token

      custom :get_next_token, fn %{hotels: hotels}, _ ->
        {:ok, get_in(Map.from_struct(List.last(hotels)), [:__metadata__, :keyset])}
      end do
        input %{
          hotels: result(:filter_unavailable_hotels)
        }
      end

      custom :get_number_of_missing_hotels, fn %{hotels: hotels, size: size}, _ ->
        {:ok, size - length(hotels)}
      end do
        input %{
          hotels: result(:filter_unavailable_hotels),
          size: arg(:size)
        }
      end

      # The flow invokes itself here; this is the step that triggers the recursion.
      run_flow :get_more_hotels, __MODULE__ do
        input %{
          check_in: arg(:check_in),
          check_out: arg(:check_out),
          occupancies: arg(:occupancies),
          size: result(:get_number_of_missing_hotels),
          token: result(:get_next_token)
        }
      end
    end

    custom :merge_results, Demo.Inventory.Flows.Steps.MergeResults do
      input %{
        hotels: result(:get_hotels),
        more_hotels: result(:maybe_get_more_hotels)
      }
    end
  end
end
This leads to some kind of infinite recursion. I also tried copying the flow to a different file, but that led to an engine deadlock. It seems the "sub" flow runs in the same context as the "parent" flow, and since both have the same step names, the engine cannot tell which steps belong to which flow.
ZachDaniel, 3y ago
The step names shouldn't be an issue, because we prefix nested step names internally. But a recursive flow is definitely not something I thought of, and I think we'll need to add something explicit to support it. In the meantime, a custom step that runs your flow would be the answer.
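A minimal sketch of what the body of such a custom step could look like. Rather than re-running the flow, it moves the "fetch, filter, fetch more" loop into an ordinary recursive function that a custom step could call. The module name `PagedSearchSketch`, the `fetch_page` function, and the `available?` predicate are all hypothetical stand-ins for the steps in the question; nothing here is Ash.Flow API.

```elixir
defmodule PagedSearchSketch do
  @moduledoc """
  Hypothetical helper: keep fetching pages until enough available hotels
  are collected or there are no more pages.
  """

  # Assumptions (not from the original thread):
  #   fetch_page.(limit, token) returns {hotels, next_token}, with next_token
  #   set to nil on the last page.
  #   available?.(hotel) returns true when the hotel can be booked.
  def collect(fetch_page, available?, size, token \\ nil, acc \\ [])

  # Nothing (more) is needed: stop recursing.
  def collect(_fetch_page, _available?, size, _token, acc) when size <= 0, do: acc

  def collect(fetch_page, available?, size, token, acc) do
    {hotels, next_token} = fetch_page.(size, token)
    available = Enum.filter(hotels, available?)
    acc = acc ++ available
    missing = size - length(available)

    cond do
      # Enough available hotels on this page.
      missing <= 0 -> acc
      # No further pages: return what we have.
      is_nil(next_token) -> acc
      # Ask only for the missing amount, starting at the next token.
      true -> collect(fetch_page, available?, missing, next_token, acc)
    end
  end
end
```

For example, with a fake two-page source `pages = %{nil => {[h1, h2], "t1"}, "t1" => {[h3], nil}}` and `fetch = fn _limit, token -> Map.fetch!(pages, token) end`, calling `PagedSearchSketch.collect(fetch, & &1.free, 2)` walks both pages when only one hotel on the first page is free. Note that a source that keeps returning empty pages with a non-nil token would loop forever, so a real step would likely want a page limit.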
barnabasj (OP), 3y ago
Thanks, I'll go the custom route then.
ZachDaniel, 3y ago
I'm trying to think of how I'd even do the recursive flow thing... The way we do it now is that we recursively expand all of the flows and lift their steps up into one big flow (hence the never-ending recursion). But for recursive flows to work, we'd need to do that lazily. I could probably do that.
barnabasj (OP), 3y ago
Stop at the branch at first, and expand it only if the condition applies?
ZachDaniel, 3y ago
Something like that, yeah. But it's not really set up for lazy expansion right now, so I'm unsure what would need to happen for that to work. It's one of those things that might be a couple of lines of code, or it might involve rewriting large portions of Ash.Flow 😆 Worst-case scenario, I can just detect a recursive flow and change it into a custom step that runs the flow.
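To make the difference between the two strategies concrete, here is a toy model. It is not how Ash.Flow is implemented: the step tuples (`{:step, fun}`, `{:branch, condition, steps}`, `{:run_flow, name}`), the `flows` map, and the module name `ExpansionSketch` are all assumptions made up for illustration. Eager expansion inlines nested flows up front, so it has to detect a self-reference to avoid recursing forever; lazy expansion only looks up a nested flow when execution actually reaches it.

```elixir
defmodule ExpansionSketch do
  # Toy model only. `flows` maps a flow name to its list of steps.

  # Eager: inline every nested flow before running anything. `seen` holds the
  # flows currently being expanded, so a cycle is reported instead of looping.
  def expand_eager(flows, steps, seen \\ []) do
    Enum.reduce_while(steps, {:ok, []}, fn step, {:ok, acc} ->
      case expand_step(flows, step, seen) do
        {:ok, expanded} -> {:cont, {:ok, acc ++ expanded}}
        error -> {:halt, error}
      end
    end)
  end

  defp expand_step(flows, {:run_flow, name}, seen) do
    if name in seen do
      {:error, {:recursive_flow, name}}
    else
      expand_eager(flows, Map.fetch!(flows, name), [name | seen])
    end
  end

  defp expand_step(flows, {:branch, condition, nested}, seen) do
    with {:ok, expanded} <- expand_eager(flows, nested, seen) do
      {:ok, [{:branch, condition, expanded}]}
    end
  end

  defp expand_step(_flows, step, _seen), do: {:ok, [step]}

  # Lazy: interpret steps one by one. A branch whose condition is false is
  # skipped, so the nested flow inside it is never expanded.
  def run_lazy(flows, steps, state) do
    Enum.reduce(steps, state, fn
      {:step, fun}, state ->
        fun.(state)

      {:branch, condition, nested}, state ->
        if condition.(state), do: run_lazy(flows, nested, state), else: state

      {:run_flow, name}, state ->
        run_lazy(flows, Map.fetch!(flows, name), state)
    end)
  end
end
```

With a self-referencing flow such as `%{countdown: [{:step, &(&1 - 1)}, {:branch, &(&1 > 0), [{:run_flow, :countdown}]}]}`, `expand_eager/3` reports the cycle, while `run_lazy/3` terminates as soon as the branch condition becomes false.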
barnabasj (OP), 3y ago
Yeah, but in typical Ash fashion there is always an escape hatch (a custom step) to make things work. This works for me now and can easily be adapted in the future. I love how there is always a way to make it work 🙂
ZachDaniel, 3y ago
Hey @barnabasj, when you get a few minutes, if you could create an issue about this in ash core, that would be great 🙂
barnabasj (OP), 3y ago
Sure
