From 0ffc56e88c313c157f8a42e017b2b61da5c1c9be Mon Sep 17 00:00:00 2001 From: Jesse Hodges Date: Wed, 23 Mar 2016 09:18:07 -0500 Subject: [PATCH] Allow to run with an externally created session --- .../migration/CassandraMigration.java | 93 +++++++++++++------ .../migration/CassandraMigrationIT.java | 33 +++++++ 2 files changed, 96 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java b/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java index 043b956..183597b 100644 --- a/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java +++ b/src/main/java/com/contrastsecurity/cassandra/migration/CassandraMigration.java @@ -65,7 +65,44 @@ private MigrationResolver createMigrationResolver() { } public int migrate() { - return execute(new Action() { + return execute(migrateAction()); + } + + public int migrate(Session session) { + return execute(migrateAction(), session); + } + + public MigrationInfoService info() { + return execute(infoAction()); + } + + public MigrationInfoService info(Session session) { + return execute(infoAction(), session); + } + + public void validate() { + String validationError = execute(validationAction()); + + if (validationError != null) { + throw new CassandraMigrationException("Validation failed. " + validationError); + } + } + + public void validate(Session session) { + String validationError = execute(validationAction(), session); + + if (validationError != null) { + throw new CassandraMigrationException("Validation failed. " + validationError); + } + } + + public void baseline() { + //TODO + throw new NotImplementedException(); + } + + private Action migrateAction() { + return new Action() { public Integer execute(Session session) { new Initialize().run(session, keyspace, MigrationVersion.CURRENT.getTable()); @@ -73,14 +110,13 @@ public Integer execute(Session session) { SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); Migrate migrate = new Migrate(migrationResolver, configs.getTarget(), schemaVersionDAO, session, keyspace.getCluster().getUsername(), configs.isAllowOutOfOrder()); - return migrate.run(); } - }); + }; } - public MigrationInfoService info() { - return execute(new Action() { + private Action infoAction() { + return new Action() { public MigrationInfoService execute(Session session) { MigrationResolver migrationResolver = createMigrationResolver(); SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); @@ -90,28 +126,19 @@ public MigrationInfoService execute(Session session) { return migrationInfoService; } - }); + }; } - public void validate() { - String validationError = execute(new Action() { - @Override - public String execute(Session session) { - MigrationResolver migrationResolver = createMigrationResolver(); - SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); - Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false); - return validate.run(); - } - }); - - if (validationError != null) { - throw new CassandraMigrationException("Validation failed. " + validationError); - } - } - - public void baseline() { - //TODO - throw new NotImplementedException(); + private Action validationAction() { + return new Action() { + @Override + public String execute(Session session) { + MigrationResolver migrationResolver = createMigrationResolver(); + SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable()); + Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false); + return validate.run(); + } + }; } private String getConnectionInfo(Metadata metadata) { @@ -142,6 +169,9 @@ T execute(Action action) { if (null == keyspace.getCluster()) throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured."); + if (null == keyspace.getName() || keyspace.getName().trim().length() == 0) + throw new IllegalArgumentException("Keyspace name not specified."); + com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder(); builder.addContactPoints(keyspace.getCluster().getContactpoints()).withPort(keyspace.getCluster().getPort()); if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) { @@ -158,8 +188,6 @@ T execute(Action action) { LOG.info(getConnectionInfo(metadata)); session = cluster.newSession(); - if (null == keyspace.getName() || keyspace.getName().trim().length() == 0) - throw new IllegalArgumentException("Keyspace not specified."); List keyspaces = metadata.getKeyspaces(); boolean keyspaceExists = false; for (KeyspaceMetadata keyspaceMetadata : keyspaces) { @@ -171,24 +199,29 @@ T execute(Action action) { else throw new CassandraMigrationException("Keyspace: " + keyspace.getName() + " does not exist."); - result = action.execute(session); + result = execute(action, session); } finally { if (null != session && !session.isClosed()) try { session.close(); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("Error closing Cassandra session"); } if (null != cluster && !cluster.isClosed()) try { cluster.close(); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("Error closing Cassandra cluster"); } } return result; } + T execute(Action action, Session session) { + T result = action.execute(session); + return result; + } + interface Action { T execute(Session session); } diff --git a/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java b/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java index 4505a31..00d7587 100644 --- a/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java +++ b/src/test/java/com/contrastsecurity/cassandra/migration/CassandraMigrationIT.java @@ -6,6 +6,7 @@ import com.contrastsecurity.cassandra.migration.info.MigrationInfoService; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import org.junit.Assert; @@ -178,6 +179,38 @@ public void testValidate() { } } + @Test + public void testValidateWithSession() { + // apply migration scripts + String[] scriptsLocations = { "migration/integ", "migration/integ/java" }; + Session session = getSession(); + CassandraMigration cm = new CassandraMigration(); + cm.getConfigs().setScriptsLocations(scriptsLocations); + cm.setKeyspace(getKeyspace()); + cm.migrate(session); + + MigrationInfoService infoService = cm.info(session); + String validationError = infoService.validate(); + Assert.assertNull(validationError); + + cm = new CassandraMigration(); + cm.getConfigs().setScriptsLocations(scriptsLocations); + cm.setKeyspace(getKeyspace()); + cm.validate(session); + + cm = new CassandraMigration(); + cm.getConfigs().setScriptsLocations(new String[] { "migration/integ/java" }); + cm.setKeyspace(getKeyspace()); + try { + cm.validate(session); + Assert.fail("The expected CassandraMigrationException was not raised"); + } catch (CassandraMigrationException e) { + } + + Assert.assertFalse(session.isClosed()); + } + + static boolean runCmdTestCompleted = false; static boolean runCmdTestSuccess = false;