TonyLikeSocks
TonyLikeSocks
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
This unblocked me. I also had a format issue I needed to resolve. My source was outputting RawAudio and the TrackReceiver expects RTP packets to depayload. But that was straight forward. Thanks @Feliks
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
Sorry. Life has been getting in the way. I need to try it.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
Ok. Dummy element 🙂 Thanks for the advice Feliks. I appreciate it.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
To be clear, everything works in prod. I'm just trying to add a test. It's a part of my code base that's a little gnarly, so some test coverage would be helpful
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
So just changing all my elements won't get me to a place where I can build a test pipeline that can exercise my endpoint. 🤔 I'm definitely a little stumped
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
Though the Membrane.Testing.Source, already has flow_control: manual which I think is the root of the issue.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
Yeah, that's the repo.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
I appreciate the advice.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
@Feliks So now my issue is that the first element in my bin is Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver which has it's input pad flow_control: :push ; but the output pad of the Membrane.Testing.DynamicSource that I'm using has an output pad flow_control: :manual, which is incompatible. I perused the Engine code, and I assume it's doing something to mediate between the two flow controls, though looking at the code in membrane webrtc engine ( engine.ex tee.ex and filter_tee.ex) it wasn't obvious to me how that was being done. What do you advise? Should I build a dummy element in my testing pipeline that just mediates between the Testing.Source element and one that expects a push? Or I could make a copy of TrackReceiver that has the pad definitions set to :auto and just roll with it. Though I don't fully understand the implications here, so I'm a little worried about doing that and producing some subtle bug in the future.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
FWIW - i think the failure is silent because my test assertions were using the default timeout of 2_000, but that Engine.subscribe call has a timeout of 5_000. So I never tripped the timeout and saw the error message.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
I think I realized at least part of the problem. My element is silently dying. When I send send in the :new_tracks notification, I'm also trying to subscribe to the track published by the engine. I've got this line:
:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
But the state.rtc_engine is the pid of my testing process, not an actual running RTC engine. So that call silently fails, and nothing else is executed, which is why my handle_pad_added calls weren't firing. Adding this to my test, unblocked me. I'm successfully setting up the pipeline and mocking the RTC Engine pieces.
receive do
{:subscribe, {caller_pid, ref}, ^endpoint_id, received_track_id, _opts} ->
# Verify the track ID if needed
assert received_track_id == track.id,
"Track ID in subscribe message doesn't match expected track ID"
send(caller_pid, {ref, :ok})
after
5000 -> flunk("Did not receive :subscribe message from Conversation bin within timeout")
end
receive do
{:subscribe, {caller_pid, ref}, ^endpoint_id, received_track_id, _opts} ->
# Verify the track ID if needed
assert received_track_id == track.id,
"Track ID in subscribe message doesn't match expected track ID"
send(caller_pid, {ref, :ok})
after
5000 -> flunk("Did not receive :subscribe message from Conversation bin within timeout")
end
Thanks @Feliks !
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
We setup the whole spec in the handle_pad_added for the output pad. I omitted it for brevity earlier. I
@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
|> child(:track_receiver, __MODULE__.TrackRecevier)
|> children...omitted for discord message limits
|> via_in(Pad.ref(:input, {state.out_track.id, :high}))
|> child(:track_sender, %TrackSender{
track: state.out_track,
variant_bitrates: %{high: 50_000}
})
|> via_out(Pad.ref(:output, {state.out_track.id, :high}))
|> bin_output(pad),
branch_spec(state),
branch_spec(state)
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
|> child(:track_receiver, __MODULE__.TrackRecevier)
|> children...omitted for discord message limits
|> via_in(Pad.ref(:input, {state.out_track.id, :high}))
|> child(:track_sender, %TrackSender{
track: state.out_track,
variant_bitrates: %{high: 50_000}
})
|> via_out(Pad.ref(:output, {state.out_track.id, :high}))
|> bin_output(pad),
branch_spec(state),
branch_spec(state)
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
I checked and the handle_setup isn't being called. It's one of the child elements of the bin. My hunch is that I'm setting the spec up wrong in my test file. My conversation bin doesn't actually return the child spec until we get the handle_pad_added callbacks. Here's the relevant code from the conversation.ex bin that I'm trying to test:
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end

@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
...omitted
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end

@impl true
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
IO.puts("Output pad added")

spec = [
bin_input(Pad.ref(:input, state.track.id))
...omitted
]

spec = {spec, log_metadata: [room_id: state.room_id]}

