Skip to content

Commit 52d5030

Browse files
committed
[server] add relay to retrieve messages persisted on unitdb server
1 parent 3182325 commit 52d5030

File tree

14 files changed

+77
-172
lines changed

14 files changed

+77
-172
lines changed

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ Make use of the client by importing it in your Go client source code. For exampl
3333
3434
Unitdb supports Get, Put, Delete operations. It also supports encryption, batch operations, and writing to wildcard topics. See [usage guide](https://github.com/unit-io/unitdb/tree/master/docs/usage.md).
3535

36-
Samples are available in the examples directory for reference. To build unitdb client from latest source code use "replace" in go.mod to point to your local module.
37-
38-
```golang
39-
go mod edit -replace github.com/unit-io/unitdb-go=$GOPATH/src/github.com/unit-io/unitdb-go
40-
```
36+
Samples are available in the examples directory for reference.
4137

4238
## Clustering
4339
To bring up the Unitdb cluster start 2 or more nodes. For fault tolerance 3 nodes or more are recommended.
@@ -49,6 +45,11 @@ To bring up the Unitdb cluster start 2 or more nodes. For fault tolerance 3 node
4945

5046
Above example shows each Unitdb node running on the same host, so each node must listen on different ports. This would not be necessary if each node ran on a different host.
5147

48+
## Client Libraries
49+
Make use of officially supported client libraries to connect to unitdb server running on single node and running on a cluster.
50+
- [unitdb-go](https://github.com/unit-io/unitdb-go) Lightweight and high performance unitdb Go client library.
51+
- [unitdb-dart](https://github.com/unit-io/unitdb-dart) High performance unitdb Flutter/Dart client library.
52+
5253
## Architecture Overview
5354
The unitdb engine handles data from the point put request is received through writing data to the physical disk. Data is compressed and encrypted (if encryption is set) then written to a WAL for immediate durability. Entries are written to memdb and become immediately queryable. The memdb entries are periodically written to log files in the form of blocks.
5455

docs/utp.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ An application transport the data by uTP across network, it contains payload dat
4545
### Client
4646
A Client opens the network connection to the Server using TCP/IP, WebSocket, GRPC or other bi-direction network protocols.
4747
- Pubslihes Application Mesasges to a topic that other Clients subscribes to.
48-
- Sends a relay request to retrieve persisted Application Messages from Server.
48+
- Sends a relay request to retrieve Application Messages persisted on Server.
4949
- Subscribes to a topic to receive Application Messages.
5050
- Unsubcribe to remove a topic subscription.
5151
- Closes the network connection to the Server.
@@ -284,7 +284,7 @@ The PublishMessage contains folowing fields:
284284
A publisher can specify time to-live (TTL) when publishing an Application Message.
285285

286286
### RELAY - Relay request
287-
The RELAY Message is sent from the Client to the Server to get persisted Application Messages from server for one or more topics. Each Relay request pairs the topics with last durations. The Server sends PUBLISH Messages to the Client to forward Application Messages that were persisted by the Server for the Topics that match these Relay requests. The RELAY Message also specifies (for each request) the Last duration for which the Server can send persisted Application Messages to the Client.
287+
The RELAY Message is sent from the Client to the Server to get Application Messages persisted on server for one or more topics. Each Relay request pairs the topics with last durations. The Server sends PUBLISH Messages to the Client to forward Application Messages that were persisted by the Server for the Topics that match these Relay requests. The RELAY Message also specifies (for each request) the Last duration for which the Server can send stored Application Messages to the Client.
288288

289289
The payload contains following fields.
290290

@@ -301,7 +301,7 @@ The RelayRequest contains folowing fields:
301301
| Last | string |
302302

303303
#### Last
304-
A Client can specify Last duration (for example "1h") to retrive persisted Application Messages published to the Topic.
304+
A Client can specify Last duration (for example "1h") to retrive Application Messages published to the Topic.
305305

306306
### SUBSCRIBE - Subscribe request
307307
The SUBSCRIBE Message is sent from the Client to the Server to create one or more Subscriptions. Each Subscription registers one or more Topics for a Client. The Server sends PUBLISH Messages to the Client to forward Application Messages that were published to Topics that match these Subscriptions. The SUBSCRIBE Message also specifies (for each Subscription) the Delivery Mode with which the Server can send Application Messages to the Client.

examples/server_client/main.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

go.mod

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ require (
99
github.com/gorilla/websocket v1.4.2
1010
github.com/rs/zerolog v1.21.0
1111
github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264
12-
github.com/unit-io/unitdb-go v0.0.0-20210407101657-d9db9270f78d
1312
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
1413
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
1514
google.golang.org/grpc v1.39.0
1615
)
17-
18-
// replace github.com/unit-io/unitdb-go => /src/github.com/unit-io/unitdb-go

go.sum

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
1515
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
1616
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
1717
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
18-
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
1918
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
2019
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
2120
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -59,7 +58,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
5958
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
6059
github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264 h1:31MK/k8NNVPI3UEUCJ8+9aEzlFxJRhawKk2gqxSbJeA=
6160
github.com/unit-io/bpool v0.0.0-20200906005724-1643bbf59264/go.mod h1:jLqAtkF257MDiAc5K8svPVUGjfig2qdIhnWs3OCDwKg=
62-
github.com/unit-io/unitdb v0.1.1/go.mod h1:B8CPnLiFt1OcRxWIj+69kNxMRuSoS7o3muBzgqLZbkk=
6361
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
6462
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
6563
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -129,7 +127,6 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
129127
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
130128
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
131129
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
132-
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
133130
google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI=
134131
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
135132
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

memdb/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Make use of the client by importing it in your Go client source code. For exampl
2323
2424
The memdb supports Get, Set, Delete operations. It also supports batch operations.
2525

26-
Samples are available in the cmd directory for reference.
26+
Samples are available in the examples directory for reference.
2727

2828
### Opening a database
2929
To open or create a new database, use the memdb.Open() function:
@@ -101,7 +101,7 @@ use DB.Delete() function to delete a key-value pair.
101101
```
102102

103103
### Batch operation
104-
Use batch operation to bulk insert records into memdb or bulk delete records from memdb. See examples under cmd/memdb folder.
104+
Use batch operation to bulk insert records into memdb or bulk delete records from memdb. See examples under examples/memdb folder.
105105

106106
#### Writing to a batch
107107
Use Batch.Put() to insert a new key-value or Batch.Delete() to delete a key-value from DB.

message/topic.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ const (
3333
TopicInvalid = uint8(iota)
3434
TopicStatic
3535
TopicWildcard
36-
TopicWildcardSymbol = '*'
37-
TopicGenericSymbol = "..."
38-
TopicSeparator = '.' // The separator character.
39-
TopicMaxDepth = 100 // Maximum depth for topic using a separator
36+
TopicWildcardSymbol = '*'
37+
TopicMultiWildcardSymbol = "..."
38+
TopicSeparator = '.' // The separator character.
39+
TopicMaxDepth = 100 // Maximum depth for topic using a separator
4040

4141
// Wildcard wildcard is hash for wildcard topic such as '*' or '...'
4242
Wildcard = uint32(857445537)
@@ -132,15 +132,6 @@ func (t *Topic) Last() (time.Time, int, bool) {
132132
return zeroTime, 0, ok
133133
}
134134

135-
// toUnix converts the time to Unix Time with validation.
136-
func toUnix(t int64) time.Time {
137-
if t == 0 {
138-
return zeroTime
139-
}
140-
141-
return time.Unix(t, 0)
142-
}
143-
144135
// getOption retrieves a Uint option.
145136
func (t *Topic) getOption(name string) (string, int, bool) {
146137
for i := 0; i < len(t.Options); i++ {
@@ -207,8 +198,6 @@ func (t *Topic) Parse(contract uint32, wildcard bool) {
207198
return
208199
}
209200
parseStaticTopic(contract, t)
210-
211-
return
212201
}
213202

214203
// parseStaticTopic attempts to parse the topic from the underlying slice.
@@ -254,10 +243,10 @@ func parseWildcardTopic(contract uint32, topic *Topic) (ok bool) {
254243
}
255244

256245
depth := uint8(0)
257-
q := []byte(TopicGenericSymbol)
246+
q := []byte(TopicMultiWildcardSymbol)
258247
if bytes.HasSuffix(topic.Topic, q) {
259248
depth++
260-
topic.Topic = bytes.TrimRight(topic.Topic, string(TopicGenericSymbol))
249+
topic.Topic = bytes.TrimRight(topic.Topic, string(TopicMultiWildcardSymbol))
261250
topic.TopicType = TopicWildcard
262251
topic.Depth = TopicMaxDepth
263252
}

server/internal/conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (c *_Conn) SendMessage(msg *message.Message) bool {
135135
}
136136

137137
// Check batch, relay or delay delivery.
138-
if msg.DeliveryMode == 2 || msg.DeliveryMode == 3 || msg.Delay > 0 {
138+
if msg.DeliveryMode == 2 || msg.Delay > 0 {
139139
c.batchManager.add(msg.Delay, pubMsg)
140140
return true
141141
}
@@ -180,7 +180,7 @@ func (c *_Conn) subscribe(subMsg utp.Subscribe, topic *security.Topic, sub *utp.
180180

181181
key := string(topic.Key)
182182
if key == "" {
183-
key, err = security.GenerateKey(c.clientID.Contract(), []byte(topic.Topic), security.AllowNone)
183+
key, err = security.GenerateKey(c.clientID.Contract(), topic.Topic, security.AllowNone)
184184
if err != nil {
185185
log.ErrLogger.Err(err).Str("context", "conn.subscribe")
186186
return err
@@ -302,7 +302,7 @@ func (c *_Conn) publish(pub utp.Publish, topic *security.Topic, pubMsg *utp.Publ
302302

303303
if !pub.IsForwarded && Globals.Cluster.isRemoteContract(fmt.Sprint(c.clientID.Contract())) {
304304
if err = Globals.Cluster.routeToContract(&pub, topic, message.PUBLISH, msg, c); err != nil {
305-
log.ErrLogger.Err(err).Str("context", "conn.publish").Int64("connid", int64(c.connID)).Msg("unable to publish to remote topic")
305+
log.ErrLogger.Err(err).Str("context", "conn.publish").Int64("connid", int64(c.connID)).Msg("unable to publish to a remote topic")
306306
return err
307307
}
308308
}

server/internal/db/adapter.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,25 +44,25 @@ type Adapter interface {
4444
// SSID, where first element should be a contract ID. The time resolution
4545
// for TTL will be in seconds. The function is executed synchronously and
4646
// it returns an error if some error was encountered during storage.
47-
Put(contract uint32, topic, payload []byte, ttl string) error
47+
Put(contract uint32, topic string, payload []byte, ttl string) error
4848

4949
// PutWithID is used to store a message using a pre generated ID, the SSID provided must be a full SSID
5050
// SSID, where first element should be a contract ID. The time resolution
5151
// for TTL will be in seconds. The function is executed synchronously and
5252
// it returns an error if some error was encountered during storage.
53-
PutWithID(contract uint32, messageId, topic, payload []byte, ttl string) error
53+
PutWithID(contract uint32, messageId []byte, topic string, payload []byte, ttl string) error
5454

5555
// Get performs a query and attempts to fetch last messages where
5656
// last is specified by last duration argument.
57-
Get(contract uint32, topic []byte, last string) ([][]byte, error)
57+
Get(contract uint32, topic string, last string) ([][]byte, error)
5858

5959
// NewID generate messageId that can later used to store and delete message from message store
6060
NewID() ([]byte, error)
6161

6262
// Delete is used to delete entry, the SSID provided must be a full SSID
6363
// SSID, where first element should be a contract ID. The function is executed synchronously and
6464
// it returns an error if some error was encountered during delete.
65-
Delete(contract uint32, messageId, topic []byte) error
65+
Delete(contract uint32, messageId []byte, topic string) error
6666

6767
// PutMessage is used to store a message.
6868
// it returns an error if some error was encountered during storage.

server/internal/db/unitdb/adapter.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,17 +127,17 @@ func (a *adapter) GetName() string {
127127
}
128128

129129
// Put appends the messages to the store.
130-
func (a *adapter) Put(contract uint32, topic, payload []byte, ttl string) error {
131-
entry := unitdb.NewEntry(topic, payload).WithContract(contract)
130+
func (a *adapter) Put(contract uint32, topic string, payload []byte, ttl string) error {
131+
entry := unitdb.NewEntry([]byte(topic), payload).WithContract(contract)
132132
if ttl != "" {
133133
entry.WithTTL(ttl)
134134
}
135135
return a.db.PutEntry(entry)
136136
}
137137

138138
// PutWithID appends the messages to the store using a pre generated messageId.
139-
func (a *adapter) PutWithID(contract uint32, messageId, topic, payload []byte, ttl string) error {
140-
entry := unitdb.NewEntry(topic, payload).WithContract(contract).WithID(messageId)
139+
func (a *adapter) PutWithID(contract uint32, messageId []byte, topic string, payload []byte, ttl string) error {
140+
entry := unitdb.NewEntry([]byte(topic), payload).WithContract(contract).WithID(messageId)
141141
if ttl != "" {
142142
entry.WithTTL(ttl)
143143
}
@@ -146,9 +146,9 @@ func (a *adapter) PutWithID(contract uint32, messageId, topic, payload []byte, t
146146

147147
// Get performs a query and attempts to fetch last messages where
148148
// last is specified by last duration argument.
149-
func (a *adapter) Get(contract uint32, topic []byte, last string) (matches [][]byte, err error) {
149+
func (a *adapter) Get(contract uint32, topic string, last string) (matches [][]byte, err error) {
150150
// Iterating over key/value pairs.
151-
query := unitdb.NewQuery(topic).WithContract(contract)
151+
query := unitdb.NewQuery([]byte(topic)).WithContract(contract)
152152
if last != "" {
153153
query.WithLast(last)
154154
}
@@ -166,8 +166,8 @@ func (a *adapter) NewID() ([]byte, error) {
166166
}
167167

168168
// Put appends the messages to the store.
169-
func (a *adapter) Delete(contract uint32, messageId, topic []byte) error {
170-
entry := unitdb.NewEntry(topic, nil)
169+
func (a *adapter) Delete(contract uint32, messageId []byte, topic string) error {
170+
entry := unitdb.NewEntry([]byte(topic), nil)
171171
entry.WithContract(contract)
172172
return a.db.DeleteEntry(entry.WithID(messageId))
173173
}

0 commit comments

Comments
 (0)