Not sure if this is a disconnect between what the interface says and what the intended behaviour is, but when viewing streams, the UI says: "Specify origins that can send cross-origin requests to this stream. Leave empty to allow all origins." But if I leave that blank, I get CORS errors in the console. Is there no way to use wildcards for CORS headers?
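One way to see what the stream endpoint actually returns is to look at the preflight response headers directly instead of relying on the browser console. The sketch below is a small, hypothetical shell helper (`cors_allow_origin` is not part of Wrangler or any Cloudflare tool) that reads raw response headers on stdin and prints the `Access-Control-Allow-Origin` value if one is present. The commented `curl` call shows how it might be fed; `$STREAM_URL` and the `Origin` value are placeholders, not values from this thread.

```shell
# Hypothetical helper: read raw HTTP response headers on stdin and print the
# value of Access-Control-Allow-Origin. Exits non-zero if the header is absent.
cors_allow_origin() {
  awk '
    {
      line = $0
      sub(/\r$/, "", line)            # HTTP header lines end in CRLF
      i = index(line, ":")            # split on the first colon only
      if (i > 0 && tolower(substr(line, 1, i - 1)) == "access-control-allow-origin") {
        value = substr(line, i + 1)
        sub(/^[ \t]+/, "", value)     # drop whitespace after the colon
        print value
        found = 1
      }
    }
    END { exit found ? 0 : 1 }
  '
}

# Possible usage (assumption: STREAM_URL holds your stream's HTTP ingest URL).
# Left commented so the sketch runs without network access:
# curl -s -o /dev/null -D - -X OPTIONS "$STREAM_URL" \
#   -H 'Origin: https://app.example.com' \
#   -H 'Access-Control-Request-Method: POST' | cors_allow_origin
```

If the helper prints nothing and exits non-zero for a stream whose origin list is empty, that would match the console errors described above and contradict the UI text.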
20 Replies
volter (OP), 2mo ago
@kagitac 9bc757afca1548ee94d09bcfd07c6359
volter (OP), 2mo ago
npx wrangler pipelines streams create test_stream --cors-origin ['*']

⛅️ wrangler 4.40.3
───────────────────
▲ [WARNING] 🚧 `wrangler pipelines streams create` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose


✔ No schema file provided. Do you want to create stream without a schema (unstructured JSON)? … yes
🌀 Creating stream 'test_stream'...

✘ [ERROR] A request to the Cloudflare API (/accounts/9221c360e34e6aa76e568a9652ef9d0d/pipelines/v1/streams) failed.

[{"type":"invalid_string","path":"http.cors.origins.0","message":"Invalid"}] [code: 2]

If you think this is a bug, please open an issue at:
https://github.com/cloudflare/workers-sdk/issues/new/choose
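A likely contributor to the `http.cors.origins.0` error is how the shell handles `['*']` before Wrangler ever sees it. The sketch below uses a hypothetical `show_args` helper to print the exact arguments a command receives. In bash or sh, with no file literally named `*` in the current directory, the quotes are stripped and the CLI gets the string `[*]` as the first origin. Passing a bare quoted `'*'` at least delivers a literal `*`; whether the API accepts `*` as an origin value, and whether the flag should be repeated for multiple origins, are assumptions this thread does not confirm.

```shell
# Hypothetical helper: print each argument on its own line, wrapped in <>
# so the exact value the CLI would receive is visible.
show_args() { printf '<%s>\n' "$@"; }

# As typed in the thread: the shell removes the quotes, leaving the glob
# pattern [*], which is passed through literally when nothing matches it.
show_args --cors-origin ['*']

# Quoted on its own, the star arrives as a literal * with no brackets.
show_args --cors-origin '*'
```

zsh behaves differently: by default it aborts with a "no matches found" error on the first form instead of passing it through.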
volter (OP), 2mo ago
May want to update the documentation from Wrangler since it says it accepts an array: https://developers.cloudflare.com/pipelines/platform/wrangler-commands/#pipelines-streams-create
volter (OP), 2mo ago
Gotcha.
volter (OP), 2mo ago
I must be responsible for like, half of the issues flagged in Pipelines🤣
Micah Wylde, 2mo ago
Keep em coming!
volter (OP), 2mo ago
@cole | pipelines Is my understanding correct that not all of Arroyo's functionality has been implemented in Pipelines? So a SQL insert like
WITH DeduplicatedEvents AS (
SELECT
-- Select all original columns
*,
-- Assign a rank (rn) to each row
ROW_NUMBER() OVER (
-- Group rows by messageId
PARTITION BY "messageId"
-- Order by timestamp, so the earliest one gets rn = 1
ORDER BY "timestamp" ASC
) as rn
FROM
test_pipeline_stream
)
INSERT INTO test_pipeline_sink
SELECT
-- Select all columns (note: * still includes the temporary ranking column 'rn')
*
FROM
DeduplicatedEvents
WHERE
-- Filter to keep only the first record for each messageId
rn = 1;
is not going to work? When I try this, I get an error indicating that it's not available, but I want to double-check that I haven't just made a mistake in my query:
npx wrangler pipelines create test_pipeline_pipeline2 --sql-file simple.sql

⛅️ wrangler 4.40.3
───────────────────
▲ [WARNING] 🚧 `wrangler pipelines create` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose


🌀 Validating SQL...

✘ [ERROR] SQL validation failed: This feature is not implemented: Query INSERT INTO test_pipeline_sink SELECT * FROM DeduplicatedEvents WHERE rn = 1 not implemented yet [code: 1014]
To put what I am trying to do as a user story: as a data engineer, I know analytics libraries can send duplicate messages despite best intentions. De-duplicating on messageId provides some defense against duplicate events sent in quick succession being ingested twice. In the end, I am trying to get the data as close to "silver" as possible.
volter (OP), 2mo ago
@cole | pipelines Thanks. Noticed that a couple of days ago. Thanks for being so responsive
Micah Wylde, 2mo ago
Right now, we're just supporting the "stateless" subset of Arroyo's functionality: basically projections (SELECT) and filters (WHERE), with no aggregations, window functions, joins, etc. You can see the supported SQL in the Pipelines docs (https://developers.cloudflare.com/pipelines/sql-reference). Supporting stateful processing with a serverless experience is really hard. We want to get there, but for now we're focused on providing a really good product for streaming ingestion into R2/Iceberg. So for now, with Pipelines, you're best off ingesting all of the messages and doing the final filtering in your query engine.
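A minimal sketch of that split, under stated assumptions: the pipeline SQL keeps only a projection and a filter, and the `ROW_NUMBER()` de-duplication from the original query moves to read time. The stream, sink, and pipeline names are the ones used earlier in the thread. The `"messageId" IS NOT NULL` filter is only an illustration of a stateless `WHERE` clause, `events` is a hypothetical name for the table the sink writes to, and the second query assumes a downstream engine that supports window functions, which not every engine does.

```shell
# Pipeline SQL: projection and filter only, no window functions or aggregations.
cat > simple.sql <<'SQL'
INSERT INTO test_pipeline_sink
SELECT *
FROM test_pipeline_stream
WHERE "messageId" IS NOT NULL;
SQL

# Same command as in the thread, left commented so this sketch runs offline:
# npx wrangler pipelines create test_pipeline_pipeline2 --sql-file simple.sql

# Query-time de-duplication, run in the downstream query engine instead.
# "events" is a hypothetical table name; the result also carries the rn column.
cat > dedup_at_query_time.sql <<'SQL'
WITH ranked AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY "messageId"
      ORDER BY "timestamp" ASC
    ) AS rn
  FROM events
)
SELECT *
FROM ranked
WHERE rn = 1;
SQL
```

Keeping every message in the sink and ranking at read time means duplicates still take up storage, but the earliest record per `messageId` can be selected without the pipeline holding any state.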
switz, 2mo ago
is there a plan to support aggregations or window functions?
Micah Wylde, 2mo ago
We are planning on supporting aggregations in the future, but it's likely at least a few quarters out. We'd love to hear the use cases people have for stateful processing though, either here, in DM, or on a call. Since they're supported in Arroyo (the underlying engine), we might onboard some early users if it seems like something we can support well operationally.
switz, 2mo ago
personally for OTEL metrics (could add a processing layer in between)
Micah Wylde, 2mo ago
like pre-aggregating metrics before ingestion?