tiders_rpc_client/
response.rs1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use arrow::record_batch::RecordBatch;
5
6#[derive(Debug, Clone)]
11pub struct ArrowResponse {
12 pub blocks: RecordBatch,
14 pub transactions: RecordBatch,
16 pub logs: RecordBatch,
18 pub traces: RecordBatch,
20}
21
22impl ArrowResponse {
23 pub fn next_block(&self) -> Result<Option<u64>> {
28 let mut max_block: Option<u64> = None;
29
30 for (batch, col_name) in [
31 (&self.blocks, "number"),
32 (&self.transactions, "block_number"),
33 (&self.logs, "block_number"),
34 (&self.traces, "block_number"),
35 ] {
36 if batch.num_rows() == 0 {
37 continue;
38 }
39 if let Some(block_num) = max_block_in_batch(batch, col_name)? {
40 max_block = Some(max_block.map_or(block_num, |m: u64| m.max(block_num)));
41 }
42 }
43
44 Ok(max_block.map(|b| b + 1))
45 }
46
47 pub fn empty() -> Self {
49 Self {
50 blocks: empty_batch(&tiders_evm_schema::blocks_schema()),
51 transactions: empty_batch(&tiders_evm_schema::transactions_schema()),
52 logs: empty_batch(&tiders_evm_schema::logs_schema()),
53 traces: empty_batch(&tiders_evm_schema::traces_schema()),
54 }
55 }
56
57 pub fn with_logs(logs: RecordBatch) -> Self {
60 Self {
61 logs,
62 ..Self::empty()
63 }
64 }
65
66 pub fn with_blocks(blocks: RecordBatch, transactions: RecordBatch) -> Self {
69 Self {
70 blocks,
71 transactions,
72 ..Self::empty()
73 }
74 }
75
76 pub fn with_traces(traces: RecordBatch) -> Self {
79 Self {
80 traces,
81 ..Self::empty()
82 }
83 }
84}
85
86fn empty_batch(schema: &arrow::datatypes::Schema) -> RecordBatch {
87 RecordBatch::new_empty(Arc::new(schema.clone()))
88}
89
90fn max_block_in_batch(batch: &RecordBatch, col_name: &str) -> Result<Option<u64>> {
91 let col = batch
92 .column_by_name(col_name)
93 .with_context(|| format!("column '{col_name}' not found in batch"))?;
94
95 let arr = col
96 .as_any()
97 .downcast_ref::<arrow::array::UInt64Array>()
98 .with_context(|| format!("column '{col_name}' is not UInt64"))?;
99
100 Ok(arrow::compute::max(arr))
101}