/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2009, 2010 Oracle and/or its affiliates.  All rights reserved.
 *
 */
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using BerkeleyDB;

/**
 * RepQuoteExample is a simple but complete demonstration of a replicated
 * application. The application is a mock stock ticker. The master accepts a
 * stock symbol and a numerical value as input, and stores this information
 * into a replicated database; either master or clients can display the
 * contents of the database.
 *
 * The options to start a given replication node are:
 *
 *   -h home        (required; h stands for home directory)
 *   -l host:port   (required; l stands for local)
 *   -C or -M       (optional; start up as client or master)
 *   -r host:port   (optional; r stands for remote; any number of these may
 *                  be specified)
 *   -R host:port   (optional; R stands for remote peer; only one of these may
 *                  be specified)
 *   -a all|quorum  (optional; a stands for ack policy)
 *   -b             (optional; b stands for bulk)
 *   -n nsites      (optional; number of sites in replication group; defaults
 *                  to 0 to try to dynamically compute nsites)
 *   -p priority    (optional; defaults to 100)
 *   -v             (optional; v stands for verbose)
 *
 * A typical session begins with a command such as the following to start a
 * master:
 *
 *   excs_repquote.exe -M -h dir1 -l localhost:6000
 *
 * and several clients:
 *
 *   excs_repquote.exe -C -h dir2
 *               -l localhost:6001 -r localhost:6000
 *   excs_repquote.exe -C -h dir3
 *               -l localhost:6002 -r localhost:6000
 *   excs_repquote.exe -C -h dir4
 *               -l localhost:6003 -r localhost:6000
 *
 * Each process is a member of a DB replication group. The sample application
 * expects the following commands to stdin:
 *
 *   NEWLINE -- print all the stocks held in the database
 *   quit -- shutdown this node
 *   exit -- shutdown this node
 *   stock_symbol number -- enter this stock and number into the database
 */
namespace excs_repquote
{
    public class RepQuoteExample
    {
        private RepQuoteEnvironment dbenv;
        private Thread checkpointThread;
        private Thread logArchiveThread;

        /*
         * Print the command line synopsis and terminate the process with
         * exit code 1.  This method does not return to its caller.
         */
        public static void usage()
        {
            Console.WriteLine(
                "usage: " + RepConfig.progname +
                " -h home -l host:port [-CM][-r host:port][-R host:port]\n" +
                " [-a all|quorum][-b][-n nsites][-p priority][-v]");

            Console.WriteLine(
                "\t -h home (required; h stands for home directory)\n" +
                "\t -l host:port (required; l stands for local)\n" +
                "\t -C or -M (optional; start up as client or master)\n" +
                "\t -r host:port (optional; r stands for remote; any number " +
                "of these\n" +
                "\t may be specified)\n" +
                "\t -R host:port (optional; R stands for remote peer; only " +
                "one of\n" +
                "\t these may be specified)\n" +
                "\t -a all|quorum (optional; a stands for ack policy)\n" +
                "\t -b (optional; b stands for bulk)\n" +
                "\t -n nsites (optional; number of sites in replication " +
                "group; defaults\n" +
                "\t to 0 to try to dynamically compute nsites)\n" +
                "\t -p priority (optional; defaults to 100)\n" +
                "\t -v (optional; v stands for verbose)\n");

            Environment.Exit(1);
        }

        /*
         * Return the value that follows option args[i], advancing i to point
         * at it.  Calls usage() (which exits) when the option is the last
         * argument and therefore has no value.
         */
        private static string nextArg(string[] args, ref int i)
        {
            if (i >= args.Length - 1)
                usage();
            i++;
            return args[i];
        }

        /*
         * Split a "host:port" specification.  On success returns true with
         * host and port filled in; on failure writes a diagnostic to stderr
         * and returns false.
         */
        private static bool parseHostPort(
            string spec, out string host, out uint port)
        {
            host = null;
            port = 0;

            string[] words = spec.Split(':');
            if (words.Length != 2) {
                Console.Error.WriteLine("Invalid host " +
                    "specification host:port needed.");
                return false;
            }

            /*
             * uint.Parse reports bad input with FormatException or
             * OverflowException, so TryParse is used rather than catching
             * an exception type that is never thrown.
             */
            if (!uint.TryParse(words[1], out port)) {
                Console.Error.WriteLine("Invalid host " +
                    "specification, could not parse port number.");
                return false;
            }

            host = words[0];
            return true;
        }

