.. _change-streams:

**************
Change Streams
**************

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 1
   :class: singlecol

As of version 3.6 of the MongoDB server, a new ``$changeStream`` pipeline
stage is supported in the aggregation framework. Specifying this stage first
in an aggregation pipeline allows users to request that notifications be
sent for all changes to a particular collection. As of MongoDB 4.0, change
streams are supported on databases and clusters in addition to collections.

The Ruby driver provides an API for receiving notifications for changes to
a particular collection, database or cluster using this pipeline stage.
Although you can create a change stream using the pipeline operator and
aggregation framework directly, it is recommended to use the driver API
described below, because the driver automatically resumes the change stream
once if there is a timeout, a network error, a server error indicating that
a failover is taking place, or another type of resumable error.

Change streams on the server require a ``"majority"`` read concern or no
read concern.

Change streams do not work properly with JRuby because of the issue
documented here_. Namely, JRuby eagerly evaluates ``#next`` on an Enumerator
in a background green thread; therefore, calling ``#next`` on the change
stream will cause ``getMore`` requests to be issued in a loop in the
background.

.. _here: https://github.com/jruby/jruby/issues/4212


Watching for Changes on a Collection
====================================

A collection change stream is created by calling the ``#watch`` method on a
collection:

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
   collection = client[:test]
   stream = collection.watch
   collection.insert_one(a: 1)
   doc = stream.to_enum.next
   process(doc)

You can also receive the notifications as they become available:

.. code-block:: ruby

   stream = collection.watch
   enum = stream.to_enum
   while doc = enum.next
     process(doc)
   end

The ``next`` method blocks and polls the cluster until a change is
available. Use the ``try_next`` method to iterate a change stream without
blocking: this method waits up to ``max_await_time_ms`` milliseconds for
changes from the server and returns ``nil`` if no changes are received.
If there is a non-resumable error, both ``next`` and ``try_next`` raise an
exception. A short non-blocking polling sketch appears at the end of this
section; see the Resuming a Change Stream section below for an example that
reads changes from a collection indefinitely.

The change stream can take filters in the aggregation framework pipeline
operator format:

.. code-block:: ruby

   stream = collection.watch([
     { '$match' => { 'operationType' => { '$in' => ['insert', 'replace'] } } },
     { '$match' => { 'fullDocument.n' => { '$gte' => 1 } } },
   ])
   enum = stream.to_enum
   while doc = enum.next
     process(doc)
   end
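As an illustration of the non-blocking approach, a polling loop might look
like the following minimal sketch. The client address, the 500 millisecond
wait, the fixed number of polls and the ``process`` method are illustrative
assumptions; the ``:max_await_time_ms`` option of ``#watch`` sets how long
the server waits for new changes on each request before ``try_next``
returns ``nil``.

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
   collection = client[:test]

   # Ask the server to wait at most 500 ms for new changes per request.
   stream = collection.watch([], max_await_time_ms: 500)
   enum = stream.to_enum

   10.times do
     doc = enum.try_next
     # When +doc+ is nil, no change arrived within the wait period and the
     # application can do other work before polling again.
     process(doc) if doc
   end

   stream.close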
Watching for Changes on a Database
==================================

A database change stream notifies of changes on any collection within the
database, as well as database-wide events such as the database being
dropped.

A database change stream is created by calling the ``#watch`` method on a
database object:

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
   database = client.database
   stream = database.watch
   client[:test].insert_one(a: 1)
   doc = stream.to_enum.next
   process(doc)


Watching for Changes on a Cluster
=================================

A cluster change stream notifies of changes on any collection in any
database within the cluster, as well as cluster-wide events.

A cluster change stream is created by calling the ``#watch`` method on a
client object (not the cluster object):

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
   stream = client.watch
   client[:test].insert_one(a: 1)
   doc = stream.to_enum.next
   process(doc)


Closing a Change Stream
=======================

You can close a change stream by calling its ``#close`` method:

.. code-block:: ruby

   stream.close


Resuming a Change Stream
========================

A change stream consists of two types of operations: the initial
aggregation and ``getMore`` requests to receive the next batch of changes.
The driver automatically retries each ``getMore`` operation once on network
errors and when the server returns an error indicating that it changed
state (for example, it is no longer the primary). The driver does not retry
the initial aggregation. In practical terms this means that, for example:

- Calling ``collection.watch`` will fail if the cluster does not have
  enough available nodes to satisfy the ``"majority"`` read concern.
- Once ``collection.watch`` successfully returns, if the cluster
  subsequently experiences an election or loses a node, but heals quickly
  enough, change stream reads via the ``next`` or ``each`` methods will
  continue transparently to the application.

To indefinitely and reliably watch for changes without losing any changes
or processing a change more than once, the application must track the
resume token for the change stream and restart the change stream when it
experiences extended error conditions that cause the driver's automatic
resume to also fail. The following code snippet shows an example of
iterating a change stream indefinitely, retrieving the resume token using
the ``resume_token`` change stream method and restarting the change stream
using the ``:resume_after`` option on all MongoDB or network errors:

.. code-block:: ruby

   token = nil

   loop do
     begin
       stream = collection.watch([], resume_after: token)
       enum = stream.to_enum
       while doc = enum.next
         process(doc)
         token = stream.resume_token
       end
     rescue Mongo::Error
       sleep 1
     end
   end

The above iteration blocks at the ``enum.next`` call and does not permit
resuming processing if the Ruby process running this code is terminated.
The driver also provides the ``try_next`` method, which returns ``nil``
(after a small waiting period) instead of blocking indefinitely when there
are no changes in the change stream. Using the ``try_next`` method, the
resume token may be persisted after each ``getMore`` request, even when a
particular request does not return any changes, such that the resume token
remains at the top of the oplog and the application has an opportunity to
persist it should the process handling changes terminate:
.. code-block:: ruby

   token = nil

   loop do
     begin
       stream = collection.watch([], resume_after: token)
       enum = stream.to_enum
       doc = enum.try_next
       if doc
         process(doc)
       end
       token = stream.resume_token
       # Persist +token+ to support resuming processing upon process restart
     rescue Mongo::Error
       sleep 1
     end
   end

Note that the resume token should be retrieved from the change stream after
every ``try_next`` call, even if the call returned no document.

The resume token is also provided in the ``_id`` field of each change
stream document. Reading the ``_id`` field is not recommended, because it
may be projected out by the application and because using only the ``_id``
field would not advance the resume token when a ``getMore`` returns no
documents.
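The persistence step noted in the comment above can be sketched as follows,
with the token written to a local file. This is a minimal illustration
rather than a prescribed pattern: the ``change_stream_token.json`` file
name is arbitrary, ``collection`` and ``process`` stand in for the objects
used in the earlier examples, and the JSON round trip assumes a resume
token whose ``_data`` field is a plain string (as produced by MongoDB 4.0
and later). An application may equally persist the token in a database or
any other durable store.

.. code-block:: ruby

   require 'json'

   TOKEN_FILE = 'change_stream_token.json'

   # Load a previously persisted resume token, if one exists.
   token = JSON.parse(File.read(TOKEN_FILE)) if File.exist?(TOKEN_FILE)

   loop do
     begin
       stream = collection.watch([], resume_after: token)
       enum = stream.to_enum
       doc = enum.try_next
       process(doc) if doc
       token = stream.resume_token
       # Persist the token so a restarted process resumes from this point.
       File.write(TOKEN_FILE, token.to_json) if token
     rescue Mongo::Error
       sleep 1
     end
   end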