You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gitea-fork-majority-judgment/vendor/github.com/blevesearch/bleve/index/scorch/merge.go

190 lines
5.0 KiB

// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"fmt"
"os"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index/scorch/mergeplan"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
)
func (s *Scorch) mergerLoop() {
var lastEpochMergePlanned uint64
OUTER:
for {
select {
case <-s.closeCh:
break OUTER
default:
// check to see if there is a new snapshot to persist
s.rootLock.RLock()
ourSnapshot := s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
}
_ = ourSnapshot.DecRef()
// tell the persister we're waiting for changes
// first make a notification chan
notifyUs := make(notificationChan)
// give it to the persister
select {
case <-s.closeCh:
break OUTER
case s.persisterNotifier <- notifyUs:
}
// check again
s.rootLock.RLock()
ourSnapshot = s.root
ourSnapshot.AddRef()
s.rootLock.RUnlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}
lastEpochMergePlanned = ourSnapshot.epoch
s.fireEvent(EventKindMergerProgress, time.Since(startTime))
}
_ = ourSnapshot.DecRef()
// now wait for it (but also detect close)
select {
case <-s.closeCh:
break OUTER
case <-notifyUs:
// woken up, next loop should pick up work
}
}
}
s.asyncTasks.Done()
}
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot) error {
// build list of zap segments in this snapshot
var onlyZapSnapshots []mergeplan.Segment
for _, segmentSnapshot := range ourSnapshot.segment {
if _, ok := segmentSnapshot.segment.(*zap.Segment); ok {
onlyZapSnapshots = append(onlyZapSnapshots, segmentSnapshot)
}
}
// give this list to the planner
resultMergePlan, err := mergeplan.Plan(onlyZapSnapshots, nil)
if err != nil {
return fmt.Errorf("merge planning err: %v", err)
}
if resultMergePlan == nil {
// nothing to do
return nil
}
// process tasks in serial for now
var notifications []notificationChan
for _, task := range resultMergePlan.Tasks {
oldMap := make(map[uint64]*SegmentSnapshot)
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
segmentsToMerge := make([]*zap.Segment, 0, len(task.Segments))
docsToDrop := make([]*roaring.Bitmap, 0, len(task.Segments))
for _, planSegment := range task.Segments {
if segSnapshot, ok := planSegment.(*SegmentSnapshot); ok {
oldMap[segSnapshot.id] = segSnapshot
if zapSeg, ok := segSnapshot.segment.(*zap.Segment); ok {
segmentsToMerge = append(segmentsToMerge, zapSeg)
docsToDrop = append(docsToDrop, segSnapshot.deleted)
}
}
}
filename := zapFileName(newSegmentID)
s.markIneligibleForRemoval(filename)
path := s.path + string(os.PathSeparator) + filename
newDocNums, err := zap.Merge(segmentsToMerge, docsToDrop, path, DefaultChunkFactor)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return fmt.Errorf("merging failed: %v", err)
}
segment, err := zap.Open(path)
if err != nil {
s.unmarkIneligibleForRemoval(filename)
return err
}
sm := &segmentMerge{
id: newSegmentID,
old: oldMap,
oldNewDocNums: make(map[uint64][]uint64),
new: segment,
notify: make(notificationChan),
}
notifications = append(notifications, sm.notify)
for i, segNewDocNums := range newDocNums {
sm.oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
}
// give it to the introducer
select {
case <-s.closeCh:
return nil
case s.merges <- sm:
}
}
for _, notification := range notifications {
select {
case <-s.closeCh:
return nil
case <-notification:
}
}
return nil
}
type segmentMerge struct {
id uint64
old map[uint64]*SegmentSnapshot
oldNewDocNums map[uint64][]uint64
new segment.Segment
notify notificationChan
}