Refactor issue indexer (#5363)

release/v1.8
Lunny Xiao 5 years ago committed by techknowlogick
parent 094263db4d
commit 830ae61456

9
Gopkg.lock generated

@ -616,6 +616,14 @@
pruneopts = "NUT"
revision = "e3534c89ef969912856dfa39e56b09e58c5f5daf"
[[projects]]
branch = "master"
digest = "1:3ea59a5ada4bbac04da58e6177ca63da8c377a3143b48fca584408bf415fdafb"
name = "github.com/lunny/levelqueue"
packages = ["."]
pruneopts = "NUT"
revision = "02b525a4418e684a7786215296984e364746806f"
[[projects]]
digest = "1:1e6a29ed1f189354030e3371f63ec58aacbc2bf232fd104c6e0d41174ac5af48"
name = "github.com/lunny/log"
@ -1270,6 +1278,7 @@
"github.com/lafriks/xormstore",
"github.com/lib/pq",
"github.com/lunny/dingtalk_webhook",
"github.com/lunny/levelqueue",
"github.com/markbates/goth",
"github.com/markbates/goth/gothic",
"github.com/markbates/goth/providers/bitbucket",

@ -183,12 +183,21 @@ func (issue *Issue) LoadPullRequest() error {
}
func (issue *Issue) loadComments(e Engine) (err error) {
return issue.loadCommentsByType(e, CommentTypeUnknown)
}
// LoadDiscussComments loads discuss comments
func (issue *Issue) LoadDiscussComments() error {
return issue.loadCommentsByType(x, CommentTypeComment)
}
func (issue *Issue) loadCommentsByType(e Engine, tp CommentType) (err error) {
if issue.Comments != nil {
return nil
}
issue.Comments, err = findComments(e, FindCommentsOptions{
IssueID: issue.ID,
Type: CommentTypeUnknown,
Type: tp,
})
return err
}
@ -681,7 +690,6 @@ func updateIssueCols(e Engine, issue *Issue, cols ...string) error {
if _, err := e.ID(issue.ID).Cols(cols...).Update(issue); err != nil {
return err
}
UpdateIssueIndexerCols(issue.ID, cols...)
return nil
}
@ -1217,6 +1225,12 @@ func getIssuesByIDs(e Engine, issueIDs []int64) ([]*Issue, error) {
return issues, e.In("id", issueIDs).Find(&issues)
}
func getIssueIDsByRepoID(e Engine, repoID int64) ([]int64, error) {
var ids = make([]int64, 0, 10)
err := e.Table("issue").Where("repo_id = ?", repoID).Find(&ids)
return ids, err
}
// GetIssuesByIDs return issues with the given IDs.
func GetIssuesByIDs(issueIDs []int64) ([]*Issue, error) {
return getIssuesByIDs(x, issueIDs)

@ -1035,6 +1035,7 @@ func UpdateComment(doer *User, c *Comment, oldContent string) error {
if err := c.LoadIssue(); err != nil {
return err
}
if err := c.Issue.LoadAttributes(); err != nil {
return err
}
@ -1093,6 +1094,7 @@ func DeleteComment(doer *User, comment *Comment) error {
if err := comment.LoadIssue(); err != nil {
return err
}
if err := comment.Issue.LoadAttributes(); err != nil {
return err
}

@ -7,25 +7,60 @@ package models
import (
"fmt"
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/indexer/issues"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)
// issueIndexerUpdateQueue queue of issue ids to be updated
var issueIndexerUpdateQueue chan int64
var (
// issueIndexerUpdateQueue queue of issue ids to be updated
issueIndexerUpdateQueue issues.Queue
issueIndexer issues.Indexer
)
// InitIssueIndexer initialize issue indexer
func InitIssueIndexer() {
indexer.InitIssueIndexer(populateIssueIndexer)
issueIndexerUpdateQueue = make(chan int64, setting.Indexer.UpdateQueueLength)
go processIssueIndexerUpdateQueue()
func InitIssueIndexer() error {
var populate bool
switch setting.Indexer.IssueType {
case "bleve":
issueIndexer = issues.NewBleveIndexer(setting.Indexer.IssuePath)
exist, err := issueIndexer.Init()
if err != nil {
return err
}
populate = !exist
default:
return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
}
var err error
switch setting.Indexer.IssueIndexerQueueType {
case setting.LevelQueueType:
issueIndexerUpdateQueue, err = issues.NewLevelQueue(
issueIndexer,
setting.Indexer.IssueIndexerQueueDir,
setting.Indexer.IssueIndexerQueueBatchNumber)
if err != nil {
return err
}
case setting.ChannelQueueType:
issueIndexerUpdateQueue = issues.NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
default:
return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
}
go issueIndexerUpdateQueue.Run()
if populate {
go populateIssueIndexer()
}
return nil
}
// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer() error {
batch := indexer.IssueIndexerBatch()
func populateIssueIndexer() {
for page := 1; ; page++ {
repos, _, err := SearchRepositoryByName(&SearchRepoOptions{
Page: page,
@ -35,98 +70,79 @@ func populateIssueIndexer() error {
Collaborate: util.OptionalBoolFalse,
})
if err != nil {
return fmt.Errorf("Repositories: %v", err)
log.Error(4, "SearchRepositoryByName: %v", err)
continue
}
if len(repos) == 0 {
return batch.Flush()
return
}
for _, repo := range repos {
issues, err := Issues(&IssuesOptions{
is, err := Issues(&IssuesOptions{
RepoIDs: []int64{repo.ID},
IsClosed: util.OptionalBoolNone,
IsPull: util.OptionalBoolNone,
})
if err != nil {
return err
log.Error(4, "Issues: %v", err)
continue
}
if err = IssueList(issues).LoadComments(); err != nil {
return err
if err = IssueList(is).LoadDiscussComments(); err != nil {
log.Error(4, "LoadComments: %v", err)
continue
}
for _, issue := range issues {
if err := issue.update().AddToFlushingBatch(batch); err != nil {
return err
}
for _, issue := range is {
UpdateIssueIndexer(issue)
}
}
}
}
func processIssueIndexerUpdateQueue() {
batch := indexer.IssueIndexerBatch()
for {
var issueID int64
select {
case issueID = <-issueIndexerUpdateQueue:
default:
// flush whatever updates we currently have, since we
// might have to wait a while
if err := batch.Flush(); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
issueID = <-issueIndexerUpdateQueue
}
issue, err := GetIssueByID(issueID)
if err != nil {
log.Error(4, "GetIssueByID: %v", err)
} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
}
}
func (issue *Issue) update() indexer.IssueIndexerUpdate {
comments := make([]string, 0, 5)
// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issue *Issue) {
var comments []string
for _, comment := range issue.Comments {
if comment.Type == CommentTypeComment {
comments = append(comments, comment.Content)
}
}
return indexer.IssueIndexerUpdate{
IssueID: issue.ID,
Data: &indexer.IssueIndexerData{
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
},
}
issueIndexerUpdateQueue.Push(&issues.IndexerData{
ID: issue.ID,
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: comments,
})
}
// updateNeededCols whether a change to the specified columns requires updating
// the issue indexer
func updateNeededCols(cols []string) bool {
for _, col := range cols {
switch col {
case "name", "content":
return true
}
// DeleteRepoIssueIndexer deletes repo's all issues indexes
func DeleteRepoIssueIndexer(repo *Repository) {
var ids []int64
ids, err := getIssueIDsByRepoID(x, repo.ID)
if err != nil {
log.Error(4, "getIssueIDsByRepoID failed: %v", err)
return
}
if len(ids) <= 0 {
return
}
return false
}
// UpdateIssueIndexerCols update an issue in the issue indexer, given changes
// to the specified columns
func UpdateIssueIndexerCols(issueID int64, cols ...string) {
updateNeededCols(cols)
issueIndexerUpdateQueue.Push(&issues.IndexerData{
IDs: ids,
IsDelete: true,
})
}
// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(issueID int64) {
select {
case issueIndexerUpdateQueue <- issueID:
default:
go func() {
issueIndexerUpdateQueue <- issueID
}()
// SearchIssuesByKeyword search issue ids by keywords and repo id
func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
var issueIDs []int64
res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
if err != nil {
return nil, err
}
for _, r := range res.Hits {
issueIDs = append(issueIDs, r.ID)
}
return issueIDs, nil
}

@ -4,7 +4,11 @@
package models
import "fmt"
import (
"fmt"
"github.com/go-xorm/builder"
)
// IssueList defines a list of issues
type IssueList []*Issue
@ -338,7 +342,7 @@ func (issues IssueList) loadAttachments(e Engine) (err error) {
return nil
}
func (issues IssueList) loadComments(e Engine) (err error) {
func (issues IssueList) loadComments(e Engine, cond builder.Cond) (err error) {
if len(issues) == 0 {
return nil
}
@ -354,6 +358,7 @@ func (issues IssueList) loadComments(e Engine) (err error) {
rows, err := e.Table("comment").
Join("INNER", "issue", "issue.id = comment.issue_id").
In("issue.id", issuesIDs[:limit]).
Where(cond).
Rows(new(Comment))
if err != nil {
return err
@ -479,5 +484,10 @@ func (issues IssueList) LoadAttachments() error {
// LoadComments loads comments
func (issues IssueList) LoadComments() error {
return issues.loadComments(x)
return issues.loadComments(x, builder.NewCond())
}
// LoadDiscussComments loads discuss comments
func (issues IssueList) LoadDiscussComments() error {
return issues.loadComments(x, builder.Eq{"comment.type": CommentTypeComment})
}

@ -12,7 +12,6 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"strings"
"code.gitea.io/gitea/modules/log"
@ -158,19 +157,6 @@ func LoadConfigs() {
DbCfg.SSLMode = sec.Key("SSL_MODE").MustString("disable")
DbCfg.Path = sec.Key("PATH").MustString("data/gitea.db")
DbCfg.Timeout = sec.Key("SQLITE_TIMEOUT").MustInt(500)
sec = setting.Cfg.Section("indexer")
setting.Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/issues.bleve"))
if !filepath.IsAbs(setting.Indexer.IssuePath) {
setting.Indexer.IssuePath = path.Join(setting.AppWorkPath, setting.Indexer.IssuePath)
}
setting.Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
setting.Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(setting.AppDataPath, "indexers/repos.bleve"))
if !filepath.IsAbs(setting.Indexer.RepoPath) {
setting.Indexer.RepoPath = path.Join(setting.AppWorkPath, setting.Indexer.RepoPath)
}
setting.Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
setting.Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
}
// parsePostgreSQLHostPort parses given input in various forms defined in

@ -44,6 +44,10 @@ func MainTest(m *testing.M, pathToGiteaRoot string) {
fatalTestError("Error creating test engine: %v\n", err)
}
if err = InitIssueIndexer(); err != nil {
fatalTestError("Error InitIssueIndexer: %v\n", err)
}
setting.AppURL = "https://try.gitea.io/"
setting.RunUser = "runuser"
setting.SSH.Port = 3000

@ -0,0 +1,250 @@
// Copyright 2018 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
import (
"fmt"
"os"
"strconv"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture"
)
const (
issueIndexerAnalyzer = "issueIndexer"
issueIndexerDocType = "issueIndexerDocType"
issueIndexerLatestVersion = 1
)
// indexerID a bleve-compatible unique identifier for an integer id
func indexerID(id int64) string {
return strconv.FormatInt(id, 36)
}
// idOfIndexerID the integer id associated with an indexer id
func idOfIndexerID(indexerID string) (int64, error) {
id, err := strconv.ParseInt(indexerID, 36, 64)
if err != nil {
return 0, fmt.Errorf("Unexpected indexer ID %s: %v", indexerID, err)
}
return id, nil
}
// numericEqualityQuery a numeric equality query for the given value and field
func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery {
f := float64(value)
tru := true
q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru)
q.SetField(field)
return q
}
func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhraseQuery {
q := bleve.NewMatchPhraseQuery(matchPhrase)
q.FieldVal = field
q.Analyzer = analyzer
return q
}
const unicodeNormalizeName = "unicodeNormalize"
func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
"type": unicodenorm.Name,
"form": unicodenorm.NFC,
})
}
const maxBatchSize = 16
// openIndexer open the index at the specified path, checking for metadata
// updates and bleve version updates. If index needs to be created (or
// re-created), returns (nil, nil)
func openIndexer(path string, latestVersion int) (bleve.Index, error) {
_, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
metadata, err := rupture.ReadIndexMetadata(path)
if err != nil {
return nil, err
}
if metadata.Version < latestVersion {
// the indexer is using a previous version, so we should delete it and
// re-populate
return nil, os.RemoveAll(path)
}
index, err := bleve.Open(path)
if err != nil && err == upsidedown.IncompatibleVersion {
// the indexer was built with a previous version of bleve, so we should
// delete it and re-populate
return nil, os.RemoveAll(path)
} else if err != nil {
return nil, err
}
return index, nil
}
// BleveIndexerData an update to the issue indexer
type BleveIndexerData IndexerData
// Type returns the document type, for bleve's mapping.Classifier interface.
func (i *BleveIndexerData) Type() string {
return issueIndexerDocType
}
// createIssueIndexer create an issue indexer if one does not already exist
func createIssueIndexer(path string, latestVersion int) (bleve.Index, error) {
mapping := bleve.NewIndexMapping()
docMapping := bleve.NewDocumentMapping()
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.Store = false
textFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("Title", textFieldMapping)
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
return nil, err
} else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
"type": custom.Name,
"char_filters": []string{},
"tokenizer": unicode.Name,
"token_filters": []string{unicodeNormalizeName, lowercase.Name},
}); err != nil {
return nil, err
}
mapping.DefaultAnalyzer = issueIndexerAnalyzer
mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
index, err := bleve.New(path, mapping)
if err != nil {
return nil, err
}
if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{
Version: latestVersion,
}); err != nil {
return nil, err
}
return index, nil
}
var (
_ Indexer = &BleveIndexer{}
)
// BleveIndexer implements Indexer interface
type BleveIndexer struct {
indexDir string
indexer bleve.Index
}
// NewBleveIndexer creates a new bleve local indexer
func NewBleveIndexer(indexDir string) *BleveIndexer {
return &BleveIndexer{
indexDir: indexDir,
}
}
// Init will initial the indexer
func (b *BleveIndexer) Init() (bool, error) {
var err error
b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion)
if err != nil {
return false, err
}
if b.indexer != nil {
return true, nil
}
b.indexer, err = createIssueIndexer(b.indexDir, issueIndexerLatestVersion)
return false, err
}
// Index will save the index data
func (b *BleveIndexer) Index(issues []*IndexerData) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, issue := range issues {
if err := batch.Index(indexerID(issue.ID), struct {
RepoID int64
Title string
Content string
Comments []string
}{
RepoID: issue.RepoID,
Title: issue.Title,
Content: issue.Content,
Comments: issue.Comments,
}); err != nil {
return err
}
}
return batch.Flush()
}
// Delete deletes indexes by ids
func (b *BleveIndexer) Delete(ids ...int64) error {
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, id := range ids {
if err := batch.Delete(indexerID(id)); err != nil {
return err
}
}
return batch.Flush()
}
// Search searches for issues by given conditions.
// Returns the matching issue IDs
func (b *BleveIndexer) Search(keyword string, repoID int64, limit, start int) (*SearchResult, error) {
indexerQuery := bleve.NewConjunctionQuery(
numericEqualityQuery(repoID, "RepoID"),
bleve.NewDisjunctionQuery(
newMatchPhraseQuery(keyword, "Title", issueIndexerAnalyzer),
newMatchPhraseQuery(keyword, "Content", issueIndexerAnalyzer),
newMatchPhraseQuery(keyword, "Comments", issueIndexerAnalyzer),
))
search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
result, err := b.indexer.Search(search)
if err != nil {
return nil, err
}
var ret = SearchResult{
Hits: make([]Match, 0, len(result.Hits)),
}
for _, hit := range result.Hits {
id, err := idOfIndexerID(hit.ID)
if err != nil {
return nil, err
}
ret.Hits = append(ret.Hits, Match{
ID: id,
RepoID: repoID,
})
}
return &ret, nil
}

@ -0,0 +1,88 @@
// Copyright 2018 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestIndexAndSearch(t *testing.T) {
dir := "./bleve.index"
indexer := NewBleveIndexer(dir)
defer os.RemoveAll(dir)
_, err := indexer.Init()
assert.NoError(t, err)
err = indexer.Index([]*IndexerData{
{
ID: 1,
RepoID: 2,
Title: "Issue search should support Chinese",
Content: "As title",
Comments: []string{
"test1",
"test2",
},
},
{
ID: 2,
RepoID: 2,
Title: "CJK support could be optional",
Content: "Chinese Korean and Japanese should be supported but I would like it's not enabled by default",
Comments: []string{
"LGTM",
"Good idea",
},
},
})
assert.NoError(t, err)
var (
keywords = []struct {
Keyword string
IDs []int64
}{
{
Keyword: "search",
IDs: []int64{1},
},
{
Keyword: "test1",
IDs: []int64{1},
},
{
Keyword: "test2",
IDs: []int64{1},
},
{
Keyword: "support",
IDs: []int64{1, 2},
},
{
Keyword: "chinese",
IDs: []int64{1, 2},
},
{
Keyword: "help",
IDs: []int64{},
},
}
)
for _, kw := range keywords {
res, err := indexer.Search(kw.Keyword, 2, 10, 0)
assert.NoError(t, err)
var ids = make([]int64, 0, len(res.Hits))
for _, hit := range res.Hits {
ids = append(ids, hit.ID)
}
assert.EqualValues(t, kw.IDs, ids)
}
}

@ -0,0 +1,36 @@
// Copyright 2018 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
// IndexerData data stored in the issue indexer
type IndexerData struct {
ID int64
RepoID int64
Title string
Content string
Comments []string
IsDelete bool
IDs []int64
}
// Match represents on search result
type Match struct {
ID int64 `json:"id"`
RepoID int64 `json:"repo_id"`
Score float64 `json:"score"`
}
// SearchResult represents search results
type SearchResult struct {
Hits []Match
}
// Indexer defines an inteface to indexer issues contents
type Indexer interface {
Init() (bool, error)
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
}

@ -0,0 +1,11 @@
// Copyright 2018 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
// Queue defines an interface to save an issue indexer queue
type Queue interface {
Run() error
Push(*IndexerData)
}

@ -0,0 +1,56 @@
// Copyright 2018 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
import (
"time"
"code.gitea.io/gitea/modules/setting"
)
// ChannelQueue implements
type ChannelQueue struct {
queue chan *IndexerData
indexer Indexer
batchNumber int
}
// NewChannelQueue create a memory channel queue
func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
return &ChannelQueue{
queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
indexer: indexer,
batchNumber: batchNumber,
}
}
// Run starts to run the queue
func (c *ChannelQueue) Run() error {
var i int
var datas = make([]*IndexerData, 0, c.batchNumber)
for {
select {
case data := <-c.queue:
datas = append(datas, data)
if len(datas) >= c.batchNumber {
c.indexer.Index(datas)
// TODO: save the point
datas = make([]*IndexerData, 0, c.batchNumber)
}
case <-time.After(time.Millisecond * 100):
i++
if i >= 3 && len(datas) > 0 {
c.indexer.Index(datas)
// TODO: save the point
datas = make([]*IndexerData, 0, c.batchNumber)
}
}
}
}
// Push will push the indexer data to queue
func (c *ChannelQueue) Push(data *IndexerData) {
c.queue <- data
}

@ -0,0 +1,104 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
import (
"encoding/json"
"time"
"code.gitea.io/gitea/modules/log"
"github.com/lunny/levelqueue"
)
var (
_ Queue = &LevelQueue{}
)
// LevelQueue implements a disk library queue
type LevelQueue struct {
indexer Indexer
queue *levelqueue.Queue
batchNumber int
}
// NewLevelQueue creates a ledis local queue
func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
queue, err := levelqueue.Open(dataDir)
if err != nil {
return nil, err
}
return &LevelQueue{
indexer: indexer,
queue: queue,
batchNumber: batchNumber,
}, nil
}
// Run starts to run the queue
func (l *LevelQueue) Run() error {
var i int
var datas = make([]*IndexerData, 0, l.batchNumber)
for {
bs, err := l.queue.RPop()
if err != nil {
log.Error(4, "RPop: %v", err)
time.Sleep(time.Millisecond * 100)
continue
}
i++
if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
l.indexer.Index(datas)
datas = make([]*IndexerData, 0, l.batchNumber)
i = 0
}
if len(bs) <= 0 {
time.Sleep(time.Millisecond * 100)
continue
}
var data IndexerData
err = json.Unmarshal(bs, &data)
if err != nil {
log.Error(4, "Unmarshal: %v", err)
time.Sleep(time.Millisecond * 100)
continue
}
log.Trace("LedisLocalQueue: task found: %#v", data)
if data.IsDelete {
if data.ID > 0 {
if err = l.indexer.Delete(data.ID); err != nil {
log.Error(4, "indexer.Delete: %v", err)
}
} else if len(data.IDs) > 0 {
if err = l.indexer.Delete(data.IDs...); err != nil {
log.Error(4, "indexer.Delete: %v", err)
}
}
time.Sleep(time.Millisecond * 10)
continue
}
datas = append(datas, &data)
time.Sleep(time.Millisecond * 10)
}
}
// Push will push the indexer data to queue
func (l *LevelQueue) Push(data *IndexerData) {
bs, err := json.Marshal(data)
if err != nil {
log.Error(4, "Marshal: %v", err)
return
}
err = l.queue.LPush(bs)
if err != nil {
log.Error(4, "LPush: %v", err)
}
}

@ -6,6 +6,7 @@ package indexer
import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification/base"
)
@ -25,38 +26,83 @@ func NewNotifier() base.Notifier {
func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository,
issue *models.Issue, comment *models.Comment) {
if comment.Type == models.CommentTypeComment {
models.UpdateIssueIndexer(issue.ID)
if issue.Comments == nil {
if err := issue.LoadDiscussComments(); err != nil {
log.Error(4, "LoadComments failed: %v", err)
return
}
} else {
issue.Comments = append(issue.Comments, comment)
}
models.UpdateIssueIndexer(issue)
}
}
func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) {
models.UpdateIssueIndexer(issue.ID)
models.UpdateIssueIndexer(issue)
}
func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) {
models.UpdateIssueIndexer(pr.Issue.ID)
models.UpdateIssueIndexer(pr.Issue)
}
func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) {
if c.Type == models.CommentTypeComment {
models.UpdateIssueIndexer(c.IssueID)
var found bool
if c.Issue.Comments != nil {
for i := 0; i < len(c.Issue.Comments); i++ {
if c.Issue.Comments[i].ID == c.ID {
c.Issue.Comments[i] = c
found = true
break
}
}
}
if !found {
if err := c.Issue.LoadDiscussComments(); err != nil {
log.Error(4, "LoadComments failed: %v", err)
return
}
}
models.UpdateIssueIndexer(c.Issue)
}
}
func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models.Comment) {
if comment.Type == models.CommentTypeComment {
models.UpdateIssueIndexer(comment.IssueID)
var found bool
if comment.Issue.Comments != nil {
for i := 0; i < len(comment.Issue.Comments); i++ {
if comment.Issue.Comments[i].ID == comment.ID {
comment.Issue.Comments = append(comment.Issue.Comments[:i], comment.Issue.Comments[i+1:]...)
found = true
break
}
}
}
if !found {
if err := comment.Issue.LoadDiscussComments(); err != nil {
log.Error(4, "LoadComments failed: %v", err)
return
}
}
// reload comments to delete the old comment
models.UpdateIssueIndexer(comment.Issue)
}
}
func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) {
models.DeleteRepoFromIndexer(repo)
models.DeleteRepoIssueIndexer(repo)
}
func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) {
models.UpdateIssueIndexer(issue.ID)
models.UpdateIssueIndexer(issue)
}
func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) {
models.UpdateIssueIndexer(issue.ID)
models.UpdateIssueIndexer(issue)
}