        /*
         * Program entry point: extend PATH so the native libraries can be
         * found, parse the command line into a RepConfig, then run the
         * example until the user quits.
         */
        public static void Main(string[] args)
        {
            RepConfig config = new RepConfig();
            string host;
            uint port;
            uint number;

            /*
             * RepQuoteExample is meant to be run from build_windows\AnyCPU, in
             * either the Debug or Release directory. The required core
             * libraries, however, are in either build_windows\Win32 or
             * build_windows\x64, depending upon the platform.  That location
             * needs to be added to the PATH environment variable for the
             * P/Invoke calls to work.
             */
            try {
                String pwd = Environment.CurrentDirectory;
                pwd = Path.Combine(pwd, "..");
                pwd = Path.Combine(pwd, "..");
                if (IntPtr.Size == 4)
                    pwd = Path.Combine(pwd, "Win32");
                else
                    pwd = Path.Combine(pwd, "x64");
#if DEBUG
                pwd = Path.Combine(pwd, "Debug");
#else
                pwd = Path.Combine(pwd, "Release");
#endif
                pwd += ";" + Environment.GetEnvironmentVariable("PATH");
                Environment.SetEnvironmentVariable("PATH", pwd);
            } catch (Exception e) {
                Console.WriteLine(
                    "Unable to set the PATH environment variable.");
                Console.WriteLine(e.Message);
                return;
            }

            /* Extract the command line parameters. */
            for (int i = 0; i < args.Length; i++) {
                string s = args[i];

                /* Anything that does not start with '-' is ignored. */
                if (s.Length == 0 || s[0] != '-')
                    continue;

                /* A bare "-" has no option letter; treat it as unknown. */
                char option = s.Length > 1 ? s[1] : '\0';

                switch (option) {
                case 'a':
                    string policy = nextArg(args, ref i);
                    if (policy.Equals("all"))
                        config.ackPolicy = AckPolicy.ALL;
                    else if (!policy.Equals("quorum"))
                        usage();
                    /* "quorum" leaves RepConfig's existing value alone. */
                    break;
                case 'b':
                    config.bulk = true;
                    break;
                case 'C':
                    config.startPolicy = StartPolicy.CLIENT;
                    break;
                case 'h':
                    config.home = nextArg(args, ref i);
                    break;
                case 'l':
                    if (!parseHostPort(
                        nextArg(args, ref i), out host, out port))
                        usage();
                    config.host.Host = host;
                    config.host.Port = port;
                    break;
                case 'M':
                    config.startPolicy = StartPolicy.MASTER;
                    break;
                case 'n':
                    if (!uint.TryParse(nextArg(args, ref i), out number)) {
                        Console.Error.WriteLine(
                            "Unable to parse number of total sites.");
                        usage();
                    }
                    config.totalSites = number;
                    break;
                case 'p':
                    if (!uint.TryParse(nextArg(args, ref i), out number)) {
                        Console.Error.WriteLine("Unable to parse priority.");
                        usage();
                    }
                    config.priority = number;
                    break;
                case 'r':
                case 'R':
                    /* The option letter itself decides the peer flag. */
                    bool isPeer = (option == 'R');
                    if (!parseHostPort(
                        nextArg(args, ref i), out host, out port))
                        usage();
                    config.remote.Add(new RemoteSite(host, port, isPeer));
                    break;
                case 'v':
                    config.verbose = true;
                    break;
                default:
                    Console.Error.WriteLine(
                        "Unrecognized option: " + args[i]);
                    usage();
                    break;
                }
            }

            /* Error check command line. */
            if (config.host.Host == null || string.IsNullOrEmpty(config.home))
                usage();

            RepQuoteExample runner = null;
            try {
                runner = new RepQuoteExample();
                /* Only enter the command loop if the environment opened. */
                if (runner.init(config) == 0)
                    runner.doloop();
                runner.terminate();
                runner = null;
            } catch (DatabaseException dbErr) {
                Console.Error.WriteLine("Caught an exception during " +
                    "initialization or processing: " + dbErr);
                if (runner != null)
                    runner.terminate();
            }
        } /* End main. */

