Clover coverage report -
Coverage timestamp: Mon May 5 2008 11:56:20 GMT-05:00
file stats: LOC: 593   Methods: 9
NCLOC: 370   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
HashSpanFixingThread.java 85.1% 80% 100% 82.1%
coverage coverage
 1    package org.proteomecommons.tranche.flatfile;
 2   
 3    import com.sun.mail.iap.ConnectionException;
 4    import java.io.ByteArrayInputStream;
 5    import java.io.File;
 6    import java.math.BigInteger;
 7    import java.util.ArrayList;
 8    import java.util.List;
 9    import java.util.Set;
 10    import org.proteomecommons.tranche.TrancheServer;
 11    import org.proteomecommons.tranche.util.BigHash;
 12    import org.proteomecommons.tranche.util.Configuration;
 13    import org.proteomecommons.tranche.util.GetFileTool;
 14    import org.proteomecommons.tranche.util.HashSpan;
 15    import org.proteomecommons.tranche.util.IOUtil;
 16    import org.proteomecommons.tranche.util.MetaData;
 17    import org.proteomecommons.tranche.util.MetaDataUtil;
 18    import org.proteomecommons.tranche.util.ServerInfo;
 19    import org.proteomecommons.tranche.util.ServerUtil;
 20    import org.proteomecommons.tranche.util.SimpleDiskBackedBigHashList;
 21    import org.proteomecommons.tranche.util.TestUtil;
 22   
 23    /**
 24    * "Heals" a server's data and meta-data based on the configured hash span.
 25    * @author Jayson Falkner - jfalkner@umich.edu
 26    * @author Bryan Smith <bryanesmith at gmail dot com>
 27    */
 28    public class HashSpanFixingThread extends Thread {
 29    // the flat-file Tranche server whose data and meta-data this thread heals
 30    FlatFileTrancheServer ffts;
 31    // number of hashes requested per getDataHashes/getMetaDataHashes call in run()
 32    int hashBatchSize = 1000;
 33    // whether waitForIteration() should block; it only waits while this is true
 34    private boolean waitIfRequested = true;
 35    // replica count that copies online must exceed (strict '>' in run()) before an out-of-span chunk is deleted; mutable via setRequiredReplications() despite the constant-style name
 36    private int REQUIRED_REPLICATIONS = 3;
 37   
 38    // Test-only flag: when true, run() moves on to the next server right after the
 39    // data-hash pass, which tests use to show data and meta data are replicated together
 40    private boolean isTestingConcurrentReplication = false;
 41   
 42    // status counters and current-server label updated by run() for the FFTS server to report
 43    long dataCopied = 0;
 44    long dataSkipped = 0;
 45    long metaDataCopied = 0;
 46    long metaDataSkipped = 0;
 47    long iterationCount = 0;
 48    long dataDeleted = 0;
 49    long metaDataDeleted = 0;
 50    String serverBeingIndexed = "Unknown";
 51   
 52    /**
 53    * <p>Get the number of replications on other servers before chunk deleted on this server. A policy.
 54    */
 55  15 public int getRequiredReplications() {
 56  15 return REQUIRED_REPLICATIONS;
 57    }
 58   
 59   
 60   
 61    /**
 62    * <p>Set the number of required replications of chunk on other servers before deletion. Default is 3.</p>
 63    * <p>For example, if the required replications were 3, and 3+ servers have the chunk and this server has the chunk out of its hash span, the server will delete it.</p>
 64    * <p>However, if the required reps were 5, this server won't delete the chunk even though it is out of its hash span.</p>
 65    * <p>Note this operation blocks until the current iteration is complete.</p>
 66    */
 67  6 public void setRequiredReplications(int num) {
 68  6 try {
 69  6 waitForIteration();
 70    } catch (InterruptedException ex) { /* Nope */ }
 71  6 this.REQUIRED_REPLICATIONS = num;
 72    }
 73   
 74  883 public HashSpanFixingThread(FlatFileTrancheServer ffts) {
 75  883 this.ffts = ffts;
 76    // set as daemon
 77  883 setDaemon(true);
 78    // low priority
 79  883 setPriority(Thread.MIN_PRIORITY);
 80    // set the name
 81  883 setName("Hash Span Fixing Thread");
 82    }
 83   
 84    /**
 85    * Blocks until next iteration. Mostly useful for tests or to determine
 86    * whether affects have kicked in yet.
 87    */
 88  58 public void waitForIteration() throws InterruptedException {
 89    // only block if notifyAll() will later be called
 90  58 if (waitIfRequested) {
 91  58 synchronized (this) {
 92  58 wait();
 93    }
 94    }
 95    }
 96   
 97  883 public void run() {
 98  883 try {
 99    // get teh configuration
 100  883 Configuration config;
 101   
 102    // loop forever
 103  883 while (!ffts.isClosed() && (!TestUtil.isTesting() || TestUtil.isTestUsingFakeCoreServers())) {
 104    // inc the iteration count
 105  129620 this.iterationCount++;
 106   
 107    // Under no exception should this stop in core server
 108  129620 try {
 109   
 110    // notify of a complete iteration
 111  129620 synchronized (this) {
 112  129620 notifyAll();
 113    }
 114   
 115  129620 config = ffts.getConfiguration();
 116   
 117    // If there are sticky projects, load them.
 118    // If config changes, the method will load new sticky projects
 119  129620 this.loadStickyProjects(config);
 120   
 121    // if no hash spans are configured, skip for 1 sec
 122  129620 if (config.getHashSpans().isEmpty()) {
 123  197 try {
 124    // wait for a second
 125  197 synchronized (this) {
 126  197 wait(1000);
 127    }
 128    // continue
 129  191 continue;
 130    } catch (InterruptedException ex) {
 131    // noop
 132    }
 133    }
 134   
 135   
 136    // the array of servers to use -- this mig
 137  129423 String[] coreServers = ServerUtil.getCoreServers().toArray(new String[0]);
 138  129423 if (TestUtil.isTestUsingFakeCoreServers()) {
 139  129423 List<ServerInfo> servers = ServerUtil.getServers();
 140  129423 coreServers = new String[servers.size()];
 141  129423 for (int i = 0; i < servers.size(); i++) {
 142  383 coreServers[i] = servers.get(i).getUrl();
 143    }
 144    }
 145   
 146    // loop through all of the other servers and try to grab content
 147  129423 for (String coreServer : coreServers) {
 148    // for trakcing
 149  380 this.serverBeingIndexed = coreServer;
 150   
 151  380 try {
 152    // skip if this server is off-line
 153  380 if (!ServerUtil.isServerOnline(coreServer)) {
 154  4 continue;
 155    }
 156   
 157    // connect
 158  376 TrancheServer ts = IOUtil.connect(coreServer);
 159   
 160    // break out if appropriate
 161  376 if (ffts.isClosed()) {
 162  7 break;
 163    }
 164   
 165    // get all of the data hashes
 166    {
 167  369 BigInteger offset = BigInteger.ZERO;
 168  369 BigInteger limit = BigInteger.valueOf(hashBatchSize);
 169   
 170    // Start with server's known data chunks.
 171  369 for (BigHash[] chunks = ts.getDataHashes(offset, limit); chunks.length > 0; chunks = ts.getDataHashes(offset, limit)) {
 172   
 173    // skip if the server's URL is set and it is this server(i.e. don't check yourself!)
 174  196 String thisServerUrl = ffts.getConfiguration().getValue("serverURL");
 175  196 if (thisServerUrl != null && thisServerUrl.equals(coreServer)) {
 176  0 break;
 177    }
 178   
 179    // update the offset
 180  196 offset = offset.add(BigInteger.valueOf(chunks.length));
 181   
 182    // break out if appropriate
 183  196 if (ffts.isClosed()) {
 184  0 break;
 185    }
 186   
 187    // Check each hash to see if it is in the configured hash span
 188  196 Set<HashSpan> hashSpans = config.getHashSpans();
 189  196 for (int i = 0; i < chunks.length; i++) {
 190  438 for (HashSpan hs : hashSpans) {
 191    // if the data should be on this box, add it
 192  438 if (hs.contains(chunks[i])) {
 193    // Start with data. If local server doesn't have, add.
 194  288 if (!ffts.hasData(chunks[i])) {
 195    // Add it
 196  27 byte[] data = ts.getData(chunks[i]);
 197  27 ffts.getDataBlockUtil().addData(chunks[i], data);
 198    // track for server config info
 199  27 this.dataCopied++;
 200    } else {
 201    // track for server config info
 202  261 this.dataSkipped++;
 203    }
 204   
 205    // Check if server has meta data. If not, add.
 206  288 if (ts.hasMetaData(chunks[i])) {
 207  113 if (!ffts.hasMetaData(chunks[i])) {
 208  9 byte[] meta = ts.getMetaData(chunks[i]);
 209  9 ffts.getDataBlockUtil().addMetaData(chunks[i], meta);
 210    // log for server config info
 211  9 this.metaDataCopied++;
 212    } else {
 213    // log for server config info
 214  104 this.metaDataSkipped++;
 215    }
 216    }
 217    }
 218    }
 219    } // For all chunks in batch
 220   
 221    // Break out if appropriate
 222  187 if (ffts.isClosed()) {
 223  0 break;
 224    }
 225    } // For all known data chunks
 226    }
 227   
 228    // If testing whether data and meta data replicated
 229    // together, continue
 230  344 if (isTestingConcurrentReplication) {
 231  134 continue;
 232    }
 233   
 234    // break out if appropriate
 235  210 if (ffts.isClosed()) {
 236  0 break;
 237    }
 238   
 239    // Next iterate server's known meta-data hashes
 240    {
 241  210 BigInteger offset = BigInteger.ZERO;
 242  210 BigInteger limit = BigInteger.valueOf(hashBatchSize);
 243   
 244    // get the data metaData
 245  210 for (BigHash[] chunks = ts.getMetaDataHashes(offset, limit); chunks.length > 0; chunks = ts.getMetaDataHashes(offset, limit)) {
 246    // skip if the server's URL is set and it is this server(i.e. don't check yourself!)
 247  80 String thisServerUrl = ffts.getConfiguration().getValue("serverURL");
 248  80 if (thisServerUrl != null && thisServerUrl.equals(coreServer)) {
 249  0 break;
 250    }
 251   
 252    // update the offset
 253  80 offset = offset.add(BigInteger.valueOf(chunks.length));
 254   
 255    // break out if appropriate
 256  80 if (ffts.isClosed()) {
 257  0 break;
 258    }
 259   
 260    // check each hash to see if it is in the configured hash span
 261  80 Set<HashSpan> hashSpans = config.getHashSpans();
 262  80 for (int i = 0; i < chunks.length; i++) {
 263  80 for (HashSpan hs : hashSpans) {
 264    // if the data should be on this box, add it
 265  80 if (hs.contains(chunks[i])) {
 266    // if already on this box, continue
 267  57 if (!ffts.hasMetaData(chunks[i])) {
 268    // else, add it
 269  9 byte[] meta = ts.getMetaData(chunks[i]);
 270  9 ffts.getDataBlockUtil().addMetaData(chunks[i], meta);
 271    // log for server config info
 272  9 this.metaDataCopied++;
 273    } else {
 274  48 this.metaDataSkipped++;
 275    }
 276   
 277    // Check if server has data. If not, add.
 278  57 if (ts.hasData(chunks[i])) {
 279  0 if (!ffts.hasData(chunks[i])) {
 280  0 byte[] data = ts.getData(chunks[i]);
 281  0 ffts.getDataBlockUtil().addData(chunks[i], data);
 282    // track for config
 283  0 this.dataCopied++;
 284    } else {
 285    // track for config
 286  0 this.dataSkipped++;
 287    }
 288    }
 289    }
 290    }
 291    } // For all chunks in batch
 292   
 293    // Break out if appropriate
 294  80 if (ffts.isClosed()) {
 295  0 break;
 296    }
 297    } // For all known data chunks
 298    }
 299   
 300    // break out if appropriate
 301  209 if (ffts.isClosed()) {
 302  0 break;
 303    }
 304    } // Copying files
 305    // ignore connection exceptions
 306    catch (ConnectionException e) {
 307    // noop
 308    } catch (Exception e) {
 309  26 System.err.println(e.getMessage());
 310    }
 311    }
 312   
 313    // scan for files that can be removed
 314  129423 Set<DataDirectoryConfiguration> ddcs = config.getDataDirectories();
 315  129423 for (DataDirectoryConfiguration ddc : ddcs) {
 316    // use a stack to handle all directories
 317  129423 List<String> filenames = new ArrayList();
 318  129423 filenames.add(ddc.getDirectory());
 319    // loop over all entries
 320  129423 while (!filenames.isEmpty()) {
 321   
 322  129690 String filename = filenames.remove(filenames.size() - 1);
 323  129690 try {
 324  129690 File file = new File(filename);
 325   
 326  129690 if (file == null) {
 327  0 System.err.println("Found null file for " + filename);
 328  0 continue;
 329    }
 330   
 331  129690 if (file.isDirectory()) {
 332    // add all contents
 333  129421 String[] moreFileNames = file.list();
 334  129421 for (String moreFileName : moreFileNames) {
 335  275 filenames.add(file.getAbsolutePath() + File.separator + moreFileName);
 336    }
 337    } else {
 338    // check for project files
 339  269 DataBlock block = ffts.getDataBlockUtil().getDataBlock(filename.substring(ddc.getDirectory().length()));
 340  269 List<BigHash> metaDataHashes;
 341   
 342  269 if (block == null) {
 343  2 System.err.println("Found null data block for " + ddc.getDirectory());
 344  2 continue;
 345    }
 346   
 347  267 synchronized (block) {
 348  267 metaDataHashes = ffts.getDataBlockUtil().getDataBlock(filename.substring(ddc.getDirectory().length())).getHashes(true);
 349    }
 350   
 351  267 META:
 352    for (BigHash metaDataHash : metaDataHashes) {
 353   
 354    // Check if belongs on server
 355  121 for (HashSpan span : ffts.getConfiguration().getHashSpans()) {
 356   
 357    // If contained in span or is sticky element
 358  121 if (span.contains(metaDataHash) || isStickyChunk(metaDataHash, true)) {
 359  87 continue META;
 360    }
 361    }
 362    /*
 363    * To delete, must meet two conditions:
 364    * Must be three copies online
 365    * At least 1 copy must be in correct hash span (else all can delete chunk thinking others have it)
 366    */
 367  34 int copiesOnline = 0, copiesInHashSpan = 0;
 368   
 369  34 CHECK:
 370    for (String server : coreServers) {
 371   
 372    // skip off-line servers
 373  108 if (!ServerUtil.isServerOnline(server)) {
 374  2 continue;
 375    }
 376  106 try {
 377   
 378  106 TrancheServer check = IOUtil.connect(server);
 379   
 380  106 boolean hasMD = check.hasMetaData(metaDataHash);
 381   
 382  104 if (hasMD) {
 383  45 copiesOnline++;
 384    }
 385   
 386  104 if (hasMD && this.isInHashSpanConfiguration(metaDataHash, ServerUtil.getServerInfo(server).getConfiguration())) {
 387  22 copiesInHashSpan++;
 388    }
 389    } catch (Exception e) {
 390  1 ServerUtil.setServerOnlineStatus(server, false);
 391    }
 392   
 393    // Speed-up: If meet min requirements, break.
 394  105 if (copiesOnline > REQUIRED_REPLICATIONS && copiesInHashSpan > 0) {
 395    //if (copiesOnline > REQUIRED_REPLICATIONS && copiesInHashSpan >= REQUIRED_REPLICATIONS) {
 396  5 break CHECK;
 397    }
 398    }
 399    // Delete if enough copies online AND at least one in correct hash span
 400  33 if (copiesOnline > REQUIRED_REPLICATIONS && copiesInHashSpan > 0) {
 401    //if (copiesOnline > REQUIRED_REPLICATIONS && copiesInHashSpan >= REQUIRED_REPLICATIONS) {
 402  5 ffts.getDataBlockUtil().deleteMetaData(metaDataHash);
 403    // track for showing in server config
 404  5 this.metaDataDeleted++;
 405    }
 406    }
 407   
 408  266 List<BigHash> dataHashes;
 409  266 synchronized (block) {
 410  266 dataHashes = block.getHashes(false);
 411    }
 412   
 413  266 DATA:
 414    for (BigHash dataHash : dataHashes) {
 415   
 416    // Check if belongs on server
 417  155 for (HashSpan span : ffts.getConfiguration().getHashSpans()) {
 418    // If in hash span or is a sticky element
 419  155 if (span.contains(dataHash) || isStickyChunk(dataHash, false)) {
 420  135 continue DATA;
 421    }
 422    }
 423   
 424    /*
 425    * To delete, must meet two conditions:
 426    * Must be three copies online
 427    * At least 1 copy must be in correct hash span (else all can delete chunk thinking others have it)
 428    */
 429  20 int copiesOnline = 0, copiesInHashSpan = 0;
 430   
 431  20 CHECK:
 432    for (String server : coreServers) {
 433    // skip off-line servers
 434  55 if (!ServerUtil.isServerOnline(server)) {
 435