Version: v6

PubSub V2

Motivation

Asynchronous messaging decouples publishers from consumers, because the publisher is not blocked while a message is being processed. This is especially useful in Azure Logic Apps workflows, where the publisher and the consumer do not always run at the same time.

Invictus provides a solution called PubSub that lets Azure Service Bus act as a message broker, accessed in a publish/subscribe fashion via HTTP endpoints. Azure Blob Storage acts as a claim-check provider for messages that are too big to be published directly.

🔗 See also the Publisher-Subscriber integration pattern.

Available endpoints

  • /api/Publish: sending an HTTP request with custom content places a message on an Azure Service Bus topic.
  • /api/Subscribe: sending an HTTP request with an Azure Service Bus topic subscription name responds with the available messages.
  • /api/Acknowledge: sending an HTTP request with a message sequence number settles the Azure Service Bus message.

PubSub pseudo Logic App diagram

➡️ Publish single message

The /api/Publish endpoint allows users to send a single message to the configured Azure Service Bus topic (default: pubsubv2router) that subscribers listen to.

The following request properties must/can be supplied:

| JSON property | Required (default) | Translates to ServiceBusMessage | Description |
|---|---|---|---|
| Content | yes | Body | Raw binary content for the message. If it exceeds a certain size (default 20 000 bytes), the claim-check pattern is applied: the message is sent with an empty body, and the content is saved in Azure Blob Storage. The Subscribe action automatically loads the content, based on specific application properties, from either the message itself or from Blob Storage. |
| Context | yes | ApplicationProperties | User-provided properties. The HTTP request headers x-ms-client-tracking-id and x-ms-workflow-run-id are appended when present. |
| MessageId | no (new GUID) | MessageId | Optional message ID for duplicate detection purposes. |
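
The claim-check behaviour described for Content can be illustrated with a minimal sketch. It is not the Invictus implementation: the property name `x-claim-check-blob` is hypothetical (the real application properties are not listed here), a plain dictionary stands in for Azure Blob Storage, and the threshold is passed in explicitly.

```python
import uuid

# Hypothetical application property name, chosen for illustration only.
CLAIM_CHECK_BLOB_PROPERTY = "x-claim-check-blob"


def prepare_message(content: bytes, context: dict, threshold_bytes: int, blob_store: dict) -> dict:
    """Build a message, moving content larger than the threshold into blob_store."""
    properties = dict(context)
    if len(content) > threshold_bytes:
        blob_name = str(uuid.uuid4())
        blob_store[blob_name] = content  # stand-in for an Azure Blob Storage upload
        properties[CLAIM_CHECK_BLOB_PROPERTY] = blob_name
        body = b""  # the message itself travels with an empty body
    else:
        body = content
    return {"body": body, "application_properties": properties}


def load_content(message: dict, blob_store: dict) -> bytes:
    """Resolve the content from the message itself or from the blob store."""
    blob_name = message["application_properties"].get(CLAIM_CHECK_BLOB_PROPERTY)
    if blob_name is None:
        return message["body"]
    return blob_store[blob_name]
```

The subscriber side only needs `load_content`, which mirrors how the Subscribe action is described as loading content transparently from either location.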
Full request example
// POST /api/Publish
{
  "Content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
  "MessageId": "b0f11049-7f4d-4bae-90b2-91d93e69367d",
  "Context": {
    "x-applicationName": "InvoiceApp",
    "x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
    "x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
    "x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08"
  }
}

The endpoint responds with 202 Accepted when the message is published successfully.
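
As an illustration of calling the endpoint from code rather than from a Logic App, the sketch below builds a publish request with the Python standard library. The base URL is a placeholder, authentication is left out, and encoding Content as Base64 is an assumption based on the request example above.

```python
import base64
import json
import urllib.request
import uuid
from typing import Optional


def build_publish_body(content: bytes, context: dict, message_id: Optional[str] = None) -> dict:
    """Create the JSON body for /api/Publish (Content is Base64-encoded, an assumption)."""
    return {
        "Content": base64.b64encode(content).decode("ascii"),
        "MessageId": message_id or str(uuid.uuid4()),
        "Context": context,
    }


def build_publish_request(base_url: str, body: dict) -> urllib.request.Request:
    """Wrap the body in a POST request; base_url is a placeholder for the PubSub host."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/Publish",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending the request with `urllib.request.urlopen(request)` would still require the authentication that the deployment expects, which this sketch does not cover.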

⬅️ Subscribe for messages

The /api/Subscribe endpoint allows users to periodically ask for any available published messages on the configured Azure Service Bus topic (default: pubsubv2router).

The following request properties must/can be supplied:

| JSON property | Required (default) | Translates to Service Bus | Description |
|---|---|---|---|
| Subscription | yes | SubscriptionName | Name of the Azure Service Bus topic subscription; it is created if it does not exist. (The name is also used as the name of the rule.) |
| Filter | no (subscribe to all messages) | SqlExpression | Optional SQL expression that acts as a filter rule for which messages to subscribe to. |
| BatchSize | no (10) | BatchSize | Maximum number of messages to receive during this single call. |
| TimeoutMilliseconds | no (1 min) | MaxWaitTime | Maximum time to wait for a message before responding with an empty set of messages. |
| ShouldDeleteOnReceive | no (false) | ReceiveAndDelete | false (default) means PeekLock; true means receiving messages with ReceiveAndDelete. |
| SkipSubscriptionUpsert | no (false) | create/update subscription and rule | true means a topic subscription should already be available; false (default) means a subscription is created with the provided Filter. |

⚠️ In rare cases, ShouldDeleteOnReceive=true can cause messages to be lost, for example when a client-side error occurs and the sequence number is lost, or when a cancellation or scale-down happens at the exact moment the message is received (and deleted) from the topic subscription.

Tip

Instead of a request body, the POST to the /api/Subscribe endpoint can also carry its properties as HTTP query parameters: /api/Subscribe?Subscription=orderProcessor.
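
A small sketch of building such a query-string URL follows. The base URL is a placeholder, and it is an assumption that the other request properties (such as BatchSize) are accepted as query parameters under the same names and that booleans are written as lowercase true/false.

```python
from urllib.parse import urlencode


def build_subscribe_url(base_url: str, subscription: str, **options) -> str:
    """Build a /api/Subscribe URL that carries the request properties as query parameters."""
    query = {"Subscription": subscription}
    for name, value in options.items():
        # Render booleans as lowercase true/false (an assumption about the accepted format).
        query[name] = str(value).lower() if isinstance(value, bool) else value
    return base_url.rstrip("/") + "/api/Subscribe?" + urlencode(query)
```

`urlencode` takes care of escaping, which matters for Filter expressions that contain spaces and quotes.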

Full request example
// POST -> /api/Subscribe
{
  "Subscription": "orderProcessor",
  "Filter": "sys.label = 'OrderCreated'",
  "BatchSize": 11,
  "TimeoutMilliseconds": 30000,
  "ShouldDeleteOnReceive": false,
  "SkipSubscriptionUpsert": false
}
Full response example
// 200 OK <- /api/Subscribe
[
  {
    "subscription": "orderProcessor",
    "content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
    "context": {
      "x-applicationName": "InvoiceApp",
      "x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
      "x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
      "x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08",
      "x-ms-client-tracking-id": "test",
      "Diagnostic-Id": "00-0cc7ed09eeaa51b0e835d90890aefb60-b0a02deac9f6fe6d-00"
    },
    "sequenceNumber": 99
  },
  ...
]
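
For consumers outside Logic Apps, the response above could be parsed along these lines. This is a sketch that assumes the content field is Base64-encoded, as the example suggests; the `ReceivedMessage` type is introduced here for illustration only.

```python
import base64
import json
from dataclasses import dataclass
from typing import List


@dataclass
class ReceivedMessage:
    subscription: str
    content: bytes
    context: dict
    sequence_number: int


def parse_subscribe_response(response_text: str) -> List[ReceivedMessage]:
    """Turn the JSON array returned by /api/Subscribe into typed messages."""
    return [
        ReceivedMessage(
            subscription=item["subscription"],
            content=base64.b64decode(item["content"]),  # assumes Base64 content
            context=item.get("context", {}),
            sequence_number=item["sequenceNumber"],
        )
        for item in json.loads(response_text)
    ]
```

The sequence number is the value to keep around, since it is what the /api/Acknowledge endpoint needs later.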
Internal workaround

Because a message can be acknowledged from a different place than where it was received via /api/Subscribe, the message is deferred internally. The Azure SDK only allows a message to be settled by the same receiver instance that received it. A deferred message, by contrast, can be picked up by any receiver using its sequence number.
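
The idea behind the workaround can be shown with a toy in-memory model. It does not use the Azure SDK and is not how Invictus is implemented; the class and method names are made up for this sketch. The point it illustrates is that, once deferred, a message can be settled with nothing but its sequence number.

```python
class InMemorySubscription:
    """Toy model of a topic subscription that defers messages on receive."""

    def __init__(self):
        self._active = []    # (sequence number, body) pairs waiting to be received
        self._deferred = {}  # sequence number -> body, awaiting settlement
        self._next_sequence_number = 1

    def publish(self, body: bytes) -> None:
        self._active.append((self._next_sequence_number, body))
        self._next_sequence_number += 1

    def receive_and_defer(self, batch_size: int) -> list:
        """Hand out up to batch_size messages and park them as deferred."""
        batch = self._active[:batch_size]
        self._active = self._active[batch_size:]
        for sequence_number, body in batch:
            self._deferred[sequence_number] = body
        return batch

    def settle_deferred(self, sequence_number: int) -> None:
        """Settle a deferred message; any caller that knows the number can do this."""
        if sequence_number not in self._deferred:
            raise KeyError(f"No deferred message with sequence number {sequence_number}")
        del self._deferred[sequence_number]

    def deferred_sequence_numbers(self) -> list:
        return sorted(self._deferred)
```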

✔️ Acknowledge message

The /api/Acknowledge endpoint allows users to settle a message that was previously received via the /api/Subscribe endpoint. The sequence number of the message is required to run this operation.

The following request properties must/can be supplied:

| JSON property | Required (default) | Translates to Service Bus | Description |
|---|---|---|---|
| Subscription | yes | CreateReceiver | Name of the Azure Service Bus topic subscription to receive the deferred message on. (See internal workaround on /api/Subscribe.) |
| SequenceNumber | yes | ReceiveDeferredMessage | Unique number assigned by Service Bus, returned in the /api/Subscribe response. |
| AcknowledgementType | no (Complete) | Message settlement | Type of acknowledge action to take on the message: Complete, Abandon, Defer, or DeadLetter. |
| IgnoreNotFoundException | no (false) | MessageNotFound | true means that a MessageNotFound Service Bus failure during lookup of the message by its SequenceNumber results in 202 Accepted; false means the endpoint responds with 400 BadRequest. |
Full request example
// POST /api/Acknowledge
{
  "Subscription": "subscriptionName",
  "AcknowledgementType": "Complete",
  "SequenceNumber": 99,
  "IgnoreNotFoundException": false
}
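
The sketch below shows one way a client could build the acknowledge body after processing a received message. Choosing Abandon when the handler fails is a design choice of this example, not something PubSub prescribes; the function names are illustrative.

```python
ACKNOWLEDGEMENT_TYPES = ("Complete", "Abandon", "Defer", "DeadLetter")


def build_acknowledge_body(subscription: str, sequence_number: int,
                           acknowledgement_type: str = "Complete",
                           ignore_not_found: bool = False) -> dict:
    """Create the JSON body for /api/Acknowledge, rejecting unknown settlement types."""
    if acknowledgement_type not in ACKNOWLEDGEMENT_TYPES:
        raise ValueError(f"Unknown AcknowledgementType: {acknowledgement_type}")
    return {
        "Subscription": subscription,
        "AcknowledgementType": acknowledgement_type,
        "SequenceNumber": sequence_number,
        "IgnoreNotFoundException": ignore_not_found,
    }


def acknowledge_after(handler, message: dict) -> dict:
    """Run handler(message) and pick Complete on success, Abandon on failure."""
    try:
        handler(message)
        outcome = "Complete"
    except Exception:
        outcome = "Abandon"  # sketch-level policy: make the message available again
    return build_acknowledge_body(message["subscription"], message["sequenceNumber"], outcome)
```

Here `message` is one item of the /api/Subscribe response, so the subscription name and sequence number are taken straight from it.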

Customization

Available Bicep parameters
| Bicep parameter | Default | Description |
|---|---|---|
| pubSubV2TopicName | pubsubv2router | Name of the Azure Service Bus topic that acts as message broker for the PubSub V2 component. |
| approvedMessageSizeInBytes | 200000 (200 KB) | Threshold above which the contents of Azure Service Bus messages are saved to Blob Storage, a.k.a. claim-checked. |
| blobContainerPrefix | invictus (final container name: {blobContainerPrefix}{pubSubV2TopicName}) | Prefix of the Azure Blob Storage container that is created when messages are too big to be routed via Service Bus and are saved in a container instead, a.k.a. claim-checked. |
| serviceBusMessageTimeToLiveMinutes | 30 days | Period the published message should be active on the Azure Service Bus topic (translates to TimeToLive). |
| pubSubSubscriptionLockTimeoutInMinutes | 1 min | Duration of the peek lock receive, see LockDuration. |
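
Two of these values involve a small derivation: the final container name is the prefix followed by the topic name, and a time-to-live expressed in days has to be converted when a parameter is given in minutes. The helpers below are a sketch for checking such values; that serviceBusMessageTimeToLiveMinutes takes a whole number of minutes is an assumption based on its name.

```python
from datetime import timedelta


def claim_check_container_name(blob_container_prefix: str = "invictus",
                               topic_name: str = "pubsubv2router") -> str:
    """Final container name: {blobContainerPrefix}{pubSubV2TopicName}."""
    return f"{blob_container_prefix}{topic_name}"


def days_to_minutes(days: int) -> int:
    """Convert a duration in days to whole minutes."""
    return int(timedelta(days=days).total_seconds() // 60)
```

With the defaults, the container name is `invictuspubsubv2router`, and 30 days corresponds to 43200 minutes.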

Migrating PubSub v1 to v2

Migrating to v2 involves changes to the authentication and the endpoint URLs, and the removal of the metadata links.

👉 The /api/Subscribe endpoint also needs to use a POST instead of a GET HTTP method.

Publish message example
  {
    "PublishMessage": {
      "type": "Http",
      "inputs": {
        "authentication": {
-         "password": "@parameters('invictusPassword')",
-         "type": "Basic",
-         "username": "Invictus"
+         "audience": "[parameters('invictus').authentication.audience]",
+         "identity": "[parameters('infra').managedIdentity.id]",
+         "type": "ManagedServiceIdentity"
        },
        "method": "post",
-       "uri": "[parameters('invictus').framework.pubSub.v1.publishUrl]",
+       "uri": "[parameters('invictus').framework.pubSub.v2.publishUrl]",
        "body": {
          "Content": "@{decodeBase64(body('Extract_Message_Context')['Content'])}",
          "Context": "@body('Extract_Message_Context')?['Context']"
        }
      },
-     "metadata": {
-       "apiDefinitionUrl": "[parameters('invictus').framework.pubSub.definitionUrl]",
-       "swaggerSource": "custom"
-     },
      "runAfter": {
        "Extract_Message_Context": [
          "Succeeded"
        ]
      }
    }
  }
Subscribe message example
  {
    "SubscribeMessage": {
      "type": "Http",
      "inputs": {
        "authentication": {
-         "password": "@parameters('invictusPassword')",
-         "type": "Basic",
-         "username": "Invictus"
+         "audience": "[parameters('invictus').authentication.audience]",
+         "identity": "[parameters('infra').managedIdentity.id]",
+         "type": "ManagedServiceIdentity"
        },
-       "method": "get",
+       "method": "post",
        "queries": {
          "deleteOnReceive": false,
          "filter": "Domain = 'B2B-Gateway' AND Action = 'EDI' AND Version = '1.0'",
          "subscription": "[concat(substring(variables('logicAppName'), max(createarray(0, sub(length(variables('logicAppName')), 36)))), '-', uniquestring(variables('logicAppName')))]"
        },
-       "uri": "[parameters('invictus').framework.pubSub.v1.subscribeUrl]"
+       "uri": "[parameters('invictus').framework.pubSub.v2.subscribeUrl]"
      },
-     "metadata": {
-       "apiDefinitionUrl": "[parameters('invictus').framework.pubSub.definitionUrl]",
-       "swaggerSource": "custom"
-     },
      "recurrence": {
        "frequency": "Second",
        "interval": 1
      },
      "splitOn": "@triggerBody()",
      "splitOnConfiguration": {
        "correlation": {
          "clientTrackingId": "@triggerBody()['Context']['x-ms-client-tracking-id']"
        }
      }
    }
  }
Acknowledge message example
  {
    "AcknowledgeMessage": {
      "type": "Http",
      "inputs": {
        "authentication": {
-         "password": "@parameters('invictusPassword')",
-         "type": "Basic",
-         "username": "Invictus"
+         "audience": "[parameters('invictus').authentication.audience]",
+         "identity": "[parameters('infra').managedIdentity.id]",
+         "type": "ManagedServiceIdentity"
        },
        "body": {
          "AcknowledgementType": "Complete",
          "IgnoreNotFoundException": true,
          "Subscription": "@triggerBody()?['subscription']",
-         "LockToken": "@triggerBody()?['LockToken']",
-         "MessageReadTime": "@trigger()['startTime']"
+         "SequenceNumber": "@triggerBody()?['sequenceNumber']"
        },
        "method": "post",
-       "uri": "[parameters('invictus').framework.pubSub.v1.acknowledgeUrl]"
+       "uri": "[parameters('invictus').framework.pubSub.v2.acknowledgeUrl]"
      },
-     "metadata": {
-       "apiDefinitionUrl": "[parameters('invictus').framework.pubSub.v1.definitionUrl]",
-       "swaggerSource": "custom"
-     },
      "runAfter": {}
    }
  }
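
When many workflows need the same changes, the common part of the migration can be scripted. The following is a rough sketch, not an official migration tool: it applies the authentication, URI and metadata changes shown in the examples above to a single action or trigger definition, and switches the subscribe call to POST. Body changes, such as those in the acknowledge example, are not handled and still need manual review.

```python
import copy

# Authentication block taken from the v2 examples above.
V2_AUTHENTICATION = {
    "audience": "[parameters('invictus').authentication.audience]",
    "identity": "[parameters('infra').managedIdentity.id]",
    "type": "ManagedServiceIdentity",
}


def migrate_action(action: dict) -> dict:
    """Return a copy of a v1 PubSub HTTP action or trigger with the common v2 changes applied."""
    migrated = copy.deepcopy(action)
    migrated.pop("metadata", None)  # the metadata links are removed in v2
    inputs = migrated["inputs"]
    inputs["authentication"] = dict(V2_AUTHENTICATION)
    inputs["uri"] = inputs["uri"].replace(".pubSub.v1.", ".pubSub.v2.")
    if inputs["uri"].endswith("subscribeUrl]"):
        inputs["method"] = "post"  # /api/Subscribe uses POST instead of GET in v2
    return migrated
```

The function works on a copy, so the original definition stays available for comparison.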