跳轉至

Rust

快速開始

請參考README.md

代碼示例

use {
    anyhow::Context,
    futures::{stream::StreamExt},
    solana_signature::Signature,
    std::{
        collections::{HashMap},
        error::Error,
    },
    tonic::transport::ClientTlsConfig,
    tonic::transport::Channel,
    tonic::IntoStreamingRequest,
    geyser_stream::geyser_client::GeyserClient,
    geyser_stream::subscribe_update::UpdateOneof,
    geyser_stream::SubscribeRequestFilterBlocks,
    geyser_stream::SubscribeRequestFilterTransactions,
    geyser_stream::SubscribeRequest,
    geyser_stream::SubscribeRequestFilterAccounts,
};

mod solana_storage_flat {
    include!(concat!(env!("OUT_DIR"), "/solana.storage.confirmed_block.rs"));
}

pub mod solana {
    pub mod storage {
        pub mod confirmed_block {
            pub use super::super::super::solana_storage_flat::*;
        }
    }
}

pub mod geyser_stream {
    pub use crate::solana;
    include!(concat!(env!("OUT_DIR"), "/geyser.rs"));
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = "https://geyserstream-tokyo.blockrazor.xyz:443";
    let token = "";
    let subscribe_accounts = true;
    let subscribe_transactions = false;
    let subscribe_blocks = false;
    let subscribe_test_account = "HV1KXxWFaSeriyFvXyx48FqG9BoFbfinB8njCJonqP7K".to_string();

    let channel = Channel::from_shared(endpoint.to_string())
        .map_err(|e| Box::<dyn Error>::from(format!("Invalid URI: {}", e)))?
        .tls_config(ClientTlsConfig::new().with_native_roots())
        .map_err(|e| Box::<dyn Error>::from(format!("TLS config error: {}", e)))?
        .connect()
        .await
        .map_err(|e| Box::<dyn Error>::from(format!("Connection error: {}", e)))?;

    let mut client = GeyserClient::new(channel.clone());

    // subscribe account filters
    let mut accounts_filter = HashMap::new();
    if subscribe_accounts {
        accounts_filter.insert(
            "client".to_owned(),
            SubscribeRequestFilterAccounts {
                account: vec![subscribe_test_account],
                owner: vec![],
                filters: vec![],
                nonempty_txn_signature: Some(true),
            },
        );
    }

    // subscribe transaction filters
    let mut transactions_filter = HashMap::new();
    if subscribe_transactions {
        transactions_filter.insert(
            "client".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                signature: None,
                account_include: vec![],
                account_exclude: vec![],
                account_required: vec![],
            },
        );
    }

    // subscribe block filters
    let mut blocks_filter = HashMap::new();
    if subscribe_blocks {
        blocks_filter.insert(
            "client".to_owned(),
            SubscribeRequestFilterBlocks {
                account_include: vec![],
                include_transactions: Some(false),
                include_accounts: Some(false),
                include_entries: Some(false),
            }
        );
    }

    // subscribe request
    let request = SubscribeRequest {
        accounts: accounts_filter,
        transactions: transactions_filter,
        blocks: blocks_filter,
        transactions_status: HashMap::new(),
        entry: HashMap::new(),
        blocks_meta: HashMap::new(),
        slots: HashMap::new(),
        commitment: None,
        accounts_data_slice: vec![],
        ping: None,
        from_slot: None,
    };
    let streaming_req = to_streaming_request(request, token.to_string());

    // subscribe
    let response = client.subscribe(streaming_req).await.unwrap();
    let mut resp_stream = response.into_inner();
    while let Some(message) = resp_stream.next().await {
        match message {
            Ok(msg) => {
                match msg.update_oneof {
                    Some(UpdateOneof::Account(msg)) => {
                        println!("receive account: {:?}", msg.account);
                    }
                    Some(UpdateOneof::Transaction(msg)) => {
                        let tx = msg
                            .transaction
                            .ok_or(anyhow::anyhow!("no transaction in the message"))?;
                        println!("receive transaction: {:?}", Signature::try_from(tx.signature.as_slice()).context("invalid signature")?.to_string());
                    }
                    Some(UpdateOneof::Block(msg)) => {
                        println!("receive block: {:?}", msg.slot);
                    }

                    // other implementations can be added here

                    _ => {
                        println!("receive unknown message");
                        continue
                    }
                }
            }
            Err(error) => {
                println!("stream error: {error:?}");
                break;
            }
        }
    }

    Ok(())
}

fn to_streaming_request(req: SubscribeRequest, token: String) -> impl IntoStreamingRequest<Message = SubscribeRequest> {
    let stream = tokio_stream::iter(std::iter::once(req));
    let mut request = tonic::Request::new(stream);

    request.metadata_mut().insert(
        "x-token",
        token.parse().unwrap(),
    );

    request
}