Share via


Container.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T> Method

Definition

public abstract Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(string processorName, Microsoft.Azure.Cosmos.Container.ChangeFeedHandler<Microsoft.Azure.Cosmos.ChangeFeedItem<T>> onChangesDelegate);
abstract member GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes : string * Microsoft.Azure.Cosmos.Container.ChangeFeedHandler<Microsoft.Azure.Cosmos.ChangeFeedItem<'T>> -> Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder
Public MustOverride Function GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(Of T) (processorName As String, onChangesDelegate As Container.ChangeFeedHandler(Of ChangeFeedItem(Of T))) As ChangeFeedProcessorBuilder

Type Parameters

T

Document type

Parameters

processorName
String

A name that identifies the Processor and the particular work it will do.

onChangesDelegate
Container.ChangeFeedHandler<ChangeFeedItem<T>>

Delegate that receives every version of each changed item, including deletes.

Returns

An instance of ChangeFeedProcessorBuilder

Examples

// Create the container that is handed to the processor via WithLeaseContainer below.
Container leaseContainer = await this.database.CreateContainerAsync(
    new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
    cancellationToken: this.cancellationToken);

// Signaled by the handler once the last operation issued below (the delete) has been observed,
// so the wait further down can return early instead of always running to its timeout.
ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);

ChangeFeedProcessor changeFeedProcessor = this.Container
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> documents, CancellationToken token) =>
{
    Console.WriteLine($"number of documents processed: {documents.Count}");

    string id = default;
    string pk = default;
    string description = default;

    // Tracks whether this batch contained the delete that ends the example.
    bool deleteObserved = false;

    foreach (ChangeFeedItem<dynamic> changeFeedItem in documents)
    {
        if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
        {
            // Non-delete operations: read the item from Current.
            id = changeFeedItem.Current.id.ToString();
            pk = changeFeedItem.Current.pk.ToString();
            description = changeFeedItem.Current.description.ToString();
        }
        else
        {
            // Delete operations: read the item from Previous.
            id = changeFeedItem.Previous.id.ToString();
            pk = changeFeedItem.Previous.pk.ToString();
            description = changeFeedItem.Previous.description.ToString();
            deleteObserved = true;
        }

        // Metadata exposed for every change, read here for illustration.
        ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
        long previousLsn = changeFeedItem.Metadata.PreviousLsn;
        DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
        long lsn = changeFeedItem.Metadata.Lsn;
        bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
    }

    if (deleteObserved)
    {
        // The delete is the final operation this example performs; release the waiter.
        allProcessedDocumentsEvent.Set();
    }

    return Task.CompletedTask;
})
.WithInstanceName(Guid.NewGuid().ToString())
.WithLeaseContainer(leaseContainer)
.WithErrorNotification((leaseToken, error) =>
{
    Console.WriteLine(error.ToString());

    return Task.CompletedTask;
})
.Build();

await changeFeedProcessor.StartAsync();
// NOTE(review): presumably gives the processor time to start before changes are written — confirm.
await Task.Delay(1000);

// Generate a create, a replace (via upsert of the same id), and a delete for the handler to observe.
await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));

// Wait up to 10 seconds for the handler to signal that the delete was processed.
allProcessedDocumentsEvent.WaitOne(10 * 1000);

await changeFeedProcessor.StopAsync();

Applies to