// Single layer combining the services the `/push` route handler is run with:
// `ZeroPushProcessor.Default`, `Database.Default` and `ClusterClientLive`.
const dependencies = Layer.mergeAll(
ZeroPushProcessor.Default,
Database.Default,
ClusterClientLive,
);
/**
 * HTTP entry point of the service.
 *
 * `Server.Default` is a layer that serves `HttpRouter.Default` on a Bun HTTP
 * server bound to `DEFAULT_PORT`, logs the listening address, and registers:
 *
 * - `rpcServer`, exposed over the HTTP RPC protocol at path `/` with JSON
 *   serialization;
 * - `GET /heartbeat`, which responds with the text `alive`;
 * - `POST /push`, which authenticates the request from its headers, hands the
 *   parsed query and JSON body to `ZeroPushProcessor` together with the
 *   server mutators, runs the collected `asyncTasks`, and responds with the
 *   processor result as JSON.
 *
 * The `dependencies` layer is built once, when the `/push` route is
 * registered, and the resulting services are shared by every request.
 * A failure to build those services therefore fails this layer at startup
 * instead of failing (and being logged for) each individual request.
 */
export class Server {
  static readonly Default = pipe(
    HttpRouter.Default.serve(),
    HttpServer.withLogAddress,
    Layer.provide(
      rpcServer.pipe(
        Layer.provide(
          RpcServer.layerProtocolHttp({
            path: "/",
          }).pipe(Layer.provide(RpcSerialization.layerJson)),
        ),
      ),
    ),
    Layer.provide(
      HttpRouter.Default.use((router) =>
        router.get("/heartbeat", HttpServerResponse.text("alive")),
      ),
    ),
    Layer.provide(
      HttpRouter.Default.use((router) =>
        Effect.gen(function* () {
          // Capture the already-built services once. Providing the layer
          // itself inside the handler would rebuild (and tear down) every
          // dependency on each request.
          const services =
            yield* Effect.context<Layer.Layer.Success<typeof dependencies>>();

          yield* router.post(
            "/push",
            Effect.gen(function* () {
              const req = yield* HttpServerRequest.HttpServerRequest;
              const query = yield* HttpServerRequest.ParsedSearchParams;
              const serverUrl = pipe(
                new URL(req.originalUrl),
                (s) => `${s.origin}`,
              );
              // The body is assumed to be a valid JSON value; it is not
              // validated against a schema here.
              const body = (yield* req.json) as ReadonlyJSONValue;
              const { accountId, tokens } = yield* authenticate(req.headers);
              const processor = yield* ZeroPushProcessor;
              yield* Effect.log("SERVER URL", serverUrl);

              // Handed to the server mutators and executed after processing;
              // presumably the mutators append follow-up work to it.
              const asyncTasks: Array<
                Effect.Effect<void, any, Sharding | Database>
              > = [];
              const serverMutator = createServerMutators(
                createMutators({ sub: accountId }),
                {
                  accessToken: tokens.access,
                  sub: accountId,
                },
                asyncTasks,
              );
              yield* Effect.log("About to mutate", {
                query,
                body,
              });
              const result = yield* processor.process(
                // Just the default mutators for now
                serverMutator,
                query as any, // TODO: replace the cast with a properly typed query
                body as any, // TODO: replace the cast with a properly typed body
              );
              yield* Effect.log(result);
              yield* Effect.log("Running async tasks: ", asyncTasks);
              yield* Effect.all(asyncTasks, {
                concurrency: "unbounded",
              });
              return yield* HttpServerResponse.json(result);
            }).pipe(
              Effect.provide(services),
              Effect.tapErrorCause((cause) =>
                Effect.logError("Error in push", { cause }),
              ),
            ),
          );
        }),
      ).pipe(Layer.provide(dependencies)),
    ),
    Layer.provide(
      BunHttpServer.layer({
        port: DEFAULT_PORT,
      }),
    ),
    Layer.provide(Observability.Client({ serviceName: "server" })),
  );
}
// Single layer combining the services the `/push` route handler is run with:
// `ZeroPushProcessor.Default`, `Database.Default` and `ClusterClientLive`.
// NOTE(review): an identical `const dependencies` declaration appears earlier
// in this file; if both really live in the same module scope this is a
// redeclaration error — confirm and keep only one copy.
const dependencies = Layer.mergeAll(
ZeroPushProcessor.Default,
Database.Default,
ClusterClientLive,
);
/**
 * HTTP entry point of the service.
 *
 * `Server.Default` is a layer that serves `HttpRouter.Default` on a Bun HTTP
 * server bound to `DEFAULT_PORT`, logs the listening address, and registers:
 *
 * - `rpcServer`, exposed over the HTTP RPC protocol at path `/` with JSON
 *   serialization;
 * - `GET /heartbeat`, which responds with the text `alive`;
 * - `POST /push`, which authenticates the request from its headers, hands the
 *   parsed query and JSON body to `ZeroPushProcessor` together with the
 *   server mutators, runs the collected `asyncTasks`, and responds with the
 *   processor result as JSON.
 *
 * The `dependencies` layer is built once, when the `/push` route is
 * registered, and the resulting services are shared by every request.
 * A failure to build those services therefore fails this layer at startup
 * instead of failing (and being logged for) each individual request.
 *
 * NOTE(review): an identical `export class Server` declaration appears
 * earlier in this file; if both really live in the same module scope this is
 * a duplicate-identifier error — confirm and keep only one copy.
 */
