@@ -29,6 +29,7 @@ import (
29
29
"net"
30
30
"reflect"
31
31
"strconv"
32
+ "strings"
32
33
"sync"
33
34
"sync/atomic"
34
35
"time"
@@ -555,7 +556,7 @@ func (context *ContextImpl) refreshSessions() {
555
556
session := entry .Val
556
557
log .Debugf ("refreshing session for %s" , key )
557
558
558
- if s , err := context .refreshSession (* session . ID ); err != nil {
559
+ if s , err := context .refreshSession (session ); err != nil {
559
560
log .WithError (err ).Errorf ("failed to refresh session for %s" , key )
560
561
toDelete = append (toDelete , * session .ID )
561
562
} else {
@@ -881,7 +882,7 @@ func (context *ContextImpl) DialWithOptions(serviceName string, options *DialOpt
881
882
}
882
883
883
884
var refreshErr error
884
- if _ , refreshErr = context .refreshSession (* session . ID ); refreshErr == nil {
885
+ if _ , refreshErr = context .refreshSession (session ); refreshErr == nil {
885
886
// if the session wasn't expired, no reason to try again, return the failure
886
887
return nil , errors .Wrapf (err , "unable to dial service '%s'" , serviceName )
887
888
}
@@ -1034,7 +1035,7 @@ func (context *ContextImpl) listenSession(service *rest_model.ServiceDetail, opt
1034
1035
func (context * ContextImpl ) getEdgeRouterConn (session * rest_model.SessionDetail , options edge.ConnOptions ) (edge.RouterConn , error ) {
1035
1036
logger := pfxlog .Logger ().WithField ("sessionId" , * session .ID )
1036
1037
1037
- if refreshedSession , err := context .refreshSession (* session . ID ); err != nil {
1038
+ if refreshedSession , err := context .refreshSession (session ); err != nil {
1038
1039
target := & rest_session.DetailSessionNotFound {}
1039
1040
if errors .As (err , & target ) {
1040
1041
sessionKey := fmt .Sprintf ("%s:%s" , session .Service .ID , * session .Type )
@@ -1055,9 +1056,11 @@ func (context *ContextImpl) getEdgeRouterConn(session *rest_model.SessionDetail,
1055
1056
var bestER edge.RouterConn
1056
1057
var unconnected []* rest_model.SessionEdgeRouter
1057
1058
for _ , edgeRouter := range session .EdgeRouters {
1058
- for _ , routerUrl := range edgeRouter .Urls {
1059
- if er , found := context .routerConnections .Get (routerUrl ); found {
1060
- h := context .metrics .Histogram ("latency." + routerUrl ).(metrics2.Histogram )
1059
+ for proto , addr := range edgeRouter .SupportedProtocols {
1060
+ addr = strings .Replace (addr , "://" , ":" , 1 )
1061
+ edgeRouter .SupportedProtocols [proto ] = addr
1062
+ if er , found := context .routerConnections .Get (addr ); found {
1063
+ h := context .metrics .Histogram ("latency." + addr ).(metrics2.Histogram )
1061
1064
if h .Mean () < float64 (bestLatency ) {
1062
1065
bestLatency = time .Duration (int64 (h .Mean ()))
1063
1066
bestER = er
@@ -1074,9 +1077,9 @@ func (context *ContextImpl) getEdgeRouterConn(session *rest_model.SessionDetail,
1074
1077
}
1075
1078
1076
1079
for _ , edgeRouter := range unconnected {
1077
- for _ , routerUrl := range edgeRouter .Urls {
1078
- if context .options .isEdgeRouterUrlAccepted (routerUrl ) {
1079
- go context .connectEdgeRouter (* edgeRouter .Name , routerUrl , ch )
1080
+ for _ , addr := range edgeRouter .SupportedProtocols {
1081
+ if context .options .isEdgeRouterUrlAccepted (addr ) {
1082
+ go context .connectEdgeRouter (* edgeRouter .Name , addr , ch )
1080
1083
}
1081
1084
}
1082
1085
}
@@ -1373,13 +1376,21 @@ func (context *ContextImpl) createSession(service *rest_model.ServiceDetail, ses
1373
1376
return session , nil
1374
1377
}
1375
1378
1376
- func (context * ContextImpl ) refreshSession (id string ) (* rest_model.SessionDetail , error ) {
1377
- session , err := context .CtrlClt .GetSession (id )
1379
+ func (context * ContextImpl ) refreshSession (session * rest_model.SessionDetail ) (* rest_model.SessionDetail , error ) {
1380
+ var refreshedSession * rest_model.SessionDetail
1381
+ var err error
1382
+ if strings .HasPrefix (* session .Token , apis .JwtTokenPrefix ) {
1383
+ refreshedSession , err = context .CtrlClt .GetSessionFromJwt (* session .Token )
1384
+ } else {
1385
+ refreshedSession , err = context .CtrlClt .GetSession (* session .ID )
1386
+ }
1387
+
1378
1388
if err != nil {
1379
1389
return nil , err
1380
1390
}
1381
- context .cacheSession ("refresh" , session )
1382
- return session , nil
1391
+
1392
+ context .cacheSession ("refresh" , refreshedSession )
1393
+ return refreshedSession , nil
1383
1394
}
1384
1395
1385
1396
func (context * ContextImpl ) cacheSession (op string , session * rest_model.SessionDetail ) {
@@ -1609,7 +1620,8 @@ func (mgr *listenerManager) refreshSession() {
1609
1620
return
1610
1621
}
1611
1622
1612
- session , err := mgr .context .refreshSession (* mgr .session .ID )
1623
+ session , err := mgr .context .refreshSession (mgr .session )
1624
+
1613
1625
if err != nil {
1614
1626
var target error = & rest_session.DetailSessionNotFound {}
1615
1627
if errors .As (err , & target ) {
@@ -1630,7 +1642,7 @@ func (mgr *listenerManager) refreshSession() {
1630
1642
}
1631
1643
}
1632
1644
1633
- session , err = mgr .context .refreshSession (* mgr .session . ID )
1645
+ session , err = mgr .context .refreshSession (mgr .session )
1634
1646
if err != nil {
1635
1647
target = & rest_session.DetailSessionUnauthorized {}
1636
1648
if errors .As (err , & target ) {
0 commit comments