1use anyhow::{anyhow, Context, Result};
15use arrow::array::{
16 builder, Array, BinaryArray, BooleanArray, GenericBinaryArray, GenericListArray,
17 GenericStringArray, LargeBinaryArray, LargeStringArray, OffsetSizeTrait, StringArray,
18};
19use arrow::{
20 array::RecordBatch,
21 compute,
22 datatypes::{DataType, Field, Schema},
23};
24use base64::{engine::general_purpose::STANDARD, Engine as _};
25use std::sync::Arc;
26mod deserialize;
27pub use deserialize::{deserialize_data, DynType, DynValue, ParamInput};
28mod arrow_converter;
29use arrow_converter::{to_arrow, to_arrow_dtype};
30
31#[derive(Debug, Clone)]
33pub struct InstructionSignature {
34 pub discriminator: Vec<u8>,
35 pub params: Vec<ParamInput>,
36 pub accounts_names: Vec<String>,
37}
38
39#[derive(Debug, Clone)]
41pub struct LogSignature {
42 pub params: Vec<ParamInput>,
43}
44
/// Builds an [`InstructionSignature`] from a Python object exposing the
/// attributes `discriminator`, `params` and `accounts_names`.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for InstructionSignature {
    /// Reads the three attributes from `ob`.
    ///
    /// `discriminator` may be a Python `str` (decoded as hex) or `bytes`
    /// (taken as-is); the decision is made on the exact type name, and any
    /// other type is rejected with an error.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::{PyAnyMethods, PyTypeMethods};

        let raw_discriminator = ob.getattr("discriminator")?;
        let discriminator_ob_type = raw_discriminator.get_type().name()?.to_string();

        let discriminator: Vec<u8> = if discriminator_ob_type == "str" {
            let text: &str = raw_discriminator.extract()?;
            hex_to_bytes(text).context("failed to decode hex")?
        } else if discriminator_ob_type == "bytes" {
            raw_discriminator.extract()?
        } else {
            return Err(anyhow!("unknown type: {discriminator_ob_type}").into());
        };

        let params: Vec<ParamInput> = ob.getattr("params")?.extract()?;
        let accounts_names: Vec<String> = ob.getattr("accounts_names")?.extract()?;

        Ok(Self {
            discriminator,
            params,
            accounts_names,
        })
    }
}
73
/// Decodes a hexadecimal string into bytes.
///
/// An optional `0x` prefix is stripped. A string with an odd number of digits
/// is treated as if it had a leading `0` (so `"abc"` decodes to `[0x0a, 0xbc]`).
/// An empty string (or a bare `0x`) decodes to an empty vector.
///
/// # Errors
///
/// Returns an error if any character after the optional prefix is not an
/// ASCII hexadecimal digit. Validating per character means non-ASCII input is
/// rejected instead of panicking on a mid-character slice, and sign
/// characters such as `+` are rejected rather than being parsed as part of a
/// number.
#[cfg(feature = "pyo3")]
fn hex_to_bytes(hex_string: &str) -> Result<Vec<u8>> {
    let digits = hex_string.strip_prefix("0x").unwrap_or(hex_string);

    // Turn every character into its 4-bit value, failing on the first one
    // that is not a hex digit.
    let nibbles = digits
        .chars()
        .map(|c| {
            c.to_digit(16)
                .and_then(|d| u8::try_from(d).ok())
                .with_context(|| {
                    format!("failed to parse hexstring to bytes: invalid hex digit {c:?}")
                })
        })
        .collect::<Result<Vec<u8>>>()?;

    // With an odd digit count the first nibble forms a byte on its own, which
    // is equivalent to left-padding the string with a single `0`.
    let (lead, pairs) = nibbles.split_at(nibbles.len() % 2);

    let mut out = Vec::with_capacity((nibbles.len() + 1) / 2);
    out.extend_from_slice(lead);
    out.extend(pairs.chunks_exact(2).map(|pair| (pair[0] << 4) | pair[1]));

    Ok(out)
}
92
/// Builds a [`LogSignature`] from a Python object exposing a `params` attribute.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for LogSignature {
    /// Reads `ob.params` and extracts it as a list of [`ParamInput`].
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let params_attr = ob.getattr("params")?;
        let params: Vec<ParamInput> = params_attr.extract()?;

        Ok(Self { params })
    }
}
103
104fn unpack_rest_of_accounts<ListI: OffsetSizeTrait, InnerI: OffsetSizeTrait>(
105 num_acc: usize,
106 rest_of_acc: &GenericListArray<ListI>,
107 account_arrays: &mut Vec<BinaryArray>,
108) -> Result<()> {
109 let data_size = rest_of_acc.len() * 32;
110
111 for acc_arr in rest_of_acc.iter().flatten() {
112 if acc_arr.len() < num_acc {
113 return Err(anyhow!(
114 "expected rest_of_accounts to have at least {} addresses but it has {}",
115 num_acc,
116 acc_arr.len()
117 ));
118 }
119 }
120
121 for i in 0..num_acc {
122 let mut builder = builder::BinaryBuilder::with_capacity(rest_of_acc.len(), data_size);
123
124 for acc_arr in rest_of_acc.iter() {
125 let Some(acc_arr) = acc_arr else {
126 builder.append_null();
127 continue;
128 };
129
130 let arr = acc_arr
131 .as_any()
132 .downcast_ref::<GenericBinaryArray<InnerI>>()
133 .context("failed to downcast account array in rest_of_accounts")?;
134 if arr.is_null(i) {
135 builder.append_null();
136 } else {
137 builder.append_value(arr.value(i));
138 }
139 }
140
141 account_arrays.push(builder.finish());
142 }
143
144 Ok(())
145}
146
147pub fn decode_instructions_batch(
152 signature: &InstructionSignature,
153 batch: &RecordBatch,
154 allow_decode_fail: bool,
155 filter_by_discriminator: bool,
156 hstack: bool,
157) -> Result<RecordBatch> {
158 let batch = if filter_by_discriminator {
159 filter_by_discriminator_impl(signature, batch)?
160 } else {
161 batch.clone()
162 };
163 let batch = &batch;
164
165 let mut account_arrays: Vec<BinaryArray> = Vec::with_capacity(20);
166
167 for i in 0..signature.accounts_names.len().min(10) {
168 let col_name = format!("a{i}");
169 let col = batch
170 .column_by_name(&col_name)
171 .with_context(|| format!("account {i} not found but was required"))?;
172
173 if col.data_type() == &DataType::Binary {
174 account_arrays.push(
175 col.as_any()
176 .downcast_ref::<BinaryArray>()
177 .context("failed to downcast Binary account column")?
178 .clone(),
179 );
180 } else if col.data_type() == &DataType::LargeBinary {
181 account_arrays.push(
182 arrow::compute::cast(col, &DataType::Binary)
183 .context("failed to cast LargeBinary account column to Binary")?
184 .as_any()
185 .downcast_ref::<BinaryArray>()
186 .context("failed to downcast casted account column to BinaryArray")?
187 .clone(),
188 );
189 }
190 }
191
192 if signature.accounts_names.len() > 10 {
193 let rest_of_acc = batch
194 .column_by_name("rest_of_accounts")
195 .context("rest_of_accounts column not found in instructions batch")?;
196
197 let num_acc = signature.accounts_names.len() - 10;
198 if rest_of_acc.data_type() == &DataType::new_list(DataType::Binary, true) {
199 unpack_rest_of_accounts::<i32, i32>(
200 num_acc,
201 rest_of_acc
202 .as_any()
203 .downcast_ref()
204 .context("failed to downcast rest_of_accounts to List<Binary>")?,
205 &mut account_arrays,
206 )
207 .context("unpack rest_of_accounts column")?;
208 } else if rest_of_acc.data_type() == &DataType::new_list(DataType::LargeBinary, true) {
209 unpack_rest_of_accounts::<i32, i64>(
210 num_acc,
211 rest_of_acc
212 .as_any()
213 .downcast_ref()
214 .context("failed to downcast rest_of_accounts to List<LargeBinary>")?,
215 &mut account_arrays,
216 )
217 .context("unpack rest_of_accounts column")?;
218 } else if rest_of_acc.data_type() == &DataType::new_large_list(DataType::Binary, true) {
219 unpack_rest_of_accounts::<i64, i32>(
220 num_acc,
221 rest_of_acc
222 .as_any()
223 .downcast_ref()
224 .context("failed to downcast rest_of_accounts to LargeList<Binary>")?,
225 &mut account_arrays,
226 )
227 .context("unpack rest_of_accounts column")?;
228 } else if rest_of_acc.data_type() == &DataType::new_large_list(DataType::LargeBinary, true)
229 {
230 unpack_rest_of_accounts::<i64, i64>(
231 num_acc,
232 rest_of_acc
233 .as_any()
234 .downcast_ref()
235 .context("failed to downcast rest_of_accounts to LargeList<LargeBinary>")?,
236 &mut account_arrays,
237 )
238 .context("unpack rest_of_accounts column")?;
239 }
240 }
241
242 let data_col = batch
243 .column_by_name("data")
244 .context("data column not found in instructions batch")?;
245
246 let decoded = if data_col.data_type() == &DataType::Binary {
247 decode_instructions(
248 signature,
249 &account_arrays,
250 data_col
251 .as_any()
252 .downcast_ref::<BinaryArray>()
253 .context("failed to downcast data column to BinaryArray")?,
254 allow_decode_fail,
255 )
256 } else if data_col.data_type() == &DataType::LargeBinary {
257 decode_instructions(
258 signature,
259 &account_arrays,
260 data_col
261 .as_any()
262 .downcast_ref::<LargeBinaryArray>()
263 .context("failed to downcast data column to LargeBinaryArray")?,
264 allow_decode_fail,
265 )
266 } else {
267 Err(anyhow!(
268 "expected the data column to be Binary or LargeBinary"
269 ))
270 }?;
271
272 if hstack {
273 hstack_impl(&decoded, batch)
274 } else {
275 Ok(decoded)
276 }
277}
278
279pub fn decode_instructions<I: OffsetSizeTrait>(
280 signature: &InstructionSignature,
281 accounts: &[BinaryArray],
282 data: &GenericBinaryArray<I>,
283 allow_decode_fail: bool,
284) -> Result<RecordBatch> {
285 let num_params = signature.params.len();
286
287 let mut decoded_params_vec: Vec<Vec<Option<DynValue>>> =
288 (0..num_params).map(|_| Vec::new()).collect();
289
290 for row_idx in 0..data.len() {
291 if data.is_null(row_idx) {
292 if allow_decode_fail {
293 log::debug!("Instruction data is null in row {row_idx}");
294 for v in &mut decoded_params_vec {
295 v.push(None);
296 }
297 continue;
298 }
299 return Err(anyhow::anyhow!("Instruction data is null in row {row_idx}"));
300 }
301
302 let instruction_data = data.value(row_idx).to_vec();
303 let data_result = match_discriminators(&instruction_data, &signature.discriminator);
304 let data = match data_result {
305 Ok(data) => data,
306 Err(e) if allow_decode_fail => {
307 log::debug!("Error matching discriminators in row {row_idx}: {e:?}");
308 for v in &mut decoded_params_vec {
309 v.push(None);
310 }
311 continue;
312 }
313 Err(e) => {
314 return Err(anyhow::anyhow!(
315 "Error matching discriminators in row {row_idx}: {e:?}"
316 ));
317 }
318 };
319
320 let error_on_remanining = false;
326 let decoded_ix_result = deserialize_data(&data, &signature.params, error_on_remanining);
327 let decoded_ix = match decoded_ix_result {
328 Ok(ix) => ix,
329 Err(e) if allow_decode_fail => {
330 log::debug!("Error deserializing instruction in row {row_idx}: {e:?}");
331 for v in &mut decoded_params_vec {
332 v.push(None);
333 }
334 continue;
335 }
336 Err(e) => {
337 return Err(anyhow::anyhow!(
338 "Error deserializing instruction in row {row_idx}: {e:?}"
339 ));
340 }
341 };
342
343 for (i, value) in decoded_ix.into_iter().enumerate() {
344 decoded_params_vec[i].push(Some(value));
345 }
346 }
347
348 let mut data_arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(decoded_params_vec.len());
349 for (i, v) in decoded_params_vec.iter().enumerate() {
350 let array = to_arrow(&signature.params[i].param_type, v.clone())
351 .context("unable to convert instruction value to a arrow format value")?;
352 data_arrays.push(array);
353 }
354
355 let mut data_fields = Vec::with_capacity(signature.params.len());
356 for param in &signature.params {
357 let field = Field::new(
358 param.name.clone(),
359 to_arrow_dtype(¶m.param_type)
360 .context("unable to convert instruction param type to arrow dtype")?,
361 true,
362 );
363 data_fields.push(field);
364 }
365
366 let acc_names_len = signature.accounts_names.len();
367 let mut accounts_arrays = Vec::new();
368 let mut acc_fields = Vec::new();
369
370 for i in 0..acc_names_len {
371 let arr = accounts
372 .get(i)
373 .context(format!("Account a{i} not found during decoding"))?;
374
375 if arr.data_type() == &DataType::LargeBinary {
376 accounts_arrays.push(
377 arrow::compute::cast(arr, &DataType::Binary)
378 .context("failed to cast LargeBinary account to Binary")?,
379 );
380 } else {
381 accounts_arrays.push(Arc::new(arr.clone()) as Arc<dyn Array>);
382 }
383
384 if signature.accounts_names[i].is_empty() {
385 let field = Field::new(format!("a{i}"), DataType::Binary, true);
386 acc_fields.push(field);
387 } else {
388 let field = Field::new(signature.accounts_names[i].clone(), DataType::Binary, true);
389 acc_fields.push(field);
390 }
391 }
392
393 let decoded_instructions_array = data_arrays
394 .into_iter()
395 .chain(accounts_arrays)
396 .collect::<Vec<_>>();
397 let decoded_instructions_fields = data_fields
398 .into_iter()
399 .chain(acc_fields.clone())
400 .collect::<Vec<_>>();
401
402 let schema = Arc::new(Schema::new(decoded_instructions_fields));
403 let batch = RecordBatch::try_new(schema, decoded_instructions_array)
404 .context("Failed to create record batch from data arrays")?;
405
406 Ok(batch)
407}
408
409pub fn decode_logs_batch(
414 signature: &LogSignature,
415 batch: &RecordBatch,
416 allow_decode_fail: bool,
417 hstack: bool,
418) -> Result<RecordBatch> {
419 let message_col = batch
420 .column_by_name("message")
421 .context("message column not found in logs batch")?;
422
423 let decoded = if message_col.data_type() == &DataType::Utf8 {
424 decode_logs(
425 signature,
426 message_col
427 .as_any()
428 .downcast_ref::<StringArray>()
429 .context("failed to downcast message column to StringArray")?,
430 allow_decode_fail,
431 )
432 } else if message_col.data_type() == &DataType::LargeUtf8 {
433 decode_logs(
434 signature,
435 message_col
436 .as_any()
437 .downcast_ref::<LargeStringArray>()
438 .context("failed to downcast message column to LargeStringArray")?,
439 allow_decode_fail,
440 )
441 } else {
442 Err(anyhow!("expected String or LargeString message column"))
443 }?;
444
445 if hstack {
446 hstack_impl(&decoded, batch)
447 } else {
448 Ok(decoded)
449 }
450}
451
452pub fn decode_logs<I: OffsetSizeTrait>(
458 signature: &LogSignature,
459 data: &GenericStringArray<I>,
460 allow_decode_fail: bool,
461) -> Result<RecordBatch> {
462 let num_params = signature.params.len();
463
464 let mut decoded_params_vec: Vec<Vec<Option<DynValue>>> =
465 (0..num_params).map(|_| Vec::new()).collect();
466
467 for row_idx in 0..data.len() {
468 if data.is_null(row_idx) {
469 if allow_decode_fail {
470 log::debug!("Log data is null in row {row_idx}");
471 for v in &mut decoded_params_vec {
472 v.push(None);
473 }
474 continue;
475 }
476 return Err(anyhow::anyhow!("Log data is null in row {row_idx}"));
477 }
478
479 let log_data = data.value(row_idx);
480 let log_data = STANDARD.decode(log_data);
481 let log_data = match log_data {
482 Ok(log_data) => log_data,
483 Err(e) if allow_decode_fail => {
484 log::debug!("Error base 64 decoding log data in row {row_idx}: {e:?}");
485 for v in &mut decoded_params_vec {
486 v.push(None);
487 }
488 continue;
489 }
490 Err(e) => {
491 return Err(anyhow::anyhow!(
492 "Error base 64 decoding log data in row {row_idx}: {e:?}"
493 ));
494 }
495 };
496
497 let decoded_log_result = deserialize_data(&log_data, &signature.params, false);
498 let decoded_log = match decoded_log_result {
499 Ok(log) => log,
500 Err(e) if allow_decode_fail => {
501 log::debug!("Error deserializing log in row {row_idx}: {e:?}");
502 for v in &mut decoded_params_vec {
503 v.push(None);
504 }
505 continue;
506 }
507 Err(e) => {
508 return Err(anyhow::anyhow!(
509 "Error deserializing log in row {row_idx}: {e:?}"
510 ));
511 }
512 };
513
514 for (i, value) in decoded_log.into_iter().enumerate() {
515 decoded_params_vec[i].push(Some(value));
516 }
517 }
518
519 let mut data_arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(decoded_params_vec.len());
520 for (i, v) in decoded_params_vec.iter().enumerate() {
521 let array = to_arrow(&signature.params[i].param_type, v.clone())
522 .context("unable to convert log value to a arrow format value")?;
523 data_arrays.push(array);
524 }
525
526 let mut data_fields = Vec::with_capacity(signature.params.len());
527 for param in &signature.params {
528 let field = Field::new(
529 param.name.clone(),
530 to_arrow_dtype(¶m.param_type)
531 .context("unable to convert log param type to arrow dtype")?,
532 true,
533 );
534 data_fields.push(field);
535 }
536
537 let schema = Arc::new(Schema::new(data_fields));
538 let batch = RecordBatch::try_new(schema, data_arrays)
539 .context("Failed to create record batch from data arrays")?;
540
541 Ok(batch)
542}
543
544pub fn match_discriminators(instr_data: &[u8], discriminator: &[u8]) -> Result<Vec<u8>> {
545 let discriminator_len = discriminator.len();
546 if instr_data.len() < discriminator_len {
547 return Err(anyhow::anyhow!(
548 "Instruction data is too short to contain discriminator. Expected at least {} bytes, got {} bytes",
549 discriminator_len,
550 instr_data.len()
551 ));
552 }
553 let disc = &instr_data[..discriminator_len].to_vec();
554 let ix_data = &instr_data[discriminator_len..];
555 if !disc.eq(discriminator) {
556 return Err(anyhow::anyhow!(
557 "Instruction data discriminator doesn't match signature discriminator"
558 ));
559 }
560 Ok(ix_data.to_vec())
561}
562
563fn filter_by_discriminator_impl(
564 signature: &InstructionSignature,
565 batch: &RecordBatch,
566) -> Result<RecordBatch> {
567 let Some(data_col) = batch.column_by_name("data") else {
568 return Ok(batch.clone());
569 };
570
571 let discriminator = &signature.discriminator;
572 let mask = build_discriminator_mask(data_col.as_ref(), discriminator)
573 .context("build discriminator filter mask")?;
574
575 let non_matching = mask.iter().filter(|v| !v.unwrap_or(false)).count();
576 if non_matching > 0 {
577 log::debug!("filtering out {non_matching} instructions whose discriminator does not match");
578 }
579
580 compute::filter_record_batch(batch, &mask).context("filter record batch by discriminator")
581}
582
583fn build_discriminator_mask(col: &dyn Array, discriminator: &[u8]) -> Result<BooleanArray> {
584 if col.data_type() == &DataType::Binary {
585 let arr = col
586 .as_any()
587 .downcast_ref::<BinaryArray>()
588 .context("downcast data to BinaryArray")?;
589 Ok(BooleanArray::from(
590 arr.iter()
591 .map(|v| v.map(|b| b.starts_with(discriminator)))
592 .collect::<Vec<_>>(),
593 ))
594 } else if col.data_type() == &DataType::LargeBinary {
595 let arr = col
596 .as_any()
597 .downcast_ref::<LargeBinaryArray>()
598 .context("downcast data to LargeBinaryArray")?;
599 Ok(BooleanArray::from(
600 arr.iter()
601 .map(|v| v.map(|b| b.starts_with(discriminator)))
602 .collect::<Vec<_>>(),
603 ))
604 } else {
605 Err(anyhow!(
606 "unexpected data column type {}. Expected Binary or LargeBinary",
607 col.data_type()
608 ))
609 }
610}
611
612fn hstack_impl(decoded: &RecordBatch, input: &RecordBatch) -> Result<RecordBatch> {
613 let mut fields: Vec<Arc<Field>> = decoded.schema().fields().iter().cloned().collect();
614 let mut arrays: Vec<Arc<dyn Array>> = decoded.columns().to_vec();
615
616 for (i, col) in input.columns().iter().enumerate() {
617 fields.push(input.schema().field(i).clone().into());
618 arrays.push(col.clone());
619 }
620
621 let schema = Schema::new(fields);
622 RecordBatch::try_new(Arc::new(schema), arrays).context("construct hstacked arrow batch")
623}
624
625pub fn instruction_signature_to_arrow_schema(signature: &InstructionSignature) -> Result<Schema> {
628 let mut fields = Vec::new();
629
630 for param in &signature.params {
631 let field = Field::new(
632 param.name.clone(),
633 to_arrow_dtype(¶m.param_type)
634 .context("unable to convert instruction param type to arrow dtype")?,
635 true,
636 );
637 fields.push(field);
638 }
639
640 for account in &signature.accounts_names {
641 let field = Field::new(account.clone(), DataType::Binary, true);
642 fields.push(field);
643 }
644
645 Ok(Schema::new(fields))
646}
647
#[cfg(test)]
mod tests {
    use super::*;
    use crate::deserialize::{DynType, ParamInput};
    use std::fs::File;

