tiders_rpc_client/
client.rs

1use std::collections::BTreeMap;
2use std::pin::Pin;
3
4use anyhow::{Context, Result};
5use arrow::record_batch::RecordBatch;
6use futures_lite::Stream;
7use log::{info, warn};
8
9use crate::config::ClientConfig;
10use crate::query::{analyze_query, Query};
11use crate::response::ArrowResponse;
12use crate::rpc::{
13    start_block_stream, start_coordinated_stream, start_log_stream, start_trace_stream, RpcProvider,
14};
15
/// An async stream of [`ArrowResponse`] chunks.
///
/// Boxed and pinned so callers never have to name the concrete stream type;
/// the `Send + Sync` bounds let the stream be moved across tasks/threads.
/// Each item is a `Result`, so per-chunk errors surface in-band.
pub type DataStream = Pin<Box<dyn Stream<Item = Result<ArrowResponse>> + Send + Sync>>;
18
/// The main entry point for fetching EVM data over JSON-RPC.
///
/// Wraps an alloy RPC provider and routes queries to the appropriate
/// pipeline (blocks, logs, traces, or a coordinated combination).
#[derive(Debug, Clone)]
pub struct Client {
    /// Client configuration (RPC URL, batch_size, stop_on_head, ...),
    /// cloned into each pipeline when a stream is started.
    config: ClientConfig,
    /// Shared alloy RPC provider; cheap to clone per stream.
    provider: RpcProvider,
}
28
29impl Client {
30    /// Create a new client from configuration.
31    ///
32    /// This sets up the alloy RPC provider with retry/backoff layers.
33    pub fn new(config: ClientConfig) -> Result<Self> {
34        let provider = RpcProvider::new(&config).context("failed to create RPC provider")?;
35        Ok(Self { config, provider })
36    }
37
38    /// Return a reference to the client's configuration.
39    pub fn config(&self) -> &ClientConfig {
40        &self.config
41    }
42
43    /// Start a streaming query that yields `ArrowResponse` chunks.
44    ///
45    /// Selects the appropriate pipeline based on the query:
46    /// - If the query requires, uses the coordinated multi-pipeline stream.
47    /// - If only blocks and/or tx, uses the `eth_getBlockByNumber` pipeline.
48    /// - If only log requests are present, uses the `eth_getLogs` pipeline.
49    /// - If only trace requests are present, uses the `trace_block` pipeline.
50    pub fn stream(&self, query: Query) -> Result<DataStream> {
51        let pipelines = analyze_query(&query)?;
52
53        let rx = if pipelines.needs_coordinator() {
54            let mut config = self.config.clone();
55            if config.batch_size.is_none() {
56                config.batch_size = Some(100);
57                warn!(
58                    "Multi-pipeline stream requires a batch_size. \
59                     Defaulting to 100 blocks per batch. The client accumulates all \
60                     pipeline data for the full batch range before sending each response. \
61                     Set batch_size in ClientConfig to control memory usage and response granularity."
62                );
63            }
64            info!(
65                "Starting multi-pipeline stream (blocks_transactions={}, logs={}, traces={})",
66                pipelines.blocks_transactions, pipelines.logs, pipelines.traces
67            );
68            start_coordinated_stream(self.provider.clone(), query, config, pipelines)
69        } else if pipelines.logs {
70            info!("Starting log stream");
71            start_log_stream(self.provider.clone(), query, self.config.clone())
72        } else if pipelines.traces {
73            info!("Starting trace stream");
74            start_trace_stream(self.provider.clone(), query, self.config.clone())
75        } else {
76            info!("Starting block stream");
77            start_block_stream(self.provider.clone(), query, self.config.clone())
78        };
79        Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
80    }
81
82    /// Convert an `ArrowResponse` into the `BTreeMap<String, RecordBatch>`
83    /// format expected by the tiders-core ingest pipeline.
84    pub fn response_to_btree(response: ArrowResponse) -> BTreeMap<String, RecordBatch> {
85        let mut map = BTreeMap::new();
86        map.insert("blocks".to_owned(), response.blocks);
87        map.insert("transactions".to_owned(), response.transactions);
88        map.insert("logs".to_owned(), response.logs);
89        map.insert("traces".to_owned(), response.traces);
90        map
91    }
92}
93
94#[cfg(test)]
95#[expect(clippy::panic)]
96mod tests {
97    use std::fs::File;
98    use std::path::Path;
99
100    use alloy::primitives::{address, b256};
101    use arrow::record_batch::RecordBatch;
102    use futures_lite::StreamExt;
103    use parquet::arrow::ArrowWriter;
104
105    use super::*;
106    use crate::config::ClientConfig;
107    use crate::query::{
108        Address, BlockFields, Fields, LogFields, LogRequest, Query, Topic, TraceFields,
109        TraceRequest, TransactionFields, TransactionRequest,
110    };
111
    /// Root of the tiders-rpc-client repository (one level above `rust/`);
    /// resolved at compile time from `CARGO_MANIFEST_DIR`. Parquet output
    /// from the tests below is written under `<REPO_ROOT>/data/`.
    const REPO_ROOT: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/..");

    /// Tenderly free Ethereum mainnet gateway (same default as rindexer examples).
    /// To run the traces test, you will need to get a free tenderly API key and
    /// set the RPC URL, e.g. `https://mainnet.gateway.tenderly.co/your_key_here`.
    const RPC_URL: &str = "https://mainnet.gateway.tenderly.co/";
119
120    fn make_client() -> Client {
121        Client::new(ClientConfig {
122            stop_on_head: true,
123            batch_size: Some(500),
124            ..ClientConfig::new(RPC_URL.to_owned())
125        })
126        .unwrap_or_else(|e| panic!("Failed to create client: {e}"))
127    }
128
    /// Streaming query: fetches the last 5000 blocks up to the current head
    /// (`stop_on_head: true`) and saves all log chunks to Parquet.
    ///
    /// Requires network access to [`RPC_URL`].
    /// Run with: cargo test stream_reth_transfer_logs -- --nocapture
    #[tokio::test]
    async fn stream_reth_transfer_logs() {
        // Initialise env_logger so log output is visible when running
        // tests with `--nocapture`. Defaults to `info`; override with `RUST_LOG`.
        // Silently ignores the error if a logger has already been set.
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();
        let client = make_client();

        // Resolve the current head and log it.
        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(5_000);

        // rETH (Rocket Pool ETH) token contract on Ethereum mainnet.
        let a = address!("ae78736cd615f374d3085123a210448e74fc6393");
        let reth_address = Address(a.0 .0);

        // topic0 of the ERC-20 `Transfer(address,address,uint256)` event.
        let t = b256!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef");
        let transfer_topic0 = Topic(t.0);

        // Log-only query: Transfer logs emitted by the rETH contract, with
        // every log field requested so the full log schema is exercised.
        let query = Query {
            from_block,
            to_block: Some(latest_block),
            include_all_blocks: false,
            logs: vec![LogRequest {
                address: vec![reth_address],
                topic0: vec![transfer_topic0],
                ..LogRequest::default()
            }],
            transactions: Vec::new(),
            traces: Vec::new(),
            fields: Fields {
                log: LogFields {
                    removed: true,
                    log_index: true,
                    transaction_index: true,
                    transaction_hash: true,
                    block_hash: true,
                    block_number: true,
                    address: true,
                    data: true,
                    topic0: true,
                    topic1: true,
                    topic2: true,
                    topic3: true,
                },
                block: BlockFields {
                    ..BlockFields::default()
                },
                transaction: TransactionFields {
                    ..TransactionFields::default()
                },
                trace: TraceFields {
                    ..TraceFields::default()
                },
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_log_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // Every chunk should carry the correct schemas.
            assert_eq!(
                resp.logs.schema().as_ref(),
                &tiders_evm_schema::logs_schema(),
            );

            // Unimplemented tables stay empty.
            assert_eq!(resp.blocks.num_rows(), 0);
            assert_eq!(resp.transactions.num_rows(), 0);
            assert_eq!(resp.traces.num_rows(), 0);

            total_log_rows += resp.logs.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(
            total_log_rows > 0,
            "stream should have yielded Transfer logs"
        );
        assert!(
            chunk_count > 0,
            "stream should have yielded at least one chunk"
        );

        // Save all streamed chunks to parquet files. The `as fn(...)` cast on
        // the first closure fixes the fn-pointer type for the whole array.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [
            (
                "blocks",
                (|r: &ArrowResponse| r.blocks.clone()) as fn(&ArrowResponse) -> RecordBatch,
            ),
            ("transactions", |r| r.transactions.clone()),
            ("logs", |r| r.logs.clone()),
            ("traces", |r| r.traces.clone()),
        ] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/stream_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
249
    /// Streaming query: fetches the last ~20 blocks up to the current head
    /// via the block fetcher pipeline (`eth_getBlockByNumber`) and their
    /// transaction receipts via `eth_getBlockReceipts` and saves
    /// blocks and transactions to Parquet.
    ///
    /// Requires network access to [`RPC_URL`].
    /// Run with: cargo test stream_blocks_and_transactions -- --nocapture
    #[tokio::test]
    async fn stream_blocks_and_transactions() {
        // Make log output visible under `--nocapture`; silently ignores the
        // error if a logger has already been set.
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();

        let client = Client::new(ClientConfig {
            stop_on_head: true,
            batch_size: Some(10),
            reorg_safe_distance: 10,
            ..ClientConfig::new(RPC_URL.to_owned())
        })
        .unwrap_or_else(|e| panic!("Failed to create client: {e}"));

        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(20);

        // Unfiltered TransactionRequest plus include_all_blocks: every block
        // in range is kept, with all of its transactions.
        let query = Query {
            from_block,
            to_block: None,
            include_all_blocks: true,
            logs: Vec::new(),
            transactions: vec![TransactionRequest {
                ..TransactionRequest::default()
            }],
            traces: Vec::new(),
            fields: Fields {
                block: BlockFields {
                    number: true,
                    hash: true,
                    timestamp: true,
                    gas_used: true,
                    gas_limit: true,
                    base_fee_per_gas: true,
                    miner: true,
                    ..BlockFields::default()
                },
                transaction: TransactionFields {
                    hash: true,
                    // Receipt-only fields:
                    gas_used: true,
                    status: true,
                    effective_gas_price: true,
                    cumulative_gas_used: true,
                    contract_address: true,
                    logs_bloom: true,
                    ..TransactionFields::default()
                },
                log: LogFields::default(),
                trace: TraceFields::default(),
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_block_rows = 0u64;
        let mut total_tx_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // Blocks batch should contain exactly the requested fields.
            let block_schema = resp.blocks.schema();
            assert!(block_schema.field_with_name("number").is_ok());
            assert!(block_schema.field_with_name("hash").is_ok());
            assert!(block_schema.field_with_name("timestamp").is_ok());
            assert!(block_schema.field_with_name("gas_used").is_ok());
            assert!(block_schema.field_with_name("gas_limit").is_ok());
            assert!(block_schema.field_with_name("base_fee_per_gas").is_ok());
            assert!(block_schema.field_with_name("miner").is_ok());
            assert_eq!(block_schema.fields().len(), 7);

            // Transactions batch should contain exactly the requested fields
            // (hash plus the six receipt-only fields requested above).
            let tx_schema = resp.transactions.schema();
            assert!(tx_schema.field_with_name("hash").is_ok());
            assert_eq!(tx_schema.fields().len(), 7);

            // Log and trace tables should be empty for the block pipeline.
            assert_eq!(resp.logs.num_rows(), 0);
            assert_eq!(resp.traces.num_rows(), 0);

            total_block_rows += resp.blocks.num_rows() as u64;
            total_tx_rows += resp.transactions.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(
            total_block_rows > 0,
            "stream should have yielded block rows"
        );
        assert!(
            chunk_count > 0,
            "stream should have yielded at least one chunk"
        );

        log::info!(
            "Streamed {total_block_rows} block rows and {total_tx_rows} tx rows \
             in {chunk_count} chunks"
        );

        // Save all streamed chunks to parquet files.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [
            (
                "blocks",
                (|r: &ArrowResponse| r.blocks.clone()) as fn(&ArrowResponse) -> RecordBatch,
            ),
            ("transactions", |r| r.transactions.clone()),
            ("logs", |r| r.logs.clone()),
            ("traces", |r| r.traces.clone()),
        ] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/block_stream_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
383
    /// Multi-pipeline streaming query: fetches ~30 blocks in 10-block batches
    /// via the coordinated stream, exercising blocks, transactions (with
    /// receipts), and logs simultaneously (traces are left disabled here).
    ///
    /// Requires network access to [`RPC_URL`].
    /// Run with: cargo test stream_multi_pipeline -- --nocapture
    #[tokio::test]
    async fn stream_multi_pipeline() {
        // Make log output visible under `--nocapture`; silently ignores the
        // error if a logger has already been set.
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();

        let client = Client::new(ClientConfig {
            stop_on_head: true,
            batch_size: Some(10),
            ..ClientConfig::new(RPC_URL.to_owned())
        })
        .unwrap_or_else(|e| panic!("Failed to create client: {e}"));

        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(30);

        let query = Query {
            from_block,
            to_block: Some(latest_block),
            include_all_blocks: true,
            logs: Vec::new(),
            // A TransactionRequest with include_logs needs both the block/tx
            // and log pipelines, which triggers the coordinated multi-pipeline
            // stream. (include_traces is intentionally left disabled.)
            transactions: vec![TransactionRequest {
                include_logs: true,
                // include_traces: true,
                ..TransactionRequest::default()
            }],
            traces: Vec::new(),
            fields: Fields {
                block: BlockFields {
                    number: true,
                    hash: true,
                    timestamp: true,
                    gas_used: true,
                    gas_limit: true,
                    base_fee_per_gas: true,
                    miner: true,
                    ..BlockFields::default()
                },
                transaction: TransactionFields {
                    hash: true,
                    from: true,
                    to: true,
                    value: true,
                    // Receipt fields:
                    gas_used: true,
                    status: true,
                    effective_gas_price: true,
                    ..TransactionFields::default()
                },
                log: LogFields {
                    block_number: true,
                    transaction_hash: true,
                    address: true,
                    topic0: true,
                    data: true,
                    ..LogFields::default()
                },
                trace: TraceFields {
                    //     block_number: true,
                    //     transaction_hash: true,
                    //     from: true,
                    //     to: true,
                    //     value: true,
                    //     call_type: true,
                    //     gas_used: true,
                    ..TraceFields::default()
                },
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_block_rows = 0u64;
        let mut total_tx_rows = 0u64;
        let mut total_log_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // The three active tables must carry the requested field counts
            // (7 block, 7 transaction, and 5 log fields requested above).
            assert_eq!(resp.blocks.schema().fields().len(), 7);
            assert_eq!(resp.transactions.schema().fields().len(), 7);
            assert_eq!(resp.logs.schema().fields().len(), 5);

            total_block_rows += resp.blocks.num_rows() as u64;
            total_tx_rows += resp.transactions.num_rows() as u64;
            total_log_rows += resp.logs.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(total_block_rows > 0, "should have yielded block rows");
        assert!(total_tx_rows > 0, "should have yielded transaction rows");
        assert!(total_log_rows > 0, "should have yielded log rows");
        assert!(chunk_count > 0, "should have yielded at least one chunk");

        log::info!(
            "Multi-pipeline: {total_block_rows} blocks, {total_tx_rows} txs, \
             {total_log_rows} logs, in {chunk_count} chunks"
        );

        // Save all streamed chunks to parquet files.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [
            (
                "blocks",
                (|r: &ArrowResponse| r.blocks.clone()) as fn(&ArrowResponse) -> RecordBatch,
            ),
            ("transactions", |r| r.transactions.clone()),
            ("logs", |r| r.logs.clone()),
        ] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/multi_pipeline_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
517
    /// Streaming query: fetches the last ~10 blocks via the trace pipeline
    /// (`trace_block`) and saves traces to Parquet.
    /// To run the traces test, you will need to get a free tenderly API key and
    /// set the RPC URL, e.g. `https://mainnet.gateway.tenderly.co/your_key_here`.
    /// Run with: cargo test stream_traces -- --nocapture
    #[tokio::test]
    async fn stream_traces() {
        // Make log output visible under `--nocapture`; silently ignores the
        // error if a logger has already been set.
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();

        let client = Client::new(ClientConfig {
            stop_on_head: true,
            batch_size: Some(5),
            ..ClientConfig::new(RPC_URL.to_owned())
        })
        .unwrap_or_else(|e| panic!("Failed to create client: {e}"));

        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(10);

        // Trace-only query: an unfiltered TraceRequest pulls every trace in
        // the range, with 14 trace fields requested.
        let query = Query {
            from_block,
            to_block: Some(latest_block),
            include_all_blocks: false,
            logs: Vec::new(),
            transactions: Vec::new(),
            traces: vec![TraceRequest {
                ..TraceRequest::default()
            }],
            fields: Fields {
                block: BlockFields::default(),
                transaction: TransactionFields::default(),
                log: LogFields::default(),
                trace: TraceFields {
                    block_number: true,
                    transaction_hash: true,
                    from: true,
                    to: true,
                    value: true,
                    call_type: true,
                    gas: true,
                    gas_used: true,
                    input: true,
                    output: true,
                    subtraces: true,
                    trace_address: true,
                    type_: true,
                    error: true,
                    ..TraceFields::default()
                },
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_trace_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // Trace schema should contain exactly the requested fields; note
            // the `type_` request surfaces as a column named `type`.
            let trace_schema = resp.traces.schema();
            assert!(trace_schema.field_with_name("block_number").is_ok());
            assert!(trace_schema.field_with_name("transaction_hash").is_ok());
            assert!(trace_schema.field_with_name("from").is_ok());
            assert!(trace_schema.field_with_name("to").is_ok());
            assert!(trace_schema.field_with_name("call_type").is_ok());
            assert!(trace_schema.field_with_name("gas_used").is_ok());
            assert!(trace_schema.field_with_name("type").is_ok());
            assert_eq!(trace_schema.fields().len(), 14);

            // Block, transaction, and log tables should be empty for trace-only pipeline.
            assert_eq!(resp.blocks.num_rows(), 0);
            assert_eq!(resp.transactions.num_rows(), 0);
            assert_eq!(resp.logs.num_rows(), 0);

            total_trace_rows += resp.traces.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(total_trace_rows > 0, "should have yielded trace rows");
        assert!(chunk_count > 0, "should have yielded at least one chunk");

        log::info!("Streamed {total_trace_rows} trace rows in {chunk_count} chunks");

        // Save the trace chunks to a parquet file.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [(
            "traces",
            (|r: &ArrowResponse| r.traces.clone()) as fn(&ArrowResponse) -> RecordBatch,
        )] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/trace_stream_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
626
627    // helper function to write a sequence of RecordBatches to a Parquet file,
628    // used to inspect streamed data in tests. Recommend using a vscode extension
629    // like "Parquet Viewer" to open the resulting files.
630    fn write_parquet(path: &Path, batches: &[RecordBatch]) {
631        if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
632            return;
633        }
634        let schema = batches[0].schema();
635        if let Some(parent) = path.parent() {
636            std::fs::create_dir_all(parent)
637                .unwrap_or_else(|e| panic!("create dir {}: {e}", parent.display()));
638        }
639        let file = File::create(path).unwrap_or_else(|e| panic!("create {}: {e}", path.display()));
640        let mut writer = ArrowWriter::try_new(file, schema, None)
641            .unwrap_or_else(|e| panic!("ArrowWriter::try_new: {e}"));
642        for batch in batches {
643            writer
644                .write(batch)
645                .unwrap_or_else(|e| panic!("ArrowWriter::write: {e}"));
646        }
647        writer
648            .close()
649            .unwrap_or_else(|e| panic!("ArrowWriter::close: {e}"));
650    }
651}