Created snapshot test, optimised snapshots and fixed bug (#462)
purplefox authored Jul 6, 2022
1 parent f829e8b commit 02c8d2d
Showing 7 changed files with 246 additions and 16 deletions.
11 changes: 11 additions & 0 deletions cluster/dragon/dragon.go
@@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/lni/dragonboat/v3/logger"
@@ -84,6 +85,8 @@ type Dragon struct {
requestClientPool []remoting.Client
requestClientPoolLock sync.Mutex
healthChecker *remoting.HealthChecker
saveSnapshotCount int64
restoreSnapshotCount int64
}

type snapshot struct {
@@ -1167,3 +1170,11 @@ func (d *Dragon) ensureNodeHostAvailable() error {
}
return nil
}

func (d *Dragon) SaveSnapshotCount() int64 {
return atomic.LoadInt64(&d.saveSnapshotCount)
}

func (d *Dragon) RestoreSnapshotCount() int64 {
return atomic.LoadInt64(&d.restoreSnapshotCount)
}
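
These new accessors expose counters that the shard state machine increments with sync/atomic (see shard_odsm.go below), so the snapshot test can observe snapshot activity across goroutines without extra locking. A minimal, self-contained sketch of the same pattern, using hypothetical names rather than the real Dragon struct:

package main

import (
	"fmt"
	"sync/atomic"
)

// snapshotCounters is a hypothetical stand-in for the counters added to Dragon.
type snapshotCounters struct {
	saveSnapshotCount    int64
	restoreSnapshotCount int64
}

// recordSave mirrors what the shard state machine does after streaming a snapshot.
func (c *snapshotCounters) recordSave() {
	atomic.AddInt64(&c.saveSnapshotCount, 1)
}

// SaveSnapshotCount reads the counter safely from any goroutine, e.g. a test assertion.
func (c *snapshotCounters) SaveSnapshotCount() int64 {
	return atomic.LoadInt64(&c.saveSnapshotCount)
}

func main() {
	c := &snapshotCounters{}
	c.recordSave()
	fmt.Println(c.SaveSnapshotCount()) // prints 1
}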
2 changes: 1 addition & 1 deletion cluster/dragon/locks_odsm.go
@@ -161,7 +161,7 @@ func (s *locksODStateMachine) SaveSnapshot(i interface{}, writer io.Writer, i2 <
}
prefix := table.EncodeTableKeyPrefix(common.LocksTableID, locksClusterID, 16)
log.Printf("Saving locks snapshot on node id %d for shard id %d prefix is %v", s.dragon.cnf.NodeID, locksClusterID, prefix)
return saveSnapshotDataToWriter(snapshot, prefix, writer, locksClusterID)
return saveSnapshotDataToWriter(s.dragon.pebble, snapshot, prefix, writer, locksClusterID)
}

func (s *locksODStateMachine) RecoverFromSnapshot(reader io.Reader, i <-chan struct{}) error {
2 changes: 1 addition & 1 deletion cluster/dragon/sequence_odsm.go
@@ -92,7 +92,7 @@ func (s *sequenceODStateMachine) SaveSnapshot(i interface{}, writer io.Writer, i
}
prefix := table.EncodeTableKeyPrefix(common.SequenceGeneratorTableID, tableSequenceClusterID, 16)
log.Printf("Saving sequence snapshot on node id %d for shard id %d prefix is %v", s.dragon.cnf.NodeID, tableSequenceClusterID, prefix)
err := saveSnapshotDataToWriter(snapshot, prefix, writer, tableSequenceClusterID)
err := saveSnapshotDataToWriter(s.dragon.pebble, snapshot, prefix, writer, tableSequenceClusterID)
log.Info("sequence shard save snapshot done")
return err
}
23 changes: 17 additions & 6 deletions cluster/dragon/shard_odsm.go
@@ -12,6 +12,7 @@ import (
"io"
"math"
"sync"
"sync/atomic"
)

const (
@@ -328,23 +329,32 @@ func (s *ShardOnDiskStateMachine) PrepareSnapshot() (interface{}, error) {
}

func (s *ShardOnDiskStateMachine) SaveSnapshot(i interface{}, writer io.Writer, _ <-chan struct{}) error {
log.Debugf("data shard %d saving snapshot", s.shardID)
log.Debugf("data shard %d saving snapshot on node id %d", s.shardID, s.dragon.cnf.NodeID)
snapshot, ok := i.(*pebble.Snapshot)
if !ok {
panic("not a snapshot")
}
prefix := make([]byte, 0, 8)
prefix = common.AppendUint64ToBufferBE(prefix, s.shardID)
log.Debugf("Saving data snapshot on node id %d for shard id %d prefix is %v", s.dragon.cnf.NodeID, s.shardID, prefix)
err := saveSnapshotDataToWriter(snapshot, prefix, writer, s.shardID)
log.Debugf("data shard %d save snapshot done", s.shardID)
return err
err := saveSnapshotDataToWriter(s.dragon.pebble, snapshot, prefix, writer, s.shardID)
atomic.AddInt64(&s.dragon.saveSnapshotCount, 1)
if err != nil {
// According to the docs for IOnDiskStateMachine we should return ErrSnapshotStreaming
if errors.Is(err, statemachine.ErrSnapshotStreaming) {
return err
}
log.Errorf("failure in streaming snapshot %+v", err)
return statemachine.ErrSnapshotStreaming
}
log.Debugf("data shard %d save snapshot done on node id %d", s.shardID, s.dragon.cnf.NodeID)
return nil
}

func (s *ShardOnDiskStateMachine) RecoverFromSnapshot(reader io.Reader, i <-chan struct{}) error {
s.lock.Lock()
defer s.lock.Unlock()
log.Debugf("data shard %d recover from snapshot", s.shardID)
log.Debugf("data shard %d recover from snapshot on node %d", s.shardID, s.dragon.cnf.NodeID)
s.dedupSequences = make(map[string]uint64)
startPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), s.shardID)
endPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), s.shardID+1)
@@ -357,7 +367,8 @@ func (s *ShardOnDiskStateMachine) RecoverFromSnapshot(reader io.Reader, i <-chan
return err
}
s.maybeTriggerRemoteWriteOccurred()
log.Debugf("data shard %d recover from snapshot done", s.shardID)
log.Debugf("data shard %d recover from snapshot done on node %d", s.shardID, s.dragon.cnf.NodeID)
atomic.AddInt64(&s.dragon.restoreSnapshotCount, 1)
return nil
}

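The reworked SaveSnapshot translates any streaming failure into the single error dragonboat expects an IOnDiskStateMachine to return, logging the underlying cause first, while letting statemachine.ErrSnapshotStreaming itself pass through untouched. A rough, self-contained sketch of that translation step, using a local sentinel error in place of the real statemachine.ErrSnapshotStreaming:

package main

import (
	"errors"
	"fmt"
	"log"
)

// errSnapshotStreaming stands in for dragonboat's statemachine.ErrSnapshotStreaming,
// the only error SaveSnapshot is expected to surface to the Raft library.
var errSnapshotStreaming = errors.New("snapshot streaming failed")

// translateSnapshotErr mirrors the handling added to SaveSnapshot: the sentinel
// passes through unchanged, anything else is logged and converted to the sentinel.
func translateSnapshotErr(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, errSnapshotStreaming) {
		return err
	}
	log.Printf("failure in streaming snapshot %+v", err)
	return errSnapshotStreaming
}

func main() {
	fmt.Println(translateSnapshotErr(nil))                     // <nil>
	fmt.Println(translateSnapshotErr(errors.New("disk full"))) // snapshot streaming failed
}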
51 changes: 44 additions & 7 deletions cluster/dragon/sm_utils.go
@@ -1,6 +1,7 @@
package dragon

import (
"bufio"
"io"
"io/ioutil"
"math"
@@ -18,11 +19,21 @@ import (
var syncWriteOptions = &pebble.WriteOptions{Sync: true}
var nosyncWriteOptions = &pebble.WriteOptions{Sync: false}

func saveSnapshotDataToWriter(snapshot *pebble.Snapshot, prefix []byte, writer io.Writer, shardID uint64) error {
const (
saveSnapshotBufferSize = 32 * 1024
restoreSnapshotBufferSize = 32 * 1024
)

func saveSnapshotDataToWriter(peb *pebble.DB, snapshot *pebble.Snapshot, prefix []byte, writer io.Writer, shardID uint64) error {
var w writeCloseSyncer
w, ok := writer.(writeCloseSyncer)
if !ok {
w = &nopWriteCloseSyncer{writer}
// We use our own buffered writer so we can increase buffer size and also sync
bufWriter := bufio.NewWriterSize(writer, saveSnapshotBufferSize)
w = &snapshotWriteCloseSyncer{
peb: peb,
bufWriter: bufWriter,
}
}
tbl := sstable.NewWriter(w, sstable.WriterOptions{})
upper := common.IncrementBytesBigEndian(prefix)
@@ -53,6 +64,9 @@ func saveSnapshotDataToWriter(snapshot *pebble.Snapshot, prefix []byte, writer i
return errors.WithStack(err)
}
}
if err := iter.Close(); err != nil {
return errors.WithStack(err)
}
if err := tbl.Close(); err != nil {
return errors.WithStack(err)
}
@@ -79,7 +93,8 @@ func restoreSnapshotDataFromReader(peb *pebble.DB, startPrefix []byte, endPrefix
}()

log.Info("Copying reader to temp file")
if _, err := io.Copy(f, reader); err != nil {
bufReader := bufio.NewReaderSize(reader, restoreSnapshotBufferSize)
if _, err := io.Copy(f, bufReader); err != nil {
return errors.WithStack(err)
}
if err := f.Sync(); err != nil {
@@ -149,12 +164,34 @@ type writeCloseSyncer interface {
Sync() error
}

type nopWriteCloseSyncer struct{ io.Writer }
type snapshotWriteCloseSyncer struct {
peb *pebble.DB
bufWriter *bufio.Writer
needsFlush bool
}

func (w *nopWriteCloseSyncer) Close() error {
return nil
func (w *snapshotWriteCloseSyncer) Write(p []byte) (n int, err error) {
n, err = w.bufWriter.Write(p)
if err == nil {
w.needsFlush = true
}
return
}

func (w *nopWriteCloseSyncer) Sync() error {
func (w *snapshotWriteCloseSyncer) Close() error {
if w.needsFlush {
w.needsFlush = false
return w.bufWriter.Flush()
}
return nil
}

func (w *snapshotWriteCloseSyncer) Sync() error {
if w.needsFlush {
if err := w.bufWriter.Flush(); err != nil {
return err
}
w.needsFlush = false
}
return syncPebble(w.peb)
}
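
The snapshotWriteCloseSyncer above wraps the destination in a 32 KiB bufio.Writer and tracks a needsFlush flag so Close and Sync only flush when bytes are actually pending; Sync additionally syncs the pebble DB via syncPebble. A simplified, self-contained sketch of the same buffering idea, syncing an *os.File instead of pebble purely so it can run standalone:

package main

import (
	"bufio"
	"log"
	"os"
)

// bufferedSyncWriter buffers writes and only flushes on Close/Sync when data is pending.
// It syncs an *os.File here; the real snapshotWriteCloseSyncer syncs the pebble DB instead.
type bufferedSyncWriter struct {
	f          *os.File
	bufWriter  *bufio.Writer
	needsFlush bool
}

func newBufferedSyncWriter(f *os.File) *bufferedSyncWriter {
	return &bufferedSyncWriter{f: f, bufWriter: bufio.NewWriterSize(f, 32*1024)}
}

func (w *bufferedSyncWriter) Write(p []byte) (int, error) {
	n, err := w.bufWriter.Write(p)
	if err == nil {
		w.needsFlush = true
	}
	return n, err
}

func (w *bufferedSyncWriter) Sync() error {
	if w.needsFlush {
		if err := w.bufWriter.Flush(); err != nil {
			return err
		}
		w.needsFlush = false
	}
	return w.f.Sync()
}

func (w *bufferedSyncWriter) Close() error {
	if w.needsFlush {
		w.needsFlush = false
		return w.bufWriter.Flush()
	}
	return nil
}

func main() {
	f, err := os.CreateTemp("", "snapshot-*")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())
	w := newBufferedSyncWriter(f)
	if _, err := w.Write([]byte("some snapshot bytes")); err != nil {
		log.Fatal(err)
	}
	if err := w.Sync(); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}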
2 changes: 1 addition & 1 deletion cluster/dragon/sm_utils_test.go
@@ -41,7 +41,7 @@ func TestSaveRestoreSnapshot(t *testing.T) {

buf := &bytes.Buffer{}
snapshotPrefix := table.EncodeTableKeyPrefix(tableID, shardIDs[1], 20)
require.NoError(t, saveSnapshotDataToWriter(srcDB.NewSnapshot(), snapshotPrefix, buf, shardIDToSnapshot))
require.NoError(t, saveSnapshotDataToWriter(srcDB, srcDB.NewSnapshot(), snapshotPrefix, buf, shardIDToSnapshot))

startPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), shardIDToSnapshot)
endPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), shardIDToSnapshot+1)
171 changes: 171 additions & 0 deletions cluster/dragon/snapshottest/dragon_snapshot_test.go
@@ -0,0 +1,171 @@
package snapshottest

import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/squareup/pranadb/cluster"
"github.com/squareup/pranadb/cluster/dragon"
"github.com/squareup/pranadb/conf"
"github.com/squareup/pranadb/errors"
"github.com/squareup/pranadb/table"
"github.com/stretchr/testify/require"
"io/ioutil"
"testing"
"time"
)

func TestSnapshot(t *testing.T) {
dataDir, err := ioutil.TempDir("", "dragon-test")
if err != nil {
panic("failed to create temp dir")
}
// We start just two nodes of the three-node cluster - this provides a quorum, so they will accept writes
dragonCluster, err := startDragonNodes(dataDir, 0, 2)
if err != nil {
panic(fmt.Sprintf("failed to start dragon cluster %+v", err))
}
node := dragonCluster[0]

// Write a bunch of data to a shard
numKeys := 50
batchSize := 1
var wb *cluster.WriteBatch
shardID := node.GetAllShardIDs()[0]
tableID := uint64(1056)
var keys [][]byte
var vals []string
for i := 0; i < numKeys; i++ {
skey := fmt.Sprintf("some-key-%06d", i)
value := fmt.Sprintf("some-value-%d", i)
key := table.EncodeTableKeyPrefix(tableID, shardID, 16)
key = append(key, []byte(skey)...)
keys = append(keys, key)
vals = append(vals, value)
if wb == nil {
wb = cluster.NewWriteBatch(shardID)
}
wb.AddPut(key, []byte(value))
if i%batchSize == 0 {
err := node.WriteBatch(wb)
require.NoError(t, err)
log.Info("wrote batch")
wb = nil
}
}
if wb != nil {
err := node.WriteBatch(wb)
require.NoError(t, err)
log.Info("wrote batch")
}

// At this point no snapshots should have been taken.
// We use an IOnDiskStateMachine, and Dragon will NOT create snapshots periodically because the state machine
// is persisted to disk anyway (that's not true for standard state machines).
// Snapshots are only created when another node starts and needs to get its state machine in sync with the rest
// of the group. In that case a snapshot stream is requested from an existing node, and the snapshot is created and
// streamed to the new node.
require.Equal(t, int64(0), dragonCluster[0].SaveSnapshotCount())
require.Equal(t, int64(0), dragonCluster[0].RestoreSnapshotCount())
require.Equal(t, int64(0), dragonCluster[2].SaveSnapshotCount())
require.Equal(t, int64(0), dragonCluster[2].RestoreSnapshotCount())

// Now start another node
// This should trigger a snapshot on one of the existing nodes, which should then be streamed to the new node
// so it can bring its state up to date
dragonCluster2, err := startDragonNodes(dataDir, 1)
if err != nil {
panic(fmt.Sprintf("failed to start dragon cluster %+v", err))
}
require.Equal(t, int64(0), dragonCluster2[1].SaveSnapshotCount())
require.Equal(t, int64(1), dragonCluster2[1].RestoreSnapshotCount())

require.Equal(t, int64(0), dragonCluster[0].RestoreSnapshotCount())
require.Equal(t, int64(0), dragonCluster[2].RestoreSnapshotCount())

require.True(t, dragonCluster[0].SaveSnapshotCount() == 1 || dragonCluster[2].SaveSnapshotCount() == 1)
require.False(t, dragonCluster[0].SaveSnapshotCount() == 1 && dragonCluster[2].SaveSnapshotCount() == 1)

// Now we read the data back from this node
// We read it directly from Pebble to make sure it's actually got to that node
for i, key := range keys {
va, err := dragonCluster2[1].LocalGet(key)
require.NoError(t, err)
require.Equal(t, vals[i], string(va))
}

stopDragonCluster(dragonCluster)
stopDragonCluster(dragonCluster2)
}

func startDragonNodes(dataDir string, nodes ...int) ([]*dragon.Dragon, error) {

nodeAddresses := []string{
"localhost:63101",
"localhost:63102",
"localhost:63103",
}

requiredNodes := map[int]struct{}{}
for _, node := range nodes {
requiredNodes[node] = struct{}{}
}

var chans []chan error
clusterNodes := make([]*dragon.Dragon, len(nodeAddresses))
for i := 0; i < len(nodeAddresses); i++ {
_, ok := requiredNodes[i]
if !ok {
continue
}
ch := make(chan error)
chans = append(chans, ch)
cnf := conf.NewDefaultConfig()
cnf.NodeID = i
cnf.ClusterID = 123
cnf.RaftAddresses = nodeAddresses
cnf.NumShards = 10
cnf.DataDir = dataDir
cnf.ReplicationFactor = 3
cnf.TestServer = true
cnf.DataSnapshotEntries = 10
cnf.DataCompactionOverhead = 5
clus, err := dragon.NewDragon(*cnf)
if err != nil {
return nil, err
}
clusterNodes[i] = clus
clus.RegisterShardListenerFactory(&cluster.DummyShardListenerFactory{})
clus.SetRemoteQueryExecutionCallback(&cluster.DummyRemoteQueryExecutionCallback{})

go startDragonNode(clus, ch)
}

for _, ch := range chans {
err, ok := <-ch
if !ok {
return nil, errors.Error("channel was closed")
}
if err != nil {
return nil, errors.WithStack(err)
}
}

time.Sleep(5 * time.Second)
return clusterNodes, nil
}

func startDragonNode(clus cluster.Cluster, ch chan error) {
err := clus.Start()
ch <- err
}

func stopDragonCluster(dragonCluster []*dragon.Dragon) {
for _, dragon := range dragonCluster {
if dragon != nil {
err := dragon.Stop()
if err != nil {
panic(fmt.Sprintf("failed to stop dragon cluster %+v", err))
}
}
}
}
