1#![expect(
2 clippy::should_implement_trait,
3 reason = "LogKind::from_str is a constructor pattern, not Trait impl"
4)]
5#![expect(
6 clippy::field_reassign_with_default,
7 reason = "ProviderConfig is built by setting fields after construction"
8)]
9
10use std::{collections::BTreeMap, pin::Pin, sync::Arc};
25
26#[cfg(feature = "pyo3")]
27use anyhow::anyhow;
28use anyhow::{Context, Result};
29use arrow::record_batch::RecordBatch;
30use futures_lite::{Stream, StreamExt};
31use provider::common::{evm_query_to_generic, svm_query_to_generic};
32use serde::de::DeserializeOwned;
33
34pub mod evm;
35mod provider;
36mod rayon_async;
37pub mod svm;
38
39#[derive(Debug, Clone)]
41pub enum Query {
42 Evm(evm::Query),
43 Svm(svm::Query),
44}
45
/// Builds a [`Query`] from a Python object that exposes `kind` and `params` attributes.
///
/// `kind` must extract as the string `"evm"` or `"svm"`; `params` is then extracted as the
/// matching chain-specific query type. Any other `kind` value is reported as an error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        // Keep the attribute object alive in its own binding: the `&str` below borrows from it.
        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let kind: &str = kind_attr.extract().context("kind as str")?;

        // `params` is fetched before `kind` is inspected, so a missing `params` attribute is
        // reported even when `kind` is not recognised.
        let params = ob.getattr("params").context("get params attribute")?;

        let parsed = match kind {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            _ => return Err(anyhow!("unknown query kind: {kind}").into()),
        };

        Ok(parsed)
    }
}
63
64#[derive(Debug, Clone)]
66#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
67pub struct ProviderConfig {
68 pub kind: ProviderKind,
69 pub url: Option<String>,
70 pub bearer_token: Option<String>,
71 pub max_num_retries: Option<usize>,
72 pub retry_backoff_ms: Option<u64>,
73 pub retry_base_ms: Option<u64>,
74 pub retry_ceiling_ms: Option<u64>,
75 pub req_timeout_millis: Option<u64>,
76 pub stop_on_head: bool,
77 pub head_poll_interval_millis: Option<u64>,
78 pub buffer_size: Option<usize>,
79 pub compute_units_per_second: Option<u64>,
81 pub batch_size: Option<usize>,
82 pub reorg_safe_distance: Option<u64>,
83 pub trace_method: Option<RpcTraceMethod>,
84}
85
86impl ProviderConfig {
87 pub fn new(kind: ProviderKind) -> Self {
88 Self {
89 kind,
90 url: None,
91 bearer_token: None,
92 max_num_retries: None,
93 retry_backoff_ms: None,
94 retry_base_ms: None,
95 retry_ceiling_ms: None,
96 req_timeout_millis: None,
97 stop_on_head: false,
98 head_poll_interval_millis: None,
99 buffer_size: None,
100 compute_units_per_second: None,
101 batch_size: None,
102 reorg_safe_distance: None,
103 trace_method: None,
104 }
105 }
106}
107
/// Trace method selection stored in `ProviderConfig::trace_method`.
///
/// Presumably names the JSON-RPC method used to fetch traces; confirm against the RPC provider.
///
/// Derives `PartialEq`, `Eq` and `Hash` in addition to `Copy` so that values can be compared
/// directly and used as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcTraceMethod {
    /// Spelled `"trace_block"` when extracted from Python.
    TraceBlock,
    /// Spelled `"debug_trace_block_by_number"` when extracted from Python.
    DebugTraceBlockByNumber,
}
116
/// Extracts an [`RpcTraceMethod`] from a Python string.
///
/// Accepts `"trace_block"` and `"debug_trace_block_by_number"`; any other string is an error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for RpcTraceMethod {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let out: &str = ob.extract().context("read as string")?;

        let method = match out {
            "trace_block" => Self::TraceBlock,
            "debug_trace_block_by_number" => Self::DebugTraceBlockByNumber,
            _ => return Err(anyhow!("unknown trace method: {out}").into()),
        };

        Ok(method)
    }
}
131
/// Selects which provider backend [`start_stream`] starts.
///
/// Derives `PartialEq`, `Eq` and `Hash` in addition to `Copy` so that values can be compared
/// directly and used as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProviderKind {
    /// Served by the `provider::sqd` module; spelled `"sqd"` when extracted from Python.
    Sqd,
    /// Served by the `provider::hypersync` module; spelled `"hypersync"` when extracted from
    /// Python.
    Hypersync,
    /// Served by the `provider::rpc` module; spelled `"rpc"` when extracted from Python.
    Rpc,
}
142
/// Extracts a [`ProviderKind`] from a Python string.
///
/// Accepts `"sqd"`, `"hypersync"` and `"rpc"`; any other string is an error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let out: &str = ob.extract().context("read as string")?;

        let provider = match out {
            "sqd" => Self::Sqd,
            "hypersync" => Self::Hypersync,
            "rpc" => Self::Rpc,
            _ => return Err(anyhow!("unknown provider kind: {out}").into()),
        };

        Ok(provider)
    }
}
158
159type DataStream = Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
160
161fn make_req_fields<T: DeserializeOwned>(query: &tiders_query::Query) -> Result<T> {
162 let mut req_fields_query = query.clone();
163 req_fields_query
164 .add_request_and_include_fields()
165 .context("add req and include fields")?;
166
167 let fields = req_fields_query
168 .fields
169 .into_iter()
170 .map(|(k, v)| -> Result<_> {
171 Ok((
172 k.strip_suffix('s')
173 .context("field key should end with 's'")?
174 .to_owned(),
175 v.into_iter()
176 .map(|v| (v, true))
177 .collect::<BTreeMap<String, bool>>(),
178 ))
179 })
180 .collect::<Result<BTreeMap<String, _>>>()?;
181
182 let json_value = serde_json::to_value(&fields).context("serialize fields to JSON")?;
183 serde_json::from_value(json_value).context("deserialize fields from JSON")
184}
185
186pub async fn start_stream(provider_config: ProviderConfig, mut query: Query) -> Result<DataStream> {
188 let generic_query = match &mut query {
189 Query::Evm(evm_query) => {
190 let generic_query = evm_query_to_generic(evm_query).context("validate evm query")?;
191
192 evm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
193
194 generic_query
195 }
196 Query::Svm(svm_query) => {
197 let generic_query = svm_query_to_generic(svm_query);
198
199 svm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
200
201 generic_query
202 }
203 };
204 let generic_query = Arc::new(generic_query);
205
206 let stream = match provider_config.kind {
207 ProviderKind::Sqd => {
208 provider::sqd::start_stream(provider_config, query).context("start sqd stream")?
209 }
210 ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config, query)
211 .await
212 .context("start hypersync stream")?,
213 ProviderKind::Rpc => {
214 provider::rpc::start_stream(&provider_config, query).context("start rpc stream")?
215 }
216 };
217
218 let stream = stream.then(move |res| {
219 let generic_query = Arc::clone(&generic_query);
220 async {
221 rayon_async::spawn(move || {
222 res.and_then(move |data| {
223 let data = tiders_query::run_query(&data, &generic_query)
224 .context("run local query")?;
225 Ok(data)
226 })
227 })
228 .await
229 .context("rayon task was cancelled")
230 .and_then(|r| r)
231 }
232 });
233
234 Ok(Box::pin(stream))
235}
236
#[cfg(test)]
mod tests {

