Skip to main content

tiders_evm_decode/
lib.rs

1//! # tiders-evm-decode
2//!
3//! Decodes EVM smart contract data from binary format into Apache Arrow RecordBatches.
4//!
5//! Supports three types of decoding:
6//! - **Event logs** ([`decode_events`]) — Decodes indexed topics and non-indexed body data
7//!   using Solidity event signatures (e.g. `"Transfer(address indexed,address indexed,uint256)"`).
8//! - **Function call inputs** ([`decode_call_inputs`]) — Decodes ABI-encoded calldata.
9//! - **Function call outputs** ([`decode_call_outputs`]) — Decodes ABI-encoded return data.
10//!
11//! Also provides ABI parsing utilities ([`abi_events`], [`abi_functions`]) to extract
12//! event/function signatures from JSON ABI files, and schema generation functions
13//! to preview the Arrow output schema without decoding data.
14//!
15//! All decoded output uses Arrow's columnar format with support for arbitrarily nested
16//! Solidity types (tuples, arrays, structs) mapped to Arrow Struct and List types.
17
18mod abi;
19mod arrow_convert;
20
21use std::sync::Arc;
22
23use alloy_dyn_abi::{DynSolCall, DynSolEvent, DynSolType, DynSolValue, Specifier};
24use anyhow::{anyhow, Context, Result};
25use arrow::{
26    array::{
27        Array, BinaryArray, GenericBinaryArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
28        StructArray,
29    },
30    compute,
31    datatypes::{DataType, Field, Schema},
32};
33
34pub use abi::*;
35use arrow_convert::{
36    build_topic0_mask, decode_body_named, decode_topic, param_to_arrow_dtype, to_struct_named,
37};
38
39/// Returns topic0 based on a human-readable Solidity event signature
40/// (e.g. `"Transfer(address indexed,address indexed,uint256)"`).
41pub fn signature_to_topic0(signature: &str) -> Result<[u8; 32]> {
42    let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
43    Ok(event.selector().into())
44}
45
46/// Returns topic0 based on a JSON ABI fragment for an event
47/// (e.g. the `abi_json` field from [`crate::abi_events`]).
48pub fn abi_to_topic0(abi_json: &str) -> Result<[u8; 32]> {
49    let event: alloy_json_abi::Event =
50        serde_json::from_str(abi_json).context("parse event ABI JSON")?;
51    Ok(event.selector().into())
52}
53
54/// Decodes given call input data in arrow format to arrow format.
55/// Output Arrow schema is auto generated based on the function signature.
56/// Handles any level of nesting with Lists/Structs.
57///
58/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
59/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
60///
61/// When `large_int_as_binary` is `true`, signed and unsigned integers wider than
62/// 64 bits are emitted as 32-byte big-endian `Binary` columns (two's-complement
63/// for signed) instead of `Decimal128`/`Decimal256`.
64pub fn decode_call_inputs<I: OffsetSizeTrait>(
65    signature: &str,
66    data: &GenericBinaryArray<I>,
67    allow_decode_fail: bool,
68    large_int_as_binary: bool,
69) -> Result<RecordBatch> {
70    decode_call_impl::<true, I>(signature, data, allow_decode_fail, large_int_as_binary)
71}
72
73/// Decodes given call output data in arrow format to arrow format.
74/// Output Arrow schema is auto generated based on the function signature.
75/// Handles any level of nesting with Lists/Structs.
76///
77/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
78/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
79///
80/// When `large_int_as_binary` is `true`, signed and unsigned integers wider than
81/// 64 bits are emitted as 32-byte big-endian `Binary` columns (two's-complement
82/// for signed) instead of `Decimal128`/`Decimal256`.
83pub fn decode_call_outputs<I: OffsetSizeTrait>(
84    signature: &str,
85    data: &GenericBinaryArray<I>,
86    allow_decode_fail: bool,
87    large_int_as_binary: bool,
88) -> Result<RecordBatch> {
89    decode_call_impl::<false, I>(signature, data, allow_decode_fail, large_int_as_binary)
90}
91
92fn decode_call_impl<const IS_INPUT: bool, I: OffsetSizeTrait>(
93    signature: &str,
94    data: &GenericBinaryArray<I>,
95    allow_decode_fail: bool,
96    large_int_as_binary: bool,
97) -> Result<RecordBatch> {
98    let (func, resolved) = resolve_function_signature(signature)?;
99
100    let schema = function_signature_to_arrow_schemas_impl(&func, large_int_as_binary)
101        .context("convert function signature to arrow schema")?;
102    let schema = if IS_INPUT { schema.0 } else { schema.1 };
103
104    let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(data.len());
105
106    for blob in data {
107        match blob {
108            Some(blob) => {
109                let decode_res = if IS_INPUT {
110                    resolved.abi_decode_input(blob)
111                } else {
112                    resolved.abi_decode_output(blob)
113                };
114                match decode_res {
115                    Ok(data) => decoded.push(Some(DynSolValue::Tuple(data))),
116                    Err(e) if allow_decode_fail => {
117                        log::debug!("failed to decode function call data: {e}");
118                        decoded.push(None);
119                    }
120                    Err(e) => {
121                        return Err(anyhow!("failed to decode function call data: {e}"));
122                    }
123                }
124            }
125            None => decoded.push(None),
126        }
127    }
128
129    let sol_fields = if IS_INPUT {
130        resolved.types().to_vec()
131    } else {
132        resolved.returns().types().to_vec()
133    };
134    let params = if IS_INPUT {
135        &func.inputs
136    } else {
137        &func.outputs
138    };
139    let named: Vec<_> = params
140        .iter()
141        .map(|p| (p.name.as_str(), p.components.as_slice()))
142        .collect();
143
144    let array = to_struct_named(
145        &sol_fields,
146        &named,
147        decoded,
148        allow_decode_fail,
149        large_int_as_binary,
150    )
151    .context("map params to arrow")?;
152    let arr = array
153        .as_any()
154        .downcast_ref::<StructArray>()
155        .context("expected struct array from to_struct_named")?;
156
157    let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(arr.num_columns());
158    for f in arr.columns() {
159        arrays.push(f.clone());
160    }
161
162    let batch = RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")?;
163    tiders_cast::flatten_record_batch(&batch).context("flatten decoded batch")
164}
165
166/// Returns the Arrow schemas for a function's inputs and outputs as `(input_schema, output_schema)`.
167///
168/// `large_int_as_binary` must match the value passed to [`decode_call_inputs`] /
169/// [`decode_call_outputs`] for the resulting batch to match this schema.
170pub fn function_signature_to_arrow_schemas(
171    signature: &str,
172    large_int_as_binary: bool,
173) -> Result<(Schema, Schema)> {
174    let (func, _) = resolve_function_signature(signature)?;
175    let (input_schema, output_schema) =
176        function_signature_to_arrow_schemas_impl(&func, large_int_as_binary)?;
177    Ok((
178        tiders_cast::flatten_schema(&input_schema),
179        tiders_cast::flatten_schema(&output_schema),
180    ))
181}
182
183fn function_signature_to_arrow_schemas_impl(
184    func: &alloy_json_abi::Function,
185    large_int_as_binary: bool,
186) -> Result<(Schema, Schema)> {
187    let mut input_fields = Vec::with_capacity(func.inputs.len());
188    let mut output_fields = Vec::with_capacity(func.outputs.len());
189
190    for (i, param) in func.inputs.iter().enumerate() {
191        let dtype = param_to_arrow_dtype(&param.ty, &param.components, large_int_as_binary)
192            .context("map to arrow type")?;
193        let name = if param.name.is_empty() {
194            format!("param{i}")
195        } else {
196            param.name.clone()
197        };
198        input_fields.push(Arc::new(Field::new(name, dtype, true)));
199    }
200
201    for (i, param) in func.outputs.iter().enumerate() {
202        let dtype = param_to_arrow_dtype(&param.ty, &param.components, large_int_as_binary)
203            .context("map to arrow type")?;
204        let name = if param.name.is_empty() {
205            format!("param{i}")
206        } else {
207            param.name.clone()
208        };
209        output_fields.push(Arc::new(Field::new(name, dtype, true)));
210    }
211
212    Ok((Schema::new(input_fields), Schema::new(output_fields)))
213}
214
215fn parse_function_str(signature: &str) -> Result<alloy_json_abi::Function> {
216    if signature.starts_with('{') {
217        return serde_json::from_str(signature).context("parse function JSON");
218    }
219    if signature.contains("tuple(") {
220        log::warn!(
221            "Function signature contains `tuple(...)` which will produce unnamed fields in the Arrow schema. \
222             Consider passing a JSON ABI fragment instead.\n  \
223             Signature: {signature}"
224        );
225    }
226    alloy_json_abi::Function::parse(signature).map_err(|e| {
227        anyhow!(
228            "{e}\n  \
229             Hint: if the signature contains named tuple fields (e.g. `tuple(int256 foo, ...)`),\n  \
230             either strip the inner names (e.g. `(int256,...)`) or pass a JSON ABI fragment."
231        )
232    })
233}
234
235fn resolve_function_signature(signature: &str) -> Result<(alloy_json_abi::Function, DynSolCall)> {
236    let func = parse_function_str(signature)?;
237    let resolved = func.resolve().context("resolve function signature")?;
238    Ok((func, resolved))
239}
240
241/// Decodes given event data in arrow format to arrow format.
242/// Output Arrow schema is auto generated based on the event signature.
243/// Handles any level of nesting with Lists/Structs.
244///
245/// When `filter_by_topic0` is `true`, only rows whose `topic0` column matches the
246/// event's selector are decoded. Non-matching rows are silently filtered out.
247///
248/// When `hstack` is `true`, the original input columns (after any topic0 filtering)
249/// are appended alongside the decoded columns in the output.
250///
251/// Writes `null` for data rows that fail to decode if `allow_decode_fail` is set to `true`.
252/// Errors when a row fails to decode if `allow_decode_fail` is set to `false`.
253///
254/// When `large_int_as_binary` is `true`, signed and unsigned integers wider than
255/// 64 bits are emitted as 32-byte big-endian `Binary` columns (two's-complement
256/// for signed) instead of `Decimal128`/`Decimal256`.
257#[expect(clippy::fn_params_excessive_bools, reason = "stable public API")]
258pub fn decode_events(
259    signature: &str,
260    data: &RecordBatch,
261    allow_decode_fail: bool,
262    filter_by_topic0: bool,
263    hstack: bool,
264    large_int_as_binary: bool,
265) -> Result<RecordBatch> {
266    let (event, resolved) = resolve_event_signature(signature)?;
267
268    // Optionally filter input to only rows whose topic0 matches the event selector.
269    let data = if filter_by_topic0 {
270        filter_by_topic0_impl(&event, data)?
271    } else {
272        data.clone()
273    };
274
275    let schema = event_signature_to_arrow_schema_impl(&event, large_int_as_binary)
276        .context("convert event signature to arrow schema")?;
277
278    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
279    let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(fields.len());
280
281    for (sol_type, topic_name) in resolved
282        .indexed()
283        .iter()
284        .zip(&["topic1", "topic2", "topic3"])
285    {
286        let col = data
287            .column_by_name(topic_name)
288            .context("get topic column")?;
289
290        if col.data_type() == &DataType::Binary {
291            let arr = col
292                .as_any()
293                .downcast_ref::<BinaryArray>()
294                .context("downcast to BinaryArray")?;
295            decode_topic(
296                sol_type,
297                arr,
298                allow_decode_fail,
299                large_int_as_binary,
300                &mut arrays,
301            )
302            .context("decode topic")?;
303        } else if col.data_type() == &DataType::LargeBinary {
304            let arr = col
305                .as_any()
306                .downcast_ref::<LargeBinaryArray>()
307                .context("downcast to LargeBinaryArray")?;
308            decode_topic(
309                sol_type,
310                arr,
311                allow_decode_fail,
312                large_int_as_binary,
313                &mut arrays,
314            )
315            .context("decode topic")?;
316        }
317    }
318
319    let body_col = data.column_by_name("data").context("get data column")?;
320    let body_sol_type = DynSolType::Tuple(resolved.body().to_vec());
321    let body_params: Vec<&alloy_json_abi::EventParam> =
322        event.inputs.iter().filter(|i| !i.indexed).collect();
323
324    if body_col.data_type() == &DataType::Binary {
325        let arr = body_col
326            .as_any()
327            .downcast_ref::<BinaryArray>()
328            .context("downcast to BinaryArray")?;
329        decode_body_named(
330            &body_sol_type,
331            &body_params,
332            arr,
333            allow_decode_fail,
334            large_int_as_binary,
335            &mut arrays,
336        )
337        .context("decode body")?;
338    } else if body_col.data_type() == &DataType::LargeBinary {
339        let arr = body_col
340            .as_any()
341            .downcast_ref::<LargeBinaryArray>()
342            .context("downcast to LargeBinaryArray")?;
343        decode_body_named(
344            &body_sol_type,
345            &body_params,
346            arr,
347            allow_decode_fail,
348            large_int_as_binary,
349            &mut arrays,
350        )
351        .context("decode body")?;
352    }
353
354    if hstack {
355        for (i, col) in data.columns().iter().enumerate() {
356            fields.push(data.schema().field(i).clone().into());
357            arrays.push(col.clone());
358        }
359    }
360
361    let output_schema = Schema::new(fields);
362    let batch =
363        RecordBatch::try_new(Arc::new(output_schema), arrays).context("construct arrow batch")?;
364    tiders_cast::flatten_record_batch(&batch).context("flatten decoded batch")
365}
366
367/// Returns the Arrow schema that [`decode_events`] would produce for the given event signature.
368///
369/// `large_int_as_binary` must match the value passed to [`decode_events`] for the
370/// resulting batch to match this schema.
371pub fn event_signature_to_arrow_schema(
372    signature: &str,
373    large_int_as_binary: bool,
374) -> Result<Schema> {
375    let (event, _) = resolve_event_signature(signature)?;
376    let schema = event_signature_to_arrow_schema_impl(&event, large_int_as_binary)?;
377    Ok(tiders_cast::flatten_schema(&schema))
378}
379
380/// Builds the Arrow schema for an event directly from its [`alloy_json_abi::Event`].
381///
382/// Indexed params come first (matching the topic decode order), followed by
383/// body params. Tuple params use [`param_to_arrow_dtype`] so component names
384/// are preserved as named `Struct` fields.
385fn event_signature_to_arrow_schema_impl(
386    sig: &alloy_json_abi::Event,
387    large_int_as_binary: bool,
388) -> Result<Schema> {
389    let mut fields = Vec::<Arc<Field>>::new();
390
391    for (i, input) in sig.inputs.iter().enumerate() {
392        if input.indexed {
393            let name = if input.name.is_empty() {
394                format!("param{i}")
395            } else {
396                input.name.clone()
397            };
398            let dtype = param_to_arrow_dtype(&input.ty, &input.components, large_int_as_binary)
399                .context("map indexed param to arrow type")?;
400            fields.push(Arc::new(Field::new(name, dtype, true)));
401        }
402    }
403    for (i, input) in sig.inputs.iter().enumerate() {
404        if !input.indexed {
405            let name = if input.name.is_empty() {
406                format!("param{i}")
407            } else {
408                input.name.clone()
409            };
410            let dtype = param_to_arrow_dtype(&input.ty, &input.components, large_int_as_binary)
411                .context("map body param to arrow type")?;
412            fields.push(Arc::new(Field::new(name, dtype, true)));
413        }
414    }
415
416    Ok(Schema::new(fields))
417}
418
419/// Parse an event from either a human-readable Solidity signature or a JSON
420/// fragment (the format produced by [`crate::abi_events`]).
421///
422/// Accepts three formats:
423/// - JSON ABI fragment: `{"type":"event","name":"Swap","inputs":[...],...}`
424/// - Standard HR signature: `Swap(address indexed sender, uint256 amount)`
425/// - Full HR signature with named tuple fields (alloy `full_signature()` output):
426///   `Swap(address indexed sender, tuple(int256 a, int256 b) data)` — alloy's
427///   `Event::parse` rejects named types inside tuple parens, so we parse this
428///   ourselves, build a JSON value, and deserialise to preserve component names.
429fn parse_event_str(signature: &str) -> Result<alloy_json_abi::Event> {
430    if signature.starts_with('{') {
431        return serde_json::from_str(signature).context("parse event JSON");
432    }
433    if signature.contains("tuple(") {
434        log::warn!(
435            "Event signature contains `tuple(...)` which will produce unnamed fields in the Arrow schema. \
436             Consider passing a JSON ABI fragment instead.\n  \
437             Signature: {signature}"
438        );
439    }
440    alloy_json_abi::Event::parse(signature).map_err(|e| {
441        anyhow!(
442            "{e}\n  \
443             Hint: if the signature contains named tuple fields (e.g. `tuple(int256 foo, ...)`),\n  \
444             either strip the inner names (e.g. `(int256,...)`) or pass a JSON ABI fragment."
445        )
446    })
447}
448
449/// Find the index of the closing `)` that matches the `(` at `open`.
450fn resolve_event_signature(signature: &str) -> Result<(alloy_json_abi::Event, DynSolEvent)> {
451    let event = parse_event_str(signature)?;
452    let resolved = event.resolve().context("resolve event signature")?;
453    Ok((event, resolved))
454}
455
456/// Filters a RecordBatch to only rows where the `topic0` column matches the
457/// event's selector hash. If `topic0` column is not present, returns the data
458/// unchanged. Non-matching rows are always silently filtered out.
459fn filter_by_topic0_impl(event: &alloy_json_abi::Event, data: &RecordBatch) -> Result<RecordBatch> {
460    let Some(topic0_col) = data.column_by_name("topic0") else {
461        return Ok(data.clone());
462    };
463
464    let selector = event.selector();
465
466    let mask =
467        build_topic0_mask(topic0_col, selector.as_slice()).context("build topic0 filter mask")?;
468
469    let non_matching = mask.iter().filter(|v| !v.unwrap_or(false)).count();
470    if non_matching > 0 {
471        log::debug!(
472            "filtering out {non_matching} events whose topic0 does not match '{}'",
473            event.full_signature()
474        );
475    }
476
477    compute::filter_record_batch(data, &mask).context("filter record batch by topic0")
478}
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483    use alloy_primitives::{I256, U256};
484    use arrow::datatypes::{Fields, Int32Type};
485
486    #[test]
487    fn test_int_overflow_with_allow_decode_fail() {
488        // When decoding all pool events without topic filtering, a Swap decoder
489        // may successfully ABI-decode body data from a different event type,
490        // producing an int24 value that doesn't fit in i32. With allow_decode_fail=true
491        // this should produce null instead of erroring.
492        let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
493        let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, true);
494        assert!(result.is_ok());
495        let arr = result.unwrap();
496        assert!(arr.is_null(0));
497
498        // Without allow_decode_fail, it should error
499        let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
500        let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, false);
501        assert!(result.is_err());
502    }
503
504    /// Decoding a `uint256` event field whose value is in `[2^255, 2^256 - 1]`
505    /// must succeed losslessly. The cell lands in `Decimal256` with its raw
506    /// 32-byte big-endian bit pattern preserved (so a naive signed reader will
507    /// see a negative value — callers reinterpret as unsigned).
508    #[test]
509    fn test_decode_uint256_above_signed_max() {
510        use arrow::array::{Decimal256Array, GenericBinaryBuilder};
511
512        let sig = "Filled(uint256 tokenId)";
513        let token_id = U256::MAX;
514
515        let selector = signature_to_topic0(sig).unwrap();
516        let mut topic0_b = GenericBinaryBuilder::<i32>::new();
517        let mut topic1_b = GenericBinaryBuilder::<i32>::new();
518        let mut topic2_b = GenericBinaryBuilder::<i32>::new();
519        let mut topic3_b = GenericBinaryBuilder::<i32>::new();
520        let mut data_b = GenericBinaryBuilder::<i32>::new();
521        topic0_b.append_value(selector);
522        topic1_b.append_null();
523        topic2_b.append_null();
524        topic3_b.append_null();
525        data_b.append_value(token_id.to_be_bytes::<32>());
526
527        let schema = Arc::new(Schema::new(vec![
528            Field::new("topic0", DataType::Binary, true),
529            Field::new("topic1", DataType::Binary, true),
530            Field::new("topic2", DataType::Binary, true),
531            Field::new("topic3", DataType::Binary, true),
532            Field::new("data", DataType::Binary, true),
533        ]));
534        let batch = RecordBatch::try_new(
535            schema,
536            vec![
537                Arc::new(topic0_b.finish()),
538                Arc::new(topic1_b.finish()),
539                Arc::new(topic2_b.finish()),
540                Arc::new(topic3_b.finish()),
541                Arc::new(data_b.finish()),
542            ],
543        )
544        .unwrap();
545
546        let result = decode_events(sig, &batch, false, false, false, false).unwrap();
547        let col = result
548            .column_by_name("tokenId")
549            .unwrap()
550            .as_any()
551            .downcast_ref::<Decimal256Array>()
552            .unwrap();
553        let expected = arrow::datatypes::i256::from_be_bytes(token_id.to_be_bytes::<32>());
554        assert_eq!(col.value(0), expected);
555    }
556
557    /// Same shape as the uint256 case, but for `uint128`. The cell lands in
558    /// `Decimal128` with the low 16 bytes equal to `u128::MAX`.
559    #[test]
560    fn test_decode_uint128_above_signed_max() {
561        use arrow::array::{Decimal128Array, GenericBinaryBuilder};
562
563        let sig = "Filled(uint128 amount)";
564        let amount = u128::MAX;
565
566        let selector = signature_to_topic0(sig).unwrap();
567        let mut topic0_b = GenericBinaryBuilder::<i32>::new();
568        let mut topic1_b = GenericBinaryBuilder::<i32>::new();
569        let mut topic2_b = GenericBinaryBuilder::<i32>::new();
570        let mut topic3_b = GenericBinaryBuilder::<i32>::new();
571        let mut data_b = GenericBinaryBuilder::<i32>::new();
572        topic0_b.append_value(selector);
573        topic1_b.append_null();
574        topic2_b.append_null();
575        topic3_b.append_null();
576        // uint128 is zero-extended to 32 bytes on the wire.
577        let mut body = [0u8; 32];
578        body[16..].copy_from_slice(&amount.to_be_bytes());
579        data_b.append_value(body);
580
581        let schema = Arc::new(Schema::new(vec![
582            Field::new("topic0", DataType::Binary, true),
583            Field::new("topic1", DataType::Binary, true),
584            Field::new("topic2", DataType::Binary, true),
585            Field::new("topic3", DataType::Binary, true),
586            Field::new("data", DataType::Binary, true),
587        ]));
588        let batch = RecordBatch::try_new(
589            schema,
590            vec![
591                Arc::new(topic0_b.finish()),
592                Arc::new(topic1_b.finish()),
593                Arc::new(topic2_b.finish()),
594                Arc::new(topic3_b.finish()),
595                Arc::new(data_b.finish()),
596            ],
597        )
598        .unwrap();
599
600        let result = decode_events(sig, &batch, false, false, false, false).unwrap();
601        let col = result
602            .column_by_name("amount")
603            .unwrap()
604            .as_any()
605            .downcast_ref::<Decimal128Array>()
606            .unwrap();
607        // u128::MAX reinterpreted as i128 is -1.
608        assert_eq!(col.value(0), -1i128);
609    }
610
611    /// With `large_int_as_binary=true`, wide ints become 32-byte BE Binary.
612    /// Exercises both an indexed `uint256` topic and a body `uint256` so both
613    /// decode paths are covered. Also cross-checks that the generated schema
614    /// matches the produced batch when called with the same flag.
615    #[test]
616    fn test_decode_large_uint_as_binary() {
617        use arrow::array::{BinaryArray, GenericBinaryBuilder};
618
619        let sig = "Filled(uint256 indexed indexedId, uint256 bodyId)";
620        let indexed_id = U256::MAX;
621        let body_id = U256::MAX - U256::from(1u64);
622
623        let selector = signature_to_topic0(sig).unwrap();
624        let mut topic0_b = GenericBinaryBuilder::<i32>::new();
625        let mut topic1_b = GenericBinaryBuilder::<i32>::new();
626        let mut topic2_b = GenericBinaryBuilder::<i32>::new();
627        let mut topic3_b = GenericBinaryBuilder::<i32>::new();
628        let mut data_b = GenericBinaryBuilder::<i32>::new();
629        topic0_b.append_value(selector);
630        topic1_b.append_value(indexed_id.to_be_bytes::<32>());
631        topic2_b.append_null();
632        topic3_b.append_null();
633        data_b.append_value(body_id.to_be_bytes::<32>());
634
635        let schema = Arc::new(Schema::new(vec![
636            Field::new("topic0", DataType::Binary, true),
637            Field::new("topic1", DataType::Binary, true),
638            Field::new("topic2", DataType::Binary, true),
639            Field::new("topic3", DataType::Binary, true),
640            Field::new("data", DataType::Binary, true),
641        ]));
642        let batch = RecordBatch::try_new(
643            schema,
644            vec![
645                Arc::new(topic0_b.finish()),
646                Arc::new(topic1_b.finish()),
647                Arc::new(topic2_b.finish()),
648                Arc::new(topic3_b.finish()),
649                Arc::new(data_b.finish()),
650            ],
651        )
652        .unwrap();
653
654        let result = decode_events(sig, &batch, false, false, false, true).unwrap();
655
656        let indexed_col = result
657            .column_by_name("indexedId")
658            .unwrap()
659            .as_any()
660            .downcast_ref::<BinaryArray>()
661            .unwrap();
662        assert_eq!(indexed_col.value(0), &indexed_id.to_be_bytes::<32>());
663        assert_eq!(indexed_col.value(0).len(), 32);
664
665        let body_col = result
666            .column_by_name("bodyId")
667            .unwrap()
668            .as_any()
669            .downcast_ref::<BinaryArray>()
670            .unwrap();
671        assert_eq!(body_col.value(0), &body_id.to_be_bytes::<32>());
672
673        // Schema produced with the same flag must match the resulting batch.
674        let schema_from_sig = event_signature_to_arrow_schema(sig, true).unwrap();
675        assert_eq!(
676            schema_from_sig
677                .field_with_name("indexedId")
678                .unwrap()
679                .data_type(),
680            &DataType::Binary
681        );
682        assert_eq!(
683            schema_from_sig
684                .field_with_name("bodyId")
685                .unwrap()
686                .data_type(),
687            &DataType::Binary
688        );
689    }
690
691    /// Signed `int256` with `large_int_as_binary=true` preserves two's-complement
692    /// in the 32-byte BE word: `I256::MIN` becomes `0x80` followed by 31 `0x00`.
693    #[test]
694    fn test_decode_int256_as_binary() {
695        use arrow::array::{BinaryArray, GenericBinaryBuilder};
696
697        let sig = "Signed(int256 value)";
698        let value = I256::MIN;
699
700        let selector = signature_to_topic0(sig).unwrap();
701        let mut topic0_b = GenericBinaryBuilder::<i32>::new();
702        let mut topic1_b = GenericBinaryBuilder::<i32>::new();
703        let mut topic2_b = GenericBinaryBuilder::<i32>::new();
704        let mut topic3_b = GenericBinaryBuilder::<i32>::new();
705        let mut data_b = GenericBinaryBuilder::<i32>::new();
706        topic0_b.append_value(selector);
707        topic1_b.append_null();
708        topic2_b.append_null();
709        topic3_b.append_null();
710        data_b.append_value(value.to_be_bytes::<32>());
711
712        let schema = Arc::new(Schema::new(vec![
713            Field::new("topic0", DataType::Binary, true),
714            Field::new("topic1", DataType::Binary, true),
715            Field::new("topic2", DataType::Binary, true),
716            Field::new("topic3", DataType::Binary, true),
717            Field::new("data", DataType::Binary, true),
718        ]));
719        let batch = RecordBatch::try_new(
720            schema,
721            vec![
722                Arc::new(topic0_b.finish()),
723                Arc::new(topic1_b.finish()),
724                Arc::new(topic2_b.finish()),
725                Arc::new(topic3_b.finish()),
726                Arc::new(data_b.finish()),
727            ],
728        )
729        .unwrap();
730
731        let result = decode_events(sig, &batch, false, false, false, true).unwrap();
732        let col = result
733            .column_by_name("value")
734            .unwrap()
735            .as_any()
736            .downcast_ref::<BinaryArray>()
737            .unwrap();
738        let mut expected = [0u8; 32];
739        expected[0] = 0x80;
740        assert_eq!(col.value(0), &expected);
741    }
742
743    #[test]
744    fn test_topic0_filtering_with_allow_decode_fail() {
745        use arrow::array::GenericBinaryBuilder;
746
747        let swap_sig = "Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)";
748        let mint_sig = "Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)";
749
750        let swap_selector = signature_to_topic0(swap_sig).unwrap();
751        let mint_selector = signature_to_topic0(mint_sig).unwrap();
752
753        // Build a batch with 2 rows: one Swap event and one Mint event
754        let mut topic0_builder = GenericBinaryBuilder::<i32>::new();
755        let mut topic1_builder = GenericBinaryBuilder::<i32>::new();
756        let mut topic2_builder = GenericBinaryBuilder::<i32>::new();
757        let mut topic3_builder = GenericBinaryBuilder::<i32>::new();
758        let mut data_builder = GenericBinaryBuilder::<i32>::new();
759
760        let addr = [0u8; 32];
761
762        // Row 0: Swap event
763        topic0_builder.append_value(swap_selector);
764        topic1_builder.append_value(addr);
765        topic2_builder.append_value(addr);
766        topic3_builder.append_null();
767        let amount0 = I256::try_from(-1000i64).unwrap();
768        let amount1 = I256::try_from(2000i64).unwrap();
769        let sqrt_price: U256 = U256::from(1u64) << 96;
770        let liquidity = U256::from(1000000u64);
771        let tick = I256::try_from(-100i64).unwrap();
772        let mut swap_body = Vec::new();
773        swap_body.extend_from_slice(&amount0.to_be_bytes::<32>());
774        swap_body.extend_from_slice(&amount1.to_be_bytes::<32>());
775        swap_body.extend_from_slice(&sqrt_price.to_be_bytes::<32>());
776        swap_body.extend_from_slice(&liquidity.to_be_bytes::<32>());
777        swap_body.extend_from_slice(&tick.to_be_bytes::<32>());
778        data_builder.append_value(&swap_body);
779
780        // Row 1: Mint event (different topic0, different body layout)
781        topic0_builder.append_value(mint_selector);
782        topic1_builder.append_value(addr);
783        topic2_builder.append_value(addr);
784        topic3_builder.append_value(addr);
785        let mint_body = vec![0u8; 32 * 4]; // sender, amount, amount0, amount1
786        data_builder.append_value(&mint_body);
787
788        let schema = Arc::new(Schema::new(vec![
789            Field::new("topic0", DataType::Binary, true),
790            Field::new("topic1", DataType::Binary, true),
791            Field::new("topic2", DataType::Binary, true),
792            Field::new("topic3", DataType::Binary, true),
793            Field::new("data", DataType::Binary, true),
794        ]));
795
796        let batch = RecordBatch::try_new(
797            schema,
798            vec![
799                Arc::new(topic0_builder.finish()),
800                Arc::new(topic1_builder.finish()),
801                Arc::new(topic2_builder.finish()),
802                Arc::new(topic3_builder.finish()),
803                Arc::new(data_builder.finish()),
804            ],
805        )
806        .unwrap();
807
808        // With filter_by_topic0=true, should filter to only the Swap row
809        let result = decode_events(swap_sig, &batch, true, true, false, false).unwrap();
810        assert_eq!(result.num_rows(), 1, "should only decode the Swap row");
811
812        // With filter_by_topic0=true and hstack=true, decoded + input columns are returned
813        let result = decode_events(swap_sig, &batch, true, true, true, false).unwrap();
814        assert_eq!(result.num_rows(), 1);
815        // Should have decoded columns + original input columns
816        assert!(
817            result.column_by_name("topic0").is_some(),
818            "hstack should include original input columns"
819        );
820    }
821
822    #[test]
823    fn test_decode_call_inputs_named_tuple_via_abi_json() {
824        use arrow::array::{BinaryArray, Decimal128Array, Decimal256Array, GenericBinaryBuilder};
825
826        // JSON ABI fragment with:
827        //  - a nested static tuple (config.breakdown) → tests two-level struct recursion
828        //  - a variable-length tuple array (rewards: tuple[]) → tests List<Struct> → Utf8
829        //
830        // All params land in a single calldata binary (no topic/body split like events).
831        let abi_json = r#"{
832            "type": "function",
833            "name": "setConfig",
834            "inputs": [
835                {"name": "assetId", "type": "uint256", "components": []},
836                {"name": "spoke",   "type": "address", "components": []},
837                {"name": "config", "type": "tuple", "components": [
838                    {"name": "sharesDelta",    "type": "int256", "components": []},
839                    {"name": "offsetRayDelta", "type": "int256", "components": []},
840                    {"name": "breakdown", "type": "tuple", "components": [
841                        {"name": "base",  "type": "uint128", "components": []},
842                        {"name": "bonus", "type": "uint128", "components": []}
843                    ]}
844                ]},
845                {"name": "rewards", "type": "tuple[]", "components": [
846                    {"name": "token",  "type": "address", "components": []},
847                    {"name": "amount", "type": "uint256", "components": []}
848                ]}
849            ],
850            "outputs": [],
851            "stateMutability": "nonpayable"
852        }"#;
853
854        let asset_id = U256::from(42u64);
855        let spoke_addr = [1u8; 20];
856        let shares_delta = I256::try_from(-100i64).unwrap();
857        let offset_ray_delta = I256::try_from(200i64).unwrap();
858        let base: u128 = 500;
859        let bonus: u128 = 999;
860        let reward_token_0 = [2u8; 20];
861        let reward_amount_0 = U256::from(1000u64);
862        let reward_token_1 = [3u8; 20];
863        let reward_amount_1 = U256::from(2000u64);
864
865        // ABI-encoded calldata (without the 4-byte function selector).
866        // assetId and spoke are static; config is a static tuple (4 words inline);
867        // rewards is dynamic (tuple[]) so the head holds a pointer to the tail.
868        //
869        // Head layout (7 words = 224 bytes):
870        //   [0..32]    assetId
871        //   [32..64]   spoke (address left-padded to 32 bytes)
872        //   [64..192]  config inline (sharesDelta, offsetRayDelta, base, bonus)
873        //   [192..224] offset for rewards = 224 (7 × 32, from start of encoding)
874        //
875        // Tail (starting at byte 224):
876        //   [224..256] rewards.length = 2
877        //   [256..320] rewards[0]: token (32 bytes) + amount (32 bytes)
878        //   [320..384] rewards[1]: token (32 bytes) + amount (32 bytes)
879        let mut calldata = Vec::new();
880        calldata.extend_from_slice(&asset_id.to_be_bytes::<32>());
881        let mut spoke_padded = [0u8; 32];
882        spoke_padded[12..].copy_from_slice(&spoke_addr);
883        calldata.extend_from_slice(&spoke_padded);
884        calldata.extend_from_slice(&shares_delta.to_be_bytes::<32>());
885        calldata.extend_from_slice(&offset_ray_delta.to_be_bytes::<32>());
886        calldata.extend_from_slice(&U256::from(base).to_be_bytes::<32>());
887        calldata.extend_from_slice(&U256::from(bonus).to_be_bytes::<32>());
888        calldata.extend_from_slice(&U256::from(7u64 * 32).to_be_bytes::<32>()); // rewards offset
889        calldata.extend_from_slice(&U256::from(2u64).to_be_bytes::<32>()); // rewards length
890        let mut reward_token_0_padded = [0u8; 32];
891        reward_token_0_padded[12..].copy_from_slice(&reward_token_0);
892        calldata.extend_from_slice(&reward_token_0_padded);
893        calldata.extend_from_slice(&reward_amount_0.to_be_bytes::<32>());
894        let mut reward_token_1_padded = [0u8; 32];
895        reward_token_1_padded[12..].copy_from_slice(&reward_token_1);
896        calldata.extend_from_slice(&reward_token_1_padded);
897        calldata.extend_from_slice(&reward_amount_1.to_be_bytes::<32>());
898
899        let mut builder = GenericBinaryBuilder::<i32>::new();
900        builder.append_value(&calldata);
901        let col = builder.finish();
902
903        let result = decode_call_inputs(abi_json, &col, false, false).unwrap();
904
905        assert_eq!(result.num_rows(), 1);
906
907        // Output must be fully flat — no Struct columns survive
908        for field in result.schema().fields() {
909            assert!(
910                !matches!(field.data_type(), DataType::Struct(_)),
911                "field '{}' should not be Struct after flattening",
912                field.name()
913            );
914        }
915
916        // Top-level params and dot-joined nested fields must all be present;
917        // tuple[] is serialised to a single Utf8 column.
918        let out_schema = result.schema();
919        assert!(out_schema.field_with_name("assetId").is_ok());
920        assert!(out_schema.field_with_name("spoke").is_ok());
921        assert!(out_schema.field_with_name("config.sharesDelta").is_ok());
922        assert!(out_schema.field_with_name("config.offsetRayDelta").is_ok());
923        assert!(out_schema.field_with_name("config.breakdown.base").is_ok());
924        assert!(out_schema.field_with_name("config.breakdown.bonus").is_ok());
925        assert_eq!(
926            out_schema.field_with_name("rewards").unwrap().data_type(),
927            &DataType::Utf8,
928            "variable-length tuple[] should be serialised to Utf8"
929        );
930
931        // Print the human-readable signature derived from the JSON ABI fragment
932        let func: alloy_json_abi::Function = serde_json::from_str(abi_json).unwrap();
933        println!("Human-readable signature: {}", func.full_signature());
934
935        // Print the serialised rewards string (List<Struct> → Utf8)
936        use arrow::array::StringArray;
937        let rewards_col = result
938            .column_by_name("rewards")
939            .unwrap()
940            .as_any()
941            .downcast_ref::<StringArray>()
942            .unwrap();
943        println!("rewards[0]: {}", rewards_col.value(0));
944
945        // spoke: address → 20 raw bytes
946        let spoke_col = result
947            .column_by_name("spoke")
948            .unwrap()
949            .as_any()
950            .downcast_ref::<BinaryArray>()
951            .unwrap();
952        assert_eq!(spoke_col.value(0), spoke_addr);
953
954        // config.breakdown.base and .bonus: uint128 → Decimal128(38, 0)
955        let base_col = result
956            .column_by_name("config.breakdown.base")
957            .unwrap()
958            .as_any()
959            .downcast_ref::<Decimal128Array>()
960            .unwrap();
961        assert_eq!(base_col.value(0), base as i128);
962
963        let bonus_col = result
964            .column_by_name("config.breakdown.bonus")
965            .unwrap()
966            .as_any()
967            .downcast_ref::<Decimal128Array>()
968            .unwrap();
969        assert_eq!(bonus_col.value(0), bonus as i128);
970
971        // assetId: uint256 → Decimal256(76, 0)
972        let asset_id_col = result
973            .column_by_name("assetId")
974            .unwrap()
975            .as_any()
976            .downcast_ref::<Decimal256Array>()
977            .unwrap();
978        let expected = arrow::datatypes::i256::from_be_bytes(asset_id.to_be_bytes::<32>());
979        assert_eq!(asset_id_col.value(0), expected);
980
981        // config.sharesDelta: int256 → Decimal256(76, 0)
982        let shares_delta_col = result
983            .column_by_name("config.sharesDelta")
984            .unwrap()
985            .as_any()
986            .downcast_ref::<Decimal256Array>()
987            .unwrap();
988        let expected = arrow::datatypes::i256::from_be_bytes(shares_delta.to_be_bytes::<32>());
989        assert_eq!(shares_delta_col.value(0), expected);
990
991        // // Save decoded batch to parquet in the crate root
992        // use parquet::arrow::ArrowWriter;
993        // use std::fs::File;
994        // let file = File::create("refresh_premium_decoded.parquet").unwrap();
995        // let mut writer = ArrowWriter::try_new(file, result.schema(), None).unwrap();
996        // writer.write(&result).unwrap();
997        // writer.close().unwrap();
998    }
999
1000    #[test]
1001    fn test_decode_events_named_tuple_via_abi_json() {
1002        use arrow::array::{BinaryArray, Decimal128Array, GenericBinaryBuilder};
1003
1004        // JSON ABI fragment with:
1005        //  - a nested static tuple (premiumDelta.breakdown) → tests two-level struct recursion
1006        //  - a variable-length tuple array (rewards: tuple[]) → tests List<Struct> → Utf8
1007        let abi_json = r#"{
1008            "type": "event",
1009            "name": "RefreshPremium",
1010            "inputs": [
1011                {"name": "assetId", "type": "uint256", "indexed": true, "components": []},
1012                {"name": "spoke",   "type": "address", "indexed": true, "components": []},
1013                {"name": "premiumDelta", "type": "tuple", "indexed": false, "components": [
1014                    {"name": "sharesDelta",    "type": "int256", "components": []},
1015                    {"name": "offsetRayDelta", "type": "int256", "components": []},
1016                    {"name": "breakdown", "type": "tuple", "components": [
1017                        {"name": "base",  "type": "uint128", "components": []},
1018                        {"name": "bonus", "type": "uint128", "components": []}
1019                    ]}
1020                ]},
1021                {"name": "rewards", "type": "tuple[]", "indexed": false, "components": [
1022                    {"name": "token",  "type": "address", "components": []},
1023                    {"name": "amount", "type": "uint256", "components": []}
1024                ]}
1025            ],
1026            "anonymous": false
1027        }"#;
1028
1029        let asset_id = U256::from(42u64);
1030        let spoke_addr = [1u8; 20];
1031        let shares_delta = I256::try_from(-100i64).unwrap();
1032        let offset_ray_delta = I256::try_from(200i64).unwrap();
1033        let base: u128 = 500;
1034        let bonus: u128 = 999;
1035        let reward_token_0 = [2u8; 20];
1036        let reward_amount_0 = U256::from(1000u64);
1037        let reward_token_1 = [3u8; 20];
1038        let reward_amount_1 = U256::from(2000u64);
1039
1040        // topic1: uint256 → 32-byte big-endian
1041        let topic1 = asset_id.to_be_bytes::<32>();
1042
1043        // topic2: address → left-padded to 32 bytes
1044        let mut topic2 = [0u8; 32];
1045        topic2[12..].copy_from_slice(&spoke_addr);
1046
1047        // Body: ABI-encoded sequence (premiumDelta, rewards).
1048        // premiumDelta is static (4 × 32 = 128 bytes); rewards is dynamic (tuple[]).
1049        // ABI head/tail layout:
1050        //   [0..128]   premiumDelta inline (4 words)
1051        //   [128..160] offset for rewards = 160 (start of tail, relative to head start)
1052        //   [160..192] rewards array length = 2
1053        //   [192..256] rewards[0]: token (32 bytes) + amount (32 bytes)
1054        //   [256..320] rewards[1]: token (32 bytes) + amount (32 bytes)
1055        let premiumdelta_word_count: u64 = 4; // sharesDelta + offsetRayDelta + base + bonus
1056        let head_size: u64 = premiumdelta_word_count * 32 + 32; // +32 for rewards offset word
1057        let mut body = Vec::new();
1058        // premiumDelta fields (static, encoded inline in the head)
1059        body.extend_from_slice(&shares_delta.to_be_bytes::<32>());
1060        body.extend_from_slice(&offset_ray_delta.to_be_bytes::<32>());
1061        body.extend_from_slice(&U256::from(base).to_be_bytes::<32>());
1062        body.extend_from_slice(&U256::from(bonus).to_be_bytes::<32>());
1063        // offset pointing to the start of rewards tail data
1064        body.extend_from_slice(&U256::from(head_size).to_be_bytes::<32>());
1065        // rewards tail: length + elements
1066        body.extend_from_slice(&U256::from(2u64).to_be_bytes::<32>());
1067        let mut reward_token_0_padded = [0u8; 32];
1068        reward_token_0_padded[12..].copy_from_slice(&reward_token_0);
1069        body.extend_from_slice(&reward_token_0_padded);
1070        body.extend_from_slice(&reward_amount_0.to_be_bytes::<32>());
1071        let mut reward_token_1_padded = [0u8; 32];
1072        reward_token_1_padded[12..].copy_from_slice(&reward_token_1);
1073        body.extend_from_slice(&reward_token_1_padded);
1074        body.extend_from_slice(&reward_amount_1.to_be_bytes::<32>());
1075
1076        let selector = abi_to_topic0(abi_json).unwrap();
1077
1078        let mut topic0_b = GenericBinaryBuilder::<i32>::new();
1079        let mut topic1_b = GenericBinaryBuilder::<i32>::new();
1080        let mut topic2_b = GenericBinaryBuilder::<i32>::new();
1081        let mut topic3_b = GenericBinaryBuilder::<i32>::new();
1082        let mut data_b = GenericBinaryBuilder::<i32>::new();
1083
1084        topic0_b.append_value(selector);
1085        topic1_b.append_value(topic1);
1086        topic2_b.append_value(topic2);
1087        topic3_b.append_null();
1088        data_b.append_value(&body);
1089
1090        let schema = Arc::new(Schema::new(vec![
1091            Field::new("topic0", DataType::Binary, true),
1092            Field::new("topic1", DataType::Binary, true),
1093            Field::new("topic2", DataType::Binary, true),
1094            Field::new("topic3", DataType::Binary, true),
1095            Field::new("data", DataType::Binary, true),
1096        ]));
1097
1098        let batch = RecordBatch::try_new(
1099            schema,
1100            vec![
1101                Arc::new(topic0_b.finish()),
1102                Arc::new(topic1_b.finish()),
1103                Arc::new(topic2_b.finish()),
1104                Arc::new(topic3_b.finish()),
1105                Arc::new(data_b.finish()),
1106            ],
1107        )
1108        .unwrap();
1109
1110        let result = decode_events(abi_json, &batch, false, false, false, false).unwrap();
1111
1112        assert_eq!(result.num_rows(), 1);
1113
1114        // Output must be fully flat — no Struct columns survive
1115        for field in result.schema().fields() {
1116            assert!(
1117                !matches!(field.data_type(), DataType::Struct(_)),
1118                "field '{}' should not be Struct after flattening",
1119                field.name()
1120            );
1121        }
1122
1123        // Indexed params are top-level; nested tuple fields are dot-joined two levels deep;
1124        // tuple[] is flattened to a single Utf8 column (variable-length → serialised).
1125        let out_schema = result.schema();
1126        assert!(out_schema.field_with_name("assetId").is_ok());
1127        assert!(out_schema.field_with_name("spoke").is_ok());
1128        assert!(out_schema
1129            .field_with_name("premiumDelta.sharesDelta")
1130            .is_ok());
1131        assert!(out_schema
1132            .field_with_name("premiumDelta.offsetRayDelta")
1133            .is_ok());
1134        assert!(out_schema
1135            .field_with_name("premiumDelta.breakdown.base")
1136            .is_ok());
1137        assert!(out_schema
1138            .field_with_name("premiumDelta.breakdown.bonus")
1139            .is_ok());
1140        assert_eq!(
1141            out_schema.field_with_name("rewards").unwrap().data_type(),
1142            &DataType::Utf8,
1143            "variable-length tuple[] should be serialised to Utf8"
1144        );
1145
1146        // Print the human-readable signature derived from the JSON ABI fragment
1147        let event: alloy_json_abi::Event = serde_json::from_str(abi_json).unwrap();
1148        println!("Human-readable signature: {}", event.full_signature());
1149
1150        // Print the serialised rewards string (List<Struct> → Utf8)
1151        use arrow::array::StringArray;
1152        let rewards_col = result
1153            .column_by_name("rewards")
1154            .unwrap()
1155            .as_any()
1156            .downcast_ref::<StringArray>()
1157            .unwrap();
1158        println!("rewards[0]: {}", rewards_col.value(0));
1159
1160        // spoke: address decodes to 20 raw bytes
1161        let spoke_col = result
1162            .column_by_name("spoke")
1163            .unwrap()
1164            .as_any()
1165            .downcast_ref::<BinaryArray>()
1166            .unwrap();
1167        assert_eq!(spoke_col.value(0), spoke_addr);
1168
1169        // breakdown.base and breakdown.bonus: uint128 → Decimal128(38, 0)
1170        let base_col = result
1171            .column_by_name("premiumDelta.breakdown.base")
1172            .unwrap()
1173            .as_any()
1174            .downcast_ref::<Decimal128Array>()
1175            .unwrap();
1176        assert_eq!(base_col.value(0), base as i128);
1177
1178        let bonus_col = result
1179            .column_by_name("premiumDelta.breakdown.bonus")
1180            .unwrap()
1181            .as_any()
1182            .downcast_ref::<Decimal128Array>()
1183            .unwrap();
1184        assert_eq!(bonus_col.value(0), bonus as i128);
1185
1186        // // Save decoded batch to parquet in the crate root
1187        // use parquet::arrow::ArrowWriter;
1188        // use std::fs::File;
1189        // let file = File::create("refresh_premium_decoded.parquet").unwrap();
1190        // let mut writer = ArrowWriter::try_new(file, result.schema(), None).unwrap();
1191        // writer.write(&result).unwrap();
1192        // writer.close().unwrap();
1193    }
1194
1195    #[test]
1196    #[ignore]
1197    fn nested_event_signature_to_schema() {
1198        let sig = "ConfiguredQuests(address editor, uint256[][], address indexed my_addr, (bool,bool[],(bool, uint256[]))[] questDetails)";
1199
1200        let schema = event_signature_to_arrow_schema(sig, false).unwrap();
1201
1202        let expected_schema = Schema::new(vec![
1203            Arc::new(Field::new("my_addr", DataType::Binary, true)),
1204            Arc::new(Field::new("editor", DataType::Binary, true)),
1205            Arc::new(Field::new(
1206                "param1",
1207                DataType::List(Arc::new(Field::new(
1208                    "",
1209                    DataType::List(Arc::new(Field::new("", DataType::Decimal256(76, 0), true))),
1210                    true,
1211                ))),
1212                true,
1213            )),
1214            Arc::new(Field::new(
1215                "questDetails",
1216                DataType::List(Arc::new(Field::new(
1217                    "",
1218                    DataType::Struct(Fields::from(vec![
1219                        Arc::new(Field::new("param0", DataType::Boolean, true)),
1220                        Arc::new(Field::new(
1221                            "param1",
1222                            DataType::List(Arc::new(Field::new("", DataType::Boolean, true))),
1223                            true,
1224                        )),
1225                        Arc::new(Field::new(
1226                            "param2",
1227                            DataType::Struct(Fields::from(vec![
1228                                Arc::new(Field::new("param0", DataType::Boolean, true)),
1229                                Arc::new(Field::new(
1230                                    "param1",
1231                                    DataType::List(Arc::new(Field::new(
1232                                        "",
1233                                        DataType::Decimal256(76, 0),
1234                                        true,
1235                                    ))),
1236                                    true,
1237                                )),
1238                            ])),
1239                            true,
1240                        )),
1241                    ])),
1242                    true,
1243                ))),
1244                true,
1245            )),
1246        ]);
1247
1248        assert_eq!(schema, expected_schema);
1249    }
1250
1251    #[test]
1252    #[ignore]
1253    fn i256_to_arrow_i256() {
1254        for val in [
1255            I256::MIN,
1256            I256::MAX,
1257            I256::MAX / I256::try_from(2i32).unwrap(),
1258        ] {
1259            let out = arrow::datatypes::i256::from_be_bytes(val.to_be_bytes::<32>());
1260
1261            assert_eq!(val.to_string(), out.to_string());
1262        }
1263    }
1264
1265    #[test]
1266    #[ignore]
1267    fn read_parquet_with_real_data() {
1268        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1269        use std::fs::File;
1270        let builder =
1271            ParquetRecordBatchReaderBuilder::try_new(File::open("logs.parquet").unwrap()).unwrap();
1272        let mut reader = builder.build().unwrap();
1273        let logs = reader.next().unwrap().unwrap();
1274
1275        let signature =
1276            "PairCreated(address indexed token0, address indexed token1, address pair,uint256)";
1277
1278        let decoded = decode_events(signature, &logs, false, false, false, false).unwrap();
1279
1280        // Save the filtered instructions to a new parquet file
1281        let mut file = File::create("decoded_logs.parquet").unwrap();
1282        let mut writer =
1283            parquet::arrow::ArrowWriter::try_new(&mut file, decoded.schema(), None).unwrap();
1284        writer.write(&decoded).unwrap();
1285        writer.close().unwrap();
1286    }
1287}