@ -0,0 +1,55 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package setting
import (
"path"
"path/filepath"
)
// enumerates all the indexer queue types
const (
LevelQueueType = "levelqueue"
ChannelQueueType = "channel"
)
var (
// Indexer settings
Indexer = struct {
IssueType string
IssuePath string
RepoIndexerEnabled bool
RepoPath string
UpdateQueueLength int
MaxIndexerFileSize int64
IssueIndexerQueueType string
IssueIndexerQueueDir string
IssueIndexerQueueBatchNumber int
}{
IssueType: "bleve",
IssuePath: "indexers/issues.bleve",
IssueIndexerQueueType: LevelQueueType,
IssueIndexerQueueDir: "indexers/issues.queue",
IssueIndexerQueueBatchNumber: 20,
}
)
func newIndexerService() {
sec := Cfg.Section("indexer")
Indexer.IssuePath = sec.Key("ISSUE_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/issues.bleve"))
if !filepath.IsAbs(Indexer.IssuePath) {
Indexer.IssuePath = path.Join(AppWorkPath, Indexer.IssuePath)
}
Indexer.RepoIndexerEnabled = sec.Key("REPO_INDEXER_ENABLED").MustBool(false)
Indexer.RepoPath = sec.Key("REPO_INDEXER_PATH").MustString(path.Join(AppDataPath, "indexers/repos.bleve"))
if !filepath.IsAbs(Indexer.RepoPath) {
Indexer.RepoPath = path.Join(AppWorkPath, Indexer.RepoPath)
}
Indexer.UpdateQueueLength = sec.Key("UPDATE_BUFFER_LEN").MustInt(20)
Indexer.MaxIndexerFileSize = sec.Key("MAX_FILE_SIZE").MustInt64(1024 * 1024)
Indexer.IssueIndexerQueueType = sec.Key("ISSUE_INDEXER_QUEUE_TYPE").MustString(LevelQueueType)
Indexer.IssueIndexerQueueDir = sec.Key("ISSUE_INDEXER_QUEUE_DIR").MustString(path.Join(AppDataPath, "indexers/issues.queue"))
Indexer.IssueIndexerQueueBatchNumber = sec.Key("ISSUE_INDEXER_QUEUE_BATCH_NUMBER").MustInt(20)
}

