Skip to content

Commit 82f9ff7

Browse files
committed
Powered by distributed architecture.
1 parent 2463c4e commit 82f9ff7

File tree

13 files changed

+159
-31
lines changed

13 files changed

+159
-31
lines changed

crawler-distributed/config/config.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,22 @@ const (
99

1010
// CrawlService
1111
CrawlServiceRpc = "CrawlService.Process"
12+
13+
14+
// worker0
15+
Worker0Host = ":9000"
16+
17+
// worker1
18+
Worker1Host = ":9001"
19+
20+
21+
// itemsaver0
22+
ItemSaver0Host = ":9002"
23+
24+
// itemsaver1
25+
ItemSaver1Host = ":9003"
26+
27+
// ItemSaverService.Save
28+
29+
ItemSaverServiceSave = "ItemSaverService.Save"
1230
)

crawler-distributed/main.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package main
2+
3+
import (
4+
"polaris/crawler-distributed/config"
5+
itemsaver "polaris/crawler-distributed/persist/client"
6+
worker "polaris/crawler-distributed/worker/client"
7+
"polaris/crawler/engine"
8+
"polaris/crawler/scheduler"
9+
"polaris/crawler/zhenai/parser"
10+
)
11+
12+
func main() {
13+
// run condition:
14+
// run itemsaver.go first
15+
// then to run worker.go
16+
itemChan, err := itemsaver.ItemSaver(config.ItemSaver0Host)
17+
if err != nil {
18+
panic(err)
19+
}
20+
21+
processor, err := worker.CreateProcessor()
22+
if err != nil {
23+
panic(err)
24+
}
25+
e := engine.ConcurrentEngine{
26+
Scheduler: &scheduler.QueuedScheduler{},
27+
WorkerCount: 100,
28+
ItemChan: itemChan,
29+
RequestProcessor: processor,
30+
}
31+
32+
e.Run(engine.Request{
33+
Url: "http://www.zhenai.com/zhenghun",
34+
Parser: engine.NewFuncParser(parser.ParseCityList, config.ParseCityList),
35+
})
36+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package client
2+
3+
import (
4+
"log"
5+
"polaris/crawler-distributed/config"
6+
"polaris/crawler-distributed/rpc"
7+
"polaris/crawler/engine"
8+
)
9+
10+
func ItemSaver(host string) (chan engine.Item, error) {
11+
client, err := rpcsupport.NewClient(host)
12+
if err != nil {
13+
return nil, err
14+
}
15+
out := make(chan engine.Item)
16+
go func() {
17+
itemCount := 0
18+
for {
19+
item := <-out
20+
log.Printf("Item Saver: got item #%d: %v", itemCount, item)
21+
itemCount++
22+
result := ""
23+
err = client.Call(config.ItemSaverServiceSave, item, &result)
24+
if err != nil {
25+
log.Printf("Item Saver: error saving item: %v, error: %v", item, err)
26+
}
27+
}
28+
}()
29+
return out, nil
30+
31+
}

crawler-distributed/persist/rpc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package persist
22

33
import (
4+
"fmt"
5+
"github.com/gpmgo/gopm/modules/log"
46
"polaris/crawler/engine"
57
"polaris/crawler/persist"
68
)
@@ -14,7 +16,11 @@ func (s *ItemSaverService) Save(item engine.Item, result *string) error {
1416
const index = "polaris"
1517
err := persist.Save(item, index)
1618
if err == nil {
19+
fmt.Println("saving item success")
1720
*result = "ok"
21+
} else {
22+
log.Error("saving item fail")
23+
*result = "failed"
1824
}
1925
return err
2026
}

crawler-distributed/persist/server/client_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"polaris/crawler-distributed/config"
45
"polaris/crawler-distributed/rpc"
56
"polaris/crawler/engine"
67
"polaris/crawler/model"
@@ -10,14 +11,12 @@ import (
1011

1112
func TestItemSaver(t *testing.T) {
1213

13-
const host = ":8080"
14-
1514
// start ItemSaverServer
16-
go serveRpc(host, "polaris")
15+
go serveRpc(config.ItemSaver0Host, "polaris")
1716

1817
// start ItemSaverClient
1918
time.Sleep(time.Second)
20-
client, err := rpcsupport.NewClient(host)
19+
client, err := rpcsupport.NewClient(config.ItemSaver0Host)
2120
if err != nil {
2221
panic(err)
2322
}

crawler-distributed/persist/server/main.go renamed to crawler-distributed/persist/server/itemsaver.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"polaris/crawler-distributed/config"
45
"polaris/crawler-distributed/persist"
56
"polaris/crawler-distributed/rpc"
67
)
@@ -11,3 +12,7 @@ func serveRpc(host, index string) error {
1112
Index: index,
1213
})
1314
}
15+
16+
func main() {
17+
serveRpc(config.ItemSaver0Host, "polaris")
18+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"polaris/crawler-distributed/config"
6+
"polaris/crawler-distributed/rpc"
7+
"polaris/crawler-distributed/worker"
8+
"polaris/crawler/engine"
9+
)
10+
11+
func CreateProcessor() (engine.Processor, error) {
12+
client, err := rpcsupport.NewClient(fmt.Sprintf("%s", config.Worker0Host))
13+
if err != nil {
14+
return nil, err
15+
}
16+
17+
return func(request engine.Request) (engine.ParseResult, error) {
18+
sReq := worker.SerializedRequest(request)
19+
var sResult worker.ParseResult
20+
err := client.Call(config.CrawlServiceRpc, sReq, &sResult)
21+
if err != nil {
22+
return engine.ParseResult{}, err
23+
}
24+
return worker.DeserializeParseResult(sResult), nil
25+
}, nil
26+
}

