diff --git a/models/issue_indexer.go b/models/issue_indexer.go index 922b66f95..b94ba5f2d 100644 --- a/models/issue_indexer.go +++ b/models/issue_indexer.go @@ -53,7 +53,7 @@ func populateIssueIndexer() error { return err } for _, issue := range issues { - if err := batch.Add(issue.update()); err != nil { + if err := issue.update().AddToFlushingBatch(batch); err != nil { return err } } @@ -78,7 +78,7 @@ func processIssueIndexerUpdateQueue() { issue, err := GetIssueByID(issueID) if err != nil { log.Error(4, "GetIssueByID: %v", err) - } else if err = batch.Add(issue.update()); err != nil { + } else if err = issue.update().AddToFlushingBatch(batch); err != nil { log.Error(4, "IssueIndexer: %v", err) } } diff --git a/models/repo_indexer.go b/models/repo_indexer.go index fee478479..ecd629587 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -14,6 +14,8 @@ import ( "code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" + + "github.com/ethantkoenig/rupture" ) // RepoIndexerStatus status of a repo's entry in the repo indexer @@ -187,7 +189,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) { return nonGenesisChanges(repo, revision) } -func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error { +func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error { stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). RunInDir(repo.RepoPath()) if err != nil { @@ -206,24 +208,26 @@ func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error } else if !base.IsTextFile(fileContents) { return nil } - return batch.Add(indexer.RepoIndexerUpdate{ + indexerUpdate := indexer.RepoIndexerUpdate{ Filepath: update.Filename, Op: indexer.RepoIndexerOpUpdate, Data: &indexer.RepoIndexerData{ RepoID: repo.ID, Content: string(fileContents), }, - }) + } + return indexerUpdate.AddToFlushingBatch(batch) } -func addDelete(filename string, repo *Repository, batch *indexer.Batch) error { - return batch.Add(indexer.RepoIndexerUpdate{ +func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error { + indexerUpdate := indexer.RepoIndexerUpdate{ Filepath: filename, Op: indexer.RepoIndexerOpDelete, Data: &indexer.RepoIndexerData{ RepoID: repo.ID, }, - }) + } + return indexerUpdate.AddToFlushingBatch(batch) } // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command diff --git a/modules/indexer/indexer.go b/modules/indexer/indexer.go index d5bdd51f9..9e12a7f50 100644 --- a/modules/indexer/indexer.go +++ b/modules/indexer/indexer.go @@ -6,12 +6,17 @@ package indexer import ( "fmt" + "os" "strconv" + "code.gitea.io/gitea/modules/setting" + "github.com/blevesearch/bleve" "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/index/upsidedown" "github.com/blevesearch/bleve/mapping" "github.com/blevesearch/bleve/search/query" + "github.com/ethantkoenig/rupture" ) // indexerID a bleve-compatible unique identifier for an integer id @@ -53,40 +58,36 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { }) } -// Update represents an update to an indexer -type Update interface { - addToBatch(batch *bleve.Batch) error -} - const maxBatchSize = 16 -// Batch batch of indexer updates that automatically flushes once it -// reaches a certain size -type Batch struct { - batch *bleve.Batch - index bleve.Index -} - -// Add add update to batch, possibly flushing -func (batch *Batch) Add(update Update) error { - if err := update.addToBatch(batch.batch); err != nil { - return err +// openIndexer open the index at the specified path, checking for metadata +// updates and bleve version updates. If index needs to be created (or +// re-created), returns (nil, nil) +func openIndexer(path string, latestVersion int) (bleve.Index, error) { + _, err := os.Stat(setting.Indexer.IssuePath) + if err != nil && os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err } - return batch.flushIfFull() -} -func (batch *Batch) flushIfFull() error { - if batch.batch.Size() >= maxBatchSize { - return batch.Flush() + metadata, err := rupture.ReadIndexMetadata(path) + if err != nil { + return nil, err + } + if metadata.Version < latestVersion { + // the indexer is using a previous version, so we should delete it and + // re-populate + return nil, os.RemoveAll(path) } - return nil -} -// Flush manually flush the batch, regardless of its size -func (batch *Batch) Flush() error { - if err := batch.index.Batch(batch.batch); err != nil { - return err + index, err := bleve.Open(path) + if err != nil && err == upsidedown.IncompatibleVersion { + // the indexer was built with a previous version of bleve, so we should + // delete it and re-populate + return nil, os.RemoveAll(path) + } else if err != nil { + return nil, err } - batch.batch.Reset() - return nil + return index, nil } diff --git a/modules/indexer/issue.go b/modules/indexer/issue.go index 62a18e2b3..b0d231a7c 100644 --- a/modules/indexer/issue.go +++ b/modules/indexer/issue.go @@ -5,8 +5,6 @@ package indexer import ( - "os" - "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" @@ -14,12 +12,19 @@ import ( "github.com/blevesearch/bleve/analysis/analyzer/custom" "github.com/blevesearch/bleve/analysis/token/lowercase" "github.com/blevesearch/bleve/analysis/tokenizer/unicode" - "github.com/blevesearch/bleve/index/upsidedown" + "github.com/ethantkoenig/rupture" ) // issueIndexer (thread-safe) index for searching issues var issueIndexer bleve.Index +const ( + issueIndexerAnalyzer = "issueIndexer" + issueIndexerDocType = "issueIndexerDocType" + + issueIndexerLatestVersion = 1 +) + // IssueIndexerData data stored in the issue indexer type IssueIndexerData struct { RepoID int64 @@ -28,35 +33,33 @@ type IssueIndexerData struct { Comments []string } +// Type returns the document type, for bleve's mapping.Classifier interface. +func (i *IssueIndexerData) Type() string { + return issueIndexerDocType +} + // IssueIndexerUpdate an update to the issue indexer type IssueIndexerUpdate struct { IssueID int64 Data *IssueIndexerData } -func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error { - return batch.Index(indexerID(update.IssueID), update.Data) +// AddToFlushingBatch adds the update to the given flushing batch. +func (i IssueIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { + return batch.Index(indexerID(i.IssueID), i.Data) } -const issueIndexerAnalyzer = "issueIndexer" - // InitIssueIndexer initialize issue indexer func InitIssueIndexer(populateIndexer func() error) { - _, err := os.Stat(setting.Indexer.IssuePath) - if err != nil && !os.IsNotExist(err) { + var err error + issueIndexer, err = openIndexer(setting.Indexer.IssuePath, issueIndexerLatestVersion) + if err != nil { log.Fatal(4, "InitIssueIndexer: %v", err) - } else if err == nil { - issueIndexer, err = bleve.Open(setting.Indexer.IssuePath) - if err == nil { - return - } else if err != upsidedown.IncompatibleVersion { - log.Fatal(4, "InitIssueIndexer, open index: %v", err) - } - log.Warn("Incompatible bleve version, deleting and recreating issue indexer") - if err = os.RemoveAll(setting.Indexer.IssuePath); err != nil { - log.Fatal(4, "InitIssueIndexer: remove index, %v", err) - } } + if issueIndexer != nil { + return + } + if err = createIssueIndexer(); err != nil { log.Fatal(4, "InitIssuesIndexer: create index, %v", err) } @@ -70,9 +73,13 @@ func createIssueIndexer() error { mapping := bleve.NewIndexMapping() docMapping := bleve.NewDocumentMapping() - docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping()) + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.Store = false + textFieldMapping.IncludeInAll = false docMapping.AddFieldMappingsAt("Title", textFieldMapping) docMapping.AddFieldMappingsAt("Content", textFieldMapping) docMapping.AddFieldMappingsAt("Comments", textFieldMapping) @@ -89,7 +96,8 @@ func createIssueIndexer() error { } mapping.DefaultAnalyzer = issueIndexerAnalyzer - mapping.AddDocumentMapping("issues", docMapping) + mapping.AddDocumentMapping(issueIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) var err error issueIndexer, err = bleve.New(setting.Indexer.IssuePath, mapping) @@ -97,11 +105,8 @@ func createIssueIndexer() error { } // IssueIndexerBatch batch to add updates to -func IssueIndexerBatch() *Batch { - return &Batch{ - batch: issueIndexer.NewBatch(), - index: issueIndexer, - } +func IssueIndexerBatch() rupture.FlushingBatch { + return rupture.NewFlushingBatch(issueIndexer, maxBatchSize) } // SearchIssuesByKeyword searches for issues by given conditions. diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go index 226e565e3..ffb1dc1e6 100644 --- a/modules/indexer/repo.go +++ b/modules/indexer/repo.go @@ -5,7 +5,6 @@ package indexer import ( - "os" "strings" "code.gitea.io/gitea/modules/log" @@ -15,10 +14,17 @@ import ( "github.com/blevesearch/bleve/analysis/analyzer/custom" "github.com/blevesearch/bleve/analysis/token/camelcase" "github.com/blevesearch/bleve/analysis/token/lowercase" + "github.com/blevesearch/bleve/analysis/token/unique" "github.com/blevesearch/bleve/analysis/tokenizer/unicode" + "github.com/ethantkoenig/rupture" ) -const repoIndexerAnalyzer = "repoIndexerAnalyzer" +const ( + repoIndexerAnalyzer = "repoIndexerAnalyzer" + repoIndexerDocType = "repoIndexerDocType" + + repoIndexerLatestVersion = 1 +) // repoIndexer (thread-safe) index for repository contents var repoIndexer bleve.Index @@ -40,6 +46,11 @@ type RepoIndexerData struct { Content string } +// Type returns the document type, for bleve's mapping.Classifier interface. +func (d *RepoIndexerData) Type() string { + return repoIndexerDocType +} + // RepoIndexerUpdate an update to the repo indexer type RepoIndexerUpdate struct { Filepath string @@ -47,13 +58,14 @@ type RepoIndexerUpdate struct { Data *RepoIndexerData } -func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error { +// AddToFlushingBatch adds the update to the given flushing batch. +func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error { id := filenameIndexerID(update.Data.RepoID, update.Filepath) switch update.Op { case RepoIndexerOpUpdate: return batch.Index(id, update.Data) case RepoIndexerOpDelete: - batch.Delete(id) + return batch.Delete(id) default: log.Error(4, "Unrecognized repo indexer op: %d", update.Op) } @@ -62,48 +74,50 @@ func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error { // InitRepoIndexer initialize repo indexer func InitRepoIndexer(populateIndexer func() error) { - _, err := os.Stat(setting.Indexer.RepoPath) + var err error + repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion) if err != nil { - if os.IsNotExist(err) { - if err = createRepoIndexer(); err != nil { - log.Fatal(4, "CreateRepoIndexer: %v", err) - } - if err = populateIndexer(); err != nil { - log.Fatal(4, "PopulateRepoIndex: %v", err) - } - } else { - log.Fatal(4, "InitRepoIndexer: %v", err) - } - } else { - repoIndexer, err = bleve.Open(setting.Indexer.RepoPath) - if err != nil { - log.Fatal(4, "InitRepoIndexer, open index: %v", err) - } + log.Fatal(4, "InitRepoIndexer: %v", err) + } + if repoIndexer != nil { + return + } + + if err = createRepoIndexer(); err != nil { + log.Fatal(4, "CreateRepoIndexer: %v", err) + } + if err = populateIndexer(); err != nil { + log.Fatal(4, "PopulateRepoIndex: %v", err) } } // createRepoIndexer create a repo indexer if one does not already exist func createRepoIndexer() error { + var err error docMapping := bleve.NewDocumentMapping() - docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping()) + numericFieldMapping := bleve.NewNumericFieldMapping() + numericFieldMapping.IncludeInAll = false + docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping) textFieldMapping := bleve.NewTextFieldMapping() + textFieldMapping.IncludeInAll = false docMapping.AddFieldMappingsAt("Content", textFieldMapping) mapping := bleve.NewIndexMapping() - if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { + if err = addUnicodeNormalizeTokenFilter(mapping); err != nil { return err - } else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ + } else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, - "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, + "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name, unique.Name}, }); err != nil { return err } mapping.DefaultAnalyzer = repoIndexerAnalyzer - mapping.AddDocumentMapping("repo", docMapping) - var err error + mapping.AddDocumentMapping(repoIndexerDocType, docMapping) + mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) + repoIndexer, err = bleve.New(setting.Indexer.RepoPath, mapping) return err } @@ -121,11 +135,8 @@ func filenameOfIndexerID(indexerID string) string { } // RepoIndexerBatch batch to add updates to -func RepoIndexerBatch() *Batch { - return &Batch{ - batch: repoIndexer.NewBatch(), - index: repoIndexer, - } +func RepoIndexerBatch() rupture.FlushingBatch { + return rupture.NewFlushingBatch(repoIndexer, maxBatchSize) } // DeleteRepoFromIndexer delete all of a repo's files from indexer @@ -138,8 +149,7 @@ func DeleteRepoFromIndexer(repoID int64) error { } batch := RepoIndexerBatch() for _, hit := range result.Hits { - batch.batch.Delete(hit.ID) - if err = batch.flushIfFull(); err != nil { + if err = batch.Delete(hit.ID); err != nil { return err } } diff --git a/vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go b/vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go new file mode 100644 index 000000000..f0d96c504 --- /dev/null +++ b/vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go @@ -0,0 +1,53 @@ +// Copyright (c) 2018 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package unique + +import ( + "github.com/blevesearch/bleve/analysis" + "github.com/blevesearch/bleve/registry" +) + +const Name = "unique" + +// UniqueTermFilter retains only the tokens which mark the first occurence of +// a term. Tokens whose term appears in a preceding token are dropped. +type UniqueTermFilter struct{} + +func NewUniqueTermFilter() *UniqueTermFilter { + return &UniqueTermFilter{} +} + +func (f *UniqueTermFilter) Filter(input analysis.TokenStream) analysis.TokenStream { + encounteredTerms := make(map[string]struct{}, len(input)/4) + j := 0 + for _, token := range input { + term := string(token.Term) + if _, ok := encounteredTerms[term]; ok { + continue + } + encounteredTerms[term] = struct{}{} + input[j] = token + j++ + } + return input[:j] +} + +func UniqueTermFilterConstructor(config map[string]interface{}, cache *registry.Cache) (analysis.TokenFilter, error) { + return NewUniqueTermFilter(), nil +} + +func init() { + registry.RegisterTokenFilter(Name, UniqueTermFilterConstructor) +} diff --git a/vendor/github.com/ethantkoenig/rupture/Gopkg.lock b/vendor/github.com/ethantkoenig/rupture/Gopkg.lock new file mode 100644 index 000000000..86e495e78 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/Gopkg.lock @@ -0,0 +1,173 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + name = "github.com/RoaringBitmap/roaring" + packages = ["."] + revision = "84551f0e309d6f9bafa428ef39b31ab7f16ff7b8" + version = "v0.4.1" + +[[projects]] + branch = "master" + name = "github.com/Smerity/govarint" + packages = ["."] + revision = "7265e41f48f15fd61751e16da866af3c704bb3ab" + +[[projects]] + name = "github.com/blevesearch/bleve" + packages = [ + ".", + "analysis", + "analysis/analyzer/standard", + "analysis/datetime/flexible", + "analysis/datetime/optional", + "analysis/lang/en", + "analysis/token/lowercase", + "analysis/token/porter", + "analysis/token/stop", + "analysis/tokenizer/unicode", + "document", + "geo", + "index", + "index/scorch", + "index/scorch/mergeplan", + "index/scorch/segment", + "index/scorch/segment/mem", + "index/scorch/segment/zap", + "index/store", + "index/store/boltdb", + "index/store/gtreap", + "index/upsidedown", + "mapping", + "numeric", + "registry", + "search", + "search/collector", + "search/facet", + "search/highlight", + "search/highlight/format/html", + "search/highlight/fragmenter/simple", + "search/highlight/highlighter/html", + "search/highlight/highlighter/simple", + "search/query", + "search/scorer", + "search/searcher" + ] + revision = "a3b125508b4443344b596888ca58467b6c9310b9" + +[[projects]] + branch = "master" + name = "github.com/blevesearch/go-porterstemmer" + packages = ["."] + revision = "23a2c8e5cf1f380f27722c6d2ae8896431dc7d0e" + +[[projects]] + branch = "master" + name = "github.com/blevesearch/segment" + packages = ["."] + revision = "762005e7a34fd909a84586299f1dd457371d36ee" + +[[projects]] + branch = "master" + name = "github.com/boltdb/bolt" + packages = ["."] + revision = "9da31745363232bc1e27dbab3569e77383a51585" + +[[projects]] + branch = "master" + name = "github.com/couchbase/vellum" + packages = [ + ".", + "regexp", + "utf8" + ] + revision = "ed84a675e24ed0a0bf6859b1ddec7e7c858354bd" + +[[projects]] + name = "github.com/davecgh/go-spew" + packages = ["spew"] + revision = "346938d642f2ec3594ed81d874461961cd0faa76" + version = "v1.1.0" + +[[projects]] + branch = "master" + name = "github.com/edsrzf/mmap-go" + packages = ["."] + revision = "0bce6a6887123b67a60366d2c9fe2dfb74289d2e" + +[[projects]] + branch = "master" + name = "github.com/glycerine/go-unsnap-stream" + packages = ["."] + revision = "62a9a9eb44fd8932157b1a8ace2149eff5971af6" + +[[projects]] + name = "github.com/golang/protobuf" + packages = ["proto"] + revision = "925541529c1fa6821df4e44ce2723319eb2be768" + version = "v1.0.0" + +[[projects]] + branch = "master" + name = "github.com/golang/snappy" + packages = ["."] + revision = "553a641470496b2327abcac10b36396bd98e45c9" + +[[projects]] + branch = "master" + name = "github.com/mschoch/smat" + packages = ["."] + revision = "90eadee771aeab36e8bf796039b8c261bebebe4f" + +[[projects]] + name = "github.com/philhofer/fwd" + packages = ["."] + revision = "bb6d471dc95d4fe11e432687f8b70ff496cf3136" + version = "v1.0.0" + +[[projects]] + name = "github.com/pmezard/go-difflib" + packages = ["difflib"] + revision = "792786c7400a136282c1664665ae0a8db921c6c2" + version = "v1.0.0" + +[[projects]] + branch = "master" + name = "github.com/steveyen/gtreap" + packages = ["."] + revision = "0abe01ef9be25c4aedc174758ec2d917314d6d70" + +[[projects]] + name = "github.com/stretchr/testify" + packages = ["assert"] + revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71" + version = "v1.2.1" + +[[projects]] + branch = "master" + name = "github.com/tinylib/msgp" + packages = ["msgp"] + revision = "03a79185462ad029a6e7e05b2f3f3e0498d0a6c0" + +[[projects]] + branch = "master" + name = "github.com/willf/bitset" + packages = ["."] + revision = "1a37ad96e8c1a11b20900a232874843b5174221f" + +[[projects]] + name = "golang.org/x/net" + packages = ["context"] + revision = "309822c5b9b9f80db67f016069a12628d94fad34" + +[[projects]] + name = "golang.org/x/sys" + packages = ["unix"] + revision = "3dbebcf8efb6a5011a60c2b4591c1022a759af8a" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "61c759f0c1136cadf86ae8a30bb78edf33fc844cdcb2316469b4ae14a8d051b0" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/vendor/github.com/ethantkoenig/rupture/Gopkg.toml b/vendor/github.com/ethantkoenig/rupture/Gopkg.toml new file mode 100644 index 000000000..55dbd3b23 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/Gopkg.toml @@ -0,0 +1,34 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + + +[[constraint]] + name = "github.com/stretchr/testify" + version = "1.2.1" + +[prune] + go-tests = true + unused-packages = true diff --git a/vendor/github.com/ethantkoenig/rupture/LICENSE b/vendor/github.com/ethantkoenig/rupture/LICENSE new file mode 100644 index 000000000..30adfac94 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Ethan Koenig + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/ethantkoenig/rupture/README.md b/vendor/github.com/ethantkoenig/rupture/README.md new file mode 100644 index 000000000..da76681e3 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/README.md @@ -0,0 +1,13 @@ +# rupture + +[![Build Status](https://travis-ci.org/ethantkoenig/rupture.svg?branch=master)](https://travis-ci.org/ethantkoenig/rupture) [![GoDoc](https://godoc.org/github.com/ethantkoenig/rupture?status.svg)](https://godoc.org/github.com/ethantkoenig/rupture) [![Go Report Card](https://goreportcard.com/badge/blevesearch/bleve)](https://goreportcard.com/report/blevesearch/bleve) + +An explosive companion to the [bleve indexing library](https://www.github.com/blevesearch/bleve) + +## Features + +`rupture` includes the following additions to `bleve`: + +- __Flushing batches__: Batches of operation which automatically flush to the underlying bleve index. +- __Sharded indices__: An index-like abstraction built on top of several underlying indices. Sharded indices provide lower write latencies for indices with large amounts of data. +- __Index metadata__: Track index version for easily managing migrations and schema changes. diff --git a/vendor/github.com/ethantkoenig/rupture/flushing_batch.go b/vendor/github.com/ethantkoenig/rupture/flushing_batch.go new file mode 100644 index 000000000..b4948f674 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/flushing_batch.go @@ -0,0 +1,67 @@ +package rupture + +import ( + "github.com/blevesearch/bleve" +) + +// FlushingBatch is a batch of operations that automatically flushes to the +// underlying index once it reaches a certain size. +type FlushingBatch interface { + // Index adds the specified index operation batch, possibly triggering a + // flush. + Index(id string, data interface{}) error + // Remove adds the specified delete operation to the batch, possibly + // triggering a flush. + Delete(id string) error + // Flush flushes the batch's contents. + Flush() error +} + +type singleIndexFlushingBatch struct { + maxBatchSize int + batch *bleve.Batch + index bleve.Index +} + +func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch { + return &singleIndexFlushingBatch{ + maxBatchSize: maxBatchSize, + batch: index.NewBatch(), + index: index, + } +} + +// NewFlushingBatch creates a new flushing batch for the specified index. Once +// the number of operations in the batch reaches the specified limit, the batch +// automatically flushes its operations to the index. +func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch { + return newFlushingBatch(index, maxBatchSize) +} + +func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error { + if err := b.batch.Index(id, data); err != nil { + return err + } + return b.flushIfFull() +} + +func (b *singleIndexFlushingBatch) Delete(id string) error { + b.batch.Delete(id) + return b.flushIfFull() +} + +func (b *singleIndexFlushingBatch) flushIfFull() error { + if b.batch.Size() < b.maxBatchSize { + return nil + } + return b.Flush() +} + +func (b *singleIndexFlushingBatch) Flush() error { + err := b.index.Batch(b.batch) + if err != nil { + return err + } + b.batch.Reset() + return nil +} diff --git a/vendor/github.com/ethantkoenig/rupture/metadata.go b/vendor/github.com/ethantkoenig/rupture/metadata.go new file mode 100644 index 000000000..f26b53d96 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/metadata.go @@ -0,0 +1,68 @@ +package rupture + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" +) + +const metaFilename = "rupture_meta.json" + +func indexMetadataPath(dir string) string { + return filepath.Join(dir, metaFilename) +} + +// IndexMetadata contains metadata about a bleve index. +type IndexMetadata struct { + // The version of the data in the index. This can be useful for tracking + // schema changes or data migrations. + Version int `json:"version"` +} + +// in addition to the user-exposed metadata, we keep additional, internal-only +// metadata for sharded indices. +const shardedMetadataFilename = "rupture_sharded_meta.json" + +func shardedIndexMetadataPath(dir string) string { + return filepath.Join(dir, shardedMetadataFilename) +} + +type shardedIndexMetadata struct { + NumShards int `json:"num_shards"` +} + +func readJSON(path string, meta interface{}) error { + metaBytes, err := ioutil.ReadFile(path) + if err != nil { + return err + } + return json.Unmarshal(metaBytes, meta) +} + +func writeJSON(path string, meta interface{}) error { + metaBytes, err := json.Marshal(meta) + if err != nil { + return err + } + return ioutil.WriteFile(path, metaBytes, 0666) +} + +// ReadIndexMetadata returns the metadata for the index at the specified path. +// If no such index metadata exists, an empty metadata and a nil error are +// returned. +func ReadIndexMetadata(path string) (*IndexMetadata, error) { + meta := &IndexMetadata{} + metaPath := indexMetadataPath(path) + if _, err := os.Stat(metaPath); os.IsNotExist(err) { + return meta, nil + } else if err != nil { + return nil, err + } + return meta, readJSON(metaPath, meta) +} + +// WriteIndexMetadata writes metadata for the index at the specified path. +func WriteIndexMetadata(path string, meta *IndexMetadata) error { + return writeJSON(indexMetadataPath(path), meta) +} diff --git a/vendor/github.com/ethantkoenig/rupture/sharded_index.go b/vendor/github.com/ethantkoenig/rupture/sharded_index.go new file mode 100644 index 000000000..8e4cb9338 --- /dev/null +++ b/vendor/github.com/ethantkoenig/rupture/sharded_index.go @@ -0,0 +1,146 @@ +package rupture + +import ( + "fmt" + "hash/fnv" + "path/filepath" + "strconv" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/document" + "github.com/blevesearch/bleve/mapping" +) + +// ShardedIndex an index that is built onto of multiple underlying bleve +// indices (i.e. shards). Similar to bleve's index aliases, some methods may +// not be supported. +type ShardedIndex interface { + bleve.Index + shards() []bleve.Index +} + +// a type alias for bleve.Index, so that the anonymous field of +// shardedIndex does not conflict with the Index(..) method. +type bleveIndex bleve.Index + +type shardedIndex struct { + bleveIndex + indices []bleve.Index +} + +func hash(id string, n int) uint64 { + fnvHash := fnv.New64() + fnvHash.Write([]byte(id)) + return fnvHash.Sum64() % uint64(n) +} + +func childIndexerPath(rootPath string, i int) string { + return filepath.Join(rootPath, strconv.Itoa(i)) +} + +// NewShardedIndex creates a sharded index at the specified path, with the +// specified mapping and number of shards. +func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) { + if numShards <= 0 { + return nil, fmt.Errorf("Invalid number of shards: %d", numShards) + } + err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards}) + if err != nil { + return nil, err + } + + s := &shardedIndex{ + indices: make([]bleve.Index, numShards), + } + for i := 0; i < numShards; i++ { + s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping) + if err != nil { + return nil, err + } + } + s.bleveIndex = bleve.NewIndexAlias(s.indices...) + return s, nil +} + +// OpenShardedIndex opens a sharded index at the specified path. +func OpenShardedIndex(path string) (ShardedIndex, error) { + var meta shardedIndexMetadata + var err error + if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil { + return nil, err + } + + s := &shardedIndex{ + indices: make([]bleve.Index, meta.NumShards), + } + for i := 0; i < meta.NumShards; i++ { + s.indices[i], err = bleve.Open(childIndexerPath(path, i)) + if err != nil { + return nil, err + } + } + s.bleveIndex = bleve.NewIndexAlias(s.indices...) + return s, nil +} + +func (s *shardedIndex) Index(id string, data interface{}) error { + return s.indices[hash(id, len(s.indices))].Index(id, data) +} + +func (s *shardedIndex) Delete(id string) error { + return s.indices[hash(id, len(s.indices))].Delete(id) +} + +func (s *shardedIndex) Document(id string) (*document.Document, error) { + return s.indices[hash(id, len(s.indices))].Document(id) +} + +func (s *shardedIndex) Close() error { + if err := s.bleveIndex.Close(); err != nil { + return err + } + for _, index := range s.indices { + if err := index.Close(); err != nil { + return err + } + } + return nil +} + +func (s *shardedIndex) shards() []bleve.Index { + return s.indices +} + +type shardedIndexFlushingBatch struct { + batches []*singleIndexFlushingBatch +} + +// NewShardedFlushingBatch creates a flushing batch with the specified batch +// size for the specified sharded index. +func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch { + indices := index.shards() + b := &shardedIndexFlushingBatch{ + batches: make([]*singleIndexFlushingBatch, len(indices)), + } + for i, index := range indices { + b.batches[i] = newFlushingBatch(index, maxBatchSize) + } + return b +} + +func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error { + return b.batches[hash(id, len(b.batches))].Index(id, data) +} + +func (b *shardedIndexFlushingBatch) Delete(id string) error { + return b.batches[hash(id, len(b.batches))].Delete(id) +} + +func (b *shardedIndexFlushingBatch) Flush() error { + for _, batch := range b.batches { + if err := batch.Flush(); err != nil { + return err + } + } + return nil +} diff --git a/vendor/vendor.json b/vendor/vendor.json index a108bbbab..520cf8e22 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -128,6 +128,12 @@ "revision": "174f8ed44a0bf65e7c8fb228b60b58de62654cd2", "revisionTime": "2017-06-28T17:18:15Z" }, + { + "checksumSHA1": "unacAFTLwgpg7wyI/mYf7Zd9eaU=", + "path": "github.com/blevesearch/bleve/analysis/token/unique", + "revision": "ff210fbc6d348ad67aa5754eaea11a463fcddafd", + "revisionTime": "2018-02-01T18:20:06Z" + }, { "checksumSHA1": "q7C04nlJLxKmemXLop0oyJhfi5M=", "path": "github.com/blevesearch/bleve/analysis/tokenizer/unicode", @@ -347,6 +353,12 @@ "revision": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2", "revisionTime": "2015-12-24T04:54:52Z" }, + { + "checksumSHA1": "06ofBxeJ9c4LS2p31PCMIj7IjJU=", + "path": "github.com/ethantkoenig/rupture", + "revision": "0a76f03a811abcca2e6357329b673e9bb8ef9643", + "revisionTime": "2018-02-03T18:25:44Z" + }, { "checksumSHA1": "imR2wF388/0fBU6RRWx8RvTi8Q8=", "path": "github.com/facebookgo/clock",