Merge pull request #1356 from akhilmhdh/feat/audit-log-and-log
feat: added audit log prune, resolved env update and pino file transport
maidul98 authored Jan 31, 2024
2 parents 7bf1f47 + cb27cfd commit 7711994
Showing 9 changed files with 134 additions and 54 deletions.
4 changes: 3 additions & 1 deletion backend/e2e-test/mocks/queue.ts
@@ -16,9 +16,11 @@ export const mockQueue = (): TQueueServiceFactory => {
queues[name] = jobFn;
workers[name] = jobFn;
},
listen: async (name, event) => {
listen: (name, event) => {
events[name] = event;
},
clearQueue: async () => {},
stopJobById: async () => {},
stopRepeatableJobByJobId: async () => true
};
};
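
The mock's listen is now synchronous to match the real queue service, and the new no-op stubs (clearQueue, stopJobById, stopRepeatableJobByJobId) keep the mock assignable to TQueueServiceFactory after the queue interface grows later in this commit. A minimal sketch of how a test might exercise that surface; the queue name, handler, and job id are illustrative, not part of this commit:

// Illustrative sketch only, not code from this PR; import paths assume the repo's aliases.
import { QueueName } from "@app/queue";

import { mockQueue } from "./mocks/queue";

const exerciseMockQueue = async () => {
  const queueService = mockQueue();

  // start() records the handler; listen() now records the listener synchronously.
  queueService.start(QueueName.AuditLogPrune, async () => {
    /* job handler under test */
  });
  queueService.listen(QueueName.AuditLogPrune, "failed", () => {
    /* failure listener */
  });

  // The new no-op stubs let code paths that clear or stop jobs run inside e2e tests.
  await queueService.clearQueue(QueueName.AuditLogPrune);
  await queueService.stopJobById(QueueName.AuditLogPrune, "job-id");
};
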
54 changes: 35 additions & 19 deletions backend/src/ee/services/audit-log/audit-log-dal.ts
@@ -2,6 +2,7 @@ import { Knex } from "knex";

import { TDbClient } from "@app/db";
import { TableName } from "@app/db/schemas";
import { DatabaseError } from "@app/lib/errors";
import { ormify, stripUndefinedInWhere } from "@app/lib/knex";

export type TAuditLogDALFactory = ReturnType<typeof auditLogDALFactory>;
@@ -25,27 +26,42 @@ export const auditLogDALFactory = (db: TDbClient) => {
{ orgId, projectId, userAgentType, startDate, endDate, limit = 20, offset = 0, actor, eventType }: TFindQuery,
tx?: Knex
) => {
const sqlQuery = (tx || db)(TableName.AuditLog)
  .where(
    stripUndefinedInWhere({
      projectId,
      orgId,
      eventType,
      actor,
      userAgentType
    })
  )
  .limit(limit)
  .offset(offset);
if (startDate) {
  void sqlQuery.where("createdAt", ">=", startDate);
}
if (endDate) {
  void sqlQuery.where("createdAt", "<=", endDate);
}
const docs = await sqlQuery;
return docs;
try {
  const sqlQuery = (tx || db)(TableName.AuditLog)
    .where(
      stripUndefinedInWhere({
        projectId,
        orgId,
        eventType,
        actor,
        userAgentType
      })
    )
    .limit(limit)
    .offset(offset);
  if (startDate) {
    void sqlQuery.where("createdAt", ">=", startDate);
  }
  if (endDate) {
    void sqlQuery.where("createdAt", "<=", endDate);
  }
  const docs = await sqlQuery;
  return docs;
} catch (error) {
  throw new DatabaseError({ error });
}
};

// delete all audit log that have expired
const pruneAuditLog = async (tx?: Knex) => {
  try {
    const today = new Date();
    const docs = await (tx || db)(TableName.AuditLog).where("expiresAt", "<", today).del();
    return docs;
  } catch (error) {
    throw new DatabaseError({ error, name: "PruneAuditLog" });
  }
};

return { ...auditLogOrm, find };
return { ...auditLogOrm, pruneAuditLog, find };
};
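
The new pruneAuditLog amounts to one bulk delete keyed on the expiresAt timestamp written with each log row. A standalone sketch of the equivalent Knex call, assuming TableName.AuditLog resolves to an audit_logs table and using a placeholder connection string:

// Sketch only: the bulk delete pruneAuditLog issues, written as a standalone Knex call.
// The connection string and the "audit_logs" table name are assumptions for illustration.
import knex from "knex";

const pruneExpiredAuditLogs = async () => {
  const db = knex({ client: "pg", connection: process.env.DB_CONNECTION_URI });

  // Equivalent SQL: DELETE FROM "audit_logs" WHERE "expiresAt" < now();
  const deletedRows = await db("audit_logs").where("expiresAt", "<", new Date()).del();

  // .del() resolves to the number of rows removed, which is what pruneAuditLog returns.
  return deletedRows;
};
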
33 changes: 32 additions & 1 deletion backend/src/ee/services/audit-log/audit-log-queue.ts
@@ -1,3 +1,4 @@
import { logger } from "@app/lib/logger";
import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
import { TProjectDALFactory } from "@app/services/project/project-dal";

@@ -43,6 +44,8 @@ export const auditLogQueueServiceFactory = ({

const plan = await licenseService.getPlan(orgId);
const ttl = plan.auditLogsRetentionDays * MS_IN_DAY;
// skip inserting if the audit log retention is 0, meaning it's not supported
if (ttl === 0) return;
await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
@@ -57,7 +60,35 @@
});
});

queueService.start(QueueName.AuditLogPrune, async () => {
logger.info(`${QueueName.AuditLogPrune}: queue task started`);
await auditLogDAL.pruneAuditLog();
logger.info(`${QueueName.AuditLogPrune}: queue task completed`);
});

// run a repeating cron job in the UTC timezone at 12 midnight each day
const startAuditLogPruneJob = async () => {
// clear previous job
await queueService.stopRepeatableJob(
QueueName.AuditLogPrune,
QueueJobs.AuditLogPrune,
{ pattern: "0 0 * * *", utc: true },
QueueName.AuditLogPrune // just a job id
);

await queueService.queue(QueueName.AuditLogPrune, QueueJobs.AuditLogPrune, undefined, {
delay: 5000,
jobId: QueueName.AuditLogPrune,
repeat: { pattern: "0 0 * * *", utc: true }
});
};

queueService.listen(QueueName.AuditLogPrune, "failed", (err) => {
logger.error(err?.failedReason, `${QueueName.AuditLogPrune}: log pruning failed`);
});

return {
pushToLog
pushToLog,
startAuditLogPruneJob
};
};
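
startAuditLogPruneJob leans on BullMQ's repeatable jobs: it first removes any previously registered repeat schedule for the job, then re-adds it with the daily midnight-UTC pattern. A sketch of the underlying BullMQ calls the queue-service wrapper makes; the Redis connection details are placeholders:

// Sketch only: the raw BullMQ equivalent of startAuditLogPruneJob.
// Queue and job names mirror QueueName.AuditLogPrune / QueueJobs.AuditLogPrune;
// the Redis connection below is a placeholder.
import { Queue } from "bullmq";

const scheduleDailyPrune = async () => {
  const q = new Queue("audit-log-prune", { connection: { host: "localhost", port: 6379 } });

  // Drop any schedule registered by a previous boot so only one repeat rule survives.
  await q.removeRepeatable(
    "audit-log-prune-job",
    { pattern: "0 0 * * *", utc: true },
    "audit-log-prune" // the jobId used when the repeatable job was queued
  );

  // Re-register the daily run: first execution after a 5s delay, then every midnight UTC.
  await q.add("audit-log-prune-job", undefined, {
    delay: 5000,
    jobId: "audit-log-prune",
    repeat: { pattern: "0 0 * * *", utc: true }
  });
};
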
Empty file added backend/src/lib/fn/dates.ts
Empty file.
15 changes: 12 additions & 3 deletions backend/src/lib/logger/logger.ts
@@ -26,14 +26,23 @@ const loggerConfig = z.object({
AWS_CLOUDWATCH_LOG_REGION: z.string().default("us-east-1"),
AWS_CLOUDWATCH_LOG_ACCESS_KEY_ID: z.string().min(1).optional(),
AWS_CLOUDWATCH_LOG_ACCESS_KEY_SECRET: z.string().min(1).optional(),
AWS_CLOUDWATCH_LOG_INTERVAL: z.coerce.number().default(1000)
AWS_CLOUDWATCH_LOG_INTERVAL: z.coerce.number().default(1000),
NODE_ENV: z.enum(["development", "test", "production"]).default("production")
});

export const initLogger = async () => {
const cfg = loggerConfig.parse(process.env);
const targets: pino.TransportMultiOptions["targets"][number][] = [
{ level: "info", target: "pino/file", options: {} }
{
level: "info",
target: "pino/file",
options: {
destination: cfg.NODE_ENV === "development" ? 1 : "/var/log/infisical",
mkdir: true
}
}
];
const cfg = loggerConfig.parse(process.env);

if (cfg.AWS_CLOUDWATCH_LOG_ACCESS_KEY_ID && cfg.AWS_CLOUDWATCH_LOG_ACCESS_KEY_SECRET) {
targets.push({
target: "@serdnam/pino-cloudwatch-transport",
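
The logger change wires pino's bundled pino/file transport so development logs go to stdout (file descriptor 1) while other environments append to /var/log/infisical, creating the directory if needed. A compact sketch of the resulting file-transport setup, separate from the CloudWatch target; values mirror the diff:

// Sketch only: the pino/file portion of the transport setup shown in the diff.
import pino from "pino";

const isDev = process.env.NODE_ENV === "development";

const transport = pino.transport({
  targets: [
    {
      level: "info",
      target: "pino/file",
      options: {
        // 1 is the stdout file descriptor; outside development, append to a log file.
        destination: isDev ? 1 : "/var/log/infisical",
        mkdir: true // create the target directory if it does not already exist
      }
    }
  ]
});

export const logger = pino({ level: "info" }, transport);
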
27 changes: 24 additions & 3 deletions backend/src/queue/queue-service.ts
@@ -1,4 +1,4 @@
import { Job, JobsOptions, Queue, RepeatOptions, Worker, WorkerListener } from "bullmq";
import { Job, JobsOptions, Queue, QueueOptions, RepeatOptions, Worker, WorkerListener } from "bullmq";
import Redis from "ioredis";

import { TCreateAuditLogDTO } from "@app/ee/services/audit-log/audit-log-types";
@@ -11,6 +11,7 @@ export enum QueueName {
SecretRotation = "secret-rotation",
SecretReminder = "secret-reminder",
AuditLog = "audit-log",
AuditLogPrune = "audit-log-prune",
IntegrationSync = "sync-integrations",
SecretWebhook = "secret-webhook",
SecretFullRepoScan = "secret-full-repo-scan",
@@ -21,6 +22,7 @@ export enum QueueJobs {
SecretReminder = "secret-reminder-job",
SecretRotation = "secret-rotation-job",
AuditLog = "audit-log-job",
AuditLogPrune = "audit-log-prune-job",
SecWebhook = "secret-webhook-trigger",
IntegrationSync = "secret-integration-pull",
SecretScan = "secret-scan"
@@ -45,6 +47,10 @@ export type TQueueJobTypes = {
name: QueueJobs.AuditLog;
payload: TCreateAuditLogDTO;
};
[QueueName.AuditLogPrune]: {
name: QueueJobs.AuditLogPrune;
payload: undefined;
};
[QueueName.SecretWebhook]: {
name: QueueJobs.SecWebhook;
payload: { projectId: string; environment: string; secretPath: string };
@@ -74,16 +80,20 @@ export const queueServiceFactory = (redisUrl: string) => {

const start = <T extends QueueName>(
name: T,
jobFn: (job: Job<TQueueJobTypes[T]["payload"], void, TQueueJobTypes[T]["name"]>) => Promise<void>
jobFn: (job: Job<TQueueJobTypes[T]["payload"], void, TQueueJobTypes[T]["name"]>) => Promise<void>,
queueSettings: Omit<QueueOptions, "connection"> = {}
) => {
if (queueContainer[name]) {
throw new Error(`${name} queue is already initialized`);
}

queueContainer[name] = new Queue<TQueueJobTypes[T]["payload"], void, TQueueJobTypes[T]["name"]>(name as string, {
...queueSettings,
connection
});

workerContainer[name] = new Worker<TQueueJobTypes[T]["payload"], void, TQueueJobTypes[T]["name"]>(name, jobFn, {
...queueSettings,
connection
});
};
@@ -129,9 +139,20 @@ export const queueServiceFactory = (redisUrl: string) => {
return q.removeRepeatableByKey(job.repeatJobKey);
};

const stopJobById = async <T extends QueueName>(name: T, jobId: string) => {
const q = queueContainer[name];
const job = await q.getJob(jobId);
return job?.remove().catch(() => undefined);
};

const clearQueue = async (name: QueueName) => {
const q = queueContainer[name];
await q.drain();
};

const shutdown = async () => {
await Promise.all(Object.values(workerContainer).map((worker) => worker.close()));
};

return { start, listen, queue, shutdown, stopRepeatableJob, stopRepeatableJobByJobId };
return { start, listen, queue, shutdown, stopRepeatableJob, stopRepeatableJobByJobId, clearQueue, stopJobById };
};
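
Besides the new queue and job names, the service gains a queueSettings passthrough on start and two helpers: stopJobById, which removes a single job and ignores removal errors, and clearQueue, which drains waiting jobs via BullMQ's drain() (delayed jobs are kept unless drain is called with true). A short usage sketch; the import path and Redis URL variable are assumptions:

// Sketch only: exercising the new helpers exposed by queueServiceFactory.
// The import path and REDIS_URL variable are assumptions for illustration.
import { QueueName, queueServiceFactory } from "@app/queue";

const demoQueueHelpers = async () => {
  const queueService = queueServiceFactory(process.env.REDIS_URL as string);

  queueService.start(QueueName.AuditLogPrune, async () => {
    /* pruning handler */
  });

  // Remove one pending job by id; stopJobById resolves to undefined if removal fails.
  await queueService.stopJobById(QueueName.AuditLogPrune, QueueName.AuditLogPrune);

  // Drop everything still waiting in the queue (BullMQ's drain keeps delayed jobs by default).
  await queueService.clearQueue(QueueName.AuditLogPrune);
};
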
1 change: 1 addition & 0 deletions backend/src/server/routes/index.ts
@@ -444,6 +444,7 @@ export const registerRoutes = async (
});

await superAdminService.initServerCfg();
await auditLogQueue.startAuditLogPruneJob();
// setup the communication with license key server
await licenseService.init();
// inject all services
2 changes: 1 addition & 1 deletion backend/src/services/project-env/project-env-service.ts
@@ -67,7 +67,7 @@ export const projectEnvServiceFactory = ({
if (!oldEnv) throw new BadRequestError({ message: "Environment not found" });

if (slug) {
const existingEnv = await projectEnvDAL.findOne({ slug });
const existingEnv = await projectEnvDAL.findOne({ slug, projectId });
if (existingEnv && existingEnv.id !== id) {
throw new BadRequestError({
message: "Environment with slug already exist",
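
The one-line fix scopes the slug-uniqueness lookup to the project being edited; previously, a matching slug in any other project tripped the check and blocked the update. A tiny self-contained sketch of the difference, with hypothetical data:

// Sketch only: why scoping the slug lookup by projectId matters. Data is hypothetical.
type Env = { id: string; projectId: string; slug: string };

const allEnvs: Env[] = [
  { id: "env-1", projectId: "proj-A", slug: "prod" },
  { id: "env-2", projectId: "proj-B", slug: "staging" } // another project's environment
];

// Renaming proj-A's "prod" environment to "staging":
// the old lookup ignored projectId, so env-2 triggered the duplicate-slug error.
const unscopedClash = allEnvs.find((e) => e.slug === "staging");

// The fixed lookup is scoped to proj-A, so no clash is found and the rename goes through.
const scopedClash = allEnvs.find((e) => e.slug === "staging" && e.projectId === "proj-A");

console.log(Boolean(unscopedClash), Boolean(scopedClash)); // true, false
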
52 changes: 26 additions & 26 deletions docker-compose.pg.yml
@@ -96,32 +96,32 @@ services:
- 1025:1025 # SMTP server
- 8025:8025 # Web UI

mongo:
image: mongo
container_name: infisical-dev-mongo
restart: always
env_file: .env
environment:
- MONGO_INITDB_ROOT_USERNAME=root
- MONGO_INITDB_ROOT_PASSWORD=example
volumes:
- mongo-data:/data/db
ports:
- 27017:27017

mongo-express:
container_name: infisical-dev-mongo-express
image: mongo-express
restart: always
depends_on:
- mongo
env_file: .env
environment:
- ME_CONFIG_MONGODB_ADMINUSERNAME=root
- ME_CONFIG_MONGODB_ADMINPASSWORD=example
- ME_CONFIG_MONGODB_URL=mongodb://root:example@mongo:27017/
ports:
- 8081:8081
# mongo:
# image: mongo
# container_name: infisical-dev-mongo
# restart: always
# env_file: .env
# environment:
# - MONGO_INITDB_ROOT_USERNAME=root
# - MONGO_INITDB_ROOT_PASSWORD=example
# volumes:
# - mongo-data:/data/db
# ports:
# - 27017:27017
#
# mongo-express:
# container_name: infisical-dev-mongo-express
# image: mongo-express
# restart: always
# depends_on:
# - mongo
# env_file: .env
# environment:
# - ME_CONFIG_MONGODB_ADMINUSERNAME=root
# - ME_CONFIG_MONGODB_ADMINPASSWORD=example
# - ME_CONFIG_MONGODB_URL=mongodb://root:example@mongo:27017/
# ports:
# - 8081:8081

volumes:
postgres-data:
