Skip to content

Commit

Permalink
Merge pull request #764 from bcnmy/fix/rabbitmq-connection
Browse files Browse the repository at this point in the history
fix(rabbitmq): 🐛 Disable heartbeat, add additional try/catch
  • Loading branch information
TheDivic authored Jan 25, 2025
2 parents 6ce29ac + 27ab189 commit a5fb7cc
Show file tree
Hide file tree
Showing 17 changed files with 18 additions and 102 deletions.
9 changes: 2 additions & 7 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
11155111, 84532, 168587773, 81457, 534351, 534352, 56400, 7000, 11155420,
80002, 27827, 4653, 8101902, 666666666, 2442, 7001, 5003, 100, 10200, 195,
196, 2810, 997, 3799, 167009, 80084, 5845, 167000, 1328, 1329, 995, 28882,
288, 920637907288165, 1740, 1750, 4202, 1135, 2818, 11155111
288, 920637907288165, 1740, 1750, 4202, 1135, 2818
],
"supportedNetworksV07": [
1, 11155420, 84532, 8453, 10, 56, 42161, 137, 100, 80084, 168587773, 81457,
11155111
1, 11155420, 84532, 8453, 10, 56, 42161, 137, 100, 80084, 168587773, 81457
],
"EIP1559SupportedNetworks": [
1, 137, 42161, 10, 43114, 43113, 8453, 59144, 204, 5611, 421614, 11155111,
Expand Down Expand Up @@ -1117,10 +1116,6 @@
"maxBlockWait": 26,
"supportedNetworks": []
},
"clearStaleMessages": {
"supportedNetworks": [1, 11155111],
"ttlSeconds": 3600
},
"isTWSetup": false,
"redisCluster": {
"port": 6379,
Expand Down
6 changes: 0 additions & 6 deletions src/common/queue/BundlerTransactionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { logger } from "../logger";
import { SendUserOperation, TransactionType } from "../types";
import { IQueue } from "./interface/IQueue";
import { customJSONStringify } from "../utils";
import { shouldDiscardStaleMessage } from "./queueUtils";

const log = logger.child({
module: module.filename.split("/").slice(-4).join("/"),
Expand Down Expand Up @@ -67,11 +66,6 @@ export class BundlerTransactionQueue implements IQueue<SendUserOperation> {
_log.info(`BundlerTransactionQueue:: Publishing data to retry queue`);

try {
if (shouldDiscardStaleMessage(this.chainId, data, Date.now())) {
_log.warn(`BundlerTransactionQueue:: Discarding message because it's stale`);
return true;
}

this.channel.publish(
this.exchangeName,
key,
Expand Down
6 changes: 0 additions & 6 deletions src/common/queue/RetryTransactionHandlerQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { logger } from "../logger";
import { IQueue } from "./interface/IQueue";
import { RetryTransactionQueueData } from "./types";
import { customJSONStringify } from "../utils";
import { shouldDiscardStaleMessage } from "./queueUtils";

const log = logger.child({
module: module.filename.split("/").slice(-4).join("/"),
Expand Down Expand Up @@ -84,11 +83,6 @@ export class RetryTransactionHandlerQueue
_log.info({ data }, `RetryTransactionHandlerQueue:: Publishing data to retry queue`);

try {
if (shouldDiscardStaleMessage(this.chainId, data, Date.now())) {
_log.warn(`RetryTransactionHandlerQueue:: Discarding message because it's stale`);
return true;
}

if (this.channel) {
this.channel.publish(
this.exchangeName,
Expand Down
32 changes: 0 additions & 32 deletions src/common/queue/queueUtils.test.ts

This file was deleted.

30 changes: 0 additions & 30 deletions src/common/queue/queueUtils.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/common/queue/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ export type RetryTransactionQueueData = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
metaData: any;
relayerManagerName: string;
timestamp?: number;
};
4 changes: 0 additions & 4 deletions src/common/relay-service/BundlerRelayService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ export class BundlerRelayService implements IRelayService<SendUserOperation> {
async sendUserOperation(
data: SendUserOperation,
): Promise<RelayServiceResponseType> {
if (!data.timestamp) {
data.timestamp = Date.now();
}

const _log = log.child({
sendUserOperation: data,
});
Expand Down
8 changes: 5 additions & 3 deletions src/common/service-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ import { customJSONStringify, parseError } from "../utils";
import { GasPriceService } from "../gas-price";
import { CacheFeesJob } from "../gas-price/jobs/CacheFees";
import { FlashbotsClient } from "../network/FlashbotsClient";
import { ENTRYPOINT_V6_ABI } from "@biconomy/gas-estimations";
import { ENTRYPOINT_V7_ABI } from "@biconomy/gas-estimations";
import { ENTRYPOINT_V6_ABI, ENTRYPOINT_V7_ABI } from "@biconomy/gas-estimations"

const log = logger.child({
module: module.filename.split("/").slice(-4).join("/"),
Expand Down Expand Up @@ -109,7 +108,10 @@ let statusService: IStatusService;
const queueUrl =
process.env.BUNDLER_QUEUE_URL || nodeconfig.get<string>("queueUrl");

const rabbitMqConnection = await amqp.connect(queueUrl);
// disable heartbeat to avoid connection timeout
const rabbitMqConnection = await amqp.connect(queueUrl, {
heartbeat: 0,
});


const slackNotificationService = new SlackNotificationService(
Expand Down
1 change: 0 additions & 1 deletion src/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ export type SendUserOperation = {
transactionId: string;
userOp?: UserOperationType | UserOperationStruct;
walletAddress?: string;
timestamp?: number;
};

type ResponseType = {
Expand Down
7 changes: 6 additions & 1 deletion src/relayer/consumer/BundlerConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ export class BundlerConsumer
});

topLog.info(`BundlerConsumer.onMessageReceived`);
this.queue.ack(msg);

try {
await this.queue.ack(msg);
} catch (err) {
topLog.error({ err }, `BundlerConsumer.onMessageReceived:: Error while acknowledging message`);
}

const sendTransactionWithRetry = async (): Promise<void> => {
// get active relayer
Expand Down
1 change: 0 additions & 1 deletion src/relayer/relayer-manager/EVMRelayerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,6 @@ export class EVMRelayerManager
...rawTx,
transactionId,
walletAddress: "",
timestamp: Date.now(),
},
this.ownerAccountDetails,
TransactionType.FUNDING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ export class EVMRetryTransactionService
"Message received from retry transaction queue",
);

this.queue.ack(msg);
try {
await this.queue.ack(msg);
} catch (err) {
log.error({ err }, `EVMRetryTransactionService.onMessageReceived:: Error while acknowledging message`);
}

const {
transactionHash,
Expand Down
2 changes: 0 additions & 2 deletions src/relayer/transaction-listener/EVMTransactionListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ export class EVMTransactionListener
metaData,
relayerManagerName,
previousTransactionHash,
timestamp,
} = notifyTransactionListenerParams;

const _log = log.child({
Expand Down Expand Up @@ -167,7 +166,6 @@ export class EVMTransactionListener
walletAddress,
metaData,
relayerManagerName,
timestamp,
});
} catch (publishToRetryTransactionQueueError) {
_log.error(
Expand Down
1 change: 0 additions & 1 deletion src/relayer/transaction-listener/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export type NotifyTransactionListenerParamsType = {
metaData?: any;
relayerManagerName: string;
error?: string;
timestamp?: number;
};

export type OnTransactionSuccessParamsType =
Expand Down
4 changes: 0 additions & 4 deletions src/relayer/transaction-service/EVMTransactionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ export class EVMTransactionService
transactionId,
walletAddress,
metaData,
timestamp,
} = transactionData;

const retryTransactionCount = parseInt(
Expand Down Expand Up @@ -234,7 +233,6 @@ export class EVMTransactionService
walletAddress,
metaData,
relayerManagerName,
timestamp,
});

if (transactionType === TransactionType.FUNDING) {
Expand Down Expand Up @@ -344,7 +342,6 @@ export class EVMTransactionService
walletAddress,
metaData,
relayerManagerName,
timestamp,
} = retryTransactionData;

const _log = log.child({
Expand Down Expand Up @@ -424,7 +421,6 @@ export class EVMTransactionService
walletAddress,
metaData,
relayerManagerName,
timestamp,
});

_log.info(
Expand Down
1 change: 0 additions & 1 deletion src/relayer/transaction-service/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ export type TransactionDataType = {
metaData?: {
dappAPIKey: string;
};
timestamp?: number;
};

export type ErrorTransactionResponseType = {
Expand Down
1 change: 0 additions & 1 deletion src/server/api/v2/eth_sendUserOperation/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ export const eth_sendUserOperation = async (req: Request, res: Response) => {
userOp,
transactionId,
walletAddress,
timestamp: Date.now(),
});

if (isError(response)) {
Expand Down

0 comments on commit a5fb7cc

Please sign in to comment.