tiders_svm_decode/
lib.rs

1//! # tiders-svm-decode
2//!
3//! Decodes Solana (SVM) program data from binary format into Apache Arrow RecordBatches.
4//!
5//! Supports two types of decoding:
//! - **Instructions** — Decodes Borsh-serialized instruction data, selected by a leading
//!   discriminator prefix (8 bytes for Anchor programs, though any length is accepted),
//!   using a typed parameter schema ([`InstructionSignature`]).
8//! - **Program logs** — Decodes base64-encoded log event data using a typed parameter
9//!   schema ([`LogSignature`]).
10//!
11//! The type system ([`DynType`] / [`DynValue`]) supports all Borsh primitives (i8–i128,
12//! u8–u128, bool) plus complex types (arrays, structs, enums, options).
13
14use anyhow::{anyhow, Context, Result};
15use arrow::array::{
16    builder, Array, BinaryArray, BooleanArray, GenericBinaryArray, GenericListArray,
17    GenericStringArray, LargeBinaryArray, LargeStringArray, OffsetSizeTrait, StringArray,
18};
19use arrow::{
20    array::RecordBatch,
21    compute,
22    datatypes::{DataType, Field, Schema},
23};
24use base64::{engine::general_purpose::STANDARD, Engine as _};
25use std::sync::Arc;
26mod deserialize;
27pub use deserialize::{deserialize_data, DynType, DynValue, ParamInput};
28mod arrow_converter;
29use arrow_converter::{to_arrow, to_arrow_dtype};
30
/// Defines the schema for decoding a Solana instruction: discriminator bytes, parameter types, and account names.
#[derive(Debug, Clone)]
pub struct InstructionSignature {
    /// Byte prefix that instruction data must start with. It is stripped before the
    /// parameters are deserialized. Any length is accepted.
    pub discriminator: Vec<u8>,
    /// Parameters deserialized, in order, from the bytes that follow the discriminator.
    pub params: Vec<ParamInput>,
    /// Output column names for the instruction's accounts, in account order. When decoding,
    /// an empty name falls back to `a{index}`.
    pub accounts_names: Vec<String>,
}
38
/// Defines the schema for decoding a Solana program log event: parameter types to decode from base64 data.
#[derive(Debug, Clone)]
pub struct LogSignature {
    /// Parameters deserialized, in order, from the base64-decoded log message bytes.
    pub params: Vec<ParamInput>,
}
44
/// Builds an [`InstructionSignature`] from a Python object exposing `discriminator`,
/// `params` and `accounts_names` attributes.
///
/// `discriminator` may be a `str` (hex, optionally `0x`-prefixed) or `bytes`. Any other
/// Python type is rejected.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for InstructionSignature {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::{PyAnyMethods, PyTypeMethods};

        let raw_disc = ob.getattr("discriminator")?;
        let type_name: String = raw_disc.get_type().name()?.to_string();

        // Dispatch on the exact Python type name.
        let discriminator: Vec<u8> = if type_name == "str" {
            let hex: &str = raw_disc.extract()?;
            hex_to_bytes(hex).context("failed to decode hex")?
        } else if type_name == "bytes" {
            raw_disc.extract()?
        } else {
            return Err(anyhow!("unknown type: {type_name}").into());
        };

        Ok(Self {
            discriminator,
            params: ob.getattr("params")?.extract::<Vec<ParamInput>>()?,
            accounts_names: ob.getattr("accounts_names")?.extract::<Vec<String>>()?,
        })
    }
}
73
/// Parses a hex string, optionally prefixed with `0x` or `0X`, into bytes.
///
/// An odd number of digits is read as if a leading `0` were present. So `"abc"` yields
/// `[0x0a, 0xbc]`, and an empty string yields an empty vector.
///
/// # Errors
///
/// Fails if any character after the prefix is not an ASCII hex digit. The input is
/// processed as raw bytes, so non-ASCII text produces an error instead of a panic from
/// slicing a `&str` in the middle of a multi-byte character. Sign characters such as
/// `+` are rejected.
#[cfg(feature = "pyo3")]
fn hex_to_bytes(hex_string: &str) -> Result<Vec<u8>> {
    let digits = hex_string
        .strip_prefix("0x")
        .or_else(|| hex_string.strip_prefix("0X"))
        .unwrap_or(hex_string)
        .as_bytes();

    let nibble = |c: u8| -> Result<u8> {
        (c as char)
            .to_digit(16)
            .map(|d| d as u8)
            .ok_or_else(|| {
                anyhow!("failed to parse hexstring to bytes: invalid hex digit byte 0x{c:02x}")
            })
    };

    // With an odd digit count, the first digit forms a byte on its own (implicit leading 0).
    let (lone, pairs) = digits.split_at(digits.len() % 2);
    let mut out = Vec::with_capacity(digits.len() / 2 + lone.len());
    for &c in lone {
        out.push(nibble(c)?);
    }
    for pair in pairs.chunks_exact(2) {
        out.push((nibble(pair[0])? << 4) | nibble(pair[1])?);
    }

    Ok(out)
}
92
/// Builds a [`LogSignature`] from a Python object exposing a `params` attribute.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for LogSignature {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let params: Vec<ParamInput> = ob.getattr("params")?.extract()?;
        Ok(Self { params })
    }
}
103
104fn unpack_rest_of_accounts<ListI: OffsetSizeTrait, InnerI: OffsetSizeTrait>(
105    num_acc: usize,
106    rest_of_acc: &GenericListArray<ListI>,
107    account_arrays: &mut Vec<BinaryArray>,
108) -> Result<()> {
109    let data_size = rest_of_acc.len() * 32;
110
111    for acc_arr in rest_of_acc.iter().flatten() {
112        if acc_arr.len() < num_acc {
113            return Err(anyhow!(
114                "expected rest_of_accounts to have at least {} addresses but it has {}",
115                num_acc,
116                acc_arr.len()
117            ));
118        }
119    }
120
121    for i in 0..num_acc {
122        let mut builder = builder::BinaryBuilder::with_capacity(rest_of_acc.len(), data_size);
123
124        for acc_arr in rest_of_acc.iter() {
125            let Some(acc_arr) = acc_arr else {
126                builder.append_null();
127                continue;
128            };
129
130            let arr = acc_arr
131                .as_any()
132                .downcast_ref::<GenericBinaryArray<InnerI>>()
133                .context("failed to downcast account array in rest_of_accounts")?;
134            if arr.is_null(i) {
135                builder.append_null();
136            } else {
137                builder.append_value(arr.value(i));
138            }
139        }
140
141        account_arrays.push(builder.finish());
142    }
143
144    Ok(())
145}
146
147/// Decodes instruction data from an Arrow `RecordBatch` into a new `RecordBatch` of decoded parameters and accounts.
148///
149/// Expects the input batch to contain `data` (binary) and `accounts` (list-of-binary) columns.
150/// Optionally filters rows by discriminator match and horizontally stacks the result with the original batch.
151pub fn decode_instructions_batch(
152    signature: &InstructionSignature,
153    batch: &RecordBatch,
154    allow_decode_fail: bool,
155    filter_by_discriminator: bool,
156    hstack: bool,
157) -> Result<RecordBatch> {
158    let batch = if filter_by_discriminator {
159        filter_by_discriminator_impl(signature, batch)?
160    } else {
161        batch.clone()
162    };
163    let batch = &batch;
164
165    let mut account_arrays: Vec<BinaryArray> = Vec::with_capacity(20);
166
167    for i in 0..signature.accounts_names.len().min(10) {
168        let col_name = format!("a{i}");
169        let col = batch
170            .column_by_name(&col_name)
171            .with_context(|| format!("account {i} not found but was required"))?;
172
173        if col.data_type() == &DataType::Binary {
174            account_arrays.push(
175                col.as_any()
176                    .downcast_ref::<BinaryArray>()
177                    .context("failed to downcast Binary account column")?
178                    .clone(),
179            );
180        } else if col.data_type() == &DataType::LargeBinary {
181            account_arrays.push(
182                arrow::compute::cast(col, &DataType::Binary)
183                    .context("failed to cast LargeBinary account column to Binary")?
184                    .as_any()
185                    .downcast_ref::<BinaryArray>()
186                    .context("failed to downcast casted account column to BinaryArray")?
187                    .clone(),
188            );
189        }
190    }
191
192    if signature.accounts_names.len() > 10 {
193        let rest_of_acc = batch
194            .column_by_name("rest_of_accounts")
195            .context("rest_of_accounts column not found in instructions batch")?;
196
197        let num_acc = signature.accounts_names.len() - 10;
198        if rest_of_acc.data_type() == &DataType::new_list(DataType::Binary, true) {
199            unpack_rest_of_accounts::<i32, i32>(
200                num_acc,
201                rest_of_acc
202                    .as_any()
203                    .downcast_ref()
204                    .context("failed to downcast rest_of_accounts to List<Binary>")?,
205                &mut account_arrays,
206            )
207            .context("unpack rest_of_accounts column")?;
208        } else if rest_of_acc.data_type() == &DataType::new_list(DataType::LargeBinary, true) {
209            unpack_rest_of_accounts::<i32, i64>(
210                num_acc,
211                rest_of_acc
212                    .as_any()
213                    .downcast_ref()
214                    .context("failed to downcast rest_of_accounts to List<LargeBinary>")?,
215                &mut account_arrays,
216            )
217            .context("unpack rest_of_accounts column")?;
218        } else if rest_of_acc.data_type() == &DataType::new_large_list(DataType::Binary, true) {
219            unpack_rest_of_accounts::<i64, i32>(
220                num_acc,
221                rest_of_acc
222                    .as_any()
223                    .downcast_ref()
224                    .context("failed to downcast rest_of_accounts to LargeList<Binary>")?,
225                &mut account_arrays,
226            )
227            .context("unpack rest_of_accounts column")?;
228        } else if rest_of_acc.data_type() == &DataType::new_large_list(DataType::LargeBinary, true)
229        {
230            unpack_rest_of_accounts::<i64, i64>(
231                num_acc,
232                rest_of_acc
233                    .as_any()
234                    .downcast_ref()
235                    .context("failed to downcast rest_of_accounts to LargeList<LargeBinary>")?,
236                &mut account_arrays,
237            )
238            .context("unpack rest_of_accounts column")?;
239        }
240    }
241
242    let data_col = batch
243        .column_by_name("data")
244        .context("data column not found in instructions batch")?;
245
246    let decoded = if data_col.data_type() == &DataType::Binary {
247        decode_instructions(
248            signature,
249            &account_arrays,
250            data_col
251                .as_any()
252                .downcast_ref::<BinaryArray>()
253                .context("failed to downcast data column to BinaryArray")?,
254            allow_decode_fail,
255        )
256    } else if data_col.data_type() == &DataType::LargeBinary {
257        decode_instructions(
258            signature,
259            &account_arrays,
260            data_col
261                .as_any()
262                .downcast_ref::<LargeBinaryArray>()
263                .context("failed to downcast data column to LargeBinaryArray")?,
264            allow_decode_fail,
265        )
266    } else {
267        Err(anyhow!(
268            "expected the data column to be Binary or LargeBinary"
269        ))
270    }?;
271
272    if hstack {
273        hstack_impl(&decoded, batch)
274    } else {
275        Ok(decoded)
276    }
277}
278
279pub fn decode_instructions<I: OffsetSizeTrait>(
280    signature: &InstructionSignature,
281    accounts: &[BinaryArray],
282    data: &GenericBinaryArray<I>,
283    allow_decode_fail: bool,
284) -> Result<RecordBatch> {
285    let num_params = signature.params.len();
286
287    let mut decoded_params_vec: Vec<Vec<Option<DynValue>>> =
288        (0..num_params).map(|_| Vec::new()).collect();
289
290    for row_idx in 0..data.len() {
291        if data.is_null(row_idx) {
292            if allow_decode_fail {
293                log::debug!("Instruction data is null in row {row_idx}");
294                for v in &mut decoded_params_vec {
295                    v.push(None);
296                }
297                continue;
298            }
299            return Err(anyhow::anyhow!("Instruction data is null in row {row_idx}"));
300        }
301
302        let instruction_data = data.value(row_idx).to_vec();
303        let data_result = match_discriminators(&instruction_data, &signature.discriminator);
304        let data = match data_result {
305            Ok(data) => data,
306            Err(e) if allow_decode_fail => {
307                log::debug!("Error matching discriminators in row {row_idx}: {e:?}");
308                for v in &mut decoded_params_vec {
309                    v.push(None);
310                }
311                continue;
312            }
313            Err(e) => {
314                return Err(anyhow::anyhow!(
315                    "Error matching discriminators in row {row_idx}: {e:?}"
316                ));
317            }
318        };
319
320        // Don't error on remaining data because this is the behavior implemented by anchor.
321        // Note that borsh does error if there is remaining data after deserialization but anchor
322        // doesn't.
323        //
324        // Might be a good idea to extract this to a parameter to this function as well
325        let error_on_remanining = false;
326        let decoded_ix_result = deserialize_data(&data, &signature.params, error_on_remanining);
327        let decoded_ix = match decoded_ix_result {
328            Ok(ix) => ix,
329            Err(e) if allow_decode_fail => {
330                log::debug!("Error deserializing instruction in row {row_idx}: {e:?}");
331                for v in &mut decoded_params_vec {
332                    v.push(None);
333                }
334                continue;
335            }
336            Err(e) => {
337                return Err(anyhow::anyhow!(
338                    "Error deserializing instruction in row {row_idx}: {e:?}"
339                ));
340            }
341        };
342
343        for (i, value) in decoded_ix.into_iter().enumerate() {
344            decoded_params_vec[i].push(Some(value));
345        }
346    }
347
348    let mut data_arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(decoded_params_vec.len());
349    for (i, v) in decoded_params_vec.iter().enumerate() {
350        let array = to_arrow(&signature.params[i].param_type, v.clone())
351            .context("unable to convert instruction value to a arrow format value")?;
352        data_arrays.push(array);
353    }
354
355    let mut data_fields = Vec::with_capacity(signature.params.len());
356    for param in &signature.params {
357        let field = Field::new(
358            param.name.clone(),
359            to_arrow_dtype(&param.param_type)
360                .context("unable to convert instruction param type to arrow dtype")?,
361            true,
362        );
363        data_fields.push(field);
364    }
365
366    let acc_names_len = signature.accounts_names.len();
367    let mut accounts_arrays = Vec::new();
368    let mut acc_fields = Vec::new();
369
370    for i in 0..acc_names_len {
371        let arr = accounts
372            .get(i)
373            .context(format!("Account a{i} not found during decoding"))?;
374
375        if arr.data_type() == &DataType::LargeBinary {
376            accounts_arrays.push(
377                arrow::compute::cast(arr, &DataType::Binary)
378                    .context("failed to cast LargeBinary account to Binary")?,
379            );
380        } else {
381            accounts_arrays.push(Arc::new(arr.clone()) as Arc<dyn Array>);
382        }
383
384        if signature.accounts_names[i].is_empty() {
385            let field = Field::new(format!("a{i}"), DataType::Binary, true);
386            acc_fields.push(field);
387        } else {
388            let field = Field::new(signature.accounts_names[i].clone(), DataType::Binary, true);
389            acc_fields.push(field);
390        }
391    }
392
393    let decoded_instructions_array = data_arrays
394        .into_iter()
395        .chain(accounts_arrays)
396        .collect::<Vec<_>>();
397    let decoded_instructions_fields = data_fields
398        .into_iter()
399        .chain(acc_fields.clone())
400        .collect::<Vec<_>>();
401
402    let schema = Arc::new(Schema::new(decoded_instructions_fields));
403    let batch = RecordBatch::try_new(schema, decoded_instructions_array)
404        .context("Failed to create record batch from data arrays")?;
405
406    Ok(batch)
407}
408
409/// Decodes log event data from an Arrow `RecordBatch` into a new `RecordBatch` of decoded parameters.
410///
411/// Expects the input batch to contain a `message` column (Utf8 or LargeUtf8) with base64-encoded event data.
412/// Optionally horizontally stacks the result with the original batch.
413pub fn decode_logs_batch(
414    signature: &LogSignature,
415    batch: &RecordBatch,
416    allow_decode_fail: bool,
417    hstack: bool,
418) -> Result<RecordBatch> {
419    let message_col = batch
420        .column_by_name("message")
421        .context("message column not found in logs batch")?;
422
423    let decoded = if message_col.data_type() == &DataType::Utf8 {
424        decode_logs(
425            signature,
426            message_col
427                .as_any()
428                .downcast_ref::<StringArray>()
429                .context("failed to downcast message column to StringArray")?,
430            allow_decode_fail,
431        )
432    } else if message_col.data_type() == &DataType::LargeUtf8 {
433        decode_logs(
434            signature,
435            message_col
436                .as_any()
437                .downcast_ref::<LargeStringArray>()
438                .context("failed to downcast message column to LargeStringArray")?,
439            allow_decode_fail,
440        )
441    } else {
442        Err(anyhow!("expected String or LargeString message column"))
443    }?;
444
445    if hstack {
446        hstack_impl(&decoded, batch)
447    } else {
448        Ok(decoded)
449    }
450}
451
452/// Decodes base64-encoded log messages into a `RecordBatch` of typed parameter columns.
453///
454/// Each row's string value is base64-decoded and then Borsh-deserialized according to the
455/// parameter types in `signature`. Rows that fail to decode are filled with nulls when
456/// `allow_decode_fail` is `true`.
457pub fn decode_logs<I: OffsetSizeTrait>(
458    signature: &LogSignature,
459    data: &GenericStringArray<I>,
460    allow_decode_fail: bool,
461) -> Result<RecordBatch> {
462    let num_params = signature.params.len();
463
464    let mut decoded_params_vec: Vec<Vec<Option<DynValue>>> =
465        (0..num_params).map(|_| Vec::new()).collect();
466
467    for row_idx in 0..data.len() {
468        if data.is_null(row_idx) {
469            if allow_decode_fail {
470                log::debug!("Log data is null in row {row_idx}");
471                for v in &mut decoded_params_vec {
472                    v.push(None);
473                }
474                continue;
475            }
476            return Err(anyhow::anyhow!("Log data is null in row {row_idx}"));
477        }
478
479        let log_data = data.value(row_idx);
480        let log_data = STANDARD.decode(log_data);
481        let log_data = match log_data {
482            Ok(log_data) => log_data,
483            Err(e) if allow_decode_fail => {
484                log::debug!("Error base 64 decoding log data in row {row_idx}: {e:?}");
485                for v in &mut decoded_params_vec {
486                    v.push(None);
487                }
488                continue;
489            }
490            Err(e) => {
491                return Err(anyhow::anyhow!(
492                    "Error base 64 decoding log data in row {row_idx}: {e:?}"
493                ));
494            }
495        };
496
497        let decoded_log_result = deserialize_data(&log_data, &signature.params, false);
498        let decoded_log = match decoded_log_result {
499            Ok(log) => log,
500            Err(e) if allow_decode_fail => {
501                log::debug!("Error deserializing log in row {row_idx}: {e:?}");
502                for v in &mut decoded_params_vec {
503                    v.push(None);
504                }
505                continue;
506            }
507            Err(e) => {
508                return Err(anyhow::anyhow!(
509                    "Error deserializing log in row {row_idx}: {e:?}"
510                ));
511            }
512        };
513
514        for (i, value) in decoded_log.into_iter().enumerate() {
515            decoded_params_vec[i].push(Some(value));
516        }
517    }
518
519    let mut data_arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(decoded_params_vec.len());
520    for (i, v) in decoded_params_vec.iter().enumerate() {
521        let array = to_arrow(&signature.params[i].param_type, v.clone())
522            .context("unable to convert log value to a arrow format value")?;
523        data_arrays.push(array);
524    }
525
526    let mut data_fields = Vec::with_capacity(signature.params.len());
527    for param in &signature.params {
528        let field = Field::new(
529            param.name.clone(),
530            to_arrow_dtype(&param.param_type)
531                .context("unable to convert log param type to arrow dtype")?,
532            true,
533        );
534        data_fields.push(field);
535    }
536
537    let schema = Arc::new(Schema::new(data_fields));
538    let batch = RecordBatch::try_new(schema, data_arrays)
539        .context("Failed to create record batch from data arrays")?;
540
541    Ok(batch)
542}
543
544pub fn match_discriminators(instr_data: &[u8], discriminator: &[u8]) -> Result<Vec<u8>> {
545    let discriminator_len = discriminator.len();
546    if instr_data.len() < discriminator_len {
547        return Err(anyhow::anyhow!(
548            "Instruction data is too short to contain discriminator. Expected at least {} bytes, got {} bytes",
549            discriminator_len,
550            instr_data.len()
551        ));
552    }
553    let disc = &instr_data[..discriminator_len].to_vec();
554    let ix_data = &instr_data[discriminator_len..];
555    if !disc.eq(discriminator) {
556        return Err(anyhow::anyhow!(
557            "Instruction data discriminator doesn't match signature discriminator"
558        ));
559    }
560    Ok(ix_data.to_vec())
561}
562
563fn filter_by_discriminator_impl(
564    signature: &InstructionSignature,
565    batch: &RecordBatch,
566) -> Result<RecordBatch> {
567    let Some(data_col) = batch.column_by_name("data") else {
568        return Ok(batch.clone());
569    };
570
571    let discriminator = &signature.discriminator;
572    let mask = build_discriminator_mask(data_col.as_ref(), discriminator)
573        .context("build discriminator filter mask")?;
574
575    let non_matching = mask.iter().filter(|v| !v.unwrap_or(false)).count();
576    if non_matching > 0 {
577        log::debug!("filtering out {non_matching} instructions whose discriminator does not match");
578    }
579
580    compute::filter_record_batch(batch, &mask).context("filter record batch by discriminator")
581}
582
583fn build_discriminator_mask(col: &dyn Array, discriminator: &[u8]) -> Result<BooleanArray> {
584    if col.data_type() == &DataType::Binary {
585        let arr = col
586            .as_any()
587            .downcast_ref::<BinaryArray>()
588            .context("downcast data to BinaryArray")?;
589        Ok(BooleanArray::from(
590            arr.iter()
591                .map(|v| v.map(|b| b.starts_with(discriminator)))
592                .collect::<Vec<_>>(),
593        ))
594    } else if col.data_type() == &DataType::LargeBinary {
595        let arr = col
596            .as_any()
597            .downcast_ref::<LargeBinaryArray>()
598            .context("downcast data to LargeBinaryArray")?;
599        Ok(BooleanArray::from(
600            arr.iter()
601                .map(|v| v.map(|b| b.starts_with(discriminator)))
602                .collect::<Vec<_>>(),
603        ))
604    } else {
605        Err(anyhow!(
606            "unexpected data column type {}. Expected Binary or LargeBinary",
607            col.data_type()
608        ))
609    }
610}
611
612fn hstack_impl(decoded: &RecordBatch, input: &RecordBatch) -> Result<RecordBatch> {
613    let mut fields: Vec<Arc<Field>> = decoded.schema().fields().iter().cloned().collect();
614    let mut arrays: Vec<Arc<dyn Array>> = decoded.columns().to_vec();
615
616    for (i, col) in input.columns().iter().enumerate() {
617        fields.push(input.schema().field(i).clone().into());
618        arrays.push(col.clone());
619    }
620
621    let schema = Schema::new(fields);
622    RecordBatch::try_new(Arc::new(schema), arrays).context("construct hstacked arrow batch")
623}
624
625/// Converts an [`InstructionSignature`] into an Arrow [`Schema`], mapping each parameter
626/// and account name to the corresponding Arrow field and data type.
627pub fn instruction_signature_to_arrow_schema(signature: &InstructionSignature) -> Result<Schema> {
628    let mut fields = Vec::new();
629
630    for param in &signature.params {
631        let field = Field::new(
632            param.name.clone(),
633            to_arrow_dtype(&param.param_type)
634                .context("unable to convert instruction param type to arrow dtype")?,
635            true,
636        );
637        fields.push(field);
638    }
639
640    for account in &signature.accounts_names {
641        let field = Field::new(account.clone(), DataType::Binary, true);
642        fields.push(field);
643    }
644
645    Ok(Schema::new(fields))
646}
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use crate::deserialize::{DynType, ParamInput};
652    use std::fs::File;
653
654    #[test]
655    #[ignore]
656    fn test_instructions_with_real_data() {
657        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
658
659        let builder =
660            ParquetRecordBatchReaderBuilder::try_new(File::open("jup.parquet").unwrap()).unwrap();
661        let mut reader = builder.build().unwrap();
662        let instructions = reader.next().unwrap().unwrap();
663        let ix_signature = InstructionSignature {
664            // // SPL Token Transfer
665            // discriminator: &[3],
666            // params: vec![ParamInput {
667            //     name: "Amount".to_string(),
668            //     param_type: DynType::U64,
669            // }],
670            // accounts: vec![
671            //     "Source".to_string(),
672            //     "Destination".to_string(),
673            //     "Authority".to_string(),
674            // ],
675
676            // // JUP SwapEvent
677            // discriminator: &[
678            //     228, 69, 165, 46, 81, 203, 154, 29, 64, 198, 205, 232, 38, 8, 113, 226,
679            // ],
680            // params: vec![
681            //     ParamInput {
682            //         name: "Amm".to_string(),
683            //         param_type: DynType::Pubkey,
684            //     },
685            //     ParamInput {
686            //         name: "InputMint".to_string(),
687            //         param_type: DynType::Pubkey,
688            //     },
689            //     ParamInput {
690            //         name: "InputAmount".to_string(),
691            //         param_type: DynType::U64,
692            //     },
693            //     ParamInput {
694            //         name: "OutputMint".to_string(),
695            //         param_type: DynType::Pubkey,
696            //     },
697            //     ParamInput {
698            //         name: "OutputAmount".to_string(),
699            //         param_type: DynType::U64,
700            //     },
701            // ],
702            // accounts: vec![],
703
704            // JUP Route
705            discriminator: vec![229, 23, 203, 151, 122, 227, 173, 42],
706            params: vec![
707                ParamInput {
708                    name: "RoutePlan".to_string(),
709                    param_type: DynType::Array(Box::new(DynType::Struct(vec![
710                        (
711                            "Swap".to_string(),
712                            DynType::Enum(vec![
713                                ("Saber".to_string(), None),
714                                ("SaberAddDecimalsDeposit".to_string(), None),
715                                ("SaberAddDecimalsWithdraw".to_string(), None),
716                                ("TokenSwap".to_string(), None),
717                                ("Sencha".to_string(), None),
718                                ("Step".to_string(), None),
719                                ("Cropper".to_string(), None),
720                                ("Raydium".to_string(), None),
721                                (
722                                    "Crema".to_string(),
723                                    Some(DynType::Struct(vec![(
724                                        "a_to_b".to_string(),
725                                        DynType::Bool,
726                                    )])),
727                                ),
728                                ("Lifinity".to_string(), None),
729                                ("Mercurial".to_string(), None),
730                                ("Cykura".to_string(), None),
731                                (
732                                    "Serum".to_string(),
733                                    Some(DynType::Struct(vec![(
734                                        "side".to_string(),
735                                        DynType::Enum(vec![
736                                            ("Bid".to_string(), None),
737                                            ("Ask".to_string(), None),
738                                        ]),
739                                    )])),
740                                ),
741                                ("MarinadeDeposit".to_string(), None),
742                                ("MarinadeUnstake".to_string(), None),
743                                (
744                                    "Aldrin".to_string(),
745                                    Some(DynType::Struct(vec![(
746                                        "side".to_string(),
747                                        DynType::Enum(vec![
748                                            ("Bid".to_string(), None),
749                                            ("Ask".to_string(), None),
750                                        ]),
751                                    )])),
752                                ),
753                                (
754                                    "AldrinV2".to_string(),
755                                    Some(DynType::Struct(vec![(
756                                        "side".to_string(),
757                                        DynType::Enum(vec![
758                                            ("Bid".to_string(), None),
759                                            ("Ask".to_string(), None),
760                                        ]),
761                                    )])),
762                                ),
763                                (
764                                    "Whirlpool".to_string(),
765                                    Some(DynType::Struct(vec![(
766                                        "a_to_b".to_string(),
767                                        DynType::Bool,
768                                    )])),
769                                ),
770                                (
771                                    "Invariant".to_string(),
772                                    Some(DynType::Struct(vec![(
773                                        "x_to_y".to_string(),
774                                        DynType::Bool,
775                                    )])),
776                                ),
777                                ("Meteora".to_string(), None),
778                                ("GooseFX".to_string(), None),
779                                (
780                                    "DeltaFi".to_string(),
781                                    Some(DynType::Struct(vec![(
782                                        "stable".to_string(),
783                                        DynType::Bool,
784                                    )])),
785                                ),
786                                ("Balansol".to_string(), None),
787                                (
788                                    "MarcoPolo".to_string(),
789                                    Some(DynType::Struct(vec![(
790                                        "x_to_y".to_string(),
791                                        DynType::Bool,
792                                    )])),
793                                ),
794                                (
795                                    "Dradex".to_string(),
796                                    Some(DynType::Struct(vec![(
797                                        "side".to_string(),
798                                        DynType::Enum(vec![
799                                            ("Bid".to_string(), None),
800                                            ("Ask".to_string(), None),
801                                        ]),
802                                    )])),
803                                ),
804                                ("LifinityV2".to_string(), None),
805                                ("RaydiumClmm".to_string(), None),
806                                (
807                                    "Openbook".to_string(),
808                                    Some(DynType::Struct(vec![(
809                                        "side".to_string(),
810                                        DynType::Enum(vec![
811                                            ("Bid".to_string(), None),
812                                            ("Ask".to_string(), None),
813                                        ]),
814                                    )])),
815                                ),
816                                (
817                                    "Phoenix".to_string(),
818                                    Some(DynType::Struct(vec![(
819                                        "side".to_string(),
820                                        DynType::Enum(vec![
821                                            ("Bid".to_string(), None),
822                                            ("Ask".to_string(), None),
823                                        ]),
824                                    )])),
825                                ),
826                                (
827                                    "Symmetry".to_string(),
828                                    Some(DynType::Struct(vec![
829                                        ("from_token_id".to_string(), DynType::U64),
830                                        ("to_token_id".to_string(), DynType::U64),
831                                    ])),
832                                ),
833                                ("TokenSwapV2".to_string(), None),
834                                ("HeliumTreasuryManagementRedeemV0".to_string(), None),
835                                ("StakeDexStakeWrappedSol".to_string(), None),
836                                (
837                                    "StakeDexSwapViaStake".to_string(),
838                                    Some(DynType::Struct(vec![(
839                                        "bridge_stake_seed".to_string(),
840                                        DynType::U32,
841                                    )])),
842                                ),
843                                ("GooseFXV2".to_string(), None),
844                                ("Perps".to_string(), None),
845                                ("PerpsAddLiquidity".to_string(), None),
846                                ("PerpsRemoveLiquidity".to_string(), None),
847                                ("MeteoraDlmm".to_string(), None),
848                                (
849                                    "OpenBookV2".to_string(),
850                                    Some(DynType::Struct(vec![(
851                                        "side".to_string(),
852                                        DynType::Enum(vec![
853                                            ("Bid".to_string(), None),
854                                            ("Ask".to_string(), None),
855                                        ]),
856                                    )])),
857                                ),
858                                ("RaydiumClmmV2".to_string(), None),
859                                (
860                                    "StakeDexPrefundWithdrawStakeAndDepositStake".to_string(),
861                                    Some(DynType::Struct(vec![(
862                                        "bridge_stake_seed".to_string(),
863                                        DynType::U32,
864                                    )])),
865                                ),
866                                (
867                                    "Clone".to_string(),
868                                    Some(DynType::Struct(vec![
869                                        ("pool_index".to_string(), DynType::U8),
870                                        ("quantity_is_input".to_string(), DynType::Bool),
871                                        ("quantity_is_collateral".to_string(), DynType::Bool),
872                                    ])),
873                                ),
874                                (
875                                    "SanctumS".to_string(),
876                                    Some(DynType::Struct(vec![
877                                        ("src_lst_value_calc_accs".to_string(), DynType::U8),
878                                        ("dst_lst_value_calc_accs".to_string(), DynType::U8),
879                                        ("src_lst_index".to_string(), DynType::U32),
880                                        ("dst_lst_index".to_string(), DynType::U32),
881                                    ])),
882                                ),
883                                (
884                                    "SanctumSAddLiquidity".to_string(),
885                                    Some(DynType::Struct(vec![
886                                        ("lst_value_calc_accs".to_string(), DynType::U8),
887                                        ("lst_index".to_string(), DynType::U32),
888                                    ])),
889                                ),
890                                (
891                                    "SanctumSRemoveLiquidity".to_string(),
892                                    Some(DynType::Struct(vec![
893                                        ("lst_value_calc_accs".to_string(), DynType::U8),
894                                        ("lst_index".to_string(), DynType::U32),
895                                    ])),
896                                ),
897                                ("RaydiumCP".to_string(), None),
898                                (
899                                    "WhirlpoolSwapV2".to_string(),
900                                    Some(DynType::Struct(vec![
901                                        ("a_to_b".to_string(), DynType::Bool),
902                                        (
903                                            "remaining_accounts_info".to_string(),
904                                            DynType::Struct(vec![(
905                                                "slices".to_string(),
906                                                DynType::Array(Box::new(DynType::Struct(vec![(
907                                                    "remaining_accounts_slice".to_string(),
908                                                    DynType::Struct(vec![
909                                                        ("accounts_type".to_string(), DynType::U8),
910                                                        ("length".to_string(), DynType::U8),
911                                                    ]),
912                                                )]))),
913                                            )]),
914                                        ),
915                                    ])),
916                                ),
917                                ("OneIntro".to_string(), None),
918                                ("PumpdotfunWrappedBuy".to_string(), None),
919                                ("PumpdotfunWrappedSell".to_string(), None),
920                                ("PerpsV2".to_string(), None),
921                                ("PerpsV2AddLiquidity".to_string(), None),
922                                ("PerpsV2RemoveLiquidity".to_string(), None),
923                                ("MoonshotWrappedBuy".to_string(), None),
924                                ("MoonshotWrappedSell".to_string(), None),
925                                ("StabbleStableSwap".to_string(), None),
926                                ("StabbleWeightedSwap".to_string(), None),
927                                (
928                                    "Obric".to_string(),
929                                    Some(DynType::Struct(vec![(
930                                        "x_to_y".to_string(),
931                                        DynType::Bool,
932                                    )])),
933                                ),
934                                ("FoxBuyFromEstimatedCost".to_string(), None),
935                                (
936                                    "FoxClaimPartial".to_string(),
937                                    Some(DynType::Struct(vec![(
938                                        "is_y".to_string(),
939                                        DynType::Bool,
940                                    )])),
941                                ),
942                                (
943                                    "SolFi".to_string(),
944                                    Some(DynType::Struct(vec![(
945                                        "is_quote_to_base".to_string(),
946                                        DynType::Bool,
947                                    )])),
948                                ),
949                                ("SolayerDelegateNoInit".to_string(), None),
950                                ("SolayerUndelegateNoInit".to_string(), None),
951                                (
952                                    "TokenMill".to_string(),
953                                    Some(DynType::Struct(vec![(
954                                        "side".to_string(),
955                                        DynType::Enum(vec![
956                                            ("Bid".to_string(), None),
957                                            ("Ask".to_string(), None),
958                                        ]),
959                                    )])),
960                                ),
961                                ("DaosFunBuy".to_string(), None),
962                                ("DaosFunSell".to_string(), None),
963                                ("ZeroFi".to_string(), None),
964                                ("StakeDexWithdrawWrappedSol".to_string(), None),
965                                ("VirtualsBuy".to_string(), None),
966                                ("VirtualsSell".to_string(), None),
967                                (
968                                    "Peren".to_string(),
969                                    Some(DynType::Struct(vec![
970                                        ("in_index".to_string(), DynType::U8),
971                                        ("out_index".to_string(), DynType::U8),
972                                    ])),
973                                ),
974                                ("PumpdotfunAmmBuy".to_string(), None),
975                                ("PumpdotfunAmmSell".to_string(), None),
976                                ("Gamma".to_string(), None),
977                            ]),
978                        ),
979                        ("Percent".to_string(), DynType::U8),
980                        ("InputIndex".to_string(), DynType::U8),
981                        ("OutputIndex".to_string(), DynType::U8),
982                    ]))),
983                },
984                ParamInput {
985                    name: "InAmount".to_string(),
986                    param_type: DynType::U64,
987                },
988                ParamInput {
989                    name: "QuotedOutAmount".to_string(),
990                    param_type: DynType::U64,
991                },
992                ParamInput {
993                    name: "SlippageBps".to_string(),
994                    param_type: DynType::U16,
995                },
996                ParamInput {
997                    name: "PlatformFeeBps".to_string(),
998                    param_type: DynType::U8,
999                },
1000            ],
1001            accounts_names: vec![
1002                "TokenProgram".to_string(),
1003                "UserTransferAuthority".to_string(),
1004                "UserSourceTokenAccount".to_string(),
1005                "UserDestinationTokenAccount".to_string(),
1006                "DestinationTokenAccount".to_string(),
1007                "PlatformFeeAccount".to_string(),
1008                "EventAuthority".to_string(),
1009                "Program".to_string(),
1010                "test8".to_string(),
1011                "test9".to_string(),
1012            ],
1013        };
1014
1015        let result = decode_instructions_batch(&ix_signature, &instructions, true)
1016            .context("decode failed")
1017            .unwrap();
1018
1019        // Save the filtered instructions to a new parquet file
1020        let mut file = File::create("decoded_instructions.parquet").unwrap();
1021        let mut writer =
1022            parquet::arrow::ArrowWriter::try_new(&mut file, result.schema(), None).unwrap();
1023        writer.write(&result).unwrap();
1024        writer.close().unwrap();
1025    }
1026
1027    #[test]
1028    #[ignore]
1029    fn test_decode_logs_with_real_data() {
1030        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1031
1032        let builder =
1033            ParquetRecordBatchReaderBuilder::try_new(File::open("logs.parquet").unwrap()).unwrap();
1034        let mut reader = builder.build().unwrap();
1035        let logs = reader.next().unwrap().unwrap();
1036
1037        let signature = LogSignature {
1038            params: vec![
1039                ParamInput {
1040                    name: "whirlpool".to_string(),
1041                    param_type: DynType::FixedArray(Box::new(DynType::U8), 32),
1042                },
1043                ParamInput {
1044                    name: "a_to_b".to_string(),
1045                    param_type: DynType::Bool,
1046                },
1047                ParamInput {
1048                    name: "pre_sqrt_price".to_string(),
1049                    param_type: DynType::U128,
1050                },
1051                ParamInput {
1052                    name: "post_sqrt_price".to_string(),
1053                    param_type: DynType::U128,
1054                },
1055                ParamInput {
1056                    name: "x".to_string(),
1057                    param_type: DynType::U64,
1058                },
1059                ParamInput {
1060                    name: "input_amount".to_string(),
1061                    param_type: DynType::U64,
1062                },
1063                ParamInput {
1064                    name: "output_amount".to_string(),
1065                    param_type: DynType::U64,
1066                },
1067                ParamInput {
1068                    name: "input_transfer_fee".to_string(),
1069                    param_type: DynType::U64,
1070                },
1071                ParamInput {
1072                    name: "output_transfer_fee".to_string(),
1073                    param_type: DynType::U64,
1074                },
1075                ParamInput {
1076                    name: "lp_fee".to_string(),
1077                    param_type: DynType::U64,
1078                },
1079                ParamInput {
1080                    name: "protocol_fee".to_string(),
1081                    param_type: DynType::U64,
1082                },
1083            ],
1084        };
1085
1086        let result = decode_logs_batch(&signature, &logs, true)
1087            .context("decode failed")
1088            .unwrap();
1089
1090        // Save the filtered instructions to a new parquet file
1091        let mut file = File::create("decoded_logs.parquet").unwrap();
1092        let mut writer =
1093            parquet::arrow::ArrowWriter::try_new(&mut file, result.schema(), None).unwrap();
1094        writer.write(&result).unwrap();
1095        writer.close().unwrap();
1096    }
1097
1098    #[test]
1099    #[ignore]
1100    fn test_instruction_signature_to_arrow_schema() {
1101        // Create a test instruction signature
1102        let signature = InstructionSignature {
1103            discriminator: vec![],
1104            params: vec![
1105                ParamInput {
1106                    name: "amount".to_string(),
1107                    param_type: DynType::U64,
1108                },
1109                ParamInput {
1110                    name: "is_valid".to_string(),
1111                    param_type: DynType::Bool,
1112                },
1113                ParamInput {
1114                    name: "amm".to_string(),
1115                    param_type: DynType::FixedArray(Box::new(DynType::U8), 32),
1116                },
1117            ],
1118            accounts_names: vec!["source".to_string(), "destination".to_string()],
1119        };
1120
1121        // Convert to schema
1122        let schema = instruction_signature_to_arrow_schema(&signature).unwrap();
1123
1124        // Verify the schema has the correct number of fields
1125        assert_eq!(schema.fields().len(), 5); // 2 params + 2 accounts
1126
1127        // Verify param fields
1128        let amount_field = schema.field_with_name("amount").unwrap();
1129        assert_eq!(amount_field.name(), "amount");
1130        assert!(amount_field.is_nullable());
1131
1132        let is_valid_field = schema.field_with_name("is_valid").unwrap();
1133        assert_eq!(is_valid_field.name(), "is_valid");
1134        assert!(is_valid_field.is_nullable());
1135
1136        let amm_field = schema.field_with_name("amm").unwrap();
1137        assert_eq!(amm_field.name(), "amm");
1138        assert!(amm_field.is_nullable());
1139
1140        // Verify account fields
1141        let source_field = schema.field_with_name("source").unwrap();
1142        assert_eq!(source_field.name(), "source");
1143        assert_eq!(source_field.data_type(), &DataType::Binary);
1144        assert!(source_field.is_nullable());
1145
1146        let dest_field = schema.field_with_name("destination").unwrap();
1147        assert_eq!(dest_field.name(), "destination");
1148        assert_eq!(dest_field.data_type(), &DataType::Binary);
1149        assert!(dest_field.is_nullable());
1150    }
1151}