// tiders_evm_decode/lib.rs

//! # tiders-evm-decode
//!
//! Decodes EVM smart contract data from binary format into Apache Arrow RecordBatches.
//!
//! Supports three types of decoding:
//! - **Event logs** ([`decode_events`]) — Decodes indexed topics and non-indexed body data
//!   using Solidity event signatures (e.g. `"Transfer(address indexed,address indexed,uint256)"`).
//! - **Function call inputs** ([`decode_call_inputs`]) — Decodes ABI-encoded calldata.
//! - **Function call outputs** ([`decode_call_outputs`]) — Decodes ABI-encoded return data.
//!
//! Also provides ABI parsing utilities ([`abi_events`], [`abi_functions`]) to extract
//! event/function signatures from JSON ABI files, and schema generation functions
//! to preview the Arrow output schema without decoding data.
//!
//! All decoded output uses Arrow's columnar format with support for arbitrarily nested
//! Solidity types (tuples, arrays, structs) mapped to Arrow Struct and List types.
17
18mod abi;
19mod arrow_convert;
20
21use std::sync::Arc;
22
23use alloy_dyn_abi::{DynSolCall, DynSolEvent, DynSolType, DynSolValue, Specifier};
24use anyhow::{anyhow, Context, Result};
25use arrow::{
26    array::{
27        Array, BinaryArray, GenericBinaryArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
28        StructArray,
29    },
30    compute,
31    datatypes::{DataType, Field, Schema},
32};
33
34pub use abi::*;
35use arrow_convert::{build_topic0_mask, decode_body, decode_topic, to_arrow, to_arrow_dtype};
36
37/// Returns topic0 based on given event signature
38pub fn signature_to_topic0(signature: &str) -> Result<[u8; 32]> {
39    let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
40    Ok(event.selector().into())
41}
42
43/// Decodes given call input data in arrow format to arrow format.
44/// Output Arrow schema is auto generated based on the function signature.
45/// Handles any level of nesting with Lists/Structs.
46///
47/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
48/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
49pub fn decode_call_inputs<I: OffsetSizeTrait>(
50    signature: &str,
51    data: &GenericBinaryArray<I>,
52    allow_decode_fail: bool,
53) -> Result<RecordBatch> {
54    decode_call_impl::<true, I>(signature, data, allow_decode_fail)
55}
56
57/// Decodes given call output data in arrow format to arrow format.
58/// Output Arrow schema is auto generated based on the function signature.
59/// Handles any level of nesting with Lists/Structs.
60///
61/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
62/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
63pub fn decode_call_outputs<I: OffsetSizeTrait>(
64    signature: &str,
65    data: &GenericBinaryArray<I>,
66    allow_decode_fail: bool,
67) -> Result<RecordBatch> {
68    decode_call_impl::<false, I>(signature, data, allow_decode_fail)
69}
70
71fn decode_call_impl<const IS_INPUT: bool, I: OffsetSizeTrait>(
72    signature: &str,
73    data: &GenericBinaryArray<I>,
74    allow_decode_fail: bool,
75) -> Result<RecordBatch> {
76    let (call, resolved) = resolve_function_signature(signature)?;
77
78    let schema = function_signature_to_arrow_schemas_impl(&call, &resolved)
79        .context("convert event signature to arrow schema")?;
80    let schema = if IS_INPUT { schema.0 } else { schema.1 };
81
82    let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(schema.fields().len());
83
84    let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(data.len());
85
86    for blob in data {
87        match blob {
88            Some(blob) => {
89                let decode_res = if IS_INPUT {
90                    resolved.abi_decode_input(blob)
91                } else {
92                    resolved.abi_decode_output(blob)
93                };
94                match decode_res {
95                    Ok(data) => decoded.push(Some(DynSolValue::Tuple(data))),
96                    Err(e) if allow_decode_fail => {
97                        log::debug!("failed to decode function call data: {e}");
98                        decoded.push(None);
99                    }
100                    Err(e) => {
101                        return Err(anyhow!("failed to decode function call data: {e}"));
102                    }
103                }
104            }
105            None => decoded.push(None),
106        }
107    }
108
109    let sol_type = if IS_INPUT {
110        DynSolType::Tuple(resolved.types().to_vec())
111    } else {
112        DynSolType::Tuple(resolved.returns().types().to_vec())
113    };
114
115    let array = to_arrow(&sol_type, decoded, allow_decode_fail).context("map params to arrow")?;
116    let arr = array
117        .as_any()
118        .downcast_ref::<StructArray>()
119        .context("expected struct array from to_arrow")?;
120
121    for f in arr.columns() {
122        arrays.push(f.clone());
123    }
124
125    RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")
126}
127
128/// Returns the Arrow schemas for a function's inputs and outputs as `(input_schema, output_schema)`.
129pub fn function_signature_to_arrow_schemas(signature: &str) -> Result<(Schema, Schema)> {
130    let (func, resolved) = resolve_function_signature(signature)?;
131    function_signature_to_arrow_schemas_impl(&func, &resolved)
132}
133
134fn function_signature_to_arrow_schemas_impl(
135    func: &alloy_json_abi::Function,
136    call: &DynSolCall,
137) -> Result<(Schema, Schema)> {
138    let mut input_fields = Vec::with_capacity(call.types().len());
139    let mut output_fields = Vec::with_capacity(call.returns().types().len());
140
141    for (i, (sol_t, param)) in call.types().iter().zip(func.inputs.iter()).enumerate() {
142        let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
143        let name = if param.name() == "" {
144            format!("param{i}")
145        } else {
146            param.name().to_owned()
147        };
148        input_fields.push(Arc::new(Field::new(name, dtype, true)));
149    }
150
151    for (i, (sol_t, param)) in call
152        .returns()
153        .types()
154        .iter()
155        .zip(func.outputs.iter())
156        .enumerate()
157    {
158        let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
159        let name = if param.name() == "" {
160            format!("param{i}")
161        } else {
162            param.name().to_owned()
163        };
164        output_fields.push(Arc::new(Field::new(name, dtype, true)));
165    }
166
167    Ok((Schema::new(input_fields), Schema::new(output_fields)))
168}
169
170fn resolve_function_signature(signature: &str) -> Result<(alloy_json_abi::Function, DynSolCall)> {
171    let event = alloy_json_abi::Function::parse(signature).context("parse function signature")?;
172    let resolved = event.resolve().context("resolve function signature")?;
173
174    Ok((event, resolved))
175}
176
177/// Decodes given event data in arrow format to arrow format.
178/// Output Arrow schema is auto generated based on the event signature.
179/// Handles any level of nesting with Lists/Structs.
180///
181/// When `filter_by_topic0` is `true`, only rows whose `topic0` column matches the
182/// event's selector are decoded. Non-matching rows are silently filtered out.
183///
184/// When `hstack` is `true`, the original input columns (after any topic0 filtering)
185/// are appended alongside the decoded columns in the output.
186///
187/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
188/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
189pub fn decode_events(
190    signature: &str,
191    data: &RecordBatch,
192    allow_decode_fail: bool,
193    filter_by_topic0: bool,
194    hstack: bool,
195) -> Result<RecordBatch> {
196    let (event, resolved) = resolve_event_signature(signature)?;
197
198    // Optionally filter input to only rows whose topic0 matches the event selector.
199    let data = if filter_by_topic0 {
200        filter_by_topic0_impl(&event, data)?
201    } else {
202        data.clone()
203    };
204
205    let schema = event_signature_to_arrow_schema_impl(&event, &resolved)
206        .context("convert event signature to arrow schema")?;
207
208    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
209    let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(fields.len());
210
211    for (sol_type, topic_name) in resolved
212        .indexed()
213        .iter()
214        .zip(&["topic1", "topic2", "topic3"])
215    {
216        let col = data
217            .column_by_name(topic_name)
218            .context("get topic column")?;
219
220        if col.data_type() == &DataType::Binary {
221            let arr = col
222                .as_any()
223                .downcast_ref::<BinaryArray>()
224                .context("downcast to BinaryArray")?;
225            decode_topic(sol_type, arr, allow_decode_fail, &mut arrays).context("decode topic")?;
226        } else if col.data_type() == &DataType::LargeBinary {
227            let arr = col
228                .as_any()
229                .downcast_ref::<LargeBinaryArray>()
230                .context("downcast to LargeBinaryArray")?;
231            decode_topic(sol_type, arr, allow_decode_fail, &mut arrays).context("decode topic")?;
232        }
233    }
234
235    let body_col = data.column_by_name("data").context("get data column")?;
236
237    let body_sol_type = DynSolType::Tuple(resolved.body().to_vec());
238
239    if body_col.data_type() == &DataType::Binary {
240        let arr = body_col
241            .as_any()
242            .downcast_ref::<BinaryArray>()
243            .context("downcast to BinaryArray")?;
244        decode_body(&body_sol_type, arr, allow_decode_fail, &mut arrays).context("decode body")?;
245    } else if body_col.data_type() == &DataType::LargeBinary {
246        let arr = body_col
247            .as_any()
248            .downcast_ref::<LargeBinaryArray>()
249            .context("downcast to LargeBinaryArray")?;
250        decode_body(&body_sol_type, arr, allow_decode_fail, &mut arrays).context("decode body")?;
251    }
252
253    if hstack {
254        for (i, col) in data.columns().iter().enumerate() {
255            fields.push(data.schema().field(i).clone().into());
256            arrays.push(col.clone());
257        }
258    }
259
260    let output_schema = Schema::new(fields);
261    RecordBatch::try_new(Arc::new(output_schema), arrays).context("construct arrow batch")
262}
263
264/// Returns the Arrow schema that [`decode_events`] would produce for the given event signature.
265pub fn event_signature_to_arrow_schema(signature: &str) -> Result<Schema> {
266    let (resolved, event) = resolve_event_signature(signature)?;
267    event_signature_to_arrow_schema_impl(&resolved, &event)
268}
269
270fn event_signature_to_arrow_schema_impl(
271    sig: &alloy_json_abi::Event,
272    event: &DynSolEvent,
273) -> Result<Schema> {
274    let num_fields = event.indexed().len() + event.body().len();
275    let mut fields = Vec::<Arc<Field>>::with_capacity(num_fields);
276    let mut names = Vec::with_capacity(num_fields);
277
278    for (i, input) in sig.inputs.iter().enumerate() {
279        if input.indexed {
280            let name = if input.name.is_empty() {
281                format!("param{i}")
282            } else {
283                input.name.clone()
284            };
285            names.push(name);
286        }
287    }
288    for (i, input) in sig.inputs.iter().enumerate() {
289        if !input.indexed {
290            let name = if input.name.is_empty() {
291                format!("param{i}")
292            } else {
293                input.name.clone()
294            };
295            names.push(name);
296        }
297    }
298
299    for (sol_t, name) in event.indexed().iter().chain(event.body()).zip(names) {
300        let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
301        fields.push(Arc::new(Field::new(name, dtype, true)));
302    }
303
304    Ok(Schema::new(fields))
305}
306
307fn resolve_event_signature(signature: &str) -> Result<(alloy_json_abi::Event, DynSolEvent)> {
308    let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
309    let resolved = event.resolve().context("resolve event signature")?;
310
311    Ok((event, resolved))
312}
313
314/// Filters a RecordBatch to only rows where the `topic0` column matches the
315/// event's selector hash. If `topic0` column is not present, returns the data
316/// unchanged. Non-matching rows are always silently filtered out.
317fn filter_by_topic0_impl(event: &alloy_json_abi::Event, data: &RecordBatch) -> Result<RecordBatch> {
318    let Some(topic0_col) = data.column_by_name("topic0") else {
319        return Ok(data.clone());
320    };
321
322    let selector = event.selector();
323
324    let mask =
325        build_topic0_mask(topic0_col, selector.as_slice()).context("build topic0 filter mask")?;
326
327    let non_matching = mask.iter().filter(|v| !v.unwrap_or(false)).count();
328    if non_matching > 0 {
329        log::debug!(
330            "filtering out {non_matching} events whose topic0 does not match '{}'",
331            event.full_signature()
332        );
333    }
334
335    compute::filter_record_batch(data, &mask).context("filter record batch by topic0")
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{I256, U256};
    use arrow::datatypes::{Fields, Int32Type};

    // Verifies that an int value too large for its target Arrow integer type
    // yields null (with allow_decode_fail) or an error (without).
    #[test]
    fn test_int_overflow_with_allow_decode_fail() {
        // When decoding all pool events without topic filtering, a Swap decoder
        // may successfully ABI-decode body data from a different event type,
        // producing an int24 value that doesn't fit in i32. With allow_decode_fail=true
        // this should produce null instead of erroring.
        let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
        let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, true);
        assert!(result.is_ok());
        let arr = result.unwrap();
        assert!(arr.is_null(0));

        // Without allow_decode_fail, it should error
        let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
        let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, false);
        assert!(result.is_err());
    }

    // End-to-end check that filter_by_topic0 drops rows from other event types
    // before decoding, and that hstack appends the original input columns.
    #[test]
    fn test_topic0_filtering_with_allow_decode_fail() {
        use arrow::array::GenericBinaryBuilder;

        let swap_sig = "Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)";
        let mint_sig = "Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)";

        let swap_selector = signature_to_topic0(swap_sig).unwrap();
        let mint_selector = signature_to_topic0(mint_sig).unwrap();

        // Build a batch with 2 rows: one Swap event and one Mint event
        let mut topic0_builder = GenericBinaryBuilder::<i32>::new();
        let mut topic1_builder = GenericBinaryBuilder::<i32>::new();
        let mut topic2_builder = GenericBinaryBuilder::<i32>::new();
        let mut topic3_builder = GenericBinaryBuilder::<i32>::new();
        let mut data_builder = GenericBinaryBuilder::<i32>::new();

        // 32-byte zero value used for every address-typed topic below.
        let addr = [0u8; 32];

        // Row 0: Swap event
        topic0_builder.append_value(swap_selector);
        topic1_builder.append_value(addr);
        topic2_builder.append_value(addr);
        topic3_builder.append_null();
        let amount0 = I256::try_from(-1000i64).unwrap();
        let amount1 = I256::try_from(2000i64).unwrap();
        let sqrt_price: U256 = U256::from(1u64) << 96;
        let liquidity = U256::from(1000000u64);
        let tick = I256::try_from(-100i64).unwrap();
        // Body is the ABI encoding of the non-indexed params: each value is
        // padded to a 32-byte big-endian word, concatenated in order.
        let mut swap_body = Vec::new();
        swap_body.extend_from_slice(&amount0.to_be_bytes::<32>());
        swap_body.extend_from_slice(&amount1.to_be_bytes::<32>());
        swap_body.extend_from_slice(&sqrt_price.to_be_bytes::<32>());
        swap_body.extend_from_slice(&liquidity.to_be_bytes::<32>());
        swap_body.extend_from_slice(&tick.to_be_bytes::<32>());
        data_builder.append_value(&swap_body);

        // Row 1: Mint event (different topic0, different body layout)
        topic0_builder.append_value(mint_selector);
        topic1_builder.append_value(addr);
        topic2_builder.append_value(addr);
        topic3_builder.append_value(addr);
        let mint_body = vec![0u8; 32 * 4]; // sender, amount, amount0, amount1
        data_builder.append_value(&mint_body);

        let schema = Arc::new(Schema::new(vec![
            Field::new("topic0", DataType::Binary, true),
            Field::new("topic1", DataType::Binary, true),
            Field::new("topic2", DataType::Binary, true),
            Field::new("topic3", DataType::Binary, true),
            Field::new("data", DataType::Binary, true),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(topic0_builder.finish()),
                Arc::new(topic1_builder.finish()),
                Arc::new(topic2_builder.finish()),
                Arc::new(topic3_builder.finish()),
                Arc::new(data_builder.finish()),
            ],
        )
        .unwrap();

        // With filter_by_topic0=true, should filter to only the Swap row
        let result = decode_events(swap_sig, &batch, true, true, false).unwrap();
        assert_eq!(result.num_rows(), 1, "should only decode the Swap row");

        // With filter_by_topic0=true and hstack=true, decoded + input columns are returned
        let result = decode_events(swap_sig, &batch, true, true, true).unwrap();
        assert_eq!(result.num_rows(), 1);
        // Should have decoded columns + original input columns
        assert!(
            result.column_by_name("topic0").is_some(),
            "hstack should include original input columns"
        );
    }

    // Schema generation for deeply nested types; note that indexed params
    // (here `my_addr`) are ordered before non-indexed ones in the output.
    #[test]
    #[ignore]
    fn nested_event_signature_to_schema() {
        let sig = "ConfiguredQuests(address editor, uint256[][], address indexed my_addr, (bool,bool[],(bool, uint256[]))[] questDetails)";

        let schema = event_signature_to_arrow_schema(sig).unwrap();

        let expected_schema = Schema::new(vec![
            Arc::new(Field::new("my_addr", DataType::Binary, true)),
            Arc::new(Field::new("editor", DataType::Binary, true)),
            Arc::new(Field::new(
                "param1",
                DataType::List(Arc::new(Field::new(
                    "",
                    DataType::List(Arc::new(Field::new("", DataType::Decimal256(76, 0), true))),
                    true,
                ))),
                true,
            )),
            Arc::new(Field::new(
                "questDetails",
                DataType::List(Arc::new(Field::new(
                    "",
                    DataType::Struct(Fields::from(vec![
                        Arc::new(Field::new("param0", DataType::Boolean, true)),
                        Arc::new(Field::new(
                            "param1",
                            DataType::List(Arc::new(Field::new("", DataType::Boolean, true))),
                            true,
                        )),
                        Arc::new(Field::new(
                            "param2",
                            DataType::Struct(Fields::from(vec![
                                Arc::new(Field::new("param0", DataType::Boolean, true)),
                                Arc::new(Field::new(
                                    "param1",
                                    DataType::List(Arc::new(Field::new(
                                        "",
                                        DataType::Decimal256(76, 0),
                                        true,
                                    ))),
                                    true,
                                )),
                            ])),
                            true,
                        )),
                    ])),
                    true,
                ))),
                true,
            )),
        ]);

        assert_eq!(schema, expected_schema);
    }

    // Round-trips alloy's I256 through arrow's i256 via big-endian bytes to
    // check the two 256-bit integer representations agree.
    #[test]
    #[ignore]
    fn i256_to_arrow_i256() {
        for val in [
            I256::MIN,
            I256::MAX,
            I256::MAX / I256::try_from(2i32).unwrap(),
        ] {
            let out = arrow::datatypes::i256::from_be_bytes(val.to_be_bytes::<32>());

            assert_eq!(val.to_string(), out.to_string());
        }
    }

    // Manual integration test: requires a local `logs.parquet` file with real
    // log data (hence #[ignore]); writes the decoded output back to parquet.
    #[test]
    #[ignore]
    fn read_parquet_with_real_data() {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use std::fs::File;
        let builder =
            ParquetRecordBatchReaderBuilder::try_new(File::open("logs.parquet").unwrap()).unwrap();
        let mut reader = builder.build().unwrap();
        let logs = reader.next().unwrap().unwrap();

        let signature =
            "PairCreated(address indexed token0, address indexed token1, address pair,uint256)";

        let decoded = decode_events(signature, &logs, false, false, false).unwrap();

        // Save the filtered instructions to a new parquet file
        let mut file = File::create("decoded_logs.parquet").unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(&mut file, decoded.schema(), None).unwrap();
        writer.write(&decoded).unwrap();
        writer.close().unwrap();
    }
}