//取自动排序大小
private Int64 SeqInt(int size){var jobs = db.GetCollection("Counters");var query = Query.And(Query.EQ("_id", "TaoBaoItem"));var sortBy = SortBy.Descending("next");var update = Update.Inc("next", (Int64)size);//.Push("new", true).Push("upsert", true);var result = jobs.FindAndModify(query, sortBy, update, true, true);BsonDocument chosenJob = result.ModifiedDocument;return chosenJob.GetValue("next").AsInt64;}
////// 批量查找存在的数据/// /// ///private MongoCursor QueryItems(List cacheitems){Stopwatch sw = new Stopwatch();sw.Start();BsonArray dd;dd = new BsonArray();//批量插询foreach (var item in cacheitems){dd.Add(item.Iid);}var query = Query.In("Iid", dd);IMongoQuery q = query;// var count = taoBaoItemRepo.Collection.Count(q);var sort = new SortByDocument { { "_id", -1 } };var x = taoBaoItemRepo.Collection.Find(q).SetSortOrder(sort);sw.Stop();WorkMsg("自写查询{0}条耗时:{1}\r\n".FormatWith(cacheitems.Count, sw.ElapsedMilliseconds));return x;}
业务部份
////// 去重入库线程 /// public void Work() { while (true) { Listcacheitems = ItemCache.GetLItem(); if (cacheitems != null) { Stopwatch sw = new Stopwatch(); sw.Start(); MongoCursor rz = QueryItems(cacheitems); foreach (TaoBaoItem doc in rz) { HashCache.Add(doc.Iid); } int i = 0; foreach (var item in cacheitems) { i++; if (HashCache.Add(item.Iid)) { Items.Add(item); //temp计数 并作大于100的判断 if (Items.Count >= 100 || (i == cacheitems.Count && ItemCache.Count() == 0)) { var seqid = SeqInt(Items.Count) ; seqid = seqid - Items.Count; foreach (var x in Items) { x.DocId = seqid; seqid++; } Stopwatch sw1 = new Stopwatch(); sw1.Start(); taoBaoItemRepo.Add(Items); sw1.Stop(); WorkMsg("插入{0}条宝贝耗时:{1}\t库内共有宝贝{2}\t".FormatWith(Items.Count, sw1.ElapsedMilliseconds, taoBaoItemRepo.Count())); Items.Clear(); } } } sw.Stop(); WorkMsg("{0}条记录处理[去重+入库]总耗时:{1}\r\n".FormatWith(cacheitems.Count, sw.ElapsedMilliseconds)); } else { Thread.Sleep(5000); } } }