Skip to content

Commit

Permalink
Merge pull request #99 from black-puppydog/main
Browse files Browse the repository at this point in the history
fix: Make peer and blob range queries robust to existence of later key prefixes
  • Loading branch information
mycognosist authored Jul 23, 2024
2 parents 5be285d + a8f1f78 commit 9457d51
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 5 deletions.
2 changes: 1 addition & 1 deletion solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ impl EbtManager {
//
// This indicates that the local peer is acting as the session
// requester.
if self.sent_clocks.get(&connection_id).is_none() {
if !self.sent_clocks.contains_key(&connection_id) {
let local_clock = self.local_clock.to_owned();
ch_broker
.send(BrokerEvent::new(
Expand Down
119 changes: 115 additions & 4 deletions solar/src/storage/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ impl KvStorage {
let mut list = Vec::new();

let db = self.db.as_ref().ok_or(Error::OptionIsNone)?;
let scan_key: &[u8] = &[PREFIX_BLOB];
for item in db.range(scan_key..) {
let scan_key_start: &[u8] = &[PREFIX_BLOB];
let scan_key_end: &[u8] = &[PREFIX_BLOB + 1];
for item in db.range(scan_key_start..scan_key_end) {
let (k, v) = item?;
let blob: BlobStatus = serde_cbor::from_slice(&v)?;
if !blob.retrieved {
Expand Down Expand Up @@ -226,8 +227,9 @@ impl KvStorage {
let mut peers = Vec::new();

// Use the generic peer prefix to return an iterator over all peers.
let scan_peer_key: &[u8] = &[PREFIX_PEER];
for peer in db.range(scan_peer_key..) {
let scan_peer_key_start: &[u8] = &[PREFIX_PEER];
let scan_peer_key_end: &[u8] = &[PREFIX_PEER + 1];
for peer in db.range(scan_peer_key_start..scan_peer_key_end) {
let (peer_key, _) = peer?;
// Drop the prefix byte and convert the remaining bytes to
// a string.
Expand Down Expand Up @@ -432,6 +434,68 @@ mod test {
Ok(())
}

#[async_std::test]
async fn test_peer_range_query() -> Result<()> {
let (keypair, kv) = initialise_keypair_and_kv()?;
// Get a list of all replicated peers and their latest sequence
// numbers. This list is expected to be empty because we never
// added any data to the database.
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 0);

// Create a post-type message.
let msg_content = TypedMessage::Post {
text: "A solar flare is an intense localized eruption of electromagnetic radiation."
.to_string(),
mentions: None,
};

// Lookup the value of the previous message. This will be `None`.
let last_msg = kv.get_latest_msg_val(&keypair.id)?;

// Sign the message content using the temporary keypair and value of
// the previous message.
let msg = MessageValue::sign(last_msg.as_ref(), &keypair, json!(msg_content))?;

// Append the signed message to the feed. Returns the sequence number
// of the appended message.
let _ = kv.append_feed(msg).await?;

// now that we have added a message, we should have one peer,
// which is the keypair we used to sign the message.
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);
assert_eq!(&peers.get(0).unwrap().0, &keypair.id);

let db = kv.db.as_ref().ok_or(Error::OptionIsNone)?;

// insert one key with PREFIX_PEER+1 as the first byte.
db.insert(
&vec![PREFIX_PEER + 1u8],
"this should not show up in the peers list because it's after the peers range"
.as_bytes()
.to_vec(),
)?;

// this should not have changed the peers list
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);

// do the same for PREFIX_PEER-1
db.insert(
&vec![PREFIX_PEER - 1u8],
"this should not show up in the peers list because it's before the peers range"
.as_bytes()
.to_vec(),
)?;

// this should not have changed the peers list
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);

Ok(())
}

// In reality this test covers more than just the append method.
// It tests multiple methods exposed by the kv database.
// The main reason for combining the tests is the cost of setting up
Expand Down Expand Up @@ -517,6 +581,53 @@ mod test {
Ok(())
}

#[async_std::test]
async fn test_blobs_range_query_when_peers_exist() -> Result<()> {
let (keypair, kv) = initialise_keypair_and_kv()?;
kv.set_blob(
"b1",
&BlobStatus {
retrieved: false,
users: ["u2".to_string()].to_vec(),
},
)?;

assert_eq!(kv.get_peers().await?.len(), 0);

assert_eq!(kv.get_pending_blobs().unwrap(), vec!["b1".to_string()]);

println!("Inserting a new message and thus peer");
let msg_content = TypedMessage::Post {
text: "A solar flare is an intense localized eruption of electromagnetic radiation."
.to_string(),
mentions: None,
};
// Passing None as the last message since we start from an empty feed
let msg = MessageValue::sign(None, &keypair, json!(msg_content))?;
let _ = kv.append_feed(msg).await?;

// now that we have added a message, we should have one peer,
// which is the keypair we used to sign the message.
let peers = kv.get_peers().await?;
assert_eq!(peers.len(), 1);

println!("Inserting a second blob");
kv.set_blob(
"b2",
&BlobStatus {
retrieved: false,
users: ["u7".to_string()].to_vec(),
},
)?;

assert_eq!(
kv.get_pending_blobs()?,
vec!["b1".to_string(), "b2".to_string()]
);

Ok(())
}

#[test]
fn test_blobs() -> Result<()> {
let kv = open_temporary_kv()?;
Expand Down

0 comments on commit 9457d51

Please sign in to comment.