You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

123 lines
3.2 KiB

package util.db;
import static util.ByteUtils.*;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.FileUtils;
import org.rocksdb.*;
import util.TimeWatch;
/**
 * Disk-backed store of {@code String -> long} counters kept in a RocksDB database.
 *
 * <p>Each instance owns its own database directory below {@code java.io.tmpdir}. The database
 * is not kept open between calls: every operation opens it, does its work and closes it again,
 * so no native handle outlives a method call. Call {@link #delete()} to remove the directory
 * once the instance is no longer needed.
 *
 * <p>Values are encoded with {@code longToBytes}/{@code bytesToLong} from {@code util.ByteUtils}
 * (their exact byte layout is defined there, not in this class).
 */
public class RDB {

    /** Name prefix of the per-instance database directory. */
    private static final String DB_DIR_PREFIX = "corpusAnalyzer_db";

    /** Absolute path of this instance's database directory. */
    private final String path;

    /**
     * Picks a fresh database directory and creates an empty database in it.
     *
     * @throws IllegalStateException if RocksDB cannot create the database
     */
    public RDB() {
        // A random UUID keeps directories distinct even when several instances are created
        // at the same moment for concurrent calculations.
        this.path = new File(System.getProperty("java.io.tmpdir"), DB_DIR_PREFIX + UUID.randomUUID())
                .getPath();
        createDB();
    }

    /**
     * Creates the database on disk by opening it once with {@code createIfMissing} enabled and
     * closing it straight away.
     *
     * @throws IllegalStateException if RocksDB cannot create or open the database
     */
    private void createDB() {
        RocksDB.loadLibrary();
        try (Options options = new Options()) {
            options.setCreateIfMissing(true);
            try (RocksDB rdb = RocksDB.open(options, path)) {
                // Nothing to do: a successful open is what materialises the database files.
            }
        } catch (RocksDBException e) {
            throw new IllegalStateException("Could not create RocksDB database at " + path, e);
        }
    }

    /**
     * Adds the given counts to the values already stored under the same keys and writes all
     * updated values as one batch. Keys that are not stored yet start from the given count.
     *
     * @param results counts to add, keyed by the entry name; keys are stored as UTF-8 bytes
     * @throws UnsupportedEncodingException never thrown by the current implementation; kept in
     *         the signature so existing callers keep compiling
     * @throws IllegalStateException if the database cannot be opened, read or written; nothing
     *         is written when reading an existing value fails
     */
    public void writeBatch(Map<String, AtomicLong> results) throws UnsupportedEncodingException {
        RocksDB.loadLibrary();
        // Resources are closed in reverse order, so the database is closed before its options.
        try (Options options = new Options();
             RocksDB rdb = RocksDB.open(options, path);
             WriteBatch wb = new WriteBatch();
             WriteOptions writeOptions = new WriteOptions()) {
            for (Map.Entry<String, AtomicLong> entry : results.entrySet()) {
                byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8);
                long increment = entry.getValue().longValue();
                // get() returns null when the key does not exist in the database yet.
                byte[] stored = rdb.get(key);
                long total = (stored == null) ? increment : bytesToLong(stored) + increment;
                wb.put(key, longToBytes(total));
            }
            TimeWatch watch = TimeWatch.start();
            rdb.write(writeOptions, wb);
            System.out.println(String.format("Writing %d entries took: %s", wb.count(), watch.toFullTime()));
        } catch (RocksDBException e) {
            throw new IllegalStateException("Could not write batch to RocksDB database at " + path, e);
        }
    }

    /**
     * Reads every entry of the database into a map.
     *
     * <p>If RocksDB reports an error, the stack trace is printed and the entries collected up
     * to that point (possibly none) are returned.
     *
     * @return a new mutable map from key (decoded as UTF-8) to its stored value
     * @throws UnsupportedEncodingException never thrown by the current implementation; kept in
     *         the signature so existing callers keep compiling
     */
    public Map<String, AtomicLong> getDump() throws UnsupportedEncodingException {
        Map<String, AtomicLong> dump = new HashMap<>();
        RocksDB.loadLibrary();
        // The iterator is declared last so that it is closed before the database it reads from.
        try (Options options = new Options();
             RocksDB rdb = RocksDB.open(options, path);
             RocksIterator it = rdb.newIterator()) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                String key = new String(it.key(), StandardCharsets.UTF_8);
                dump.put(key, new AtomicLong(bytesToLong(it.value())));
            }
        } catch (RocksDBException e) {
            e.printStackTrace();
        }
        return dump;
    }

    /**
     * Deletes this instance's database directory and everything in it. A failure to delete is
     * reported by printing the stack trace; it is not propagated to the caller.
     */
    public void delete() {
        try {
            FileUtils.deleteDirectory(new File(path));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}