From 7cfe98f56aee33baf2966dd43ad2d4844e70a3b0 Mon Sep 17 00:00:00 2001 From: Tei Im Date: Thu, 22 Jun 2023 13:05:52 +0900 Subject: [PATCH 1/5] op-node: Implement engine P2P sync mode - Add new flag and sync.Config struct for engine p2p sync - Fix EngineQueue to support engine p2p sync - Add op-e2e test casees - Fix related components to pass sync config - Fix execution engine specs --- op-e2e/actions/l2_batcher_test.go | 9 ++- op-e2e/actions/l2_sequencer.go | 3 +- op-e2e/actions/l2_verifier.go | 12 ++- op-e2e/actions/l2_verifier_test.go | 7 +- op-e2e/actions/reorg_test.go | 3 +- op-e2e/actions/sync_test.go | 74 ++++++++++++++++++ op-e2e/actions/system_config_test.go | 5 +- op-node/eth/sync_status.go | 3 + op-node/flags/flags.go | 8 ++ op-node/node/config.go | 3 + op-node/node/node.go | 2 +- op-node/node/server_test.go | 1 + op-node/rollup/derive/engine_queue.go | 90 +++++++++++++++++++--- op-node/rollup/derive/engine_queue_test.go | 12 +-- op-node/rollup/derive/error.go | 3 + op-node/rollup/derive/pipeline.go | 12 ++- op-node/rollup/driver/driver.go | 6 +- op-node/rollup/driver/state.go | 5 +- op-node/rollup/sync/config.go | 6 ++ op-node/service.go | 10 +++ op-node/testutils/random.go | 1 + op-program/client/driver/driver.go | 3 +- specs/exec-engine.md | 2 +- 23 files changed, 240 insertions(+), 40 deletions(-) create mode 100644 op-node/rollup/sync/config.go diff --git a/op-e2e/actions/l2_batcher_test.go b/op-e2e/actions/l2_batcher_test.go index bd7072e9bf..2053b4f264 100644 --- a/op-e2e/actions/l2_batcher_test.go +++ b/op-e2e/actions/l2_batcher_test.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/testlog" ) @@ -30,7 +31,7 @@ func TestBatcher(gt *testing.T) { sd := e2eutils.Setup(t, dp, defaultAlloc) log := testlog.Logger(t, log.LvlDebug) miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) - verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) rollupSeqCl := sequencer.RollupClient() batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ @@ -268,7 +269,7 @@ func TestGarbageBatch(gt *testing.T) { log := testlog.Logger(t, log.LvlError) miner, engine, sequencer := setupSequencerTest(t, sd, log) - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) batcherCfg := &BatcherCfg{ MinL1TxSize: 0, @@ -350,7 +351,7 @@ func TestExtendedTimeWithoutL1Batches(gt *testing.T) { log := testlog.Logger(t, log.LvlError) miner, engine, sequencer := setupSequencerTest(t, sd, log) - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ MinL1TxSize: 0, @@ -407,7 +408,7 @@ func TestBigL2Txs(gt *testing.T) { log := testlog.Logger(t, log.LvlInfo) miner, engine, sequencer := setupSequencerTest(t, sd, log) - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ MinL1TxSize: 0, diff --git a/op-e2e/actions/l2_sequencer.go b/op-e2e/actions/l2_sequencer.go index 3d9deefcd5..980de5821f 100644 --- a/op-e2e/actions/l2_sequencer.go +++ b/op-e2e/actions/l2_sequencer.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/driver" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" ) // MockL1OriginSelector is a shim to override the origin as sequencer, so we can force it to stay on an older origin. @@ -40,7 +41,7 @@ type L2Sequencer struct { } func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, seqConfDepth uint64) *L2Sequencer { - ver := NewL2Verifier(t, log, l1, eng, cfg) + ver := NewL2Verifier(t, log, l1, eng, cfg, &sync.Config{}) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, eng) seqConfDepthL1 := driver.NewConfDepth(seqConfDepth, ver.l1State.L1Head, l1) l1OriginSelector := &MockL1OriginSelector{ diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index d1293ecc7c..069980c81e 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/driver" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/testutils" ) @@ -55,9 +56,9 @@ type L2API interface { GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) } -func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config) *L2Verifier { +func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, eng L2API, cfg *rollup.Config, syncCfg *sync.Config) *L2Verifier { metrics := &testutils.TestDerivationMetrics{} - pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics) + pipeline := derive.NewDerivationPipeline(log, cfg, l1, eng, metrics, syncCfg) pipeline.Reset() rollupNode := &L2Verifier{ @@ -133,6 +134,10 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef { return s.derivation.UnsafeL2Head() } +func (s *L2Verifier) EngineSyncTarget() eth.L2BlockRef { + return s.derivation.EngineSyncTarget() +} + func (s *L2Verifier) SyncStatus() *eth.SyncStatus { return ð.SyncStatus{ CurrentL1: s.derivation.Origin(), @@ -144,6 +149,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { SafeL2: s.L2Safe(), FinalizedL2: s.L2Finalized(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), + EngineSyncTarget: s.EngineSyncTarget(), } } @@ -200,7 +206,7 @@ func (s *L2Verifier) ActL2PipelineStep(t Testing) { s.l2PipelineIdle = false err := s.derivation.Step(t.Ctx()) - if err == io.EOF { + if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) { s.l2PipelineIdle = true return } else if err != nil && errors.Is(err, derive.NotEnoughData) { diff --git a/op-e2e/actions/l2_verifier_test.go b/op-e2e/actions/l2_verifier_test.go index c402558f8f..09606d67f2 100644 --- a/op-e2e/actions/l2_verifier_test.go +++ b/op-e2e/actions/l2_verifier_test.go @@ -9,21 +9,22 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/testlog" ) -func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher) (*L2Engine, *L2Verifier) { +func setupVerifier(t Testing, sd *e2eutils.SetupData, log log.Logger, l1F derive.L1Fetcher, syncCfg *sync.Config) (*L2Engine, *L2Verifier) { jwtPath := e2eutils.WriteDefaultJWT(t) engine := NewL2Engine(t, log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath) engCl := engine.EngineClient(t, sd.RollupCfg) - verifier := NewL2Verifier(t, log, l1F, engCl, sd.RollupCfg) + verifier := NewL2Verifier(t, log, l1F, engCl, sd.RollupCfg, syncCfg) return engine, verifier } func setupVerifierOnlyTest(t Testing, sd *e2eutils.SetupData, log log.Logger) (*L1Miner, *L2Engine, *L2Verifier) { miner := NewL1Miner(t, log, sd.L1Cfg) l1Cl := miner.L1Client(t, sd.RollupCfg) - engine, verifier := setupVerifier(t, sd, log, l1Cl) + engine, verifier := setupVerifier(t, sd, log, l1Cl, &sync.Config{}) return miner, engine, verifier } diff --git a/op-e2e/actions/reorg_test.go b/op-e2e/actions/reorg_test.go index 47b570a80e..40884f4f76 100644 --- a/op-e2e/actions/reorg_test.go +++ b/op-e2e/actions/reorg_test.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/testlog" ) @@ -33,7 +34,7 @@ func setupReorgTestActors(t Testing, dp *e2eutils.DeployParams, sd *e2eutils.Set miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner.ActL1SetFeeRecipient(common.Address{'A'}) sequencer.ActL2PipelineFull(t) - verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + verifEngine, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) rollupSeqCl := sequencer.RollupClient() batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{ MinL1TxSize: 0, diff --git a/op-e2e/actions/sync_test.go b/op-e2e/actions/sync_test.go index d5c44a379e..dee7b4dc20 100644 --- a/op-e2e/actions/sync_test.go +++ b/op-e2e/actions/sync_test.go @@ -6,6 +6,9 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" + "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" @@ -92,3 +95,74 @@ func TestFinalizeWhileSyncing(gt *testing.T) { // Verify the verifier finalized something new require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync") } + +func TestUnsafeSync(gt *testing.T) { + t := NewDefaultTesting(gt) + dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) + sd := e2eutils.Setup(t, dp, defaultAlloc) + log := testlog.Logger(t, log.LvlInfo) + + sd, _, _, sequencer, seqEng, verifier, _, _ := setupReorgTestActors(t, dp, sd, log) + seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) + require.NoError(t, err) + + sequencer.ActL2PipelineFull(t) + verifier.ActL2PipelineFull(t) + + for i := 0; i < 10; i++ { + // Build a L2 block + sequencer.ActL2StartBlock(t) + sequencer.ActL2EndBlock(t) + // Notify new L2 block to verifier by unsafe gossip + seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe) + require.NoError(t, err) + verifier.ActL2UnsafeGossipReceive(seqHead)(t) + // Handle unsafe payload + verifier.ActL2PipelineFull(t) + // Verifier must advance its unsafe head and engine sync target. + require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash) + // Check engine sync target updated. + require.Equal(t, sequencer.L2Unsafe().Hash, sequencer.EngineSyncTarget().Hash) + require.Equal(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash) + } +} + +func TestEngineP2PSync(gt *testing.T) { + t := NewDefaultTesting(gt) + dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams) + sd := e2eutils.Setup(t, dp, defaultAlloc) + log := testlog.Logger(t, log.LvlInfo) + + miner, seqEng, sequencer := setupSequencerTest(t, sd, log) + // Enable engine P2P sync + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{EngineP2PEnabled: true}) + + seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) + require.NoError(t, err) + + sequencer.ActL2PipelineFull(t) + verifier.ActL2PipelineFull(t) + + verifierUnsafeHead := verifier.L2Unsafe() + + // Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself. + sequencer.ActL2StartBlock(t) + sequencer.ActL2EndBlock(t) + + for i := 0; i < 10; i++ { + // Build a L2 block + sequencer.ActL2StartBlock(t) + sequencer.ActL2EndBlock(t) + // Notify new L2 block to verifier by unsafe gossip + seqHead, err := seqEngCl.PayloadByLabel(t.Ctx(), eth.Unsafe) + require.NoError(t, err) + verifier.ActL2UnsafeGossipReceive(seqHead)(t) + // Handle unsafe payload + verifier.ActL2PipelineFull(t) + // Verifier must advance only engine sync target. + require.NotEqual(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash) + require.NotEqual(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash) + require.Equal(t, verifier.L2Unsafe().Hash, verifierUnsafeHead.Hash) + require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash) + } +} diff --git a/op-e2e/actions/system_config_test.go b/op-e2e/actions/system_config_test.go index 970eebb57c..ff68710c48 100644 --- a/op-e2e/actions/system_config_test.go +++ b/op-e2e/actions/system_config_test.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/testlog" ) @@ -29,7 +30,7 @@ func TestBatcherKeyRotation(gt *testing.T) { miner, seqEngine, sequencer := setupSequencerTest(t, sd, log) miner.ActL1SetFeeRecipient(common.Address{'A'}) sequencer.ActL2PipelineFull(t) - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) rollupSeqCl := sequencer.RollupClient() // the default batcher @@ -358,7 +359,7 @@ func TestGasLimitChange(gt *testing.T) { miner.ActL1IncludeTx(dp.Addresses.Batcher)(t) miner.ActL1EndBlock(t) - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg)) + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{}) verifier.ActL2PipelineFull(t) require.Equal(t, sequencer.L2Unsafe(), verifier.L2Safe(), "verifier stays in sync, even with gaslimit changes") diff --git a/op-node/eth/sync_status.go b/op-node/eth/sync_status.go index 66b6162df1..74ac24c12c 100644 --- a/op-node/eth/sync_status.go +++ b/op-node/eth/sync_status.go @@ -35,4 +35,7 @@ type SyncStatus struct { // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. // It may be zeroed if there is no targeted block. UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` + // EngineSyncTarget points to the L2 block that the execution engine is syncing to. + // If it is ahead from UnsafeL2, the engine is in progress of P2P sync. + EngineSyncTarget L2BlockRef `json:"engine_sync_target"` } diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 0c9df1251b..cca005aeb7 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -208,6 +208,13 @@ var ( EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), Required: false, } + L2EngineP2PEnabled = &cli.BoolFlag{ + Name: "l2.engine-p2p.enabled", + Usage: "Enables or disables execution engine P2P sync", + EnvVars: prefixEnvVars("L2_ENGINE_P2P_ENABLED"), + Required: false, + Value: false, + } ) var requiredFlags = []cli.Flag{ @@ -245,6 +252,7 @@ var optionalFlags = []cli.Flag{ HeartbeatURLFlag, BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPCTrustRPC, + L2EngineP2PEnabled, } // Flags contains the list of configuration options available to the binary. diff --git a/op-node/node/config.go b/op-node/node/config.go index 37533a868a..612a842186 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/driver" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" ) @@ -39,6 +40,8 @@ type Config struct { // Optional Tracer Tracer Heartbeat HeartbeatConfig + + Sync sync.Config } type RPCConfig struct { diff --git a/op-node/node/node.go b/op-node/node/node.go index c76591fa98..2b9bfba1cc 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -199,7 +199,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger return err } - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics) + n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics, &cfg.Sync) return nil } diff --git a/op-node/node/server_test.go b/op-node/node/server_test.go index 9f89fc7edd..5f39deeef3 100644 --- a/op-node/node/server_test.go +++ b/op-node/node/server_test.go @@ -167,6 +167,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { SafeL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng), UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng), + EngineSyncTarget: testutils.RandomL2BlockRef(rng), } } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 6f6c2fa5e1..367754ce7c 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -102,6 +102,10 @@ type EngineQueue struct { safeHead eth.L2BlockRef unsafeHead eth.L2BlockRef + // Target L2 block the engine is currently syncing to. + // If the engine p2p sync is enabled, it can be different with unsafeHead. Otherwise, it must be same with unsafeHead. + engineSyncTarget eth.L2BlockRef + buildingOnto eth.L2BlockRef buildingID eth.PayloadID buildingSafe bool @@ -133,12 +137,14 @@ type EngineQueue struct { metrics Metrics l1Fetcher L1Fetcher + + syncCfg *sync.Config } var _ EngineControl = (*EngineQueue)(nil) // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. -func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { +func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher, syncCfg *sync.Config) *EngineQueue { return &EngineQueue{ log: log, cfg: cfg, @@ -148,6 +154,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize), prev: prev, l1Fetcher: l1Fetcher, + syncCfg: syncCfg, } } @@ -165,6 +172,11 @@ func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) { eq.metrics.RecordL2Ref("l2_unsafe", head) } +func (eq *EngineQueue) SetEngineSyncTarget(head eth.L2BlockRef) { + eq.engineSyncTarget = head + eq.metrics.RecordL2Ref("l2_engineSyncTarget", head) +} + func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) { if payload == nil { eq.log.Warn("cannot add nil unsafe payload") @@ -221,10 +233,31 @@ func (eq *EngineQueue) SafeL2Head() eth.L2BlockRef { return eq.safeHead } +func (eq *EngineQueue) EngineSyncTarget() eth.L2BlockRef { + return eq.engineSyncTarget +} + +// Determine if the engine is syncing to the target block +func (eq *EngineQueue) isEngineSyncing() bool { + return eq.unsafeHead.Hash != eq.engineSyncTarget.Hash +} + func (eq *EngineQueue) Step(ctx context.Context) error { if eq.needForkchoiceUpdate { return eq.tryUpdateEngine(ctx) } + // Trying unsafe payload should be done before safe attributes + // It allows the unsafe head can move forward while the long-range consolidation is in progress. + if eq.unsafePayloads.Len() > 0 { + if err := eq.tryNextUnsafePayload(ctx); err != io.EOF { + return err + } + // EOF error means we can't process the next unsafe payload. Then we should process next safe attributes. + } + if eq.isEngineSyncing() { + // Make pipeline first focus to sync unsafe blocks to engineSyncTarget + return EngineP2PSyncing + } if eq.safeAttributes != nil { return eq.tryNextSafeAttributes(ctx) } @@ -253,10 +286,6 @@ func (eq *EngineQueue) Step(ctx context.Context) error { return NotEnoughData } - if eq.unsafePayloads.Len() > 0 { - return eq.tryNextUnsafePayload(ctx) - } - if outOfData { return io.EOF } else { @@ -381,6 +410,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) { "l2_finalized", eq.finalized, "l2_safe", eq.safeHead, "l2_unsafe", eq.unsafeHead, + "l2_engineSyncTarget", eq.engineSyncTarget, "l2_time", eq.unsafeHead.Time, "l1_derived", eq.origin, ) @@ -389,8 +419,11 @@ func (eq *EngineQueue) logSyncProgress(reason string) { // tryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node, // this is a no-op if the nodes already agree on the forkchoice state. func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { + if eq.unsafeHead.Hash != eq.engineSyncTarget.Hash { + eq.log.Warn("Attempting to update forkchoice state while engine is P2P syncing") + } fc := eth.ForkchoiceState{ - HeadBlockHash: eq.unsafeHead.Hash, + HeadBlockHash: eq.engineSyncTarget.Hash, SafeBlockHash: eq.safeHead.Hash, FinalizedBlockHash: eq.finalized.Hash, } @@ -412,6 +445,26 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { return nil } +// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload. +// It returns true if the status is acceptable. +func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool { + if eq.syncCfg.EngineP2PEnabled { + // Allow SYNCING and ACCEPTED if engine P2P sync is enabled + return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted + } + return status == eth.ExecutionValid +} + +// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload. +// It returns true if the status is acceptable. +func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool { + if eq.syncCfg.EngineP2PEnabled { + // Allow SYNCING if engine P2P sync is enabled + return status == eth.ExecutionValid || status == eth.ExecutionSyncing + } + return status == eth.ExecutionValid +} + func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { first := eq.unsafePayloads.Peek() @@ -422,8 +475,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { } // Ensure that the unsafe payload builds upon the current unsafe head - // TODO: once we support snap-sync we can remove this condition, and handle the "SYNCING" status of the execution engine. - if first.ParentHash != eq.unsafeHead.Hash { + if !eq.syncCfg.EngineP2PEnabled && first.ParentHash != eq.unsafeHead.Hash { if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 { eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID()) eq.unsafePayloads.Pop() @@ -442,7 +494,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { if err != nil { return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err)) } - if status.Status != eth.ExecutionValid { + if !eq.checkNewPayloadStatus(status.Status) { eq.unsafePayloads.Pop() return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w", first.ID(), first.ParentID(), eth.NewPayloadErr(first, status))) @@ -468,15 +520,20 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err)) } } - if fcRes.PayloadStatus.Status != eth.ExecutionValid { + if !eq.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) { eq.unsafePayloads.Pop() return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w", first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus))) } - eq.unsafeHead = ref + eq.engineSyncTarget = ref + eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref) + // unsafeHead should be updated only if the payload status is VALID + if fcRes.PayloadStatus.Status == eth.ExecutionValid { + eq.unsafeHead = ref + eq.metrics.RecordL2Ref("l2_unsafe", ref) + } eq.unsafePayloads.Pop() - eq.metrics.RecordL2Ref("l2_unsafe", ref) eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin) eq.logSyncProgress("unsafe payload from sequencer") @@ -510,7 +567,9 @@ func (eq *EngineQueue) tryNextSafeAttributes(ctx context.Context) error { // For some reason the unsafe head is behind the safe head. Log it, and correct it. eq.log.Error("invalid sync state, unsafe head is behind safe head", "unsafe", eq.unsafeHead, "safe", eq.safeHead) eq.unsafeHead = eq.safeHead + eq.engineSyncTarget = eq.safeHead eq.metrics.RecordL2Ref("l2_unsafe", eq.unsafeHead) + eq.metrics.RecordL2Ref("l2_engineSyncTarget", eq.unsafeHead) return nil } } @@ -603,6 +662,9 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error { } func (eq *EngineQueue) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { + if eq.isEngineSyncing() { + return BlockInsertTemporaryErr, fmt.Errorf("engine is in progess of p2p sync") + } if eq.buildingID != (eth.PayloadID{}) { eq.log.Warn("did not finish previous block building, starting new building now", "prev_onto", eq.buildingOnto, "prev_payload_id", eq.buildingID, "new_onto", parent) // TODO: maybe worth it to force-cancel the old payload ID here. @@ -644,7 +706,9 @@ func (eq *EngineQueue) ConfirmPayload(ctx context.Context) (out *eth.ExecutionPa } eq.unsafeHead = ref + eq.engineSyncTarget = ref eq.metrics.RecordL2Ref("l2_unsafe", ref) + eq.metrics.RecordL2Ref("l2_engineSyncTarget", ref) if eq.buildingSafe { eq.safeHead = ref @@ -725,6 +789,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System } eq.log.Debug("Reset engine queue", "safeHead", safe, "unsafe", unsafe, "safe_timestamp", safe.Time, "unsafe_timestamp", unsafe.Time, "l1Origin", l1Origin) eq.unsafeHead = unsafe + eq.engineSyncTarget = unsafe eq.safeHead = safe eq.safeAttributes = nil eq.finalized = finalized @@ -738,6 +803,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System eq.metrics.RecordL2Ref("l2_finalized", finalized) eq.metrics.RecordL2Ref("l2_safe", safe) eq.metrics.RecordL2Ref("l2_unsafe", unsafe) + eq.metrics.RecordL2Ref("l2_engineSyncTarget", unsafe) eq.logSyncProgress("reset derivation work") return io.EOF } diff --git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 59795efd67..5e65b12e9d 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" ) @@ -246,7 +247,7 @@ func TestEngineQueue_Finalize(t *testing.T) { prev := &fakeAttributesQueue{} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") @@ -480,7 +481,7 @@ func TestEngineQueue_ResetWhenUnsafeOriginNotCanonical(t *testing.T) { prev := &fakeAttributesQueue{origin: refE} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") @@ -811,7 +812,7 @@ func TestVerifyNewL1Origin(t *testing.T) { }, nil) prev := &fakeAttributesQueue{origin: refE} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for") @@ -909,7 +910,7 @@ func TestBlockBuildingRace(t *testing.T) { } prev := &fakeAttributesQueue{origin: refA, attrs: attrs} - eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F) + eq := NewEngineQueue(logger, cfg, eng, metrics, prev, l1F, &sync.Config{}) require.ErrorIs(t, eq.Reset(context.Background(), eth.L1BlockRef{}, eth.SystemConfig{}), io.EOF) id := eth.PayloadID{0xff} @@ -1079,8 +1080,9 @@ func TestResetLoop(t *testing.T) { prev := &fakeAttributesQueue{origin: refA, attrs: attrs} - eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F) + eq := NewEngineQueue(logger, cfg, eng, metrics.NoopMetrics, prev, l1F, &sync.Config{}) eq.unsafeHead = refA2 + eq.engineSyncTarget = refA2 eq.safeHead = refA1 eq.finalized = refA0 diff --git a/op-node/rollup/derive/error.go b/op-node/rollup/derive/error.go index ef896d2fa6..929c93648d 100644 --- a/op-node/rollup/derive/error.go +++ b/op-node/rollup/derive/error.go @@ -96,3 +96,6 @@ var ErrCritical = NewCriticalError(nil) // NotEnoughData implies that the function currently does not have enough data to progress // but if it is retried enough times, it will eventually return a real value or io.EOF var NotEnoughData = errors.New("not enough data") + +// EngineP2PSyncing implies that the execution engine is currently in progress of P2P sync. +var EngineP2PSyncing = errors.New("engine is P2P syncing") diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 4dcd74a8e7..3ae50e4dd9 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" ) type Metrics interface { @@ -46,6 +47,7 @@ type EngineQueueStage interface { Finalized() eth.L2BlockRef UnsafeL2Head() eth.L2BlockRef SafeL2Head() eth.L2BlockRef + EngineSyncTarget() eth.L2BlockRef Origin() eth.L1BlockRef SystemConfig() eth.SystemConfig SetUnsafeHead(head eth.L2BlockRef) @@ -75,7 +77,7 @@ type DerivationPipeline struct { } // NewDerivationPipeline creates a derivation pipeline, which should be reset before use. -func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics) *DerivationPipeline { +func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher, engine Engine, metrics Metrics, syncCfg *sync.Config) *DerivationPipeline { // Pull stages l1Traversal := NewL1Traversal(log, cfg, l1Fetcher) @@ -89,7 +91,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue) // Step stages - eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher) + eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue, l1Fetcher, syncCfg) // Reset from engine queue then up from L1 Traversal. The stages do not talk to each other during // the reset, but after the engine queue, this is the order in which the stages could talk to each other. @@ -147,6 +149,10 @@ func (dp *DerivationPipeline) UnsafeL2Head() eth.L2BlockRef { return dp.eng.UnsafeL2Head() } +func (dp *DerivationPipeline) EngineSyncTarget() eth.L2BlockRef { + return dp.eng.EngineSyncTarget() +} + func (dp *DerivationPipeline) StartPayload(ctx context.Context, parent eth.L2BlockRef, attrs *eth.PayloadAttributes, updateSafe bool) (errType BlockInsertionErrType, err error) { return dp.eng.StartPayload(ctx, parent, attrs, updateSafe) } @@ -199,6 +205,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { if err := dp.eng.Step(ctx); err == io.EOF { // If every stage has returned io.EOF, try to advance the L1 Origin return dp.traversal.AdvanceL1Block(ctx) + } else if err == EngineP2PSyncing { + return err } else if err != nil { return fmt.Errorf("engine stage failed: %w", err) } else { diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 302129cc8d..f5323b25a6 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" ) type Metrics interface { @@ -58,6 +59,7 @@ type DerivationPipeline interface { UnsafeL2Head() eth.L2BlockRef Origin() eth.L1BlockRef EngineReady() bool + EngineSyncTarget() eth.L2BlockRef } type L1StateIface interface { @@ -103,13 +105,13 @@ type AltSync interface { } // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. -func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { +func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics, syncCfg *sync.Config) *Driver { l1 = NewMeteredL1Fetcher(l1, metrics) l1State := NewL1State(log, metrics) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) verifConfDepth := NewConfDepth(driverCfg.VerifierConfDepth, l1State.L1Head, l1) - derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics) + derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l2, metrics, syncCfg) attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2) engine := derivationPipeline meteredEngine := NewMeteredEngine(cfg, engine, metrics, log) diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 79fd55f2af..ac8ce7ee4a 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -290,8 +290,8 @@ func (s *Driver) eventLoop() { s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) err := s.derivation.Step(context.Background()) stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. - if err == io.EOF { - s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin()) + if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) { + s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err) stepAttempts = 0 s.metrics.SetDerivationIdle(true) continue @@ -423,6 +423,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus { SafeL2: s.derivation.SafeL2Head(), FinalizedL2: s.derivation.Finalized(), UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), + EngineSyncTarget: s.derivation.EngineSyncTarget(), } } diff --git a/op-node/rollup/sync/config.go b/op-node/rollup/sync/config.go new file mode 100644 index 0000000000..2441b00d98 --- /dev/null +++ b/op-node/rollup/sync/config.go @@ -0,0 +1,6 @@ +package sync + +type Config struct { + // EngineP2PEnabled is true when the EngineQueue can trigger execution engine P2P sync. + EngineP2PEnabled bool `json:"engine_p2p_enabled"` +} diff --git a/op-node/service.go b/op-node/service.go index 33c3d5c7d0..e5483ce1dd 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -23,6 +23,7 @@ import ( p2pcli "github.com/ethereum-optimism/optimism/op-node/p2p/cli" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/driver" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" ) // NewConfig creates a Config from the provided flags or environment variables. @@ -57,6 +58,8 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { l2SyncEndpoint := NewL2SyncEndpointConfig(ctx) + syncConfig := NewSyncConfig(ctx) + cfg := &node.Config{ L1: l1Endpoint, L2: l2Endpoint, @@ -86,6 +89,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { Moniker: ctx.GlobalString(flags.HeartbeatMonikerFlag.Name), URL: ctx.GlobalString(flags.HeartbeatURLFlag.Name), }, + Sync: *syncConfig, } if err := cfg.Check(); err != nil { return nil, err @@ -193,3 +197,9 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) { logger.SetHandler(handler) return logger, nil } + +func NewSyncConfig(ctx *cli.Context) *sync.Config { + return &sync.Config{ + EngineP2PEnabled: ctx.Bool(flags.L2EngineP2PEnabled.Name), + } +} diff --git a/op-node/testutils/random.go b/op-node/testutils/random.go index 2b74832651..48564a5c69 100644 --- a/op-node/testutils/random.go +++ b/op-node/testutils/random.go @@ -266,6 +266,7 @@ func RandomOutputResponse(rng *rand.Rand) *eth.OutputResponse { UnsafeL2: RandomL2BlockRef(rng), SafeL2: RandomL2BlockRef(rng), FinalizedL2: RandomL2BlockRef(rng), + EngineSyncTarget: RandomL2BlockRef(rng), }, } } diff --git a/op-program/client/driver/driver.go b/op-program/client/driver/driver.go index 8372d1996a..67ec37228f 100644 --- a/op-program/client/driver/driver.go +++ b/op-program/client/driver/driver.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-node/rollup/sync" "github.com/ethereum/go-ethereum/log" ) @@ -35,7 +36,7 @@ type Driver struct { } func NewDriver(logger log.Logger, cfg *rollup.Config, l1Source derive.L1Fetcher, l2Source L2Source, targetBlockNum uint64) *Driver { - pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics) + pipeline := derive.NewDerivationPipeline(logger, cfg, l1Source, l2Source, metrics.NoopMetrics, &sync.Config{}) pipeline.Reset() return &Driver{ logger: logger, diff --git a/specs/exec-engine.md b/specs/exec-engine.md index c132fb4d3c..9730ee5bca 100644 --- a/specs/exec-engine.md +++ b/specs/exec-engine.md @@ -242,7 +242,7 @@ as the engine implementation can sync state faster through methods like [snap-sy ### Happy-path sync 1. The rollup node informs the engine of the L2 chain head, unconditionally (part of regular node operation): - - [`engine_newPayloadV1`][engine_newPayloadV1] is called with latest L2 block derived from L1. + - [`engine_newPayloadV1`][engine_newPayloadV1] is called with latest L2 block received from P2P. - [`engine_forkchoiceUpdatedV1`][engine_forkchoiceUpdatedV1] is called with the current `unsafe`/`safe`/`finalized` L2 block hashes. 2. The engine requests headers from peers, in reverse till the parent hash matches the local chain From 79c7a366ee34ce9f833281b49a11fc7194e1aa5a Mon Sep 17 00:00:00 2001 From: Tei Im Date: Wed, 28 Jun 2023 19:46:54 +0900 Subject: [PATCH 2/5] op-node: Fix reorg depth check, Add skip-sanity-check flag --- op-node/flags/flags.go | 8 +++++++ op-node/rollup/derive/engine_queue.go | 2 +- op-node/rollup/sync/config.go | 2 ++ op-node/rollup/sync/start.go | 23 +++++++++++-------- op-node/rollup/sync/start_test.go | 33 ++++++++++++++++++++++++++- op-node/service.go | 1 + 6 files changed, 58 insertions(+), 11 deletions(-) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index cca005aeb7..747a0e97f6 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -215,6 +215,13 @@ var ( Required: false, Value: false, } + SkipSanityCheck = &cli.BoolFlag{ + Name: "skip-sanity-check", + Usage: "Skip chain sanity check on pipeline reset", + EnvVars: prefixEnvVars("SKIP_SANITY_CHECK"), + Required: false, + Value: false, + } ) var requiredFlags = []cli.Flag{ @@ -253,6 +260,7 @@ var optionalFlags = []cli.Flag{ BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPCTrustRPC, L2EngineP2PEnabled, + SkipSanityCheck, } // Flags contains the list of configuration options available to the binary. diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 367754ce7c..7c279a489e 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -749,7 +749,7 @@ func (eq *EngineQueue) resetBuildingState() { // ResetStep Walks the L2 chain backwards until it finds an L2 block whose L1 origin is canonical. // The unsafe head is set to the head of the L2 chain, unless the existing safe head is not canonical. func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error { - result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine, eq.log) + result, err := sync.FindL2Heads(ctx, eq.cfg, eq.l1Fetcher, eq.engine, eq.log, eq.syncCfg) if err != nil { return NewTemporaryError(fmt.Errorf("failed to find the L2 Heads to start from: %w", err)) } diff --git a/op-node/rollup/sync/config.go b/op-node/rollup/sync/config.go index 2441b00d98..4b83170b87 100644 --- a/op-node/rollup/sync/config.go +++ b/op-node/rollup/sync/config.go @@ -3,4 +3,6 @@ package sync type Config struct { // EngineP2PEnabled is true when the EngineQueue can trigger execution engine P2P sync. EngineP2PEnabled bool `json:"engine_p2p_enabled"` + // SkipSanityCheck is true when the EngineQueue does not do sanity check on pipeline reset. + SkipSanityCheck bool `json:"skip_sanity_check"` } diff --git a/op-node/rollup/sync/start.go b/op-node/rollup/sync/start.go index 91118222d0..38e2cae5a6 100644 --- a/op-node/rollup/sync/start.go +++ b/op-node/rollup/sync/start.go @@ -102,7 +102,7 @@ func currentHeads(ctx context.Context, cfg *rollup.Config, l2 L2Chain) (*FindHea // Plausible: meaning that the blockhash of the L2 block's L1 origin // (as reported in the L1 Attributes deposit within the L2 block) is not canonical at another height in the L1 chain, // and the same holds for all its ancestors. -func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain, lgr log.Logger) (result *FindHeadsResult, err error) { +func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain, lgr log.Logger, syncCfg *Config) (result *FindHeadsResult, err error) { // Fetch current L2 forkchoice state result, err = currentHeads(ctx, cfg, l2) if err != nil { @@ -170,18 +170,18 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain if (n.Number == result.Finalized.Number) && (n.Hash != result.Finalized.Hash) { return nil, fmt.Errorf("%w: finalized %s, got: %s", ReorgFinalizedErr, result.Finalized, n) } - // Check we are not reorging L2 incredibly deep - if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number { - // If the reorg depth is too large, something is fishy. - // This can legitimately happen if L1 goes down for a while. But in that case, - // restarting the L2 node with a bigger configured MaxReorgDepth is an acceptable - // stopgap solution. - return nil, fmt.Errorf("%w: traversed back to L2 block %s, but too deep compared to previous unsafe block %s", TooDeepReorgErr, n, prevUnsafe) - } // If we don't have a usable unsafe head, then set it if result.Unsafe == (eth.L2BlockRef{}) { result.Unsafe = n + // Check we are not reorging L2 incredibly deep + if n.L1Origin.Number+(MaxReorgSeqWindows*cfg.SeqWindowSize) < prevUnsafe.L1Origin.Number { + // If the reorg depth is too large, something is fishy. + // This can legitimately happen if L1 goes down for a while. But in that case, + // restarting the L2 node with a bigger configured MaxReorgDepth is an acceptable + // stopgap solution. + return nil, fmt.Errorf("%w: traversed back to L2 block %s, but too deep compared to previous unsafe block %s", TooDeepReorgErr, n, prevUnsafe) + } } if ahead { @@ -212,6 +212,11 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain return result, nil } + if syncCfg.SkipSanityCheck && highestL2WithCanonicalL1Origin.Hash == n.Hash { + lgr.Info("Found highest L2 block with canonical L1 origin. Skip further sanity check and jump to the safe head") + n = result.Safe + continue + } // Pull L2 parent for next iteration parent, err := l2.L2BlockRefByHash(ctx, n.ParentHash) if err != nil { diff --git a/op-node/rollup/sync/start_test.go b/op-node/rollup/sync/start_test.go index 376ce77677..1b820d22ec 100644 --- a/op-node/rollup/sync/start_test.go +++ b/op-node/rollup/sync/start_test.go @@ -76,7 +76,7 @@ func (c *syncStartTestCase) Run(t *testing.T) { } lgr := log.New() lgr.SetHandler(log.DiscardHandler()) - result, err := FindL2Heads(context.Background(), cfg, chain, chain, lgr) + result, err := FindL2Heads(context.Background(), cfg, chain, chain, lgr, &Config{}) if c.ExpectedErr != nil { require.ErrorIs(t, err, c.ExpectedErr, "expected error") return @@ -286,6 +286,37 @@ func TestFindSyncStart(t *testing.T) { SafeL2Head: 'D', ExpectedErr: WrongChainErr, }, + { + // FindL2Heads() keeps walking back to safe head after finding canonical unsafe head + // TooDeepReorgErr must not be raised + Name: "long traverse to safe head", + GenesisL1Num: 0, + L1: "abcdefgh", + L2: "ABCDEFGH", + NewL1: "abcdefgx", + PreFinalizedL2: 'B', + PreSafeL2: 'B', + GenesisL1: 'a', + GenesisL2: 'A', + UnsafeL2Head: 'G', + SeqWindowSize: 1, + SafeL2Head: 'B', + ExpectedErr: nil, + }, + { + // L2 reorg is too deep + Name: "reorg too deep", + GenesisL1Num: 0, + L1: "abcdefgh", + L2: "ABCDEFGH", + NewL1: "abijklmn", + PreFinalizedL2: 'B', + PreSafeL2: 'B', + GenesisL1: 'a', + GenesisL2: 'A', + SeqWindowSize: 1, + ExpectedErr: TooDeepReorgErr, + }, } for _, testCase := range testCases { diff --git a/op-node/service.go b/op-node/service.go index e5483ce1dd..fb5eab3126 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -201,5 +201,6 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) { func NewSyncConfig(ctx *cli.Context) *sync.Config { return &sync.Config{ EngineP2PEnabled: ctx.Bool(flags.L2EngineP2PEnabled.Name), + SkipSanityCheck: ctx.Bool(flags.SkipSanityCheck.Name), } } From 458d86efd82bf43e1140c86dcfc78d9c9b0aa874 Mon Sep 17 00:00:00 2001 From: Tei Im Date: Wed, 26 Jul 2023 14:02:15 +0900 Subject: [PATCH 3/5] Rename flags and config variables for clarify to the user Co-Authored-By: protolambda <19571989+protolambda@users.noreply.github.com> --- op-e2e/actions/sync_test.go | 2 +- op-node/flags/flags.go | 19 ++++++++++--------- op-node/rollup/derive/engine_queue.go | 6 +++--- op-node/rollup/sync/config.go | 8 ++++---- op-node/rollup/sync/start.go | 2 +- op-node/service.go | 4 ++-- 6 files changed, 21 insertions(+), 20 deletions(-) diff --git a/op-e2e/actions/sync_test.go b/op-e2e/actions/sync_test.go index dee7b4dc20..3d9bf56ea6 100644 --- a/op-e2e/actions/sync_test.go +++ b/op-e2e/actions/sync_test.go @@ -135,7 +135,7 @@ func TestEngineP2PSync(gt *testing.T) { miner, seqEng, sequencer := setupSequencerTest(t, sd, log) // Enable engine P2P sync - _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{EngineP2PEnabled: true}) + _, verifier := setupVerifier(t, sd, log, miner.L1Client(t, sd.RollupCfg), &sync.Config{EngineSync: true}) seqEngCl, err := sources.NewEngineClient(seqEng.RPCClient(), log, nil, sources.EngineClientDefaultConfig(sd.RollupCfg)) require.NoError(t, err) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 747a0e97f6..4f7ece7476 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -208,17 +208,18 @@ var ( EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), Required: false, } - L2EngineP2PEnabled = &cli.BoolFlag{ - Name: "l2.engine-p2p.enabled", + L2EngineSyncEnabled = &cli.BoolFlag{ + Name: "l2.engine-sync", Usage: "Enables or disables execution engine P2P sync", - EnvVars: prefixEnvVars("L2_ENGINE_P2P_ENABLED"), + EnvVars: prefixEnvVars("L2_ENGINE_SYNC_ENABLED"), Required: false, Value: false, } - SkipSanityCheck = &cli.BoolFlag{ - Name: "skip-sanity-check", - Usage: "Skip chain sanity check on pipeline reset", - EnvVars: prefixEnvVars("SKIP_SANITY_CHECK"), + SkipSyncStartCheck = &cli.BoolFlag{ + Name: "l2.skip-sync-start-check", + Usage: "Skip sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. " + + "This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync", + EnvVars: prefixEnvVars("L2_SKIP_SYNC_START_CHECK"), Required: false, Value: false, } @@ -259,8 +260,8 @@ var optionalFlags = []cli.Flag{ HeartbeatURLFlag, BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPCTrustRPC, - L2EngineP2PEnabled, - SkipSanityCheck, + L2EngineSyncEnabled, + SkipSyncStartCheck, } // Flags contains the list of configuration options available to the binary. diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 7c279a489e..92a102c3e0 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -448,7 +448,7 @@ func (eq *EngineQueue) tryUpdateEngine(ctx context.Context) error { // checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload. // It returns true if the status is acceptable. func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool { - if eq.syncCfg.EngineP2PEnabled { + if eq.syncCfg.EngineSync { // Allow SYNCING and ACCEPTED if engine P2P sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted } @@ -458,7 +458,7 @@ func (eq *EngineQueue) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bo // checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload. // It returns true if the status is acceptable. func (eq *EngineQueue) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool { - if eq.syncCfg.EngineP2PEnabled { + if eq.syncCfg.EngineSync { // Allow SYNCING if engine P2P sync is enabled return status == eth.ExecutionValid || status == eth.ExecutionSyncing } @@ -475,7 +475,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error { } // Ensure that the unsafe payload builds upon the current unsafe head - if !eq.syncCfg.EngineP2PEnabled && first.ParentHash != eq.unsafeHead.Hash { + if !eq.syncCfg.EngineSync && first.ParentHash != eq.unsafeHead.Hash { if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 { eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID()) eq.unsafePayloads.Pop() diff --git a/op-node/rollup/sync/config.go b/op-node/rollup/sync/config.go index 4b83170b87..a30ae03678 100644 --- a/op-node/rollup/sync/config.go +++ b/op-node/rollup/sync/config.go @@ -1,8 +1,8 @@ package sync type Config struct { - // EngineP2PEnabled is true when the EngineQueue can trigger execution engine P2P sync. - EngineP2PEnabled bool `json:"engine_p2p_enabled"` - // SkipSanityCheck is true when the EngineQueue does not do sanity check on pipeline reset. - SkipSanityCheck bool `json:"skip_sanity_check"` + // EngineSync is true when the EngineQueue can trigger execution engine P2P sync. + EngineSync bool `json:"engine_sync"` + // SkipSyncStartCheck skip the sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync + SkipSyncStartCheck bool `json:"skip_sync_start_check"` } diff --git a/op-node/rollup/sync/start.go b/op-node/rollup/sync/start.go index 38e2cae5a6..a1f3faea17 100644 --- a/op-node/rollup/sync/start.go +++ b/op-node/rollup/sync/start.go @@ -212,7 +212,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain return result, nil } - if syncCfg.SkipSanityCheck && highestL2WithCanonicalL1Origin.Hash == n.Hash { + if syncCfg.SkipSyncStartCheck && highestL2WithCanonicalL1Origin.Hash == n.Hash { lgr.Info("Found highest L2 block with canonical L1 origin. Skip further sanity check and jump to the safe head") n = result.Safe continue diff --git a/op-node/service.go b/op-node/service.go index fb5eab3126..050ef3fef4 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -200,7 +200,7 @@ func NewSnapshotLogger(ctx *cli.Context) (log.Logger, error) { func NewSyncConfig(ctx *cli.Context) *sync.Config { return &sync.Config{ - EngineP2PEnabled: ctx.Bool(flags.L2EngineP2PEnabled.Name), - SkipSanityCheck: ctx.Bool(flags.SkipSanityCheck.Name), + EngineSync: ctx.Bool(flags.L2EngineSyncEnabled.Name), + SkipSyncStartCheck: ctx.Bool(flags.SkipSyncStartCheck.Name), } } From f05057ce291ba676f244e79040d66fd842379800 Mon Sep 17 00:00:00 2001 From: Tei Im Date: Wed, 26 Jul 2023 14:15:38 +0900 Subject: [PATCH 4/5] Improve error handling --- op-node/rollup/derive/pipeline.go | 3 ++- op-node/rollup/driver/state.go | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 3ae50e4dd9..a2b7019d5f 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -2,6 +2,7 @@ package derive import ( "context" + "errors" "fmt" "io" @@ -205,7 +206,7 @@ func (dp *DerivationPipeline) Step(ctx context.Context) error { if err := dp.eng.Step(ctx); err == io.EOF { // If every stage has returned io.EOF, try to advance the L1 Origin return dp.traversal.AdvanceL1Block(ctx) - } else if err == EngineP2PSyncing { + } else if errors.Is(err, EngineP2PSyncing) { return err } else if err != nil { return fmt.Errorf("engine stage failed: %w", err) diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index ac8ce7ee4a..82a8a993aa 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -290,11 +290,16 @@ func (s *Driver) eventLoop() { s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) err := s.derivation.Step(context.Background()) stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress. - if err == io.EOF || (err != nil && errors.Is(err, derive.EngineP2PSyncing)) { + if err == io.EOF { s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err) stepAttempts = 0 s.metrics.SetDerivationIdle(true) continue + } else if err != nil && errors.Is(err, derive.EngineP2PSyncing) { + s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "sync_target", s.derivation.EngineSyncTarget(), "err", err) + stepAttempts = 0 + s.metrics.SetDerivationIdle(true) + continue } else if err != nil && errors.Is(err, derive.ErrReset) { // If the pipeline corrupts, e.g. due to a reorg, simply reset it s.log.Warn("Derivation pipeline is reset", "err", err) From 85ff5e92c297fa2ab0fd235fdb6dca13e6e7caac Mon Sep 17 00:00:00 2001 From: Krish Date: Mon, 4 Sep 2023 22:41:22 +0800 Subject: [PATCH 5/5] fix: fix flags to adapt current opBNB version --- op-node/flags/flags.go | 10 ++++------ op-node/service.go | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 4f7ece7476..47bd6e9945 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -208,20 +208,18 @@ var ( EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), Required: false, } - L2EngineSyncEnabled = &cli.BoolFlag{ + L2EngineSyncEnabled = cli.BoolFlag{ Name: "l2.engine-sync", Usage: "Enables or disables execution engine P2P sync", - EnvVars: prefixEnvVars("L2_ENGINE_SYNC_ENABLED"), + EnvVar: prefixEnvVar("L2_ENGINE_SYNC_ENABLED"), Required: false, - Value: false, } - SkipSyncStartCheck = &cli.BoolFlag{ + SkipSyncStartCheck = cli.BoolFlag{ Name: "l2.skip-sync-start-check", Usage: "Skip sanity check of consistency of L1 origins of the unsafe L2 blocks when determining the sync-starting point. " + "This defers the L1-origin verification, and is recommended to use in when utilizing l2.engine-sync", - EnvVars: prefixEnvVars("L2_SKIP_SYNC_START_CHECK"), + EnvVar: prefixEnvVar("L2_SKIP_SYNC_START_CHECK"), Required: false, - Value: false, } ) diff --git a/op-node/service.go b/op-node/service.go index 050ef3fef4..7e335e8eda 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -89,7 +89,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { Moniker: ctx.GlobalString(flags.HeartbeatMonikerFlag.Name), URL: ctx.GlobalString(flags.HeartbeatURLFlag.Name), }, - Sync: *syncConfig, + Sync: *syncConfig, } if err := cfg.Check(); err != nil { return nil, err