package search import ( "fmt" "io" "log" "net" "strings" "time" "xiaoniaokuaiyan.com/xiaoniao/config" "github.com/blevesearch/bleve" _ "github.com/blevesearch/bleve/search/highlight/highlighter/html" bq "github.com/blevesearch/bleve/search/query" "gopkg.in/guregu/null.v3" "gopkg.in/redis.v2" _ "xiaoniaokuaiyan.com/xiaoniao/search/analyzer" "xiaoniaokuaiyan.com/xiaoniao/util" ) var engine bleve.Index type SearchResult struct { Total uint64 `json:"total"` PageIndex int `json:"pageIndex"` Hits []interface{} `json:"hits"` Size int `json:"size"` Took time.Duration `json:"took"` } type HitItem struct { Locations interface{} `json:"-"` Fragments interface{} `json:"-"` Fields map[string]interface{} `json:"fields"` } type QueryParam struct { Text string `json:"text"` CityId uint `json:"cityId"` PageIndex int `json:"pageIndex"` Size int `json"size"` IsHighlight bool `json:"isHighlight"` CateIds []string `json:"cate_ids"` SortBy string `json:"sort_by"` IsSale bool `json:"is_sale"` IsZFB bool `json:"is_zfb"` } const DEFAULT_PAGE_SIZE = 10 func Query(param *QueryParam) (*SearchResult, error) { //if param.CityId <= 0 { // return nil, fmt.Errorf("param city id is invalid, passed %v", param.CityId) //} text := strings.ToLower(param.Text) text = strings.Trim(text, " ") var ( textQ bq.Query ) if text != "" { query := bleve.NewQueryStringQuery(fmt.Sprintf("name:%s items:%s keywords:%s", text, text, text)) pyQuery := bleve.NewRegexpQuery(".*" + text + ".*") pyQuery.SetField("py") jpQuery := bleve.NewRegexpQuery(".*" + text + ".*") jpQuery.SetField("jp") tQ := bleve.NewDisjunctionQuery( query, jpQuery, pyQuery, ) tQ.SetMin(1) textQ = tQ } else { textQ = bleve.NewMatchAllQuery() } var q bq.Query = textQ cityQ := bleve.NewTermQuery(fmt.Sprintf("%d", param.CityId)) cityQ.SetField("cityIds") var conjunctionQuery *bq.ConjunctionQuery = bleve.NewConjunctionQuery(textQ) if param.CityId > 0 { conjunctionQuery.AddQuery(cityQ) } if len(param.CateIds) > 0 { cateQ := bleve.NewTermQuery(param.CateIds[0]) cateQ.SetField("cates") conjunctionQuery.AddQuery(cateQ) } if param.IsSale { saleQ := bleve.NewBoolFieldQuery(true) saleQ.SetField("is_sale") conjunctionQuery.AddQuery(saleQ) } if param.IsZFB { zfbQ := bleve.NewBoolFieldQuery(true) zfbQ.SetField("is_zfb") conjunctionQuery.AddQuery(zfbQ) } if len(conjunctionQuery.Conjuncts) > 0 { q = conjunctionQuery } request := bleve.NewSearchRequest(q) if param.SortBy != "" { request.SortBy([]string{param.SortBy}) } //request.Fields = []string{"*"} if param.IsHighlight { request.Highlight = bleve.NewHighlightWithStyle("custom") request.Highlight.Fields = []string{"name", "items"} } request.Fields = []string{"id", "name", "price", "picture", "isRecommend", "items", "cityIds", "market_price", "zfb_price"} request.From = (param.PageIndex - 1) * param.Size if param.Size <= 0 { param.Size = DEFAULT_PAGE_SIZE } request.Size = param.Size result, err := engine.Search(request) if err != nil { return nil, err } searchResult := &SearchResult{ Total: result.Total, PageIndex: param.PageIndex, Size: param.Size, Took: result.Took, } for _, hit := range result.Hits { hitem := &HitItem{ Locations: hit.Locations, Fragments: hit.Fragments, Fields: hit.Fields, } //替换搜索匹配项 for k, v := range hit.Fragments { hitem.Fields[k] = v } if param.IsZFB { hitem.Fields["price"] = hitem.Fields["zfb_price"] } searchResult.Hits = append(searchResult.Hits, hitem) } return searchResult, nil } func GetSearchTips(text string, cityId int, isZFB bool) ([]TipItem, error) { text = strings.ToLower(text) query := bleve.NewQueryStringQuery(fmt.Sprintf("name:%s items:%s keywords:%s", text, text, text)) cityQ := bleve.NewTermQuery(fmt.Sprintf("%d", cityId)) cityQ.SetField("cityIds") pyQuery := bleve.NewRegexpQuery(".*" + text + ".*") pyQuery.SetField("py") jpQuery := bleve.NewRegexpQuery(".*" + text + ".*") jpQuery.SetField("jp") textQ := bleve.NewDisjunctionQuery( query, jpQuery, pyQuery, ) zfbQ := bleve.NewBoolFieldQuery(true) zfbQ.SetField("is_zfb") //var q bq.Query //if cityId > 0 { // q = bleve.NewConjunctionQuery( // textQ, // cityQ, // ) //} else { // q = textQ //} q := bleve.NewConjunctionQuery(textQ) if cityId > 0 { q.AddQuery(cityQ) } if isZFB { q.AddQuery(zfbQ) } request := bleve.NewSearchRequest(q) request.Fields = []string{"name", "items"} request.Size = 10 result, err := engine.Search(request) if err != nil { return nil, err } tipList := []TipItem{} for _, hit := range result.Hits { //fmt.Println(hit.Fields["name"]) tipList = append(tipList, TipItem{hit.Fields["name"]}) } return tipList, nil } type TipItem struct { Tip interface{} `json:"tip"` } type productDB struct { Id string `db:"id" json:"id"` Name string `db:"name" json:"name"` Price int `db:"price" json:"price"` IsRecommend int `db:"is_recommend" json:"isRecommend"` Keywords string `db:"-" json:"keywords"` TKeywords null.String `db:"keywords" json:"tkeywords"` Picture string `db:"picture" json:"picture"` Items string `db:"-" json:"items"` CityIds []string `db:"-" json:"cityIds"` Pinyin string `db:"-" json:"py"` JP string `db:"-" json:"jp"` Cates []string `db:"-" json:"cates"` SaleNum int `db:"sale_num" json:"sale_num"` SortNo int `db:"sort_no" json:"sort_no"` CreatedAt string `db:"created_at" json:"created_at"` PutawayTime string `db:"-" json:"putaway_time"` TPutawayTime null.String `db:"putaway_time" json:"-"` MarketPrice int `db:"market_price" json:"market_price"` IsSale bool `db:"-" json:"is_sale"` IsZFB bool `db:"is_zfb" json:"is_zfb"` ZPrice int `db:"zfb_price" json:"zfb_price"` } func UpdateIndex() { var ( client *redis.Client //productIndex = engine ) //defer ps.Close() RESETPUBSUB: client = util.GetRedis() ps := client.PubSub() err := ps.Subscribe("product_trace") if err != nil { log.Println("Subscribe error:", err) return } for { payload, err := ps.ReceiveTimeout(time.Second * 5) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { continue } if err == io.EOF { ps.Close() time.Sleep(time.Second * 5) goto RESETPUBSUB } log.Println(err) continue } if payload == nil { continue } switch payload.(type) { case *redis.Subscription: case *redis.Message: msg := payload.(*redis.Message) if msg.Channel == "product_trace" { pieces := strings.Split(msg.Payload, "-") if len(pieces) < 2 { log.Println("error:wrong product_change message") break } if pieces[1] == "UPDATE" { err = addProductIdx(pieces[0]) if err != nil { log.Println(err) break } } else if pieces[1] == "DELETE" { err = deleteProductIdx(pieces[0]) if err != nil { log.Println(err) break } } else if pieces[1] == "ALL" { err = allProductIdx() if err != nil { log.Println(err) break } } } default: fmt.Printf("redis: unknown message: %v\n", payload) } } } func addProductIdx(productId string) error { strSql := "select id, name,price, is_recommend, picture,keywords, sale_num,putaway_time,sort_no,created_at,market_price,is_zfb,zfb_price from t_product where id = ? and is_putaway=0" var product = productDB{} db := util.GetWriteSqlDB() err := db.Get(&product, strSql, productId) if err != nil { return fmt.Errorf("query product %d error: %v", productId, err) } strSql = "select t1.name from t_detect_product t1 left join t_product_detect_product t2 on t1.id = t2.detect_product_id where t2.product_id = ?" itemList := []string{} db.Select(&itemList, strSql, product.Id) product.Items = strings.Join(itemList, " ") /*strSql = "select tag_name from t_product_tag t1 left join t_tag t2 on t1.tag_id = t2.id where t1.product_id = ?" tagList := []string{} db.Select(&tagList, strSql, product.Id) product.Tags = strings.Join(tagList, " ")*/ product.Keywords = product.TKeywords.String product.PutawayTime = product.TPutawayTime.String product.IsSale = product.Price != product.MarketPrice strSql = "select city_id from v_product_city where product_id = ?" cityIds := []string{} db.Select(&cityIds, strSql, product.Id) product.CityIds = cityIds product.Pinyin, product.JP, _ = util.GetPinyin(product.Name) strSql = "SELECT cat_id from t_product_category_product where product_id = ?" var cateIds = []string{} db.Select(&cateIds, strSql, product.Id) product.Cates = cateIds return engine.Index(product.Id, product) } func deleteProductIdx(productId string) error { return engine.Delete(productId) } func Init() { var err error var INDEX_NAME = config.IniConf.Section("search").Key("IndexName").Value() fmt.Println(INDEX_NAME) engine, err = bleve.Open(INDEX_NAME) if err != nil { log.Fatal(err) } _, err = bleve.Config.Cache.DefineFragmenter("size300", map[string]interface{}{ "type": "simple", "size": 300.0, }) if err != nil { log.Fatal(err) } _, err = bleve.Config.Cache.DefineHighlighter("custom", map[string]interface{}{ "type": "simple", "fragmenter": "size300", "formatter": "html", }) if err != nil { log.Fatal(err) } } func init() { config.RegistChangeCallback(Init) } func allProductIdx() error { strSql := "select id, name,price, is_recommend, picture, keywords,sale_num,putaway_time,sort_no,created_at,market_price,is_zfb,zfb_price from t_product where is_putaway = 0 and is_delete = 0;" db := util.GetSqlDB() productList := []productDB{} err := db.Select(&productList, strSql) if err != nil { log.Fatal(err) } for _, product := range productList { strSql := "select t1.name from t_detect_product t1 left join t_product_detect_product t2 on t1.id = t2.detect_product_id where t2.product_id = " + product.Id itemList := []string{} db.Select(&itemList, strSql) //strSql = "select tag_name from t_product_tag t1 left join t_tag t2 on t1.tag_id = t2.id where t1.product_id = ?" //tagList := []string{} //db.Select(&tagList, strSql, product.Id) strSql = "select city_id from v_product_city where product_id = ?" cityIds := []string{} db.Select(&cityIds, strSql, product.Id) product.Items = strings.Join(itemList, " ") //product.Tags = strings.Join(tagList, " ") product.CityIds = cityIds product.Pinyin, product.JP, _ = util.GetPinyin(product.Name) product.Keywords = product.TKeywords.String product.PutawayTime = product.TPutawayTime.String product.IsSale = product.Price != product.MarketPrice strSql = "SELECT cat_id from t_product_category_product where product_id = ?" var cateIds = []string{} db.Select(&cateIds, strSql, product.Id) product.Cates = cateIds err = engine.Index(product.Id, product) } return err }