1use std::collections::BTreeMap;
2use std::pin::Pin;
3
4use anyhow::{Context, Result};
5use arrow::record_batch::RecordBatch;
6use futures_lite::Stream;
7use log::{info, warn};
8
9use crate::config::ClientConfig;
10use crate::query::{analyze_query, Query};
11use crate::response::ArrowResponse;
12use crate::rpc::{
13 start_block_stream, start_coordinated_stream, start_log_stream, start_trace_stream, RpcProvider,
14};
15
/// Pinned, boxed stream of [`ArrowResponse`] chunks, as returned by [`Client::stream`].
pub type DataStream = Pin<Box<dyn Stream<Item = Result<ArrowResponse>> + Send + Sync>>;
18
/// High-level client that turns a [`Query`] into a stream of Arrow record batches.
#[derive(Debug, Clone)]
pub struct Client {
    // Configuration the client was built with (RPC URL, batch size, stop/reorg
    // settings — see `ClientConfig`).
    config: ClientConfig,
    // Underlying RPC provider used to fetch chain data.
    provider: RpcProvider,
}
28
29impl Client {
30 pub fn new(config: ClientConfig) -> Result<Self> {
34 let provider = RpcProvider::new(&config).context("failed to create RPC provider")?;
35 Ok(Self { config, provider })
36 }
37
38 pub fn config(&self) -> &ClientConfig {
40 &self.config
41 }
42
43 pub fn stream(&self, query: Query) -> Result<DataStream> {
51 let pipelines = analyze_query(&query)?;
52
53 let rx = if pipelines.needs_coordinator() {
54 let mut config = self.config.clone();
55 if config.batch_size.is_none() {
56 config.batch_size = Some(100);
57 warn!(
58 "Multi-pipeline stream requires a batch_size. \
59 Defaulting to 100 blocks per batch. The client accumulates all \
60 pipeline data for the full batch range before sending each response. \
61 Set batch_size in ClientConfig to control memory usage and response granularity."
62 );
63 }
64 info!(
65 "Starting multi-pipeline stream (blocks_transactions={}, logs={}, traces={})",
66 pipelines.blocks_transactions, pipelines.logs, pipelines.traces
67 );
68 start_coordinated_stream(self.provider.clone(), query, config, pipelines)
69 } else if pipelines.logs {
70 info!("Starting log stream");
71 start_log_stream(self.provider.clone(), query, self.config.clone())
72 } else if pipelines.traces {
73 info!("Starting trace stream");
74 start_trace_stream(self.provider.clone(), query, self.config.clone())
75 } else {
76 info!("Starting block stream");
77 start_block_stream(self.provider.clone(), query, self.config.clone())
78 };
79 Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
80 }
81
82 pub fn response_to_btree(response: ArrowResponse) -> BTreeMap<String, RecordBatch> {
85 let mut map = BTreeMap::new();
86 map.insert("blocks".to_owned(), response.blocks);
87 map.insert("transactions".to_owned(), response.transactions);
88 map.insert("logs".to_owned(), response.logs);
89 map.insert("traces".to_owned(), response.traces);
90 map
91 }
92}
93
94#[cfg(test)]
95#[expect(clippy::panic)]
96mod tests {
97 use std::fs::File;
98 use std::path::Path;
99
100 use alloy::primitives::{address, b256};
101 use arrow::record_batch::RecordBatch;
102 use futures_lite::StreamExt;
103 use parquet::arrow::ArrowWriter;
104
105 use super::*;
106 use crate::config::ClientConfig;
107 use crate::query::{
108 Address, BlockFields, Fields, LogFields, LogRequest, Query, Topic, TraceFields,
109 TraceRequest, TransactionFields, TransactionRequest,
110 };
111
    // Repository root, derived from this crate's manifest directory; parquet
    // test output is written under `<root>/data/`.
    const REPO_ROOT: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/..");

    // Public Ethereum mainnet RPC endpoint these integration tests hit.
    const RPC_URL: &str = "https://mainnet.gateway.tenderly.co/";
119
120 fn make_client() -> Client {
121 Client::new(ClientConfig {
122 stop_on_head: true,
123 batch_size: Some(500),
124 ..ClientConfig::new(RPC_URL.to_owned())
125 })
126 .unwrap_or_else(|e| panic!("Failed to create client: {e}"))
127 }
128
    /// Integration test: streams ERC-20 `Transfer` logs for the rETH token
    /// over the last 5000 blocks and checks that only the log table carries
    /// rows. Requires network access to `RPC_URL`.
    #[tokio::test]
    async fn stream_reth_transfer_logs() {
        // Best-effort logger init; `try_init` tolerates a logger already
        // installed by another test in the same process.
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();
        let client = make_client();

        // Anchor the query range to the current chain head.
        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(5_000);

        // Token contract to filter on — presumably Rocket Pool rETH on
        // mainnet; TODO confirm.
        let a = address!("ae78736cd615f374d3085123a210448e74fc6393");
        let reth_address = Address(a.0 .0);

        // keccak256("Transfer(address,address,uint256)") — the standard
        // ERC-20 Transfer event signature, used as topic0.
        let t = b256!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef");
        let transfer_topic0 = Topic(t.0);

        // Log-only query: every log field enabled, all other field groups
        // left at their defaults, no transaction or trace requests.
        let query = Query {
            from_block,
            to_block: Some(latest_block),
            include_all_blocks: false,
            logs: vec![LogRequest {
                address: vec![reth_address],
                topic0: vec![transfer_topic0],
                ..LogRequest::default()
            }],
            transactions: Vec::new(),
            traces: Vec::new(),
            fields: Fields {
                log: LogFields {
                    removed: true,
                    log_index: true,
                    transaction_index: true,
                    transaction_hash: true,
                    block_hash: true,
                    block_number: true,
                    address: true,
                    data: true,
                    topic0: true,
                    topic1: true,
                    topic2: true,
                    topic3: true,
                },
                block: BlockFields {
                    ..BlockFields::default()
                },
                transaction: TransactionFields {
                    ..TransactionFields::default()
                },
                trace: TraceFields {
                    ..TraceFields::default()
                },
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_log_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // With every log field requested, each chunk's log batch should
            // match the full canonical logs schema.
            assert_eq!(
                resp.logs.schema().as_ref(),
                &tiders_evm_schema::logs_schema(),
            );

            // A log-only query must not produce rows in the other tables.
            assert_eq!(resp.blocks.num_rows(), 0);
            assert_eq!(resp.transactions.num_rows(), 0);
            assert_eq!(resp.traces.num_rows(), 0);

            total_log_rows += resp.logs.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(
            total_log_rows > 0,
            "stream should have yielded Transfer logs"
        );
        assert!(
            chunk_count > 0,
            "stream should have yielded at least one chunk"
        );

        // Persist each table for manual inspection; write_parquet skips
        // tables with no rows.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [
            (
                "blocks",
                (|r: &ArrowResponse| r.blocks.clone()) as fn(&ArrowResponse) -> RecordBatch,
            ),
            ("transactions", |r| r.transactions.clone()),
            ("logs", |r| r.logs.clone()),
            ("traces", |r| r.traces.clone()),
        ] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/stream_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
249
    /// Integration test: streams block and transaction fields for roughly the
    /// last 20 blocks. Uses `to_block: None` so the stream runs until the
    /// head and then stops via `stop_on_head`. Requires network access.
    #[tokio::test]
    async fn stream_blocks_and_transactions() {
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();

        // Small batches plus a 10-block reorg safety distance behind head.
        let client = Client::new(ClientConfig {
            stop_on_head: true,
            batch_size: Some(10),
            reorg_safe_distance: 10,
            ..ClientConfig::new(RPC_URL.to_owned())
        })
        .unwrap_or_else(|e| panic!("Failed to create client: {e}"));

        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(20);

        // Open-ended query: include every block in range and all
        // transactions; exactly seven fields are requested for each of the
        // block and transaction tables (asserted against below).
        let query = Query {
            from_block,
            to_block: None,
            include_all_blocks: true,
            logs: Vec::new(),
            transactions: vec![TransactionRequest {
                ..TransactionRequest::default()
            }],
            traces: Vec::new(),
            fields: Fields {
                block: BlockFields {
                    number: true,
                    hash: true,
                    timestamp: true,
                    gas_used: true,
                    gas_limit: true,
                    base_fee_per_gas: true,
                    miner: true,
                    ..BlockFields::default()
                },
                transaction: TransactionFields {
                    hash: true,
                    gas_used: true,
                    status: true,
                    effective_gas_price: true,
                    cumulative_gas_used: true,
                    contract_address: true,
                    logs_bloom: true,
                    ..TransactionFields::default()
                },
                log: LogFields::default(),
                trace: TraceFields::default(),
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_block_rows = 0u64;
        let mut total_tx_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // The block schema must contain exactly the seven requested
            // fields — all present, none extra.
            let block_schema = resp.blocks.schema();
            assert!(block_schema.field_with_name("number").is_ok());
            assert!(block_schema.field_with_name("hash").is_ok());
            assert!(block_schema.field_with_name("timestamp").is_ok());
            assert!(block_schema.field_with_name("gas_used").is_ok());
            assert!(block_schema.field_with_name("gas_limit").is_ok());
            assert!(block_schema.field_with_name("base_fee_per_gas").is_ok());
            assert!(block_schema.field_with_name("miner").is_ok());
            assert_eq!(block_schema.fields().len(), 7);

            // Seven transaction fields were requested as well.
            let tx_schema = resp.transactions.schema();
            assert!(tx_schema.field_with_name("hash").is_ok());
            assert_eq!(tx_schema.fields().len(), 7);

            // No logs or traces were requested, so those tables stay empty.
            assert_eq!(resp.logs.num_rows(), 0);
            assert_eq!(resp.traces.num_rows(), 0);

            total_block_rows += resp.blocks.num_rows() as u64;
            total_tx_rows += resp.transactions.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(
            total_block_rows > 0,
            "stream should have yielded block rows"
        );
        assert!(
            chunk_count > 0,
            "stream should have yielded at least one chunk"
        );

        log::info!(
            "Streamed {total_block_rows} block rows and {total_tx_rows} tx rows \
            in {chunk_count} chunks"
        );

        // Persist each table for manual inspection; write_parquet skips
        // tables with no rows.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [
            (
                "blocks",
                (|r: &ArrowResponse| r.blocks.clone()) as fn(&ArrowResponse) -> RecordBatch,
            ),
            ("transactions", |r| r.transactions.clone()),
            ("logs", |r| r.logs.clone()),
            ("traces", |r| r.traces.clone()),
        ] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/block_stream_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
383
    /// Integration test: requests transactions together with their logs
    /// (`include_logs: true`), which is expected to exercise the coordinated
    /// multi-pipeline path — NOTE(review): whether the coordinator is used is
    /// decided by `analyze_query`; confirm this query triggers it. Requires
    /// network access.
    #[tokio::test]
    async fn stream_multi_pipeline() {
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();

        let client = Client::new(ClientConfig {
            stop_on_head: true,
            batch_size: Some(10),
            ..ClientConfig::new(RPC_URL.to_owned())
        })
        .unwrap_or_else(|e| panic!("Failed to create client: {e}"));

        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(30);

        // Bounded query over the last 30 blocks: all blocks, all
        // transactions plus their logs. Field counts requested: 7 block,
        // 7 transaction, 5 log (asserted against below).
        let query = Query {
            from_block,
            to_block: Some(latest_block),
            include_all_blocks: true,
            logs: Vec::new(),
            transactions: vec![TransactionRequest {
                include_logs: true,
                ..TransactionRequest::default()
            }],
            traces: Vec::new(),
            fields: Fields {
                block: BlockFields {
                    number: true,
                    hash: true,
                    timestamp: true,
                    gas_used: true,
                    gas_limit: true,
                    base_fee_per_gas: true,
                    miner: true,
                    ..BlockFields::default()
                },
                transaction: TransactionFields {
                    hash: true,
                    from: true,
                    to: true,
                    value: true,
                    gas_used: true,
                    status: true,
                    effective_gas_price: true,
                    ..TransactionFields::default()
                },
                log: LogFields {
                    block_number: true,
                    transaction_hash: true,
                    address: true,
                    topic0: true,
                    data: true,
                    ..LogFields::default()
                },
                trace: TraceFields {
                    ..TraceFields::default()
                },
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_block_rows = 0u64;
        let mut total_tx_rows = 0u64;
        let mut total_log_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // Each table's schema width must equal the number of fields
            // requested for it above.
            assert_eq!(resp.blocks.schema().fields().len(), 7);
            assert_eq!(resp.transactions.schema().fields().len(), 7);
            assert_eq!(resp.logs.schema().fields().len(), 5);

            total_block_rows += resp.blocks.num_rows() as u64;
            total_tx_rows += resp.transactions.num_rows() as u64;
            total_log_rows += resp.logs.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(total_block_rows > 0, "should have yielded block rows");
        assert!(total_tx_rows > 0, "should have yielded transaction rows");
        assert!(total_log_rows > 0, "should have yielded log rows");
        assert!(chunk_count > 0, "should have yielded at least one chunk");

        log::info!(
            "Multi-pipeline: {total_block_rows} blocks, {total_tx_rows} txs, \
            {total_log_rows} logs, in {chunk_count} chunks"
        );

        // Persist the three populated tables for manual inspection;
        // write_parquet skips tables with no rows.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [
            (
                "blocks",
                (|r: &ArrowResponse| r.blocks.clone()) as fn(&ArrowResponse) -> RecordBatch,
            ),
            ("transactions", |r| r.transactions.clone()),
            ("logs", |r| r.logs.clone()),
        ] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/multi_pipeline_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
517
    /// Integration test: streams traces for the last 10 blocks and checks
    /// that only the trace table carries rows. Requires network access and a
    /// trace-capable RPC endpoint.
    #[tokio::test]
    async fn stream_traces() {
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::Info)
            .parse_default_env()
            .try_init();

        let client = Client::new(ClientConfig {
            stop_on_head: true,
            batch_size: Some(5),
            ..ClientConfig::new(RPC_URL.to_owned())
        })
        .unwrap_or_else(|e| panic!("Failed to create client: {e}"));

        let latest_block = client
            .provider
            .get_block_number()
            .await
            .unwrap_or_else(|e| panic!("failed to get block number: {e}"));
        log::info!("Latest block: {latest_block}");

        let from_block = latest_block.saturating_sub(10);

        // Trace-only query: a default (unfiltered) trace request with 14
        // trace fields enabled; all other field groups stay default.
        let query = Query {
            from_block,
            to_block: Some(latest_block),
            include_all_blocks: false,
            logs: Vec::new(),
            transactions: Vec::new(),
            traces: vec![TraceRequest {
                ..TraceRequest::default()
            }],
            fields: Fields {
                block: BlockFields::default(),
                transaction: TransactionFields::default(),
                log: LogFields::default(),
                trace: TraceFields {
                    block_number: true,
                    transaction_hash: true,
                    from: true,
                    to: true,
                    value: true,
                    call_type: true,
                    gas: true,
                    gas_used: true,
                    input: true,
                    output: true,
                    subtraces: true,
                    trace_address: true,
                    type_: true,
                    error: true,
                    ..TraceFields::default()
                },
            },
        };

        let mut stream = client
            .stream(query)
            .unwrap_or_else(|e| panic!("stream creation failed: {e}"));

        let mut total_trace_rows = 0u64;
        let mut chunk_count = 0u64;
        let mut all_responses: Vec<ArrowResponse> = Vec::new();

        while let Some(result) = stream.next().await {
            let resp = result.unwrap_or_else(|e| panic!("stream chunk failed: {e}"));

            // Spot-check requested columns; note the `type_` field surfaces
            // as a column named "type". Width must match the 14 fields
            // requested above.
            let trace_schema = resp.traces.schema();
            assert!(trace_schema.field_with_name("block_number").is_ok());
            assert!(trace_schema.field_with_name("transaction_hash").is_ok());
            assert!(trace_schema.field_with_name("from").is_ok());
            assert!(trace_schema.field_with_name("to").is_ok());
            assert!(trace_schema.field_with_name("call_type").is_ok());
            assert!(trace_schema.field_with_name("gas_used").is_ok());
            assert!(trace_schema.field_with_name("type").is_ok());
            assert_eq!(trace_schema.fields().len(), 14);

            // A trace-only query must not produce rows in other tables.
            assert_eq!(resp.blocks.num_rows(), 0);
            assert_eq!(resp.transactions.num_rows(), 0);
            assert_eq!(resp.logs.num_rows(), 0);

            total_trace_rows += resp.traces.num_rows() as u64;
            chunk_count += 1;
            all_responses.push(resp);
        }

        assert!(total_trace_rows > 0, "should have yielded trace rows");
        assert!(chunk_count > 0, "should have yielded at least one chunk");

        log::info!("Streamed {total_trace_rows} trace rows in {chunk_count} chunks");

        // Persist the trace table for manual inspection.
        let root = Path::new(REPO_ROOT);
        for (name, extract) in [(
            "traces",
            (|r: &ArrowResponse| r.traces.clone()) as fn(&ArrowResponse) -> RecordBatch,
        )] {
            let batches: Vec<RecordBatch> = all_responses.iter().map(extract).collect();
            let path = root.join(format!("data/trace_stream_{name}.parquet"));
            write_parquet(&path, &batches);
        }
    }
626
627 fn write_parquet(path: &Path, batches: &[RecordBatch]) {
631 if batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0) {
632 return;
633 }
634 let schema = batches[0].schema();
635 if let Some(parent) = path.parent() {
636 std::fs::create_dir_all(parent)
637 .unwrap_or_else(|e| panic!("create dir {}: {e}", parent.display()));
638 }
639 let file = File::create(path).unwrap_or_else(|e| panic!("create {}: {e}", path.display()));
640 let mut writer = ArrowWriter::try_new(file, schema, None)
641 .unwrap_or_else(|e| panic!("ArrowWriter::try_new: {e}"));
642 for batch in batches {
643 writer
644 .write(batch)
645 .unwrap_or_else(|e| panic!("ArrowWriter::write: {e}"));
646 }
647 writer
648 .close()
649 .unwrap_or_else(|e| panic!("ArrowWriter::close: {e}"));
650 }
651}