练习 - 继续处理事件并将数据存储在 Azure Cosmos DB 中
第二个函数可以侦听 Azure 事件中心中特定命名空间的事件,并处理这些事件并将其存储在使用 Azure Cosmos DB 创建的数据库中。
使用 Azure Cosmos DB 创建数据库
若要创建数据库,请使用 az cosmosdb create 命令。 该命令使用 Azure Cosmos DB 帐户、数据库和 SQL 容器。
az cosmosdb create \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--name TelemetryDb
az cosmosdb sql container create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--database-name TelemetryDb \
--name TelemetryInfo \
--partition-key-path '/temperatureStatus'
对于我们的情境,温度值得关注。 因此,我们定义为 temperatureStatus 分区键。
生成、配置和部署另一个 Azure 函数
使用事件中心,可以从以兆字节为单位的数据流开始,并增长到 GB 或 TB。 自动膨胀功能是许多选项之一,可用于缩放吞吐量单位数以满足使用需求。
每个函数的消费应用程序都有事件流的单独视图。 他们以自己的节奏和自己的偏移量独立地读取数据流。
在我们的场景中,你需要创建一个消耗 Azure 函数作为示例。 若要创建函数,应遵循最佳实践,使其独立运行,并配置自己的存储帐户和绑定,以实现松散耦合和可伸缩性。
az storage account create \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--sku Standard_LRS
az functionapp create \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c"\
--storage-account $STORAGE_ACCOUNT"c" \
--consumption-plan-location $LOCATION \
--runtime java \
--functions-version 4
检索连接字符串
使用者函数需要了解其存储帐户和事件中心。 它还需要了解将处理的事件写入到的数据库。
AZURE_WEB_JOBS_STORAGE=$( \
az storage account show-connection-string \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--query connectionString \
--output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
az cosmosdb keys list \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT \
--type connection-strings \
--query 'connectionStrings[0].connectionString' \
--output tsv)
echo $COSMOS_DB_CONNECTION_STRING
可以使用命令 echo $EVENT_HUB_CONNECTION_STRING 检查变量是否仍然正确设置。 否则,请重新运行以下命令:
EVENT_HUB_CONNECTION_STRING=$( \
az eventhubs eventhub authorization-rule keys list \
--resource-group $RESOURCE_GROUP \
--name $EVENT_HUB_AUTHORIZATION_RULE \
--eventhub-name $EVENT_HUB_NAME \
--namespace-name $EVENT_HUB_NAMESPACE \
--query primaryConnectionString \
--output tsv)
echo $EVENT_HUB_CONNECTION_STRING
这些连接字符串需要存储在 Azure Functions 帐户的应用程序设置中。
az functionapp config appsettings set \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c" \
--settings \
AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING
注释
对于生产环境,可以使用 Azure Key Vault 的实例来存储和管理连接字符串。
创建函数应用程序
在创建下一个函数之前,请确保位于正确的文件夹中。
cd ..
mvn archetype:generate --batch-mode \
-DarchetypeGroupId=com.microsoft.azure \
-DarchetypeArtifactId=azure-functions-archetype \
-DappName=$FUNCTION_APP"-c" \
-DresourceGroup=$RESOURCE_GROUP \
-DappRegion=$LOCATION \
-DappServicePlanName=$LOCATION"plan" \
-DgroupId=com.learn \
-DartifactId=telemetry-functions-consumer
该命令创建类似于上一个练习的应用程序。 删除测试文件,使用local.settings.file命令更新fetch-app-settings,然后替换现有Function.java文件。
cd telemetry-functions-consumer
rm -r src/test
更新本地执行和调试的本地设置。
func azure functionapp fetch-app-settings $FUNCTION_APP"-c"
接下来,打开 Function.java 该文件,并将内容替换为以下代码:
package com.learn;
import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
public class Function {
@FunctionName("processSensorData")
public void processSensorData(
@EventHubTrigger(
name = "msg",
eventHubName = "", // blank because the value is included in the connection string
cardinality = Cardinality.ONE,
connection = "EventHubConnectionString")
TelemetryItem item,
@CosmosDBOutput(
name = "databaseOutput",
databaseName = "TelemetryDb",
collectionName = "TelemetryInfo",
connectionStringSetting = "CosmosDBConnectionString")
OutputBinding<TelemetryItem> document,
final ExecutionContext context) {
context.getLogger().info("Event hub message received: " + item.toString());
if (item.getPressure() > 30) {
item.setNormalPressure(false);
} else {
item.setNormalPressure(true);
}
if (item.getTemperature() < 40) {
item.setTemperatureStatus(status.COOL);
} else if (item.getTemperature() > 90) {
item.setTemperatureStatus(status.HOT);
} else {
item.setTemperatureStatus(status.WARM);
}
document.setValue(item);
}
}
在与Function.java相同的位置创建另一个名为TelemetryItem.java的新文件,并添加以下代码:
package com.learn;
public class TelemetryItem {
private String id;
private double temperature;
private double pressure;
private boolean isNormalPressure;
private status temperatureStatus;
static enum status {
COOL,
WARM,
HOT
}
public TelemetryItem(double temperature, double pressure) {
this.temperature = temperature;
this.pressure = pressure;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getPressure() {
return pressure;
}
@Override
public String toString() {
return "TelemetryItem={id=" + id + ",temperature="
+ temperature + ",pressure=" + pressure + "}";
}
public boolean isNormalPressure() {
return isNormalPressure;
}
public void setNormalPressure(boolean isNormal) {
this.isNormalPressure = isNormal;
}
public status getTemperatureStatus() {
return temperatureStatus;
}
public void setTemperatureStatus(status temperatureStatus) {
this.temperatureStatus = temperatureStatus;
}
}
当事件中心收到消息时,它会生成一个事件。 processSensorData 函数在收到事件时运行。 然后,它会处理事件数据,并使用 Azure Cosmos DB 的输出绑定将结果发送到数据库。 我们再次使用该 TelemetryItem.java 类。 对象 TelemetryItem 可被视为此事件驱动系统的参与者之间的使用者驱动协定。
在本地运行
使用 Azure Functions,可以从世界各地接收事件。 是的,你甚至可以在开发计算机上本地接收事件!
mvn clean package
mvn azure-functions:run
生成和启动消息后,函数运行时会看到传入事件:
[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker
在 Azure 门户中,转到 Azure Cosmos DB 帐户。 选择“数据资源管理器”,选择“TelemetryInfo”,然后选择“项”在数据到达时查看数据。

在 Azure 上部署
现在,让我们在云中转移整个工作负荷。 若要将函数部署到 Azure Functions,请使用 Maven 命令 mvn azure-functions:deploy。 请确保你仍在正确的存储库telemetry-functions中。
mvn azure-functions:deploy
美妙! 我们部署了整个遥测方案,方法是将数据发送到事件中心,并使用不同的独立函数使用数据。 该函数处理数据,然后将结果存储在使用 Azure Cosmos DB 创建的数据库中。 如何确保应用程序满足预定义的要求? 通过使用监控。