Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func (l *Log) writeBatch(b *Batch) error {
return nil
}

// FirstIndex returns the index of the first entry in the log. Returns zero
// when log has no entries.
// FirstIndex returns the index of the next entry to read in the log.
// It points to the next future index if the log is currently empty.
func (l *Log) FirstIndex() (index uint64, err error) {
l.mu.RLock()
defer l.mu.RUnlock()
Expand All @@ -491,16 +491,17 @@ func (l *Log) FirstIndex() (index uint64, err error) {
} else if l.closed {
return 0, ErrClosed
}
// We check the lastIndex for zero because the firstIndex is always one or
// more, even when there's no entries
if l.lastIndex == 0 {
return 0, nil
}
// No longer check the lastIndex for zero because we allow empty logs since
// #31 was merged.
// https://github.com/tidwall/wal/pull/31
//if l.lastIndex == 0 {
// return 0, nil
//}
return l.firstIndex, nil
}

// LastIndex returns the index of the last entry in the log. Returns zero when
// log has no entries.
// LastIndex returns the index of the last entry has been written to the log.
// It points to the last deleted index if the log is currently empty.
func (l *Log) LastIndex() (index uint64, err error) {
l.mu.RLock()
defer l.mu.RUnlock()
Expand All @@ -509,9 +510,12 @@ func (l *Log) LastIndex() (index uint64, err error) {
} else if l.closed {
return 0, ErrClosed
}
if l.lastIndex == 0 {
return 0, nil
}
// No longer check the lastIndex for zero because we allow empty logs since
// #31 was merged.
// https://github.com/tidwall/wal/pull/31
//if l.lastIndex == 0 {
// return 0, nil
//}
return l.lastIndex, nil
}

