Reduce repo indexer disk usage (#3452)

release/v1.5
Ethan Koenig 6 years ago committed by Lauris BH
parent 283e87d814
commit a89592d4ab

@ -53,7 +53,7 @@ func populateIssueIndexer() error {
return err
}
for _, issue := range issues {
if err := batch.Add(issue.update()); err != nil {
if err := issue.update().AddToFlushingBatch(batch); err != nil {
return err
}
}
@ -78,7 +78,7 @@ func processIssueIndexerUpdateQueue() {
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "GetIssueByID: %v", err)
} else if err = batch.Add(issue.update()); err != nil {
} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
}

@ -14,6 +14,8 @@ import (
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"github.com/ethantkoenig/rupture"
)
// RepoIndexerStatus status of a repo's entry in the repo indexer
@ -187,7 +189,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) {
return nonGenesisChanges(repo, revision)
}
func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error {
func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error {
stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
RunInDir(repo.RepoPath())
if err != nil {
@ -206,24 +208,26 @@ func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error
} else if !base.IsTextFile(fileContents) {
return nil
}
return batch.Add(indexer.RepoIndexerUpdate{
indexerUpdate := indexer.RepoIndexerUpdate{
Filepath: update.Filename,
Op: indexer.RepoIndexerOpUpdate,
Data: &indexer.RepoIndexerData{
RepoID: repo.ID,
Content: string(fileContents),
},
})
}
return indexerUpdate.AddToFlushingBatch(batch)
}
func addDelete(filename string, repo *Repository, batch *indexer.Batch) error {
return batch.Add(indexer.RepoIndexerUpdate{
func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error {
indexerUpdate := indexer.RepoIndexerUpdate{
Filepath: filename,
Op: indexer.RepoIndexerOpDelete,
Data: &indexer.RepoIndexerData{
RepoID: repo.ID,
},
})
}
return indexerUpdate.AddToFlushingBatch(batch)
}
// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command

@ -6,12 +6,17 @@ package indexer
import (
"fmt"
"os"
"strconv"
"code.gitea.io/gitea/modules/setting"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture"
)
// indexerID a bleve-compatible unique identifier for an integer id
@ -53,40 +58,36 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
})
}
// Update represents an update to an indexer
type Update interface {
addToBatch(batch *bleve.Batch) error
}
const maxBatchSize = 16
// Batch batch of indexer updates that automatically flushes once it
// reaches a certain size
type Batch struct {
batch *bleve.Batch
index bleve.Index
}
// Add add update to batch, possibly flushing
func (batch *Batch) Add(update Update) error {
if err := update.addToBatch(batch.batch); err != nil {
return err
// openIndexer open the index at the specified path, checking for metadata
// updates and bleve version updates. If index needs to be created (or
// re-created), returns (nil, nil)
func openIndexer(path string, latestVersion int) (bleve.Index, error) {
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
return batch.flushIfFull()
}
func (batch *Batch) flushIfFull() error {
if batch.batch.Size() >= maxBatchSize {
return batch.Flush()
metadata, err := rupture.ReadIndexMetadata(path)
if err != nil {
return nil, err
}
if metadata.Version < latestVersion {
// the indexer is using a previous version, so we should delete it and
// re-populate
return nil, os.RemoveAll(path)
}
return nil
}
// Flush manually flush the batch, regardless of its size
func (batch *Batch) Flush() error {
if err := batch.index.Batch(batch.batch); err != nil {
return err
index, err := bleve.Open(path)
if err != nil && err == upsidedown.IncompatibleVersion {
// the indexer was built with a previous version of bleve, so we should
// delete it and re-populate
return nil, os.RemoveAll(path)
} else if err != nil {
return nil, err
}
batch.batch.Reset()
return nil
return index, nil
}

@ -5,8 +5,6 @@
package indexer
import (
"os"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@ -14,12 +12,19 @@ import (
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/ethantkoenig/rupture"
)
// issueIndexer (thread-safe) index for searching issues
var issueIndexer bleve.Index
const (
issueIndexerAnalyzer = "issueIndexer"
issueIndexerDocType = "issueIndexerDocType"
issueIndexerLatestVersion = 1
)
// IssueIndexerData data stored in the issue indexer
type IssueIndexerData struct {
RepoID int64
@ -28,35 +33,33 @@ type IssueIndexerData struct {
Comments []string
}
// Type returns the document type, for bleve's mapping.Classifier interface.
func (i *IssueIndexerData) Type() string {
return issueIndexerDocType
}
// IssueIndexerUpdate an update to the issue indexer
type IssueIndexerUpdate struct {
IssueID int64
Data *IssueIndexerData
}
func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
return batch.Index(indexerID(update.IssueID), update.Data)
// AddToFlushingBatch adds the update to the given flushing batch.
func (i IssueIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
return batch.Index(indexerID(i.IssueID), i.Data)
}
const issueIndexerAnalyzer = "issueIndexer"
// InitIssueIndexer initialize issue indexer
func InitIssueIndexer(populateIndexer func() error) {
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && !os.IsNotExist(err) {
var err error
issueIndexer, err = openIndexer(setting.Indexer.IssuePath, issueIndexerLatestVersion)
if err != nil {
log.Fatal(4, "InitIssueIndexer: %v", err)
} else if err == nil {
issueIndexer, err = bleve.Open(setting.Indexer.IssuePath)
if err == nil {
return
} else if err != upsidedown.IncompatibleVersion {
log.Fatal(4, "InitIssueIndexer, open index: %v", err)
}
log.Warn("Incompatible bleve version, deleting and recreating issue indexer")
if err = os.RemoveAll(setting.Indexer.IssuePath); err != nil {
log.Fatal(4, "InitIssueIndexer: remove index, %v", err)
}
}
if issueIndexer != nil {
return
}
if err = createIssueIndexer(); err != nil {
log.Fatal(4, "InitIssuesIndexer: create index, %v", err)
}
@ -70,9 +73,13 @@ func createIssueIndexer() error {
mapping := bleve.NewIndexMapping()
docMapping := bleve.NewDocumentMapping()
docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping())
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.Store = false
textFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("Title", textFieldMapping)
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
@ -89,7 +96,8 @@ func createIssueIndexer() error {
}
mapping.DefaultAnalyzer = issueIndexerAnalyzer
mapping.AddDocumentMapping("issues", docMapping)
mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
var err error
issueIndexer, err = bleve.New(setting.Indexer.IssuePath, mapping)
@ -97,11 +105,8 @@ func createIssueIndexer() error {
}
// IssueIndexerBatch batch to add updates to
func IssueIndexerBatch() *Batch {
return &Batch{
batch: issueIndexer.NewBatch(),
index: issueIndexer,
}
func IssueIndexerBatch() rupture.FlushingBatch {
return rupture.NewFlushingBatch(issueIndexer, maxBatchSize)
}
// SearchIssuesByKeyword searches for issues by given conditions.

@ -5,7 +5,6 @@
package indexer
import (
"os"
"strings"
"code.gitea.io/gitea/modules/log"
@ -15,10 +14,17 @@ import (
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/camelcase"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/token/unique"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/ethantkoenig/rupture"
)
const repoIndexerAnalyzer = "repoIndexerAnalyzer"
const (
repoIndexerAnalyzer = "repoIndexerAnalyzer"
repoIndexerDocType = "repoIndexerDocType"
repoIndexerLatestVersion = 1
)
// repoIndexer (thread-safe) index for repository contents
var repoIndexer bleve.Index
@ -40,6 +46,11 @@ type RepoIndexerData struct {
Content string
}
// Type returns the document type, for bleve's mapping.Classifier interface.
func (d *RepoIndexerData) Type() string {
return repoIndexerDocType
}
// RepoIndexerUpdate an update to the repo indexer
type RepoIndexerUpdate struct {
Filepath string
@ -47,13 +58,14 @@ type RepoIndexerUpdate struct {
Data *RepoIndexerData
}
func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error {
// AddToFlushingBatch adds the update to the given flushing batch.
func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
id := filenameIndexerID(update.Data.RepoID, update.Filepath)
switch update.Op {
case RepoIndexerOpUpdate:
return batch.Index(id, update.Data)
case RepoIndexerOpDelete:
batch.Delete(id)
return batch.Delete(id)
default:
log.Error(4, "Unrecognized repo indexer op: %d", update.Op)
}
@ -62,48 +74,50 @@ func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error {
// InitRepoIndexer initialize repo indexer
func InitRepoIndexer(populateIndexer func() error) {
_, err := os.Stat(setting.Indexer.RepoPath)
var err error
repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
if err != nil {
if os.IsNotExist(err) {
if err = createRepoIndexer(); err != nil {
log.Fatal(4, "CreateRepoIndexer: %v", err)
}
if err = populateIndexer(); err != nil {
log.Fatal(4, "PopulateRepoIndex: %v", err)
}
} else {
log.Fatal(4, "InitRepoIndexer: %v", err)
}
} else {
repoIndexer, err = bleve.Open(setting.Indexer.RepoPath)
if err != nil {
log.Fatal(4, "InitRepoIndexer, open index: %v", err)
}
log.Fatal(4, "InitRepoIndexer: %v", err)
}
if repoIndexer != nil {
return
}
if err = createRepoIndexer(); err != nil {
log.Fatal(4, "CreateRepoIndexer: %v", err)
}
if err = populateIndexer(); err != nil {
log.Fatal(4, "PopulateRepoIndex: %v", err)
}
}
// createRepoIndexer create a repo indexer if one does not already exist
func createRepoIndexer() error {
var err error
docMapping := bleve.NewDocumentMapping()
docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping())
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
mapping := bleve.NewIndexMapping()
if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
if err = addUnicodeNormalizeTokenFilter(mapping); err != nil {
return err
} else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
} else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
"type": custom.Name,
"char_filters": []string{},
"tokenizer": unicode.Name,
"token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name},
"token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name, unique.Name},
}); err != nil {
return err
}
mapping.DefaultAnalyzer = repoIndexerAnalyzer
mapping.AddDocumentMapping("repo", docMapping)
var err error
mapping.AddDocumentMapping(repoIndexerDocType, docMapping)
mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
repoIndexer, err = bleve.New(setting.Indexer.RepoPath, mapping)
return err
}
@ -121,11 +135,8 @@ func filenameOfIndexerID(indexerID string) string {
}
// RepoIndexerBatch batch to add updates to
func RepoIndexerBatch() *Batch {
return &Batch{
batch: repoIndexer.NewBatch(),
index: repoIndexer,
}
func RepoIndexerBatch() rupture.FlushingBatch {
return rupture.NewFlushingBatch(repoIndexer, maxBatchSize)
}
// DeleteRepoFromIndexer delete all of a repo's files from indexer
@ -138,8 +149,7 @@ func DeleteRepoFromIndexer(repoID int64) error {
}
batch := RepoIndexerBatch()
for _, hit := range result.Hits {
batch.batch.Delete(hit.ID)
if err = batch.flushIfFull(); err != nil {
if err = batch.Delete(hit.ID); err != nil {
return err
}
}

@ -0,0 +1,53 @@
// Copyright (c) 2018 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unique
import (
"github.com/blevesearch/bleve/analysis"
"github.com/blevesearch/bleve/registry"
)
const Name = "unique"
// UniqueTermFilter retains only the tokens which mark the first occurence of
// a term. Tokens whose term appears in a preceding token are dropped.
type UniqueTermFilter struct{}
func NewUniqueTermFilter() *UniqueTermFilter {
return &UniqueTermFilter{}
}
func (f *UniqueTermFilter) Filter(input analysis.TokenStream) analysis.TokenStream {
encounteredTerms := make(map[string]struct{}, len(input)/4)
j := 0
for _, token := range input {
term := string(token.Term)
if _, ok := encounteredTerms[term]; ok {
continue
}
encounteredTerms[term] = struct{}{}
input[j] = token
j++
}
return input[:j]
}
func UniqueTermFilterConstructor(config map[string]interface{}, cache *registry.Cache) (analysis.TokenFilter, error) {
return NewUniqueTermFilter(), nil
}
func init() {
registry.RegisterTokenFilter(Name, UniqueTermFilterConstructor)
}

@ -0,0 +1,173 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
name = "github.com/RoaringBitmap/roaring"
packages = ["."]
revision = "84551f0e309d6f9bafa428ef39b31ab7f16ff7b8"
version = "v0.4.1"
[[projects]]
branch = "master"
name = "github.com/Smerity/govarint"
packages = ["."]
revision = "7265e41f48f15fd61751e16da866af3c704bb3ab"
[[projects]]
name = "github.com/blevesearch/bleve"
packages = [
".",
"analysis",
"analysis/analyzer/standard",
"analysis/datetime/flexible",
"analysis/datetime/optional",
"analysis/lang/en",
"analysis/token/lowercase",
"analysis/token/porter",
"analysis/token/stop",
"analysis/tokenizer/unicode",
"document",
"geo",
"index",
"index/scorch",
"index/scorch/mergeplan",
"index/scorch/segment",
"index/scorch/segment/mem",
"index/scorch/segment/zap",
"index/store",
"index/store/boltdb",
"index/store/gtreap",
"index/upsidedown",
"mapping",
"numeric",
"registry",
"search",
"search/collector",
"search/facet",
"search/highlight",
"search/highlight/format/html",
"search/highlight/fragmenter/simple",
"search/highlight/highlighter/html",
"search/highlight/highlighter/simple",
"search/query",
"search/scorer",
"search/searcher"
]
revision = "a3b125508b4443344b596888ca58467b6c9310b9"
[[projects]]
branch = "master"
name = "github.com/blevesearch/go-porterstemmer"
packages = ["."]
revision = "23a2c8e5cf1f380f27722c6d2ae8896431dc7d0e"
[[projects]]
branch = "master"
name = "github.com/blevesearch/segment"
packages = ["."]
revision = "762005e7a34fd909a84586299f1dd457371d36ee"
[[projects]]
branch = "master"
name = "github.com/boltdb/bolt"
packages = ["."]
revision = "9da31745363232bc1e27dbab3569e77383a51585"
[[projects]]
branch = "master"
name = "github.com/couchbase/vellum"
packages = [
".",
"regexp",
"utf8"
]
revision = "ed84a675e24ed0a0bf6859b1ddec7e7c858354bd"
[[projects]]
name = "github.com/davecgh/go-spew"
packages = ["spew"]
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
branch = "master"
name = "github.com/edsrzf/mmap-go"
packages = ["."]
revision = "0bce6a6887123b67a60366d2c9fe2dfb74289d2e"
[[projects]]
branch = "master"
name = "github.com/glycerine/go-unsnap-stream"
packages = ["."]
revision = "62a9a9eb44fd8932157b1a8ace2149eff5971af6"
[[projects]]
name = "github.com/golang/protobuf"
packages = ["proto"]
revision = "925541529c1fa6821df4e44ce2723319eb2be768"
version = "v1.0.0"
[[projects]]
branch = "master"
name = "github.com/golang/snappy"
packages = ["."]
revision = "553a641470496b2327abcac10b36396bd98e45c9"
[[projects]]
branch = "master"
name = "github.com/mschoch/smat"
packages = ["."]
revision = "90eadee771aeab36e8bf796039b8c261bebebe4f"
[[projects]]
name = "github.com/philhofer/fwd"
packages = ["."]
revision = "bb6d471dc95d4fe11e432687f8b70ff496cf3136"
version = "v1.0.0"
[[projects]]
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
branch = "master"
name = "github.com/steveyen/gtreap"
packages = ["."]
revision = "0abe01ef9be25c4aedc174758ec2d917314d6d70"
[[projects]]
name = "github.com/stretchr/testify"
packages = ["assert"]
revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71"
version = "v1.2.1"
[[projects]]
branch = "master"
name = "github.com/tinylib/msgp"
packages = ["msgp"]
revision = "03a79185462ad029a6e7e05b2f3f3e0498d0a6c0"
[[projects]]
branch = "master"
name = "github.com/willf/bitset"
packages = ["."]
revision = "1a37ad96e8c1a11b20900a232874843b5174221f"
[[projects]]
name = "golang.org/x/net"
packages = ["context"]
revision = "309822c5b9b9f80db67f016069a12628d94fad34"
[[projects]]
name = "golang.org/x/sys"
packages = ["unix"]
revision = "3dbebcf8efb6a5011a60c2b4591c1022a759af8a"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "61c759f0c1136cadf86ae8a30bb78edf33fc844cdcb2316469b4ae14a8d051b0"
solver-name = "gps-cdcl"
solver-version = 1

@ -0,0 +1,34 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.2.1"
[prune]
go-tests = true
unused-packages = true

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 Ethan Koenig
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,13 @@
# rupture
[![Build Status](https://travis-ci.org/ethantkoenig/rupture.svg?branch=master)](https://travis-ci.org/ethantkoenig/rupture) [![GoDoc](https://godoc.org/github.com/ethantkoenig/rupture?status.svg)](https://godoc.org/github.com/ethantkoenig/rupture) [![Go Report Card](https://goreportcard.com/badge/blevesearch/bleve)](https://goreportcard.com/report/blevesearch/bleve)
An explosive companion to the [bleve indexing library](https://www.github.com/blevesearch/bleve)
## Features
`rupture` includes the following additions to `bleve`:
- __Flushing batches__: Batches of operation which automatically flush to the underlying bleve index.
- __Sharded indices__: An index-like abstraction built on top of several underlying indices. Sharded indices provide lower write latencies for indices with large amounts of data.
- __Index metadata__: Track index version for easily managing migrations and schema changes.

@ -0,0 +1,67 @@
package rupture
import (
"github.com/blevesearch/bleve"
)
// FlushingBatch is a batch of operations that automatically flushes to the
// underlying index once it reaches a certain size.
type FlushingBatch interface {
// Index adds the specified index operation batch, possibly triggering a
// flush.
Index(id string, data interface{}) error
// Remove adds the specified delete operation to the batch, possibly
// triggering a flush.
Delete(id string) error
// Flush flushes the batch's contents.
Flush() error
}
type singleIndexFlushingBatch struct {
maxBatchSize int
batch *bleve.Batch
index bleve.Index
}
func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch {
return &singleIndexFlushingBatch{
maxBatchSize: maxBatchSize,
batch: index.NewBatch(),
index: index,
}
}
// NewFlushingBatch creates a new flushing batch for the specified index. Once
// the number of operations in the batch reaches the specified limit, the batch
// automatically flushes its operations to the index.
func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch {
return newFlushingBatch(index, maxBatchSize)
}
func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error {
if err := b.batch.Index(id, data); err != nil {
return err
}
return b.flushIfFull()
}
func (b *singleIndexFlushingBatch) Delete(id string) error {
b.batch.Delete(id)
return b.flushIfFull()
}
func (b *singleIndexFlushingBatch) flushIfFull() error {
if b.batch.Size() < b.maxBatchSize {
return nil
}
return b.Flush()
}
func (b *singleIndexFlushingBatch) Flush() error {
err := b.index.Batch(b.batch)
if err != nil {
return err
}
b.batch.Reset()
return nil
}

@ -0,0 +1,68 @@
package rupture
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
)
const metaFilename = "rupture_meta.json"
func indexMetadataPath(dir string) string {
return filepath.Join(dir, metaFilename)
}
// IndexMetadata contains metadata about a bleve index.
type IndexMetadata struct {
// The version of the data in the index. This can be useful for tracking
// schema changes or data migrations.
Version int `json:"version"`
}
// in addition to the user-exposed metadata, we keep additional, internal-only
// metadata for sharded indices.
const shardedMetadataFilename = "rupture_sharded_meta.json"
func shardedIndexMetadataPath(dir string) string {
return filepath.Join(dir, shardedMetadataFilename)
}
type shardedIndexMetadata struct {
NumShards int `json:"num_shards"`
}
func readJSON(path string, meta interface{}) error {
metaBytes, err := ioutil.ReadFile(path)
if err != nil {
return err
}
return json.Unmarshal(metaBytes, meta)
}
func writeJSON(path string, meta interface{}) error {
metaBytes, err := json.Marshal(meta)
if err != nil {
return err
}
return ioutil.WriteFile(path, metaBytes, 0666)
}
// ReadIndexMetadata returns the metadata for the index at the specified path.
// If no such index metadata exists, an empty metadata and a nil error are
// returned.
func ReadIndexMetadata(path string) (*IndexMetadata, error) {
meta := &IndexMetadata{}
metaPath := indexMetadataPath(path)
if _, err := os.Stat(metaPath); os.IsNotExist(err) {
return meta, nil
} else if err != nil {
return nil, err
}
return meta, readJSON(metaPath, meta)
}
// WriteIndexMetadata writes metadata for the index at the specified path.
func WriteIndexMetadata(path string, meta *IndexMetadata) error {
return writeJSON(indexMetadataPath(path), meta)
}

@ -0,0 +1,146 @@
package rupture
import (
"fmt"
"hash/fnv"
"path/filepath"
"strconv"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/mapping"
)
// ShardedIndex an index that is built onto of multiple underlying bleve
// indices (i.e. shards). Similar to bleve's index aliases, some methods may
// not be supported.
type ShardedIndex interface {
bleve.Index
shards() []bleve.Index
}
// a type alias for bleve.Index, so that the anonymous field of
// shardedIndex does not conflict with the Index(..) method.
type bleveIndex bleve.Index
type shardedIndex struct {
bleveIndex
indices []bleve.Index
}
func hash(id string, n int) uint64 {
fnvHash := fnv.New64()
fnvHash.Write([]byte(id))
return fnvHash.Sum64() % uint64(n)
}
func childIndexerPath(rootPath string, i int) string {
return filepath.Join(rootPath, strconv.Itoa(i))
}
// NewShardedIndex creates a sharded index at the specified path, with the
// specified mapping and number of shards.
func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) {
if numShards <= 0 {
return nil, fmt.Errorf("Invalid number of shards: %d", numShards)
}
err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards})
if err != nil {
return nil, err
}
s := &shardedIndex{
indices: make([]bleve.Index, numShards),
}
for i := 0; i < numShards; i++ {
s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping)
if err != nil {
return nil, err
}
}
s.bleveIndex = bleve.NewIndexAlias(s.indices...)
return s, nil
}
// OpenShardedIndex opens a sharded index at the specified path.
func OpenShardedIndex(path string) (ShardedIndex, error) {
var meta shardedIndexMetadata
var err error
if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil {
return nil, err
}
s := &shardedIndex{
indices: make([]bleve.Index, meta.NumShards),
}
for i := 0; i < meta.NumShards; i++ {
s.indices[i], err = bleve.Open(childIndexerPath(path, i))
if err != nil {
return nil, err
}
}
s.bleveIndex = bleve.NewIndexAlias(s.indices...)
return s, nil
}
func (s *shardedIndex) Index(id string, data interface{}) error {
return s.indices[hash(id, len(s.indices))].Index(id, data)
}
func (s *shardedIndex) Delete(id string) error {
return s.indices[hash(id, len(s.indices))].Delete(id)
}
func (s *shardedIndex) Document(id string) (*document.Document, error) {
return s.indices[hash(id, len(s.indices))].Document(id)
}
func (s *shardedIndex) Close() error {
if err := s.bleveIndex.Close(); err != nil {
return err
}
for _, index := range s.indices {
if err := index.Close(); err != nil {
return err
}
}
return nil
}
func (s *shardedIndex) shards() []bleve.Index {
return s.indices
}
type shardedIndexFlushingBatch struct {
batches []*singleIndexFlushingBatch
}
// NewShardedFlushingBatch creates a flushing batch with the specified batch
// size for the specified sharded index.
func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch {
indices := index.shards()
b := &shardedIndexFlushingBatch{
batches: make([]*singleIndexFlushingBatch, len(indices)),
}
for i, index := range indices {
b.batches[i] = newFlushingBatch(index, maxBatchSize)
}
return b
}
func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error {
return b.batches[hash(id, len(b.batches))].Index(id, data)
}
func (b *shardedIndexFlushingBatch) Delete(id string) error {
return b.batches[hash(id, len(b.batches))].Delete(id)
}
func (b *shardedIndexFlushingBatch) Flush() error {
for _, batch := range b.batches {
if err := batch.Flush(); err != nil {
return err
}
}
return nil
}

12
vendor/vendor.json vendored

@ -128,6 +128,12 @@
"revision": "174f8ed44a0bf65e7c8fb228b60b58de62654cd2",
"revisionTime": "2017-06-28T17:18:15Z"
},
{
"checksumSHA1": "unacAFTLwgpg7wyI/mYf7Zd9eaU=",
"path": "github.com/blevesearch/bleve/analysis/token/unique",
"revision": "ff210fbc6d348ad67aa5754eaea11a463fcddafd",
"revisionTime": "2018-02-01T18:20:06Z"
},
{
"checksumSHA1": "q7C04nlJLxKmemXLop0oyJhfi5M=",
"path": "github.com/blevesearch/bleve/analysis/tokenizer/unicode",
@ -347,6 +353,12 @@
"revision": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2",
"revisionTime": "2015-12-24T04:54:52Z"
},
{
"checksumSHA1": "06ofBxeJ9c4LS2p31PCMIj7IjJU=",
"path": "github.com/ethantkoenig/rupture",
"revision": "0a76f03a811abcca2e6357329b673e9bb8ef9643",
"revisionTime": "2018-02-03T18:25:44Z"
},
{
"checksumSHA1": "imR2wF388/0fBU6RRWx8RvTi8Q8=",
"path": "github.com/facebookgo/clock",

Loading…
Cancel
Save