        public RepQuoteExample()
        {
            dbenv = null;
        }

        /*
         * Configure and open the replicated environment described by config,
         * start the checkpoint and log archive threads, and start the
         * replication manager according to config.startPolicy.
         *
         * Returns 0 on success, or 1 if the environment could not be opened
         * (in which case no threads have been started).  DatabaseExceptions
         * raised while starting the replication manager propagate to the
         * caller.
         */
        public int init(RepConfig config)
        {
            DatabaseEnvironmentConfig envConfig =
                new DatabaseEnvironmentConfig();
            envConfig.ErrorPrefix = RepConfig.progname;
            envConfig.RepSystemCfg = new ReplicationConfig();
            envConfig.RepSystemCfg.RepMgrLocalSite = config.host;
            for (int i = 0; i < config.remote.Count; i++)
                envConfig.RepSystemCfg.AddRemoteSite(config.remote[i].Host,
                    config.remote[i].IsPeer);

            if (config.totalSites > 0)
                envConfig.RepSystemCfg.NSites = config.totalSites;

            envConfig.RepSystemCfg.BulkTransfer = config.bulk;

            /*
             * Configure heartbeat timeouts so that repmgr monitors the
             * health of the TCP connection.  Master sites broadcast a heartbeat
             * at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
             * Client sites wait for message activity the length of the
             * DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
             * connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
             * timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
             */
            envConfig.RepSystemCfg.HeartbeatMonitor = 10000000;
            envConfig.RepSystemCfg.HeartbeatSend = 5000000;

            /*
             * Set replication group election priority for this environment.
             * An election first selects the site with the most recent log
             * records as the new master.  If multiple sites have the most
             * recent log records, the site with the highest priority value
             * is selected as master.
             */
            envConfig.RepSystemCfg.Priority = config.priority;

            envConfig.MPoolSystemCfg = new MPoolConfig();
            envConfig.MPoolSystemCfg.CacheSize = RepConfig.CACHESIZE;
            envConfig.TxnNoSync = true;

            envConfig.EventNotify =
                new EventNotifyDelegate(RepQuoteEventHandler);

            /*
             * Set the policy that determines how master and client sites
             * handle acknowledgement of replication messages needed for
             * permanent records.  The default policy of "quorum" requires only
             * a quorum of electable peers sufficient to ensure a permanent
             * record remains durable if an election is held.  The "all" option
             * requires all clients to acknowledge a permanent replication
             * message instead.
             */
            envConfig.RepSystemCfg.RepMgrAckPolicy = config.ackPolicy;

            /*
             * Set the threshold for the minimum and maximum time the client
             * waits before requesting retransmission of a missing message.
             * Base these values on the performance and load characteristics
             * of the master and client host platforms as well as the round
             * trip message time.
             */
            envConfig.RepSystemCfg.RetransmissionRequest(20000, 500000);

            /*
             * Configure deadlock detection to ensure that any deadlocks
             * are broken by having one of the conflicting lock requests
             * rejected. DB_LOCK_DEFAULT uses the lock policy specified
             * at environment creation time or DB_LOCK_RANDOM if none was
             * specified.
             */
            envConfig.LockSystemCfg = new LockingConfig();
            envConfig.LockSystemCfg.DeadlockResolution = DeadlockPolicy.DEFAULT;

            envConfig.Create = true;
            envConfig.RunRecovery = true;
            envConfig.FreeThreaded = true;
            envConfig.UseReplication = true;
            envConfig.UseLocking = true;
            envConfig.UseLogging = true;
            envConfig.UseMPool = true;
            envConfig.UseTxns = true;
            envConfig.Verbosity = new VerboseMessages();
            envConfig.Verbosity.Replication = config.verbose;

            try {
                dbenv = RepQuoteEnvironment.Open(config.home, envConfig);
            } catch (DatabaseException e) {
                Console.WriteLine("Fail to open environment: " + e.Message);
                return 1;
            }

            /* The following base replication features may also be useful to your
             * application. See Berkeley DB documentation for more details.
             *   - Master leases: Provide stricter consistency for data reads
             *     on a master site.
             *   - Timeouts: Customize the amount of time Berkeley DB waits
             *     for such things as an election to be concluded or a master
             *     lease to be granted.
             *   - Delayed client synchronization: Manage the master site's
             *     resources by spreading out resource-intensive client
             *     synchronizations.
             *   - Blocked client operations: Return immediately with an error
             *     instead of waiting indefinitely if a client operation is
             *     blocked by an ongoing client synchronization.
             *
             * The following repmgr features may also be useful to your
             * application.  See Berkeley DB documentation for more details.
             *  - Two-site strict majority rule - In a two-site replication
             *    group, require both sites to be available to elect a new
             *    master.
             *  - Timeouts - Customize the amount of time repmgr waits
             *    for such things as waiting for acknowledgements or attempting
             *    to reconnect to other sites.
             *  - Site list - return a list of sites currently known to repmgr.
             */

            /* Start checkpoint and log archive support threads. */
            checkpointThread = new Thread(new ThreadStart(CheckPoint));
            checkpointThread.Start();
            logArchiveThread = new Thread(new ThreadStart(LogArchive));
            logArchiveThread.Start();

            /* Start replication manager. */
            if (config.startPolicy == StartPolicy.CLIENT)
                dbenv.env.RepMgrStartClient(3);
            else if (config.startPolicy == StartPolicy.ELECTION)
                dbenv.env.RepMgrStartClient(3, true);
            else if (config.startPolicy == StartPolicy.MASTER)
                dbenv.env.RepMgrStartMaster(3);

            return 0;
        }

