From dc6168fb5481063abfde6853cbeaf606d5dfad26 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 4 Aug 2024 09:24:18 -0300 Subject: [PATCH] feat: make v1 compatible with Conway era (#807) --- Cargo.lock | 116 ++++--- Cargo.toml | 13 +- src/mapper/babbage.rs | 12 +- src/mapper/byron.rs | 66 ++-- src/mapper/cip15.rs | 2 +- src/mapper/cip25.rs | 2 +- src/mapper/collect.rs | 26 +- src/mapper/conway.rs | 588 ++++++++++++++++++++++++++++++++++++ src/mapper/map.rs | 58 ++-- src/mapper/mod.rs | 1 + src/mapper/prelude.rs | 16 +- src/mapper/shelley.rs | 12 +- src/model.rs | 3 +- src/sinks/gcp_pubsub/run.rs | 1 + src/sources/common.rs | 167 +++++----- src/sources/n2c/run.rs | 14 +- src/sources/n2n/run.rs | 12 +- src/utils/mod.rs | 2 +- src/utils/time.rs | 20 ++ 19 files changed, 905 insertions(+), 226 deletions(-) create mode 100644 src/mapper/conway.rs diff --git a/Cargo.lock b/Cargo.lock index 92ebce5a..27ee56c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1830,6 +1830,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -2007,6 +2016,16 @@ dependencies = [ "minicbor-derive", ] +[[package]] +name = "minicbor" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d15f4203d71fdf90903c2696e55426ac97a363c67b218488a73b534ce7aca10" +dependencies = [ + "half", + "minicbor-derive", +] + [[package]] name = "minicbor-derive" version = "0.13.0" @@ -2101,6 +2120,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -2231,7 +2256,13 @@ dependencies = [ "murmur3", "net2", "openssl", - "pallas", + "pallas-addresses", + "pallas-codec 0.29.0", + "pallas-crypto", + "pallas-miniprotocols", + "pallas-multiplexer", + "pallas-primitives", + "pallas-traverse", "prometheus_exporter", "redis", "reqwest", @@ -2239,6 +2270,7 @@ dependencies = [ "serde_json", "strum", "strum_macros", + "time", "tokio", "unicode-truncate", ] @@ -2260,31 +2292,18 @@ dependencies = [ "yasna", ] -[[package]] -name = "pallas" -version = "0.18.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ef55b690eac7ddf43a3e7ce10d4594866c34279c424aa2ce26d757789246da" -dependencies = [ - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-miniprotocols", - "pallas-multiplexer", - "pallas-primitives", - "pallas-traverse", -] - [[package]] name = "pallas-addresses" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8db28c4050dea032d497555bc68c269ae8e691486d8ec83f02b090487da0d0be" +checksum = "d628ad58404ddd733e8fe46fe9986489b46258a2ab1bb7b1c4b8e406b91b7cff" dependencies = [ "base58", "bech32", + "crc", + "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "thiserror", ] @@ -2296,19 +2315,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6e03d05d42a663526d78c8b1d4f2554f09bbf4cc846e1a9e839c558bf6103c" dependencies = [ "hex", - "minicbor", + "minicbor 0.19.1", "serde", ] +[[package]] +name = "pallas-codec" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da003a7360fa032b80d38b4a15573f885f412f2b3868772d49fb072197a9d5f9" +dependencies = [ + "hex", + "minicbor 0.20.0", + "serde", + "thiserror", +] + [[package]] name = "pallas-crypto" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a35fc93b3613c0a628d0820f8d5d9a52709d795b59a1754a337aee0fca289dd" +checksum = "c9248ed0e594bcb0f548393264519c7adea88874d8bd7cc86f894e8ba4e918c2" dependencies = [ "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "rand_core", "serde", "thiserror", @@ -2321,8 +2352,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8a4754676d92ae351ad524d98bc32d70835856ee0623a45288bb50a5ee4b161" dependencies = [ "hex", - "itertools", - "pallas-codec", + "itertools 0.10.5", + "pallas-codec 0.18.2", "pallas-multiplexer", "thiserror", "tracing", @@ -2337,7 +2368,7 @@ dependencies = [ "byteorder", "hex", "log", - "pallas-codec", + "pallas-codec 0.18.2", "rand", "thiserror", "tracing", @@ -2345,15 +2376,15 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5fdf328f41971e0b1457e2377abeb09143fa50ab79f1a6a6ab5740bc94dc4b" +checksum = "c0fa55305212f7828651c8db024e1e286198c2fccb028bbb697c68990c044959" dependencies = [ "base58", "bech32", "hex", "log", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "serde", "serde_json", @@ -2361,15 +2392,18 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58c353ecb175a63422386c80301493db9fc448407bc63322534522579e22879" +checksum = "49459bd0d2ba86fd909890a81e6238eaf051952d7e38ad63195301e72e8f458e" dependencies = [ "hex", + "itertools 0.13.0", "pallas-addresses", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "pallas-primitives", + "paste", + "serde", "thiserror", ] @@ -2402,6 +2436,12 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathdiff" version = "0.2.1" @@ -2603,7 +2643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.41", @@ -3369,12 +3409,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", + "num-conv", "powerfmt", "serde", "time-core", @@ -3389,10 +3430,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ + "num-conv", "time-core", ] diff --git a/Cargo.toml b/Cargo.toml index 6bbf25d7..ee6b8858 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,13 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.18.2" +pallas-multiplexer = "0.18.2" +pallas-miniprotocols = "0.18.2" +pallas-primitives = "0.29.0" +pallas-traverse = "0.29.0" +pallas-addresses = "0.29.0" +pallas-codec = "0.29.0" +pallas-crypto = "0.29.0" # pallas = { git = "https://github.com/txpipe/pallas" } # pallas = { path = "../pallas/pallas" } hex = "0.4.3" @@ -30,6 +36,7 @@ strum = "0.24" strum_macros = "0.24" prometheus_exporter = { version = "0.8.5", default-features = false } unicode-truncate = "0.2.0" +time = "0.3.36" # feature logs file-rotate = { version = "0.7.1", optional = true } @@ -63,7 +70,7 @@ redis = { version = "0.21.6", optional = true, features = ["tokio-comp"] } # features: gcp -google-cloud-gax = {version ="0.17.0", optional = true } +google-cloud-gax = { version = "0.17.0", optional = true } google-cloud-pubsub = { version = "0.23.0", optional = true } google-cloud-googleapis = { version = "0.12.0", optional = true } @@ -80,5 +87,5 @@ elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] redissink = ["redis", "tokio"] -gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"] +gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"] rabbitmqsink = ["lapin", "tokio"] diff --git a/src/mapper/babbage.rs b/src/mapper/babbage.rs index fda60d46..994e3ad1 100644 --- a/src/mapper/babbage.rs +++ b/src/mapper/babbage.rs @@ -1,12 +1,12 @@ -use pallas::codec::utils::KeepRaw; +use pallas_codec::utils::KeepRaw; -use pallas::ledger::primitives::babbage::{ +use pallas_primitives::babbage::{ AuxiliaryData, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, NetworkId, }; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_traverse::OriginalHash; use crate::model::{BlockRecord, Era, TransactionRecord}; use crate::utils::time::TimeProvider; @@ -199,7 +199,7 @@ impl EventWriter { let record = self.to_post_alonzo_output_record(output)?; self.append(record.into())?; - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; let child = &self.child_writer(EventContext { output_address: address.to_string().into(), @@ -389,7 +389,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_babbage_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_babbage_with_cbor(&block, cbor) } } diff --git a/src/mapper/byron.rs b/src/mapper/byron.rs index 2ed223d5..d0f58ac5 100644 --- a/src/mapper/byron.rs +++ b/src/mapper/byron.rs @@ -3,11 +3,12 @@ use std::ops::Deref; use super::map::ToHex; use super::EventWriter; use crate::model::{BlockRecord, Era, EventData, TransactionRecord, TxInputRecord, TxOutputRecord}; +use crate::utils::time::TimeProvider; use crate::{model::EventContext, Error}; -use pallas::crypto::hash::Hash; -use pallas::ledger::primitives::byron; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_primitives::byron; +use pallas_traverse::OriginalHash; impl EventWriter { fn to_byron_input_record(&self, source: &byron::TxIn) -> Option { @@ -41,12 +42,9 @@ impl EventWriter { } fn to_byron_output_record(&self, source: &byron::TxOut) -> Result { - let address: pallas::ledger::addresses::Address = - pallas::ledger::addresses::ByronAddress::new( - &source.address.payload.0, - source.address.crc, - ) - .into(); + let address: pallas_addresses::Address = + pallas_addresses::ByronAddress::new(&source.address.payload.0, source.address.crc) + .into(); Ok(TxOutputRecord { address: address.to_string(), @@ -168,10 +166,12 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - source.header.consensus_data.0.epoch, - source.header.consensus_data.0.slot, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute( + source.header.consensus_data.0.epoch, + source.header.consensus_data.0.slot, + ) + }); let mut record = BlockRecord { era: Era::Byron, @@ -181,7 +181,7 @@ impl EventWriter { tx_count: source.body.tx_payload.len(), hash: hash.to_hex(), number: source.header.consensus_data.2[0], - slot: abs_slot, + slot: abs_slot.unwrap_or_default(), epoch: Some(source.header.consensus_data.0.epoch), epoch_slot: Some(source.header.consensus_data.0.slot), previous_hash: source.header.prev_block.to_hex(), @@ -234,10 +234,9 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - source.header.consensus_data.epoch_id, - 0, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute(source.header.consensus_data.epoch_id, 0) + }); Ok(BlockRecord { era: Era::Byron, @@ -247,7 +246,7 @@ impl EventWriter { vrf_vkey: Default::default(), tx_count: 0, number: source.header.consensus_data.difficulty[0], - slot: abs_slot, + slot: abs_slot.unwrap_or_default(), epoch: Some(source.header.consensus_data.epoch_id), epoch_slot: Some(0), previous_hash: source.header.prev_block.to_hex(), @@ -288,16 +287,18 @@ impl EventWriter { ) -> Result<(), Error> { let hash = block.header.original_hash(); - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - block.header.consensus_data.0.epoch, - block.header.consensus_data.0.slot, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute( + block.header.consensus_data.0.epoch, + block.header.consensus_data.0.slot, + ) + }); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.2[0]), - slot: Some(abs_slot), - timestamp: self.compute_timestamp(abs_slot), + slot: abs_slot, + timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)), ..EventContext::default() }); @@ -311,7 +312,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_byron_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_byron_with_cbor(&block, cbor) } @@ -328,16 +329,15 @@ impl EventWriter { if self.config.include_byron_ebb { let hash = block.header.original_hash(); - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - block.header.consensus_data.epoch_id, - 0, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute(block.header.consensus_data.epoch_id, 0) + }); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.difficulty[0]), - slot: Some(abs_slot), - timestamp: self.compute_timestamp(abs_slot), + slot: abs_slot, + timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)), ..EventContext::default() }); @@ -352,7 +352,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_ebb_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::MintedEbBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedEbBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_ebb_with_cbor(&block, cbor) } } diff --git a/src/mapper/cip15.rs b/src/mapper/cip15.rs index 1d269206..84a548d6 100644 --- a/src/mapper/cip15.rs +++ b/src/mapper/cip15.rs @@ -3,7 +3,7 @@ use crate::model::CIP15AssetRecord; use crate::Error; use serde_json::Value as JsonValue; -use pallas::ledger::primitives::alonzo::Metadatum; +use pallas_primitives::alonzo::Metadatum; fn extract_json_property<'a>( json: &'a JsonValue, diff --git a/src/mapper/cip25.rs b/src/mapper/cip25.rs index 9def5b1a..813b7ef7 100644 --- a/src/mapper/cip25.rs +++ b/src/mapper/cip25.rs @@ -1,6 +1,6 @@ use serde_json::Value as JsonValue; -use pallas::ledger::primitives::alonzo::Metadatum; +use pallas_primitives::alonzo::Metadatum; use crate::{model::CIP25AssetRecord, Error}; diff --git a/src/mapper/collect.rs b/src/mapper/collect.rs index 515e11b5..1de39df9 100644 --- a/src/mapper/collect.rs +++ b/src/mapper/collect.rs @@ -1,19 +1,15 @@ -use pallas::{ - codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}, - ledger::{ - primitives::{ - alonzo::{ - AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, - PlutusScript, Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, - }, - babbage::{ - LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, - MintedTransactionOutput, PlutusV2Script, - }, - }, - traverse::OriginalHash, +use pallas_codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}; +use pallas_primitives::{ + alonzo::{ + AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, PlutusScript, + Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, + }, + babbage::{ + LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, MintedTransactionOutput, + PlutusV2Script, }, }; +use pallas_traverse::OriginalHash; use crate::{ model::{ @@ -138,7 +134,7 @@ impl EventWriter { pub fn collect_native_witness_records( &self, - witness_set: &Option>, + witness_set: &Option>>, ) -> Result, Error> { match witness_set { Some(all) => all diff --git a/src/mapper/conway.rs b/src/mapper/conway.rs new file mode 100644 index 00000000..8b07fc9e --- /dev/null +++ b/src/mapper/conway.rs @@ -0,0 +1,588 @@ +use pallas_codec::utils::{KeepRaw, NonZeroInt}; + +use pallas_primitives::conway::{ + AuxiliaryData, Certificate, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, + MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, Multiasset, NetworkId, + RedeemerTag, RedeemersKey, RedeemersValue, +}; + +use pallas_crypto::hash::Hash; +use pallas_primitives::ToCanonicalJson as _; +use pallas_traverse::OriginalHash; + +use crate::model::{ + BlockRecord, Era, MintRecord, PlutusRedeemerRecord, TransactionRecord, TxOutputRecord, +}; +use crate::utils::time::TimeProvider; +use crate::{ + model::{EventContext, EventData}, + Error, +}; + +use super::{map::ToHex, EventWriter}; + +impl EventWriter { + pub fn collect_conway_mint_records(&self, mint: &Multiasset) -> Vec { + mint.iter() + .flat_map(|(policy, assets)| { + assets + .iter() + .map(|(asset, amount)| self.to_mint_record(policy, asset, amount.into())) + }) + .collect() + } + + pub fn crawl_conway_mints(&self, mints: &Multiasset) -> Result<(), Error> { + for (policy, assets) in mints.iter() { + for (asset, quantity) in assets.iter() { + self.append_from(self.to_mint_record(policy, asset, quantity.into()))?; + } + } + + Ok(()) + } + + pub fn to_conway_output_record( + &self, + output: &MintedPostAlonzoTransactionOutput, + ) -> Result { + let address = pallas_addresses::Address::from_bytes(&output.address)?; + + Ok(TxOutputRecord { + address: address.to_string(), + amount: super::map::get_tx_output_coin_value(&output.value), + assets: self.collect_asset_records(&output.value).into(), + datum_hash: match &output.datum_option { + Some(MintedDatumOption::Hash(x)) => Some(x.to_string()), + Some(MintedDatumOption::Data(x)) => Some(x.original_hash().to_hex()), + None => None, + }, + inline_datum: match &output.datum_option { + Some(MintedDatumOption::Data(x)) => Some(self.to_plutus_datum_record(x)?), + _ => None, + }, + }) + } + + pub fn to_conway_redeemer_record( + &self, + key: &RedeemersKey, + value: &RedeemersValue, + ) -> Result { + Ok(PlutusRedeemerRecord { + purpose: match key.tag { + RedeemerTag::Spend => "spend".to_string(), + RedeemerTag::Mint => "mint".to_string(), + RedeemerTag::Cert => "cert".to_string(), + RedeemerTag::Reward => "reward".to_string(), + RedeemerTag::Vote => "vote".to_string(), + RedeemerTag::Propose => "propose".to_string(), + }, + ex_units_mem: value.ex_units.mem, + ex_units_steps: value.ex_units.steps, + input_idx: key.index, + plutus_data: value.data.to_json(), + }) + } + + pub fn collect_conway_output_records( + &self, + source: &[MintedTransactionOutput], + ) -> Result, Error> { + source + .iter() + .map(|x| match x { + MintedTransactionOutput::Legacy(x) => self.to_legacy_output_record(x), + MintedTransactionOutput::PostAlonzo(x) => self.to_conway_output_record(x), + }) + .collect() + } + + pub fn to_conway_tx_size( + &self, + body: &KeepRaw, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> usize { + body.raw_cbor().len() + + aux_data.map(|ax| ax.raw_cbor().len()).unwrap_or(2) + + witness_set.map(|ws| ws.raw_cbor().len()).unwrap_or(1) + } + + pub fn to_conway_transaction_record( + &self, + body: &KeepRaw, + tx_hash: &str, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> Result { + let mut record = TransactionRecord { + hash: tx_hash.to_owned(), + size: self.to_conway_tx_size(body, aux_data, witness_set) as u32, + fee: body.fee, + ttl: body.ttl, + validity_interval_start: body.validity_interval_start, + network_id: body.network_id.as_ref().map(|x| match x { + NetworkId::One => 1, + NetworkId::Two => 2, + }), + ..Default::default() + }; + + let outputs = self.collect_conway_output_records(body.outputs.as_slice())?; + record.output_count = outputs.len(); + record.total_output = outputs.iter().map(|o| o.amount).sum(); + + let inputs = self.collect_input_records(&body.inputs); + record.input_count = inputs.len(); + + if let Some(mint) = &body.mint { + let mints = self.collect_conway_mint_records(mint); + record.mint_count = mints.len(); + + if self.config.include_transaction_details { + record.mint = mints.into(); + } + } + + // Add Collateral Stuff + let collateral_inputs = &body.collateral.as_deref(); + record.collateral_input_count = collateral_inputs.iter().count(); + record.has_collateral_output = body.collateral_return.is_some(); + + // TODO + // TransactionBodyComponent::ScriptDataHash(_) + // TransactionBodyComponent::RequiredSigners(_) + // TransactionBodyComponent::AuxiliaryDataHash(_) + + if self.config.include_transaction_details { + record.outputs = outputs.into(); + record.inputs = inputs.into(); + + // transaction_details collateral stuff + record.collateral_inputs = + collateral_inputs.map(|inputs| self.collect_input_records(inputs)); + + record.collateral_output = body.collateral_return.as_ref().map(|output| match output { + MintedTransactionOutput::Legacy(x) => self.to_legacy_output_record(x).unwrap(), + MintedTransactionOutput::PostAlonzo(x) => self.to_conway_output_record(x).unwrap(), + }); + + record.metadata = match aux_data { + Some(aux_data) => self.collect_metadata_records(aux_data)?.into(), + None => None, + }; + + if let Some(witnesses) = witness_set { + record.vkey_witnesses = Some( + witnesses + .vkeywitness + .iter() + .flatten() + .map(|i| self.to_vkey_witness_record(i)) + .collect::>()?, + ); + + record.native_witnesses = Some( + witnesses + .native_script + .iter() + .flatten() + .map(|i| self.to_native_witness_record(i)) + .collect::>()?, + ); + + let mut all_plutus = vec![]; + + let plutus_v1: Vec<_> = witnesses + .plutus_v1_script + .iter() + .flatten() + .map(|i| self.to_plutus_v1_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v1); + + let plutus_v2: Vec<_> = witnesses + .plutus_v2_script + .iter() + .flatten() + .map(|i| self.to_plutus_v2_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v2); + + let plutus_v3: Vec<_> = witnesses + .plutus_v3_script + .iter() + .flatten() + .map(|i| self.to_plutus_v3_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v3); + + record.plutus_witnesses = Some(all_plutus); + + record.plutus_redeemers = Some( + witnesses + .redeemer + .iter() + .flat_map(|i| i.iter()) + .map(|(k, v)| self.to_conway_redeemer_record(k, v)) + .collect::>()?, + ); + + record.plutus_data = Some( + witnesses + .plutus_data + .iter() + .flatten() + .map(|i| self.to_plutus_datum_record(i)) + .collect::>()?, + ); + } + + if let Some(withdrawals) = &body.withdrawals { + record.withdrawals = self.collect_withdrawal_records(withdrawals).into(); + } + } + + Ok(record) + } + + pub fn to_conway_block_record( + &self, + source: &MintedBlock, + hash: &Hash<32>, + cbor: &[u8], + ) -> Result { + let relative_epoch = self + .utils + .time + .as_ref() + .map(|time| time.absolute_slot_to_relative(source.header.header_body.slot)); + + let mut record = BlockRecord { + era: Era::Conway, + body_size: source.header.header_body.block_body_size as usize, + issuer_vkey: source.header.header_body.issuer_vkey.to_hex(), + vrf_vkey: source.header.header_body.vrf_vkey.to_hex(), + tx_count: source.transaction_bodies.len(), + hash: hex::encode(hash), + number: source.header.header_body.block_number, + slot: source.header.header_body.slot, + epoch: relative_epoch.map(|(epoch, _)| epoch), + epoch_slot: relative_epoch.map(|(_, epoch_slot)| epoch_slot), + previous_hash: source + .header + .header_body + .prev_hash + .map(hex::encode) + .unwrap_or_default(), + cbor_hex: match self.config.include_block_cbor { + true => hex::encode(cbor).into(), + false => None, + }, + transactions: None, + }; + + if self.config.include_block_details || self.config.include_transaction_details { + record.transactions = Some(self.collect_conway_tx_records(source)?); + } + + Ok(record) + } + + pub fn collect_conway_tx_records( + &self, + block: &MintedBlock, + ) -> Result, Error> { + block + .transaction_bodies + .iter() + .enumerate() + .map(|(idx, tx)| { + let aux_data = block + .auxiliary_data_set + .iter() + .find(|(k, _)| *k == (idx as u32)) + .map(|(_, v)| v); + + let witness_set = block.transaction_witness_sets.get(idx); + + let tx_hash = tx.original_hash().to_hex(); + + self.to_conway_transaction_record(tx, &tx_hash, aux_data, witness_set) + }) + .collect() + } + + fn crawl_conway_output(&self, output: &MintedPostAlonzoTransactionOutput) -> Result<(), Error> { + let record = self.to_conway_output_record(output)?; + self.append(record.into())?; + + let address = pallas_addresses::Address::from_bytes(&output.address)?; + + let child = &self.child_writer(EventContext { + output_address: address.to_string().into(), + ..EventContext::default() + }); + + child.crawl_transaction_output_amount(&output.value)?; + + if let Some(MintedDatumOption::Data(datum)) = &output.datum_option { + let record = self.to_plutus_datum_record(datum)?; + child.append(record.into())?; + } + + Ok(()) + } + + fn crawl_conway_transaction_output( + &self, + output: &MintedTransactionOutput, + ) -> Result<(), Error> { + match output { + MintedTransactionOutput::Legacy(x) => self.crawl_legacy_output(x), + MintedTransactionOutput::PostAlonzo(x) => self.crawl_conway_output(x), + } + } + + fn crawl_conway_witness_set( + &self, + witness_set: &KeepRaw, + ) -> Result<(), Error> { + if let Some(native) = &witness_set.native_script { + for script in native.iter() { + self.append_from(self.to_native_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v1_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v1_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v2_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v2_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v3_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v3_witness_record(script)?)?; + } + } + + if let Some(redeemers) = &witness_set.redeemer { + for (key, value) in redeemers.iter() { + self.append_from(self.to_conway_redeemer_record(key, value)?)?; + } + } + + if let Some(datums) = &witness_set.plutus_data { + for datum in datums.iter() { + self.append_from(self.to_plutus_datum_record(datum)?)?; + } + } + + Ok(()) + } + + pub fn to_conway_certificate_event(&self, certificate: &Certificate) -> Option { + match certificate { + Certificate::StakeRegistration(credential) => EventData::StakeRegistration { + credential: credential.into(), + } + .into(), + Certificate::StakeDeregistration(credential) => EventData::StakeDeregistration { + credential: credential.into(), + } + .into(), + Certificate::StakeDelegation(credential, pool) => EventData::StakeDelegation { + credential: credential.into(), + pool_hash: pool.to_hex(), + } + .into(), + Certificate::PoolRegistration { + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + } => EventData::PoolRegistration { + operator: operator.to_hex(), + vrf_keyhash: vrf_keyhash.to_hex(), + pledge: *pledge, + cost: *cost, + margin: (margin.numerator as f64 / margin.denominator as f64), + reward_account: reward_account.to_hex(), + pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), + relays: relays.iter().map(super::map::relay_to_string).collect(), + pool_metadata: pool_metadata.to_owned().map(|m| m.url.clone()).into(), + pool_metadata_hash: pool_metadata + .to_owned() + .map(|m| m.hash.clone().to_hex()) + .into(), + } + .into(), + Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement { + pool: pool.to_hex(), + epoch: *epoch, + } + .into(), + // all new Conway certs are out of scope for Oura lts/v1 + _ => None, + } + } + + fn crawl_conway_certificate(&self, certificate: &Certificate) -> Result<(), Error> { + if let Some(evt) = self.to_conway_certificate_event(certificate) { + self.append(evt)?; + } + + Ok(()) + } + + fn crawl_conway_transaction( + &self, + tx: &KeepRaw, + tx_hash: &str, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> Result<(), Error> { + let record = self.to_conway_transaction_record(tx, tx_hash, aux_data, witness_set)?; + + self.append_from(record.clone())?; + + for (idx, input) in tx.inputs.iter().enumerate() { + let child = self.child_writer(EventContext { + input_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_transaction_input(input)?; + } + + for (idx, output) in tx.outputs.iter().enumerate() { + let child = self.child_writer(EventContext { + output_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_conway_transaction_output(output)?; + } + + if let Some(certs) = &tx.certificates { + for (idx, cert) in certs.iter().enumerate() { + let child = self.child_writer(EventContext { + certificate_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_conway_certificate(cert)?; + } + } + + if let Some(collateral) = &tx.collateral { + for collateral in collateral.iter() { + // TODO: collateral context? + + self.crawl_collateral(collateral)?; + } + } + + if let Some(mint) = &tx.mint { + self.crawl_conway_mints(mint)?; + } + + if let Some(aux_data) = aux_data { + self.crawl_auxdata(aux_data)?; + } + + if let Some(witness_set) = witness_set { + self.crawl_conway_witness_set(witness_set)?; + } + + if self.config.include_transaction_end_events { + self.append(EventData::TransactionEnd(record))?; + } + + Ok(()) + } + + fn crawl_conway_block( + &self, + block: &MintedBlock, + hash: &Hash<32>, + cbor: &[u8], + ) -> Result<(), Error> { + let record = self.to_conway_block_record(block, hash, cbor)?; + + self.append(EventData::Block(record.clone()))?; + + for (idx, tx) in block.transaction_bodies.iter().enumerate() { + let aux_data = block + .auxiliary_data_set + .iter() + .find(|(k, _)| *k == (idx as u32)) + .map(|(_, v)| v); + + let witness_set = block.transaction_witness_sets.get(idx); + + let tx_hash = tx.original_hash().to_hex(); + + let child = self.child_writer(EventContext { + tx_idx: Some(idx), + tx_hash: Some(tx_hash.to_owned()), + ..EventContext::default() + }); + + child.crawl_conway_transaction(tx, &tx_hash, aux_data, witness_set)?; + } + + if self.config.include_block_end_events { + self.append(EventData::BlockEnd(record))?; + } + + Ok(()) + } + + /// Mapper entry-point for decoded Conway blocks + /// + /// Entry-point to start crawling a blocks for events. Meant to be used when + /// we already have a decoded block (for example, N2C). The raw CBOR is also + /// passed through in case we need to attach it to outbound events. + pub fn crawl_conway_with_cbor<'b>( + &self, + block: &'b MintedBlock<'b>, + cbor: &'b [u8], + ) -> Result<(), Error> { + let hash = block.header.original_hash(); + + let child = self.child_writer(EventContext { + block_hash: Some(hex::encode(hash)), + block_number: Some(block.header.header_body.block_number), + slot: Some(block.header.header_body.slot), + timestamp: self.compute_timestamp(block.header.header_body.slot), + ..EventContext::default() + }); + + child.crawl_conway_block(block, &hash, cbor) + } + + /// Mapper entry-point for raw Conway cbor blocks + /// + /// Entry-point to start crawling a blocks for events. Meant to be used when + /// we haven't decoded the CBOR yet (for example, N2N). + pub fn crawl_from_conway_cbor(&self, cbor: &[u8]) -> Result<(), Error> { + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; + self.crawl_conway_with_cbor(&block, cbor) + } +} diff --git a/src/mapper/map.rs b/src/mapper/map.rs index d05d20a0..79be068b 100644 --- a/src/mapper/map.rs +++ b/src/mapper/map.rs @@ -1,20 +1,21 @@ use std::collections::HashMap; -use pallas::ledger::primitives::alonzo::MintedWitnessSet; -use pallas::ledger::primitives::babbage::MintedDatumOption; -use pallas::ledger::traverse::{ComputeHash, OriginalHash}; -use pallas::{codec::utils::KeepRaw, crypto::hash::Hash}; +use pallas_codec::utils::KeepRaw; +use pallas_crypto::hash::Hash; +use pallas_primitives::alonzo::MintedWitnessSet; +use pallas_primitives::babbage::MintedDatumOption; +use pallas_traverse::{ComputeHash, OriginalHash}; -use pallas::ledger::primitives::{ +use pallas_primitives::{ alonzo::{ self as alonzo, AuxiliaryData, Certificate, InstantaneousRewardSource, InstantaneousRewardTarget, Metadatum, MetadatumLabel, MintedBlock, NetworkId, Relay, TransactionBody, TransactionInput, Value, }, - babbage, ToCanonicalJson, + babbage, conway, ToCanonicalJson, }; -use pallas::network::miniprotocols::Point; +use pallas_miniprotocols::Point; use serde_json::{json, Value as JsonValue}; use crate::model::{ @@ -64,23 +65,23 @@ fn ip_string_from_bytes(bytes: &[u8]) -> String { format!("{}.{}.{}.{}", bytes[0], bytes[1], bytes[2], bytes[3]) } -fn relay_to_string(relay: &Relay) -> String { +pub fn relay_to_string(relay: &Relay) -> String { match relay { Relay::SingleHostAddr(port, ipv4, ipv6) => { let ip = match (ipv6, ipv4) { - (None, None) => "".to_string(), - (_, Some(x)) => ip_string_from_bytes(x.as_ref()), - (Some(x), _) => ip_string_from_bytes(x.as_ref()), + (_, pallas_codec::utils::Nullable::Some(x)) => ip_string_from_bytes(x.as_ref()), + (pallas_codec::utils::Nullable::Some(x), _) => ip_string_from_bytes(x.as_ref()), + _ => "".to_string(), }; match port { - Some(port) => format!("{ip}:{port}"), - None => ip, + pallas_codec::utils::Nullable::Some(port) => format!("{ip}:{port}"), + _ => ip, } } Relay::SingleHostName(port, host) => match port { - Some(port) => format!("{host}:{port}"), - None => host.clone(), + pallas_codec::utils::Nullable::Some(port) => format!("{host}:{port}"), + _ => host.clone(), }, Relay::MultiHostName(host) => host.clone(), } @@ -98,7 +99,7 @@ fn metadatum_to_string_key(datum: &Metadatum) -> String { } } -fn get_tx_output_coin_value(amount: &Value) -> u64 { +pub fn get_tx_output_coin_value(amount: &Value) -> u64 { match amount { Value::Coin(x) => *x, Value::Multiasset(x, _) => *x, @@ -169,7 +170,7 @@ impl EventWriter { &self, output: &alonzo::TransactionOutput, ) -> Result { - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; Ok(TxOutputRecord { address: address.to_string(), @@ -184,7 +185,7 @@ impl EventWriter { &self, output: &babbage::MintedPostAlonzoTransactionOutput, ) -> Result { - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; Ok(TxOutputRecord { address: address.to_string(), @@ -205,7 +206,7 @@ impl EventWriter { pub fn to_transaction_output_asset_record( &self, policy: &Hash<28>, - asset: &pallas::codec::utils::Bytes, + asset: &pallas_codec::utils::Bytes, amount: u64, ) -> OutputAssetRecord { OutputAssetRecord { @@ -219,7 +220,7 @@ impl EventWriter { pub fn to_mint_record( &self, policy: &Hash<28>, - asset: &pallas::codec::utils::Bytes, + asset: &pallas_codec::utils::Bytes, quantity: i64, ) -> MintRecord { MintRecord { @@ -291,6 +292,16 @@ impl EventWriter { }) } + pub fn to_plutus_v3_witness_record( + &self, + script: &conway::PlutusV3Script, + ) -> Result { + Ok(PlutusWitnessRecord { + script_hash: script.compute_hash().to_hex(), + script_hex: script.as_ref().to_hex(), + }) + } + pub fn to_native_witness_record( &self, script: &alonzo::NativeScript, @@ -342,8 +353,11 @@ impl EventWriter { reward_account: reward_account.to_hex(), pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), relays: relays.iter().map(relay_to_string).collect(), - pool_metadata: pool_metadata.as_ref().map(|m| m.url.clone()), - pool_metadata_hash: pool_metadata.as_ref().map(|m| m.hash.clone().to_hex()), + pool_metadata: pool_metadata.to_owned().map(|m| m.url.clone()).into(), + pool_metadata_hash: pool_metadata + .to_owned() + .map(|m| m.hash.clone().to_hex()) + .into(), }, Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement { pool: pool.to_hex(), diff --git a/src/mapper/mod.rs b/src/mapper/mod.rs index a176e40f..13e68677 100644 --- a/src/mapper/mod.rs +++ b/src/mapper/mod.rs @@ -3,6 +3,7 @@ mod byron; mod cip15; mod cip25; mod collect; +mod conway; mod map; mod prelude; mod shelley; diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index 4564bb92..806ec250 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -106,15 +106,15 @@ impl EventWriter { } } -impl From for Era { - fn from(other: pallas::ledger::traverse::Era) -> Self { +impl From for Era { + fn from(other: pallas_traverse::Era) -> Self { match other { - pallas::ledger::traverse::Era::Byron => Era::Byron, - pallas::ledger::traverse::Era::Shelley => Era::Shelley, - pallas::ledger::traverse::Era::Allegra => Era::Allegra, - pallas::ledger::traverse::Era::Mary => Era::Mary, - pallas::ledger::traverse::Era::Alonzo => Era::Alonzo, - pallas::ledger::traverse::Era::Babbage => Era::Babbage, + pallas_traverse::Era::Byron => Era::Byron, + pallas_traverse::Era::Shelley => Era::Shelley, + pallas_traverse::Era::Allegra => Era::Allegra, + pallas_traverse::Era::Mary => Era::Mary, + pallas_traverse::Era::Alonzo => Era::Alonzo, + pallas_traverse::Era::Babbage => Era::Babbage, _ => Era::Unknown, } } diff --git a/src/mapper/shelley.rs b/src/mapper/shelley.rs index 183ec6e7..c10a930c 100644 --- a/src/mapper/shelley.rs +++ b/src/mapper/shelley.rs @@ -1,12 +1,12 @@ -use pallas::codec::utils::KeepRaw; +use pallas_codec::utils::KeepRaw; -use pallas::ledger::primitives::alonzo::{ +use pallas_primitives::alonzo::{ AuxiliaryData, Certificate, Metadata, MintedBlock, MintedWitnessSet, Multiasset, TransactionBody, TransactionInput, TransactionOutput, Value, }; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_traverse::OriginalHash; use crate::{ model::{Era, EventContext, EventData}, @@ -89,7 +89,7 @@ impl EventWriter { let record = self.to_legacy_output_record(output)?; self.append(record.into())?; - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; let child = &self.child_writer(EventContext { output_address: address.to_string().into(), @@ -325,7 +325,7 @@ impl EventWriter { /// Shelley. In this way, we can avoid having to fork the crawling procedure /// for each different hard-fork. pub fn crawl_from_shelley_cbor(&self, cbor: &[u8], era: Era) -> Result<(), Error> { - let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_shelley_with_cbor(&block, cbor, era) } } diff --git a/src/model.rs b/src/model.rs index ad661bc3..89f894b4 100644 --- a/src/model.rs +++ b/src/model.rs @@ -25,6 +25,7 @@ pub enum Era { Mary, Alonzo, Babbage, + Conway, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -247,7 +248,7 @@ impl From for EventData { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct PlutusRedeemerRecord { pub purpose: String, - pub ex_units_mem: u32, + pub ex_units_mem: u64, pub ex_units_steps: u64, pub input_idx: u32, pub plutus_data: JsonValue, diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index b378753a..c52bad93 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -42,6 +42,7 @@ async fn send_pubsub_msg( Ok(()) } +#[allow(clippy::too_many_arguments)] pub fn writer_loop( input: StageReceiver, topic_name: &str, diff --git a/src/sources/common.rs b/src/sources/common.rs index 4687561e..7731a555 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -1,13 +1,9 @@ use core::fmt; use std::{ops::Deref, str::FromStr, time::Duration}; -use pallas::{ - ledger::traverse::{probe, Era}, - network::{ - miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}, - multiplexer::{bearers::Bearer, StdChannel, StdPlexer}, - }, -}; +use pallas_miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}; +use pallas_multiplexer::{bearers::Bearer, StdChannel, StdPlexer}; +use pallas_traverse::{probe, Era}; use serde::{de::Visitor, Deserializer}; use serde::{Deserialize, Serialize}; @@ -72,9 +68,9 @@ impl FromStr for PointArg { } } -impl ToString for PointArg { - fn to_string(&self) -> String { - format!("{},{}", self.0, self.1) +impl std::fmt::Display for PointArg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{},{}", self.0, self.1) } } @@ -262,77 +258,91 @@ pub fn should_finalize( false } -pub(crate) fn intersect_starting_point( - client: &mut chainsync::Client, - intersect_arg: &Option, - since_arg: &Option, - utils: &Utils, -) -> Result, Error> -where - chainsync::Message: pallas::codec::Fragment, -{ - let cursor = utils.get_cursor_if_any(); - - match cursor { - Some(cursor) => { - log::info!("found persisted cursor, will use as starting point"); - let desired = cursor.try_into()?; - let (point, _) = client.find_intersect(vec![desired])?; - - Ok(point) - } - None => match intersect_arg { - Some(IntersectArg::Fallbacks(x)) => { - log::info!("found 'fallbacks' intersect argument, will use as starting point"); - let options: Result, _> = x.iter().map(|x| x.clone().try_into()).collect(); - - let (point, _) = client.find_intersect(options?)?; - - Ok(point) - } - Some(IntersectArg::Origin) => { - log::info!("found 'origin' intersect argument, will use as starting point"); - - let point = client.intersect_origin()?; - - Ok(Some(point)) - } - Some(IntersectArg::Point(x)) => { - log::info!("found 'point' intersect argument, will use as starting point"); - let options = vec![x.clone().try_into()?]; - - let (point, _) = client.find_intersect(options)?; - - Ok(point) - } - Some(IntersectArg::Tip) => { - log::info!("found 'tip' intersect argument, will use as starting point"); - - let point = client.intersect_tip()?; - - Ok(Some(point)) - } - None => match since_arg { - Some(x) => { - log::info!("explicit 'since' argument, will use as starting point"); - log::warn!("`since` value is deprecated, please use `intersect`"); - let options = vec![x.clone().try_into()?]; - - let (point, _) = client.find_intersect(options)?; +macro_rules! intersect_starting_point { + ($fn:ident, $client:ty) => { + pub(crate) fn $fn( + client: &mut $client, + intersect_arg: &Option, + since_arg: &Option, + utils: &Utils, + ) -> Result, Error> { + let cursor = utils.get_cursor_if_any(); + + match cursor { + Some(cursor) => { + log::info!("found persisted cursor, will use as starting point"); + let desired = cursor.try_into()?; + let (point, _) = client.find_intersect(vec![desired])?; Ok(point) } - None => { - log::info!("no starting point specified, will use tip of chain"); + None => match intersect_arg { + Some(IntersectArg::Fallbacks(x)) => { + log::info!( + "found 'fallbacks' intersect argument, will use as starting point" + ); + let options: Result, _> = + x.iter().map(|x| x.clone().try_into()).collect(); + + let (point, _) = client.find_intersect(options?)?; + + Ok(point) + } + Some(IntersectArg::Origin) => { + log::info!("found 'origin' intersect argument, will use as starting point"); + + let point = client.intersect_origin()?; + + Ok(Some(point)) + } + Some(IntersectArg::Point(x)) => { + log::info!("found 'point' intersect argument, will use as starting point"); + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; + + Ok(point) + } + Some(IntersectArg::Tip) => { + log::info!("found 'tip' intersect argument, will use as starting point"); + + let point = client.intersect_tip()?; + + Ok(Some(point)) + } + None => match since_arg { + Some(x) => { + log::info!("explicit 'since' argument, will use as starting point"); + log::warn!("`since` value is deprecated, please use `intersect`"); + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; + + Ok(point) + } + None => { + log::info!("no starting point specified, will use tip of chain"); + + let point = client.intersect_tip()?; + + Ok(Some(point)) + } + }, + }, + } + } + }; +} - let point = client.intersect_tip()?; +intersect_starting_point!( + intersect_starting_point_n2n, + chainsync::N2NClient +); - Ok(Some(point)) - } - }, - }, - } -} +intersect_starting_point!( + intersect_starting_point_n2c, + chainsync::N2CClient +); pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<(), Error> { match probe::block_era(body) { @@ -352,6 +362,11 @@ pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<( .crawl_from_babbage_cbor(body) .ok_or_warn("error crawling babbage block for events"); } + Era::Conway => { + writer + .crawl_from_conway_cbor(body) + .ok_or_warn("error crawling conway block for events"); + } x => { return Err(format!("This version of Oura can't handle era: {x}").into()); } diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index a6aef999..2678813f 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,18 +1,14 @@ use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::{ - ledger::traverse::MultiEraBlock, - network::{ - miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}, - multiplexer::StdChannel, - }, -}; +use pallas_miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}; +use pallas_multiplexer::StdChannel; +use pallas_traverse::MultiEraBlock; use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + intersect_starting_point_n2c, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, @@ -217,7 +213,7 @@ fn do_chainsync_attempt( let mut client = chainsync::N2CClient::new(cs_channel); - let intersection = intersect_starting_point( + let intersection = intersect_starting_point_n2c( &mut client, &config.intersect, #[allow(deprecated)] diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index f2b83267..75936ecb 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -1,9 +1,7 @@ use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::network::{ - miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}, - multiplexer::StdChannel, -}; +use pallas_miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}; +use pallas_multiplexer::StdChannel; use std::sync::mpsc::{Receiver, SyncSender}; @@ -11,7 +9,7 @@ use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + intersect_starting_point_n2n, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, @@ -56,7 +54,7 @@ impl ChainObserver { ) -> Result { // parse the header and extract the point of the chain - let header = pallas::ledger::traverse::MultiEraHeader::decode( + let header = pallas_traverse::MultiEraHeader::decode( content.variant, content.byron_prefix.map(|x| x.0), &content.cbor, @@ -224,7 +222,7 @@ fn do_chainsync_attempt( let mut cs_client = chainsync::N2NClient::new(cs_channel); - let intersection = intersect_starting_point( + let intersection = intersect_starting_point_n2n( &mut cs_client, &config.intersect, #[allow(deprecated)] diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b7c8375b..d142c883 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,7 +7,7 @@ use std::sync::Arc; -use pallas::network::miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC}; +use pallas_miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC}; // TODO: move these values to Pallas pub const PREPROD_MAGIC: u64 = 1; diff --git a/src/utils/time.rs b/src/utils/time.rs index da2bef4d..77db083a 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -9,6 +9,7 @@ pub(crate) trait TimeProvider { /// Maps between slots and wallclock fn slot_to_wallclock(&self, slot: u64) -> u64; fn absolute_slot_to_relative(&self, slot: u64) -> (u64, u64); + fn byron_epoch_slot_to_absolute(&self, epoch: u64, slot: u64) -> u64; } /// A naive, standalone implementation of a time provider @@ -66,6 +67,16 @@ fn compute_era_epoch(era_slot: u64, era_slot_length: u64, era_epoch_length: u64) (epoch, reminder) } +#[inline] +fn relative_slot_to_absolute( + epoch: u64, + sub_epoch_slot: u64, + epoch_length: u64, + slot_length: u64, +) -> u64 { + ((epoch * epoch_length) / slot_length) + sub_epoch_slot +} + impl TimeProvider for NaiveProvider { fn slot_to_wallclock(&self, slot: u64) -> u64 { let NaiveProvider { config, .. } = self; @@ -111,6 +122,15 @@ impl TimeProvider for NaiveProvider { (shelley_start_epoch + era_epoch, reminder) } } + + fn byron_epoch_slot_to_absolute(&self, epoch: u64, slot: u64) -> u64 { + relative_slot_to_absolute( + epoch, + slot, + self.config.byron_epoch_length as u64, + self.config.byron_slot_length as u64, + ) + } } #[cfg(test)]