diff --git a/packages/adapter-postgres/src/index.ts b/packages/adapter-postgres/src/index.ts index 701b8eafdc0..4b4459b59be 100644 --- a/packages/adapter-postgres/src/index.ts +++ b/packages/adapter-postgres/src/index.ts @@ -1,5 +1,12 @@ import { v4 } from "uuid"; -import pg, { type Pool } from "pg"; +import pg, { + type Pool, + QueryConfig, + QueryConfigValues, + QueryResult, + QueryResultRow, + DatabaseError, +} from "pg"; import { Account, Actor, @@ -76,22 +83,33 @@ export class PostgresDatabaseAdapter }); } - async init() { - await this.testConnection(); + async query( + queryTextOrConfig: string | QueryConfig, + values?: QueryConfigValues + ): Promise> { + const client = await this.pool.connect(); try { - const client = await this.pool.connect(); - const schema = fs.readFileSync( - path.resolve(__dirname, "../schema.sql"), - "utf8" - ); - await client.query(schema); + return client.query(queryTextOrConfig, values); } catch (error) { elizaLogger.error(error); throw error; + } finally { + client.release(); } } + async init() { + await this.testConnection(); + + const schema = fs.readFileSync( + path.resolve(__dirname, "../schema.sql"), + "utf8" + ); + + await this.query(schema); + } + async testConnection(): Promise { let client; try { @@ -104,87 +122,65 @@ export class PostgresDatabaseAdapter return true; } catch (error) { elizaLogger.error("Database connection test failed:", error); - throw new Error(`Failed to connect to database: ${error.message}`); + throw new Error( + `Failed to connect to database: ${(error as Error).message}` + ); } finally { if (client) client.release(); } } async getRoom(roomId: UUID): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - "SELECT id FROM rooms WHERE id = $1", - [roomId] - ); - return rows.length > 0 ? (rows[0].id as UUID) : null; - } finally { - client.release(); - } + const { rows } = await this.query( + "SELECT id FROM rooms WHERE id = $1", + [roomId] + ); + + return rows.length > 0 ? (rows[0].id as UUID) : null; } async getParticipantsForAccount(userId: UUID): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - `SELECT id, "userId", "roomId", "last_message_read" - FROM participants - WHERE "userId" = $1`, - [userId] - ); - return rows as Participant[]; - } finally { - client.release(); - } + const { rows } = await this.query( + `SELECT id, "userId", "roomId", "last_message_read" + FROM participants + WHERE "userId" = $1`, + [userId] + ); + return rows as Participant[]; } async getParticipantUserState( roomId: UUID, userId: UUID ): Promise<"FOLLOWED" | "MUTED" | null> { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - `SELECT "userState" FROM participants WHERE "roomId" = $1 AND "userId" = $2`, - [roomId, userId] - ); - return rows.length > 0 ? rows[0].userState : null; - } finally { - client.release(); - } + const { rows } = await this.query( + `SELECT "userState" FROM participants WHERE "roomId" = $1 AND "userId" = $2`, + [roomId, userId] + ); + return rows.length > 0 ? rows[0].userState : null; } async getMemoriesByRoomIds(params: { + agentId: UUID; roomIds: UUID[]; - agentId?: UUID; tableName: string; }): Promise { - const client = await this.pool.connect(); - try { - if (params.roomIds.length === 0) return []; - const placeholders = params.roomIds - .map((_, i) => `$${i + 2}`) - .join(", "); - - let query = `SELECT * FROM memories WHERE type = $1 AND "roomId" IN (${placeholders})`; - let queryParams = [params.tableName, ...params.roomIds]; + if (params.roomIds.length === 0) return []; + const placeholders = params.roomIds + .map((_, i) => `$${i + 3}`) + .join(", "); - if (params.agentId) { - query += ` AND "agentId" = $${params.roomIds.length + 2}`; - queryParams = [...queryParams, params.agentId]; - } + let query = `SELECT * FROM memories WHERE type = $1 AND "agentId" = $2 AND "roomId" IN (${placeholders})`; + let queryParams = [params.tableName, params.agentId, ...params.roomIds]; - const { rows } = await client.query(query, queryParams); - return rows.map((row) => ({ - ...row, - content: - typeof row.content === "string" - ? JSON.parse(row.content) - : row.content, - })); - } finally { - client.release(); - } + const { rows } = await this.query(query, queryParams); + return rows.map((row) => ({ + ...row, + content: + typeof row.content === "string" + ? JSON.parse(row.content) + : row.content, + })); } async setParticipantUserState( @@ -192,59 +188,43 @@ export class PostgresDatabaseAdapter userId: UUID, state: "FOLLOWED" | "MUTED" | null ): Promise { - const client = await this.pool.connect(); - try { - await client.query( - `UPDATE participants SET "userState" = $1 WHERE "roomId" = $2 AND "userId" = $3`, - [state, roomId, userId] - ); - } finally { - client.release(); - } + await this.query( + `UPDATE participants SET "userState" = $1 WHERE "roomId" = $2 AND "userId" = $3`, + [state, roomId, userId] + ); } async getParticipantsForRoom(roomId: UUID): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - 'SELECT "userId" FROM participants WHERE "roomId" = $1', - [roomId] - ); - return rows.map((row) => row.userId); - } finally { - client.release(); - } + const { rows } = await this.query( + 'SELECT "userId" FROM participants WHERE "roomId" = $1', + [roomId] + ); + return rows.map((row) => row.userId); } async getAccountById(userId: UUID): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - "SELECT * FROM accounts WHERE id = $1", - [userId] - ); - if (rows.length === 0) return null; - - const account = rows[0]; - // elizaLogger.log("account", account); - return { - ...account, - details: - typeof account.details === "string" - ? JSON.parse(account.details) - : account.details, - }; - } finally { - client.release(); - } + const { rows } = await this.query( + "SELECT * FROM accounts WHERE id = $1", + [userId] + ); + if (rows.length === 0) return null; + + const account = rows[0]; + elizaLogger.log("account", account); + return { + ...account, + details: + typeof account.details === "string" + ? JSON.parse(account.details) + : account.details, + }; } async createAccount(account: Account): Promise { - const client = await this.pool.connect(); try { - await client.query( + await this.query( `INSERT INTO accounts (id, name, username, email, "avatarUrl", details) - VALUES ($1, $2, $3, $4, $5, $6)`, + VALUES ($1, $2, $3, $4, $5, $6)`, [ account.id ?? v4(), account.name, @@ -254,98 +234,84 @@ export class PostgresDatabaseAdapter JSON.stringify(account.details), ] ); + return true; } catch (error) { elizaLogger.log("Error creating account", error); return false; - } finally { - client.release(); } } async getActorById(params: { roomId: UUID }): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - `SELECT a.id, a.name, a.username, a.details - FROM participants p - LEFT JOIN accounts a ON p."userId" = a.id - WHERE p."roomId" = $1`, - [params.roomId] - ); - return rows.map((row) => ({ - ...row, - details: - typeof row.details === "string" - ? JSON.parse(row.details) - : row.details, - })); - } finally { - client.release(); - } + const { rows } = await this.query( + `SELECT a.id, a.name, a.username, a.details + FROM participants p + LEFT JOIN accounts a ON p."userId" = a.id + WHERE p."roomId" = $1`, + [params.roomId] + ); + return rows.map((row) => ({ + ...row, + details: + typeof row.details === "string" + ? JSON.parse(row.details) + : row.details, + })); } async getMemoryById(id: UUID): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - "SELECT * FROM memories WHERE id = $1", - [id] - ); - if (rows.length === 0) return null; - - return { - ...rows[0], - content: - typeof rows[0].content === "string" - ? JSON.parse(rows[0].content) - : rows[0].content, - }; - } finally { - client.release(); - } + const { rows } = await this.query( + "SELECT * FROM memories WHERE id = $1", + [id] + ); + if (rows.length === 0) return null; + + return { + ...rows[0], + content: + typeof rows[0].content === "string" + ? JSON.parse(rows[0].content) + : rows[0].content, + }; } async createMemory(memory: Memory, tableName: string): Promise { - const client = await this.pool.connect(); - try { - let isUnique = true; - if (memory.embedding) { - const similarMemories = await this.searchMemoriesByEmbedding( - memory.embedding, - { - tableName, - roomId: memory.roomId, - match_threshold: 0.95, - count: 1, - } - ); - isUnique = similarMemories.length === 0; - } - - await client.query( - `INSERT INTO memories ( - id, type, content, embedding, "userId", "roomId", "agentId", "unique", "createdAt" - ) VALUES ($1, $2, $3, $4, $5::uuid, $6::uuid, $7::uuid, $8, to_timestamp($9/1000.0))`, - [ - memory.id ?? v4(), + let isUnique = true; + if (memory.embedding) { + const similarMemories = await this.searchMemoriesByEmbedding( + memory.embedding, + { tableName, - JSON.stringify(memory.content), - memory.embedding ? `[${memory.embedding.join(",")}]` : null, - memory.userId, - memory.roomId, - memory.agentId, - memory.unique ?? isUnique, - Date.now(), - ] + agentId: memory.agentId, + roomId: memory.roomId, + match_threshold: 0.95, + count: 1, + } ); - } finally { - client.release(); - } + isUnique = similarMemories.length === 0; + } + + await this.query( + `INSERT INTO memories ( + id, type, content, embedding, "userId", "roomId", "agentId", "unique", "createdAt" + ) VALUES ($1, $2, $3, $4, $5::uuid, $6::uuid, $7::uuid, $8, to_timestamp($9/1000.0))`, + [ + memory.id ?? v4(), + tableName, + JSON.stringify(memory.content), + memory.embedding ? `[${memory.embedding.join(",")}]` : null, + memory.userId, + memory.roomId, + memory.agentId, + memory.unique ?? isUnique, + Date.now(), + ] + ); } async searchMemories(params: { tableName: string; + agentId: UUID; roomId: UUID; embedding: number[]; match_threshold: number; @@ -355,6 +321,7 @@ export class PostgresDatabaseAdapter return await this.searchMemoriesByEmbedding(params.embedding, { match_threshold: params.match_threshold, count: params.match_count, + agentId: params.agentId, roomId: params.roomId, unique: params.unique, tableName: params.tableName, @@ -366,60 +333,48 @@ export class PostgresDatabaseAdapter count?: number; unique?: boolean; tableName: string; - agentId?: UUID; + agentId: UUID; start?: number; end?: number; }): Promise { if (!params.tableName) throw new Error("tableName is required"); if (!params.roomId) throw new Error("roomId is required"); + let sql = `SELECT * FROM memories WHERE type = $1 AND agentId = $2 AND "roomId" = $3`; + const values: any[] = [params.tableName, params.agentId, params.roomId]; + let paramCount = 2; - const client = await this.pool.connect(); - try { - let sql = `SELECT * FROM memories WHERE type = $1 AND "roomId" = $2`; - const values: any[] = [params.tableName, params.roomId]; - let paramCount = 2; - - if (params.start) { - paramCount++; - sql += ` AND "createdAt" >= to_timestamp($${paramCount})`; - values.push(params.start / 1000); - } - - if (params.end) { - paramCount++; - sql += ` AND "createdAt" <= to_timestamp($${paramCount})`; - values.push(params.end / 1000); - } - - if (params.unique) { - sql += ` AND "unique" = true`; - } + if (params.start) { + paramCount++; + sql += ` AND "createdAt" >= to_timestamp($${paramCount})`; + values.push(params.start / 1000); + } - if (params.agentId) { - paramCount++; - sql += ` AND "agentId" = $${paramCount}`; - values.push(params.agentId); - } + if (params.end) { + paramCount++; + sql += ` AND "createdAt" <= to_timestamp($${paramCount})`; + values.push(params.end / 1000); + } - sql += ' ORDER BY "createdAt" DESC'; + if (params.unique) { + sql += ` AND "unique" = true`; + } - if (params.count) { - paramCount++; - sql += ` LIMIT $${paramCount}`; - values.push(params.count); - } + sql += ' ORDER BY "createdAt" DESC'; - const { rows } = await client.query(sql, values); - return rows.map((row) => ({ - ...row, - content: - typeof row.content === "string" - ? JSON.parse(row.content) - : row.content, - })); - } finally { - client.release(); + if (params.count) { + paramCount++; + sql += ` LIMIT $${paramCount}`; + values.push(params.count); } + + const { rows } = await this.query(sql, values); + return rows.map((row) => ({ + ...row, + content: + typeof row.content === "string" + ? JSON.parse(row.content) + : row.content, + })); } async getGoals(params: { @@ -428,107 +383,70 @@ export class PostgresDatabaseAdapter onlyInProgress?: boolean; count?: number; }): Promise { - const client = await this.pool.connect(); - try { - let sql = `SELECT * FROM goals WHERE "roomId" = $1`; - const values: any[] = [params.roomId]; - let paramCount = 1; - - if (params.userId) { - paramCount++; - sql += ` AND "userId" = $${paramCount}`; - values.push(params.userId); - } + let sql = `SELECT * FROM goals WHERE "roomId" = $1`; + const values: any[] = [params.roomId]; + let paramCount = 1; - if (params.onlyInProgress) { - sql += " AND status = 'IN_PROGRESS'"; - } + if (params.userId) { + paramCount++; + sql += ` AND "userId" = $${paramCount}`; + values.push(params.userId); + } - if (params.count) { - paramCount++; - sql += ` LIMIT $${paramCount}`; - values.push(params.count); - } + if (params.onlyInProgress) { + sql += " AND status = 'IN_PROGRESS'"; + } - const { rows } = await client.query(sql, values); - return rows.map((row) => ({ - ...row, - objectives: - typeof row.objectives === "string" - ? JSON.parse(row.objectives) - : row.objectives, - })); - } finally { - client.release(); + if (params.count) { + paramCount++; + sql += ` LIMIT $${paramCount}`; + values.push(params.count); } + + const { rows } = await this.query(sql, values); + return rows.map((row) => ({ + ...row, + objectives: + typeof row.objectives === "string" + ? JSON.parse(row.objectives) + : row.objectives, + })); } async updateGoal(goal: Goal): Promise { - const client = await this.pool.connect(); - try { - await client.query( - `UPDATE goals SET name = $1, status = $2, objectives = $3 WHERE id = $4`, - [ - goal.name, - goal.status, - JSON.stringify(goal.objectives), - goal.id, - ] - ); - } finally { - client.release(); - } + await this.query( + `UPDATE goals SET name = $1, status = $2, objectives = $3 WHERE id = $4`, + [goal.name, goal.status, JSON.stringify(goal.objectives), goal.id] + ); } async createGoal(goal: Goal): Promise { - const client = await this.pool.connect(); - try { - await client.query( - `INSERT INTO goals (id, "roomId", "userId", name, status, objectives) - VALUES ($1, $2, $3, $4, $5, $6)`, - [ - goal.id ?? v4(), - goal.roomId, - goal.userId, - goal.name, - goal.status, - JSON.stringify(goal.objectives), - ] - ); - } finally { - client.release(); - } + await this.query( + `INSERT INTO goals (id, "roomId", "userId", name, status, objectives) + VALUES ($1, $2, $3, $4, $5, $6)`, + [ + goal.id ?? v4(), + goal.roomId, + goal.userId, + goal.name, + goal.status, + JSON.stringify(goal.objectives), + ] + ); } async removeGoal(goalId: UUID): Promise { - const client = await this.pool.connect(); - try { - await client.query("DELETE FROM goals WHERE id = $1", [goalId]); - } finally { - client.release(); - } + await this.query("DELETE FROM goals WHERE id = $1", [goalId]); } async createRoom(roomId?: UUID): Promise { - const client = await this.pool.connect(); - try { - const newRoomId = roomId || v4(); - await client.query("INSERT INTO rooms (id) VALUES ($1)", [ - newRoomId, - ]); - return newRoomId as UUID; - } finally { - client.release(); - } + const newRoomId = roomId || v4(); + await this.query("INSERT INTO rooms (id) VALUES ($1)", [newRoomId]); + return newRoomId as UUID; } async removeRoom(roomId: UUID): Promise { - const client = await this.pool.connect(); - try { - await client.query("DELETE FROM rooms WHERE id = $1", [roomId]); - } finally { - client.release(); - } + await this.query("DELETE FROM rooms WHERE id = $1", [roomId]); } async createRelationship(params: { @@ -539,19 +457,16 @@ export class PostgresDatabaseAdapter throw new Error("userA and userB are required"); } - const client = await this.pool.connect(); try { - await client.query( + await this.query( `INSERT INTO relationships (id, "userA", "userB", "userId") VALUES ($1, $2, $3, $4)`, [v4(), params.userA, params.userB, params.userA] ); return true; } catch (error) { - console.log("Error creating relationship", error); + elizaLogger.log("Error creating relationship", error); return false; - } finally { - client.release(); } } @@ -559,30 +474,20 @@ export class PostgresDatabaseAdapter userA: UUID; userB: UUID; }): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - `SELECT * FROM relationships - WHERE ("userA" = $1 AND "userB" = $2) OR ("userA" = $2 AND "userB" = $1)`, - [params.userA, params.userB] - ); - return rows.length > 0 ? rows[0] : null; - } finally { - client.release(); - } + const { rows } = await this.query( + `SELECT * FROM relationships + WHERE ("userA" = $1 AND "userB" = $2) OR ("userA" = $2 AND "userB" = $1)`, + [params.userA, params.userB] + ); + return rows.length > 0 ? rows[0] : null; } async getRelationships(params: { userId: UUID }): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - `SELECT * FROM relationships WHERE "userA" = $1 OR "userB" = $1`, - [params.userId] - ); - return rows; - } finally { - client.release(); - } + const { rows } = await this.query( + `SELECT * FROM relationships WHERE "userA" = $1 OR "userB" = $1`, + [params.userId] + ); + return rows; } async getCachedEmbeddings(opts: { @@ -593,10 +498,8 @@ export class PostgresDatabaseAdapter query_field_sub_name: string; query_match_count: number; }): Promise<{ embedding: number[]; levenshtein_score: number }[]> { - const client = await this.pool.connect(); - try { - // Get the JSON field content as text first - const sql = ` + // Get the JSON field content as text first + const sql = ` WITH content_text AS ( SELECT embedding, @@ -619,24 +522,18 @@ export class PostgresDatabaseAdapter LIMIT $5 `; - const { rows } = await client.query(sql, [ - opts.query_input, - opts.query_field_name, - opts.query_field_sub_name, - opts.query_table_name, - opts.query_match_count, - ]); + const { rows } = await this.query(sql, [ + opts.query_input, + opts.query_field_name, + opts.query_field_sub_name, + opts.query_table_name, + opts.query_match_count, + ]); - return rows.map((row) => ({ - embedding: row.embedding, - levenshtein_score: row.levenshtein_score, - })); - } catch (error) { - console.error("Error in getCachedEmbeddings:", error); - throw error; - } finally { - client.release(); - } + return rows.map((row) => ({ + embedding: row.embedding, + levenshtein_score: row.levenshtein_score, + })); } async log(params: { @@ -645,16 +542,11 @@ export class PostgresDatabaseAdapter roomId: UUID; type: string; }): Promise { - const client = await this.pool.connect(); - try { - await client.query( - `INSERT INTO logs (body, "userId", "roomId", type) - VALUES ($1, $2, $3, $4)`, - [params.body, params.userId, params.roomId, params.type] - ); - } finally { - client.release(); - } + await this.query( + `INSERT INTO logs (body, "userId", "roomId", type) + VALUES ($1, $2, $3, $4)`, + [params.body, params.userId, params.roomId, params.type] + ); } async searchMemoriesByEmbedding( @@ -668,117 +560,113 @@ export class PostgresDatabaseAdapter tableName: string; } ): Promise { - const client = await this.pool.connect(); - try { - const vectorStr = `[${embedding.join(",")}]`; + const vectorStr = `[${embedding.join(",")}]`; - let sql = ` + let sql = ` SELECT *, 1 - (embedding <-> $1::vector) as similarity FROM memories WHERE type = $2 `; - const values: any[] = [vectorStr, params.tableName]; - let paramCount = 2; - - if (params.unique) { - sql += ` AND "unique" = true`; - } + const values: any[] = [vectorStr, params.tableName]; + let paramCount = 2; - if (params.agentId) { - paramCount++; - sql += ` AND "agentId" = $${paramCount}`; - values.push(params.agentId); - } + if (params.unique) { + sql += ` AND "unique" = true`; + } - if (params.roomId) { - paramCount++; - sql += ` AND "roomId" = $${paramCount}::uuid`; - values.push(params.roomId); - } + if (params.agentId) { + paramCount++; + sql += ` AND "agentId" = $${paramCount}`; + values.push(params.agentId); + } - if (params.match_threshold) { - paramCount++; - sql += ` AND 1 - (embedding <-> $1::vector) >= $${paramCount}`; - values.push(params.match_threshold); - } + if (params.roomId) { + paramCount++; + sql += ` AND "roomId" = $${paramCount}::uuid`; + values.push(params.roomId); + } - sql += ` ORDER BY embedding <-> $1::vector`; + if (params.match_threshold) { + paramCount++; + sql += ` AND 1 - (embedding <-> $1::vector) >= $${paramCount}`; + values.push(params.match_threshold); + } - if (params.count) { - paramCount++; - sql += ` LIMIT $${paramCount}`; - values.push(params.count); - } + sql += ` ORDER BY embedding <-> $1::vector`; - const { rows } = await client.query(sql, values); - return rows.map((row) => ({ - ...row, - content: - typeof row.content === "string" - ? JSON.parse(row.content) - : row.content, - similarity: row.similarity, - })); - } finally { - client.release(); + if (params.count) { + paramCount++; + sql += ` LIMIT $${paramCount}`; + values.push(params.count); } + + const { rows } = await this.query(sql, values); + + return rows.map((row) => ({ + ...row, + content: + typeof row.content === "string" + ? JSON.parse(row.content) + : row.content, + similarity: row.similarity, + })); } async addParticipant(userId: UUID, roomId: UUID): Promise { - const client = await this.pool.connect(); try { - // Check if the participant already exists - const existingParticipant = await client.query( + const existingParticipant = await this.query( + // Check if the participant already exists `SELECT * FROM participants WHERE "userId" = $1 AND "roomId" = $2`, [userId, roomId] ); if (existingParticipant.rows.length > 0) { - console.log( + elizaLogger.log( `Participant with userId ${userId} already exists in room ${roomId}.` ); - return; // Exit early if the participant already exists + return true; // Exit early if the participant already exists } // Proceed to add the participant if they do not exist - await client.query( + await this.query( `INSERT INTO participants (id, "userId", "roomId") VALUES ($1, $2, $3)`, [v4(), userId, roomId] ); return true; } catch (error) { - // This is to prevent duplicate participant error in case of a race condition - // Handle unique constraint violation error (code 23505) - if (error.code === "23505") { - console.warn( - `Participant with userId ${userId} already exists in room ${roomId}.` - ); // Optionally, you can log this or handle it differently - } else { - // Handle other errors - console.error("Error adding participant:", error); - return false; + if (error instanceof DatabaseError) { + elizaLogger.log("Error adding participant", error); + // This is to prevent duplicate participant error in case of a race condition + // Handle unique constraint violation error (code 23505) + if (error.code === "23505") { + elizaLogger.warn( + `Participant with userId ${userId} already exists in room ${roomId}.` + ); // Optionally, you can log this or handle it differently + return true; + } else { + // Handle other errors + elizaLogger.error("Error adding participant:", error); + return false; + } } - } finally { - client.release(); + + return false; } } async removeParticipant(userId: UUID, roomId: UUID): Promise { - const client = await this.pool.connect(); try { - await client.query( + await this.query( `DELETE FROM participants WHERE "userId" = $1 AND "roomId" = $2`, [userId, roomId] ); return true; } catch (error) { - console.log("Error removing participant", error); + elizaLogger.log("Error removing participant", error); return false; - } finally { - client.release(); } } @@ -786,39 +674,24 @@ export class PostgresDatabaseAdapter goalId: UUID; status: GoalStatus; }): Promise { - const client = await this.pool.connect(); - try { - await client.query("UPDATE goals SET status = $1 WHERE id = $2", [ - params.status, - params.goalId, - ]); - } finally { - client.release(); - } + await this.query("UPDATE goals SET status = $1 WHERE id = $2", [ + params.status, + params.goalId, + ]); } async removeMemory(memoryId: UUID, tableName: string): Promise { - const client = await this.pool.connect(); - try { - await client.query( - "DELETE FROM memories WHERE type = $1 AND id = $2", - [tableName, memoryId] - ); - } finally { - client.release(); - } + await this.query("DELETE FROM memories WHERE type = $1 AND id = $2", [ + tableName, + memoryId, + ]); } async removeAllMemories(roomId: UUID, tableName: string): Promise { - const client = await this.pool.connect(); - try { - await client.query( - `DELETE FROM memories WHERE type = $1 AND "roomId" = $2`, - [tableName, roomId] - ); - } finally { - client.release(); - } + await this.query( + `DELETE FROM memories WHERE type = $1 AND "roomId" = $2`, + [tableName, roomId] + ); } async countMemories( @@ -828,56 +701,34 @@ export class PostgresDatabaseAdapter ): Promise { if (!tableName) throw new Error("tableName is required"); - const client = await this.pool.connect(); - try { - let sql = `SELECT COUNT(*) as count FROM memories WHERE type = $1 AND "roomId" = $2`; - if (unique) { - sql += ` AND "unique" = true`; - } - - const { rows } = await client.query(sql, [tableName, roomId]); - return parseInt(rows[0].count); - } finally { - client.release(); + let sql = `SELECT COUNT(*) as count FROM memories WHERE type = $1 AND "roomId" = $2`; + if (unique) { + sql += ` AND "unique" = true`; } + + const { rows } = await this.query(sql, [tableName, roomId]); + return parseInt(rows[0].count); } async removeAllGoals(roomId: UUID): Promise { - const client = await this.pool.connect(); - try { - await client.query(`DELETE FROM goals WHERE "roomId" = $1`, [ - roomId, - ]); - } finally { - client.release(); - } + await this.query(`DELETE FROM goals WHERE "roomId" = $1`, [roomId]); } async getRoomsForParticipant(userId: UUID): Promise { - const client = await this.pool.connect(); - try { - const { rows } = await client.query( - `SELECT "roomId" FROM participants WHERE "userId" = $1`, - [userId] - ); - return rows.map((row) => row.roomId); - } finally { - client.release(); - } + const { rows } = await this.query( + `SELECT "roomId" FROM participants WHERE "userId" = $1`, + [userId] + ); + return rows.map((row) => row.roomId); } async getRoomsForParticipants(userIds: UUID[]): Promise { - const client = await this.pool.connect(); - try { - const placeholders = userIds.map((_, i) => `$${i + 1}`).join(", "); - const { rows } = await client.query( - `SELECT DISTINCT "roomId" FROM participants WHERE "userId" IN (${placeholders})`, - userIds - ); - return rows.map((row) => row.roomId); - } finally { - client.release(); - } + const placeholders = userIds.map((_, i) => `$${i + 1}`).join(", "); + const { rows } = await this.query( + `SELECT DISTINCT "roomId" FROM participants WHERE "userId" IN (${placeholders})`, + userIds + ); + return rows.map((row) => row.roomId); } async getActorDetails(params: { roomId: string }): Promise { @@ -893,13 +744,13 @@ export class PostgresDatabaseAdapter `; try { - const result = await this.pool.query(sql, [params.roomId]); + const result = await this.query(sql, [params.roomId]); return result.rows.map((row) => ({ ...row, details: row.details, // PostgreSQL automatically handles JSON parsing })); } catch (error) { - console.error("Error fetching actor details:", error); + elizaLogger.error("Error fetching actor details:", error); throw new Error("Failed to fetch actor details"); } } @@ -908,19 +759,16 @@ export class PostgresDatabaseAdapter key: string; agentId: UUID; }): Promise { - const client = await this.pool.connect(); try { const sql = `SELECT "value"::TEXT FROM cache WHERE "key" = $1 AND "agentId" = $2`; - const { rows } = await this.pool.query<{ value: string }>(sql, [ + const { rows } = await this.query<{ value: string }>(sql, [ params.key, params.agentId, ]); - return rows[0]?.value ?? undefined; } catch (error) { - console.log("Error fetching cache", error); - } finally { - client.release(); + elizaLogger.log("Error fetching cache", error); + return undefined; } } @@ -929,9 +777,8 @@ export class PostgresDatabaseAdapter agentId: UUID; value: string; }): Promise { - const client = await this.pool.connect(); try { - await client.query( + await this.query( `INSERT INTO cache ("key", "agentId", "value", "createdAt") VALUES ($1, $2, $3, CURRENT_TIMESTAMP) ON CONFLICT ("key", "agentId") DO UPDATE SET "value" = EXCLUDED.value, "createdAt" = CURRENT_TIMESTAMP`, @@ -939,9 +786,8 @@ export class PostgresDatabaseAdapter ); return true; } catch (error) { - console.log("Error adding cache", error); - } finally { - client.release(); + elizaLogger.log("Error setting cache", error); + return false; } } @@ -949,17 +795,14 @@ export class PostgresDatabaseAdapter key: string; agentId: UUID; }): Promise { - const client = await this.pool.connect(); try { - await client.query( + await this.query( `DELETE FROM cache WHERE "key" = $1 AND "agentId" = $2`, [params.key, params.agentId] ); return true; - } catch (error) { - console.log("Error adding cache", error); - } finally { - client.release(); + } catch { + return false; } } } diff --git a/packages/adapter-sqlite/src/index.ts b/packages/adapter-sqlite/src/index.ts index b7a33eb04ad..5e5b1a2c8f1 100644 --- a/packages/adapter-sqlite/src/index.ts +++ b/packages/adapter-sqlite/src/index.ts @@ -143,22 +143,17 @@ export class SqliteDatabaseAdapter } async getMemoriesByRoomIds(params: { + agentId: UUID; roomIds: UUID[]; tableName: string; - agentId?: UUID; }): Promise { if (!params.tableName) { // default to messages params.tableName = "messages"; } const placeholders = params.roomIds.map(() => "?").join(", "); - let sql = `SELECT * FROM memories WHERE type = ? AND roomId IN (${placeholders})`; - const queryParams = [params.tableName, ...params.roomIds]; - - if (params.agentId) { - sql += ` AND agentId = ?`; - queryParams.push(params.agentId); - } + let sql = `SELECT * FROM memories WHERE type = ? AND agentId = ? AND roomId IN (${placeholders})`; + let queryParams = [params.tableName, params.agentId, ...params.roomIds]; const stmt = this.db.prepare(sql); const rows = stmt.all(...queryParams) as (Memory & { @@ -189,8 +184,8 @@ export class SqliteDatabaseAdapter async createMemory(memory: Memory, tableName: string): Promise { // Delete any existing memory with the same ID first - const deleteSql = `DELETE FROM memories WHERE id = ? AND type = ?`; - this.db.prepare(deleteSql).run(memory.id, tableName); + // const deleteSql = `DELETE FROM memories WHERE id = ? AND type = ?`; + // this.db.prepare(deleteSql).run(memory.id, tableName); let isUnique = true; @@ -200,6 +195,7 @@ export class SqliteDatabaseAdapter memory.embedding, { tableName, + agentId: memory.agentId, roomId: memory.roomId, match_threshold: 0.95, // 5% similarity threshold count: 1, @@ -281,7 +277,7 @@ export class SqliteDatabaseAdapter match_threshold?: number; count?: number; roomId?: UUID; - agentId?: UUID; + agentId: UUID; unique?: boolean; tableName: string; } @@ -290,20 +286,17 @@ export class SqliteDatabaseAdapter // JSON.stringify(embedding), new Float32Array(embedding), params.tableName, + params.agentId, ]; let sql = ` SELECT *, vec_distance_L2(embedding, ?) AS similarity FROM memories - WHERE type = ?`; + WHERE embedding IS NOT NULL type = ? AND agentId = ?`; if (params.unique) { sql += " AND `unique` = 1"; } - if (params.agentId) { - sql += " AND agentId = ?"; - queryParams.push(params.agentId); - } if (params.roomId) { sql += " AND roomId = ?"; @@ -418,7 +411,7 @@ export class SqliteDatabaseAdapter count?: number; unique?: boolean; tableName: string; - agentId?: UUID; + agentId: UUID; start?: number; end?: number; }): Promise { @@ -428,19 +421,18 @@ export class SqliteDatabaseAdapter if (!params.roomId) { throw new Error("roomId is required"); } - let sql = `SELECT * FROM memories WHERE type = ? AND roomId = ?`; + let sql = `SELECT * FROM memories WHERE type = ? AND agentId = ? AND roomId = ?`; - const queryParams = [params.tableName, params.roomId] as any[]; + const queryParams = [ + params.tableName, + params.agentId, + params.roomId, + ] as any[]; if (params.unique) { sql += " AND `unique` = 1"; } - if (params.agentId) { - sql += " AND agentId = ?"; - queryParams.push(params.agentId); - } - if (params.start) { sql += ` AND createdAt >= ?`; queryParams.push(params.start); diff --git a/packages/client-discord/src/actions/summarize_conversation.ts b/packages/client-discord/src/actions/summarize_conversation.ts index ea7f35d683c..1d3a8ebdc7d 100644 --- a/packages/client-discord/src/actions/summarize_conversation.ts +++ b/packages/client-discord/src/actions/summarize_conversation.ts @@ -220,7 +220,6 @@ const summarizeAction = { // 2. get these memories from the database const memories = await runtime.messageManager.getMemories({ roomId, - agentId: runtime.agentId, // subtract start from current time start: parseInt(start as string), end: parseInt(end as string), diff --git a/packages/client-twitter/src/base.ts b/packages/client-twitter/src/base.ts index b3c54db0d05..499717b4622 100644 --- a/packages/client-twitter/src/base.ts +++ b/packages/client-twitter/src/base.ts @@ -319,7 +319,6 @@ export class ClientBase extends EventEmitter { // Get the existing memories from the database const existingMemories = await this.runtime.messageManager.getMemoriesByRoomIds({ - agentId: this.runtime.agentId, roomIds: cachedTimeline.map((tweet) => stringToUuid( tweet.conversationId + "-" + this.runtime.agentId @@ -462,7 +461,6 @@ export class ClientBase extends EventEmitter { // Check the existing memories in the database const existingMemories = await this.runtime.messageManager.getMemoriesByRoomIds({ - agentId: this.runtime.agentId, roomIds: Array.from(roomIds), }); @@ -564,7 +562,6 @@ export class ClientBase extends EventEmitter { const recentMessage = await this.runtime.messageManager.getMemories( { roomId: message.roomId, - agentId: this.runtime.agentId, count: 1, unique: false, } diff --git a/packages/client-twitter/src/utils.ts b/packages/client-twitter/src/utils.ts index fe003d12951..771438ca5ee 100644 --- a/packages/client-twitter/src/utils.ts +++ b/packages/client-twitter/src/utils.ts @@ -72,7 +72,7 @@ export async function buildConversationThread( "twitter" ); - client.runtime.messageManager.createMemory({ + await client.runtime.messageManager.createMemory({ id: stringToUuid( currentTweet.id + "-" + client.runtime.agentId ), diff --git a/packages/core/src/database.ts b/packages/core/src/database.ts index b18b263851b..fb8f9555b31 100644 --- a/packages/core/src/database.ts +++ b/packages/core/src/database.ts @@ -39,6 +39,7 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { * @returns A Promise that resolves to an array of Memory objects. */ abstract getMemories(params: { + agentId: UUID; roomId: UUID; count?: number; unique?: boolean; @@ -46,7 +47,7 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { }): Promise; abstract getMemoriesByRoomIds(params: { - agentId?: UUID; + agentId: UUID; roomIds: UUID[]; tableName: string; }): Promise; @@ -105,6 +106,7 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { */ abstract searchMemories(params: { tableName: string; + agentId: UUID; roomId: UUID; embedding: number[]; match_threshold: number; @@ -188,6 +190,7 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { * @returns A Promise that resolves to an array of Goal objects. */ abstract getGoals(params: { + agentId: UUID; roomId: UUID; userId?: UUID | null; onlyInProgress?: boolean; diff --git a/packages/core/src/goals.ts b/packages/core/src/goals.ts index b262717249a..9bfc3f017f9 100644 --- a/packages/core/src/goals.ts +++ b/packages/core/src/goals.ts @@ -6,6 +6,7 @@ import { } from "./types.ts"; export const getGoals = async ({ + agentId, runtime, roomId, userId, @@ -13,12 +14,14 @@ export const getGoals = async ({ count = 5, }: { runtime: IAgentRuntime; + agentId: UUID; roomId: UUID; userId?: UUID; onlyInProgress?: boolean; count?: number; }) => { return runtime.databaseAdapter.getGoals({ + agentId, roomId, userId, onlyInProgress, diff --git a/packages/core/src/knowledge.ts b/packages/core/src/knowledge.ts index 36e7c6c6f07..e0d990e0313 100644 --- a/packages/core/src/knowledge.ts +++ b/packages/core/src/knowledge.ts @@ -14,7 +14,6 @@ async function get(runtime: AgentRuntime, message: Memory): Promise { embedding, { roomId: message.agentId, - agentId: message.agentId, count: 3, match_threshold: 0.1, } @@ -50,13 +49,13 @@ async function set( bleed: number = 20 ) { await runtime.documentsManager.createMemory({ - embedding: embeddingZeroVector, id: item.id, agentId: runtime.agentId, roomId: runtime.agentId, userId: runtime.agentId, createdAt: Date.now(), content: item.content, + embedding: embeddingZeroVector, }); const preprocessed = preprocess(item.content.text); diff --git a/packages/core/src/memory.ts b/packages/core/src/memory.ts index 18f588acc49..f781e00e1ca 100644 --- a/packages/core/src/memory.ts +++ b/packages/core/src/memory.ts @@ -91,14 +91,12 @@ export class MemoryManager implements IMemoryManager { roomId, count = 10, unique = true, - agentId, start, end, }: { roomId: UUID; count?: number; unique?: boolean; - agentId?: UUID; start?: number; end?: number; }): Promise { @@ -107,7 +105,7 @@ export class MemoryManager implements IMemoryManager { count, unique, tableName: this.tableName, - agentId, + agentId: this.runtime.agentId, start, end, }); @@ -143,7 +141,6 @@ export class MemoryManager implements IMemoryManager { embedding: number[], opts: { match_threshold?: number; - agentId?: UUID; count?: number; roomId: UUID; unique?: boolean; @@ -154,20 +151,19 @@ export class MemoryManager implements IMemoryManager { count = defaultMatchCount, roomId, unique, - agentId, } = opts; - const searchOpts = { + const result = await this.runtime.databaseAdapter.searchMemories({ tableName: this.tableName, roomId, - agentId, - embedding, - match_threshold, + agentId: this.runtime.agentId, + embedding: embedding, + match_threshold: match_threshold, match_count: count, unique: !!unique, - }; + }); - return await this.runtime.databaseAdapter.searchMemories(searchOpts); + return result; } /** @@ -177,6 +173,8 @@ export class MemoryManager implements IMemoryManager { * @returns A Promise that resolves when the operation completes. */ async createMemory(memory: Memory, unique = false): Promise { + // TODO: check memory.agentId == this.runtime.agentId + const existingMessage = await this.runtime.databaseAdapter.getMemoryById(memory.id); @@ -185,7 +183,8 @@ export class MemoryManager implements IMemoryManager { return; } - elizaLogger.debug("Creating Memory", memory.id, memory.content.text); + elizaLogger.log("Creating Memory", memory.id, memory.content.text); + await this.runtime.databaseAdapter.createMemory( memory, this.tableName, @@ -193,18 +192,17 @@ export class MemoryManager implements IMemoryManager { ); } - async getMemoriesByRoomIds(params: { - agentId?: UUID; - roomIds: UUID[]; - }): Promise { + async getMemoriesByRoomIds(params: { roomIds: UUID[] }): Promise { return await this.runtime.databaseAdapter.getMemoriesByRoomIds({ - agentId: params.agentId, + agentId: this.runtime.agentId, roomIds: params.roomIds, }); } async getMemoryById(id: UUID): Promise { - return await this.runtime.databaseAdapter.getMemoryById(id); + const result = await this.runtime.databaseAdapter.getMemoryById(id); + if (result && result.agentId !== this.runtime.agentId) return null; + return result; } /** diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 825ca3e2fc8..c740b8fb40f 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -737,11 +737,11 @@ export class AgentRuntime implements IAgentRuntime { getActorDetails({ runtime: this, roomId }), this.messageManager.getMemories({ roomId, - agentId: this.agentId, count: conversationLength, unique: false, }), getGoals({ + agentId: this.agentId, runtime: this, count: 10, onlyInProgress: false, @@ -877,7 +877,6 @@ Text: ${attachment.text} // Check the existing memories in the database const existingMemories = await this.messageManager.getMemoriesByRoomIds({ - agentId: this.agentId, // filter out the current room id from rooms roomIds: rooms.filter((room) => room !== roomId), }); @@ -1172,7 +1171,6 @@ Text: ${attachment.text} const conversationLength = this.getConversationLength(); const recentMessagesData = await this.messageManager.getMemories({ roomId: state.roomId, - agentId: this.agentId, count: conversationLength, unique: false, }); diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index f6ceb828679..131836f7699 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -717,7 +717,7 @@ export interface IDatabaseAdapter { count?: number; unique?: boolean; tableName: string; - agentId?: UUID; + agentId: UUID; start?: number; end?: number; }): Promise; @@ -725,7 +725,7 @@ export interface IDatabaseAdapter { getMemoryById(id: UUID): Promise; getMemoriesByRoomIds(params: { - agentId?: UUID; + agentId: UUID; roomIds: UUID[]; }): Promise; @@ -749,6 +749,7 @@ export interface IDatabaseAdapter { searchMemories(params: { tableName: string; + agentId: UUID; roomId: UUID; embedding: number[]; match_threshold: number; @@ -790,6 +791,7 @@ export interface IDatabaseAdapter { ): Promise; getGoals(params: { + agentId: UUID; roomId: UUID; userId?: UUID | null; onlyInProgress?: boolean; @@ -869,7 +871,6 @@ export interface IMemoryManager { roomId: UUID; count?: number; unique?: boolean; - agentId?: UUID; start?: number; end?: number; }): Promise; @@ -879,12 +880,7 @@ export interface IMemoryManager { ): Promise<{ embedding: number[]; levenshtein_score: number }[]>; getMemoryById(id: UUID): Promise; - - getMemoriesByRoomIds(params: { - roomIds: UUID[]; - agentId?: UUID; - }): Promise; - + getMemoriesByRoomIds(params: { roomIds: UUID[] }): Promise; searchMemoriesByEmbedding( embedding: number[], opts: { @@ -892,7 +888,6 @@ export interface IMemoryManager { count?: number; roomId: UUID; unique?: boolean; - agentId?: UUID; } ): Promise; diff --git a/packages/plugin-bootstrap/src/actions/continue.ts b/packages/plugin-bootstrap/src/actions/continue.ts index aca1eba1e37..3e2d95e0ee3 100644 --- a/packages/plugin-bootstrap/src/actions/continue.ts +++ b/packages/plugin-bootstrap/src/actions/continue.ts @@ -61,7 +61,6 @@ export const continueAction: Action = { validate: async (runtime: IAgentRuntime, message: Memory) => { const recentMessagesData = await runtime.messageManager.getMemories({ roomId: message.roomId, - agentId: runtime.agentId, count: 10, unique: false, }); diff --git a/packages/plugin-bootstrap/src/providers/boredom.ts b/packages/plugin-bootstrap/src/providers/boredom.ts index 159c21f3a5b..8386e51fbbf 100644 --- a/packages/plugin-bootstrap/src/providers/boredom.ts +++ b/packages/plugin-bootstrap/src/providers/boredom.ts @@ -282,7 +282,6 @@ const boredomProvider: Provider = { const recentMessages = await runtime.messageManager.getMemories({ roomId: message.roomId, - agentId: runtime.agentId, start: fifteenMinutesAgo, end: now, count: 20, diff --git a/packages/plugin-bootstrap/src/providers/facts.ts b/packages/plugin-bootstrap/src/providers/facts.ts index 9869640bfbe..b0cce8583eb 100644 --- a/packages/plugin-bootstrap/src/providers/facts.ts +++ b/packages/plugin-bootstrap/src/providers/facts.ts @@ -36,7 +36,6 @@ const factsProvider: Provider = { const recentFactsData = await memoryManager.getMemories({ roomId: message.roomId, count: 10, - agentId: runtime.agentId, start: 0, end: Date.now(), }); diff --git a/packages/plugin-solana/src/evaluators/trust.ts b/packages/plugin-solana/src/evaluators/trust.ts index 0ba3640f667..e8feaae03a7 100644 --- a/packages/plugin-solana/src/evaluators/trust.ts +++ b/packages/plugin-solana/src/evaluators/trust.ts @@ -109,7 +109,6 @@ async function handler(runtime: IAgentRuntime, message: Memory) { }); const recentRecommendations = await recommendationsManager.getMemories({ - agentId, roomId, count: 20, });