    /// Enum variant without a payload.
    fn unit(name: &str) -> (String, Option<DynType>) {
        (name.to_string(), None)
    }

    /// Struct type built from `(field name, field type)` pairs.
    fn structure(fields: Vec<(&str, DynType)>) -> DynType {
        DynType::Struct(
            fields
                .into_iter()
                .map(|(name, ty)| (name.to_string(), ty))
                .collect(),
        )
    }

    /// Enum variant carrying a struct payload with the given fields.
    fn variant(name: &str, fields: Vec<(&str, DynType)>) -> (String, Option<DynType>) {
        (name.to_string(), Some(structure(fields)))
    }

    /// Two-variant `Bid` / `Ask` enum shared by several swap variants.
    fn side() -> DynType {
        DynType::Enum(vec![unit("Bid"), unit("Ask")])
    }

    /// Named parameter of a signature.
    fn param(name: &str, param_type: DynType) -> ParamInput {
        ParamInput {
            name: name.to_string(),
            param_type,
        }
    }

    /// The `Swap` enum used inside the `RoutePlan` parameter of the
    /// instruction decoded in `test_instructions_with_real_data`.
    fn swap_type() -> DynType {
        DynType::Enum(vec![
            unit("Saber"),
            unit("SaberAddDecimalsDeposit"),
            unit("SaberAddDecimalsWithdraw"),
            unit("TokenSwap"),
            unit("Sencha"),
            unit("Step"),
            unit("Cropper"),
            unit("Raydium"),
            variant("Crema", vec![("a_to_b", DynType::Bool)]),
            unit("Lifinity"),
            unit("Mercurial"),
            unit("Cykura"),
            variant("Serum", vec![("side", side())]),
            unit("MarinadeDeposit"),
            unit("MarinadeUnstake"),
            variant("Aldrin", vec![("side", side())]),
            variant("AldrinV2", vec![("side", side())]),
            variant("Whirlpool", vec![("a_to_b", DynType::Bool)]),
            variant("Invariant", vec![("x_to_y", DynType::Bool)]),
            unit("Meteora"),
            unit("GooseFX"),
            variant("DeltaFi", vec![("stable", DynType::Bool)]),
            unit("Balansol"),
            variant("MarcoPolo", vec![("x_to_y", DynType::Bool)]),
            variant("Dradex", vec![("side", side())]),
            unit("LifinityV2"),
            unit("RaydiumClmm"),
            variant("Openbook", vec![("side", side())]),
            variant("Phoenix", vec![("side", side())]),
            variant(
                "Symmetry",
                vec![
                    ("from_token_id", DynType::U64),
                    ("to_token_id", DynType::U64),
                ],
            ),
            unit("TokenSwapV2"),
            unit("HeliumTreasuryManagementRedeemV0"),
            unit("StakeDexStakeWrappedSol"),
            variant(
                "StakeDexSwapViaStake",
                vec![("bridge_stake_seed", DynType::U32)],
            ),
            unit("GooseFXV2"),
            unit("Perps"),
            unit("PerpsAddLiquidity"),
            unit("PerpsRemoveLiquidity"),
            unit("MeteoraDlmm"),
            variant("OpenBookV2", vec![("side", side())]),
            unit("RaydiumClmmV2"),
            variant(
                "StakeDexPrefundWithdrawStakeAndDepositStake",
                vec![("bridge_stake_seed", DynType::U32)],
            ),
            variant(
                "Clone",
                vec![
                    ("pool_index", DynType::U8),
                    ("quantity_is_input", DynType::Bool),
                    ("quantity_is_collateral", DynType::Bool),
                ],
            ),
            variant(
                "SanctumS",
                vec![
                    ("src_lst_value_calc_accs", DynType::U8),
                    ("dst_lst_value_calc_accs", DynType::U8),
                    ("src_lst_index", DynType::U32),
                    ("dst_lst_index", DynType::U32),
                ],
            ),
            variant(
                "SanctumSAddLiquidity",
                vec![
                    ("lst_value_calc_accs", DynType::U8),
                    ("lst_index", DynType::U32),
                ],
            ),
            variant(
                "SanctumSRemoveLiquidity",
                vec![
                    ("lst_value_calc_accs", DynType::U8),
                    ("lst_index", DynType::U32),
                ],
            ),
            unit("RaydiumCP"),
            variant(
                "WhirlpoolSwapV2",
                vec![
                    ("a_to_b", DynType::Bool),
                    (
                        "remaining_accounts_info",
                        structure(vec![(
                            "slices",
                            DynType::Array(Box::new(structure(vec![(
                                "remaining_accounts_slice",
                                structure(vec![
                                    ("accounts_type", DynType::U8),
                                    ("length", DynType::U8),
                                ]),
                            )]))),
                        )]),
                    ),
                ],
            ),
            unit("OneIntro"),
            unit("PumpdotfunWrappedBuy"),
            unit("PumpdotfunWrappedSell"),
            unit("PerpsV2"),
            unit("PerpsV2AddLiquidity"),
            unit("PerpsV2RemoveLiquidity"),
            unit("MoonshotWrappedBuy"),
            unit("MoonshotWrappedSell"),
            unit("StabbleStableSwap"),
            unit("StabbleWeightedSwap"),
            variant("Obric", vec![("x_to_y", DynType::Bool)]),
            unit("FoxBuyFromEstimatedCost"),
            variant("FoxClaimPartial", vec![("is_y", DynType::Bool)]),
            variant("SolFi", vec![("is_quote_to_base", DynType::Bool)]),
            unit("SolayerDelegateNoInit"),
            unit("SolayerUndelegateNoInit"),
            variant("TokenMill", vec![("side", side())]),
            unit("DaosFunBuy"),
            unit("DaosFunSell"),
            unit("ZeroFi"),
            unit("StakeDexWithdrawWrappedSol"),
            unit("VirtualsBuy"),
            unit("VirtualsSell"),
            variant(
                "Peren",
                vec![("in_index", DynType::U8), ("out_index", DynType::U8)],
            ),
            unit("PumpdotfunAmmBuy"),
            unit("PumpdotfunAmmSell"),
            unit("Gamma"),
        ])
    }

