This commit is contained in:
2025-11-06 12:46:09 +01:00
commit c8058b1e9b
8 changed files with 5601 additions and 0 deletions

6
.env Normal file
View File

@@ -0,0 +1,6 @@
YELLOWSTONE_ENDPOINT=https://solana-yellowstone-grpc.publicnode.com:443
YELLOWSTONE_COMMITMENT=confirmed
BUNDLER=5jYaYv7HoiFVrY9bAcruj6dH8fCBseky4sBmnTFGSaeW
RUST_LOG=info

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

5296
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

21
Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[package]
name = "sniper-bot"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1"
futures = "0.3"
tokio = { version = "1.39", features = ["rt-multi-thread", "macros", "signal", "fs"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
openssl = { version = "0.10", features = ["vendored"] }
yellowstone-grpc-client = "4"
yellowstone-grpc-proto = "4"
tonic = "0.11"
bs58 = "0.5"
dotenvy = "0.15"

2
src/lib.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod listener;
pub mod utils;

140
src/listener.rs Normal file
View File

@@ -0,0 +1,140 @@
use anyhow::{Context, Result};
use futures::{Stream,StreamExt};
use tracing::{info,warn};
use tokio::sync::mpsc::Sender;
use std::{collections::HashMap, pin::Pin,};
use yellowstone_grpc_client::{ClientTlsConfig,GeyserGrpcClient,};
use yellowstone_grpc_proto::geyser::{
subscribe_update::UpdateOneof,
CommitmentLevel,
SubscribeRequest,
SubscribeRequestFilterTransactions,
SubscribeUpdate,
SubscribeUpdateTransaction
};
pub const METEORA_DBC_PROGRAM: &str = "dbcij3LWUppWqq96dh6gJWwBifmcGfLSB5D4DuSMaqN";
pub const METEORA_DLMM_PROGRAM: &str = "LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo";
pub const METEORA_POOLS_PROGRAM: &str = "Eo7WjKq67rjJQSZxS6z3YkapzY3eMj6Xy8X5EQVn5UaB";
pub const METEORA_VAULT_PROGRAM: &str = "24Uqj9JCLxUeoC3hGfh5W3s9FM9uCHDS2SG3LYwBpyTi";
pub struct YellowstoneSource {
stream: Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, yellowstone_grpc_proto::tonic::Status>> + Send>>,
}
impl YellowstoneSource {
pub async fn connect(
bundler: String,
) -> Result<Self>
{
let endpoint = std::env::var("YELLOWSTONE_ENDPOINT")?;
let x_token = std::env::var("YELLOWSTONE_X_TOKEN").ok();
let commitment = std::env::var("YELLOWSTONE_COMMITMENT").ok();
// Initial log
info!("[CONNECT] commitment={:?}", commitment);
info!("[CONNECT] bundler(account_include)={:?}", bundler);
// Configure gRPC client
let tls = ClientTlsConfig::new().with_native_roots();
let mut builder = GeyserGrpcClient::build_from_shared(endpoint.clone())?
.tls_config(tls)?;
if let Some(tok) = x_token {
builder = builder.x_token(Some(tok))?;
};
let mut client = builder.connect().await?;
// Create filters for subscriptions
let mut tx_filter = SubscribeRequestFilterTransactions {
vote: Some(false), // Exclude vote transactions
failed: Some(false), // Exclude failed transactions
..Default::default() // Use default for other fields
};
tx_filter.account_required.push(bundler.clone());
for pid in [
METEORA_DBC_PROGRAM,
METEORA_DLMM_PROGRAM,
METEORA_POOLS_PROGRAM,
METEORA_VAULT_PROGRAM,
] {
tx_filter.account_include.push(pid.to_string());
}
// Determine commitment level for subscriptions
let commitment = match commitment.as_deref() {
Some("confirmed") => CommitmentLevel::Confirmed,
Some("finalized") => CommitmentLevel::Finalized,
_ => CommitmentLevel::Processed,
};
let mut tx_map: HashMap<String, SubscribeRequestFilterTransactions> = HashMap::new();
tx_map.insert("tx".to_string(), tx_filter);
// Create subscription request
let req = SubscribeRequest {
transactions: tx_map,
commitment: Some(commitment as i32),
..Default::default()
};
// Subscribe to Yellowstone stream
let (_sink, stream) = client
.subscribe_with_request(Some(req))
.await
.context("[ERR] error trying to subscribing to Yellowstone")?;
// Log successful connection
info!("[INFO] connected to Yellowstone at {}", endpoint);
// Return YellowstoneSource instance with stream
Ok(Self {
stream: Box::pin(stream)
})
}
// Wait until the next transaction update is available
async fn next(&mut self) -> Result<Option<SubscribeUpdateTransaction>> {
// Determine the next update from the stream
match self.stream.next().await {
None => Ok(None),
Some(Err(e)) => Err(e.into()),
Some(Ok(update)) => {
if let Some(UpdateOneof::Transaction(txu)) = update.update_oneof {
tracing::info!("[INFO] received transaction from Yellowstone: \n{:#?}", txu);
Ok(Some(txu))
} else {
Ok(None)
}
}
}
}
}
// Forward transaction updates from Yellowstone source to channel
pub async fn yellowstone_forward_source_to_channel(
mut source: YellowstoneSource,
tx_out: Sender<SubscribeUpdateTransaction>,
) -> Result<()> {
// Continuously forward transaction updates from source to channel
while let Some(txu) = source.next().await? {
if tx_out.send(txu).await.is_err() {
warn!("[WARN] error getting next transaction update from Yellowstone source");
break;
}
}
Ok(())
}

81
src/main.rs Normal file
View File

@@ -0,0 +1,81 @@
use anyhow::Result;
use tokio::sync::mpsc;
use yellowstone_grpc_proto::geyser::SubscribeUpdateTransaction;
use bs58;
use dotenvy::dotenv;
use tracing_subscriber::EnvFilter;
use sniper_bot::listener;
use sniper_bot::listener::YellowstoneSource;
use sniper_bot::utils::save_tx_update;
#[tokio::main]
async fn main() -> Result<()> {
init_tracing();
// Load environment variables from .env file
dotenv().ok();
let bundler = std::env::var("BUNDLER")?;
tracing::info!("[INFO] Bot initialited");
// Create a channel for transaction updates
let (tx, mut rx) = mpsc::channel::<SubscribeUpdateTransaction>(1024);
// Connect to Yellowstone source
let yellowstone_source = YellowstoneSource::connect(
bundler
).await?;
// Launch listener
let _run = tokio::spawn({
let tx = tx.clone();
async move {
if let Err(e) = listener::yellowstone_forward_source_to_channel(yellowstone_source, tx).await {
tracing::error!("[ERR] listener error: {}", e);
}
}
});
while let Some(txu) = rx.recv().await {
// Guarda en disco
if let Err(e) = save_tx_update(&txu).await {
tracing::warn!("No se pudo guardar el frame: {e:?}");
}
tracing::info!("[INFO] received transaction update: \n{:#?}", txu);
let sig = txu
.transaction
.as_ref()
.map(|t| bs58::encode(&t.signature).into_string())
.unwrap_or("<no-signature>".to_string());
tracing::info!("OK: slot={} sig={}", txu.slot, sig);
}
Ok(())
}
fn init_tracing() {
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
// No peta si ya estaba inicializado (devuelve Err y lo ignoramos)
let _ = tracing_subscriber::fmt()
.with_env_filter(filter)
.with_target(false)
.try_init();
}

54
src/utils.rs Normal file
View File

@@ -0,0 +1,54 @@
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::Result;
use bs58;
use tokio::fs;
use yellowstone_grpc_proto::geyser::SubscribeUpdateTransaction;
use yellowstone_grpc_proto::prost::Message;
// Save file from tx received
pub async fn save_tx_update(txu: &SubscribeUpdateTransaction) -> Result<(PathBuf, PathBuf)> {
// 1) Carpetas
let base = PathBuf::from("./logs/frames");
let bin_dir = base.join("bin");
let txt_dir = base.join("txt");
fs::create_dir_all(&bin_dir).await?;
fs::create_dir_all(&txt_dir).await?;
// 2) Timestamp (ms) + slot + firma corta (si existe)
let ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let slot = txu.slot;
// Firma corta integrada aquí (si existe)
let sig_short = if let Some(ref tx) = txu.transaction {
if !tx.signature.is_empty() {
// firma viene como bytes -> base58 -> recorta
bs58::encode(&tx.signature).into_string().chars().take(12).collect::<String>()
} else {
"nosig".to_string()
}
} else {
"nosig".to_string()
};
// 3) Nombres de archivo
let bin_name = format!("{}_slot{}_{}_tx.bin", ts_ms, slot, sig_short);
let txt_name = format!("{}_slot{}_{}_tx.txt", ts_ms, slot, sig_short);
let bin_path = bin_dir.join(bin_name);
let txt_path = txt_dir.join(txt_name);
// 4) BIN: protobuf crudo
let bytes = txu.encode_to_vec();
fs::write(&bin_path, &bytes).await?;
// 5) TXT: volcado legible
let pretty = format!("{:#?}", txu);
fs::write(&txt_path, pretty).await?;
Ok((bin_path, txt_path))
}