@ -179,15 +179,6 @@ var (
DBConnectRetries int
DBConnectBackoff time.Duration
// Indexer settings
Indexer struct {
IssuePath string
RepoIndexerEnabled bool
RepoPath string
UpdateQueueLength int
MaxIndexerFileSize int64
}
// Repository settings
Repository = struct {
AnsiCharset string
@ -1214,6 +1205,7 @@ func NewContext() {
IsInputFile: sec.Key("IS_INPUT_FILE").MustBool(false),
})
}
sec = Cfg.Section("U2F")
U2F.TrustedFacets, _ = shellquote.Split(sec.Key("TRUSTED_FACETS").MustString(strings.TrimRight(AppURL, "/")))
U2F.AppID = sec.Key("APP_ID").MustString(strings.TrimRight(AppURL, "/"))
@ -1240,4 +1232,5 @@ func NewServices() {
newRegisterMailService()
newNotifyMailService()
newWebhookService()
newIndexerService()
}

@ -13,7 +13,6 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
@ -78,7 +77,7 @@ func ListIssues(ctx *context.APIContext) {
var labelIDs []int64
var err error
if len(keyword) > 0 {
issueIDs, err = indexer.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword)
issueIDs, err = models.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword)
}
if splitted := strings.Split(ctx.Query("labels"), ","); len(splitted) > 0 {

@ -90,7 +90,9 @@ func GlobalInit() {
// Booting long running goroutines.
cron.NewContext()
models.InitIssueIndexer()
if err := models.InitIssueIndexer(); err != nil {
log.Fatal(4, "Failed to initialize issue indexer: %v", err)
}
models.InitRepoIndexer()
models.InitSyncMirrors()
models.InitDeliverHooks()

@ -23,7 +23,6 @@ import (
"code.gitea.io/gitea/modules/auth"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/markup/markdown"
"code.gitea.io/gitea/modules/notification"
@ -147,7 +146,11 @@ func issues(ctx *context.Context, milestoneID int64, isPullOption util.OptionalB
var issueIDs []int64
if len(keyword) > 0 {
issueIDs, err = indexer.SearchIssuesByKeyword(repo.ID, keyword)
issueIDs, err = models.SearchIssuesByKeyword(repo.ID, keyword)
if err != nil {
ctx.ServerError("issueIndexer.Search", err)
return
}
if len(issueIDs) == 0 {
forceEmpty = true
}

@ -0,0 +1,19 @@
Copyright (c) 2019 Lunny Xiao
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -0,0 +1,12 @@
// Copyright 2019 Lunny Xiao. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package levelqueue
import "errors"
var (
// ErrNotFound means no element in queue
ErrNotFound = errors.New("no key found")
)

@ -0,0 +1,214 @@
// Copyright 2019 Lunny Xiao. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package levelqueue
import (
"bytes"
"encoding/binary"
"sync"
"github.com/syndtr/goleveldb/leveldb"
)
// Queue defines a queue struct
type Queue struct {
db *leveldb.DB
highLock sync.Mutex
lowLock sync.Mutex
low int64
high int64
}
// Open opens a queue object or create it if not exist
func Open(dataDir string) (*Queue, error) {
db, err := leveldb.OpenFile(dataDir, nil)
if err != nil {
return nil, err
}
var queue = &Queue{
db: db,
}
queue.low, err = queue.readID(lowKey)
if err == leveldb.ErrNotFound {
queue.low = 1
err = db.Put(lowKey, id2bytes(1), nil)
}
if err != nil {
return nil, err
}
queue.high, err = queue.readID(highKey)
if err == leveldb.ErrNotFound {
err = db.Put(highKey, id2bytes(0), nil)
}
if err != nil {
return nil, err
}
return queue, nil
}
func (queue *Queue) readID(key []byte) (int64, error) {
bs, err := queue.db.Get(key, nil)
if err != nil {
return 0, err
}
return bytes2id(bs)
}
var (
lowKey = []byte("low")
highKey = []byte("high")
)
func (queue *Queue) highincrement() (int64, error) {
id := queue.high + 1
queue.high = id
err := queue.db.Put(highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high - 1
return 0, err
}
return id, nil
}
func (queue *Queue) highdecrement() (int64, error) {
queue.high = queue.high - 1
err := queue.db.Put(highKey, id2bytes(queue.high), nil)
if err != nil {
queue.high = queue.high + 1
return 0, err
}
return queue.high, nil
}
func (queue *Queue) lowincrement() (int64, error) {
queue.low = queue.low + 1
err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low - 1
return 0, err
}
return queue.low, nil
}
func (queue *Queue) lowdecrement() (int64, error) {
queue.low = queue.low - 1
err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
if err != nil {
queue.low = queue.low + 1
return 0, err
}
return queue.low, nil
}
// Len returns the length of the queue
func (queue *Queue) Len() int64 {
queue.lowLock.Lock()
queue.highLock.Lock()
l := queue.high - queue.low + 1
queue.highLock.Unlock()
queue.lowLock.Unlock()
return l
}
func id2bytes(id int64) []byte {
var buf = make([]byte, 8)
binary.PutVarint(buf, id)
return buf
}
func bytes2id(b []byte) (int64, error) {
return binary.ReadVarint(bytes.NewReader(b))
}
// RPush pushes a data from right of queue
func (queue *Queue) RPush(data []byte) error {
queue.highLock.Lock()
id, err := queue.highincrement()
if err != nil {
queue.highLock.Unlock()
return err
}
err = queue.db.Put(id2bytes(id), data, nil)
queue.highLock.Unlock()
return err
}
// LPush pushes a data from left of queue
func (queue *Queue) LPush(data []byte) error {
queue.highLock.Lock()
id, err := queue.lowdecrement()
if err != nil {
queue.highLock.Unlock()
return err
}
err = queue.db.Put(id2bytes(id), data, nil)
queue.highLock.Unlock()
return err
}
// RPop pop a data from right of queue
func (queue *Queue) RPop() ([]byte, error) {
queue.highLock.Lock()
currentID := queue.high
res, err := queue.db.Get(id2bytes(currentID), nil)
if err != nil {
queue.highLock.Unlock()
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
_, err = queue.highdecrement()
if err != nil {
queue.highLock.Unlock()
return nil, err
}
err = queue.db.Delete(id2bytes(currentID), nil)
queue.highLock.Unlock()
if err != nil {
return nil, err
}
return res, nil
}
// LPop pop a data from left of queue
func (queue *Queue) LPop() ([]byte, error) {
queue.lowLock.Lock()
currentID := queue.low
res, err := queue.db.Get(id2bytes(currentID), nil)
if err != nil {
queue.lowLock.Unlock()
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
_, err = queue.lowincrement()
if err != nil {
return nil, err
}
err = queue.db.Delete(id2bytes(currentID), nil)
queue.lowLock.Unlock()
if err != nil {
return nil, err
}
return res, nil
}
// Close closes the queue
func (queue *Queue) Close() error {
err := queue.db.Close()
queue.db = nil
return err
}
Loading…
Cancel
Save