tiders_ingest/
lib.rs

1#![expect(
2    clippy::should_implement_trait,
3    reason = "LogKind::from_str is a constructor pattern, not Trait impl"
4)]
5#![expect(
6    clippy::field_reassign_with_default,
7    reason = "ProviderConfig is built by setting fields after construction"
8)]
9
10//! # tiders-ingest
11//!
12//! Streams blockchain data from multiple provider backends as Apache Arrow RecordBatches.
13//!
14//! Supports both EVM (Ethereum) and SVM (Solana) chains through a unified [`Query`] enum.
15//! Data is fetched from one of three provider backends:
16//!
17//! - **SQD** ([`ProviderKind::Sqd`]) — SQD Network portal for historical data.
18//! - **HyperSync** ([`ProviderKind::Hypersync`]) — Envio HyperSync for fast historical data.
19//! - **RPC** ([`ProviderKind::Rpc`]) — Direct JSON-RPC node connection.
20//!
21//! Use [`start_stream`] to create an async stream of `BTreeMap<String, RecordBatch>` where
22//! keys are table names (e.g. "blocks", "transactions", "logs").
23
24use std::{collections::BTreeMap, pin::Pin, sync::Arc};
25
26#[cfg(feature = "pyo3")]
27use anyhow::anyhow;
28use anyhow::{Context, Result};
29use arrow::record_batch::RecordBatch;
30use futures_lite::{Stream, StreamExt};
31use provider::common::{evm_query_to_generic, svm_query_to_generic};
32use serde::de::DeserializeOwned;
33
34pub mod evm;
35mod provider;
36mod rayon_async;
37pub mod svm;
38
39/// Top-level query type: either an EVM or SVM blockchain data query.
40#[derive(Debug, Clone)]
41pub enum Query {
42    Evm(evm::Query),
43    Svm(svm::Query),
44}
45
/// Builds a [`Query`] from a Python object exposing `kind` and `params` attributes.
///
/// `kind` must extract as a string equal to `"evm"` or `"svm"`; `params` is then
/// extracted as the matching chain-specific query type. Any other `kind` value,
/// a missing attribute, or a failed extraction is reported as a Python error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        // Keep the attribute object alive in its own binding while the `&str`
        // view extracted from it is in use.
        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let kind: &str = kind_attr.extract().context("kind as str")?;

        let params = ob.getattr("params").context("get params attribute")?;

        let parsed = match kind {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            _ => return Err(anyhow!("unknown query kind: {kind}").into()),
        };

        Ok(parsed)
    }
}
63
64/// Configuration for a data provider, including connection details and retry settings.
65#[derive(Debug, Clone)]
66#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
67pub struct ProviderConfig {
68    pub kind: ProviderKind,
69    pub url: Option<String>,
70    pub bearer_token: Option<String>,
71    pub max_num_retries: Option<usize>,
72    pub retry_backoff_ms: Option<u64>,
73    pub retry_base_ms: Option<u64>,
74    pub retry_ceiling_ms: Option<u64>,
75    pub req_timeout_millis: Option<u64>,
76    pub stop_on_head: bool,
77    pub head_poll_interval_millis: Option<u64>,
78    pub buffer_size: Option<usize>,
79    // RPC-specific fields
80    pub compute_units_per_second: Option<u64>,
81    pub batch_size: Option<usize>,
82    pub reorg_safe_distance: Option<u64>,
83    pub trace_method: Option<RpcTraceMethod>,
84}
85
86impl ProviderConfig {
87    pub fn new(kind: ProviderKind) -> Self {
88        Self {
89            kind,
90            url: None,
91            bearer_token: None,
92            max_num_retries: None,
93            retry_backoff_ms: None,
94            retry_base_ms: None,
95            retry_ceiling_ms: None,
96            req_timeout_millis: None,
97            stop_on_head: false,
98            head_poll_interval_millis: None,
99            buffer_size: None,
100            compute_units_per_second: None,
101            batch_size: None,
102            reorg_safe_distance: None,
103            trace_method: None,
104        }
105    }
106}
107
/// Selects which RPC method to use for fetching EVM execution traces.
///
/// This is a plain field-less selector, so besides being `Copy` it can be
/// compared for equality and used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcTraceMethod {
    /// Uses `trace_block` (Parity/OpenEthereum style).
    TraceBlock,
    /// Uses `debug_traceBlockByNumber` (Geth style).
    DebugTraceBlockByNumber,
}
116
/// Parses an [`RpcTraceMethod`] from a Python string.
///
/// Accepted values are `"trace_block"` and `"debug_trace_block_by_number"`;
/// anything else (or a non-string object) is reported as a Python error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for RpcTraceMethod {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let out: &str = ob.extract().context("read as string")?;

        let method = match out {
            "trace_block" => Self::TraceBlock,
            "debug_trace_block_by_number" => Self::DebugTraceBlockByNumber,
            _ => return Err(anyhow!("unknown trace method: {out}").into()),
        };

        Ok(method)
    }
}
131
/// The type of data provider backend.
///
/// This is a plain field-less selector, so besides being `Copy` it can be
/// compared for equality and used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProviderKind {
    /// SQD Network portal.
    Sqd,
    /// Envio HyperSync.
    Hypersync,
    /// Direct JSON-RPC node.
    Rpc,
}
142
/// Parses a [`ProviderKind`] from a Python string.
///
/// Accepted values are `"sqd"`, `"hypersync"` and `"rpc"`; anything else (or a
/// non-string object) is reported as a Python error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let out: &str = ob.extract().context("read as string")?;

        let kind = match out {
            "sqd" => Self::Sqd,
            "hypersync" => Self::Hypersync,
            "rpc" => Self::Rpc,
            _ => return Err(anyhow!("unknown provider kind: {out}").into()),
        };

        Ok(kind)
    }
}
158
159type DataStream = Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
160
161fn make_req_fields<T: DeserializeOwned>(query: &tiders_query::Query) -> Result<T> {
162    let mut req_fields_query = query.clone();
163    req_fields_query
164        .add_request_and_include_fields()
165        .context("add req and include fields")?;
166
167    let fields = req_fields_query
168        .fields
169        .into_iter()
170        .map(|(k, v)| -> Result<_> {
171            Ok((
172                k.strip_suffix('s')
173                    .context("field key should end with 's'")?
174                    .to_owned(),
175                v.into_iter()
176                    .map(|v| (v, true))
177                    .collect::<BTreeMap<String, bool>>(),
178            ))
179        })
180        .collect::<Result<BTreeMap<String, _>>>()?;
181
182    let json_value = serde_json::to_value(&fields).context("serialize fields to JSON")?;
183    serde_json::from_value(json_value).context("deserialize fields from JSON")
184}
185
186/// Creates an async stream of Arrow RecordBatches from the configured provider.
187pub async fn start_stream(provider_config: ProviderConfig, mut query: Query) -> Result<DataStream> {
188    let generic_query = match &mut query {
189        Query::Evm(evm_query) => {
190            let generic_query = evm_query_to_generic(evm_query).context("validate evm query")?;
191
192            evm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
193
194            generic_query
195        }
196        Query::Svm(svm_query) => {
197            let generic_query = svm_query_to_generic(svm_query);
198
199            svm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
200
201            generic_query
202        }
203    };
204    let generic_query = Arc::new(generic_query);
205
206    let stream = match provider_config.kind {
207        ProviderKind::Sqd => {
208            provider::sqd::start_stream(provider_config, query).context("start sqd stream")?
209        }
210        ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config, query)
211            .await
212            .context("start hypersync stream")?,
213        ProviderKind::Rpc => {
214            provider::rpc::start_stream(&provider_config, query).context("start rpc stream")?
215        }
216    };
217
218    let stream = stream.then(move |res| {
219        let generic_query = Arc::clone(&generic_query);
220        async {
221            rayon_async::spawn(move || {
222                res.and_then(move |data| {
223                    let data = tiders_query::run_query(&data, &generic_query)
224                        .context("run local query")?;
225                    Ok(data)
226                })
227            })
228            .await
229            .context("rayon task was cancelled")
230            .and_then(|r| r)
231        }
232    });
233
234    Ok(Box::pin(stream))
235}
236
#[cfg(test)]
mod tests {
    //! Smoke tests that drive [`start_stream`] end to end for individual providers.