    // Needs `jup.parquet` in the working directory, hence `#[ignore]`.
    #[test]
    #[ignore]
    fn test_instructions_with_real_data() {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let builder =
            ParquetRecordBatchReaderBuilder::try_new(File::open("jup.parquet").unwrap()).unwrap();
        let mut reader = builder.build().unwrap();
        let instructions = reader.next().unwrap().unwrap();

        let ix_signature = InstructionSignature {
            discriminator: vec![229, 23, 203, 151, 122, 227, 173, 42],
            params: vec![
                param(
                    "RoutePlan",
                    DynType::Array(Box::new(structure(vec![
                        ("Swap", swap_type()),
                        ("Percent", DynType::U8),
                        ("InputIndex", DynType::U8),
                        ("OutputIndex", DynType::U8),
                    ]))),
                ),
                param("InAmount", DynType::U64),
                param("QuotedOutAmount", DynType::U64),
                param("SlippageBps", DynType::U16),
                param("PlatformFeeBps", DynType::U8),
            ],
            accounts_names: vec![
                "TokenProgram".to_string(),
                "UserTransferAuthority".to_string(),
                "UserSourceTokenAccount".to_string(),
                "UserDestinationTokenAccount".to_string(),
                "DestinationTokenAccount".to_string(),
                "PlatformFeeAccount".to_string(),
                "EventAuthority".to_string(),
                "Program".to_string(),
                "test8".to_string(),
                "test9".to_string(),
            ],
        };

        // allow_decode_fail = true; no discriminator filtering, no hstack.
        let result = decode_instructions_batch(&ix_signature, &instructions, true, false, false)
            .context("decode failed")
            .unwrap();

        let mut file = File::create("decoded_instructions.parquet").unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(&mut file, result.schema(), None).unwrap();
        writer.write(&result).unwrap();
        writer.close().unwrap();
    }