        /*
         * Interactive command loop.  Opens the stock database (retrying until
         * it becomes available), then reads commands from stdin until the
         * user enters "quit"/"exit", stdin reaches end of input, or reading
         * fails.  Always returns 0; database errors from Put propagate as
         * exceptions.
         */
        public int doloop()
        {
            BTreeDatabase db = null;
            char[] separators = new char[] { ' ', '\t' };

            for (;;) {
                if (db == null) {
                    BTreeDatabaseConfig dbConfig = new BTreeDatabaseConfig();
                    dbConfig.Env = dbenv.env;

                    if (dbenv.IsMaster) {
                        /*
                         * Open database allowing create only if this is a master
                         * database.  A client database uses polling to attempt
                         * to open the database, without creating it, until the
                         * open succeeds.
                         *
                         * This polling logic for allowing create can be
                         * simplified under some circumstances.  For example, if
                         * the application can be sure a database is already
                         * there, it would never need to open it allowing create.
                         */
                        dbConfig.Creation = CreatePolicy.IF_NEEDED;
                    }

                    dbConfig.AutoCommit = true;

                    try {
                        db = BTreeDatabase.Open(RepConfig.progname, dbConfig);
                    } catch (DatabaseException) {
                        /* Open failed, so db is still null; just retry. */
                        Console.WriteLine("no stock database available yet.");
                        Thread.Sleep(RepConfig.SLEEPTIME);
                        continue;
                    }
                }

                /* Listen for input, and add it to the database. */
                Console.Write("QUOTESERVER");
                if (dbenv.IsMaster == false)
                    Console.Write("(read-only)");
                Console.Write("> ");

                string nextLine = null;
                try {
                    nextLine = Console.ReadLine();
                } catch (System.IO.IOException) {
                    Console.WriteLine("Unable to get data");
                    break;
                }

                /* ReadLine returns null at end of input: shut down. */
                if (nextLine == null)
                    break;

                /*
                 * Split on runs of blanks so that stray whitespace neither
                 * produces empty words nor stores an empty key.
                 */
                string[] words = nextLine.Split(
                    separators, StringSplitOptions.RemoveEmptyEntries);

                /* A blank line causes the DB to be dumped. */
                if (words.Length == 0) {
                    try {
                        if (dbenv.InClientSync)
                            Console.WriteLine("Cannot read data during " +
                                "client initialization - please try again.");
                        else
                            printStocks(db);
                    } catch (DeadlockException) {
                        continue;
                    } catch (DatabaseException e) {
                        /*
                         * This could be DB_REP_HANDLE_DEAD, which
                         * should close the database and continue.
                         */
                        Console.WriteLine("Got db exception reading replication" +
                            "DB: " + e);
                        Console.WriteLine("Expected if it was due to a dead " +
                            "replication handle, otherwise an unexpected error.");
                        db.Close(false); /* Close no sync. */
                        db = null;
                        continue;
                    }
                    continue;
                }

                if (words.Length == 1 &&
                    (string.Equals(words[0], "quit",
                        StringComparison.OrdinalIgnoreCase) ||
                    string.Equals(words[0], "exit",
                        StringComparison.OrdinalIgnoreCase))) {
                    break;
                } else if (words.Length != 2) {
                    Console.WriteLine("Format: TICKER VALUE");
                    continue;
                }

                if (!dbenv.IsMaster) {
                    Console.WriteLine("Can't update client");
                    continue;
                }

                DatabaseEntry key = new DatabaseEntry(
                    ASCIIEncoding.ASCII.GetBytes(words[0]));
                DatabaseEntry data = new DatabaseEntry(
                    ASCIIEncoding.ASCII.GetBytes(words[1]));

                db.Put(key, data);
            }

            /*
             * Every way out of the loop is a shutdown request, so tell the
             * support threads to stop before the caller joins them.
             */
            dbenv.AppFinished = true;

            if (db != null)
                db.Close(true);

            return 0;
        }