crawler-distributed/worker/server/client_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@ import (
1010
)
1111

1212
func TestCrawlService(t *testing.T) {
13-
const host = ":8080"
1413
go rpcsupport.ServeRpc(
15-
host, worker.CrawlService{})
14+
config.Worker0Host, worker.CrawlService{})
1615
time.Sleep(time.Second)
1716

1817
client, err := rpcsupport.NewClient(
19-
host)
18+
config.Worker0Host)
2019
if err != nil {
2120
panic(err)
2221
}

crawler-distributed/worker/server/main.go

Lines changed: 0 additions & 10 deletions
This file was deleted.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package main
2+
3+
import (
4+
"polaris/crawler-distributed/config"
5+
"polaris/crawler-distributed/rpc"
6+
"polaris/crawler-distributed/worker"
7+
)
8+
9+
func main() {
10+
err := rpcsupport.ServeRpc(config.Worker0Host, worker.CrawlService{})
11+
if err != nil{
12+
panic(err)
13+
}
14+
}

crawler/engine/concurrent.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package engine
22

33
type ConcurrentEngine struct {
4-
Scheduler Scheduler
5-
WorkerCount int
6-
ItemChan chan Item
4+
Scheduler Scheduler
5+
WorkerCount int
6+
ItemChan chan Item
7+
RequestProcessor Processor
78
}
89

10+
type Processor func(request Request) (ParseResult, error)
11+
912
type Scheduler interface {
1013
Submit(Request)
1114
WorkChan() chan Request
@@ -22,7 +25,7 @@ func (e *ConcurrentEngine) Run(seeds ...Request) {
2225
// Create two queues waiting for the task to arrive.
2326
e.Scheduler.Run()
2427
for i := 0; i < e.WorkerCount; i++ {
25-
createWorker(e.Scheduler.WorkChan(), out, e.Scheduler)
28+
e.createWorker(e.Scheduler.WorkChan(), out, e.Scheduler)
2629
}
2730

2831
for _, r := range seeds {
@@ -49,13 +52,14 @@ func (e *ConcurrentEngine) Run(seeds ...Request) {
4952
}
5053
}
5154

52-
func createWorker(in chan Request, out chan ParseResult, read ReadyNotifier) {
55+
func (e *ConcurrentEngine) createWorker(
56+
in chan Request, out chan ParseResult, read ReadyNotifier) {
5357
go func() {
5458
for {
5559
// Work is ready.
5660
read.WorkerReady(in)
5761
request := <-in
58-
result, err := Worker(request)
62+
result, err := e.RequestProcessor(request)
5963
if err != nil {
6064
continue
6165
}
@@ -64,7 +68,6 @@ func createWorker(in chan Request, out chan ParseResult, read ReadyNotifier) {
6468
}()
6569
}
6670

67-
6871
var visitedUrls = make(map[string]bool)
6972

7073
func isDuplicate(url string) bool {

crawler/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ const shanghai = "http://www.zhenai.com/zhenghun/shanghai"
1313
func main() {
1414

1515
e := engine.ConcurrentEngine{
16-
Scheduler: &scheduler.QueuedScheduler{},
17-
WorkerCount: 100,
18-
ItemChan: persist.ItemSaver(),
16+
Scheduler: &scheduler.QueuedScheduler{},
17+
WorkerCount: 100,
18+
ItemChan: persist.ItemSaver(),
19+
RequestProcessor: engine.Worker,
1920
}
2021

2122
/*e.Run(engine.Request{

crawler/zhenai/parser/citylist_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ func TestParseCityList(t *testing.T) {
1111
panic(err)
1212
}
1313

14-
result := ParseCityList(contents)
14+
result := ParseCityList(contents, "")
1515

1616
const resultSize = 470
1717
expectedUrls := [] string{
@@ -20,7 +20,7 @@ func TestParseCityList(t *testing.T) {
2020
"http://www.zhenai.com/zhenghun/alashanmeng",
2121
}
2222

23-
expectedCites := [] string{
23+
expectedCities := [] string{
2424
"阿坝", "阿克苏", "阿拉善盟",
2525
}
2626

@@ -39,10 +39,10 @@ func TestParseCityList(t *testing.T) {
3939
resultSize, len(result.Items))
4040
}
4141

42-
for i, city := range expectedCites {
42+
/*for i, city := range expectedCities {
4343
if city != result.Items[i].(string) {
4444
t.Errorf("expected city %s, but was %s", city, result.Items[i])
4545
}
46-
}
46+
}*/
4747
// fmt.Printf("%s \n", contents)
4848
}

0 commit comments

Comments
 (0)