Skip to content

Commit f305bc4

Browse files
committed
Using rpc pool.
1 parent 82f9ff7 commit f305bc4

File tree

8 files changed

+89
-24
lines changed

8 files changed

+89
-24
lines changed

crawler-distributed/main.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,36 @@
11
package main
22

33
import (
4+
"flag"
45
"polaris/crawler-distributed/config"
56
itemsaver "polaris/crawler-distributed/persist/client"
7+
"polaris/crawler-distributed/pool"
68
worker "polaris/crawler-distributed/worker/client"
79
"polaris/crawler/engine"
810
"polaris/crawler/scheduler"
911
"polaris/crawler/zhenai/parser"
12+
"strings"
1013
)
1114

15+
// itemSaverHost is the address of the item-saver RPC server, taken from the
// -itemsaver_host flag. It defaults to the empty string.
var itemSaverHost = flag.String("itemsaver_host", "", "itemsaver_host")

// workerHosts is the comma separated list of worker RPC server addresses,
// taken from the -worker_host flag. It defaults to the empty string.
var workerHosts = flag.String("worker_host", "", "worker_host(comma separated)")
1222
func main() {
1323
// run condition:
1424
// run itemsaver.go first
1525
// then to run worker.go
16-
itemChan, err := itemsaver.ItemSaver(config.ItemSaver0Host)
26+
flag.Parse()
27+
itemChan, err := itemsaver.ItemSaver(*itemSaverHost)
1728
if err != nil {
1829
panic(err)
1930
}
2031

21-
processor, err := worker.CreateProcessor()
32+
clientPool := pool.CreateClientPool(strings.Split(*workerHosts, ","))
33+
processor, err := worker.CreateProcessor(clientPool)
2234
if err != nil {
2335
panic(err)
2436
}
@@ -34,3 +46,4 @@ func main() {
3446
Parser: engine.NewFuncParser(parser.ParseCityList, config.ParseCityList),
3547
})
3648
}
49+

crawler-distributed/persist/rpc.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@ type ItemSaverService struct {
1111
Index string
1212
}
1313

14-
1514
func (s *ItemSaverService) Save(item engine.Item, result *string) error {
16-
const index = "polaris"
15+
const index = "polaris"
1716
err := persist.Save(item, index)
1817
if err == nil {
19-
fmt.Println("saving item success")
18+
fmt.Printf("saving item#%v \n", item.Url)
2019
*result = "ok"
2120
} else {
22-
log.Error("saving item fail")
21+
log.Error("saving item#%v fail", item)
2322
*result = "failed"
2423
}
2524
return err

crawler-distributed/persist/server/itemsaver.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package main
22

33
import (
4-
"polaris/crawler-distributed/config"
4+
"flag"
5+
"fmt"
6+
"log"
57
"polaris/crawler-distributed/persist"
68
"polaris/crawler-distributed/rpc"
79
)
@@ -13,6 +15,14 @@ func serveRpc(host, index string) error {
1315
})
1416
}
1517

18+
// port is the TCP port the item-saver RPC server listens on, taken from the
// -port flag. The default of 0 is treated by main as "not specified".
var port = flag.Int("port", 0, "The port for me to listen on")
20+
1621
func main() {
17-
serveRpc(config.ItemSaver0Host, "polaris")
22+
flag.Parse()
23+
if *port == 0 {
24+
log.Printf("Must specify a port ")
25+
return
26+
}
27+
serveRpc(fmt.Sprintf(":%d", *port), "polaris")
1828
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package pool
2+
3+
import (
4+
"log"
5+
"net/rpc"
6+
"polaris/crawler-distributed/rpc"
7+
)
8+
9+
func CreateClientPool(hosts []string) chan *rpc.Client {
10+
11+
var clients []*rpc.Client
12+
for _, h := range hosts{
13+
client, err := rpcsupport.NewClient(h)
14+
if err == nil {
15+
clients = append(clients, client)
16+
log.Printf("Connected to %s", h)
17+
} else {
18+
log.Printf("Error connecting to %s: %v",
19+
h, err)
20+
}
21+
}
22+
23+
24+
out := make(chan *rpc.Client)
25+
go func() {
26+
for {
27+
for _, client := range clients {
28+
out <- client
29+
}
30+
}
31+
}()
32+
return out
33+
}

crawler-distributed/rpc/rpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func ServeRpc(host string, service interface{}) error {
1313
if err != nil {
1414
return err
1515
}
16-
16+
log.Printf("Listen on %s", host)
1717
for {
1818
conn, err := listener.Accept()
1919
if err != nil {

crawler-distributed/worker/client/worker.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
package client
22

33
import (
4-
"fmt"
4+
"net/rpc"
55
"polaris/crawler-distributed/config"
6-
"polaris/crawler-distributed/rpc"
76
"polaris/crawler-distributed/worker"
87
"polaris/crawler/engine"
98
)
109

11-
func CreateProcessor() (engine.Processor, error) {
12-
client, err := rpcsupport.NewClient(fmt.Sprintf("%s", config.Worker0Host))
13-
if err != nil {
14-
return nil, err
15-
}
10+
func CreateProcessor(
11+
pool chan *rpc.Client) (engine.Processor, error) {
1612

1713
return func(request engine.Request) (engine.ParseResult, error) {
1814
sReq := worker.SerializedRequest(request)
1915
var sResult worker.ParseResult
20-
err := client.Call(config.CrawlServiceRpc, sReq, &sResult)
16+
17+
c := <-pool
18+
err := c.Call(config.CrawlServiceRpc, sReq, &sResult)
2119
if err != nil {
2220
return engine.ParseResult{}, err
2321
}
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
package main
22

33
import (
4-
"polaris/crawler-distributed/config"
4+
"flag"
5+
"fmt"
6+
"log"
57
"polaris/crawler-distributed/rpc"
68
"polaris/crawler-distributed/worker"
79
)
810

11+
// port is the TCP port the crawl worker RPC server listens on, taken from the
// -port flag. The default of 0 is treated by main as "not specified".
var port = flag.Int("port", 0, "The port for me to listen on")
13+
914
func main() {
10-
err := rpcsupport.ServeRpc(config.Worker0Host, worker.CrawlService{})
11-
if err != nil{
15+
flag.Parse()
16+
if *port == 0 {
17+
log.Printf("Must specify a port ")
18+
return
19+
}
20+
err := rpcsupport.ServeRpc(fmt.Sprintf(
21+
":%d", *port), worker.CrawlService{})
22+
if err != nil {
1223
panic(err)
1324
}
1425
}

crawler/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"polaris/crawler-distributed/config"
45
"polaris/crawler/engine"
56
"polaris/crawler/persist"
67
"polaris/crawler/scheduler"
@@ -19,16 +20,16 @@ func main() {
1920
RequestProcessor: engine.Worker,
2021
}
2122

22-
/*e.Run(engine.Request{
23+
e.Run(engine.Request{
2324
Url: seed,
2425
Parser: engine.NewFuncParser(
2526
parser.ParseCityList,
2627
config.ParseCityList),
27-
})*/
28-
e.Run(engine.Request{
28+
})
29+
/*e.Run(engine.Request{
2930
Url: shanghai,
3031
Parser: engine.NewFuncParser(
3132
parser.ParseCity,
3233
"ParseCity"),
33-
})
34+
})*/
3435
}

0 commit comments

Comments
 (0)