Skip to content

Commit e240a18

Browse files
committed
add rabbitMq, executeSellDecision
Signed-off-by: MarcoMandar <malicemandar@gmail.com>
1 parent e6c3456 commit e240a18

File tree

4 files changed

+257
-96
lines changed

4 files changed

+257
-96
lines changed

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"node": ">=22"
4343
},
4444
"dependencies": {
45+
"amqplib": "^0.10.4",
4546
"ollama-ai-provider": "^0.16.1",
4647
"optional": "^0.1.4",
4748
"sharp": "^0.33.5"

packages/plugin-solana/src/providers/simulationSellingService.ts

+206-94
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import { TokenProvider } from "./token.ts";
1111
import { settings } from "@ai16z/eliza";
1212
import { IAgentRuntime, Memory, Provider, State } from "@ai16z/eliza";
1313
import { WalletProvider } from "./wallet.ts";
14+
import * as amqp from "amqplib";
1415

15-
interface sellDetails {
16+
interface SellDetails {
1617
sell_amount: number;
1718
sell_recommender_id: string | null;
1819
}
@@ -26,6 +27,12 @@ export class simulationSellingService {
2627
private MAX_DECAY_DAYS = 30;
2728
private backend: string;
2829
private backendToken: string;
30+
private amqpConnection: amqp.Connection;
31+
private amqpChannel: amqp.Channel;
32+
private sonarBe: string;
33+
private sonarBeToken: string;
34+
35+
private runningProcesses: Set<string> = new Set();
2936

3037
constructor(
3138
runtime: IAgentRuntime,
@@ -45,141 +52,245 @@ export class simulationSellingService {
4552
);
4653
this.backend = runtime.getSetting("BACKEND_URL");
4754
this.backendToken = runtime.getSetting("BACKEND_TOKEN");
55+
this.initializeRabbitMQ(runtime.getSetting("AMQP_URL"));
56+
this.sonarBe = runtime.getSetting("SONAR_BE");
57+
this.sonarBeToken = runtime.getSetting("SONAR_BE_TOKEN");
4858
}
49-
50-
public async startService() {
51-
// starting the service
52-
console.log("Starting SellingService...");
53-
await this.scanAndSell();
59+
/**
60+
* Initializes the RabbitMQ connection and starts consuming messages.
61+
* @param amqpUrl The RabbitMQ server URL.
62+
*/
63+
private async initializeRabbitMQ(amqpUrl: string) {
64+
try {
65+
this.amqpConnection = await amqp.connect(amqpUrl);
66+
this.amqpChannel = await this.amqpConnection.createChannel();
67+
console.log("Connected to RabbitMQ");
68+
// Start consuming messages
69+
this.consumeMessages();
70+
} catch (error) {
71+
console.error("Failed to connect to RabbitMQ:", error);
72+
}
5473
}
5574

56-
private async scanAndSell() {
57-
// scanning recommendations and selling
58-
console.log("Scanning for token performances...");
59-
const tokenPerformances =
60-
await this.trustScoreDb.getAllTokenPerformancesWithBalance();
61-
62-
const sellDecisions = this.decideWhenToSell(tokenPerformances);
63-
64-
// Execute sells
65-
await this.executeSells(sellDecisions);
66-
67-
// Perform stop loss checks
68-
await this.performStopLoss(tokenPerformances);
75+
/**
76+
* Sets up the consumer for the specified RabbitMQ queue.
77+
*/
78+
private async consumeMessages() {
79+
const queue = "process_eliza_simulation";
80+
await this.amqpChannel.assertQueue(queue, { durable: true });
81+
this.amqpChannel.consume(
82+
queue,
83+
(msg) => {
84+
if (msg !== null) {
85+
const content = msg.content.toString();
86+
this.processMessage(content);
87+
this.amqpChannel.ack(msg);
88+
}
89+
},
90+
{ noAck: false }
91+
);
92+
console.log(`Listening for messages on queue: ${queue}`);
6993
}
7094

71-
private decideWhenToSell(
72-
tokenPerformances: TokenPerformance[]
73-
): SellDecision[] {
74-
// To Do: logic when to sell and how much
75-
console.log("Deciding when to sell and how much...");
76-
const decisions: SellDecision[] = [];
77-
78-
tokenPerformances.forEach(async (performance) => {
79-
const tokenProvider = new TokenProvider(
80-
performance.tokenAddress,
81-
this.walletProvider
82-
);
83-
const sellAmount = await this.amountToSell(
84-
performance.tokenAddress,
85-
tokenProvider
95+
/**
96+
* Processes incoming messages from RabbitMQ.
97+
* @param message The message content as a string.
98+
*/
99+
private async processMessage(message: string) {
100+
try {
101+
const { tokenAddress, amount, sell_recommender_id } =
102+
JSON.parse(message);
103+
console.log(
104+
`Received message for token ${tokenAddress} to sell ${amount}`
86105
);
87-
const amountToSell = sellAmount.sellAmount;
88-
decisions.push({ tokenPerformance: performance, amountToSell });
89-
});
90-
91-
return decisions;
92-
}
93106

94-
async amountToSell(tokenAddress: string, tokenProvider: TokenProvider) {
95-
// To Do: Implement logic to decide how much to sell
96-
//placeholder
97-
const processedData: ProcessedTokenData =
98-
await tokenProvider.getProcessedTokenData();
99-
const prices = await this.walletProvider.fetchPrices(null);
100-
const solPrice = prices.solana.usd;
101-
const tokenBalance = this.trustScoreDb.getTokenBalance(tokenAddress);
107+
const decision: SellDecision = {
108+
tokenPerformance:
109+
await this.trustScoreDb.getTokenPerformance(tokenAddress),
110+
amountToSell: amount,
111+
sell_recommender_id: sell_recommender_id,
112+
};
102113

103-
const sellAmount = tokenBalance * 0.1;
104-
const sellSol = sellAmount / parseFloat(solPrice);
105-
const sellValueUsd = sellAmount * processedData.tradeData.price;
114+
// Execute the sell
115+
await this.executeSellDecision(decision);
106116

107-
return { sellAmount, sellSol, sellValueUsd };
117+
// Remove from running processes after completion
118+
this.runningProcesses.delete(tokenAddress);
119+
} catch (error) {
120+
console.error("Error processing message:", error);
121+
}
108122
}
109123

110-
private async executeSells(decisions: SellDecision[]) {
111-
console.log("Executing sell orders...");
112-
for (const decision of decisions) {
124+
/**
125+
* Executes a single sell decision.
126+
* @param decision The sell decision containing token performance and amount to sell.
127+
*/
128+
private async executeSellDecision(decision: SellDecision) {
129+
const { tokenPerformance, amountToSell, sell_recommender_id } =
130+
decision;
131+
const tokenAddress = tokenPerformance.tokenAddress;
132+
133+
try {
113134
console.log(
114-
`Selling ${decision.amountToSell} of token ${decision.tokenPerformance.tokenSymbol}`
135+
`Executing sell for token ${tokenPerformance.tokenSymbol}: ${amountToSell}`
115136
);
116-
// update the sell details
117-
const sellDetails = {
118-
sell_amount: decision.amountToSell,
119-
sell_recommender_id: null,
137+
138+
// Update the sell details
139+
const sellDetails: SellDetails = {
140+
sell_amount: amountToSell,
141+
sell_recommender_id: sell_recommender_id, // Adjust if necessary
120142
};
121143
const sellTimeStamp = new Date().toISOString();
122144
const tokenProvider = new TokenProvider(
123-
decision.tokenPerformance.tokenAddress,
145+
tokenAddress,
124146
this.walletProvider
125147
);
148+
149+
// Update sell details in the database
126150
const sellDetailsData = await this.updateSellDetails(
127-
decision.tokenPerformance.tokenAddress,
128-
decision.tokenPerformance.recommenderId,
151+
tokenAddress,
152+
tokenPerformance.recommenderId,
129153
sellTimeStamp,
130154
sellDetails,
131-
true,
155+
true, // isSimulation
132156
tokenProvider
133157
);
158+
134159
console.log("Sell order executed successfully", sellDetailsData);
160+
161+
// check if balance is zero and remove token from running processes
162+
const balance = this.trustScoreDb.getTokenBalance(tokenAddress);
163+
if (balance === 0) {
164+
this.runningProcesses.delete(tokenAddress);
165+
}
166+
// stop the process in the sonar backend
167+
await this.stopProcessInTheSonarBackend(tokenAddress);
168+
} catch (error) {
169+
console.error(
170+
`Error executing sell for token ${tokenAddress}:`,
171+
error
172+
);
135173
}
136174
}
137175

138-
private async performStopLoss(tokenPerformances: TokenPerformance[]) {
139-
console.log("Performing stop loss checks...");
140-
// To Do: Implement stop loss logic
141-
// check if the token has dropped by more than 50% in the last 24 hours
142-
for (const performance of tokenPerformances) {
176+
public async startService() {
177+
// starting the service
178+
console.log("Starting SellingService...");
179+
await this.startListeners();
180+
}
181+
182+
private async startListeners() {
183+
// scanning recommendations and selling
184+
console.log("Scanning for token performances...");
185+
const tokenPerformances =
186+
await this.trustScoreDb.getAllTokenPerformancesWithBalance();
187+
188+
await this.processTokenPerformances(tokenPerformances);
189+
}
190+
191+
private processTokenPerformances(tokenPerformances: TokenPerformance[]) {
192+
// To Do: logic when to sell and how much
193+
console.log("Deciding when to sell and how much...");
194+
const runningProcesses = this.runningProcesses;
195+
// remove running processes from tokenPerformances
196+
tokenPerformances = tokenPerformances.filter(
197+
(tp) => !runningProcesses.has(tp.tokenAddress)
198+
);
199+
200+
// start the process in the sonar backend
201+
tokenPerformances.forEach(async (tokenPerformance) => {
143202
const tokenProvider = new TokenProvider(
144-
performance.tokenAddress,
203+
tokenPerformance.tokenAddress,
145204
this.walletProvider
146205
);
147-
const processedData: ProcessedTokenData =
148-
await tokenProvider.getProcessedTokenData();
149-
if (processedData.tradeData.trade_24h_change_percent < -50) {
150-
const sellAmount = performance.balance;
151-
const sellSol = sellAmount / 100;
152-
const sellValueUsd = sellAmount * processedData.tradeData.price;
153-
const sellDetails = {
154-
sell_amount: sellAmount,
155-
sell_recommender_id: null,
156-
};
157-
const sellTimeStamp = new Date().toISOString();
158-
const sellDetailsData = await this.updateSellDetails(
159-
performance.tokenAddress,
160-
performance.recommenderId,
161-
sellTimeStamp,
162-
sellDetails,
163-
true,
164-
tokenProvider
206+
const shouldTrade = await tokenProvider.shouldTradeToken();
207+
if (shouldTrade) {
208+
const balance = tokenPerformance.balance;
209+
const sell_recommender_id = tokenPerformance.recommenderId;
210+
const tokenAddress = tokenPerformance.tokenAddress;
211+
const process = await this.startProcessInTheSonarBackend(
212+
tokenAddress,
213+
balance,
214+
sell_recommender_id
165215
);
166-
console.log(
167-
"Stop loss triggered. Sell order executed successfully",
168-
sellDetailsData
216+
if (process) {
217+
this.runningProcesses.add(tokenAddress);
218+
}
219+
}
220+
});
221+
}
222+
223+
private async startProcessInTheSonarBackend(
224+
tokenAddress: string,
225+
balance: number,
226+
sell_recommender_id: string
227+
) {
228+
try {
229+
const message = JSON.stringify({
230+
tokenAddress,
231+
balance,
232+
sell_recommender_id,
233+
});
234+
const response = await fetch(
235+
`${this.sonarBe}/api/simulation/sell`,
236+
{
237+
method: "POST",
238+
headers: {
239+
"Content-Type": "application/json",
240+
Authorization: `Bearer ${this.sonarBeToken}`,
241+
},
242+
body: message,
243+
}
244+
);
245+
246+
if (!response.ok) {
247+
console.error(
248+
`Failed to send message to process token ${tokenAddress}`
169249
);
250+
return;
170251
}
252+
253+
const result = await response.json();
254+
console.log("Received response:", result);
255+
console.log(`Sent message to process token ${tokenAddress}`);
256+
257+
return result;
258+
} catch (error) {
259+
console.error(
260+
`Error sending message to process token ${tokenAddress}:`,
261+
error
262+
);
263+
return null;
264+
}
265+
}
266+
267+
private stopProcessInTheSonarBackend(tokenAddress: string) {
268+
try {
269+
return fetch(
270+
`${this.sonarBe}/api/simulation/sell/${tokenAddress}`,
271+
{
272+
method: "GET",
273+
headers: {
274+
Authorization: `Bearer ${this.sonarBeToken}`,
275+
},
276+
}
277+
);
278+
} catch (error) {
279+
console.error(
280+
`Error stopping process for token ${tokenAddress}:`,
281+
error
282+
);
171283
}
172284
}
173285

174286
async updateSellDetails(
175287
tokenAddress: string,
176288
recommenderId: string,
177289
sellTimeStamp: string,
178-
sellDetails: sellDetails,
290+
sellDetails: SellDetails,
179291
isSimulation: boolean,
180292
tokenProvider: TokenProvider
181293
) {
182-
// To Do: Change the logic after codex update
183294
const recommender =
184295
await this.trustScoreDb.getOrCreateRecommenderWithTelegramId(
185296
recommenderId
@@ -278,7 +389,7 @@ export class simulationSellingService {
278389
tokenAddress: string,
279390
recommenderId: string,
280391
username: string,
281-
data: sellDetails,
392+
data: SellDetails,
282393
balanceLeft: number,
283394
retries = 3,
284395
delayMs = 2000
@@ -325,4 +436,5 @@ export class simulationSellingService {
325436
interface SellDecision {
326437
tokenPerformance: TokenPerformance;
327438
amountToSell: number;
439+
sell_recommender_id: string | null;
328440
}

0 commit comments

Comments
 (0)