tiders_rpc_client/
response.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use arrow::record_batch::RecordBatch;
5
6/// A response containing Arrow `RecordBatch`es for each EVM table.
7///
8/// Each batch uses the canonical `tiders-evm-schema` schemas so the data
9/// can be passed directly into the tiders-core pipeline.
10#[derive(Debug, Clone)]
11pub struct ArrowResponse {
12    /// Block header rows.
13    pub blocks: RecordBatch,
14    /// Transaction rows (may include receipt data if requested).
15    pub transactions: RecordBatch,
16    /// Event log rows.
17    pub logs: RecordBatch,
18    /// Execution trace rows.
19    pub traces: RecordBatch,
20}
21
22impl ArrowResponse {
23    /// Returns the next block number that should be requested, derived from
24    /// the maximum block number present across all tables in this response.
25    ///
26    /// Returns `None` if every table is empty (zero rows).
27    pub fn next_block(&self) -> Result<Option<u64>> {
28        let mut max_block: Option<u64> = None;
29
30        for (batch, col_name) in [
31            (&self.blocks, "number"),
32            (&self.transactions, "block_number"),
33            (&self.logs, "block_number"),
34            (&self.traces, "block_number"),
35        ] {
36            if batch.num_rows() == 0 {
37                continue;
38            }
39            if let Some(block_num) = max_block_in_batch(batch, col_name)? {
40                max_block = Some(max_block.map_or(block_num, |m: u64| m.max(block_num)));
41            }
42        }
43
44        Ok(max_block.map(|b| b + 1))
45    }
46
47    /// Build an empty response (zero rows, correct schemas).
48    pub fn empty() -> Self {
49        Self {
50            blocks: empty_batch(&tiders_evm_schema::blocks_schema()),
51            transactions: empty_batch(&tiders_evm_schema::transactions_schema()),
52            logs: empty_batch(&tiders_evm_schema::logs_schema()),
53            traces: empty_batch(&tiders_evm_schema::traces_schema()),
54        }
55    }
56
57    /// Build a response with only the `logs` RecordBatch populated.
58    /// Other tables are empty with correct schemas.
59    pub fn with_logs(logs: RecordBatch) -> Self {
60        Self {
61            logs,
62            ..Self::empty()
63        }
64    }
65
66    /// Build a response with `blocks` and `transactions` populated.
67    /// Other tables (logs, traces) are empty with correct schemas.
68    pub fn with_blocks(blocks: RecordBatch, transactions: RecordBatch) -> Self {
69        Self {
70            blocks,
71            transactions,
72            ..Self::empty()
73        }
74    }
75
76    /// Build a response with only the `traces` RecordBatch populated.
77    /// Other tables are empty with correct schemas.
78    pub fn with_traces(traces: RecordBatch) -> Self {
79        Self {
80            traces,
81            ..Self::empty()
82        }
83    }
84}
85
86fn empty_batch(schema: &arrow::datatypes::Schema) -> RecordBatch {
87    RecordBatch::new_empty(Arc::new(schema.clone()))
88}
89
90fn max_block_in_batch(batch: &RecordBatch, col_name: &str) -> Result<Option<u64>> {
91    let col = batch
92        .column_by_name(col_name)
93        .with_context(|| format!("column '{col_name}' not found in batch"))?;
94
95    let arr = col
96        .as_any()
97        .downcast_ref::<arrow::array::UInt64Array>()
98        .with_context(|| format!("column '{col_name}' is not UInt64"))?;
99
100    Ok(arrow::compute::max(arr))
101}