{[spec: spec], state}
end
Since neither of those is tripped, I think I'm not setting my testing spec up correctly, or the Testing pipeline isn't correctly notifying the element that new pads have been added.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
Though the children for this bin aren't spawned until we handle this callback:
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
...stuff
end
def handle_pad_added(Pad.ref(:output, {track_id, :high}) = pad, _ctx, state)
when track_id == state.out_track.id do
...stuff
end
And the assertions that are failing right now are prior to that in the lifecycle.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
I'm testing more, and the event returned from my handle_init I can assert on. But an event fired from this callback:
@impl true
def handle_parent_notification(
{:new_tracks, [%Track{type: :audio} = track]},
ctx,
%{track: nil} = state
) do
{:endpoint, endpoint_id} = ctx.name
# subscribe to track
IO.puts("Subscribing to track #{inspect(track.id)}")

:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
out_track = out_track_spec(track, ctx)

# Publish new track
actions = [notify_parent: {:publish, {:new_tracks, [out_track]}}]
state = %{state | out_track: out_track, track: track}

{actions, state}
end
@impl true
def handle_parent_notification(
{:new_tracks, [%Track{type: :audio} = track]},
ctx,
%{track: nil} = state
) do
{:endpoint, endpoint_id} = ctx.name
# subscribe to track
IO.puts("Subscribing to track #{inspect(track.id)}")

:ok = Engine.subscribe(state.rtc_engine, endpoint_id, track.id)
out_track = out_track_spec(track, ctx)

# Publish new track
actions = [notify_parent: {:publish, {:new_tracks, [out_track]}}]
state = %{state | out_track: out_track, track: track}

{actions, state}
end
I can't assert on. Though I can see the IO.puts message in my logs, so I know the callback is being fired.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
Yes.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
I assumed the notify_parent would trigger a notification to the test pipeline and I could assert on it.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 4/1/2025 in #membrane-help
Testing a Membrane Bin used in a WebRTC Engine Endpoint
This is the assertion that's failing
assert_pipeline_notified(
pipeline,
{:endpoint, endpoint_id},
{:track_ready, ^track_id, :high, :opus},
5_000
)
assert_pipeline_notified(
pipeline,
{:endpoint, endpoint_id},
{:track_ready, ^track_id, :high, :opus},
5_000
)
I'm expecting this callback to be called:
# A pad for the track that we subscribed to was connected
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end
# A pad for the track that we subscribed to was connected
@impl true
def handle_pad_added(Pad.ref(:input, track_id), _ctx, state) when state.track.id == track_id do
IO.puts("Pad added")

{
# Mark the response track as ready
[
notify_parent:
{:track_ready, state.out_track.id, hd(state.out_track.variants),
state.out_track.encoding}
],
state
}
end
I've even got a handle_pad_added without the guard
@impl true
def handle_pad_added(Pad.ref(:input, track_id), ctx, state) do
IO.puts("Unhandled pad added. #{inspect(track_id)}")
IO.puts("State: #{inspect(state)}")
IO.puts("Ctx: #{inspect(ctx)}")
{[], state}
end
@impl true
def handle_pad_added(Pad.ref(:input, track_id), ctx, state) do
IO.puts("Unhandled pad added. #{inspect(track_id)}")
IO.puts("State: #{inspect(state)}")
IO.puts("Ctx: #{inspect(ctx)}")
{[], state}
end
And it's never called either.
28 replies
SMSoftware Mansion
Created by TonyLikeSocks on 2/6/2025 in #membrane-help
Testing a filter with flow control :auto?
Ah ha! Thanks @Feliks That did the trick.
8 replies
SMSoftware Mansion
Created by TonyLikeSocks on 2/6/2025 in #membrane-help
Testing a filter with flow control :auto?
I assumed I had to use the generator to simulate the stream. Alternatively, I could so something like...
buffer_actions = Enum.map(chunks, fn chunk ->
{:buffer, {:output, %Buffer{payload: chunk}}}
end)

actions = [
{:stream_format, {:output, %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}}},
{:event, {:output, %VoiceActivityChanged{voice_activity: :speech}}}
] ++ buffer_actions ++ [
{:event, {:output, %VoiceActivityChanged{voice_activity: :silence}}}
]
buffer_actions = Enum.map(chunks, fn chunk ->
{:buffer, {:output, %Buffer{payload: chunk}}}
end)

actions = [
{:stream_format, {:output, %RawAudio{sample_format: :s16le, sample_rate: 16_000, channels: 1}}},
{:event, {:output, %VoiceActivityChanged{voice_activity: :speech}}}
] ++ buffer_actions ++ [
{:event, {:output, %VoiceActivityChanged{voice_activity: :silence}}}
]
Which does the same thing, though it seems like the buffers are sent through all at once. I can't tell if it mimics what is happening in a normal pipeline
8 replies