export class Server {
  static readonly Default = pipe(
    HttpRouter.Default.serve(),
    HttpServer.withLogAddress,
    Layer.provide(
      rpcServer.pipe(
        Layer.provide(
          RpcServer.layerProtocolHttp({
            path: "/",
          }).pipe(Layer.provide(RpcSerialization.layerJson)),
        ),
      ),
    ),
    Layer.provide(
      HttpRouter.Default.use((router) =>
        router.get("/heartbeat", HttpServerResponse.text("alive")),
      ),
    ),
    Layer.provide(
      HttpRouter.Default.use((router) =>
        Effect.gen(function* () {
          // Capture the already-built services once. Providing the layer
          // itself inside the handler would rebuild (and tear down) every
          // dependency on each request.
          const services =
            yield* Effect.context<Layer.Layer.Success<typeof dependencies>>();

          yield* router.post(
            "/push",
            Effect.gen(function* () {
              const req = yield* HttpServerRequest.HttpServerRequest;
              const query = yield* HttpServerRequest.ParsedSearchParams;
              const serverUrl = pipe(
                new URL(req.originalUrl),
                (s) => `${s.origin}`,
              );
              // The body is assumed to be a valid JSON value; it is not
              // validated against a schema here.
              const body = (yield* req.json) as ReadonlyJSONValue;
              const { accountId, tokens } = yield* authenticate(req.headers);
              const processor = yield* ZeroPushProcessor;
              yield* Effect.log("SERVER URL", serverUrl);

              // Handed to the server mutators and executed after processing;
              // presumably the mutators append follow-up work to it.
              const asyncTasks: Array<
                Effect.Effect<void, any, Sharding | Database>
              > = [];
              const serverMutator = createServerMutators(
                createMutators({ sub: accountId }),
                {
                  accessToken: tokens.access,
                  sub: accountId,
                },
                asyncTasks,
              );
              yield* Effect.log("About to mutate", {
                query,
                body,
              });
              const result = yield* processor.process(
                // Just the default mutators for now
                serverMutator,
                query as any, // TODO: replace the cast with a properly typed query
                body as any, // TODO: replace the cast with a properly typed body
              );
              yield* Effect.log(result);
              yield* Effect.log("Running async tasks: ", asyncTasks);
              yield* Effect.all(asyncTasks, {
                concurrency: "unbounded",
              });
              return yield* HttpServerResponse.json(result);
            }).pipe(
              Effect.provide(services),
              Effect.tapErrorCause((cause) =>
                Effect.logError("Error in push", { cause }),
              ),
            ),
          );
        }),
      ).pipe(Layer.provide(dependencies)),
    ),
    Layer.provide(
      BunHttpServer.layer({
        port: DEFAULT_PORT,
      }),
    ),
    Layer.provide(Observability.Client({ serviceName: "server" })),
  );
}