Skip to content

Allow to run with an externally created session #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,58 @@ private MigrationResolver createMigrationResolver() {
}

public int migrate() {
return execute(new Action<Integer>() {
return execute(migrateAction());
}

public int migrate(Session session) {
return execute(migrateAction(), session);
}

public MigrationInfoService info() {
return execute(infoAction());
}

public MigrationInfoService info(Session session) {
return execute(infoAction(), session);
}

public void validate() {
String validationError = execute(validationAction());

if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
}

public void validate(Session session) {
String validationError = execute(validationAction(), session);

if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
}

public void baseline() {
//TODO
throw new NotImplementedException();
}

private Action<Integer> migrateAction() {
return new Action<Integer>() {
public Integer execute(Session session) {
new Initialize().run(session, keyspace, MigrationVersion.CURRENT.getTable());

MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
Migrate migrate = new Migrate(migrationResolver, configs.getTarget(), schemaVersionDAO, session,
keyspace.getCluster().getUsername(), configs.isAllowOutOfOrder());

return migrate.run();
}
});
};
}

public MigrationInfoService info() {
return execute(new Action<MigrationInfoService>() {
private Action<MigrationInfoService> infoAction() {
return new Action<MigrationInfoService>() {
public MigrationInfoService execute(Session session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
Expand All @@ -90,28 +126,19 @@ public MigrationInfoService execute(Session session) {

return migrationInfoService;
}
});
};
}

public void validate() {
String validationError = execute(new Action<String>() {
@Override
public String execute(Session session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false);
return validate.run();
}
});

if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
}

public void baseline() {
//TODO
throw new NotImplementedException();
private Action<String> validationAction() {
return new Action<String>() {
@Override
public String execute(Session session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false);
return validate.run();
}
};
}

private String getConnectionInfo(Metadata metadata) {
Expand Down Expand Up @@ -142,6 +169,9 @@ <T> T execute(Action<T> action) {
if (null == keyspace.getCluster())
throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured.");

if (null == keyspace.getName() || keyspace.getName().trim().length() == 0)
throw new IllegalArgumentException("Keyspace name not specified.");

com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder();
builder.addContactPoints(keyspace.getCluster().getContactpoints()).withPort(keyspace.getCluster().getPort());
if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) {
Expand All @@ -158,8 +188,6 @@ <T> T execute(Action<T> action) {
LOG.info(getConnectionInfo(metadata));

session = cluster.newSession();
if (null == keyspace.getName() || keyspace.getName().trim().length() == 0)
throw new IllegalArgumentException("Keyspace not specified.");
List<KeyspaceMetadata> keyspaces = metadata.getKeyspaces();
boolean keyspaceExists = false;
for (KeyspaceMetadata keyspaceMetadata : keyspaces) {
Expand All @@ -171,24 +199,29 @@ <T> T execute(Action<T> action) {
else
throw new CassandraMigrationException("Keyspace: " + keyspace.getName() + " does not exist.");

result = action.execute(session);
result = execute(action, session);
} finally {
if (null != session && !session.isClosed())
try {
session.close();
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Error closing Cassandra session");
}
if (null != cluster && !cluster.isClosed())
try {
cluster.close();
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Error closing Cassandra cluster");
}
}
return result;
}

<T> T execute(Action<T> action, Session session) {
T result = action.execute(session);
return result;
}

interface Action<T> {
T execute(Session session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.contrastsecurity.cassandra.migration.info.MigrationInfoService;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import org.junit.Assert;
Expand Down Expand Up @@ -178,6 +179,38 @@ public void testValidate() {
}
}

@Test
public void testValidateWithSession() {
// apply migration scripts
String[] scriptsLocations = { "migration/integ", "migration/integ/java" };
Session session = getSession();
CassandraMigration cm = new CassandraMigration();
cm.getConfigs().setScriptsLocations(scriptsLocations);
cm.setKeyspace(getKeyspace());
cm.migrate(session);

MigrationInfoService infoService = cm.info(session);
String validationError = infoService.validate();
Assert.assertNull(validationError);

cm = new CassandraMigration();
cm.getConfigs().setScriptsLocations(scriptsLocations);
cm.setKeyspace(getKeyspace());
cm.validate(session);

cm = new CassandraMigration();
cm.getConfigs().setScriptsLocations(new String[] { "migration/integ/java" });
cm.setKeyspace(getKeyspace());
try {
cm.validate(session);
Assert.fail("The expected CassandraMigrationException was not raised");
} catch (CassandraMigrationException e) {
}

Assert.assertFalse(session.isClosed());
}


static boolean runCmdTestCompleted = false;
static boolean runCmdTestSuccess = false;

Expand Down