Container.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T> Method
Definition
Important
Some information relates to prerelease product that may be substantially modified before it’s released. Microsoft makes no warranties, express or implied, with respect to the information provided here.
Initializes a GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(String, Container.ChangeFeedHandler<ChangeFeedItem<T>>) for change feed processing with all versions and deletes.
public abstract Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(string processorName, Microsoft.Azure.Cosmos.Container.ChangeFeedHandler<Microsoft.Azure.Cosmos.ChangeFeedItem<T>> onChangesDelegate);
abstract member GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes : string * Microsoft.Azure.Cosmos.Container.ChangeFeedHandler<Microsoft.Azure.Cosmos.ChangeFeedItem<'T>> -> Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder
Public MustOverride Function GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(Of T) (processorName As String, onChangesDelegate As Container.ChangeFeedHandler(Of ChangeFeedItem(Of T))) As ChangeFeedProcessorBuilder
Type Parameters
- T
Document type
Parameters
- processorName
- String
A name that identifies the Processor and the particular work it will do.
- onChangesDelegate
- Container.ChangeFeedHandler<ChangeFeedItem<T>>
Delegate to receive all changes and deletes
Returns
An instance of ChangeFeedProcessorBuilder
Examples
Container leaseContainer = await this.database.CreateContainerAsync(
new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
cancellationToken: this.cancellationToken);
ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
ChangeFeedProcessor changeFeedProcessor = this.Container
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> documents, CancellationToken token) =>
{
Console.WriteLine($"number of documents processed: {documents.Count}");
string id = default;
string pk = default;
string description = default;
foreach (ChangeFeedItem<dynamic> changeFeedItem in documents)
{
if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
{
id = changeFeedItem.Current.id.ToString();
pk = changeFeedItem.Current.pk.ToString();
description = changeFeedItem.Current.description.ToString();
}
else
{
id = changeFeedItem.Previous.id.ToString();
pk = changeFeedItem.Previous.pk.ToString();
description = changeFeedItem.Previous.description.ToString();
}
ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
long previousLsn = changeFeedItem.Metadata.PreviousLsn;
DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
long lsn = changeFeedItem.Metadata.Lsn;
bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
}
return Task.CompletedTask;
})
.WithInstanceName(Guid.NewGuid().ToString())
.WithLeaseContainer(leaseContainer)
.WithErrorNotification((leaseToken, error) =>
{
Console.WriteLine(error.ToString());
return Task.CompletedTask;
})
.Build();
await changeFeedProcessor.StartAsync();
await Task.Delay(1000);
await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
allProcessedDocumentsEvent.WaitOne(10 * 1000);
await changeFeedProcessor.StopAsync();