        /*
         * Stop the support threads, flush the log and close the environment.
         * Safe to call when init() failed to open the environment, and on
         * the error path where doloop() never signalled shutdown.
         */
        public void terminate()
        {
            /* Nothing was opened or started if init() failed early. */
            if (dbenv == null)
                return;

            /*
             * The support threads only exit once AppFinished is set; set it
             * here so the joins below cannot block forever.
             */
            dbenv.AppFinished = true;

            /* Wait for checkpoint and log archive threads to finish. */
            try {
                if (logArchiveThread != null)
                    logArchiveThread.Join();
                if (checkpointThread != null)
                    checkpointThread.Join();
            } catch (Exception) {
                Console.WriteLine("Support thread join failed.");
            }

            /*
             * We have used the DB_TXN_NOSYNC environment flag for improved
             * performance without the usual sacrifice of transactional durability,
             * as discussed in the "Transactional guarantees" page of the Reference
             * Guide: if one replication site crashes, we can expect the data to
             * exist at another site.  However, in case we shut down all sites
             * gracefully, we push out the end of the log here so that the most
             * recent transactions don't mysteriously disappear.
             */
            dbenv.env.LogFlush();

            dbenv.env.Close();
        }

        /*
         * Print every key/data pair in db as a two-column table on stdout.
         *
         * void return type since error conditions are propagated
         * via exceptions.
         */
        private void printStocks(Database db)
        {
            Cursor dbc = db.Cursor();

            try {
                Console.WriteLine("\tSymbol\t\tPrice");
                Console.WriteLine("\t======\t\t=====");

                foreach (KeyValuePair<DatabaseEntry, DatabaseEntry> pair
                    in dbc) {
                    string keyStr =
                        ASCIIEncoding.ASCII.GetString(pair.Key.Data);
                    string dataStr =
                        ASCIIEncoding.ASCII.GetString(pair.Value.Data);
                    Console.WriteLine("\t" + keyStr + "\t\t" + dataStr);
                }
            } finally {
                /* Release the cursor even if iteration throws. */
                dbc.Close();
            }
        }

