Skip to content

Commit

Permalink
Batch deal activations (#1310)
Browse files Browse the repository at this point in the history
* wip: batch onboarding deals test works

* fix activate deals failures tests

* verified deal activation test

* fix market tests

* refactor to a map based pattern to ensure parallel return structure

* fix deal failure test expectations

* adjust market tests for new failure expectations

* cron epoch test

* fix the tests

* remain ActivateDealsResult to DealActivation

* commit deal states into state once for all sectors

* use sectordeals for batchactivate

* cleanup logic for marketactor::BatchActivateDealsResult shortcut

* refactor Market::BatchActivateDeals to use BatchReturn

* revert verifreg to use BatchReturn

* better error context when deal activation fails

* remove shortcut path, market actor already handles empty sectors

* don't activate sectors with duplicate deals

* use a batch activation helper

* de duplicate harness deal activation paths

* drop Copy requirement on BatchGen::success

* simple tests for batch_activate_deals

* fix tests
  • Loading branch information
alexytsu authored Jun 26, 2023
1 parent b712d1e commit f414380
Show file tree
Hide file tree
Showing 19 changed files with 795 additions and 366 deletions.
183 changes: 117 additions & 66 deletions actors/market/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};

use cid::multihash::{Code, MultihashDigest, MultihashGeneric};
use cid::Cid;
use fil_actors_runtime::{extract_send_result, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
use fil_actors_runtime::{extract_send_result, BatchReturnGen, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
use frc46_token::token::types::{BalanceReturn, TransferFromParams, TransferFromReturn};
use fvm_ipld_bitfield::BitField;
use fvm_ipld_blockstore::Blockstore;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub enum Method {
WithdrawBalance = 3,
PublishStorageDeals = 4,
VerifyDealsForActivation = 5,
ActivateDeals = 6,
BatchActivateDeals = 6,
OnMinerSectorsTerminate = 7,
ComputeDataCommitment = 8,
CronTick = 9,
Expand Down Expand Up @@ -530,93 +530,145 @@ impl Actor {
Ok(VerifyDealsForActivationReturn { sectors: sectors_data })
}

/// Activate a set of deals, returning the combined deal space and extra info for verified deals.
fn activate_deals(
/// Activate a set of deals, returning the deal space and extra info for sectors containing
/// verified deals. Sectors are activated in parameter-defined order and can fail independently of
/// each other with the responsible ExitCode recorded in a BatchReturn.
fn batch_activate_deals(
rt: &impl Runtime,
params: ActivateDealsParams,
) -> Result<ActivateDealsResult, ActorError> {
params: BatchActivateDealsParams,
) -> Result<BatchActivateDealsResult, ActorError> {
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
let miner_addr = rt.message().caller();
let curr_epoch = rt.curr_epoch();

let (deal_spaces, verified_infos) = rt.transaction(|st: &mut State, rt| {
let proposal_array = st.get_proposal_array(rt.store())?;
let proposals = get_proposals(&proposal_array, &params.deal_ids, st.next_id)?;
let (activations, batch_ret) = rt.transaction(|st: &mut State, rt| {
let mut deal_states: Vec<(DealID, DealState)> = vec![];
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
let mut activations: Vec<DealActivation> = vec![];
let mut activated_deals: HashSet<DealID> = HashSet::new();

for p in params.sectors {
let proposal_array = st.get_proposal_array(rt.store())?;

if p.deal_ids.iter().any(|id| activated_deals.contains(id)) {
log::warn!(
"failed to activate sector containing duplicate deals {:?}",
p.deal_ids
);
batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
continue;
}

let deal_spaces = {
validate_and_return_deal_space(
let proposals = match get_proposals(&proposal_array, &p.deal_ids, st.next_id) {
Ok(proposals) => proposals,
Err(e) => {
log::warn!("failed to get proposals for deals {:?}: {:?}", p.deal_ids, e);
batch_gen.add_fail(e.exit_code());
continue;
}
};

let deal_spaces = match validate_and_return_deal_space(
&proposals,
&miner_addr,
params.sector_expiry,
p.sector_expiry,
curr_epoch,
None,
)
.context("failed to validate deal proposals for activation")?
};
) {
Ok(ds) => ds,
Err(e) => {
log::warn!("failed validate deals {:?}: {}", p.deal_ids, e);
batch_gen.add_fail(e.exit_code());
continue;
}
};

// Update deal states
let mut verified_infos = Vec::new();
let mut deal_states: Vec<(DealID, DealState)> = vec![];
// Update deal states
let mut verified_infos = Vec::new();

for (deal_id, proposal) in proposals {
// This construction could be replaced with a single "update deal state"
// state method, possibly batched over all deal ids at once.
let s = st.find_deal_state(rt.store(), deal_id)?;
let update_result: Result<(), ActorError> =
proposals.into_iter().try_for_each(|(deal_id, proposal)| {
let s = st
.find_deal_state(rt.store(), deal_id)
.context(format!("error looking up deal state for {}", deal_id))?;

if s.is_some() {
return Err(actor_error!(
illegal_argument,
"deal {} already activated",
deal_id
));
}
if s.is_some() {
return Err(actor_error!(
illegal_argument,
"deal {} already activated",
deal_id
));
}

let propc = rt_deal_cid(rt, &proposal)?;
let propc = rt_deal_cid(rt, &proposal)?;

// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = st.has_pending_deal(rt.store(), propc)?;
// Confirm the deal is in the pending proposals queue.
// It will be removed from this queue later, during cron.
let has = st.has_pending_deal(rt.store(), propc)?;

if !has {
return Err(actor_error!(
illegal_state,
"tried to activate deal that was not in the pending set ({})",
propc
));
}
if !has {
return Err(actor_error!(
illegal_state,
"tried to activate deal that was not in the pending set ({})",
propc
));
}

// Extract and remove any verified allocation ID for the pending deal.
let allocation = st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))?
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
.1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}
// Extract and remove any verified allocation ID for the pending deal.
let allocation = st
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))
.context(format!(
"failed to remove pending deal allocation id {}",
deal_id
))?
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
.1;

if allocation != NO_ALLOCATION_ID {
verified_infos.push(VerifiedDealInfo {
client: proposal.client.id().unwrap(),
allocation_id: allocation,
data: proposal.piece_cid,
size: proposal.piece_size,
})
}

deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
deal_states.push((
deal_id,
DealState {
sector_start_epoch: curr_epoch,
last_updated_epoch: EPOCH_UNDEFINED,
slash_epoch: EPOCH_UNDEFINED,
verified_claim: allocation,
},
));
activated_deals.insert(deal_id);
Ok(())
});

match update_result {
Ok(_) => {
activations.push(DealActivation {
nonverified_deal_space: deal_spaces.deal_space,
verified_infos,
});
batch_gen.add_success();
}
Err(e) => {
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
batch_gen.add_fail(e.exit_code());
}
}
}

st.put_deal_states(rt.store(), &deal_states)?;

Ok((deal_spaces, verified_infos))
Ok((activations, batch_gen.gen()))
})?;

Ok(ActivateDealsResult { nonverified_deal_space: deal_spaces.deal_space, verified_infos })
Ok(BatchActivateDealsResult { activations, activation_results: batch_ret })
}

/// Terminate a set of deals in response to their containing sector being terminated.
Expand All @@ -634,7 +686,6 @@ impl Actor {

for id in params.deal_ids {
let deal = st.find_proposal(rt.store(), id)?;

// The deal may have expired and been deleted before the sector is terminated.
// Nothing to do, but continue execution for the other deals.
if deal.is_none() {
Expand Down Expand Up @@ -1403,7 +1454,7 @@ impl ActorCode for Actor {
WithdrawBalance|WithdrawBalanceExported => withdraw_balance,
PublishStorageDeals|PublishStorageDealsExported => publish_storage_deals,
VerifyDealsForActivation => verify_deals_for_activation,
ActivateDeals => activate_deals,
BatchActivateDeals => batch_activate_deals,
OnMinerSectorsTerminate => on_miner_sectors_terminate,
ComputeDataCommitment => compute_data_commitment,
CronTick => cron_tick,
Expand Down
15 changes: 11 additions & 4 deletions actors/market/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::ext::verifreg::AllocationID;
use cid::Cid;
use fil_actors_runtime::Array;
use fil_actors_runtime::BatchReturn;
use fvm_ipld_bitfield::BitField;
use fvm_ipld_encoding::strict_bytes;
use fvm_ipld_encoding::tuple::*;
Expand Down Expand Up @@ -97,9 +98,9 @@ pub struct SectorDealData {
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct ActivateDealsParams {
pub deal_ids: Vec<DealID>,
pub sector_expiry: ChainEpoch,
#[serde(transparent)]
pub struct BatchActivateDealsParams {
pub sectors: Vec<SectorDeals>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
Expand All @@ -111,12 +112,18 @@ pub struct VerifiedDealInfo {
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct ActivateDealsResult {
pub struct DealActivation {
#[serde(with = "bigint_ser")]
pub nonverified_deal_space: BigInt,
pub verified_infos: Vec<VerifiedDealInfo>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct BatchActivateDealsResult {
pub activation_results: BatchReturn,
pub activations: Vec<DealActivation>,
}

#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
pub struct DealSpaces {
#[serde(with = "bigint_ser")]
Expand Down
Loading

0 comments on commit f414380

Please sign in to comment.