Skip to content

feat: implement concurrent workers #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
redis-migrator
dist
.test/
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ Supported Platforms:-

For installation on debian and redhat based system, `.deb` and `.rpm` packages can be used.

For installing on MacOS system, use brew:-
For installing on MacOS system, use brew:

```shell
brew install redis-migrator
```

Install via `go install`
```shell
go install github.com/opstree/redis-migration@latest
```

### Configuration

For using redis-migrator, we have to create a configuration file and provide some needful information to it. An example configuration file will look like this:-
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package client
import (
"fmt"
"github.com/gomodule/redigo/redis"
"redis-migrator/config"
"github.com/opstree/redis-migration/config"
)

func generateClient(connectionURL string, redisPassword string, redisDatabase int) (redis.Conn, error) {
Expand Down
12 changes: 7 additions & 5 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package cmd

import (
"github.com/opstree/redis-migration/config"
"github.com/opstree/redis-migration/migrator"
"github.com/spf13/cobra"
"redis-migrator/config"
"redis-migrator/migrator"
)

var migrateCmd = &cobra.Command{
Use: "migrate",
Short: "Runs redis-migrator to run migration",
Long: `Runs redis-migrator to run migration`,
Run: func(cmd *cobra.Command, args []string) {
runMigration()
if err := runMigration(); err != nil {
panic(err)
}
},
}

Expand All @@ -20,7 +22,7 @@ func init() {
rootCmd.AddCommand(migrateCmd)
}

func runMigration() {
func runMigration() error {
data := config.ParseConfig(configFilePath)
migrator.MigrateRedisData(data)
return migrator.MigrateRedisData(data)
}
20 changes: 12 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
module redis-migrator
module github.com/opstree/redis-migration

go 1.15
go 1.21

require (
github.com/go-redis/redis/v8 v8.4.2
github.com/gomodule/redigo v1.8.3
github.com/mitchellh/mapstructure v1.1.2
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.1.1
github.com/gomodule/redigo v1.8.9
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/sys v0.13.0 // indirect
)
76 changes: 24 additions & 52 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"fmt"
"redis-migrator/cmd"
"github.com/opstree/redis-migration/cmd"
)

var (
Expand Down
100 changes: 73 additions & 27 deletions migrator/migrator.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,92 @@
package migrator

import (
"errors"
"fmt"
"github.com/gomodule/redigo/redis"
"github.com/opstree/redis-migration/client"
"github.com/opstree/redis-migration/config"
"github.com/sirupsen/logrus"
"redis-migrator/client"
"redis-migrator/config"
)

// MigrateRedisData is the function to migrate keys from old to new redis
func MigrateRedisData(redConfig config.Configuration) {
for _, database := range redConfig.Databases {
logrus.Debugf("Executing migrator for database: %v", database)
oldRedisClient, err := client.OldRedisClient(redConfig, database)
if err != nil {
logrus.Errorf("Error while connecting with redis %v", err)
func MigrateRedisData(redConfig config.Configuration) error {
concurrentWorkers := max(1, redConfig.ConcurrentWorkers)
if concurrentWorkers > len(redConfig.Databases) {
concurrentWorkers = len(redConfig.Databases)
}
logrus.Infof("Migrating with %d concurrent processes", concurrentWorkers)
errCh := make(chan error, len(redConfig.Databases))
defer close(errCh)
databaseCh := make(chan int, concurrentWorkers)
// Schedule task for workers
go func() {
for _, database := range redConfig.Databases {
databaseCh <- database
}
newRedisClient, err := client.NewRedisClient(redConfig, database)
if err != nil {
logrus.Errorf("Error while connecting with redis %v", err)
close(databaseCh)
}()

for i := 0; i < concurrentWorkers; i++ {
go func() {
var threadErr error
defer func() { errCh <- threadErr }()

for db := range databaseCh {
logrus.Infof("Migrating database: %d", db)
if err := migrateDB(redConfig, db); err != nil {
threadErr = err
return
}
logrus.Infof("Migrated database: %d", db)
}
}()
}

for i := 0; i < len(redConfig.Databases); i++ {
if err := <-errCh; err != nil {
return err
}
keys, err := redis.Strings(oldRedisClient.Do("KEYS", "*"))
}

return nil
}

func migrateDB(redConfig config.Configuration, db int) error {
oldRedisClient, err := client.OldRedisClient(redConfig, db)
if err != nil {
return fmt.Errorf("[DB %d] Error while connecting with redis %v", db, err)
}
newRedisClient, err := client.NewRedisClient(redConfig, db)
if err != nil {
return fmt.Errorf("[DB %d] Error while connecting with redis %v", db, err)
}
keys, err := redis.Strings(oldRedisClient.Do("KEYS", "*"))
if err != nil {
return fmt.Errorf("[DB %d] Error while listing redis keys %v", db, err)
}
logrus.Infof("[DB %d] Migrating %d keys", db, len(keys))
for i, key := range keys {
keyType, err := redis.String(oldRedisClient.Do("TYPE", key))
if err != nil {
logrus.Errorf("Error while listing redis keys %v", err)
return fmt.Errorf("[DB %d] Not able to get the key type %s: %v", db, key, err)
}
for _, key := range keys {
keyType, err := redis.String(oldRedisClient.Do("TYPE", key))
if err != nil {
logrus.Errorf("Not able to get the key type %s: %v", key, err)
}
switch keyType {
case "string":
migrateStringKeys(oldRedisClient, newRedisClient, key)
case "hash":
migrateHashKeys(oldRedisClient, newRedisClient, key)
case "list":
migarteListKeys(oldRedisClient, newRedisClient, key)
}
switch keyType {
case "string":
migrateStringKeys(oldRedisClient, newRedisClient, key)
case "hash":
migrateHashKeys(oldRedisClient, newRedisClient, key)
case "list":
migrateListKeys(oldRedisClient, newRedisClient, key)
default:
return errors.New(fmt.Sprintf("[DB %d] key type is not supported: %s", db, keyType))
}
logrus.Debugf("[DB %d] Migrated %d/%d", db, i, len(keys))
}
return nil
}

func migarteListKeys(oldClient redis.Conn, newClient redis.Conn, key string) {
func migrateListKeys(oldClient redis.Conn, newClient redis.Conn, key string) {
value, err := redis.Strings(oldClient.Do("LPOP", key))
if err != nil {
logrus.Errorf("Not able to get the value for key %s: %v", key, err)
Expand Down