1use std::sync::Arc;
20
21use arrow::array::builder;
22use arrow::datatypes::{DataType, Field, Fields, Schema};
23use arrow::record_batch::RecordBatch;
24
25pub fn blocks_schema() -> Schema {
27 Schema::new(vec![
28 Field::new("slot", DataType::UInt64, true),
29 Field::new("hash", DataType::Binary, true),
30 Field::new("parent_slot", DataType::UInt64, true),
31 Field::new("parent_hash", DataType::Binary, true),
32 Field::new("height", DataType::UInt64, true),
33 Field::new("timestamp", DataType::Int64, true),
34 ])
35}
36
37pub fn rewards_schema() -> Schema {
39 Schema::new(vec![
40 Field::new("block_slot", DataType::UInt64, true),
41 Field::new("block_hash", DataType::Binary, true),
42 Field::new("pubkey", DataType::Binary, true),
43 Field::new("lamports", DataType::Int64, true),
44 Field::new("post_balance", DataType::UInt64, true),
45 Field::new("reward_type", DataType::Utf8, true),
46 Field::new("commission", DataType::UInt8, true),
47 ])
48}
49
50pub fn token_balances_schema() -> Schema {
52 Schema::new(vec![
53 Field::new("block_slot", DataType::UInt64, true),
54 Field::new("block_hash", DataType::Binary, true),
55 Field::new("transaction_index", DataType::UInt32, true),
56 Field::new("account", DataType::Binary, true),
57 Field::new("pre_mint", DataType::Binary, true),
58 Field::new("post_mint", DataType::Binary, true),
59 Field::new("pre_decimals", DataType::UInt16, true),
60 Field::new("post_decimals", DataType::UInt16, true),
61 Field::new("pre_program_id", DataType::Binary, true),
62 Field::new("post_program_id", DataType::Binary, true),
63 Field::new("pre_owner", DataType::Binary, true),
64 Field::new("post_owner", DataType::Binary, true),
65 Field::new("pre_amount", DataType::UInt64, true),
66 Field::new("post_amount", DataType::UInt64, true),
67 ])
68}
69
70pub fn balances_schema() -> Schema {
72 Schema::new(vec![
73 Field::new("block_slot", DataType::UInt64, true),
74 Field::new("block_hash", DataType::Binary, true),
75 Field::new("transaction_index", DataType::UInt32, true),
76 Field::new("account", DataType::Binary, true),
77 Field::new("pre", DataType::UInt64, true),
78 Field::new("post", DataType::UInt64, true),
79 ])
80}
81
82pub fn logs_schema() -> Schema {
84 Schema::new(vec![
85 Field::new("block_slot", DataType::UInt64, true),
86 Field::new("block_hash", DataType::Binary, true),
87 Field::new("transaction_index", DataType::UInt32, true),
88 Field::new("log_index", DataType::UInt32, true),
89 Field::new(
90 "instruction_address",
91 DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
92 true,
93 ),
94 Field::new("program_id", DataType::Binary, true),
95 Field::new("kind", DataType::Utf8, true),
96 Field::new("message", DataType::Utf8, true),
97 ])
98}
99
100pub fn transactions_schema() -> Schema {
102 Schema::new(vec![
103 Field::new("block_slot", DataType::UInt64, true),
104 Field::new("block_hash", DataType::Binary, true),
105 Field::new("transaction_index", DataType::UInt32, true),
106 Field::new("signature", DataType::Binary, true),
107 Field::new("version", DataType::Int8, true),
108 Field::new(
109 "account_keys",
110 DataType::List(Arc::new(Field::new("item", DataType::Binary, true))),
111 true,
112 ),
113 Field::new(
114 "address_table_lookups",
115 DataType::List(Arc::new(Field::new(
116 "item",
117 address_table_lookup_dt(),
118 true,
119 ))),
120 true,
121 ),
122 Field::new("num_readonly_signed_accounts", DataType::UInt32, true),
123 Field::new("num_readonly_unsigned_accounts", DataType::UInt32, true),
124 Field::new("num_required_signatures", DataType::UInt32, true),
125 Field::new("recent_blockhash", DataType::Binary, true),
126 Field::new(
127 "signatures",
128 DataType::List(Arc::new(Field::new("item", DataType::Binary, true))),
129 true,
130 ),
131 Field::new("err", DataType::Utf8, true),
132 Field::new("fee", DataType::UInt64, true),
133 Field::new("compute_units_consumed", DataType::UInt64, true),
134 Field::new(
135 "loaded_readonly_addresses",
136 DataType::List(Arc::new(Field::new("item", DataType::Binary, true))),
137 true,
138 ),
139 Field::new(
140 "loaded_writable_addresses",
141 DataType::List(Arc::new(Field::new("item", DataType::Binary, true))),
142 true,
143 ),
144 Field::new("fee_payer", DataType::Binary, true),
145 Field::new("has_dropped_log_messages", DataType::Boolean, true),
146 ])
147}
148
149fn address_table_lookup_dt() -> DataType {
150 DataType::Struct(Fields::from(vec![
151 Arc::new(Field::new("account_key", DataType::Binary, true)),
152 Arc::new(Field::new(
153 "writable_indexes",
154 DataType::List(Arc::new(Field::new("item", DataType::UInt64, true))),
155 true,
156 )),
157 Arc::new(Field::new(
158 "readonly_indexes",
159 DataType::List(Arc::new(Field::new("item", DataType::UInt64, true))),
160 true,
161 )),
162 ]))
163}
164
165pub fn instructions_schema() -> Schema {
167 Schema::new(vec![
168 Field::new("block_slot", DataType::UInt64, true),
169 Field::new("block_hash", DataType::Binary, true),
170 Field::new("transaction_index", DataType::UInt32, true),
171 Field::new(
172 "instruction_address",
173 DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
174 true,
175 ),
176 Field::new("program_id", DataType::Binary, true),
177 Field::new("a0", DataType::Binary, true),
178 Field::new("a1", DataType::Binary, true),
179 Field::new("a2", DataType::Binary, true),
180 Field::new("a3", DataType::Binary, true),
181 Field::new("a4", DataType::Binary, true),
182 Field::new("a5", DataType::Binary, true),
183 Field::new("a6", DataType::Binary, true),
184 Field::new("a7", DataType::Binary, true),
185 Field::new("a8", DataType::Binary, true),
186 Field::new("a9", DataType::Binary, true),
187 Field::new(
188 "rest_of_accounts",
189 DataType::List(Arc::new(Field::new("item", DataType::Binary, true))),
190 true,
191 ),
192 Field::new("data", DataType::Binary, true),
193 Field::new("d1", DataType::Binary, true),
194 Field::new("d2", DataType::Binary, true),
195 Field::new("d4", DataType::Binary, true),
196 Field::new("d8", DataType::Binary, true),
197 Field::new("error", DataType::Utf8, true),
198 Field::new("compute_units_consumed", DataType::UInt64, true),
199 Field::new("is_committed", DataType::Boolean, true),
200 Field::new("has_dropped_log_messages", DataType::Boolean, true),
201 ])
202}
203
204#[derive(Default)]
206pub struct BlocksBuilder {
207 pub slot: builder::UInt64Builder,
208 pub hash: builder::BinaryBuilder,
209 pub parent_slot: builder::UInt64Builder,
210 pub parent_hash: builder::BinaryBuilder,
211 pub height: builder::UInt64Builder,
212 pub timestamp: builder::Int64Builder,
213}
214
215impl BlocksBuilder {
216 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
218 pub fn finish(mut self) -> RecordBatch {
219 RecordBatch::try_new(
220 Arc::new(blocks_schema()),
221 vec![
222 Arc::new(self.slot.finish()),
223 Arc::new(self.hash.finish()),
224 Arc::new(self.parent_slot.finish()),
225 Arc::new(self.parent_hash.finish()),
226 Arc::new(self.height.finish()),
227 Arc::new(self.timestamp.finish()),
228 ],
229 )
230 .unwrap()
231 }
232}
233
234#[derive(Default)]
236pub struct RewardsBuilder {
237 pub block_slot: builder::UInt64Builder,
238 pub block_hash: builder::BinaryBuilder,
239 pub pubkey: builder::BinaryBuilder,
240 pub lamports: builder::Int64Builder,
241 pub post_balance: builder::UInt64Builder,
242 pub reward_type: builder::StringBuilder,
243 pub commission: builder::UInt8Builder,
244}
245
246impl RewardsBuilder {
247 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
249 pub fn finish(mut self) -> RecordBatch {
250 RecordBatch::try_new(
251 Arc::new(rewards_schema()),
252 vec![
253 Arc::new(self.block_slot.finish()),
254 Arc::new(self.block_hash.finish()),
255 Arc::new(self.pubkey.finish()),
256 Arc::new(self.lamports.finish()),
257 Arc::new(self.post_balance.finish()),
258 Arc::new(self.reward_type.finish()),
259 Arc::new(self.commission.finish()),
260 ],
261 )
262 .unwrap()
263 }
264}
265
266#[derive(Default)]
268pub struct TokenBalancesBuilder {
269 pub block_slot: builder::UInt64Builder,
270 pub block_hash: builder::BinaryBuilder,
271 pub transaction_index: builder::UInt32Builder,
272 pub account: builder::BinaryBuilder,
273 pub pre_mint: builder::BinaryBuilder,
274 pub post_mint: builder::BinaryBuilder,
275 pub pre_decimals: builder::UInt16Builder,
276 pub post_decimals: builder::UInt16Builder,
277 pub pre_program_id: builder::BinaryBuilder,
278 pub post_program_id: builder::BinaryBuilder,
279 pub pre_owner: builder::BinaryBuilder,
280 pub post_owner: builder::BinaryBuilder,
281 pub pre_amount: builder::UInt64Builder,
282 pub post_amount: builder::UInt64Builder,
283}
284
285impl TokenBalancesBuilder {
286 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
288 pub fn finish(mut self) -> RecordBatch {
289 RecordBatch::try_new(
290 Arc::new(token_balances_schema()),
291 vec![
292 Arc::new(self.block_slot.finish()),
293 Arc::new(self.block_hash.finish()),
294 Arc::new(self.transaction_index.finish()),
295 Arc::new(self.account.finish()),
296 Arc::new(self.pre_mint.finish()),
297 Arc::new(self.post_mint.finish()),
298 Arc::new(self.pre_decimals.finish()),
299 Arc::new(self.post_decimals.finish()),
300 Arc::new(self.pre_program_id.finish()),
301 Arc::new(self.post_program_id.finish()),
302 Arc::new(self.pre_owner.finish()),
303 Arc::new(self.post_owner.finish()),
304 Arc::new(self.pre_amount.finish()),
305 Arc::new(self.post_amount.finish()),
306 ],
307 )
308 .unwrap()
309 }
310}
311
312#[derive(Default)]
314pub struct BalancesBuilder {
315 pub block_slot: builder::UInt64Builder,
316 pub block_hash: builder::BinaryBuilder,
317 pub transaction_index: builder::UInt32Builder,
318 pub account: builder::BinaryBuilder,
319 pub pre: builder::UInt64Builder,
320 pub post: builder::UInt64Builder,
321}
322
323impl BalancesBuilder {
324 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
326 pub fn finish(mut self) -> RecordBatch {
327 RecordBatch::try_new(
328 Arc::new(balances_schema()),
329 vec![
330 Arc::new(self.block_slot.finish()),
331 Arc::new(self.block_hash.finish()),
332 Arc::new(self.transaction_index.finish()),
333 Arc::new(self.account.finish()),
334 Arc::new(self.pre.finish()),
335 Arc::new(self.post.finish()),
336 ],
337 )
338 .unwrap()
339 }
340}
341
342#[derive(Default)]
344pub struct LogsBuilder {
345 pub block_slot: builder::UInt64Builder,
346 pub block_hash: builder::BinaryBuilder,
347 pub transaction_index: builder::UInt32Builder,
348 pub log_index: builder::UInt32Builder,
349 pub instruction_address: builder::ListBuilder<builder::UInt32Builder>,
350 pub program_id: builder::BinaryBuilder,
351 pub kind: builder::StringBuilder,
352 pub message: builder::StringBuilder,
353}
354
355impl LogsBuilder {
356 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
358 pub fn finish(mut self) -> RecordBatch {
359 RecordBatch::try_new(
360 Arc::new(logs_schema()),
361 vec![
362 Arc::new(self.block_slot.finish()),
363 Arc::new(self.block_hash.finish()),
364 Arc::new(self.transaction_index.finish()),
365 Arc::new(self.log_index.finish()),
366 Arc::new(self.instruction_address.finish()),
367 Arc::new(self.program_id.finish()),
368 Arc::new(self.kind.finish()),
369 Arc::new(self.message.finish()),
370 ],
371 )
372 .unwrap()
373 }
374}
375
376#[derive(Default)]
378pub struct TransactionsBuilder {
379 pub block_slot: builder::UInt64Builder,
380 pub block_hash: builder::BinaryBuilder,
381 pub transaction_index: builder::UInt32Builder,
382 pub signature: builder::BinaryBuilder,
383 pub version: builder::Int8Builder,
384 pub account_keys: builder::ListBuilder<builder::BinaryBuilder>,
385 pub address_table_lookups: AddressTableLookupsBuilder,
386 pub num_readonly_signed_accounts: builder::UInt32Builder,
387 pub num_readonly_unsigned_accounts: builder::UInt32Builder,
388 pub num_required_signatures: builder::UInt32Builder,
389 pub recent_blockhash: builder::BinaryBuilder,
390 pub signatures: builder::ListBuilder<builder::BinaryBuilder>,
391 pub err: builder::StringBuilder,
393 pub fee: builder::UInt64Builder,
394 pub compute_units_consumed: builder::UInt64Builder,
395 pub loaded_readonly_addresses: builder::ListBuilder<builder::BinaryBuilder>,
396 pub loaded_writable_addresses: builder::ListBuilder<builder::BinaryBuilder>,
397 pub fee_payer: builder::BinaryBuilder,
398 pub has_dropped_log_messages: builder::BooleanBuilder,
399}
400
401pub struct AddressTableLookupsBuilder(pub builder::ListBuilder<builder::StructBuilder>);
403
404impl Default for AddressTableLookupsBuilder {
405 fn default() -> Self {
406 Self(builder::ListBuilder::new(builder::StructBuilder::new(
407 match address_table_lookup_dt() {
408 DataType::Struct(fields) => fields,
409 _ => unreachable!(),
410 },
411 vec![
412 Box::new(builder::BinaryBuilder::default()),
413 Box::new(builder::ListBuilder::new(builder::UInt64Builder::default())),
414 Box::new(builder::ListBuilder::new(builder::UInt64Builder::default())),
415 ],
416 )))
417 }
418}
419
420impl TransactionsBuilder {
421 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
423 pub fn finish(mut self) -> RecordBatch {
424 RecordBatch::try_new(
425 Arc::new(transactions_schema()),
426 vec![
427 Arc::new(self.block_slot.finish()),
428 Arc::new(self.block_hash.finish()),
429 Arc::new(self.transaction_index.finish()),
430 Arc::new(self.signature.finish()),
431 Arc::new(self.version.finish()),
432 Arc::new(self.account_keys.finish()),
433 Arc::new(self.address_table_lookups.0.finish()),
434 Arc::new(self.num_readonly_signed_accounts.finish()),
435 Arc::new(self.num_readonly_unsigned_accounts.finish()),
436 Arc::new(self.num_required_signatures.finish()),
437 Arc::new(self.recent_blockhash.finish()),
438 Arc::new(self.signatures.finish()),
439 Arc::new(self.err.finish()),
440 Arc::new(self.fee.finish()),
441 Arc::new(self.compute_units_consumed.finish()),
442 Arc::new(self.loaded_readonly_addresses.finish()),
443 Arc::new(self.loaded_writable_addresses.finish()),
444 Arc::new(self.fee_payer.finish()),
445 Arc::new(self.has_dropped_log_messages.finish()),
446 ],
447 )
448 .unwrap()
449 }
450}
451
452#[derive(Default)]
454pub struct InstructionsBuilder {
455 pub block_slot: builder::UInt64Builder,
456 pub block_hash: builder::BinaryBuilder,
457 pub transaction_index: builder::UInt32Builder,
458 pub instruction_address: builder::ListBuilder<builder::UInt32Builder>,
459 pub program_id: builder::BinaryBuilder,
460 pub a0: builder::BinaryBuilder,
461 pub a1: builder::BinaryBuilder,
462 pub a2: builder::BinaryBuilder,
463 pub a3: builder::BinaryBuilder,
464 pub a4: builder::BinaryBuilder,
465 pub a5: builder::BinaryBuilder,
466 pub a6: builder::BinaryBuilder,
467 pub a7: builder::BinaryBuilder,
468 pub a8: builder::BinaryBuilder,
469 pub a9: builder::BinaryBuilder,
470 pub rest_of_accounts: builder::ListBuilder<builder::BinaryBuilder>,
472 pub data: builder::BinaryBuilder,
473 pub d1: builder::BinaryBuilder,
474 pub d2: builder::BinaryBuilder,
475 pub d4: builder::BinaryBuilder,
476 pub d8: builder::BinaryBuilder,
477 pub error: builder::StringBuilder,
478 pub compute_units_consumed: builder::UInt64Builder,
479 pub is_committed: builder::BooleanBuilder,
480 pub has_dropped_log_messages: builder::BooleanBuilder,
481}
482
483impl InstructionsBuilder {
484 #[expect(clippy::unwrap_used, reason = "schema is a compile-time constant")]
486 pub fn finish(mut self) -> RecordBatch {
487 RecordBatch::try_new(
488 Arc::new(instructions_schema()),
489 vec![
490 Arc::new(self.block_slot.finish()),
491 Arc::new(self.block_hash.finish()),
492 Arc::new(self.transaction_index.finish()),
493 Arc::new(self.instruction_address.finish()),
494 Arc::new(self.program_id.finish()),
495 Arc::new(self.a0.finish()),
496 Arc::new(self.a1.finish()),
497 Arc::new(self.a2.finish()),
498 Arc::new(self.a3.finish()),
499 Arc::new(self.a4.finish()),
500 Arc::new(self.a5.finish()),
501 Arc::new(self.a6.finish()),
502 Arc::new(self.a7.finish()),
503 Arc::new(self.a8.finish()),
504 Arc::new(self.a9.finish()),
505 Arc::new(self.rest_of_accounts.finish()),
506 Arc::new(self.data.finish()),
507 Arc::new(self.d1.finish()),
508 Arc::new(self.d2.finish()),
509 Arc::new(self.d4.finish()),
510 Arc::new(self.d8.finish()),
511 Arc::new(self.error.finish()),
512 Arc::new(self.compute_units_consumed.finish()),
513 Arc::new(self.is_committed.finish()),
514 Arc::new(self.has_dropped_log_messages.finish()),
515 ],
516 )
517 .unwrap()
518 }
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524
525 #[test]
526 fn smoke() {
527 BlocksBuilder::default().finish();
528 RewardsBuilder::default().finish();
529 TokenBalancesBuilder::default().finish();
530 BalancesBuilder::default().finish();
531 LogsBuilder::default().finish();
532 TransactionsBuilder::default().finish();
533 InstructionsBuilder::default().finish();
534 }
535}