diff --git a/wal.go b/wal.go index d6db18a..27d00dd 100644 --- a/wal.go +++ b/wal.go @@ -481,8 +481,8 @@ func (l *Log) writeBatch(b *Batch) error { return nil } -// FirstIndex returns the index of the first entry in the log. Returns zero -// when log has no entries. +// FirstIndex returns the index of the next entry to read in the log. +// It points to the next future index if the log is currently empty. func (l *Log) FirstIndex() (index uint64, err error) { l.mu.RLock() defer l.mu.RUnlock() @@ -491,16 +491,17 @@ func (l *Log) FirstIndex() (index uint64, err error) { } else if l.closed { return 0, ErrClosed } - // We check the lastIndex for zero because the firstIndex is always one or - // more, even when there's no entries - if l.lastIndex == 0 { - return 0, nil - } + // No longer check the lastIndex for zero because we allow empty logs since + // #31 was merged. + // https://github.com/tidwall/wal/pull/31 + //if l.lastIndex == 0 { + // return 0, nil + //} return l.firstIndex, nil } -// LastIndex returns the index of the last entry in the log. Returns zero when -// log has no entries. +// LastIndex returns the index of the last entry has been written to the log. +// It points to the last deleted index if the log is currently empty. func (l *Log) LastIndex() (index uint64, err error) { l.mu.RLock() defer l.mu.RUnlock() @@ -509,9 +510,12 @@ func (l *Log) LastIndex() (index uint64, err error) { } else if l.closed { return 0, ErrClosed } - if l.lastIndex == 0 { - return 0, nil - } + // No longer check the lastIndex for zero because we allow empty logs since + // #31 was merged. + // https://github.com/tidwall/wal/pull/31 + //if l.lastIndex == 0 { + // return 0, nil + //} return l.lastIndex, nil } @@ -699,6 +703,7 @@ func (l *Log) clearCache() { // TruncateFront truncates the front of the log by removing all entries that // are before the provided `index`. In other words the entry at // `index` becomes the first entry in the log. +// If `index` equals to `LastIndex()+1`, all entries will be truncated. func (l *Log) TruncateFront(index uint64) error { l.mu.Lock() defer l.mu.Unlock() @@ -709,23 +714,32 @@ func (l *Log) TruncateFront(index uint64) error { } return l.truncateFront(index) } + func (l *Log) truncateFront(index uint64) (err error) { - if index == 0 || l.lastIndex == 0 || - index < l.firstIndex || index > l.lastIndex { + if index < l.firstIndex || index > l.lastIndex+1 { return ErrOutOfRange } if index == l.firstIndex { // nothing to truncate return nil } - segIdx := l.findSegment(index) + var segIdx int var s *segment - s, err = l.loadSegment(index) - if err != nil { - return err + var ebuf []byte + if index == l.lastIndex+1 { + // Truncate all entries, only care about the last segment + segIdx = len(l.segments) - 1 + s = l.segments[segIdx] + ebuf = nil + } else { + segIdx = l.findSegment(index) + s, err = l.loadSegment(index) + if err != nil { + return err + } + epos := s.epos[index-s.index:] + ebuf = s.ebuf[epos[0].pos:] } - epos := s.epos[index-s.index:] - ebuf := s.ebuf[epos[0].pos:] // Create a temp file contains the truncated segment. tempName := filepath.Join(l.path, "TEMP") if err = func() error { @@ -806,6 +820,7 @@ func (l *Log) truncateFront(index uint64) (err error) { // TruncateBack truncates the back of the log by removing all entries that // are after the provided `index`. In other words the entry at `index` // becomes the last entry in the log. +// If `index` equals to `FirstIndex()-1`, all entries will be truncated. func (l *Log) TruncateBack(index uint64) error { l.mu.Lock() defer l.mu.Unlock() @@ -818,22 +833,30 @@ func (l *Log) TruncateBack(index uint64) error { } func (l *Log) truncateBack(index uint64) (err error) { - if index == 0 || l.lastIndex == 0 || - index < l.firstIndex || index > l.lastIndex { + if index < l.firstIndex-1 || index > l.lastIndex { return ErrOutOfRange } if index == l.lastIndex { // nothing to truncate return nil } - segIdx := l.findSegment(index) + var segIdx int var s *segment - s, err = l.loadSegment(index) - if err != nil { - return err + var ebuf []byte + if index == l.firstIndex-1 { + // Truncate all entries, only care about the first segment + segIdx = 0 + s = l.segments[segIdx] + ebuf = nil + } else { + segIdx = l.findSegment(index) + s, err = l.loadSegment(index) + if err != nil { + return err + } + epos := s.epos[:index-s.index+1] + ebuf = s.ebuf[:epos[len(epos)-1].end] } - epos := s.epos[:index-s.index+1] - ebuf := s.ebuf[:epos[len(epos)-1].end] // Create a temp file contains the truncated segment. tempName := filepath.Join(l.path, "TEMP") if err = func() error { diff --git a/wal_test.go b/wal_test.go index b23a002..eea520a 100644 --- a/wal_test.go +++ b/wal_test.go @@ -1,11 +1,13 @@ package wal import ( + "errors" "fmt" "io/ioutil" "math/rand" "os" "strings" + "sync" "sync/atomic" "testing" ) @@ -30,8 +32,8 @@ func testLog(t *testing.T, opts *Options, N int) { if err != nil { t.Fatal(err) } - if n != 0 { - t.Fatalf("expected %d, got %d", 0, n) + if n != 1 { + t.Fatalf("expected %d, got %d", 1, n) } // LastIndex - should be zero @@ -287,7 +289,7 @@ func testLog(t *testing.T, opts *Options, N int) { } // TruncateFront -- should fail, out of range - for _, i := range []int{0, N + 1} { + for _, i := range []int{0, N + 2} { index := uint64(i) if err = l.TruncateFront(index); err != ErrOutOfRange { t.Fatalf("expected %v, got %v", ErrOutOfRange, err) @@ -332,7 +334,7 @@ func testLog(t *testing.T, opts *Options, N int) { } // TruncateBack -- should fail, out of range - for _, i := range []int{0, 80} { + for _, i := range []int{0, 79} { index := uint64(i) if err = l.TruncateBack(index); err != ErrOutOfRange { t.Fatalf("expected %v, got %v", ErrOutOfRange, err) @@ -416,6 +418,25 @@ func testLog(t *testing.T, opts *Options, N int) { l.Sync() testFirstLast(t, l, uint64(N-1), uint64(N), nil) + + // TruncateFront -- truncate all entries + if err = l.TruncateFront(uint64(N + 1)); err != nil { + t.Fatal(err) + } + testFirstLast(t, l, uint64(N+1), uint64(N), nil) + + err = l.Write(uint64(N+1), []byte(dataStr(uint64(N+1)))) + if err != nil { + t.Fatal(err) + } + N++ + testFirstLast(t, l, uint64(N), uint64(N), nil) + + // TruncateBack -- truncate all entries + if err = l.TruncateBack(uint64(N - 1)); err != nil { + t.Fatal(err) + } + testFirstLast(t, l, uint64(N), uint64(N-1), nil) } func testFirstLast(t *testing.T, l *Log, expectFirst, expectLast uint64, data func(index uint64) []byte) { @@ -890,6 +911,72 @@ func TestConcurrency(t *testing.T) { } } +func TestRWConcurrency(t *testing.T) { + os.RemoveAll("testlog") + + l, err := Open("testlog", &Options{ + NoSync: true, + NoCopy: true, + }) + if err != nil { + t.Fatal(err) + } + defer l.Close() + + wg := sync.WaitGroup{} + wg.Add(2) + notify := make(chan struct{}, 1) + count := 100 + + go func() { + defer wg.Done() + defer close(notify) + idx, _ := l.LastIndex() + for i := 0; i < count; i++ { + idx++ + if err := l.Write(idx, []byte(dataStr(uint64(i)))); err != nil { + t.Fatal(err) + } + select { + case notify <- struct{}{}: + default: + } + } + if idx != uint64(count) { + t.Fatalf("expected last index %d, got %d", count, idx) + } + }() + + go func() { + defer wg.Done() + idx, _ := l.FirstIndex() + for { + var ok bool + select { + case _, ok = <-notify: + } + for { + _, err := l.Read(idx) + if errors.Is(err, ErrNotFound) { + break + } + if err := l.TruncateFront(idx + 1); err != nil { + t.Fatal(err) + } + idx++ + } + if !ok { + break + } + } + if idx != uint64(count+1) { + t.Fatalf("expected first index %d, got %d", count+1, idx) + } + }() + + wg.Wait() +} + func must(v interface{}, err error) interface{} { if err != nil { panic(err)