    // Needs `logs.parquet` in the working directory, hence `#[ignore]`.
    #[test]
    #[ignore]
    fn test_decode_logs_with_real_data() {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let builder =
            ParquetRecordBatchReaderBuilder::try_new(File::open("logs.parquet").unwrap()).unwrap();
        let mut reader = builder.build().unwrap();
        let logs = reader.next().unwrap().unwrap();

        let signature = LogSignature {
            params: vec![
                param(
                    "whirlpool",
                    DynType::FixedArray(Box::new(DynType::U8), 32),
                ),
                param("a_to_b", DynType::Bool),
                param("pre_sqrt_price", DynType::U128),
                param("post_sqrt_price", DynType::U128),
                param("x", DynType::U64),
                param("input_amount", DynType::U64),
                param("output_amount", DynType::U64),
                param("input_transfer_fee", DynType::U64),
                param("output_transfer_fee", DynType::U64),
                param("lp_fee", DynType::U64),
                param("protocol_fee", DynType::U64),
            ],
        };

        // allow_decode_fail = true; no hstack.
        let result = decode_logs_batch(&signature, &logs, true, false)
            .context("decode failed")
            .unwrap();

        let mut file = File::create("decoded_logs.parquet").unwrap();
        let mut writer =
            parquet::arrow::ArrowWriter::try_new(&mut file, result.schema(), None).unwrap();
        writer.write(&result).unwrap();
        writer.close().unwrap();
    }

