1mod abi;
19mod arrow_convert;
20
21use std::sync::Arc;
22
23use alloy_dyn_abi::{DynSolCall, DynSolEvent, DynSolType, DynSolValue, Specifier};
24use anyhow::{anyhow, Context, Result};
25use arrow::{
26 array::{
27 Array, BinaryArray, GenericBinaryArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
28 StructArray,
29 },
30 compute,
31 datatypes::{DataType, Field, Schema},
32};
33
34pub use abi::*;
35use arrow_convert::{build_topic0_mask, decode_body, decode_topic, to_arrow, to_arrow_dtype};
36
37pub fn signature_to_topic0(signature: &str) -> Result<[u8; 32]> {
39 let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
40 Ok(event.selector().into())
41}
42
/// Decodes ABI-encoded function *input* data (one blob per row) into a
/// `RecordBatch` with one column per function parameter.
///
/// * `signature` - human-readable function signature, e.g. `"transfer(address,uint256)"`.
/// * `data` - binary column of call input payloads.
/// * `allow_decode_fail` - when true, rows that fail to decode become null rows
///   instead of aborting with an error.
pub fn decode_call_inputs<I: OffsetSizeTrait>(
    signature: &str,
    data: &GenericBinaryArray<I>,
    allow_decode_fail: bool,
) -> Result<RecordBatch> {
    decode_call_impl::<true, I>(signature, data, allow_decode_fail)
}
56
/// Decodes ABI-encoded function *output* (return) data (one blob per row) into
/// a `RecordBatch` with one column per return value.
///
/// * `signature` - human-readable function signature, e.g. `"balanceOf(address)(uint256)"`.
/// * `data` - binary column of call output payloads.
/// * `allow_decode_fail` - when true, rows that fail to decode become null rows
///   instead of aborting with an error.
pub fn decode_call_outputs<I: OffsetSizeTrait>(
    signature: &str,
    data: &GenericBinaryArray<I>,
    allow_decode_fail: bool,
) -> Result<RecordBatch> {
    decode_call_impl::<false, I>(signature, data, allow_decode_fail)
}
70
71fn decode_call_impl<const IS_INPUT: bool, I: OffsetSizeTrait>(
72 signature: &str,
73 data: &GenericBinaryArray<I>,
74 allow_decode_fail: bool,
75) -> Result<RecordBatch> {
76 let (call, resolved) = resolve_function_signature(signature)?;
77
78 let schema = function_signature_to_arrow_schemas_impl(&call, &resolved)
79 .context("convert event signature to arrow schema")?;
80 let schema = if IS_INPUT { schema.0 } else { schema.1 };
81
82 let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(schema.fields().len());
83
84 let mut decoded = Vec::<Option<DynSolValue>>::with_capacity(data.len());
85
86 for blob in data {
87 match blob {
88 Some(blob) => {
89 let decode_res = if IS_INPUT {
90 resolved.abi_decode_input(blob)
91 } else {
92 resolved.abi_decode_output(blob)
93 };
94 match decode_res {
95 Ok(data) => decoded.push(Some(DynSolValue::Tuple(data))),
96 Err(e) if allow_decode_fail => {
97 log::debug!("failed to decode function call data: {e}");
98 decoded.push(None);
99 }
100 Err(e) => {
101 return Err(anyhow!("failed to decode function call data: {e}"));
102 }
103 }
104 }
105 None => decoded.push(None),
106 }
107 }
108
109 let sol_type = if IS_INPUT {
110 DynSolType::Tuple(resolved.types().to_vec())
111 } else {
112 DynSolType::Tuple(resolved.returns().types().to_vec())
113 };
114
115 let array = to_arrow(&sol_type, decoded, allow_decode_fail).context("map params to arrow")?;
116 let arr = array
117 .as_any()
118 .downcast_ref::<StructArray>()
119 .context("expected struct array from to_arrow")?;
120
121 for f in arr.columns() {
122 arrays.push(f.clone());
123 }
124
125 RecordBatch::try_new(Arc::new(schema), arrays).context("construct arrow batch")
126}
127
128pub fn function_signature_to_arrow_schemas(signature: &str) -> Result<(Schema, Schema)> {
130 let (func, resolved) = resolve_function_signature(signature)?;
131 function_signature_to_arrow_schemas_impl(&func, &resolved)
132}
133
134fn function_signature_to_arrow_schemas_impl(
135 func: &alloy_json_abi::Function,
136 call: &DynSolCall,
137) -> Result<(Schema, Schema)> {
138 let mut input_fields = Vec::with_capacity(call.types().len());
139 let mut output_fields = Vec::with_capacity(call.returns().types().len());
140
141 for (i, (sol_t, param)) in call.types().iter().zip(func.inputs.iter()).enumerate() {
142 let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
143 let name = if param.name() == "" {
144 format!("param{i}")
145 } else {
146 param.name().to_owned()
147 };
148 input_fields.push(Arc::new(Field::new(name, dtype, true)));
149 }
150
151 for (i, (sol_t, param)) in call
152 .returns()
153 .types()
154 .iter()
155 .zip(func.outputs.iter())
156 .enumerate()
157 {
158 let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
159 let name = if param.name() == "" {
160 format!("param{i}")
161 } else {
162 param.name().to_owned()
163 };
164 output_fields.push(Arc::new(Field::new(name, dtype, true)));
165 }
166
167 Ok((Schema::new(input_fields), Schema::new(output_fields)))
168}
169
170fn resolve_function_signature(signature: &str) -> Result<(alloy_json_abi::Function, DynSolCall)> {
171 let event = alloy_json_abi::Function::parse(signature).context("parse function signature")?;
172 let resolved = event.resolve().context("resolve function signature")?;
173
174 Ok((event, resolved))
175}
176
177pub fn decode_events(
190 signature: &str,
191 data: &RecordBatch,
192 allow_decode_fail: bool,
193 filter_by_topic0: bool,
194 hstack: bool,
195) -> Result<RecordBatch> {
196 let (event, resolved) = resolve_event_signature(signature)?;
197
198 let data = if filter_by_topic0 {
200 filter_by_topic0_impl(&event, data)?
201 } else {
202 data.clone()
203 };
204
205 let schema = event_signature_to_arrow_schema_impl(&event, &resolved)
206 .context("convert event signature to arrow schema")?;
207
208 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
209 let mut arrays: Vec<Arc<dyn Array + 'static>> = Vec::with_capacity(fields.len());
210
211 for (sol_type, topic_name) in resolved
212 .indexed()
213 .iter()
214 .zip(&["topic1", "topic2", "topic3"])
215 {
216 let col = data
217 .column_by_name(topic_name)
218 .context("get topic column")?;
219
220 if col.data_type() == &DataType::Binary {
221 let arr = col
222 .as_any()
223 .downcast_ref::<BinaryArray>()
224 .context("downcast to BinaryArray")?;
225 decode_topic(sol_type, arr, allow_decode_fail, &mut arrays).context("decode topic")?;
226 } else if col.data_type() == &DataType::LargeBinary {
227 let arr = col
228 .as_any()
229 .downcast_ref::<LargeBinaryArray>()
230 .context("downcast to LargeBinaryArray")?;
231 decode_topic(sol_type, arr, allow_decode_fail, &mut arrays).context("decode topic")?;
232 }
233 }
234
235 let body_col = data.column_by_name("data").context("get data column")?;
236
237 let body_sol_type = DynSolType::Tuple(resolved.body().to_vec());
238
239 if body_col.data_type() == &DataType::Binary {
240 let arr = body_col
241 .as_any()
242 .downcast_ref::<BinaryArray>()
243 .context("downcast to BinaryArray")?;
244 decode_body(&body_sol_type, arr, allow_decode_fail, &mut arrays).context("decode body")?;
245 } else if body_col.data_type() == &DataType::LargeBinary {
246 let arr = body_col
247 .as_any()
248 .downcast_ref::<LargeBinaryArray>()
249 .context("downcast to LargeBinaryArray")?;
250 decode_body(&body_sol_type, arr, allow_decode_fail, &mut arrays).context("decode body")?;
251 }
252
253 if hstack {
254 for (i, col) in data.columns().iter().enumerate() {
255 fields.push(data.schema().field(i).clone().into());
256 arrays.push(col.clone());
257 }
258 }
259
260 let output_schema = Schema::new(fields);
261 RecordBatch::try_new(Arc::new(output_schema), arrays).context("construct arrow batch")
262}
263
264pub fn event_signature_to_arrow_schema(signature: &str) -> Result<Schema> {
266 let (resolved, event) = resolve_event_signature(signature)?;
267 event_signature_to_arrow_schema_impl(&resolved, &event)
268}
269
270fn event_signature_to_arrow_schema_impl(
271 sig: &alloy_json_abi::Event,
272 event: &DynSolEvent,
273) -> Result<Schema> {
274 let num_fields = event.indexed().len() + event.body().len();
275 let mut fields = Vec::<Arc<Field>>::with_capacity(num_fields);
276 let mut names = Vec::with_capacity(num_fields);
277
278 for (i, input) in sig.inputs.iter().enumerate() {
279 if input.indexed {
280 let name = if input.name.is_empty() {
281 format!("param{i}")
282 } else {
283 input.name.clone()
284 };
285 names.push(name);
286 }
287 }
288 for (i, input) in sig.inputs.iter().enumerate() {
289 if !input.indexed {
290 let name = if input.name.is_empty() {
291 format!("param{i}")
292 } else {
293 input.name.clone()
294 };
295 names.push(name);
296 }
297 }
298
299 for (sol_t, name) in event.indexed().iter().chain(event.body()).zip(names) {
300 let dtype = to_arrow_dtype(sol_t).context("map to arrow type")?;
301 fields.push(Arc::new(Field::new(name, dtype, true)));
302 }
303
304 Ok(Schema::new(fields))
305}
306
307fn resolve_event_signature(signature: &str) -> Result<(alloy_json_abi::Event, DynSolEvent)> {
308 let event = alloy_json_abi::Event::parse(signature).context("parse event signature")?;
309 let resolved = event.resolve().context("resolve event signature")?;
310
311 Ok((event, resolved))
312}
313
314fn filter_by_topic0_impl(event: &alloy_json_abi::Event, data: &RecordBatch) -> Result<RecordBatch> {
318 let Some(topic0_col) = data.column_by_name("topic0") else {
319 return Ok(data.clone());
320 };
321
322 let selector = event.selector();
323
324 let mask =
325 build_topic0_mask(topic0_col, selector.as_slice()).context("build topic0 filter mask")?;
326
327 let non_matching = mask.iter().filter(|v| !v.unwrap_or(false)).count();
328 if non_matching > 0 {
329 log::debug!(
330 "filtering out {non_matching} events whose topic0 does not match '{}'",
331 event.full_signature()
332 );
333 }
334
335 compute::filter_record_batch(data, &mask).context("filter record batch by topic0")
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{I256, U256};
    use arrow::datatypes::{Fields, Int32Type};

    // An int24 value that cannot fit (I256::MAX does not fit in 24 bits)
    // should become a null entry when allow_decode_fail is set, and an error
    // otherwise.
    #[test]
    fn test_int_overflow_with_allow_decode_fail() {
        let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
        let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, true);
        assert!(result.is_ok());
        let arr = result.unwrap();
        // With allow_decode_fail=true the overflowing value is mapped to null.
        assert!(arr.is_null(0));

        let sol_values = vec![Some(DynSolValue::Int(I256::MAX, 24))];
        let result = arrow_convert::to_int_impl::<Int32Type>(24, &sol_values, false);
        // With allow_decode_fail=false the same overflow is a hard error.
        assert!(result.is_err());
    }

    // Builds a two-row log batch (one Swap event, one Mint event) and checks
    // that decoding with filter_by_topic0=true only decodes the matching row,
    // and that hstack=true keeps the original input columns.
    #[test]
    fn test_topic0_filtering_with_allow_decode_fail() {
        use arrow::array::GenericBinaryBuilder;

        let swap_sig = "Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)";
        let mint_sig = "Mint(address sender, address indexed owner, int24 indexed tickLower, int24 indexed tickUpper, uint128 amount, uint256 amount0, uint256 amount1)";

        let swap_selector = signature_to_topic0(swap_sig).unwrap();
        let mint_selector = signature_to_topic0(mint_sig).unwrap();

        // One builder per log column: topic0..topic3 plus the body `data`.
        let mut topic0_builder = GenericBinaryBuilder::<i32>::new();
        let mut topic1_builder = GenericBinaryBuilder::<i32>::new();
        let mut topic2_builder = GenericBinaryBuilder::<i32>::new();
        let mut topic3_builder = GenericBinaryBuilder::<i32>::new();
        let mut data_builder = GenericBinaryBuilder::<i32>::new();

        // 32-byte zero word used as a placeholder address topic.
        let addr = [0u8; 32];

        // Row 0: a Swap event. Swap has two indexed params, so topic3 is null.
        topic0_builder.append_value(swap_selector);
        topic1_builder.append_value(addr);
        topic2_builder.append_value(addr);
        topic3_builder.append_null();
        let amount0 = I256::try_from(-1000i64).unwrap();
        let amount1 = I256::try_from(2000i64).unwrap();
        let sqrt_price: U256 = U256::from(1u64) << 96;
        let liquidity = U256::from(1000000u64);
        let tick = I256::try_from(-100i64).unwrap();
        // ABI-encode the five non-indexed Swap params as 32-byte words.
        let mut swap_body = Vec::new();
        swap_body.extend_from_slice(&amount0.to_be_bytes::<32>());
        swap_body.extend_from_slice(&amount1.to_be_bytes::<32>());
        swap_body.extend_from_slice(&sqrt_price.to_be_bytes::<32>());
        swap_body.extend_from_slice(&liquidity.to_be_bytes::<32>());
        swap_body.extend_from_slice(&tick.to_be_bytes::<32>());
        data_builder.append_value(&swap_body);

        // Row 1: a Mint event. Its body is zero-filled padding — the row is
        // filtered out by topic0 before decoding, so the content is arbitrary.
        topic0_builder.append_value(mint_selector);
        topic1_builder.append_value(addr);
        topic2_builder.append_value(addr);
        topic3_builder.append_value(addr);
        let mint_body = vec![0u8; 32 * 4]; data_builder.append_value(&mint_body);

        let schema = Arc::new(Schema::new(vec![
            Field::new("topic0", DataType::Binary, true),
            Field::new("topic1", DataType::Binary, true),
            Field::new("topic2", DataType::Binary, true),
            Field::new("topic3", DataType::Binary, true),
            Field::new("data", DataType::Binary, true),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(topic0_builder.finish()),
                Arc::new(topic1_builder.finish()),
                Arc::new(topic2_builder.finish()),
                Arc::new(topic3_builder.finish()),
                Arc::new(data_builder.finish()),
            ],
        )
        .unwrap();

        // filter_by_topic0=true should drop the Mint row before decoding.
        let result = decode_events(swap_sig, &batch, true, true, false).unwrap();
        assert_eq!(result.num_rows(), 1, "should only decode the Swap row");

        // hstack=true should additionally carry through the input columns.
        let result = decode_events(swap_sig, &batch, true, true, true).unwrap();
        assert_eq!(result.num_rows(), 1);
        assert!(
            result.column_by_name("topic0").is_some(),
            "hstack should include original input columns"
        );
    }

    // Checks schema generation for a signature with nested lists/tuples and a
    // mix of named, unnamed, indexed and non-indexed params. Expected order is
    // indexed params first (my_addr), then body params by declaration index.
    #[test]
    #[ignore]
    fn nested_event_signature_to_schema() {
        let sig = "ConfiguredQuests(address editor, uint256[][], address indexed my_addr, (bool,bool[],(bool, uint256[]))[] questDetails)";

        let schema = event_signature_to_arrow_schema(sig).unwrap();

        let expected_schema = Schema::new(vec![
            Arc::new(Field::new("my_addr", DataType::Binary, true)),
            Arc::new(Field::new("editor", DataType::Binary, true)),
            // Unnamed uint256[][] param gets the positional name "param1".
            Arc::new(Field::new(
                "param1",
                DataType::List(Arc::new(Field::new(
                    "",
                    DataType::List(Arc::new(Field::new("", DataType::Decimal256(76, 0), true))),
                    true,
                ))),
                true,
            )),
            Arc::new(Field::new(
                "questDetails",
                DataType::List(Arc::new(Field::new(
                    "",
                    DataType::Struct(Fields::from(vec![
                        Arc::new(Field::new("param0", DataType::Boolean, true)),
                        Arc::new(Field::new(
                            "param1",
                            DataType::List(Arc::new(Field::new("", DataType::Boolean, true))),
                            true,
                        )),
                        Arc::new(Field::new(
                            "param2",
                            DataType::Struct(Fields::from(vec![
                                Arc::new(Field::new("param0", DataType::Boolean, true)),
                                Arc::new(Field::new(
                                    "param1",
                                    DataType::List(Arc::new(Field::new(
                                        "",
                                        DataType::Decimal256(76, 0),
                                        true,
                                    ))),
                                    true,
                                )),
                            ])),
                            true,
                        )),
                    ])),
                    true,
                ))),
                true,
            )),
        ]);

        assert_eq!(schema, expected_schema);
    }

    // Round-trips alloy's I256 through arrow's i256 via big-endian bytes and
    // compares the decimal string representations.
    #[test]
    #[ignore]
    fn i256_to_arrow_i256() {
        for val in [
            I256::MIN,
            I256::MAX,
            I256::MAX / I256::try_from(2i32).unwrap(),
        ] {
            let out = arrow::datatypes::i256::from_be_bytes(val.to_be_bytes::<32>());

            assert_eq!(val.to_string(), out.to_string());
        }
    }

    // Manual smoke test against a local "logs.parquet" fixture; ignored by
    // default since it depends on files outside the repository.
    #[test]
    #[ignore]
    fn read_parquet_with_real_data() {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use std::fs::File;
        let builder =
            ParquetRecordBatchReaderBuilder::try_new(File::open("logs.parquet").unwrap()).unwrap();
        let mut reader = builder.build().unwrap();
        let logs = reader.next().unwrap().unwrap();

        let signature =
            "PairCreated(address indexed token0, address indexed token1, address pair,uint256)";

        let decoded = decode_events(signature, &logs, false, false, false).unwrap();

        // Write the decoded batch back out so it can be inspected manually.
        let mut file = File::create("decoded_logs.parquet").unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(&mut file, decoded.schema(), None).unwrap();
        writer.write(&decoded).unwrap();
        writer.close().unwrap();
    }
}