define(["lib/underscore", "common/util/log", "common/util/id_generator",
"lib/jquery", "common/network/dispatcher", "common/constants",
"common/util/check", "require", "lib/meteor",
"ARCore/machines",
],
function(_, Log, IDGenerator,
$, Dispatcher, Constants,
check, require, meteor) {
"use strict";
// Shortcuts into the bundled Meteor packages used by this module.
var Collection = meteor.meteor.Meteor.Collection;
var DDP = meteor.livedata.DDP;
var LocalCollection = meteor.minimongo.LocalCollection;
// Logger for this module, obtained by calling Log with "DB".
var logger = Log("DB");
// Connection factory for DBs served from this machine: opens a DDP
// connection to localhost on port 9000.
var localConnectionFactory = function() {
    var localBackendUrl = "http://localhost:9000/";
    return DDP.connect(localBackendUrl);
};
// Accessor for the ARCore/machines module. The module is listed in the
// define() dependencies but not bound to a callback argument; it is
// looked up through require() each time this is called.
var Machines = function() {
    var machinesModule = require("ARCore/machines");
    return machinesModule;
};
// Names of the DBs that have been published, kept so that they can be
// unpublished when the page unloads.
var publishedDBs = [];
// Registry of every DB instantiated so far, keyed by DB name.
var existingDBs = {};
// Looks `name` up in existingDBs and returns the cached DB when there is
// one. Otherwise builds a DB bound to the localhost machine and
// localConnectionFactory, caches it under `name`, and returns it.
var findOrCreateLocalDB = function(name) {
    var cached = existingDBs[name];
    if(cached) {
        return cached;
    }
    var created = DB(name, Machines().localhost, localConnectionFactory);
    existingDBs[name] = created;
    return created;
};
// Builds a DB (or collection) name by gluing `parts` together with
// Constants.db.NAME_SEPARATOR.
var makeDBName = function(parts) {
    var separator = Constants.db.NAME_SEPARATOR;
    return parts.join(separator);
};
// Comparator for Array#sort that orders machines by ascending hostname.
// Returns -1, 1 or 0.
var machineSorter = function(machineA, machineB) {
    var left = machineA.hostname;
    var right = machineB.hostname;
    if(left < right) {
        return -1;
    }
    return left > right ? 1 : 0;
};
// When the window unloads, fire a "db_unpublish" event on the bridge for
// every name recorded in publishedDBs.
var unpublishAllDBs = function() {
    _.each(publishedDBs, function(publishedName) {
        Dispatcher._bridge.fireEvent("db_unpublish", { name: publishedName });
    });
};
$(window).unload(unpublishAllDBs);
/**
* Creates a new Lens.DB, but does not connect to it. Connection is done
* lazily when needed, so there are no side-effects of creating
* a DB.
*
* @param {String} name
* Name of the DB server. Must be unique for the connection.
* @param {Machine} machine
* Which machine the DB belongs to.
* @param {Function} connectionFactory
* A function that returns a Meteor connection (as returned by
* DDP.connect).
* @param {Boolean} [isLensDBServer]
* Whether the server supports the initCollection RPC for client-side
* collection creation, and manages the peers collection. Defaults to
* true.
*
* @privconstructor
* @class
*
* Lens.DB provides MongoDB-like database that can easily be synchronized
* between different Lens applications or LuXoR devices.
*
* Databases are reactive, and can be used in conjunction with
* {@link Lens.autorun}. See {@tutorial reactivity} for more on reactive
* programming in Lens.
*
* Lens.DB is taken from [Meteor](http://www.meteor.com/). Once you've
* created a collection using {@link Lens.DB#Collection}, you can
* use the [Meteor Collection API](http://docs.meteor.com/#collections)
* to query the collection and insert new records.
*
* See {@tutorial dbs} for more information on the types of databases
* available.
*
* @property {String} name The name of this database. Read-only.
* @property {Machine} machine The machine whose backend is managing this DB.
* Null for server and in-memory databases. Read-only.
* @memberOf Lens
*/
var DB = function(name, machine, connectionFactory, isLensDBServer) {
    /** @alias Lens.DB.prototype */
    var self = {};
    self.name = name;
    self.machine = machine;

    // full collection name -> collection, for every collection created
    // through this DB
    var collections = {};
    // the Meteor connection. Stays null until the DB is first opened, and
    // forever for in-memory DBs (those without a connectionFactory).
    var connection = null;
    // whether the connection is currently considered open
    var open = false;

    // isLensDBServer defaults to true
    if(isLensDBServer === undefined) {
        isLensDBServer = true;
    }

    // when we close a connection, we override the retry timeout so Meteor
    // doesn't automatically re-connect. We save the old value here so
    // we can undo this to re-open the connection.
    var prevRetryTimeout;

    // the setInterval ID for the interval that's sending heartbeats to the
    // backend. Only set for Lens DB servers.
    var heartbeat;

    // send a heartbeat to the backend. assumes the connection is open.
    var sendHeartbeat = function() {
        connection.call("heartbeat", name, Machines().localhost.hostname);
    };

    // opens the connection to the backend. Does nothing when the DB is
    // already open or has no connectionFactory.
    var openConnection = function() {
        if(open || (!connectionFactory)) {
            // we're either already open, or this is an in-memory database
            return;
        }

        // connect
        if(!connection) {
            // make a new connection
            connection = connectionFactory();
        }
        else {
            // revive the old connection: restore the retry timeout that
            // close() replaced, then reconnect
            connection._stream._retryTimeout = prevRetryTimeout;
            connection.reconnect();
        }

        // set up the heartbeat
        if(isLensDBServer) {
            heartbeat = setInterval(sendHeartbeat, Constants.db.HEARTBEAT_INTERVAL);
            sendHeartbeat();
        }

        // mark the DB as open. This must happen before the peers
        // collection is requested below, because getCollection calls
        // openConnection again.
        open = true;

        // open the peers collection
        if(isLensDBServer) {
            getCollection(Constants.db.PEERS_COLLECTION);
        }
    };

    // returns the collection called collectionName, creating it on first
    // use. onReady is only handed to the subscription that is made when a
    // collection is created on a Lens DB server.
    var getCollection = function(collectionName, onReady) {
        var collectionFullName = collectionName;
        if(name) {
            // if the DB has a name, use it to prefix the collection name
            collectionFullName = makeDBName([name, collectionName]);
        }

        openConnection();

        if(collections[collectionFullName] === undefined) {
            if(!connection) {
                // no connection -> this is an in-memory DB
                collections[collectionFullName] = new LocalCollection();
            }
            else {
                collections[collectionFullName] = new Collection(collectionFullName, {connection: connection});
                if(isLensDBServer) {
                    // ask the server to set the collection up, and
                    // subscribe to it once that has succeeded
                    connection.call("initCollection", collectionFullName, function(error, result) {
                        if(result) {
                            connection.subscribe(collectionFullName, onReady);
                        }
                        else {
                            logger.error("initCollection returned an error: " + error);
                        }
                    });
                }
            }
        }

        return collections[collectionFullName];
    };

    /**
     * Returns a new database collection. See the
     * [Meteor Collection API](http://docs.meteor.com/#collections)
     * for documentation on how to make queries against the collection.
     *
     * Automatically calls {@link open} if this database isn't open.
     *
     * @param {String} collectionName
     * Name of the collection. If this name already exists, returns the
     * existing collection.
     * @param {Function} [onReady]
     * You can use the returned collection immediately, but it won't
     * have synced with the server immediately. If you pass this
     * function, it will be called when the collection finished its
     * initial sync with the server.
     *
     * @return {Collection}
     */
    self.Collection = function(collectionName, onReady) {
        check(collectionName, String);
        check(onReady, check.Match.Optional(Function));
        return getCollection(collectionName, onReady);
    };

    /**
     * Returns the Meteor connection to this DB's server. This object
     * is the same as the one returned by
     * [DDP.connect](http://docs.meteor.com/#ddp_connect).
     *
     * Returns null for in-memory databases.
     *
     * Automatically calls {@link open} if the database isn't open.
     */
    self.connection = function() {
        openConnection();
        return connection;
    };

    /**
     * Opens the connection to the database backend. This is automatically
     * called by {@link Collection} and {@link connection}; you only need to
     * call this if you have called {@link close} and want to re-open the
     * connection.
     */
    self.open = function() {
        openConnection();
    };

    /**
     * Closes the connection to the database backend. You can still use
     * collections, but they won't be synced with the server, and other
     * clients of published or shared databases won't see you as a peer.
     *
     * Does nothing if the database is not open.
     */
    self.close = function() {
        if(!open) {
            return;
        }

        // stop the heartbeat (clearInterval ignores an unset ID, which is
        // the case when the server is not a Lens DB server)
        clearInterval(heartbeat);

        // notify the backend. Like the heartbeat, the "disconnect" RPC is
        // only sent to Lens DB servers; other Meteor apps are not expected
        // to implement it.
        if(isLensDBServer) {
            connection.call("disconnect", name, Machines().localhost.hostname);
        }

        // disconnect: remember the stream's retry timeout so that
        // openConnection can restore it, then push retries into the far
        // future and drop the connection
        var stream = connection._stream;
        prevRetryTimeout = stream._retryTimeout;
        stream._retryTimeout = function() { return 20 * 365 * 24 * 60 * 60 * 1000; }; // far future
        stream._lostConnection();

        // mark the DB as closed
        open = false;
    };

    /**
     * Returns an array of {@link Machine}s that are also connected to this
     * database. A reactive data source.
     */
    self.peers = function() {
        return getCollection(Constants.db.PEERS_COLLECTION).find().fetch().map(function(peer) {
            return Machines().machine(peer.hostname);
        }).filter(function(machine) {
            // drop peers whose hostname did not resolve to a machine
            return machine !== null;
        });
    };

    // name and machine are read-only, and no members can be added
    Object.freeze(self);
    return self;
};
_.extend(DB, /** @lends Lens.DB */ {
    /**
     * Returns a new in-memory database.
     * @return {Lens.DB}
     */
    inMemory: function() {
        // no connection factory is passed, which is what makes the DB
        // in-memory
        return DB(IDGenerator.uuid(), null);
    },

    /**
     * Returns a new persistent database accessible only to this app.
     * @param {String} name
     * @return {Lens.DB}
     */
    app: function(name) {
        check(name, String);
        return findOrCreateLocalDB(makeDBName([Constants.db.APP_PREFIX, Lens.appName(), name]));
    },

    /**
     * Returns a new persistent database that is local to this machine.
     * Unlike {@link Lens.DB.app}, the database name is not scoped by the
     * name of the app.
     * @param {String} name
     * @return {Lens.DB}
     */
    machine: function(name) {
        check(name, String);
        return findOrCreateLocalDB(makeDBName([Constants.db.MACHINE_PREFIX, name]));
    },

    /**
     * Returns a new database that is published to nearby LuXoR devices.
     * You can find nearby published databases using {@link Lens.Machines}
     * @param {String} name
     * @return {Lens.DB}
     */
    published: function(name) {
        check(name, String);
        name = makeDBName([Constants.db.PUBLISHED_PREFIX, name]);
        Dispatcher._bridge.fireEvent("db_publish", { name: name });
        // remember the name so it can be unpublished on unload
        publishedDBs.push(name);
        return findOrCreateLocalDB(name);
    },

    /**
     * Returns a new database that is connected to an external Meteor
     * backend.
     * @param {String} url The URL of the Meteor app.
     * @param {Boolean} [isLensDBServer] Set this to true if the Meteor app
     *                                   is a LuXoR DB server (which allows
     *                                   LuXoR to create Collections client-
     *                                   side). Defaults to false.
     * @return {Lens.DB}
     */
    server: function(url, isLensDBServer) {
        check(url, String);
        check(isLensDBServer, check.Match.Optional(Boolean));
        // the DB gets an empty name, so collection names are not prefixed.
        // !! turns an omitted isLensDBServer into false; passing undefined
        // through would make DB default it to true.
        return DB("", null, function() {
            return DDP.connect(url);
        }, !!isLensDBServer);
    },

    /**
     * Returns a new ad-hoc shared database. A shared database is only
     * persisted for as long as the machine that created it is connected.
     * Moreover, unlike a published database, you need not have one machine
     * publish the database and others connect to it. Rather, have all
     * machines call DB.shared and Lens will automatically select a "master"
     * and have the other machines connect to it.
     *
     * @param {String} name
     * @param {Function} callback Callback to invoke with the shared
     * {@link Lens.DB} once one has been created.
     */
    shared: function(name, callback) {
        check(name, String);
        check(callback, Function);
        var dbName = makeDBName([Constants.db.SHARED_PREFIX, name]);

        // if we've already initialized this shared database, return the
        // existing one
        if(existingDBs[dbName]) {
            callback(existingDBs[dbName]);
            return;
        }

        // if another machine is advertising this shared database, connect
        // to that one. Pick the one with the lowest hostname if there
        // are more than one.
        var machines = Lens.Machines.all().filter(function(machine) {
            return machine._sharedDB(name) !== null;
        }).sort(machineSorter);
        if(machines.length > 0) {
            // pass the plain name, exactly as in the filter above
            callback(machines[0]._sharedDB(name));
            return;
        }

        // If we can't find an existing database to connect to, we might
        // be the master for this shared database. Let nearby devices know
        // that we can master and wait for this to propagate. If we're the
        // lowest-named machine who wants to master for this database, then
        // we become the master. Else, keep waiting for someone to become
        // the master.
        var rpcName = "_master_for_" + name;
        var rpcBinding = Lens.Machines.localhost.registerVoidRPC(rpcName, function(){});
        var machineSubscriptionBinding;
        var intervalId = setInterval(function() {
            var candidates = Lens.Machines.findByRPC(rpcName).sort(machineSorter);
            // the candidate list can be empty (e.g. before our own RPC
            // registration is visible); just wait for the next tick then
            if(candidates.length > 0 && candidates[0].isLocalhost()) {
                // we're the master!
                logger.info("This machine is the master for", dbName);
                rpcBinding.clear();
                clearInterval(intervalId);
                machineSubscriptionBinding.clear();
                Dispatcher._bridge.fireEvent("db_publish", { name: dbName });
                publishedDBs.push(dbName);
                callback(findOrCreateLocalDB(dbName));
            }
        }, Constants.capability.PERIOD_CLEANUP_SECONDS*1000);
        machineSubscriptionBinding = Lens.Machines.subscribe(function(db) {
            // someone else is the master
            logger.info("Found external master for", dbName);
            rpcBinding.clear();
            clearInterval(intervalId);
            machineSubscriptionBinding.clear();
            callback(db);
        }, {
            type: "db",
            name: dbName
        });
    },

    /**
     * Connects to a database on another machine. The connection goes to
     * the local backend when `machine` is localhost, and to port 9000 of
     * `<machine name>.local` otherwise.
     * @private
     */
    _connect: function(machine, name) {
        return DB(name, machine, function() {
            if(machine.isLocalhost()) {
                return localConnectionFactory();
            }
            else {
                return DDP.connect("http://" + machine.name() + ".local:9000");
            }
        });
    },

    CAP_NAME_PREFIX: "lens.db."
});
// Hand DB to the Lens object under the member name "DB". Lens is not one
// of this module's define() arguments, so it is expected to exist as a
// global. NOTE(review): presumably this is what exposes the class as
// Lens.DB -- confirm against Lens._addMember.
Lens._addMember(DB, "DB");
// DB is also the value of this AMD module.
return DB;
});