1mod abi;
19mod arrow_convert;
20
21use std::sync::Arc;
22
23use alloy_dyn_abi::{DynSolCall, DynSolEvent, DynSolType, DynSolValue, Specifier};
24use anyhow::{anyhow, Context, Result};
25use arrow::{
26 array::{
27 Array, BinaryArray, GenericBinaryArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
28 StructArray,
29 },
30 compute,
31 datatypes::{DataType, Field, Schema},
32};
33
34pub use abi::*;
35use arrow_convert::{
36 build_topic0_mask, decode_body_named, decode_topic, param_to_arrow_dtype, to_struct_named,
37};
38
39pub fn signature_to_topic0(signature: &str) -> Result<[u8; 32]> {
42 let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
43 Ok(event.selector().into())
44}
45
46pub fn abi_to_topic0(abi_json: &str) -> Result<[u8; 32]> {
49 let event: alloy_json_abi::Event =
50 serde_json::from_str(abi_json).context("parse event ABI JSON")?;
51 Ok(event.selector().into())
52}
53
54pub fn decode_call_inputs<I: OffsetSizeTrait>(
65 signature: &str,
66 data: &GenericBinaryArray<I>,
67 allow_decode_fail: bool,
68 large_int_as_binary: bool,
69) -> Result<RecordBatch> {
70 decode_call_impl::<true, I>(signature, data, allow_decode_fail, large_int_as_binary)
71}
72
73pub fn decode_call_outputs<I: OffsetSizeTrait>(
84 signature: &str,
85 data: &GenericBinaryArray<I>,
86 allow_decode_fail: bool,
87 large_int_as_binary: bool,
88) -> Result<RecordBatch> {
89 decode_call_impl::<false, I>(signature, data, allow_decode_fail, large_int_as_binary)
90}
91
92fn decode_call_impl<const IS_INPUT: bool, I: OffsetSizeTrait>(
93 signature: &str,
94 data: &GenericBinaryArray<I>,
95 allow_decode_fail: bool,
96 large_int_as_binary: bool,
97) -> Result<RecordBatch> {
98 let (func, resolved) = resolve_function_signature(signature)?;
99
100 let schema = function_signature_to_arrow_schemas_impl(&func, large_int_as_binary)
101 .context("convert function signature to arrow schema")?;
102 let schema = if IS_INPUT { schema.0 } else { schema.1 };
103
104 let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(data.len());
105
106 for blob in data {
107 match blob {
108 Some(blob) => {
109 let decode_res = if IS_INPUT {
110 resolved.abi_decode_input(blob)
111 } else {
112 resolved.abi_decode_output(blob)
113 };
114 match decode_res {
115 Ok(data) => decoded.push(Some(DynSolValue::Tuple(data))),
116 Err(e) if allow_decode_fail => {
117 log::debug!("failed to decode function call data: {e}");
118 decoded.push(None);
119 }
120 Err(e) => {
121 return Err(anyhow!("failed to decode function call data: {e}"));
122 }
123 }
124 }
125 None => decoded.push(None),
126 }
127 }
128
129 let sol_fields = if IS_INPUT {
130 resolved.types().to_vec()
131 } else {
132 resolved.returns().types().to_vec()
133 };
134 let params = if IS_INPUT {
135 &func.inputs
136 } else {
137 &func.outputs
138 };
139 let named: Vec<_> = params
140 .iter()
141 .map(|p| (p.name.as_str(), p.components.as_slice()))
142 .collect();
143
144 let array = to_struct_named(
145 &sol_fields,
146 &named,
147 decoded,
148 allow_decode_fail,
149 large_int_as_binary,
150 )
151 .context("map params to arrow")?;
152 let arr = array
153 .as_any()
154 .downcast_ref::<StructArray>()
155 .context("expected struct array from to_struct_named")?;
156
157 let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(arr.num_columns());
158 for f in arr.columns() {
159 arrays.push(f.clone());
160 }
161
162 let batch = RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")?;
163 tiders_cast::flatten_record_batch(&batch).context("flatten decoded batch")
164}
165
166pub fn function_signature_to_arrow_schemas(
171 signature: &str,
172 large_int_as_binary: bool,
173) -> Result<(Schema, Schema)> {
174 let (func, _) = resolve_function_signature(signature)?;
175 let (input_schema, output_schema) =
176 function_signature_to_arrow_schemas_impl(&func, large_int_as_binary)?;
177 Ok((
178 tiders_cast::flatten_schema(&input_schema),
179 tiders_cast::flatten_schema(&output_schema),
180 ))
181}
182
183fn function_signature_to_arrow_schemas_impl(
184 func: &alloy_json_abi::Function,
185 large_int_as_binary: bool,
186) -> Result<(Schema, Schema)> {
187 let mut input_fields = Vec::with_capacity(func.inputs.len());
188 let mut output_fields = Vec::with_capacity(func.outputs.len());
189
190 for (i, param) in func.inputs.iter().enumerate() {
191 let dtype = param_to_arrow_dtype(¶m.ty, ¶m.components, large_int_as_binary)
192 .context("map to arrow type")?;
193 let name = if param.name.is_empty() {
194 format!("param{i}")
195 } else {
196 param.name.clone()
197 };
198 input_fields.push(Arc::new(Field::new(name, dtype, true)));
199 }
200
201 for (i, param) in func.outputs.iter().enumerate() {
202 let dtype = param_to_arrow_dtype(¶m.ty, ¶m.components, large_int_as_binary)
203 .context("map to arrow type")?;
204 let name = if param.name.is_empty() {
205 format!("param{i}")
206 } else {
207 param.name.clone()
208 };
209 output_fields.push(Arc::new(Field::new(name, dtype, true)));
210 }
211
212 Ok((Schema::new(input_fields), Schema::new(output_fields)))
213}
214
215fn parse_function_str(signature: &str) -> Result<alloy_json_abi::Function> {
216 if signature.starts_with('{') {
217 return serde_json::from_str(signature).context("parse function JSON");
218 }
219 if signature.contains("tuple(") {
220 log::warn!(
221 "Function signature contains `tuple(...)` which will produce unnamed fields in the Arrow schema. \
222 Consider passing a JSON ABI fragment instead.\n \
223 Signature: {signature}"
224 );
225 }
226 alloy_json_abi::Function::parse(signature).map_err(|e| {
227 anyhow!(
228 "{e}\n \
229 Hint: if the signature contains named tuple fields (e.g. `tuple(int256 foo, ...)`),\n \
230 either strip the inner names (e.g. `(int256,...)`) or pass a JSON ABI fragment."
231 )
232 })
233}
234
235fn resolve_function_signature(signature: &str) -> Result<(alloy_json_abi::Function, DynSolCall)> {
236 let func = parse_function_str(signature)?;
237 let resolved = func.resolve().context("resolve function signature")?;
238 Ok((func, resolved))
239}
240
241#[expect(clippy::fn_params_excessive_bools, reason = "stable public API")]
258pub fn decode_events(
259 signature: &str,
260 data: &RecordBatch,
261 allow_decode_fail: bool,
262 filter_by_topic0: bool,
263 hstack: bool,
264 large_int_as_binary: bool,
265) -> Result<RecordBatch> {
266 let (event, resolved) = resolve_event_signature(signature)?;
267
268 let data = if filter_by_topic0 {
270 filter_by_topic0_impl(&event, data)?
271 } else {
272 data.clone()
273 };
274
275 let schema = event_signature_to_arrow_schema_impl(&event, large_int_as_binary)
276 .context("convert event signature to arrow schema")?;
277
278 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
279 let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(fields.len());
280
281 for (sol_type, topic_name) in resolved
282 .indexed()
283 .iter()
284 .zip(&["topic1", "topic2", "topic3"])
285 {
286 let col = data
287 .column_by_name(topic_name)
288 .context("get topic column")?;
289
290 if col.data_type() == &DataType::Binary {
291 let arr = col
292 .as_any()
293 .downcast_ref::<BinaryArray>()
294 .context("downcast to BinaryArray")?;
295 decode_topic(
296 sol_type,
297 arr,
298 allow_decode_fail,
299 large_int_as_binary,
300 &mut arrays,
301 )
302 .context("decode topic")?;
303 } else if col.data_type() == &DataType::LargeBinary {
304 let arr = col
305 .as_any()
306 .downcast_ref::<LargeBinaryArray>()
307 .context("downcast to LargeBinaryArray")?;
308 decode_topic(
309 sol_type,
310 arr,
311 allow_decode_fail,
312 large_int_as_binary,
313 &mut arrays,
314 )
315 .context("decode topic")?;
316 }
317 }
318
319 let body_col = data.column_by_name("data").context("get data column")?;
320 let body_sol_type = DynSolType::Tuple(resolved.body().to_vec());
321 let body_params: Vec<&alloy_json_abi::EventParam> =
322 event.inputs.iter().filter(|i| !i.indexed).collect();
323
324 if body_col.data_type() == &DataType::Binary {
325 let arr = body_col
326 .as_any()
327 .downcast_ref::<BinaryArray>()
328 .context("downcast to BinaryArray")?;
329 decode_body_named(
330 &body_sol_type,
331 &body_params,
332 arr,
333 allow_decode_fail,
334 large_int_as_binary,
335 &mut arrays,
336 )
337 .context("decode body")?;
338 } else if body_col.data_type() == &DataType::LargeBinary {
339 let arr = body_col
340 .as_any()
341 .downcast_ref::<LargeBinaryArray>()
342 .context("downcast to LargeBinaryArray")?;
343 decode_body_named(
344 &body_sol_type,
345 &body_params,
346 arr,
347 allow_decode_fail,
348 large_int_as_binary,
349 &mut arrays,
350 )
351 .context("decode body")?;
352 }
353
354 if hstack {
355 for (i, col) in data.columns().iter().enumerate() {
356 fields.push(data.schema().field(i).clone().into());
357 arrays.push(col.clone());
358 }
359 }
360
361 let output_schema = Schema::new(fields);
362 let batch =
363 RecordBatch::try_new(Arc::new(output_schema), arrays).context("construct arrow batch")?;
364 tiders_cast::flatten_record_batch(&batch).context("flatten decoded batch")
365}
366
367pub fn event_signature_to_arrow_schema(
372 signature: &str,
373 large_int_as_binary: bool,
374) -> Result<Schema> {
375 let (event, _) = resolve_event_signature(signature)?;
376 let schema = event_signature_to_arrow_schema_impl(&event, large_int_as_binary)?;
377 Ok(tiders_cast::flatten_schema(&schema))
378}
379
380fn event_signature_to_arrow_schema_impl(
386 sig: &alloy_json_abi::Event,
387 large_int_as_binary: bool,
388) -> Result<Schema> {
389 let mut fields = Vec::<Arc<Field>>::new();
390
391 for (i, input) in sig.inputs.iter().enumerate() {
392 if input.indexed {
393 let name = if input.name.is_empty() {
394 format!("param{i}")
395 } else {
396 input.name.clone()
397 };
398 let dtype = param_to_arrow_dtype(&input.ty, &input.components, large_int_as_binary)
399 .context("map indexed param to arrow type")?;
400 fields.push(Arc::new(Field::new(name, dtype, true)));
401 }
402 }
403 for (i, input) in sig.inputs.iter().enumerate() {
404 if !input.indexed {
405 let name = if input.name.is_empty() {
406 format!("param{i}")
407 } else {
408 input.name.clone()
409 };
410 let dtype = param_to_arrow_dtype(&input.ty, &input.components, large_int_as_binary)
411 .context("map body param to arrow type")?;
412 fields.push(Arc::new(Field::new(name, dtype, true)));
413 }
414 }
415
416 Ok(Schema::new(fields))
417}
418
419fn parse_event_str(signature: &str) -> Result<alloy_json_abi::Event> {
430 if signature.starts_with('{') {
431 return serde_json::from_str(signature).context("parse event JSON");
432 }
433 if signature.contains("tuple(") {
434 log::warn!(
435 "Event signature contains `tuple(...)` which will produce unnamed fields in the Arrow schema. \
436 Consider passing a JSON ABI fragment instead.\n \
437 Signature: {signature}"
438 );
439 }
440 alloy_json_abi::Event::parse(signature).map_err(|e| {
441 anyhow!(
442 "{e}\n \
443 Hint: if the signature contains named tuple fields (e.g. `tuple(int256 foo, ...)`),\n \
444 either strip the inner names (e.g. `(int256,...)`) or pass a JSON ABI fragment."
445 )
446 })
447}
448
449fn resolve_event_signature(signature: &str) -> Result<(alloy_json_abi::Event, DynSolEvent)> {
451 let event = parse_event_str(signature)?;
452 let resolved = event.resolve().context("resolve event signature")?;
453 Ok((event, resolved))
454}
455
456fn filter_by_topic0_impl(event: &alloy_json_abi::Event, data: &RecordBatch) -> Result<RecordBatch> {
460 let Some(topic0_col) = data.column_by_name("topic0") else {
461 return Ok(data.clone());
462 };
463
464 let selector = event.selector();
465
466 let mask =
467 build_topic0_mask(topic0_col, selector.as_slice()).context("build topic0 filter mask")?;
468
469 let non_matching = mask.iter().filter(|v| !v.unwrap_or(false)).count();
470 if non_matching > 0 {
471 log::debug!(
472 "filtering out {non_matching} events whose topic0 does not match '{}'",
473 event.full_signature()
474 );
475 }
476
477 compute::filter_record_batch(data, &mask).context("filter record batch by topic0")
478}
479
480#[cfg(test)]
481mod tests {
482 use super::*;
483 use alloy_primitives::{I256, U256};
484 use arrow::datatypes::{Fields, Int32Type};
485
486 #[test]
487 fn test_int_overflow_with_allow_decode_fail() {
488 let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
493 let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, true);
494 assert!(result.is_ok());
495 let arr = result.unwrap();
496 assert!(arr.is_null(0));
497
498 let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
500 let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, false);
501 assert!(result.is_err());
502 }
503
504 #[test]
509 fn test_decode_uint256_above_signed_max() {
510 use arrow::array::{Decimal256Array, GenericBinaryBuilder};
511
512 let sig = "Filled(uint256 tokenId)";
513 let token_id = U256::MAX;
514
515 let selector = signature_to_topic0(sig).unwrap();
516 let mut topic0_b = GenericBinaryBuilder::<i32>::new();
517 let mut topic1_b = GenericBinaryBuilder::<i32>::new();
518 let mut topic2_b = GenericBinaryBuilder::<i32>::new();
519 let mut topic3_b = GenericBinaryBuilder::<i32>::new();
520 let mut data_b = GenericBinaryBuilder::<i32>::new();
521 topic0_b.append_value(selector);
522 topic1_b.append_null();
523 topic2_b.append_null();
524 topic3_b.append_null();
525 data_b.append_value(token_id.to_be_bytes::<32>());
526
527 let schema = Arc::new(Schema::new(vec![
528 Field::new("topic0", DataType::Binary, true),
529 Field::new("topic1", DataType::Binary, true),
530 Field::new("topic2", DataType::Binary, true),
531 Field::new("topic3", DataType::Binary, true),
532 Field::new("data", DataType::Binary, true),
533 ]));
534 let batch = RecordBatch::try_new(
535 schema,
536 vec![
537 Arc::new(topic0_b.finish()),
538 Arc::new(topic1_b.finish()),
539 Arc::new(topic2_b.finish()),
540 Arc::new(topic3_b.finish()),
541 Arc::new(data_b.finish()),
542 ],
543 )
544 .unwrap();
545
546 let result = decode_events(sig, &batch, false, false, false, false).unwrap();
547 let col = result
548 .column_by_name("tokenId")
549 .unwrap()
550 .as_any()
551 .downcast_ref::<Decimal256Array>()
552 .unwrap();
553 let expected = arrow::datatypes::i256::from_be_bytes(token_id.to_be_bytes::<32>());
554 assert_eq!(col.value(0), expected);
555 }
556
557 #[test]
560 fn test_decode_uint128_above_signed_max() {
561 use arrow::array::{Decimal128Array, GenericBinaryBuilder};
562
563 let sig = "Filled(uint128 amount)";
564 let amount = u128::MAX;
565
566 let selector = signature_to_topic0(sig).unwrap();
567 let mut topic0_b = GenericBinaryBuilder::<i32>::new();
568 let mut topic1_b = GenericBinaryBuilder::<i32>::new();
569 let mut topic2_b = GenericBinaryBuilder::<i32>::new();
570 let mut topic3_b = GenericBinaryBuilder::<i32>::new();
571 let mut data_b = GenericBinaryBuilder::<i32>::new();
572 topic0_b.append_value(selector);
573 topic1_b.append_null();
574 topic2_b.append_null();
575 topic3_b.append_null();
576 let mut body = [0u8; 32];
578 body[16..].copy_from_slice(&amount.to_be_bytes());
579 data_b.append_value(body);
580
581 let schema = Arc::new(Schema::new(vec![
582 Field::new("topic0", DataType::Binary, true),
583 Field::new("topic1", DataType::Binary, true),
584 Field::new("topic2", DataType::Binary, true),
585 Field::new("topic3", DataType::Binary, true),
586 Field::new("data", DataType::Binary, true),
587 ]));
588 let batch = RecordBatch::try_new(
589 schema,
590 vec![
591 Arc::new(topic0_b.finish()),
592 Arc::new(topic1_b.finish()),
593 Arc::new(topic2_b.finish()),
594 Arc::new(topic3_b.finish()),
595 Arc::new(data_b.finish()),
596 ],
597 )
598 .unwrap();
599
600 let result = decode_events(sig, &batch, false, false, false, false).unwrap();
601 let col = result
602 .column_by_name("amount")
603 .unwrap()
604 .as_any()
605 .downcast_ref::<Decimal128Array>()
606 .unwrap();
607 assert_eq!(col.value(0), -1i128);
609 }
610
611 #[test]
616 fn test_decode_large_uint_as_binary() {
617 use arrow::array::{BinaryArray, GenericBinaryBuilder};
618
619 let sig = "Filled(uint256 indexed indexedId, uint256 bodyId)";
620 let indexed_id = U256::MAX;
621 let body_id = U256::MAX - U256::from(1u64);
622
623 let selector = signature_to_topic0(sig).unwrap();
624 let mut topic0_b = GenericBinaryBuilder::<i32>::new();
625 let mut topic1_b = GenericBinaryBuilder::<i32>::new();
626 let mut topic2_b = GenericBinaryBuilder::<i32>::new();
627 let mut topic3_b = GenericBinaryBuilder::<i32>::new();
628 let mut data_b = GenericBinaryBuilder::<i32>::new();
629 topic0_b.append_value(selector);
630 topic1_b.append_value(indexed_id.to_be_bytes::<32>());
631 topic2_b.append_null();
632 topic3_b.append_null();
633 data_b.append_value(body_id.to_be_bytes::<32>());
634
635 let schema = Arc::new(Schema::new(vec![
636 Field::new("topic0", DataType::Binary, true),
637 Field::new("topic1", DataType::Binary, true),
638 Field::new("topic2", DataType::Binary, true),
639 Field::new("topic3", DataType::Binary, true),
640 Field::new("data", DataType::Binary, true),
641 ]));
642 let batch = RecordBatch::try_new(
643 schema,
644 vec![
645 Arc::new(topic0_b.finish()),
646 Arc::new(topic1_b.finish()),
647 Arc::new(topic2_b.finish()),
648 Arc::new(topic3_b.finish()),
649 Arc::new(data_b.finish()),
650 ],
651 )
652 .unwrap();
653
654 let result = decode_events(sig, &batch, false, false, false, true).unwrap();
655
656 let indexed_col = result
657 .column_by_name("indexedId")
658 .unwrap()
659 .as_any()
660 .downcast_ref::<BinaryArray>()
661 .unwrap();
662 assert_eq!(indexed_col.value(0), &indexed_id.to_be_bytes::<32>());
663 assert_eq!(indexed_col.value(0).len(), 32);
664
665 let body_col = result
666 .column_by_name("bodyId")
667 .unwrap()
668 .as_any()
669 .downcast_ref::<BinaryArray>()
670 .unwrap();
671 assert_eq!(body_col.value(0), &body_id.to_be_bytes::<32>());
672
673 let schema_from_sig = event_signature_to_arrow_schema(sig, true).unwrap();
675 assert_eq!(
676 schema_from_sig
677 .field_with_name("indexedId")
678 .unwrap()
679 .data_type(),
680 &DataType::Binary
681 );
682 assert_eq!(
683 schema_from_sig
684 .field_with_name("bodyId")
685 .unwrap()
686 .data_type(),
687 &DataType::Binary
688 );
689 }
690
691 #[test]
694 fn test_decode_int256_as_binary() {
695 use arrow::array::{BinaryArray, GenericBinaryBuilder};
696
697 let sig = "Signed(int256 value)";
698 let value = I256::MIN;
699
700 let selector = signature_to_topic0(sig).unwrap();
701 let mut topic0_b = GenericBinaryBuilder::<i32>::new();
702 let mut topic1_b = GenericBinaryBuilder::<i32>::new();
703 let mut topic2_b = GenericBinaryBuilder::<i32>::new();
704 let mut topic3_b = GenericBinaryBuilder::<i32>::new();
705 let mut data_b = GenericBinaryBuilder::<i32>::new();
706 topic0_b.append_value(selector);
707 topic1_b.append_null();
708 topic2_b.append_null();
709 topic3_b.append_null();
710 data_b.append_value(value.to_be_bytes::<32>());
711
712 let schema = Arc::new(Schema::new(vec![
713 Field::new("topic0", DataType::Binary, true),
714 Field::new("topic1", DataType::Binary, true),
715 Field::new("topic2", DataType::Binary, true),
716 Field::new("topic3", DataType::Binary, true),
717 Field::new("data", DataType::Binary, true),
718 ]));
719 let batch = RecordBatch::try_new(
720 schema,
721 vec![
722 Arc::new(topic0_b.finish()),
723 Arc::new(topic1_b.finish()),
724 Arc::new(topic2_b.finish()),
725 Arc::new(topic3_b.finish()),
726 Arc::new(data_b.finish()),
727 ],
728 )
729 .unwrap();
730
731 let result = decode_events(sig, &batch, false, false, false, true).unwrap();
732 let col = result
733 .column_by_name("value")
734 .unwrap()
735 .as_any()
736 .downcast_ref::<BinaryArray>()
737 .unwrap();
738 let mut expected = [0u8; 32];
739 expected[0] = 0x80;
740 assert_eq!(col.value(0), &expected);
741 }
742
743 #[test]
744 fn test_topic0_filtering_with_allow_decode_fail() {
745 use arrow::array::GenericBinaryBuilder;
746
747 let swap_sig = "Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)";
748 let mint_sig = "Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)";
749
750 let swap_selector = signature_to_topic0(swap_sig).unwrap();
751 let mint_selector = signature_to_topic0(mint_sig).unwrap();
752
753 let mut topic0_builder = GenericBinaryBuilder::<i32>::new();
755 let mut topic1_builder = GenericBinaryBuilder::<i32>::new();
756 let mut topic2_builder = GenericBinaryBuilder::<i32>::new();
757 let mut topic3_builder = GenericBinaryBuilder::<i32>::new();
758 let mut data_builder = GenericBinaryBuilder::<i32>::new();
759
760 let addr = [0u8; 32];
761
762 topic0_builder.append_value(swap_selector);
764 topic1_builder.append_value(addr);
765 topic2_builder.append_value(addr);
766 topic3_builder.append_null();
767 let amount0 = I256::try_from(-1000i64).unwrap();
768 let amount1 = I256::try_from(2000i64).unwrap();
769 let sqrt_price: U256 = U256::from(1u64) << 96;
770 let liquidity = U256::from(1000000u64);
771 let tick = I256::try_from(-100i64).unwrap();
772 let mut swap_body = Vec::new();
773 swap_body.extend_from_slice(&amount0.to_be_bytes::<32>());
774 swap_body.extend_from_slice(&amount1.to_be_bytes::<32>());
775 swap_body.extend_from_slice(&sqrt_price.to_be_bytes::<32>());
776 swap_body.extend_from_slice(&liquidity.to_be_bytes::<32>());
777 swap_body.extend_from_slice(&tick.to_be_bytes::<32>());
778 data_builder.append_value(&swap_body);
779
780 topic0_builder.append_value(mint_selector);
782 topic1_builder.append_value(addr);
783 topic2_builder.append_value(addr);
784 topic3_builder.append_value(addr);
785 let mint_body = vec![0u8; 32 * 4]; data_builder.append_value(&mint_body);
787
788 let schema = Arc::new(Schema::new(vec![
789 Field::new("topic0", DataType::Binary, true),
790 Field::new("topic1", DataType::Binary, true),
791 Field::new("topic2", DataType::Binary, true),
792 Field::new("topic3", DataType::Binary, true),
793 Field::new("data", DataType::Binary, true),
794 ]));
795
796 let batch = RecordBatch::try_new(
797 schema,
798 vec![
799 Arc::new(topic0_builder.finish()),
800 Arc::new(topic1_builder.finish()),
801 Arc::new(topic2_builder.finish()),
802 Arc::new(topic3_builder.finish()),
803 Arc::new(data_builder.finish()),
804 ],
805 )
806 .unwrap();
807
808 let result = decode_events(swap_sig, &batch, true, true, false, false).unwrap();
810 assert_eq!(result.num_rows(), 1, "should only decode the Swap row");
811
812 let result = decode_events(swap_sig, &batch, true, true, true, false).unwrap();
814 assert_eq!(result.num_rows(), 1);
815 assert!(
817 result.column_by_name("topic0").is_some(),
818 "hstack should include original input columns"
819 );
820 }
821
822 #[test]
823 fn test_decode_call_inputs_named_tuple_via_abi_json() {
824 use arrow::array::{BinaryArray, Decimal128Array, Decimal256Array, GenericBinaryBuilder};
825
826 let abi_json = r#"{
832 "type": "function",
833 "name": "setConfig",
834 "inputs": [
835 {"name": "assetId", "type": "uint256", "components": []},
836 {"name": "spoke", "type": "address", "components": []},
837 {"name": "config", "type": "tuple", "components": [
838 {"name": "sharesDelta", "type": "int256", "components": []},
839 {"name": "offsetRayDelta", "type": "int256", "components": []},
840 {"name": "breakdown", "type": "tuple", "components": [
841 {"name": "base", "type": "uint128", "components": []},
842 {"name": "bonus", "type": "uint128", "components": []}
843 ]}
844 ]},
845 {"name": "rewards", "type": "tuple[]", "components": [
846 {"name": "token", "type": "address", "components": []},
847 {"name": "amount", "type": "uint256", "components": []}
848 ]}
849 ],
850 "outputs": [],
851 "stateMutability": "nonpayable"
852 }"#;
853
854 let asset_id = U256::from(42u64);
855 let spoke_addr = [1u8; 20];
856 let shares_delta = I256::try_from(-100i64).unwrap();
857 let offset_ray_delta = I256::try_from(200i64).unwrap();
858 let base: u128 = 500;
859 let bonus: u128 = 999;
860 let reward_token_0 = [2u8; 20];
861 let reward_amount_0 = U256::from(1000u64);
862 let reward_token_1 = [3u8; 20];
863 let reward_amount_1 = U256::from(2000u64);
864
865 let mut calldata = Vec::new();
880 calldata.extend_from_slice(&asset_id.to_be_bytes::<32>());
881 let mut spoke_padded = [0u8; 32];
882 spoke_padded[12..].copy_from_slice(&spoke_addr);
883 calldata.extend_from_slice(&spoke_padded);
884 calldata.extend_from_slice(&shares_delta.to_be_bytes::<32>());
885 calldata.extend_from_slice(&offset_ray_delta.to_be_bytes::<32>());
886 calldata.extend_from_slice(&U256::from(base).to_be_bytes::<32>());
887 calldata.extend_from_slice(&U256::from(bonus).to_be_bytes::<32>());
888 calldata.extend_from_slice(&U256::from(7u64 * 32).to_be_bytes::<32>()); calldata.extend_from_slice(&U256::from(2u64).to_be_bytes::<32>()); let mut reward_token_0_padded = [0u8; 32];
891 reward_token_0_padded[12..].copy_from_slice(&reward_token_0);
892 calldata.extend_from_slice(&reward_token_0_padded);
893 calldata.extend_from_slice(&reward_amount_0.to_be_bytes::<32>());
894 let mut reward_token_1_padded = [0u8; 32];
895 reward_token_1_padded[12..].copy_from_slice(&reward_token_1);
896 calldata.extend_from_slice(&reward_token_1_padded);
897 calldata.extend_from_slice(&reward_amount_1.to_be_bytes::<32>());
898
899 let mut builder = GenericBinaryBuilder::<i32>::new();
900 builder.append_value(&calldata);
901 let col = builder.finish();
902
903 let result = decode_call_inputs(abi_json, &col, false, false).unwrap();
904
905 assert_eq!(result.num_rows(), 1);
906
907 for field in result.schema().fields() {
909 assert!(
910 !matches!(field.data_type(), DataType::Struct(_)),
911 "field '{}' should not be Struct after flattening",
912 field.name()
913 );
914 }
915
916 let out_schema = result.schema();
919 assert!(out_schema.field_with_name("assetId").is_ok());
920 assert!(out_schema.field_with_name("spoke").is_ok());
921 assert!(out_schema.field_with_name("config.sharesDelta").is_ok());
922 assert!(out_schema.field_with_name("config.offsetRayDelta").is_ok());
923 assert!(out_schema.field_with_name("config.breakdown.base").is_ok());
924 assert!(out_schema.field_with_name("config.breakdown.bonus").is_ok());
925 assert_eq!(
926 out_schema.field_with_name("rewards").unwrap().data_type(),
927 &DataType::Utf8,
928 "variable-length tuple[] should be serialised to Utf8"
929 );
930
931 let func: alloy_json_abi::Function = serde_json::from_str(abi_json).unwrap();
933 println!("Human-readable signature: {}", func.full_signature());
934
935 use arrow::array::StringArray;
937 let rewards_col = result
938 .column_by_name("rewards")
939 .unwrap()
940 .as_any()
941 .downcast_ref::<StringArray>()
942 .unwrap();
943 println!("rewards[0]: {}", rewards_col.value(0));
944
945 let spoke_col = result
947 .column_by_name("spoke")
948 .unwrap()
949 .as_any()
950 .downcast_ref::<BinaryArray>()
951 .unwrap();
952 assert_eq!(spoke_col.value(0), spoke_addr);
953
954 let base_col = result
956 .column_by_name("config.breakdown.base")
957 .unwrap()
958 .as_any()
959 .downcast_ref::<Decimal128Array>()
960 .unwrap();
961 assert_eq!(base_col.value(0), base as i128);
962
963 let bonus_col = result
964 .column_by_name("config.breakdown.bonus")
965 .unwrap()
966 .as_any()
967 .downcast_ref::<Decimal128Array>()
968 .unwrap();
969 assert_eq!(bonus_col.value(0), bonus as i128);
970
971 let asset_id_col = result
973 .column_by_name("assetId")
974 .unwrap()
975 .as_any()
976 .downcast_ref::<Decimal256Array>()
977 .unwrap();
978 let expected = arrow::datatypes::i256::from_be_bytes(asset_id.to_be_bytes::<32>());
979 assert_eq!(asset_id_col.value(0), expected);
980
981 let shares_delta_col = result
983 .column_by_name("config.sharesDelta")
984 .unwrap()
985 .as_any()
986 .downcast_ref::<Decimal256Array>()
987 .unwrap();
988 let expected = arrow::datatypes::i256::from_be_bytes(shares_delta.to_be_bytes::<32>());
989 assert_eq!(shares_delta_col.value(0), expected);
990
991 }
999
1000 #[test]
1001 fn test_decode_events_named_tuple_via_abi_json() {
1002 use arrow::array::{BinaryArray, Decimal128Array, GenericBinaryBuilder};
1003
1004 let abi_json = r#"{
1008 "type": "event",
1009 "name": "RefreshPremium",
1010 "inputs": [
1011 {"name": "assetId", "type": "uint256", "indexed": true, "components": []},
1012 {"name": "spoke", "type": "address", "indexed": true, "components": []},
1013 {"name": "premiumDelta", "type": "tuple", "indexed": false, "components": [
1014 {"name": "sharesDelta", "type": "int256", "components": []},
1015 {"name": "offsetRayDelta", "type": "int256", "components": []},
1016 {"name": "breakdown", "type": "tuple", "components": [
1017 {"name": "base", "type": "uint128", "components": []},
1018 {"name": "bonus", "type": "uint128", "components": []}
1019 ]}
1020 ]},
1021 {"name": "rewards", "type": "tuple[]", "indexed": false, "components": [
1022 {"name": "token", "type": "address", "components": []},
1023 {"name": "amount", "type": "uint256", "components": []}
1024 ]}
1025 ],
1026 "anonymous": false
1027 }"#;
1028
1029 let asset_id = U256::from(42u64);
1030 let spoke_addr = [1u8; 20];
1031 let shares_delta = I256::try_from(-100i64).unwrap();
1032 let offset_ray_delta = I256::try_from(200i64).unwrap();
1033 let base: u128 = 500;
1034 let bonus: u128 = 999;
1035 let reward_token_0 = [2u8; 20];
1036 let reward_amount_0 = U256::from(1000u64);
1037 let reward_token_1 = [3u8; 20];
1038 let reward_amount_1 = U256::from(2000u64);
1039
1040 let topic1 = asset_id.to_be_bytes::<32>();
1042
1043 let mut topic2 = [0u8; 32];
1045 topic2[12..].copy_from_slice(&spoke_addr);
1046
1047 let premiumdelta_word_count: u64 = 4; let head_size: u64 = premiumdelta_word_count * 32 + 32; let mut body = Vec::new();
1058 body.extend_from_slice(&shares_delta.to_be_bytes::<32>());
1060 body.extend_from_slice(&offset_ray_delta.to_be_bytes::<32>());
1061 body.extend_from_slice(&U256::from(base).to_be_bytes::<32>());
1062 body.extend_from_slice(&U256::from(bonus).to_be_bytes::<32>());
1063 body.extend_from_slice(&U256::from(head_size).to_be_bytes::<32>());
1065 body.extend_from_slice(&U256::from(2u64).to_be_bytes::<32>());
1067 let mut reward_token_0_padded = [0u8; 32];
1068 reward_token_0_padded[12..].copy_from_slice(&reward_token_0);
1069 body.extend_from_slice(&reward_token_0_padded);
1070 body.extend_from_slice(&reward_amount_0.to_be_bytes::<32>());
1071 let mut reward_token_1_padded = [0u8; 32];
1072 reward_token_1_padded[12..].copy_from_slice(&reward_token_1);
1073 body.extend_from_slice(&reward_token_1_padded);
1074 body.extend_from_slice(&reward_amount_1.to_be_bytes::<32>());
1075
1076 let selector = abi_to_topic0(abi_json).unwrap();
1077
1078 let mut topic0_b = GenericBinaryBuilder::<i32>::new();
1079 let mut topic1_b = GenericBinaryBuilder::<i32>::new();
1080 let mut topic2_b = GenericBinaryBuilder::<i32>::new();
1081 let mut topic3_b = GenericBinaryBuilder::<i32>::new();
1082 let mut data_b = GenericBinaryBuilder::<i32>::new();
1083
1084 topic0_b.append_value(selector);
1085 topic1_b.append_value(topic1);
1086 topic2_b.append_value(topic2);
1087 topic3_b.append_null();
1088 data_b.append_value(&body);
1089
1090 let schema = Arc::new(Schema::new(vec![
1091 Field::new("topic0", DataType::Binary, true),
1092 Field::new("topic1", DataType::Binary, true),
1093 Field::new("topic2", DataType::Binary, true),
1094 Field::new("topic3", DataType::Binary, true),
1095 Field::new("data", DataType::Binary, true),
1096 ]));
1097
1098 let batch = RecordBatch::try_new(
1099 schema,
1100 vec![
1101 Arc::new(topic0_b.finish()),
1102 Arc::new(topic1_b.finish()),
1103 Arc::new(topic2_b.finish()),
1104 Arc::new(topic3_b.finish()),
1105 Arc::new(data_b.finish()),
1106 ],
1107 )
1108 .unwrap();
1109
1110 let result = decode_events(abi_json, &batch, false, false, false, false).unwrap();
1111
1112 assert_eq!(result.num_rows(), 1);
1113
1114 for field in result.schema().fields() {
1116 assert!(
1117 !matches!(field.data_type(), DataType::Struct(_)),
1118 "field '{}' should not be Struct after flattening",
1119 field.name()
1120 );
1121 }
1122
1123 let out_schema = result.schema();
1126 assert!(out_schema.field_with_name("assetId").is_ok());
1127 assert!(out_schema.field_with_name("spoke").is_ok());
1128 assert!(out_schema
1129 .field_with_name("premiumDelta.sharesDelta")
1130 .is_ok());
1131 assert!(out_schema
1132 .field_with_name("premiumDelta.offsetRayDelta")
1133 .is_ok());
1134 assert!(out_schema
1135 .field_with_name("premiumDelta.breakdown.base")
1136 .is_ok());
1137 assert!(out_schema
1138 .field_with_name("premiumDelta.breakdown.bonus")
1139 .is_ok());
1140 assert_eq!(
1141 out_schema.field_with_name("rewards").unwrap().data_type(),
1142 &DataType::Utf8,
1143 "variable-length tuple[] should be serialised to Utf8"
1144 );
1145
1146 let event: alloy_json_abi::Event = serde_json::from_str(abi_json).unwrap();
1148 println!("Human-readable signature: {}", event.full_signature());
1149
1150 use arrow::array::StringArray;
1152 let rewards_col = result
1153 .column_by_name("rewards")
1154 .unwrap()
1155 .as_any()
1156 .downcast_ref::<StringArray>()
1157 .unwrap();
1158 println!("rewards[0]: {}", rewards_col.value(0));
1159
1160 let spoke_col = result
1162 .column_by_name("spoke")
1163 .unwrap()
1164 .as_any()
1165 .downcast_ref::<BinaryArray>()
1166 .unwrap();
1167 assert_eq!(spoke_col.value(0), spoke_addr);
1168
1169 let base_col = result
1171 .column_by_name("premiumDelta.breakdown.base")
1172 .unwrap()
1173 .as_any()
1174 .downcast_ref::<Decimal128Array>()
1175 .unwrap();
1176 assert_eq!(base_col.value(0), base as i128);
1177
1178 let bonus_col = result
1179 .column_by_name("premiumDelta.breakdown.bonus")
1180 .unwrap()
1181 .as_any()
1182 .downcast_ref::<Decimal128Array>()
1183 .unwrap();
1184 assert_eq!(bonus_col.value(0), bonus as i128);
1185
1186 }
1194
/// Verifies the Arrow schema produced for an event signature that mixes
/// indexed parameters, unnamed parameters, nested arrays and nested tuples.
///
/// The expected schema below lists the indexed `my_addr` first, names the
/// unnamed parameters `paramN`, and marks every field as nullable.
#[test]
#[ignore]
fn nested_event_signature_to_schema() {
    // Every field in the expected schema is nullable, so bake that in.
    let nullable = |name: &str, dtype: DataType| Arc::new(Field::new(name, dtype, true));
    // List items are expected to carry an empty field name.
    let list_of = |item: DataType| DataType::List(Arc::new(Field::new("", item, true)));
    // Expected Arrow type for `uint256` when large ints are not kept as binary.
    let uint256 = || DataType::Decimal256(76, 0);

    // Innermost tuple: (bool, uint256[])
    let inner_tuple = DataType::Struct(Fields::from(vec![
        nullable("param0", DataType::Boolean),
        nullable("param1", list_of(uint256())),
    ]));

    // Element of `questDetails`: (bool, bool[], (bool, uint256[]))
    let quest_tuple = DataType::Struct(Fields::from(vec![
        nullable("param0", DataType::Boolean),
        nullable("param1", list_of(DataType::Boolean)),
        nullable("param2", inner_tuple),
    ]));

    let expected_schema = Schema::new(vec![
        nullable("my_addr", DataType::Binary),
        nullable("editor", DataType::Binary),
        nullable("param1", list_of(list_of(uint256()))),
        nullable("questDetails", list_of(quest_tuple)),
    ]);

    let sig = "ConfiguredQuests(address editor, uint256[][], address indexed my_addr, (bool,bool[],(bool, uint256[]))[] questDetails)";
    let schema = event_signature_to_arrow_schema(sig, false).unwrap();

    assert_eq!(schema, expected_schema);
}
1250
/// Checks that a signed 256-bit integer keeps its decimal string form when
/// its big-endian bytes are reinterpreted as an Arrow `i256`.
///
/// Covers both extremes of the range plus one mid-range positive value.
#[test]
#[ignore]
fn i256_to_arrow_i256() {
    let two = I256::try_from(2i32).unwrap();
    let samples = [I256::MIN, I256::MAX, I256::MAX / two];

    for sample in samples {
        let be_bytes = sample.to_be_bytes::<32>();
        let converted = arrow::datatypes::i256::from_be_bytes(be_bytes);

        assert_eq!(sample.to_string(), converted.to_string());
    }
}
1264
/// Manual smoke test: decodes `PairCreated` logs from a local Parquet file
/// and writes the decoded batch to `decoded_logs.parquet`.
///
/// Ignored by default because it needs a `logs.parquet` file in the current
/// working directory. Only the first record batch of the input is decoded.
#[test]
#[ignore]
fn read_parquet_with_real_data() {
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use std::fs::File;

    // Load the first batch of raw logs.
    let input = File::open("logs.parquet").unwrap();
    let mut batches = ParquetRecordBatchReaderBuilder::try_new(input)
        .unwrap()
        .build()
        .unwrap();
    let logs = batches.next().unwrap().unwrap();

    let signature =
        "PairCreated(address indexed token0, address indexed token1, address pair,uint256)";
    let decoded = decode_events(signature, &logs, false, false, false, false).unwrap();

    // Persist the decoded batch so it can be inspected by hand; the output
    // file is only created once decoding has succeeded.
    let mut output = File::create("decoded_logs.parquet").unwrap();
    let mut sink =
        parquet::arrow::ArrowWriter::try_new(&mut output, decoded.schema(), None).unwrap();
    sink.write(&decoded).unwrap();
    sink.close().unwrap();
}
1287}