From be06dee04ce46de2da222fc9b2be4fc3b68b816d Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 9 Dec 2019 03:15:35 +0800 Subject: [PATCH] Move code indexer related code to a new package (#9191) * move code indexer related code to a new package * fix lint * fix tests * fix fmt * GetMaxID support interface parameter --- integrations/repo_search_test.go | 7 +- models/models.go | 25 ++ models/repo.go | 4 - models/repo_indexer.go | 371 +----------------------- modules/indexer/code/bleve.go | 365 +++++++++++++++++++++++ modules/indexer/code/bleve_test.go | 16 + modules/notification/indexer/indexer.go | 18 +- modules/repofiles/update.go | 4 - routers/init.go | 3 +- 9 files changed, 434 insertions(+), 379 deletions(-) create mode 100644 modules/indexer/code/bleve.go create mode 100644 modules/indexer/code/bleve_test.go diff --git a/integrations/repo_search_test.go b/integrations/repo_search_test.go index eb843525f..701013735 100644 --- a/integrations/repo_search_test.go +++ b/integrations/repo_search_test.go @@ -10,6 +10,7 @@ import ( "time" "code.gitea.io/gitea/models" + code_indexer "code.gitea.io/gitea/modules/indexer/code" "code.gitea.io/gitea/modules/setting" "github.com/PuerkitoBio/goquery" @@ -34,7 +35,7 @@ func TestSearchRepo(t *testing.T) { repo, err := models.GetRepositoryByOwnerAndName("user2", "repo1") assert.NoError(t, err) - executeIndexer(t, repo, models.UpdateRepoIndexer) + executeIndexer(t, repo, code_indexer.UpdateRepoIndexer) testSearch(t, "/user2/repo1/search?q=Description&page=1", []string{"README.md"}) @@ -44,8 +45,8 @@ func TestSearchRepo(t *testing.T) { repo, err = models.GetRepositoryByOwnerAndName("user2", "glob") assert.NoError(t, err) - executeIndexer(t, repo, models.DeleteRepoFromIndexer) - executeIndexer(t, repo, models.UpdateRepoIndexer) + executeIndexer(t, repo, code_indexer.DeleteRepoFromIndexer) + executeIndexer(t, repo, code_indexer.UpdateRepoIndexer) testSearch(t, "/user2/glob/search?q=loren&page=1", []string{"a.txt"}) testSearch(t, "/user2/glob/search?q=file3&page=1", []string{"x/b.txt"}) diff --git a/models/models.go b/models/models.go index 854cb33b1..8c10e7abf 100644 --- a/models/models.go +++ b/models/models.go @@ -254,3 +254,28 @@ func MaxBatchInsertSize(bean interface{}) int { func Count(bean interface{}) (int64, error) { return x.Count(bean) } + +// IsTableNotEmpty returns true if table has at least one record +func IsTableNotEmpty(tableName string) (bool, error) { + return x.Table(tableName).Exist() +} + +// DeleteAllRecords will delete all the records of this table +func DeleteAllRecords(tableName string) error { + _, err := x.Exec(fmt.Sprintf("DELETE FROM %s", tableName)) + return err +} + +// GetMaxID will return max id of the table +func GetMaxID(beanOrTableName interface{}) (maxID int64, err error) { + _, err = x.Select("MAX(id)").Table(beanOrTableName).Get(&maxID) + return +} + +// FindByMaxID filled results as the condition from database +func FindByMaxID(maxID int64, limit int, results interface{}) error { + return x.Where("id <= ?", maxID). + OrderBy("id DESC"). + Limit(limit). + Find(results) +} diff --git a/models/repo.go b/models/repo.go index 2fd4df920..e809bafa3 100644 --- a/models/repo.go +++ b/models/repo.go @@ -1112,10 +1112,6 @@ func MigrateRepositoryGitData(doer, u *User, repo *Repository, opts api.MigrateR repo, err = CleanUpMigrateInfo(repo) } - if err != nil && !repo.IsEmpty { - UpdateRepoIndexer(repo) - } - return repo, err } diff --git a/models/repo_indexer.go b/models/repo_indexer.go index 4085982c2..138ef54d3 100644 --- a/models/repo_indexer.go +++ b/models/repo_indexer.go @@ -4,23 +4,6 @@ package models -import ( - "fmt" - "strconv" - "strings" - "time" - - "code.gitea.io/gitea/modules/base" - "code.gitea.io/gitea/modules/charset" - "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/graceful" - "code.gitea.io/gitea/modules/indexer" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" - - "github.com/ethantkoenig/rupture" -) - // RepoIndexerStatus status of a repo's entry in the repo indexer // For now, implicitly refers to default branch type RepoIndexerStatus struct { @@ -29,7 +12,8 @@ type RepoIndexerStatus struct { CommitSha string `xorm:"VARCHAR(40)"` } -func (repo *Repository) getIndexerStatus() error { +// GetIndexerStatus loads repo codes indxer status +func (repo *Repository) GetIndexerStatus() error { if repo.IndexerStatus != nil { return nil } @@ -44,8 +28,9 @@ func (repo *Repository) getIndexerStatus() error { return nil } -func (repo *Repository) updateIndexerStatus(sha string) error { - if err := repo.getIndexerStatus(); err != nil { +// UpdateIndexerStatus updates indexer status +func (repo *Repository) UpdateIndexerStatus(sha string) error { + if err := repo.GetIndexerStatus(); err != nil { return err } if len(repo.IndexerStatus.CommitSha) == 0 { @@ -58,349 +43,3 @@ func (repo *Repository) updateIndexerStatus(sha string) error { Update(repo.IndexerStatus) return err } - -type repoIndexerOperation struct { - repoID int64 - deleted bool - watchers []chan<- error -} - -var repoIndexerOperationQueue chan repoIndexerOperation - -// InitRepoIndexer initialize the repo indexer -func InitRepoIndexer() { - if !setting.Indexer.RepoIndexerEnabled { - return - } - waitChannel := make(chan time.Duration) - repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) - go func() { - start := time.Now() - log.Info("Initializing Repository Indexer") - indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) - go processRepoIndexerOperationQueue() - waitChannel <- time.Since(start) - }() - if setting.Indexer.StartupTimeout > 0 { - go func() { - timeout := setting.Indexer.StartupTimeout - if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { - timeout += setting.GracefulHammerTime - } - select { - case duration := <-waitChannel: - log.Info("Repository Indexer Initialization took %v", duration) - case <-time.After(timeout): - log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) - } - }() - - } -} - -// populateRepoIndexerAsynchronously asynchronously populates the repo indexer -// with pre-existing data. This should only be run when the indexer is created -// for the first time. -func populateRepoIndexerAsynchronously() error { - exist, err := x.Table("repository").Exist() - if err != nil { - return err - } else if !exist { - return nil - } - - // if there is any existing repo indexer metadata in the DB, delete it - // since we are starting afresh. Also, xorm requires deletes to have a - // condition, and we want to delete everything, thus 1=1. - if _, err := x.Where("1=1").Delete(new(RepoIndexerStatus)); err != nil { - return err - } - - var maxRepoID int64 - if _, err = x.Select("MAX(id)").Table("repository").Get(&maxRepoID); err != nil { - return err - } - go populateRepoIndexer(maxRepoID) - return nil -} - -// populateRepoIndexer populate the repo indexer with pre-existing data. This -// should only be run when the indexer is created for the first time. -func populateRepoIndexer(maxRepoID int64) { - log.Info("Populating the repo indexer with existing repositories") - // start with the maximum existing repo ID and work backwards, so that we - // don't include repos that are created after gitea starts; such repos will - // already be added to the indexer, and we don't need to add them again. - for maxRepoID > 0 { - repos := make([]*Repository, 0, RepositoryListDefaultPageSize) - err := x.Where("id <= ?", maxRepoID). - OrderBy("id DESC"). - Limit(RepositoryListDefaultPageSize). - Find(&repos) - if err != nil { - log.Error("populateRepoIndexer: %v", err) - return - } else if len(repos) == 0 { - break - } - for _, repo := range repos { - repoIndexerOperationQueue <- repoIndexerOperation{ - repoID: repo.ID, - deleted: false, - } - maxRepoID = repo.ID - 1 - } - } - log.Info("Done populating the repo indexer with existing repositories") -} - -func updateRepoIndexer(repoID int64) error { - repo, err := getRepositoryByID(x, repoID) - if err != nil { - return err - } - - sha, err := getDefaultBranchSha(repo) - if err != nil { - return err - } - changes, err := getRepoChanges(repo, sha) - if err != nil { - return err - } else if changes == nil { - return nil - } - - batch := indexer.RepoIndexerBatch() - for _, update := range changes.Updates { - if err := addUpdate(update, repo, batch); err != nil { - return err - } - } - for _, filename := range changes.RemovedFilenames { - if err := addDelete(filename, repo, batch); err != nil { - return err - } - } - if err = batch.Flush(); err != nil { - return err - } - return repo.updateIndexerStatus(sha) -} - -// repoChanges changes (file additions/updates/removals) to a repo -type repoChanges struct { - Updates []fileUpdate - RemovedFilenames []string -} - -type fileUpdate struct { - Filename string - BlobSha string -} - -func getDefaultBranchSha(repo *Repository) (string, error) { - stdout, err := git.NewCommand("show-ref", "-s", repo.DefaultBranch).RunInDir(repo.RepoPath()) - if err != nil { - return "", err - } - return strings.TrimSpace(stdout), nil -} - -// getRepoChanges returns changes to repo since last indexer update -func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) { - if err := repo.getIndexerStatus(); err != nil { - return nil, err - } - - if len(repo.IndexerStatus.CommitSha) == 0 { - return genesisChanges(repo, revision) - } - return nonGenesisChanges(repo, revision) -} - -func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error { - stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). - RunInDir(repo.RepoPath()) - if err != nil { - return err - } - if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil { - return fmt.Errorf("Misformatted git cat-file output: %v", err) - } else if int64(size) > setting.Indexer.MaxIndexerFileSize { - return addDelete(update.Filename, repo, batch) - } - - fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). - RunInDirBytes(repo.RepoPath()) - if err != nil { - return err - } else if !base.IsTextFile(fileContents) { - // FIXME: UTF-16 files will probably fail here - return nil - } - indexerUpdate := indexer.RepoIndexerUpdate{ - Filepath: update.Filename, - Op: indexer.RepoIndexerOpUpdate, - Data: &indexer.RepoIndexerData{ - RepoID: repo.ID, - Content: string(charset.ToUTF8DropErrors(fileContents)), - }, - } - return indexerUpdate.AddToFlushingBatch(batch) -} - -func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error { - indexerUpdate := indexer.RepoIndexerUpdate{ - Filepath: filename, - Op: indexer.RepoIndexerOpDelete, - Data: &indexer.RepoIndexerData{ - RepoID: repo.ID, - }, - } - return indexerUpdate.AddToFlushingBatch(batch) -} - -func isIndexable(entry *git.TreeEntry) bool { - if !entry.IsRegular() && !entry.IsExecutable() { - return false - } - name := strings.ToLower(entry.Name()) - for _, g := range setting.Indexer.ExcludePatterns { - if g.Match(name) { - return false - } - } - for _, g := range setting.Indexer.IncludePatterns { - if g.Match(name) { - return true - } - } - return len(setting.Indexer.IncludePatterns) == 0 -} - -// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command -func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { - entries, err := git.ParseTreeEntries(stdout) - if err != nil { - return nil, err - } - var idxCount = 0 - updates := make([]fileUpdate, len(entries)) - for _, entry := range entries { - if isIndexable(entry) { - updates[idxCount] = fileUpdate{ - Filename: entry.Name(), - BlobSha: entry.ID.String(), - } - idxCount++ - } - } - return updates[:idxCount], nil -} - -// genesisChanges get changes to add repo to the indexer for the first time -func genesisChanges(repo *Repository, revision string) (*repoChanges, error) { - var changes repoChanges - stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). - RunInDirBytes(repo.RepoPath()) - if err != nil { - return nil, err - } - changes.Updates, err = parseGitLsTreeOutput(stdout) - return &changes, err -} - -// nonGenesisChanges get changes since the previous indexer update -func nonGenesisChanges(repo *Repository, revision string) (*repoChanges, error) { - diffCmd := git.NewCommand("diff", "--name-status", - repo.IndexerStatus.CommitSha, revision) - stdout, err := diffCmd.RunInDir(repo.RepoPath()) - if err != nil { - // previous commit sha may have been removed by a force push, so - // try rebuilding from scratch - log.Warn("git diff: %v", err) - if err = indexer.DeleteRepoFromIndexer(repo.ID); err != nil { - return nil, err - } - return genesisChanges(repo, revision) - } - var changes repoChanges - updatedFilenames := make([]string, 0, 10) - for _, line := range strings.Split(stdout, "\n") { - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - filename := strings.TrimSpace(line[1:]) - if len(filename) == 0 { - continue - } else if filename[0] == '"' { - filename, err = strconv.Unquote(filename) - if err != nil { - return nil, err - } - } - - switch status := line[0]; status { - case 'M', 'A': - updatedFilenames = append(updatedFilenames, filename) - case 'D': - changes.RemovedFilenames = append(changes.RemovedFilenames, filename) - default: - log.Warn("Unrecognized status: %c (line=%s)", status, line) - } - } - - cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") - cmd.AddArguments(updatedFilenames...) - lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) - if err != nil { - return nil, err - } - changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) - return &changes, err -} - -func processRepoIndexerOperationQueue() { - for { - op := <-repoIndexerOperationQueue - var err error - if op.deleted { - if err = indexer.DeleteRepoFromIndexer(op.repoID); err != nil { - log.Error("DeleteRepoFromIndexer: %v", err) - } - } else { - if err = updateRepoIndexer(op.repoID); err != nil { - log.Error("updateRepoIndexer: %v", err) - } - } - for _, watcher := range op.watchers { - watcher <- err - } - } -} - -// DeleteRepoFromIndexer remove all of a repository's entries from the indexer -func DeleteRepoFromIndexer(repo *Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) -} - -// UpdateRepoIndexer update a repository's entries in the indexer -func UpdateRepoIndexer(repo *Repository, watchers ...chan<- error) { - addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) -} - -func addOperationToQueue(op repoIndexerOperation) { - if !setting.Indexer.RepoIndexerEnabled { - return - } - select { - case repoIndexerOperationQueue <- op: - break - default: - go func() { - repoIndexerOperationQueue <- op - }() - } -} diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go new file mode 100644 index 000000000..4e7eaa21b --- /dev/null +++ b/modules/indexer/code/bleve.go @@ -0,0 +1,365 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "fmt" + "strconv" + "strings" + "time" + + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/base" + "code.gitea.io/gitea/modules/charset" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/indexer" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "github.com/ethantkoenig/rupture" +) + +type repoIndexerOperation struct { + repoID int64 + deleted bool + watchers []chan<- error +} + +var repoIndexerOperationQueue chan repoIndexerOperation + +// InitRepoIndexer initialize the repo indexer +func InitRepoIndexer() { + if !setting.Indexer.RepoIndexerEnabled { + return + } + waitChannel := make(chan time.Duration) + repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength) + go func() { + start := time.Now() + log.Info("Initializing Repository Indexer") + indexer.InitRepoIndexer(populateRepoIndexerAsynchronously) + go processRepoIndexerOperationQueue() + waitChannel <- time.Since(start) + }() + if setting.Indexer.StartupTimeout > 0 { + go func() { + timeout := setting.Indexer.StartupTimeout + if graceful.Manager.IsChild() && setting.GracefulHammerTime > 0 { + timeout += setting.GracefulHammerTime + } + select { + case duration := <-waitChannel: + log.Info("Repository Indexer Initialization took %v", duration) + case <-time.After(timeout): + log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout) + } + }() + + } +} + +// populateRepoIndexerAsynchronously asynchronously populates the repo indexer +// with pre-existing data. This should only be run when the indexer is created +// for the first time. +func populateRepoIndexerAsynchronously() error { + exist, err := models.IsTableNotEmpty("repository") + if err != nil { + return err + } else if !exist { + return nil + } + + // if there is any existing repo indexer metadata in the DB, delete it + // since we are starting afresh. Also, xorm requires deletes to have a + // condition, and we want to delete everything, thus 1=1. + if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { + return err + } + + var maxRepoID int64 + if maxRepoID, err = models.GetMaxID("repository"); err != nil { + return err + } + go populateRepoIndexer(maxRepoID) + return nil +} + +// populateRepoIndexer populate the repo indexer with pre-existing data. This +// should only be run when the indexer is created for the first time. +func populateRepoIndexer(maxRepoID int64) { + log.Info("Populating the repo indexer with existing repositories") + // start with the maximum existing repo ID and work backwards, so that we + // don't include repos that are created after gitea starts; such repos will + // already be added to the indexer, and we don't need to add them again. + for maxRepoID > 0 { + repos := make([]*models.Repository, 0, models.RepositoryListDefaultPageSize) + err := models.FindByMaxID(maxRepoID, models.RepositoryListDefaultPageSize, &repos) + if err != nil { + log.Error("populateRepoIndexer: %v", err) + return + } else if len(repos) == 0 { + break + } + for _, repo := range repos { + repoIndexerOperationQueue <- repoIndexerOperation{ + repoID: repo.ID, + deleted: false, + } + maxRepoID = repo.ID - 1 + } + } + log.Info("Done populating the repo indexer with existing repositories") +} + +func updateRepoIndexer(repoID int64) error { + repo, err := models.GetRepositoryByID(repoID) + if err != nil { + return err + } + + sha, err := getDefaultBranchSha(repo) + if err != nil { + return err + } + changes, err := getRepoChanges(repo, sha) + if err != nil { + return err + } else if changes == nil { + return nil + } + + batch := indexer.RepoIndexerBatch() + for _, update := range changes.Updates { + if err := addUpdate(update, repo, batch); err != nil { + return err + } + } + for _, filename := range changes.RemovedFilenames { + if err := addDelete(filename, repo, batch); err != nil { + return err + } + } + if err = batch.Flush(); err != nil { + return err + } + return repo.UpdateIndexerStatus(sha) +} + +// repoChanges changes (file additions/updates/removals) to a repo +type repoChanges struct { + Updates []fileUpdate + RemovedFilenames []string +} + +type fileUpdate struct { + Filename string + BlobSha string +} + +func getDefaultBranchSha(repo *models.Repository) (string, error) { + stdout, err := git.NewCommand("show-ref", "-s", repo.DefaultBranch).RunInDir(repo.RepoPath()) + if err != nil { + return "", err + } + return strings.TrimSpace(stdout), nil +} + +// getRepoChanges returns changes to repo since last indexer update +func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) { + if err := repo.GetIndexerStatus(); err != nil { + return nil, err + } + + if len(repo.IndexerStatus.CommitSha) == 0 { + return genesisChanges(repo, revision) + } + return nonGenesisChanges(repo, revision) +} + +func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { + stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). + RunInDir(repo.RepoPath()) + if err != nil { + return err + } + if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil { + return fmt.Errorf("Misformatted git cat-file output: %v", err) + } else if int64(size) > setting.Indexer.MaxIndexerFileSize { + return addDelete(update.Filename, repo, batch) + } + + fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha). + RunInDirBytes(repo.RepoPath()) + if err != nil { + return err + } else if !base.IsTextFile(fileContents) { + // FIXME: UTF-16 files will probably fail here + return nil + } + indexerUpdate := indexer.RepoIndexerUpdate{ + Filepath: update.Filename, + Op: indexer.RepoIndexerOpUpdate, + Data: &indexer.RepoIndexerData{ + RepoID: repo.ID, + Content: string(charset.ToUTF8DropErrors(fileContents)), + }, + } + return indexerUpdate.AddToFlushingBatch(batch) +} + +func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { + indexerUpdate := indexer.RepoIndexerUpdate{ + Filepath: filename, + Op: indexer.RepoIndexerOpDelete, + Data: &indexer.RepoIndexerData{ + RepoID: repo.ID, + }, + } + return indexerUpdate.AddToFlushingBatch(batch) +} + +func isIndexable(entry *git.TreeEntry) bool { + if !entry.IsRegular() && !entry.IsExecutable() { + return false + } + name := strings.ToLower(entry.Name()) + for _, g := range setting.Indexer.ExcludePatterns { + if g.Match(name) { + return false + } + } + for _, g := range setting.Indexer.IncludePatterns { + if g.Match(name) { + return true + } + } + return len(setting.Indexer.IncludePatterns) == 0 +} + +// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command +func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { + entries, err := git.ParseTreeEntries(stdout) + if err != nil { + return nil, err + } + var idxCount = 0 + updates := make([]fileUpdate, len(entries)) + for _, entry := range entries { + if isIndexable(entry) { + updates[idxCount] = fileUpdate{ + Filename: entry.Name(), + BlobSha: entry.ID.String(), + } + idxCount++ + } + } + return updates[:idxCount], nil +} + +// genesisChanges get changes to add repo to the indexer for the first time +func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { + var changes repoChanges + stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision). + RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } + changes.Updates, err = parseGitLsTreeOutput(stdout) + return &changes, err +} + +// nonGenesisChanges get changes since the previous indexer update +func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { + diffCmd := git.NewCommand("diff", "--name-status", + repo.IndexerStatus.CommitSha, revision) + stdout, err := diffCmd.RunInDir(repo.RepoPath()) + if err != nil { + // previous commit sha may have been removed by a force push, so + // try rebuilding from scratch + log.Warn("git diff: %v", err) + if err = indexer.DeleteRepoFromIndexer(repo.ID); err != nil { + return nil, err + } + return genesisChanges(repo, revision) + } + var changes repoChanges + updatedFilenames := make([]string, 0, 10) + for _, line := range strings.Split(stdout, "\n") { + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + filename := strings.TrimSpace(line[1:]) + if len(filename) == 0 { + continue + } else if filename[0] == '"' { + filename, err = strconv.Unquote(filename) + if err != nil { + return nil, err + } + } + + switch status := line[0]; status { + case 'M', 'A': + updatedFilenames = append(updatedFilenames, filename) + case 'D': + changes.RemovedFilenames = append(changes.RemovedFilenames, filename) + default: + log.Warn("Unrecognized status: %c (line=%s)", status, line) + } + } + + cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") + cmd.AddArguments(updatedFilenames...) + lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath()) + if err != nil { + return nil, err + } + changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout) + return &changes, err +} + +func processRepoIndexerOperationQueue() { + for { + op := <-repoIndexerOperationQueue + var err error + if op.deleted { + if err = indexer.DeleteRepoFromIndexer(op.repoID); err != nil { + log.Error("DeleteRepoFromIndexer: %v", err) + } + } else { + if err = updateRepoIndexer(op.repoID); err != nil { + log.Error("updateRepoIndexer: %v", err) + } + } + for _, watcher := range op.watchers { + watcher <- err + } + } +} + +// DeleteRepoFromIndexer remove all of a repository's entries from the indexer +func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { + addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) +} + +// UpdateRepoIndexer update a repository's entries in the indexer +func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { + addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) +} + +func addOperationToQueue(op repoIndexerOperation) { + if !setting.Indexer.RepoIndexerEnabled { + return + } + select { + case repoIndexerOperationQueue <- op: + break + default: + go func() { + repoIndexerOperationQueue <- op + }() + } +} diff --git a/modules/indexer/code/bleve_test.go b/modules/indexer/code/bleve_test.go new file mode 100644 index 000000000..2eafeef3c --- /dev/null +++ b/modules/indexer/code/bleve_test.go @@ -0,0 +1,16 @@ +// Copyright 2019 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package code + +import ( + "path/filepath" + "testing" + + "code.gitea.io/gitea/models" +) + +func TestMain(m *testing.M) { + models.MainTest(m, filepath.Join("..", "..", "..")) +} diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go index 13baa76ac..0e76dde53 100644 --- a/modules/notification/indexer/indexer.go +++ b/modules/notification/indexer/indexer.go @@ -6,9 +6,11 @@ package indexer import ( "code.gitea.io/gitea/models" + code_indexer "code.gitea.io/gitea/modules/indexer/code" issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/setting" ) type indexerNotifier struct { @@ -103,7 +105,21 @@ func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) { issue_indexer.DeleteRepoIssueIndexer(repo) - models.DeleteRepoFromIndexer(repo) + if setting.Indexer.RepoIndexerEnabled { + code_indexer.DeleteRepoFromIndexer(repo) + } +} + +func (r *indexerNotifier) NotifyMigrateRepository(doer *models.User, u *models.User, repo *models.Repository) { + if setting.Indexer.RepoIndexerEnabled && !repo.IsEmpty { + code_indexer.UpdateRepoIndexer(repo) + } +} + +func (r *indexerNotifier) NotifyPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *models.PushCommits) { + if setting.Indexer.RepoIndexerEnabled && refName == repo.DefaultBranch { + code_indexer.UpdateRepoIndexer(repo) + } } func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) { diff --git a/modules/repofiles/update.go b/modules/repofiles/update.go index c1eae5309..0042be52c 100644 --- a/modules/repofiles/update.go +++ b/modules/repofiles/update.go @@ -513,10 +513,6 @@ func PushUpdate(repo *models.Repository, branch string, opts PushUpdateOptions) go pull_service.AddTestPullRequestTask(pusher, repo.ID, branch, true) - if opts.RefFullName == git.BranchPrefix+repo.DefaultBranch { - models.UpdateRepoIndexer(repo) - } - if err = models.WatchIfAuto(opts.PusherID, repo.ID, true); err != nil { log.Warn("Fail to perform auto watch on user %v for repo %v: %v", opts.PusherID, repo.ID, err) } diff --git a/routers/init.go b/routers/init.go index 447c16885..81418a4ad 100644 --- a/routers/init.go +++ b/routers/init.go @@ -15,6 +15,7 @@ import ( "code.gitea.io/gitea/modules/cron" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/highlight" + code_indexer "code.gitea.io/gitea/modules/indexer/code" issue_indexer "code.gitea.io/gitea/modules/indexer/issues" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/markup" @@ -102,7 +103,7 @@ func GlobalInit() { // Booting long running goroutines. cron.NewContext() issue_indexer.InitIssueIndexer(false) - models.InitRepoIndexer() + code_indexer.InitRepoIndexer() mirror_service.InitSyncMirrors() webhook.InitDeliverHooks() pull_service.Init()