Clear the buffer sent to output pad

Hi there, I'm working on an integration with the OpenAI Realtime API and I'm trying to implement VAD (voice activity detection). Currently, when the user speaks, the audio is sent to OpenAI, and the response is collected through response.audio.delta events and sent to the output through a buffer. When I receive the input_audio_buffer.speech_started event, I would like to flush the part of the buffer which hasn't been played yet and restart the process (send data to OpenAI and receive the stream). There is probably something I don't get about how Membrane works; is there someone who could help me?
15 Replies
tomashco (OP) · 2w ago
Actually this is the pipeline:
[
  # Input path: Mic → OpenAI
  child(:webrtc_source, %Membrane.WebRTC.Source{
    signaling: opts[:source_channel]
  })
  |> via_out(:output, options: [kind: :audio])
  |> child(:input_opus_parser, Membrane.Opus.Parser)
  |> child(:opus_decoder, %Membrane.Opus.Decoder{sample_rate: 24_000})
  # |> child(:vad, MembraneOpenAI.VAD)

  # Output path: OpenAI → Browser
  |> child(:open_ai, %MembraneOpenAI.OpenAIEndpoint{websocket_opts: openai_ws_opts})
  # get_child(:open_ai)
  # |> via_out(:output)
  |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
  |> child(:realtimer, Membrane.Realtimer)
  |> child(:opus_encoder, Membrane.Opus.Encoder)
  |> via_in(:input, options: [kind: :audio])
  |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
    tracks: [:audio],
    signaling: opts[:sink_channel]
  })
]
and this is the relevant handle_info:
@impl true
def handle_info({:websocket_frame, {:text, frame}}, _ctx, state) do
  case Jason.decode!(frame) do
    %{"type" => "session.updated"} ->
      Membrane.Logger.info("session updatedddd")
      {[], state}

    %{"type" => "function_call", "name" => name} ->
      Membrane.Logger.info("FUNCTION CALL: #{name}")
      {[], state}

    %{"type" => "input_audio_buffer.speech_started"} ->
      Membrane.Logger.info("User is Speaking, stop talking")

      # Flush the accumulated audio buffer (clear it since audio is already streamed)

      # Cancel the current response from OpenAI
      frame = %{type: "response.cancel"} |> Jason.encode!()

      :ok = MembraneOpenAI.OpenAIWebSocket.send_frame(state.ws, frame)

      # Send reset event to restart the process
      {[event: {:output, %Membrane.Realtimer.Events.Reset{}}], state}

    %{"type" => "response.audio.delta", "delta" => delta} ->
      audio_payload = Base.decode64!(delta)
      {[buffer: {:output, %Membrane.Buffer{payload: audio_payload}}], state}

    %{"type" => "response.audio.done"} ->
      {[event: {:output, %Membrane.Realtimer.Events.Reset{}}], state}

    %{"type" => "response.audio_transcript.done", "transcript" => transcript} ->
      Membrane.Logger.info("AI transcription: #{transcript}")
      {[], state}

    %{} ->
      {[], state}
  end
end
thank you!
varsill · 2w ago
Hello! I think the problem is that %Membrane.Realtimer.Events.Reset{} is not meant to discard all the buffers which are "on the way" between the OpenAIEndpoint and the Realtimer (the buffers might either be in the message queues of the elements in-between or in their input queues). You would need to implement your own "buffer-discarding" element and put it right before the RawAudioParser. Such an element would need to have manual flow control on both input and output pads and always demand buffers on input, no matter whether the demand on the output pad is positive. In its handle_buffer it would need to store the incoming buffers in the element's internal state. It should only pop the buffers from the state and send them further in handle_demand. The element would also need to handle a custom asynchronous event (https://hexdocs.pm/membrane_core/1.2.4/Membrane.Event.html#async?/1) in its handle_event by discarding all the buffers stored in the internal state. You could then send such an asynchronous event from your OpenAIEndpoint along with the %Membrane.Realtimer.Events.Reset{}
tomashco (OP) · 2w ago
Hi @varsill, thank you for pointing me in the right direction. I've implemented the buffer_discarder module as follows, and I can see that the queue gets fed with chunks, but the Realtimer element (or any other element later in the pipeline) never requests the chunks to play, i.e. handle_demand never gets called. This is the BufferDiscarder I implemented:
defmodule Membrane.BufferDiscarder do
  use Membrane.Filter

  require Membrane.Logger

  alias Membrane.Buffer
  alias Membrane.BufferDiscarder.Events.Flush

  def_input_pad(:input,
    accepted_format: _any,
    flow_control: :manual,
    demand_unit: :buffers
  )

  def_output_pad(:output,
    accepted_format: _any,
    flow_control: :manual,
    demand_unit: :buffers
  )

  @impl true
  def handle_init(_ctx, _opts) do
    {[], %{queue: :queue.new()}}
  end

  @impl true
  def handle_buffer(:input, %Buffer{} = buffer, _ctx, state) do
    Membrane.Logger.info(
      "Receiving buffer with PTS: #{buffer.pts}, queue length: #{:queue.len(state.queue)}"
    )

    {[redemand: :output], %{state | queue: :queue.in(buffer, state.queue)}}
  end

  @impl true
  def handle_demand(:output, size, :buffers, _ctx, state) do
    {to_send, rest} = take_from_queue(state.queue, size)

    actions =
      if to_send == [] do
        []
      else
        Membrane.Logger.info("Sending buffers. First buffer's PTS: #{hd(to_send).pts}")
        [buffer: {:output, to_send}]
      end

    {actions, %{state | queue: rest}}
  end

  @impl true
  def handle_event(:input, %Flush{}, _ctx, state) do
    Membrane.Logger.info("Flushing buffers from internal state")
    {[], %{state | queue: :queue.new()}}
  end

  def handle_event(_pad, event, _ctx, state) do
    {[forward: event], state}
  end

  defp take_from_queue(queue, count) do
    do_take(queue, count, [])
  end

  defp do_take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp do_take(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, buf}, rest} -> do_take(rest, n - 1, [buf | acc])
      {:empty, _} -> {Enum.reverse(acc), queue}
    end
  end
end
varsill · 2w ago
Hello! It's quite strange that handle_demand is never called while handle_buffer is, as I don't see any "initial demand" action sent (and without any demand action we shouldn't ever see handle_buffer). Perhaps handle_demand is called exactly once, right before the first handle_buffer? If so, you need to return a redemand action on the :output pad from handle_buffer. The redemand action checks if there is positive demand on the given pad and, if so, calls handle_demand for that pad. This lets you handle the case in which the whole demand wasn't satisfied in the previous handle_demand and the rest of it needs to be satisfied when new buffers come.
tomashco (OP) · 2w ago
Hi @varsill! I've updated the code above, adding the redemand action to handle_buffer, and now handle_demand is called and the buffer is passed on (and I hear the sound). But the logic to discard the buffer when a new speech_started event arrives is not working, I think because handle_demand is called synchronously when a new buffer arrives in buffer_discarder. My idea is that the buffer is stored in buffer_discarder and the Realtimer then requests a new buffer just before playing it (so that I can flush the buffer in buffer_discarder whenever I want). Is that a configuration that Realtimer could handle?
varsill · 2w ago
You are right, but I think it should be sufficient to increase the "granularity" of the demands. Could you try adding via_in(:input, target_queue_size: 1) before the Realtimer?
tomashco (OP) · 2w ago
I tried changing the pipeline like this:
  # Output path: OpenAI → Browser
  |> child(:open_ai, %MembraneOpenAI.OpenAIEndpoint{websocket_opts: openai_ws_opts})
  |> child(:buffer_discarder, Membrane.BufferDiscarder)
  |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
  |> child(:realtimer, Membrane.Realtimer)
  |> via_in(:input, target_queue_size: 1)
  |> child(:opus_encoder, Membrane.Opus.Encoder)
  |> via_in(:input, options: [kind: :audio])
  |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
    tracks: [:audio],
    signaling: opts[:sink_channel]
  })
but it continues to send all the buffers directly to the Realtimer
Mateusz Front · 2w ago
IMO, as Membrane doesn't give precise control over how buffers are queued, and due to various corner cases, the best option is to generate the output in real time within the element that receives responses from OpenAI. This requires reimplementing part of the Realtimer, but it's fairly easy.
Mateusz Front · 2w ago
GitHub: boombox/examples.livemd at master · membraneframework/boombox (Boombox is a simple streaming tool built on top of Membrane)
Mateusz Front · 2w ago
Since Boombox operates on streams, we use Stream.interval to achieve the 'realtime' behaviour and periodically fetch data from a GenServer that buffers LLM responses. This way, on speech_started, you just drop your internal buffer and you're good. Generally, in my experience, in complex use cases it's better to have the 'realtiming' baked into your custom element than to try to make use of the Realtimer.
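A minimal sketch of that buffering GenServer in plain OTP (the module name and API below are illustrative, not from the thread): it queues incoming audio chunks, hands them out one at a time to a periodic consumer, and drops everything still queued when speech starts.

```elixir
defmodule AudioResponseBuffer do
  @moduledoc """
  Illustrative sketch: buffers LLM audio chunks; a periodic consumer
  fetches them one by one, and a flush (on speech_started) drops
  everything that hasn't been played yet.
  """
  use GenServer

  # Client API
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def push(pid, chunk), do: GenServer.cast(pid, {:push, chunk})
  def fetch(pid), do: GenServer.call(pid, :fetch)
  def flush(pid), do: GenServer.cast(pid, :flush)

  # Server callbacks: the whole state is a FIFO queue of chunks
  @impl true
  def init(:ok), do: {:ok, :queue.new()}

  @impl true
  def handle_cast({:push, chunk}, queue), do: {:noreply, :queue.in(chunk, queue)}
  def handle_cast(:flush, _queue), do: {:noreply, :queue.new()}

  @impl true
  def handle_call(:fetch, _from, queue) do
    case :queue.out(queue) do
      {{:value, chunk}, rest} -> {:reply, {:ok, chunk}, rest}
      {:empty, queue} -> {:reply, :empty, queue}
    end
  end
end
```

The periodic consumer (e.g. ticking at the audio frame rate) would call fetch/1 every interval and emit the returned chunk, while the element receiving OpenAI events calls flush/1 when it sees input_audio_buffer.speech_started.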
tomashco (OP) · 2w ago
so in this way I should implement the tick logic of the Realtimer in the buffer_discarder element
Mateusz Front · 2w ago
as far as I understand your snippet, it should be in the OpenAIEndpoint
tomashco (OP) · 2w ago
so basically what I should do in the OpenAI element is:
- receive the output from OpenAI and store it in a buffer
- play the buffer following the implementation of the Realtimer (i.e. with handle_tick)
- discard the buffer if the flush event is received

thank you for the help! I'll work on it
Mateusz Front · 2w ago
exactly
tomashco (OP) · 2w ago
Lesson learnt from today: the start_timer interval shall be defined in nanoseconds, not milliseconds as I was believing 😄 I've been able to correctly fix the clearing of the buffer, thank you!
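For anyone hitting the same pitfall: Membrane timer intervals are Membrane.Time values, which are nanoseconds under the hood, so the Membrane.Time helpers spare you the conversion. A sketch of starting a 20 ms tick from a Membrane element callback (the timer name :playback_tick is illustrative):

```elixir
# Illustrative fragment inside a Membrane element callback.
# Membrane.Time.milliseconds(20) returns the interval in Membrane.Time
# units (nanoseconds), which is what the :start_timer action expects;
# passing a bare 20 would mean 20 nanoseconds.
{[start_timer: {:playback_tick, Membrane.Time.milliseconds(20)}], state}

# handle_tick(:playback_tick, ctx, state) will then fire every 20 ms.
```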