    // NOTE(review): this test reads no external files; the reason for
    // `#[ignore]` is not visible here — consider enabling it.
    #[test]
    #[ignore]
    fn test_instruction_signature_to_arrow_schema() {
        let signature = InstructionSignature {
            discriminator: vec![],
            params: vec![
                param("amount", DynType::U64),
                param("is_valid", DynType::Bool),
                param("amm", DynType::FixedArray(Box::new(DynType::U8), 32)),
            ],
            accounts_names: vec!["source".to_string(), "destination".to_string()],
        };

        let schema = instruction_signature_to_arrow_schema(&signature).unwrap();

        // Three parameters followed by two accounts.
        assert_eq!(schema.fields().len(), 5);

        let amount_field = schema.field_with_name("amount").unwrap();
        assert_eq!(amount_field.name(), "amount");
        assert!(amount_field.is_nullable());

        let is_valid_field = schema.field_with_name("is_valid").unwrap();
        assert_eq!(is_valid_field.name(), "is_valid");
        assert!(is_valid_field.is_nullable());

        let amm_field = schema.field_with_name("amm").unwrap();
        assert_eq!(amm_field.name(), "amm");
        assert!(amm_field.is_nullable());

        let source_field = schema.field_with_name("source").unwrap();
        assert_eq!(source_field.name(), "source");
        assert_eq!(source_field.data_type(), &DataType::Binary);
        assert!(source_field.is_nullable());

        let dest_field = schema.field_with_name("destination").unwrap();
        assert_eq!(dest_field.name(), "destination");
        assert_eq!(dest_field.data_type(), &DataType::Binary);
        assert!(dest_field.is_nullable());
    }
}