    use super::*;
    use crate::svm::*;
    use parquet::arrow::ArrowWriter;
    use std::fs::File;

    /// Streams one batch of SPL Token instructions from the SQD portal and dumps
    /// every returned table to `<table>.parquet` in the working directory.
    #[tokio::test]
    #[ignore = "requires network access to the SQD portal and writes parquet files to the working directory"]
    async fn simple_svm_start_stream() {
        let mut provider_config = ProviderConfig::new(ProviderKind::Sqd);
        provider_config.url = Some("https://portal.sqd.dev/datasets/solana-mainnet".to_string());

        let program_id = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";
        let program_id: [u8; 32] = bs58::decode(program_id)
            .into_vec()
            .unwrap()
            .try_into()
            .unwrap();
        let program_id = Address(program_id);

        let query = crate::Query::Svm(svm::Query {
            from_block: 329_443_000,
            to_block: Some(329_443_000),
            include_all_blocks: false,
            fields: Fields {
                instruction: InstructionFields::all(),
                transaction: TransactionFields::default(),
                log: LogFields::default(),
                balance: BalanceFields::default(),
                token_balance: TokenBalanceFields::default(),
                reward: RewardFields::default(),
                block: BlockFields::default(),
            },
            instructions: vec![InstructionRequest {
                program_id: vec![program_id],
                discriminator: vec![Data(vec![12, 96, 49, 128, 22])],
                ..Default::default()
            }],
            transactions: vec![],
            logs: vec![],
            balances: vec![],
            token_balances: vec![],
            rewards: vec![],
        });
        let mut stream = start_stream(provider_config, query).await.unwrap();
        let data = stream.next().await.unwrap().unwrap();
        for (table, batch) in data {
            let mut file = File::create(format!("{table}.parquet")).unwrap();
            let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
        }
    }

    /// Starts an RPC-backed stream for block 0 and checks the shape of the first item.
    #[tokio::test]
    async fn simple_rpc_start_stream() {
        let mut provider_config = ProviderConfig::new(ProviderKind::Rpc);
        provider_config.url = Some("http://localhost:8545".to_string());

        let query = crate::Query::Evm(evm::Query {
            from_block: 0,
            to_block: Some(0),
            include_all_blocks: true,
            logs: vec![],
            transactions: vec![],
            traces: vec![],
            fields: evm::Fields::all(),
        });

        let mut stream = start_stream(provider_config, query).await.unwrap();
        let data = stream.next().await.unwrap().unwrap();

        // The RPC provider returns empty batches (Part 1 scaffolding).
        // `run_query` post-filters, so only tables referenced in the
        // generic query survive. `include_all_blocks` guarantees "blocks".
        assert!(data.contains_key("blocks"));

        // Only the batches matter here, so iterate the values directly.
        for batch in data.values() {
            assert_eq!(batch.num_rows(), 0);
        }
    }
}
323}