Expand Down Expand Up @@ -699,6 +703,7 @@ func (l *Log) clearCache() {
// TruncateFront truncates the front of the log by removing all entries that
// are before the provided `index`. In other words the entry at
// `index` becomes the first entry in the log.
// If `index` equals to `LastIndex()+1`, all entries will be truncated.
func (l *Log) TruncateFront(index uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -709,23 +714,32 @@ func (l *Log) TruncateFront(index uint64) error {
}
return l.truncateFront(index)
}

func (l *Log) truncateFront(index uint64) (err error) {
if index == 0 || l.lastIndex == 0 ||
index < l.firstIndex || index > l.lastIndex {
if index < l.firstIndex || index > l.lastIndex+1 {
return ErrOutOfRange
}
if index == l.firstIndex {
// nothing to truncate
return nil
}
segIdx := l.findSegment(index)
var segIdx int
var s *segment
s, err = l.loadSegment(index)
if err != nil {
return err
var ebuf []byte
if index == l.lastIndex+1 {
// Truncate all entries, only care about the last segment
segIdx = len(l.segments) - 1
s = l.segments[segIdx]
ebuf = nil
} else {
segIdx = l.findSegment(index)
s, err = l.loadSegment(index)
if err != nil {
return err
}
epos := s.epos[index-s.index:]
ebuf = s.ebuf[epos[0].pos:]
}
epos := s.epos[index-s.index:]
ebuf := s.ebuf[epos[0].pos:]
// Create a temp file contains the truncated segment.
tempName := filepath.Join(l.path, "TEMP")
if err = func() error {
Expand Down Expand Up @@ -806,6 +820,7 @@ func (l *Log) truncateFront(index uint64) (err error) {
// TruncateBack truncates the back of the log by removing all entries that
// are after the provided `index`. In other words the entry at `index`
// becomes the last entry in the log.
// If `index` equals to `FirstIndex()-1`, all entries will be truncated.
func (l *Log) TruncateBack(index uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -818,22 +833,30 @@ func (l *Log) TruncateBack(index uint64) error {
}

func (l *Log) truncateBack(index uint64) (err error) {
if index == 0 || l.lastIndex == 0 ||
index < l.firstIndex || index > l.lastIndex {
if index < l.firstIndex-1 || index > l.lastIndex {
return ErrOutOfRange
}
if index == l.lastIndex {
// nothing to truncate
return nil
}
segIdx := l.findSegment(index)
var segIdx int
var s *segment
s, err = l.loadSegment(index)
if err != nil {
return err
var ebuf []byte
if index == l.firstIndex-1 {
// Truncate all entries, only care about the first segment
segIdx = 0
s = l.segments[segIdx]
ebuf = nil
} else {
segIdx = l.findSegment(index)
s, err = l.loadSegment(index)
if err != nil {
return err
}
epos := s.epos[:index-s.index+1]
ebuf = s.ebuf[:epos[len(epos)-1].end]
}
epos := s.epos[:index-s.index+1]
ebuf := s.ebuf[:epos[len(epos)-1].end]
// Create a temp file contains the truncated segment.
tempName := filepath.Join(l.path, "TEMP")
if err = func() error {
Expand Down
95 changes: 91 additions & 4 deletions wal_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package wal

import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
)
Expand All @@ -30,8 +32,8 @@ func testLog(t *testing.T, opts *Options, N int) {
if err != nil {
t.Fatal(err)
}
if n != 0 {
t.Fatalf("expected %d, got %d", 0, n)
if n != 1 {
t.Fatalf("expected %d, got %d", 1, n)
}

// LastIndex - should be zero
Expand Down Expand Up @@ -287,7 +289,7 @@ func testLog(t *testing.T, opts *Options, N int) {
}

// TruncateFront -- should fail, out of range
for _, i := range []int{0, N + 1} {
for _, i := range []int{0, N + 2} {
index := uint64(i)
if err = l.TruncateFront(index); err != ErrOutOfRange {
t.Fatalf("expected %v, got %v", ErrOutOfRange, err)
Expand Down Expand Up @@ -332,7 +334,7 @@ func testLog(t *testing.T, opts *Options, N int) {
}

// TruncateBack -- should fail, out of range
for _, i := range []int{0, 80} {
for _, i := range []int{0, 79} {
index := uint64(i)
if err = l.TruncateBack(index); err != ErrOutOfRange {
t.Fatalf("expected %v, got %v", ErrOutOfRange, err)
Expand Down Expand Up @@ -416,6 +418,25 @@ func testLog(t *testing.T, opts *Options, N int) {

l.Sync()
testFirstLast(t, l, uint64(N-1), uint64(N), nil)

// TruncateFront -- truncate all entries
if err = l.TruncateFront(uint64(N + 1)); err != nil {
t.Fatal(err)
}
testFirstLast(t, l, uint64(N+1), uint64(N), nil)

err = l.Write(uint64(N+1), []byte(dataStr(uint64(N+1))))
if err != nil {
t.Fatal(err)
}
N++
testFirstLast(t, l, uint64(N), uint64(N), nil)

// TruncateBack -- truncate all entries
if err = l.TruncateBack(uint64(N - 1)); err != nil {
t.Fatal(err)
}
testFirstLast(t, l, uint64(N), uint64(N-1), nil)
}

func testFirstLast(t *testing.T, l *Log, expectFirst, expectLast uint64, data func(index uint64) []byte) {
Expand Down Expand Up @@ -890,6 +911,72 @@ func TestConcurrency(t *testing.T) {
}
}

func TestRWConcurrency(t *testing.T) {
os.RemoveAll("testlog")

l, err := Open("testlog", &Options{
NoSync: true,
NoCopy: true,
})
if err != nil {
t.Fatal(err)
}
defer l.Close()

wg := sync.WaitGroup{}
wg.Add(2)
notify := make(chan struct{}, 1)
count := 100

go func() {
defer wg.Done()
defer close(notify)
idx, _ := l.LastIndex()
for i := 0; i < count; i++ {
idx++
if err := l.Write(idx, []byte(dataStr(uint64(i)))); err != nil {
t.Fatal(err)
}
select {
case notify <- struct{}{}:
default:
}
}
if idx != uint64(count) {
t.Fatalf("expected last index %d, got %d", count, idx)
}
}()

go func() {
defer wg.Done()
idx, _ := l.FirstIndex()
for {
var ok bool
select {
case _, ok = <-notify:
}
for {
_, err := l.Read(idx)
if errors.Is(err, ErrNotFound) {
break
}
if err := l.TruncateFront(idx + 1); err != nil {
t.Fatal(err)
}
idx++
}
if !ok {
break
}
}
if idx != uint64(count+1) {
t.Fatalf("expected first index %d, got %d", count+1, idx)
}
}()

wg.Wait()
}

func must(v interface{}, err error) interface{} {
if err != nil {
panic(err)
Expand Down