    use super::*;
    use crate::svm::*;
    use parquet::arrow::ArrowWriter;
    use std::fs::File;

    /// Streams one SVM block range from the SQD portal and writes each returned batch to
    /// `<name>.parquet` in the current working directory.
    ///
    /// Ignored by default; it talks to a remote endpoint.
    #[tokio::test]
    #[ignore]
    async fn simple_svm_start_stream() {
        let mut provider_config = ProviderConfig::new(ProviderKind::Sqd);
        provider_config.url = Some("https://portal.sqd.dev/datasets/solana-mainnet".to_string());

        let program_id_bytes: [u8; 32] =
            bs58::decode("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
                .into_vec()
                .unwrap()
                .try_into()
                .unwrap();
        let program_id = Address(program_id_bytes);

        // All instruction fields are requested; every other table keeps its default selection.
        let fields = Fields {
            instruction: InstructionFields::all(),
            transaction: TransactionFields::default(),
            log: LogFields::default(),
            balance: BalanceFields::default(),
            token_balance: TokenBalanceFields::default(),
            reward: RewardFields::default(),
            block: BlockFields::default(),
        };

        let instruction_request = InstructionRequest {
            program_id: vec![program_id],
            discriminator: vec![Data(vec![12, 96, 49, 128, 22])],
            ..Default::default()
        };

        let query = crate::Query::Svm(svm::Query {
            from_block: 329_443_000,
            to_block: Some(329_443_000),
            include_all_blocks: false,
            fields,
            instructions: vec![instruction_request],
            transactions: vec![],
            logs: vec![],
            balances: vec![],
            token_balances: vec![],
            rewards: vec![],
        });

        let mut stream = start_stream(provider_config, query).await.unwrap();
        let data = stream.next().await.unwrap().unwrap();

        for (name, batch) in data {
            let mut file = File::create(format!("{name}.parquet")).unwrap();
            let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
        }
    }

    /// Starts an EVM stream through the RPC provider for `from_block: 0`, `to_block: Some(0)` and
    /// checks that the first item contains a `"blocks"` entry and that every batch is empty.
    ///
    /// NOTE(review): this test is not `#[ignore]`d although it points at `localhost:8545`;
    /// presumably this range needs no live node — confirm.
    #[tokio::test]
    async fn simple_rpc_start_stream() {
        let mut provider_config = ProviderConfig::new(ProviderKind::Rpc);
        provider_config.url = Some("http://localhost:8545".to_string());

        let evm_query = evm::Query {
            from_block: 0,
            to_block: Some(0),
            include_all_blocks: true,
            logs: vec![],
            transactions: vec![],
            traces: vec![],
            fields: evm::Fields::all(),
        };
        let query = crate::Query::Evm(evm_query);

        let mut stream = start_stream(provider_config, query).await.unwrap();
        let data = stream.next().await.unwrap().unwrap();

        assert!(data.contains_key("blocks"));

        for batch in data.values() {
            assert_eq!(batch.num_rows(), 0);
        }
    }
}