        /*
         * Sleep for up to the given number of seconds, polling once per
         * second to see if the application has finished.  Returns true as
         * soon as dbenv.AppFinished is observed, false if the full interval
         * elapsed.
         */
        private bool waitForShutdown(int seconds)
        {
            for (int i = 0; i < seconds; i++) {
                Thread.Sleep(1000);
                if (dbenv.AppFinished)
                    return true;
            }
            return false;
        }

        /*
         * This is a very simple thread that performs checkpoints at a fixed
         * time interval.  For a master site, the time interval is one minute
         * plus the duration of the checkpoint_delay timeout (30 seconds by
         * default.)  For a client site, the time interval is one minute.
         */
        public void CheckPoint()
        {
            for (; ; ) {
                /*
                 * Wait for one minute; terminate this thread when the
                 * application has finished.
                 */
                if (waitForShutdown(60))
                    return;

                /* Perform a checkpoint. */
                try {
                    dbenv.env.Checkpoint();
                } catch (DatabaseException) {
                    Console.WriteLine("Could not perform checkpoint.");
                }
            }
        }

        /*
         * This is a simple log archive thread.  Once per minute, it removes all but
         * the most recent 3 logs that are safe to remove according to a call to
         * DBENV->log_archive().
         *
         * Log cleanup is needed to conserve disk space, but aggressive log cleanup
         * can cause more frequent client initializations if a client lags too far
         * behind the current master.  This can happen in the event of a slow client,
         * a network partition, or a new master that has not kept as many logs as the
         * previous master.
         *
         * The approach in this routine balances the need to mitigate against a
         * lagging client by keeping a few more of the most recent unneeded logs
         * with the need to conserve disk space by regularly cleaning up log files.
         * Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
         * flag) is not recommended for replication due to the risk of frequent
         * client initializations.
         */
        public void LogArchive()
        {
            int logKeep = 3;
            int minLog;
            List<string> logFileList;

            for (; ; ) {
                /*
                 * Wait for one minute; terminate this thread when the
                 * application has finished.
                 */
                if (waitForShutdown(60))
                    return;

                try {
                    /* Get the list of unneeded log files. */
                    logFileList = dbenv.env.ArchivableLogFiles(true);
                    /*
                     * Remove all but the logs_to_keep most recent unneeded
                     * log files.
                     */
                    minLog = logFileList.Count - logKeep;
                    for (int i = 0; i < minLog; i++) {
                        FileInfo logFile = new FileInfo(logFileList[i]);
                        logFile.Delete();
                    }
                } catch (DatabaseException) {
                    Console.WriteLine("Problem deleting log archive files.");
                } catch (IOException e) {
                    /*
                     * A file system failure must not escape and end this
                     * thread; report it and try again next interval.
                     */
                    Console.WriteLine("Problem deleting log archive files.");
                    Console.WriteLine(e.Message);
                } catch (UnauthorizedAccessException e) {
                    Console.WriteLine("Problem deleting log archive files.");
                    Console.WriteLine(e.Message);
                }
            }
        }

        /*
         * Implementation of EventHandler interface to handle the Berkeley DB events
         * we are interested in receiving.  Updates the IsMaster and
         * InClientSync flags on dbenv; event_info is not used.
         */
        private void RepQuoteEventHandler(NotificationEvent eventCode, byte[] event_info)
        {
            switch (eventCode) {
            case NotificationEvent.REP_CLIENT:
                dbenv.IsMaster = false;
                dbenv.InClientSync = true;
                break;
            case NotificationEvent.REP_MASTER:
                dbenv.IsMaster = true;
                dbenv.InClientSync = false;
                break;
            case NotificationEvent.REP_NEWMASTER:
                dbenv.InClientSync = true;
                break;
            case NotificationEvent.REP_STARTUPDONE:
                dbenv.InClientSync = false;
                break;
            /*
             * Did not get enough acks to guarantee transaction
             * durability based on the configured ack policy.  This
             * transaction will be flushed to the master site's
             * local disk storage for durability.
             */
            case NotificationEvent.REP_PERM_FAILED:
                Console.WriteLine("Insufficient acknowledgements " +
                    "to guarantee transaction durability.");
                break;
            default:
                Console.WriteLine("Ignoring event: {0}", eventCode);
                break;
            }
        }
    }
}