|
| 1 | +import { SupabaseProvider } from "./providers/supabase.ts"; |
| 2 | +import { ArkiveProvider } from "./providers/interfaces.ts"; |
| 3 | +import { arkiver, arkiverTypes } from "./deps.ts"; |
| 4 | +import { rm } from "./utils.ts"; |
| 5 | +import { ArkiveMessageEvent } from "../manager/types.ts"; |
| 6 | + |
| 7 | +export class ArkiveManager { |
| 8 | + private arkiveProvider: ArkiveProvider; |
| 9 | + private arkives: { arkive: arkiverTypes.Arkive; worker: Worker }[] = []; |
| 10 | + |
| 11 | + constructor() { |
| 12 | + this.removeAllArkives = this.removeAllArkives.bind(this); |
| 13 | + this.addNewArkive = this.addNewArkive.bind(this); |
| 14 | + |
| 15 | + this.arkiveProvider = new SupabaseProvider(); |
| 16 | + } |
| 17 | + |
| 18 | + public async init() { |
| 19 | + try { |
| 20 | + // a1. Get all arkives from supabase |
| 21 | + const arkives = await this.getArkives(); |
| 22 | + // b1. Subscribe to new inserts and deletes in arkive table |
| 23 | + this.listenNewArkives(); |
| 24 | + this.listenForDeletedArkives(); |
| 25 | + // a2. Aggregate arkives by owner and name and get the latest version |
| 26 | + // const aggregatedArkives = this.aggregateArkives(arkives); |
| 27 | + await Promise.all( |
| 28 | + arkives.map(async (arkive) => { |
| 29 | + await this.addNewArkive(arkive); |
| 30 | + }), |
| 31 | + ); |
| 32 | + } catch (e) { |
| 33 | + arkiver.logger.error(e, { source: "ArkiveManager.init" }); |
| 34 | + } |
| 35 | + } |
| 36 | + |
| 37 | + private async getArkives() { |
| 38 | + arkiver.logger.info("fetching existing arkives"); |
| 39 | + return await this.arkiveProvider.getArkives(); |
| 40 | + } |
| 41 | + |
| 42 | + private listenNewArkives() { |
| 43 | + this.arkiveProvider.listenNewArkive(async (arkive: arkiverTypes.Arkive) => { |
| 44 | + arkiver.logger.info("new arkive", arkive); |
| 45 | + // only remove previous versions if on the same major version. |
| 46 | + // new major versions will be removed once the new version is synced |
| 47 | + const previousArkives = this.getPreviousVersions(arkive); |
| 48 | + const sameMajor = previousArkives.filter( |
| 49 | + (a) => |
| 50 | + a.arkive.deployment.major_version === arkive.deployment.major_version, |
| 51 | + ); |
| 52 | + this.arkives = this.arkives.filter( |
| 53 | + (a) => |
| 54 | + a.arkive.deployment.major_version !== arkive.deployment.major_version, |
| 55 | + ); |
| 56 | + |
| 57 | + await Promise.all(sameMajor.map(async (arkive) => { |
| 58 | + await this.removeArkive(arkive); |
| 59 | + })); |
| 60 | + |
| 61 | + await this.addNewArkive(arkive); |
| 62 | + }); |
| 63 | + arkiver.logger.info("listening for new arkives"); |
| 64 | + } |
| 65 | + |
| 66 | + private listenForDeletedArkives() { |
| 67 | + this.arkiveProvider.listenDeletedArkive(async ({ id }) => { |
| 68 | + arkiver.logger.info("deleting arkives", id); |
| 69 | + await this.removeAllArkives(id); |
| 70 | + arkiver.logger.info("deleted arkives", id); |
| 71 | + }); |
| 72 | + arkiver.logger.info("listening for deleted arkives"); |
| 73 | + } |
| 74 | + |
| 75 | + private async addNewArkive(arkive: arkiverTypes.Arkive) { |
| 76 | + arkiver.logger.info("adding new arkive", arkive); |
| 77 | + await this.pullPackage(arkive); |
| 78 | + const worker = await this.spawnArkiverWorker(arkive); |
| 79 | + await this.updateDeploymentStatus(arkive, "syncing"); |
| 80 | + this.arkives = [...this.arkives, { arkive, worker }]; |
| 81 | + arkiver.logger.info("added new arkive", arkive); |
| 82 | + } |
| 83 | + |
| 84 | + // this is called when an arkive is deleted by the user which means the record is no longer in the tables |
| 85 | + private async removeAllArkives(id: number) { |
| 86 | + arkiver.logger.info("removing arkives", id); |
| 87 | + const deletedArkives = this.arkives.filter((a) => a.arkive.id === id); |
| 88 | + await Promise.all(deletedArkives.map(async (arkive) => { |
| 89 | + await this.removePackage(arkive.arkive); |
| 90 | + arkive.worker.terminate(); |
| 91 | + })); |
| 92 | + this.arkives = this.arkives.filter((a) => a.arkive.id !== id); |
| 93 | + arkiver.logger.info("removed arkives", id); |
| 94 | + } |
| 95 | + |
| 96 | + // this is called in two places: when a new minor version is added (listenNewArkives) |
| 97 | + // and when a new major version has fully synced (worker.onmessage) |
| 98 | + private async removeArkive( |
| 99 | + arkive: { arkive: arkiverTypes.Arkive; worker: Worker }, |
| 100 | + ) { |
| 101 | + arkiver.logger.info("removing arkive", arkive); |
| 102 | + await this.removePackage(arkive.arkive); |
| 103 | + await this.updateDeploymentStatus( |
| 104 | + arkive.arkive, |
| 105 | + "retired", |
| 106 | + ); |
| 107 | + arkive.worker.terminate(); |
| 108 | + } |
| 109 | + |
| 110 | + private removeIndexedData(arkive: arkiverTypes.Arkive) { |
| 111 | + //TODO: remove indexed data from mongodb |
| 112 | + } |
| 113 | + |
| 114 | + private async spawnArkiverWorker(arkive: arkiverTypes.Arkive) { |
| 115 | + const manifestPath = |
| 116 | + `../packages/${arkive.user_id}/${arkive.id}/${arkive.deployment.major_version}_${arkive.deployment.minor_version}/manifest.ts`; |
| 117 | + const { manifest } = await import(manifestPath); |
| 118 | + |
| 119 | + const worker = new Worker( |
| 120 | + new URL("../arkiver/worker.ts", import.meta.url), |
| 121 | + { |
| 122 | + type: "module", |
| 123 | + deno: { |
| 124 | + permissions: { |
| 125 | + env: [ |
| 126 | + "INFLUXDB_URL", |
| 127 | + "INFLUXDB_TOKEN", |
| 128 | + "INFLUXDB_ORG", |
| 129 | + "INFLUXDB_BUCKET", |
| 130 | + "DENO_ENV", |
| 131 | + "NODE_ENV", |
| 132 | + "AVALANCHE_RPC_URL", |
| 133 | + ], |
| 134 | + hrtime: false, |
| 135 | + net: true, |
| 136 | + ffi: false, |
| 137 | + read: true, |
| 138 | + run: false, |
| 139 | + sys: false, |
| 140 | + write: false, |
| 141 | + }, |
| 142 | + }, |
| 143 | + }, |
| 144 | + ); |
| 145 | + |
| 146 | + worker.onmessage = async (e: MessageEvent<ArkiveMessageEvent>) => { |
| 147 | + if (e.data.topic === "workerError") { |
| 148 | + arkiver.logger.error(e.data.data.error, { |
| 149 | + source: "worker-arkive-" + e.data.data.arkive.id, |
| 150 | + }); |
| 151 | + } else if (e.data.topic === "synced") { |
| 152 | + try { |
| 153 | + const previousVersions = this.getPreviousVersions(e.data.data.arkive); |
| 154 | + for (const previousVersion of previousVersions) { |
| 155 | + // check if previous version is an older major version |
| 156 | + if ( |
| 157 | + previousVersion.arkive.deployment.major_version < |
| 158 | + arkive.deployment.major_version |
| 159 | + ) { |
| 160 | + arkiver.logger.info( |
| 161 | + "removing old major version", |
| 162 | + previousVersion.arkive, |
| 163 | + ); |
| 164 | + await this.removeArkive(previousVersion); |
| 165 | + this.removeIndexedData(previousVersion.arkive); |
| 166 | + arkiver.logger.info( |
| 167 | + "removed old major version", |
| 168 | + previousVersion.arkive, |
| 169 | + ); |
| 170 | + } |
| 171 | + } |
| 172 | + await this.updateDeploymentStatus( |
| 173 | + e.data.data.arkive, |
| 174 | + "synced", |
| 175 | + ); |
| 176 | + } catch (error) { |
| 177 | + arkiver.logger.error(error, { |
| 178 | + source: "worker-arkive-synced-" + e.data.data.arkive.id, |
| 179 | + }); |
| 180 | + } |
| 181 | + } |
| 182 | + }; |
| 183 | + worker.onerror = (e) => { |
| 184 | + arkiver.logger.error(e.error, { |
| 185 | + source: "worker-arkive-" + arkive.id, |
| 186 | + }); |
| 187 | + }; |
| 188 | + worker.postMessage({ |
| 189 | + topic: "initArkive", |
| 190 | + data: { |
| 191 | + arkive, |
| 192 | + manifest, |
| 193 | + }, |
| 194 | + }); |
| 195 | + return worker; |
| 196 | + } |
| 197 | + |
| 198 | + private async pullPackage(arkive: arkiverTypes.Arkive) { |
| 199 | + await this.arkiveProvider.pullArkive(arkive); |
| 200 | + } |
| 201 | + |
| 202 | + private getPreviousVersions(arkive: arkiverTypes.Arkive) { |
| 203 | + return this.arkives.filter( |
| 204 | + (a) => |
| 205 | + a.arkive.id === arkive.id && // same id |
| 206 | + (a.arkive.deployment.major_version < arkive.deployment.major_version || // older major version |
| 207 | + (a.arkive.deployment.major_version === // same major version but older minor version |
| 208 | + arkive.deployment.major_version && |
| 209 | + a.arkive.deployment.minor_version < |
| 210 | + arkive.deployment.minor_version)), |
| 211 | + ); |
| 212 | + } |
| 213 | + |
| 214 | + private async removePackage(arkive: arkiverTypes.Arkive) { |
| 215 | + const path = `${arkive.user_id}/${arkive.id}`; |
| 216 | + const localDir = new URL( |
| 217 | + `../packages/${path}/${arkive.deployment.major_version}_${arkive.deployment.minor_version}`, |
| 218 | + import.meta.url, |
| 219 | + ); |
| 220 | + arkiver.logger.info("removing package", localDir.pathname); |
| 221 | + await rm(localDir.pathname, { recursive: true }); |
| 222 | + } |
| 223 | + |
| 224 | + private async updateDeploymentStatus( |
| 225 | + arkive: arkiverTypes.Arkive, |
| 226 | + status: arkiverTypes.Deployment["status"], |
| 227 | + ) { |
| 228 | + await this.arkiveProvider.updateDeploymentStatus(arkive, status); |
| 229 | + } |
| 230 | + |
| 231 | + public cleanup() { |
| 232 | + this.arkives.forEach((arkive) => arkive.worker.terminate()); |
| 233 | + this.arkiveProvider.close(); |
| 234 | + } |
| 235 | +} |
0 commit comments