Clear the buffer sent to output pad
Hi there, I'm working on an integration of the OpenAI Realtime API and I'm trying to implement VAD. When the user speaks, the audio is sent to OpenAI and the response is collected through response.audio.delta events, then sent to the output through a buffer.
When I receive the input_audio_buffer.speech_started event, I would like to flush the buffer which hasn't been played yet and restart the process (send data to OpenAI and receive the stream).
There is probably something which I don't get regarding how Membrane works. Is there someone who could help me?
Actually this is the pipeline:
and this is the relevant handle info:
thank you!
Hello! I think the problem is that %Membrane.Realtimer.Events.Reset{} is not meant to discard all the buffers which are "on the way" between the OpenAIEndpoint and the Realtimer (the buffers might be either in the message queues of the elements in between or in their input queues).
You would need to implement your own "buffers discarding" element and put it right before the RawAudioParser.
Such an element would need to have manual flow control on both input and output pads and always demand buffers on input, no matter whether the demand on the output pad is positive. In its handle_buffer it would need to store the incoming buffers in the element's internal state. It should only pop the buffers from the state and send them further in handle_demand.
The element would also need to handle a custom asynchronous event (https://hexdocs.pm/membrane_core/1.2.4/Membrane.Event.html#async?/1) in its handle_event by discarding all the buffers stored in the internal state.
You could then send such an asynchronous event from your OpenAIEndpoint along with the %Membrane.Realtimer.Events.Reset{}
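An untested sketch of such an element (module name, the Flush event, and the demand sizes are my own assumptions; note that for the event to overtake queued buffers it must be asynchronous, per the Membrane.Event.async?/1 docs linked above):

```elixir
defmodule BufferDiscarder do
  # Sketch only: a filter that queues buffers and can drop them on demand.
  use Membrane.Filter

  def_input_pad :input,
    accepted_format: _any,
    flow_control: :manual,
    demand_unit: :buffers

  def_output_pad :output,
    accepted_format: _any,
    flow_control: :manual

  defmodule Flush do
    # Custom event used to discard queued audio.
    # To overtake buffers in transit it has to be marked asynchronous
    # (see Membrane.Event.async?/1 in the docs); that part is omitted here.
    defstruct []
  end

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{queue: :queue.new()}}

  @impl true
  def handle_playing(_ctx, state) do
    # Start demanding on input regardless of downstream demand
    {[demand: {:input, 10}], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    state = %{state | queue: :queue.in(buffer, state.queue)}
    # Keep pulling from upstream; :redemand retries output demand
    {[demand: {:input, 1}, redemand: :output], state}
  end

  @impl true
  def handle_demand(:output, _size, _unit, _ctx, state) do
    case :queue.out(state.queue) do
      {{:value, buffer}, queue} ->
        # Emit one buffer, then redemand to keep satisfying remaining demand
        {[buffer: {:output, buffer}, redemand: :output], %{state | queue: queue}}

      {:empty, _queue} ->
        {[], state}
    end
  end

  @impl true
  def handle_event(:input, %Flush{}, _ctx, state) do
    # Drop everything that hasn't been played yet
    {[], %{state | queue: :queue.new()}}
  end

  def handle_event(_pad, event, _ctx, state), do: {[forward: event], state}
end
```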
Hi @varsill, thank you for pointing me in the right direction. I've implemented the buffer_discarder module as follows, and I can see that the queue gets fed with chunks, but the Realtimer element (or any other element later in the pipeline) never requests the chunks to play, i.e. handle_demand never gets called.
This is the BufferDiscarder I implemented:
Hello! It's quite strange that handle_demand is never called while handle_buffer is, as I don't see any "initial demand" action sent (and without any demand action we shouldn't ever see handle_buffer). Perhaps handle_demand is called exactly once, right before the first handle_buffer?
If so, you need to return the redemand action on the :output pad from handle_buffer.
The redemand action checks whether there is positive demand on the given pad and, if so, calls handle_demand for that pad. It handles the case in which the whole demand wasn't satisfied in the previous handle_demand and the rest of it needs to be satisfied when new buffers arrive.

Hi @varsill! I've updated the code above, adding the redemand action to handle_buffer, and now handle_demand is called, passing the buffer along (and I hear the sound). But the logic to discard the buffer when a new speech_started event arrives is not working, I think because handle_demand is called synchronously when a new buffer arrives in buffer_discarder.
My idea is that the buffer is stored in buffer_discarder and the Realtimer requests a new buffer just before playing it (so that I can flush the buffer in buffer_discarder whenever I want).
Is this a configuration that Realtimer could handle?
You are right, but I think it should be sufficient to increase the "granularity" of the demands. Could you try adding via_in(:input, target_queue_size: 1) before the Realtimer?

I tried changing the pipeline like this:
but it continues to send all the buffers directly to the Realtimer
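For reference, the suggested option would be wired into the pipeline's ChildrenSpec roughly like this (child names are placeholders for the actual pipeline's children):

```elixir
# Sketch: limit the Realtimer's input queue to a single buffer
child(:open_ai, OpenAIEndpoint)
|> child(:buffer_discarder, BufferDiscarder)
|> via_in(:input, target_queue_size: 1)
|> child(:realtimer, Membrane.Realtimer)
```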
IMO, as Membrane doesn't give precise control over how buffers are queued, and due to various corner cases, the best option is to generate the output in real time within the element that receives responses from OpenAI. This requires reimplementing part of the Realtimer, but it's fairly easy.
BTW, here's a boombox example of talk to LLM: https://github.com/membraneframework/boombox/blob/master/examples.livemd#talk-to-chat-gpt
Since Boombox operates on streams, we use Stream.interval to achieve the 'realtime' behaviour and periodically fetch data from a GenServer that buffers the LLM responses.
This way, on speech_started you just drop your internal buffer and you're good.
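A simplified sketch of that approach (module and function names are my own, not the ones from the Boombox example):

```elixir
defmodule AudioBuffer do
  # Buffers LLM audio chunks; a paced stream pops them in real time.
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
  def push(chunk), do: GenServer.cast(__MODULE__, {:push, chunk})
  def pop, do: GenServer.call(__MODULE__, :pop)
  # Called when input_audio_buffer.speech_started arrives: drop unplayed audio
  def flush, do: GenServer.cast(__MODULE__, :flush)

  @impl true
  def init(_), do: {:ok, :queue.new()}

  @impl true
  def handle_cast({:push, chunk}, queue), do: {:noreply, :queue.in(chunk, queue)}
  def handle_cast(:flush, _queue), do: {:noreply, :queue.new()}

  @impl true
  def handle_call(:pop, _from, queue) do
    case :queue.out(queue) do
      {{:value, chunk}, queue} -> {:reply, chunk, queue}
      # Empty binary as a stand-in for silence when nothing is buffered
      {:empty, queue} -> {:reply, <<>>, queue}
    end
  end
end

# Realtime pacing: poll the buffer every 20 ms (assumed chunk duration)
audio_stream = Stream.interval(20) |> Stream.map(fn _ -> AudioBuffer.pop() end)
```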
Generally, in my experience, in complex use cases it's better to have the 'realtiming' baked into your custom element than to try to make use of the Realtimer
so this way I should implement the Realtimer's tick logic in the buffer_discarder element
as far as I understand your snippet, it should be in the OpenAIEndpoint
so basically what I should do in the openai element is:
- receive the output from openai and store into a buffer
- play the buffer following the implementation of realtimer (i.e. with handle_tick)
- discard the buffer if the flush event is received
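The steps above could be sketched like this (untested; the message shapes, chunk duration, and the websocket plumbing are assumptions, and a real element must also send a stream_format action before its first buffer, which is omitted here):

```elixir
defmodule OpenAIEndpoint do
  # Sketch: a source that buffers OpenAI audio and paces output with a timer,
  # mimicking the Realtimer, so flushing is a simple queue reset.
  use Membrane.Source

  def_output_pad :output, accepted_format: _any, flow_control: :push

  @chunk_duration Membrane.Time.milliseconds(20)

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{queue: :queue.new()}}

  @impl true
  def handle_playing(_ctx, state) do
    # Timer intervals are Membrane.Time values (nanoseconds under the hood)
    {[start_timer: {:paced_output, @chunk_duration}], state}
  end

  # 1. Receive response.audio.delta chunks and store them in the queue
  @impl true
  def handle_info({:openai_audio, payload}, _ctx, state) do
    buffer = %Membrane.Buffer{payload: payload}
    {[], %{state | queue: :queue.in(buffer, state.queue)}}
  end

  # 3. On input_audio_buffer.speech_started, discard the unplayed audio
  def handle_info(:speech_started, _ctx, state) do
    {[], %{state | queue: :queue.new()}}
  end

  # 2. On every tick, emit one chunk, like the Realtimer's pacing
  @impl true
  def handle_tick(:paced_output, _ctx, state) do
    case :queue.out(state.queue) do
      {{:value, buffer}, queue} ->
        {[buffer: {:output, buffer}], %{state | queue: queue}}

      {:empty, queue} ->
        {[], %{state | queue: queue}}
    end
  end
end
```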
thank you for the help! I'll work on it
exactly
Lesson learnt today: the start_timer interval must be given in nanoseconds, not milliseconds as I believed 😄
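Concretely (e.g. inside handle_playing; the timer name is arbitrary):

```elixir
# Membrane timer intervals are Membrane.Time values, i.e. nanoseconds.
# Use the conversion helpers rather than raw integers:
interval = Membrane.Time.milliseconds(20)
{[start_timer: {:paced_output, interval}], state}
```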
I've been able to get the buffer clearing working correctly, thank you!