Writing a `Bin` queuing content from multiple remote files

@skillet wrote in https://discord.com/channels/464786597288738816/1007192081107791902/1224491418626560121
Hello all. New to the framework (and elixir) and still a little fuzzy on how to implement my idea. Basically I want to stitch together a bunch of wav and/or mp3 files and stream them indefinitely. Like a queue where I can keep adding files and the pipeline should grab them as needed FIFO style. The files will be downloaded via HTTP. So what I'm currently envisioning is a Bin that uses a Hackney source element to grab the file and push it on down. Then, when it's done it will get replaced with a new Hackney source pointing to the next file. Does that sound like the right idea, and are there any good examples you can point me to that do something similar?
F
Feliks40d ago
Your problem can be solved in the following way: Write a Bin, that holds in its own state queue of urls to the files. Bin should have one output pad with availability: :always. Bin should spawn following children:
- child(:funnel, %Funnel{end_of_stream: :never}) |> bin_output() - this should be spawned in handle_init (https://hexdocs.pm/membrane_funnel_plugin) - child({:hackeny, url}, %Hackney.Source{location: url, hackney_opts: [follow_redicrect: true]} |> get_child(:funnel) - the first hackeny source should be spawned, when the first url will be available. handle_element_end_of_stream from {:hackney, url} in Bin should terminate Hackney.Source that has sent end of stream and spawn new one and link it to the funnel. Keep in mind, that if you want to put a decoder after a Bin, it might have problems with receiving many mp3 files one after another from a single pad.
S
skillet40d ago
@Feliks thank you, this is perfect! And we spawn the new children by returning a new spec from the callbacks?
F
Feliks40d ago
Yes. Unfortunatelly, example in membrane_hackney_plugin is little outdated, so don't rely on it. You should also use :remove_children action These links might be helpful: - https://hexdocs.pm/membrane_core/Membrane.Bin.Action.html#t:remove_children/0 - https://membrane.stream/learn/get_started_with_membrane - https://membrane.stream/learn
Membrane framework
Reliable & scalable multimedia streaming framework for Elixir
Learn - Membrane framework
Reliable & scalable multimedia streaming framework for Elixir
S
skillet39d ago
@Feliks follow up question: what do you think would be the best protocol on the sink end to stream the result to the browser in a Phoenix LiveView app? Preferably to be played in a standard HTML audio tag
F
Feliks39d ago
If audio, that you want to play will be static and pre-defined, you can just put url to a static file in an audio tag. If it won't be static, you can use HLS to send it to the browser, but as far as I know, it cannot be done with just an audio tag on the browser side, but there are some JS libraries that can handle receiving HLS from a server. (https://github.com/video-dev/hls.js) The third option is WebRTC, but it is designed to handle live use-cases and it might make server deployment harder
GitHub
GitHub - video-dev/hls.js: HLS.js is a JavaScript library that play...
HLS.js is a JavaScript library that plays HLS in browsers with support for MSE. - video-dev/hls.js
S
skillet38d ago
Cool. I was trying to avoid bringing in a JS dependency but it looks like HLS might indeed be the best option Hey @Feliks , do you see anything wrong with this spec? The pipeline is shutting down before hitting the "After spec" line:
@impl true
def handle_init(_ctx, options) do
IO.inspect(options, label: "Options: ")
sink_bin = %HTTPAdaptiveStream.SinkBin{
manifest_module: HTTPAdaptiveStream.HLS,
storage: %FileStorage{directory: "priv/static/audio"},
mode: :live,
persist?: true
}

IO.inspect("Before spec")

spec =
child(:source_bin, %ReplicateSourceBin{input_schema: options.input_schema})
|> child(:sink_bin, sink_bin)

IO.inspect("After spec")

{[spec: spec], %{}}
end
@impl true
def handle_init(_ctx, options) do
IO.inspect(options, label: "Options: ")
sink_bin = %HTTPAdaptiveStream.SinkBin{
manifest_module: HTTPAdaptiveStream.HLS,
storage: %FileStorage{directory: "priv/static/audio"},
mode: :live,
persist?: true
}

IO.inspect("Before spec")

spec =
child(:source_bin, %ReplicateSourceBin{input_schema: options.input_schema})
|> child(:sink_bin, sink_bin)

IO.inspect("After spec")

{[spec: spec], %{}}
end
Getting these logs:
"Before spec"
[debug] ReplicatePipeline/ subprocess supervisor got exit request from parent, reason: :shutdown, shutting down children
"Before spec"
[debug] ReplicatePipeline/ subprocess supervisor got exit request from parent, reason: :shutdown, shutting down children
And handle_init in the source bin is not executing I'm starting the pipeline under a supervision tree inside a handle_event in a live view if that's relevant Turns out there was a problem with options.input_schema as options is a keyword list, not a map. I never saw that log until I ran it from the app's root supervisor
F
Feliks37d ago
Ok, so is the problem solved?
S
skillet37d ago
That one is! Now I have a new one 😅 I'm getting an error about returning setup: :complete from handle_setup. It says the setup is already complete. Haven't had a chance to investigate yet but I'll keep you posted
F
Feliks36d ago
Returning setup: :complete is reasonable only if you returned setup: :incomplete ealier in handle_setup, eg. you return setup: :incomplete from handle_setup and 1 second later setup: :complete from handle_info. Setup is completed by default after handle_setup, you have to return setup: :incomplete to change this behaviour I guess that there is no need to return setup: :complete in your case
S
skillet34d ago
Ah! That explains it. Thank you Ever seen this one before @Feliks ?
[error] GenServer #PID<0.916.0> terminating
** (CaseClauseError) no case clause matching: "LIST"
(membrane_wav_plugin 0.10.1) lib/membrane_wav/parser.ex:162: Membrane.WAV.Parser.parse_payload/3
[error] GenServer #PID<0.916.0> terminating
** (CaseClauseError) no case clause matching: "LIST"
(membrane_wav_plugin 0.10.1) lib/membrane_wav/parser.ex:162: Membrane.WAV.Parser.parse_payload/3
Last message: {Membrane.Core.Message, :buffer, [%Membrane.Buffer{payload: <<82, 73, 70, 70, 70, 196, 9, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 2, 0, 0, 125, 0, 0, 0, 244, 1, 0, 4, 0, 16, 0, 76, 73, 83, 84, 26, 0, 0, 0, 73, ...>>, pts: nil, dts: nil, metadata: %{}}], [for_pad: :input]}
Last message: {Membrane.Core.Message, :buffer, [%Membrane.Buffer{payload: <<82, 73, 70, 70, 70, 196, 9, 0, 87, 65, 86, 69, 102, 109, 116, 32, 16, 0, 0, 0, 1, 0, 2, 0, 0, 125, 0, 0, 0, 244, 1, 0, 4, 0, 16, 0, 76, 73, 83, 84, 26, 0, 0, 0, 73, ...>>, pts: nil, dts: nil, metadata: %{}}], [for_pad: :input]}
And this is the spec:
spec =
child(:hackney, %Hackney.Source{location: output, hackney_opts: [follow_redirect: true]})
|> child(:parser, Membrane.WAV.Parser)
|> get_child(:funnel)
spec =
child(:hackney, %Hackney.Source{location: output, hackney_opts: [follow_redirect: true]})
|> child(:parser, Membrane.WAV.Parser)
|> get_child(:funnel)
F
Feliks34d ago
This looks like Membrane.WAV.Parser receives buffer with payload, that contains field with value, that for some reason (idk why) is not supported by our parser Where did you take you .wav data from?
S
skillet34d ago
It's generated by the MusicGen AI model running on replicate.com the API outputs a URL like https://replicate.delivery/pbxt/xe13ALmFxxQtX6qbQ2JzIQl4cF8835PeYKHexJaN5gIIQiQlA/out.wav which I pass into the Hackney source
F
Feliks34d ago
Your .wav contains some field specifying chunk type set to "LIST", while our parser supports "data" and "fact" values.
S
skillet34d ago
Interesting I can also get them delivered as mp3, but I didn't see any parsers for that, only decoders expecting an mp3 file rather than a bytestream But I'm probably overlooking something
F
Feliks34d ago
MP3 decoder should be able to handle bytestream, that is not chunked in any specific way
S
skillet34d ago
Ok, I'll give that a shot
F
Feliks34d ago
What do you want to do with this mp3 then?
S
skillet34d ago
Send it through the funnel to be added to an ongoing HLS stream basically the AI will be continually generating these files in the background and I will add them to the stream as they are delivered
F
Feliks34d ago
So decode mp3 end encode raw audio to AAC and put it into HLS
S
skillet32d ago
right @Feliks do I need to explicitly tell the pipeline to wait for its child bin to finish setup before playing?
F
Feliks31d ago
handle_playing will be always called after finishing the setup, but various elements may finish setup and execute handle_playing independently. Every element/bin (let's name it :child) executes handle_playing as soon as: 1. It's own setup is completed 2. Setups of all other elements/bins spawned in the same spec as :child are completed 3. Parent of :child is already playing For pipelines it is simpler: every pipeline executes handle_playing just after finishing setup. Setup is finished: 1. Just after handle_setup, if action {:setup, :incomplete} wasn't returned from it. 2. Otherwise, just after returning action {:setup, :complete}, if handle_setup had returned {:setup, :incomplete} earlier If you shared your code, maybe I would be able to say what is the reason of your problem
S
skillet31d ago
Sure thing!
S
skillet31d ago
S
skillet31d ago
S
skillet31d ago
I've also tried with all the conversion elements inside the source bin And here is the error I'm currently getting:
[error] GenServer #PID<0.830.0> terminating
** (ArithmeticError) bad argument in arithmetic expression
:erlang.-(nil, nil)
(membrane_aac_fdk_plugin 0.18.7) lib/membrane_aac_fdk_plugin/encoder.ex:281: Membrane.AAC.FDK.Encoder.validate_pts_integrity/2
(membrane_aac_fdk_plugin 0.18.7) lib/membrane_aac_fdk_plugin/encoder.ex:165: Membrane.AAC.FDK.Encoder.handle_buffer/4
(membrane_core 1.0.1) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.0.1) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(elixir 1.16.2) lib/enum.ex:2528: Enum."-reduce/3-lists^foldl/2-0-"/3
(membrane_core 1.0.1) lib/membrane/core/element.ex:232: Membrane.Core.Element.handle_info/2
(stdlib 5.2) gen_server.erl:1095: :gen_server.try_handle_info/3
(stdlib 5.2) gen_server.erl:1183: :gen_server.handle_msg/6
(stdlib 5.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
[error] GenServer #PID<0.830.0> terminating
** (ArithmeticError) bad argument in arithmetic expression
:erlang.-(nil, nil)
(membrane_aac_fdk_plugin 0.18.7) lib/membrane_aac_fdk_plugin/encoder.ex:281: Membrane.AAC.FDK.Encoder.validate_pts_integrity/2
(membrane_aac_fdk_plugin 0.18.7) lib/membrane_aac_fdk_plugin/encoder.ex:165: Membrane.AAC.FDK.Encoder.handle_buffer/4
(membrane_core 1.0.1) lib/membrane/core/callback_handler.ex:139: Membrane.Core.CallbackHandler.exec_callback/4
(membrane_core 1.0.1) lib/membrane/core/callback_handler.ex:69: Membrane.Core.CallbackHandler.exec_and_handle_callback/5
(elixir 1.16.2) lib/enum.ex:2528: Enum."-reduce/3-lists^foldl/2-0-"/3
(membrane_core 1.0.1) lib/membrane/core/element.ex:232: Membrane.Core.Element.handle_info/2
(stdlib 5.2) gen_server.erl:1095: :gen_server.try_handle_info/3
(stdlib 5.2) gen_server.erl:1183: :gen_server.handle_msg/6
(stdlib 5.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
I'm sure the hackney element's stream is ending much faster than I can generate new files so I'm not 100% sure how to handle that. I need it to gracefully wait for more files to be available Yeah I guess I need to check if the queue is empty in the end of stream handler and put it in some kind of waiting mode Then in the :get_next_track handler I can check if it is waiting and kick it off again But even with this obvious problem, I'm still not sure why I'm getting that error. Seems that the AAC encoder is getting some data without a pts But now that I think about it, I'm not sure the hackney source's end of stream handler is even being called. But the funnel's is Here is a reworked end of stream handler:
@impl true
def handle_element_end_of_stream(
{:hackney, _url} = element,
_pad,
_context,
%{queue: queue} = state
) do
remove_hackney = [remove_children: [element]]

case LimitedQueue.pop(queue) do
{:ok, queue, next_track} ->
spec =
child({:hackney, next_track}, %Hackney.Source{
location: next_track,
hackney_opts: [follow_redirect: true]
})
|> get_child(:funnel)

{[remove_hackney | [spec: spec]], %{state | queue: queue}}

_ ->
{[remove_hackney | [notify_parent: :waiting]], state}
end
end
@impl true
def handle_element_end_of_stream(
{:hackney, _url} = element,
_pad,
_context,
%{queue: queue} = state
) do
remove_hackney = [remove_children: [element]]

case LimitedQueue.pop(queue) do
{:ok, queue, next_track} ->
spec =
child({:hackney, next_track}, %Hackney.Source{
location: next_track,
hackney_opts: [follow_redirect: true]
})
|> get_child(:funnel)

{[remove_hackney | [spec: spec]], %{state | queue: queue}}

_ ->
{[remove_hackney | [notify_parent: :waiting]], state}
end
end
F
Feliks29d ago
Sorry for delay The error from aac fdk plugin occurs in the changes that where introduced recently, so try downgrading version of this plugin to {:membrance_aac_fdk_plugin, "0.18.2"}
A
Aske29d ago
Thanks! Just spent a couple of hours on this, because I thought I were doing something wrong, and somehow had messed up the pts in my pipeline.
A
Aske29d ago
I opened an issue here https://github.com/membraneframework/membrane_core/issues/791 to track it. @skillet do you have an example stream you can share, to help reproduce the problem? I'm not able to share the one I have.
GitHub
AAC FDK plugin pts bad arg in arithmetic · Issue #791 · membranefra...
When encoding with certain streams (sorry, can't share the one example I have), the pts check fails with: [error] GenServer #PID<0.830.0> terminating ** (ArithmeticError) bad argument in ...
S
skillet29d ago
Thanks guys. @Aske I will put something together for you to reproduce it
F
Feliks27d ago
@Aske @skillet does downgrading membrane_aac_fdk_plugin to 0.18.2 or lower solve your problem and allow you to go further?
S
skillet27d ago
Yes it worked for me!
A
Aske26d ago
Yes, downgrading worked for me! Thanks a lot for suggesting that! ❤️
F
Feliks25d ago
@Aske @skillet We have just released version 0.18.8 of membrane_aac_fdk_plugin fixing the bug that you have met, so you should be able to update this pluing to the newest version and use it without such an error. Let me know, if the new version will cause any other problem
S
skillet25d ago
Thank you @Feliks ! I'll try it out today
Want results from more Discord servers?
Add your server
More Posts
Split audio file into 20mb chunksIm trying to figure out how to take the file at this URL, and send it to OpenAI in chunks of 20mb: hbundlex nifs and libasanIs it anyone build nifs with libasan support? Even if I put compiler_flags ["-fno-omit-frame-pointerunifex seg fault on handle_destroy_stateHi, i'm implementing g772.1 decoder/encoder plugin and have issue with handle_destroy state. I've taDeveloping an advanced Jellyfish use caseHey I've been using jellyfish to develop a platform for essentially one-on-one calls between two peotoilet capacity of outbound_rtx_controllerHi, I'm getting the following error on SessionBin: ``` [error] <0.1282.0>/:sip_rtp/{:outbound_rtx_cOn JF Tracks and Reconnecting (in React)So I noticed a few things about the react-sdk and JF tracks in general. Note I have react code that h264 encoder problemshi guys, I'm using h264 encoder plugin for video encoding and sending it via rtp to client. SometimPipeline for muxing 2 msr files (audio and video) into a single flv fileI have the following pipeline which takes 2 msr files (recorded to disk using the RecordingEntrypoinMP3 output is audible, but test not passHi everyone. I made small changes in `membrane_mp3_lame_plugin` to support other input config (the oGrab keyframe image data from h264 stream?We are looking at some h264 coming from an RTSP stream. Membrane is doing a fine job with the HLS deUnity client?Thanks a lot to the Membrane team. The number of examples and availability of code has been incredibWebRTC stream not workingI've hit an issue trying to get an MPEGTS stream displaying in the browser via WebRTC. All seems toError when compiling free4chatThis is probably pretty basic but I'm at square one. I get an error when I'm compiling free4chat, a Screen ShareIs there any reason why jellyfish_videoroom when is running localy share screen is not working ? I sTesting Membrane ElementI'm trying to setup a simple test of a membrane element, but I'm stumped a bit on how to assert thatClustering and scale out behaviourContext: I've a membrane application running in an elixir cluster of 1. It receives a RTSP stream (URTSP authentication problem?Hi, I am playing with an RTSP camera (**Tapo C210**) that is on the local network. The path to the sExtending the jellyfish video room demo with a queueHey I am curious what a scalable way would be to add a queue in front of jellyfish rooms. I have a sDebugging bundlex/unifex errorsHello, I've been tinkering with membrane cross-compiled to Nerves (rpi4). I've had features (e.g. mRTP to HLS Disconnect and Reconnect Audio StreamHello everyone, I currently have a microphone input which is sending UDP to a server and into a RTP