Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/sof-observer/fuzz/fuzz_targets/shred_fec_recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ fuzz_target!(|bytes: &[u8]| {
assert!(recoverer.tracked_sets() <= max_tracked_sets);

for recovered_packet in recovered {
assert_eq!(recovered_packet.len(), SIZE_OF_DATA_SHRED_PAYLOAD);
let parsed = parse_shred(&recovered_packet);
assert_eq!(recovered_packet.bytes.len(), SIZE_OF_DATA_SHRED_PAYLOAD);
let parsed = parse_shred(&recovered_packet.bytes);
assert!(matches!(parsed, Ok(ParsedShred::Data(_))));
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sof-observer/src/app/runtime/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(super) use crate::{
repair::MissingShredTracker,
shred::{
fec::FecRecoverer,
wire::{ParseError, ParsedShred, ParsedShredHeader, parse_shred, parse_shred_header},
wire::{ParseError, ParsedShredHeader, parse_shred_header},
},
verify::{ShredVerifier, VerifyStatus},
};
Expand Down
80 changes: 41 additions & 39 deletions crates/sof-observer/src/app/runtime/runloop/packet_workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,14 +515,10 @@ where
push_primary_shred(packet, &mut accepted_shreds);

for recovered in recovered_packets {
let parsed_recovered = match parse_shred(&recovered) {
Ok(parsed) => parsed,
Err(_) => continue,
};
if verify_recovered_shreds {
let recovered_accepted = match verify_packet_with_counters(
shred_verifier.as_deref_mut(),
&recovered,
&recovered.bytes,
observed_at,
verify_strict_unknown,
&mut verify_counters,
Expand All @@ -534,40 +530,46 @@ where
continue;
}
}
if let ParsedShred::Data(data) = parsed_recovered {
let Some(signature) = packet_signature_bytes(&recovered) else {
continue;
};
let Some(variant) = packet_variant_byte(&recovered) else {
continue;
};
#[cfg(feature = "gossip-bootstrap")]
maybe_record_observed_leader(
shred_verifier.as_deref(),
data.common.slot,
&mut observed_slot_leaders,
);
accepted_shreds.push(WorkerAcceptedShred {
source: None,
observed_at,
slot: data.common.slot,
index: data.common.index,
fec_set_index: data.common.fec_set_index,
version: data.common.version,
variant,
signature,
kind: WorkerAcceptedShredKind::RecoveredData {
parent_slot: derive_parent_slot(
data.common.slot,
data.data_header.parent_offset,
),
data_complete: data.data_header.data_complete(),
last_in_slot: data.data_header.last_in_slot(),
reference_tick: data.data_header.reference_tick(),
},
payload_fragment: Some(SharedPayloadFragment::owned(data.payload)),
});
}
let Some(signature) = packet_signature_bytes(&recovered.bytes) else {
continue;
};
let Some(variant) = packet_variant_byte(&recovered.bytes) else {
continue;
};
#[cfg(feature = "gossip-bootstrap")]
maybe_record_observed_leader(
shred_verifier.as_deref(),
recovered.parsed.common.slot,
&mut observed_slot_leaders,
);
let packet_bytes: Arc<[u8]> = Arc::from(recovered.bytes);
let Some(payload_fragment) = SharedPayloadFragment::borrowed(
Arc::clone(&packet_bytes),
recovered.parsed.payload_offset,
recovered.parsed.payload_len,
) else {
continue;
};
accepted_shreds.push(WorkerAcceptedShred {
source: None,
observed_at,
slot: recovered.parsed.common.slot,
index: recovered.parsed.common.index,
fec_set_index: recovered.parsed.common.fec_set_index,
version: recovered.parsed.common.version,
variant,
signature,
kind: WorkerAcceptedShredKind::RecoveredData {
parent_slot: derive_parent_slot(
recovered.parsed.common.slot,
recovered.parsed.data_header.parent_offset,
),
data_complete: recovered.parsed.data_header.data_complete(),
last_in_slot: recovered.parsed.data_header.last_in_slot(),
reference_tick: recovered.parsed.data_header.reference_tick(),
},
payload_fragment: Some(payload_fragment),
});
}
}

Expand Down
8 changes: 6 additions & 2 deletions crates/sof-observer/src/shred/fec/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::shred::wire::{ParsedShredHeader, ShredVariant};
#[path = "recover.rs"]
mod recover;

use recover::{parse_packet_signature, recover_missing_data};
use recover::{RecoveredDataPacket, parse_packet_signature, recover_missing_data};

const SIZE_OF_SIGNATURE: usize = 64;
const SIZE_OF_MERKLE_ROOT: usize = 32;
Expand Down Expand Up @@ -69,7 +69,11 @@ impl FecRecoverer {
}
}

pub fn ingest_packet(&mut self, packet: &[u8], parsed: &ParsedShredHeader) -> Vec<Vec<u8>> {
pub fn ingest_packet(
&mut self,
packet: &[u8],
parsed: &ParsedShredHeader,
) -> Vec<RecoveredDataPacket> {
let signature = match parse_packet_signature(packet) {
Some(signature) => signature,
None => return Vec::new(),
Expand Down
23 changes: 17 additions & 6 deletions crates/sof-observer/src/shred/fec/recover.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
use super::*;
use crate::shred::wire::{ParsedShred, SIZE_OF_DATA_SHRED_PAYLOAD, parse_shred};
use crate::shred::wire::{
ParsedDataShredHeader, ParsedShredHeader, SIZE_OF_DATA_SHRED_PAYLOAD, parse_shred_header,
};

#[derive(Debug)]
pub struct RecoveredDataPacket {
pub bytes: Vec<u8>,
pub parsed: ParsedDataShredHeader,
}

pub(super) fn recover_missing_data(
set: &mut ErasureSet,
fec_set_index: u32,
reed_solomon_cache: &mut HashMap<(usize, usize), ReedSolomon>,
) -> Option<Vec<Vec<u8>>> {
) -> Option<Vec<RecoveredDataPacket>> {
let config = set.config?;
let variant = set.variant?;
if config.num_data == 0 || config.num_coding == 0 {
Expand Down Expand Up @@ -106,7 +114,7 @@ fn build_recovered_data_shred(
erasure_shard: &[u8],
leader_signature: &[u8; SIZE_OF_SIGNATURE],
payload_len: usize,
) -> Option<Vec<u8>> {
) -> Option<RecoveredDataPacket> {
let mut recovered = vec![0_u8; payload_len];
recovered
.get_mut(..SIZE_OF_SIGNATURE)?
Expand All @@ -116,9 +124,12 @@ fn build_recovered_data_shred(
recovered
.get_mut(start..end)?
.copy_from_slice(erasure_shard);
match parse_shred(&recovered) {
Ok(ParsedShred::Data(_)) => Some(recovered),
Ok(ParsedShred::Code(_)) | Err(_) => None,
match parse_shred_header(&recovered) {
Ok(ParsedShredHeader::Data(parsed)) => Some(RecoveredDataPacket {
bytes: recovered,
parsed,
}),
Ok(ParsedShredHeader::Code(_)) | Err(_) => None,
}
}

Expand Down
Loading