Mastra•2w ago
Aman Zishan

Question: does createVectorQueryTool support db schema

Our setup has schema-isolated storage instances. I was able to set up vector stores per schema, but when the agent calls createVectorQueryTool, it does not respect schema isolation. If I create a custom tool that embeds the query and performs a vector search, it does work, but the custom tool does not get called reliably even with prompt engineering. I would like to use the native tool but with support for schemas. Could this be a key provided in databaseConfig.pgVector?
6 Replies
Mastra Triager
Mastra Triager•2w ago
šŸ“ Created GitHub issue: https://github.com/mastra-ai/mastra/issues/10584 šŸ” If you're experiencing an error, please provide a minimal reproducible example whenever possible to help us resolve it quickly. šŸ™ Thank you for helping us improve Mastra!
Abhi Aiyer
Abhi Aiyer•2w ago
Hi @Aman Zishan, I'll take a look. How are you setting up the tool and your vector stores? Is it like this?
// PgVector instances per schema
const pgVectorSchemaA = new PgVector({
  id: 'pg-vector-schema-a',
  connectionString: process.env.POSTGRES_CONNECTION_STRING!,
  schemaName: 'schema_a', // <-- Schema set at constructor time
});

const pgVectorSchemaB = new PgVector({
  id: 'pg-vector-schema-b',
  connectionString: process.env.POSTGRES_CONNECTION_STRING!,
  schemaName: 'schema_b',
});

// Then use a different vectorStoreName for each, or pass the vectorStore directly
const toolA = createVectorQueryTool({
  vectorStore: pgVectorSchemaA, // Direct instance with correct schema
  indexName: "embeddings",
  model: embeddingModel,
});
Aman Zishan
Aman ZishanOP•2w ago
That's correct; we have a getStorage() function that gets called with the schema ID from requestContext. getStorage is responsible for initializing the instance, just like you have here. As a workaround for the query tool setup, what I have now is: use the native vector tool as a base tool and override the execute function to manually fetch the correct vector DB instance by passing the schema ID, then return the query result matching the output schema of the native tool. I guess this way the framework-level prompts know to call the vector tool correctly, rather than using a completely custom tool and prompt-engineering the agent to override the framework prompts? It would be very clean to avoid the custom logic for fetching the schema-based vector store and use the native vector tool itself šŸ˜„
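The per-schema instance cache described here can be sketched generically as below. This is a minimal, self-contained illustration of the pattern only; `Store`, `getStore`, and the `tenant_` prefix are stand-ins for the real PgVector/storage construction, not Mastra APIs.

```typescript
// Generic sketch of a per-schema instance cache, keyed by schema name.
// In the real setup, constructing a Store would create PgVector / storage
// instances bound to that schema.
type Store = { schemaName: string };

const storeCache = new Map<string, Store>();

function getStore(schemaId?: string, prefix = "tenant_"): Store {
  // Fall back to a default schema when no tenant ID is given
  const schemaName = schemaId ? prefix + schemaId : "mastra";
  let store = storeCache.get(schemaName);
  if (!store) {
    // Real code would construct and initialize the schema-bound instance here
    store = { schemaName };
    storeCache.set(schemaName, store);
  }
  return store;
}
```

Repeated calls with the same schema ID return the same cached instance, so connection pools are not recreated per request.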
Abhi Aiyer
Abhi Aiyer•2w ago
Do you mind sharing the code you have so I can inspect it and see the issue?
Aman Zishan
Aman ZishanOP•7d ago
export const getStorage = async (schemaId?: string) => {
  const schemaName = schemaId ? TENANT_SCHEMA_PREFIX + schemaId : "mastra";
  // Return cached storage if already initialized
  if (storageInstances[schemaName]) {
    return storageInstances[schemaName];
  }

  const storage = new RoledPostgresStore({
    id: "mastra-vizibl-storage",
    host: config.databaseHost,
    port: config.databasePort,
    user: config.databaseUser,
    password: () => config.databasePassword,
    database: config.database,
    schemaName,
  });

  const libPGVector = new PgVector({
    id: "research-vectors",
    host: config.databaseHost,
    port: config.databasePort,
    user: config.databaseUser,
    password: () => config.databasePassword,
    database: config.database,
    schemaName,
  });

  // Set search_path for vector operations to ensure the vector type is accessible
  libPGVector.pool.on("connect", async (client: PoolClient) => {
    await client.query(`SET search_path TO "${schemaName}";`);
    if (config.databaseRole) {
      await client.query(`SET ROLE ${config.databaseRole};`);
    }
  });

  // Initialize storage and create tables if needed
  await storage.init();

  if (config.databaseRole) {
    await storage.db.query("select apply_permissions_to_all_schemas();");
  }

  storageInstances[schemaName] = { db: storage, vectorDb: libPGVector };
  return { db: storage, vectorDb: libPGVector };
};

export const getMemory = async (schemaId: string) => {
  const memory = memories[schemaId];
  if (memory) {
    return memory;
  }
  const storage = await getStorage(schemaId);

  memories[schemaId] = new Memory({
    storage: storage.db,
    vector: storage.vectorDb,
    options: {
      generateTitle: true,
    },
  });
  return memories[schemaId];
};
const chatAgent = new Agent({
  id: "chatAgent",
  name: "Chat Agent",
  memory: async ({
    requestContext,
  }: {
    requestContext: RequestContext<Context>;
  }) => {
    return await getMemory(requestContext.get("schemaId"));
  },
  //........
const baseTool = createVectorQueryTool({
  vectorStoreName: "libPGVector",
  indexName: "tenant_embeddings",
  model: new ModelRouterEmbeddingModel("openai/text-embedding-3-small"),
  includeSources: true,
  includeVectors: false,
});

export const vectorQueryTool = {
  ...baseTool,
  execute: async (
    inputData: { queryText: string; topK: number },
    context?: ToolExecutionContext,
  ) => {
    const { queryText, topK = 5 } = inputData;

    if (!context?.requestContext) {
      throw new Error("Request context not found");
    }

    // Get the tenant-specific schema ID from the request context
    const schemaId = context.requestContext.get("schemaId");
    const chatContext: string = context.requestContext.get("chatContext");

    const storage = await getStorage(schemaId as string);
    const vectorStore = storage.vectorDb;
    const model = new ModelRouterEmbeddingModel(
      "openai/text-embedding-3-small",
    );
    const { embeddings } = await embedMany({
      model,
      values: [queryText],
    });
    const queryVector = embeddings[0];

    if (!queryVector) {
      throw new Error("Failed to generate embedding for query");
    }
    const results = await vectorStore.query({
      indexName: "tenant_embeddings",
      queryVector,
      topK,
      filter: { context: chatContext },
    });
    const relevantContext = results
      .map((result) => result.metadata?.text as string)
      .filter(Boolean)
      .join("\n\n");

    return {
      relevantContext,
      sources: results.map((result) => ({
        id: result.id,
        metadata: result.metadata || {},
        vector: result.vector || [],
        score: result.score,
        document: (result.metadata?.text as string) || "",
      })),
    };
  },
};
@Abhi Aiyer Ideally I would like to remove this workaround, since I want to use just createVectorQueryTool, and createVectorQueryTool should be able to get the schema-based storage instance itself.
Abhi Aiyer
Abhi Aiyer•7d ago
Okay, I get you now! I'll get a fix in.
