MaxCompute费用对账分摊统计,优化使用数据资源

LogicCipherX
• 阅读 1725

利用MaxCompute InformationSchema与阿里云交易和账单管理API 实现MaxCompute费用对账分摊统计 一、需求场景分析 很多的企业用户选择MaxCompute按量付费模式构建自己的数据平台,利用MaxCompute按作业付费的计费模型,在获得高性能的同时避免"IDLE"状态的不必要资源费用支出,仅为实际使用付费。 那么在一个规模比较大的公司,企业购买了MaxCompute服务,会支撑企业内部的不同部门、个人来使用MaxCompute来开展数据处理分析。为了更好地识别数据平台使用方的周期性花费成本,优化数据资源的使用,就有必要对作业的费用进行统计,从而确认不同人员或归属部门的作业数量、作业费用、作业时长、作业资源使用量等指标。基于这些指标进行成本分摊、作业优化等管理工作。 阿里云交易和账单系统包含了MaxCompute产品的费用信息及费用明细,通过关联交易和账单系统的计费明细与MaxCompute项目的作业明细或某时间段的账单费用,可以获得使用用户、作业明细信息(如提交人账号、作业SQL内容、作业资源使用大小等信息)与计费明细或账单费用间的关系,从而开展分析。 本文将介绍如果自动化实现MaxCompute按量付费项目的作业费用统计,您也可以通过阿里云交易和账单系统API获取其他需要的费用信息,扩展分析场景。 二、方案设计思路 1、获得MaxCompute项目历史作业明细 MaxCompute Information_Schema服务是MaxCompute产品的开放元数据服务,通过Information_Schema提供的一组视图,用户可以自助地查询访问到项目内的准实时的table,column,function等全量元数据信息,同时也提供了项目内近期的作业历史明细,供使用者自助查询使用。 通过元数据服务Information_Schema里面的作业历史表tasks_history视图,可以查询到准实时的项目作业历史明细。包括:项目名称、任务名称、Instance id、开始时间、结束时间、任务复杂度、任务CPU使用情况等字段。

MaxCompute费用对账分摊统计,优化使用数据资源

备注:Information_Schema目前在中国公共云已全面开放,详情见产品文档,国际Region即将开放。 2、获取作业的计费明细数据 用户可以通过费用中心账号总览消费记录去查询具体的消费情况。 同时,格式阿里云交易和账单管理OpenAPI为用户提供管理阿里云产品售卖和财资能力,通过该api可以程序化获取MaxCompute作业计费明细数据。 调用QueryUserOmsData接口(阿里云的账单系统OMS),可以查询到具体计量信息编号、数据分类、存储、SQL读取量、公网上下行流量等字段信息。 3、关联计费明细与作业明细 通过表关联,查询到需要计算的数据结果

select
distinct
t.task_schema,
o.MeteringId,
t.owner_id,
t.operation_text,
o.type,
o.endtime,
o.computationsqlinput,
o.computationsqlcomplexity,
t.cost_cpu,o.starttime,
t.cost_mem
from information_schema.tasks_history t
right join OdpsFeeDemo o
on t.inst_id = o. meteringid
and t.task_schema = o.projectid
where o.type = "ComputationSql";

这些数据可以通过作业ID与计费明细数据进行关联后,您就获取各个作业明细的费用信息(例如,SQL费用=数据扫描量*复杂度) ,从而可以开展不同视角的分析。

MaxCompute费用对账分摊统计,优化使用数据资源

需要强调的是:MaxCompute的计费都是以阿里云费用中心的出账结果及费用明细为准。

三、具体实现步骤(含参考代码) 1.查询元数据服务里面的作业历史表tasks_history 例如,您登录访问的当前项目为 myproject1,在 myproject1 中,可以通过查询 INFORMATION_SCHEMA.tables 获得当前 myproject1 中所有表的元数据信息。

odps@ myproject1 > select * from information_schema.tables;

INFORMATION_SCHEMA 同时包含了作业历史视图,可以查询到当前项目内的作业历史信息,使用时注意添加日期分区进行过滤,例如。

odps@ mypoject1 > select * from information_schema.tasks_history where ds=’yyyymmdd’ limit 100;
odps@ myproject1 > desc package information_schema.systables;

MaxCompute费用对账分摊统计,优化使用数据资源

查询历史表字段属性

odps@ myproject1 > desc information_schema.tasks_history;

如下如所示:

MaxCompute费用对账分摊统计,优化使用数据资源

2.使用阿里云交易和账单管理API获取费用明细和分摊统计 方法1:手工下载上传方式 (一)首先在MaxCompute中创建结果输出表OMS表,建表语句如下:

CREATE TABLE IF NOT EXISTS OdpsFeeDemo(
ProjectId STRING COMMENT '项目编号',
MeteringId STRING COMMENT '计量信息编号',
Type STRING COMMENT '数据分类',
Storage STRING COMMENT '存储(Byte)',
EndTime STRING COMMENT '结束时间',
ComputationSqlInput STRING COMMENT 'SQL读取量',
ComputationSqlComplexity STRING COMMENT 'SQL复杂度',
StartTime STRING COMMENT '开始时间',
OdpsSpecCode STRING COMMENT '规格类型'
);

方法一:手动从视图下载oms账单详细费用,将数据上传(tunnel upload)到odps对应输出表

手动下载步骤: https://help.aliyun.com/product/87964.html?spm=a2c4g.750001.list.245.5e907b138Ik9xM

MaxCompute费用对账分摊统计,优化使用数据资源

MaxCompute费用对账分摊统计,优化使用数据资源

进入阿里云用户中心:https://usercenter2.aliyun.com/home 返回旧版

费用中心>消费记录>使用记录 选择产品类型,填写使用期间,计算粒度,导出CSV格式

MaxCompute费用对账分摊统计,优化使用数据资源

把oms数据定期取下来,然后上传到odps中创建的结果输出表(OdpsFeeDemo) tunnel upload C:UsersDesktopaa.txt project.tablename ; (二)进行表关联,将最终结果存储在上面创建的MaxComputeFee中

select 
distinct 
t.task_schema,
o.MeteringId,
t.owner_id,
o.type,
o.endtime,
o.computationsqlinput,
o.computationsqlcomplexity,
t.cost_cpu,o.starttime,
t.cost_mem 
from information_schema.tasks_history t 
right join OdpsFeeDemo o 
on t.inst_id = o. meteringid 
and t.task_schema = o.projectid
where o.type = “ComputationSql”;

方法2:程序化API下载费用明细数据&上传到MaxCompute后分析 (一)在odps创建oms表OdpsFeeDemo,参考如下:

CREATE TABLE IF NOT EXISTS OdpsFeeDemo(
ProjectId STRING COMMENT '项目编号',
MeteringId STRING COMMENT '计量信息编号',
Type STRING COMMENT '数据分类',
Storage STRING COMMENT '存储(Byte)',
EndTime STRING COMMENT '结束时间',
ComputationSqlInput STRING COMMENT 'SQL读取量',
ComputationSqlComplexity STRING COMMENT 'SQL复杂度',
StartTime STRING COMMENT '开始时间',
OdpsSpecCode STRING COMMENT '规格类型'
);

通过API下载OMS系统数据并上传到odps对于表格中 代码参考如下: 1) 服务启动类Application

package com.alibaba.odps;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * @ClassName: Application
 * @Description: 服务启动类
 * @Author: ***
 * @Data: 2019/7/30 17:15
 **/
@SpringBootApplication
@EnableScheduling
public class Application {
 public static void main(String[] args) {
 SpringApplication.run(Application.class, args);
 }
}

2) 从odps接收数据ReceiveData

package com.alibaba.odps.controller;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.bssopenapi.model.v20171214.QueryUserOmsDataRequest;
import com.aliyuncs.bssopenapi.model.v20171214.QueryUserOmsDataResponse;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ReceiveData
 * @Description: 接收数据
 * @Author: LiuJianwei
 * @Data: 2019/7/30 17:18
 **/
@Component
public class ReceiveData {

 @Value("${table}")
 private String table;

 @Value("${odps.accessKeyId}")
 private String accessKeyId;

 @Value("${odps.accessKeySecret}")
 private String accessKeySecret;

 @Value("${file.save.path}")
 private String fileSavePath;

 @Autowired
 private OdpsServer odpsServer;

 protected final ObjectMapper objectMapper = new ObjectMapper();

 {
 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 }

// String[] fileds = {"DBVersion", "InstanceId", "NetworkIn", "NetworkOut", "Storage", "Memory", "Region", "ProviderId",
// "DBType", "EndTime", "StartTime", "InstanceUseType", "InstanceName"};
 String[] fileds = {"ProjectId","MeteringId","Type","Storage","EndTime","ComputationSqlInput","ComputationSqlComplexity","StartTime","OdpsSpecCode"};

 @Scheduled(cron = "${cron}")
 public void queryUserOmsData() {
 //获取昨天的开始日期和结束日期
 SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
 String yesterday = format.format(DateUtils.addDays(new Date(), -1));
 //String yesterday = "2019-07-29";
 String startTime = yesterday + "T00:00:00Z";
 String endTime = yesterday + "T23:59:59Z";
 DefaultProfile profile = DefaultProfile.getProfile("cn-hangzhou", accessKeyId, accessKeySecret);
 IAcsClient client = new DefaultAcsClient(profile);
 for (String tab : table.split(",")) {
 QueryUserOmsDataRequest request = new QueryUserOmsDataRequest();
 request.setTable(tab.trim());
 request.setDataType("HOUR");
 request.setStartTime(startTime);
 request.setEndTime(endTime);
 try {
 QueryUserOmsDataResponse response = client.getAcsResponse(request);
 String data = new Gson().toJson(response);
 //将数据插入
 odpsServer.writeDataToOdps(data, yesterday, tab.trim());
 //将查询到的数据保存到TXT中
 writeDataToTxt(data, yesterday);
 } catch (IOException | ServerException e) {
 e.printStackTrace();
 } catch (ClientException e) {
 System.out.println(e);
 System.out.println("ErrCode:" + e.getErrCode());
 System.out.println("ErrMsg:" + e.getErrMsg());
 System.out.println("RequestId:" + e.getRequestId());
 }
 }
 }

 public void writeDataToTxt(String data, String yesterday) throws IOException {
 String path = fileSavePath + File.separator + yesterday + ".txt";
 FileWriter writer = new FileWriter(new File(path));
 if (StringUtils.isNotEmpty(data)) {
 JSONObject json = objectMapper.readValue(data, JSONObject.class);
 JSONObject datas = json.getJSONObject("data");
 if (datas.containsKey("omsData")) {
 List<Map<String, Object>> list = (List<Map<String, Object>>) datas.get("omsData");
 if (!list.isEmpty()) {
 for (Map<String, Object> map : list) {
 StringBuilder sb = new StringBuilder();
 for (String key : fileds) {
 if (map.containsKey(key)) {
 sb.append(map.get(key));
 } else {
 sb.append(" ");
 }
 sb.append(",");
 }
 sb.setLength(sb.length() - 1);
 sb.append("\r\n");
 writer.write(sb.toString());
 }
 }
 }
 }
 writer.flush();
 writer.close();
 }
}

3) 将接收数据上传到MaxCompute项目里建好的oms表,类名:OdpsServer

package com.alibaba.odps.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TableTunnel.UploadStatus;
import com.aliyun.odps.tunnel.TunnelException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @ClassName: OdpsServer
 * @Description: 将数据写入ODPS中
 * @Author: LiuJianwei
 * @Data: 2019/7/30 17:23
 **/
@Component
public class OdpsServer implements InitializingBean {

 @Value("${odps.accessKeyId}")
 private String accessKeyId;

 @Value("${odps.accessKeySecret}")
 private String accessKeySecret;

 @Value("${odps.project}")
 private String project;

 @Value("${odps.url}")
 private String url;

 private UploadSession ossUploadSession;

 private UploadSession rdsUploadSession;

 private UploadSession odpsUploadSession;

 private String OSSTableName = "MaxComputeFee";

 private String RDSTableName ="RDS";

 private String ODPSTableName ="OdpsFeeDemo";

 protected final ObjectMapper objectMapper = new ObjectMapper();

 {
 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 }

 public void writeDataToOdps(String data, String yesday, String tab) {
 List<Map<String, Object>> dataList = new ArrayList<>();
 if (StringUtils.isNotEmpty(data)) {
 try {
 JSONObject json = objectMapper.readValue(data, JSONObject.class);
 JSONObject datas = json.getJSONObject("data");
 if (datas.containsKey("omsData")) {
 dataList = (List<Map<String, Object>>)datas.get("omsData");
 }
 if (dataList.isEmpty()) {
 return;
 }
 //数据不为空,开发往ODPS中写入数据
 if (tab.equals("oss")) {
 for (Map<String, Object> map : dataList) {
 UploadSession session = getSession(OSSTableName);
 RecordWriter writer = session.openRecordWriter(session.getAvailBlockId());
 Record record = session.newRecord();
 writer.write(record);
 if (writer != null) {
 writer.close();
 session.commit(new Long[] {0L});
 }
 }
 } else if (tab.equals("rds")) {
 for (Map<String, Object> map : dataList) {
 UploadSession session = getSession(RDSTableName);
 RecordWriter writer = session.openRecordWriter(session.getAvailBlockId());
 Record record = session.newRecord();
 record.set("dbversion", map.get("DBVersion").toString());
 record.set("instanceid", map.get("InstanceId").toString());
 record.set("networkin", map.get("NetworkIn").toString());
 record.set("networkout", map.get("NetworkOut").toString());
 record.set("storage", Long.valueOf(map.get("Storage").toString()));
 record.set("memory", map.get("Memory").toString());
 record.set("region", map.get("Region").toString());
 record.set("providerid", map.get("ProviderId").toString());
 record.set("dbtype", map.get("DBType").toString());
 record.set("endtime", map.get("EndTime").toString());
 record.set("starttime", map.get("StartTime").toString());
 record.set("instanceusetype", map.get("InstanceUseType").toString());
 record.set("instancename", map.get("InstanceName").toString());
 writer.write(record);
 if (writer != null) {
 writer.close();
 session.commit(new Long[] {0L});
 }
 }
 } else if (tab.equals("odps")) {
 for (Map<String, Object> map : dataList) {
 UploadSession session = getSession(ODPSTableName);
 RecordWriter writer = session.openRecordWriter(session.getAvailBlockId());
 Record record = session.newRecord();
 record.set("projectid", map.containsKey("ProjectId") ? map.get("ProjectId").toString() : "");
record.set("meteringid", map.containsKey("MeteringId") ? map.get("MeteringId").toString() : "");
record.set("type", map.containsKey("Type") ? map.get("Type").toString() : "");
record.set("storage", map.containsKey("Storage") ? map.get("Storage").toString() : "");
record.set("endtime", map.containsKey("EndTime") ? map.get("EndTime").toString() : "");
record.set("computationsqlinput", map.containsKey("ComputationSqlInput") ? map.get("ComputationSqlInput").toString() : "");
record.set("computationsqlcomplexity", map.containsKey("ComputationSqlComplexity") ? map.get("ComputationSqlComplexity").toString() : "");
record.set("starttime", map.containsKey("StartTime") ? map.get("StartTime").toString() : "");
record.set("odpsspeccode", map.containsKey("OdpsSpecCode") ? map.get("OdpsSpecCode").toString() : "");

 writer.write(record);
 if (writer != null) {
 writer.close();
 session.commit(new Long[] {0L});
 }
 }
 }
 } catch (Exception e) {
 throw new RuntimeException();
 }
 }
 }

 private UploadSession getSession(String tableName) {
 try {
 if (tableName.equals(OSSTableName)) {
 if (!this.ossUploadSession.getStatus().equals(UploadStatus.NORMAL)) {
 this.ossUploadSession = createNewSession(tableName);
 }
 return this.ossUploadSession;
 } else if (tableName.equals(RDSTableName)) {
 if (!this.rdsUploadSession.getStatus().equals(UploadStatus.NORMAL)) {
 this.rdsUploadSession = createNewSession(tableName);
 }
 return this.rdsUploadSession;
 }else if (tableName.equals(ODPSTableName)) {
 if (!this.odpsUploadSession.getStatus().equals(UploadStatus.NORMAL)) {
 this.odpsUploadSession = createNewSession(tableName);
 }
 return this.odpsUploadSession;
 }

 } catch (TunnelException | IOException e) {
 throw new RuntimeException(e);
 }
 return null;
 }

 private UploadSession createNewSession(String tableName) {
 try {
 AliyunAccount account = new AliyunAccount(accessKeyId, accessKeySecret);
 Odps odps = new Odps(account);
 odps.setEndpoint(url);
 odps.setDefaultProject(project);
 TableTunnel odpsTunnel = new TableTunnel(odps);
 UploadSession session = odpsTunnel.createUploadSession(project, tableName);
 return session;
 } catch (TunnelException e) {
 throw new RuntimeException(e);
 }
 }

 @Override
 public void afterPropertiesSet() throws Exception {
 this.ossUploadSession = createNewSession(OSSTableName);
 this.rdsUploadSession = createNewSession(RDSTableName);
 this.odpsUploadSession = createNewSession(ODPSTableName);
 }

}

4) 配置文件

MaxCompute费用对账分摊统计,优化使用数据资源

配置accessKeyId
odps.accessKeyId=******** 
#配置accessKeySecret
odps.accessKeySecret=********
#配置project
odps.project=工作空间
#配置url
odps.url=http://service.odps.aliyun.co...
#配置table
table=odps

ds#配置定时任务时间设置
cron=0/1 0/1 * * * ?

5) 现在将数据上传到odps里面对应的表,然后进行关联

select 
distinct 
t.task_schema,
o.MeteringId,
t.owner_id,
o.type,
o.endtime,
o.computat

select 
distinct 
t.task_schema,
o.MeteringId,
t.owner_id,
o.type,
o.endtime,
o.computationsqlinput,
o.computationsqlcomplexity,
t.cost_cpu,o.starttime,
t.cost_mem 
from information_schema.tasks_history t 
right join OdpsFeeDemo o 
on t.inst_id

上云就看云栖号:更多云资讯,上云案例,最佳实践,产品入门,访问:https://yqh.aliyun.com/

本文为阿里云原创内容,未经允许不得转载。

点赞
收藏
评论区
推荐文章
待兔 待兔
1年前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
4年前
SQL优化器原理
摘要:在MaxCompute中,Join操作符的实现算法之一名为"HashJoin",其实现原理是,把小表的数据全部读入内存中,并拷贝多份分发到大表数据所在机器,在map阶段直接扫描大表数据与内存中的小表数据进行匹配。    这是MaxCompute有关SQL优化器原理的系列文章之一。我们会陆续推出SQL优化器有关优化规则和框架
专注IP定位 专注IP定位
4年前
微信拟推出个人微信云存储付费服务,你的微信聊天记录值多少钱?
消息称微信拟推出个人微信云存储付费服务有知情人士表示,腾讯拟推出个人微信云存储付费服务,据悉,该业务可能采用按年付费的模式,苹果用户或在180元/年左右,安卓用户或在130元/年左右。但该费用具体可以存储多大容量的数据尚未得到确定。知情人士表示,当前该项目仍在最后推动中,具体细节未最终敲定。(中国日报网)其实,一直以来,保存和迁移微信聊天记录既是用户的痛点,
专注IP定位 专注IP定位
4年前
勒索软件应急处置方法
勒索软件是黑客用来劫持用户资源或资源实施勒索的一种恶意程序。黑客利用勒索软件,通过加密用户数据、更改配置等方式,使用户资产或资源无法正常使用,并以此为条件要求用户支付费用以获得解密密码或恢复系统正常运行。勒索软件防范九要、四不要防范勒索软件“九要”一:要做好资产梳理与分级分类管理清点和梳理组织内的信息系统和应用程序,建立完整的资产清单;梳理通信数据在不同信息
Stella981 Stella981
4年前
GreenPlum tidb 性能比较
主要的需求  针对大体量表的OLAP统计查询,需要找到一个稳定,高性能的大数据数据库,具体使用  数据可以实时的写入和查询,并发的tps不是很高建立数据仓库,模式上主要采用星星模型、雪花模型,或者宽表前端展示分为3类 saiku、granafa、c代码开发数据体量:事实表在35亿、维度表大的在500
Stella981 Stella981
4年前
Hologres+Flink流批一体首次落地4982亿背后的营销分析大屏
简介: 本篇将重点介绍Hologres在阿里巴巴淘宝营销活动分析场景的最佳实践,揭秘FlinkHologres流批一体首次落地阿里双11营销分析大屏背后的技术考验。_概要:刚刚结束的2020天猫双11中,MaxCompute交互式分析(下称Hologres)实时计算Flink搭建的云原生实时数仓首次在核心数据场景落地,为大数据平台创下一项新纪
Stella981 Stella981
4年前
Serverless Kubernetes 快速入门指南
_摘要:_ 5月,阿里云宣布推出ServerlessKubernetes服务。开发者可在5秒内创建集群、30秒部署应用上线。用户无需管理集群基础设施,根据应用实际消耗资源按量付费;用户可以直接使用K8SAPI或命令行来管理容器应用,容器应用可以与VPC中现有应用和阿里云能力无缝集成。5月,阿里云宣布推出ServerlessKubernetes服务。
Stella981 Stella981
4年前
Kafka数据迁移MaxCompute最佳实践
前提条件搭建Kafka集群进行数据迁移前,您需要保证自己的Kafka集群环境正常。本文使用阿里云EMR服务自动化搭建Kafka集群,详细过程请参见:Kafka快速入门。本文使用的EMRKafka版本信息如下:EMR版本:EMR3.12.1集群类型:Kafka软件信息:Ganglia3.7.2ZooKeeper
性能提升,成本降低,原生数据库的崛起
腾讯高级工程师杨宇基介绍,作为国内首个云原生无服务器数据库,TDSQLC实现了自动伸缩三大目标,可以根据业务负载进行伸缩。开发者不需要提前预测负载和扩展资源;按使用量计费,按实际使用负载计费,开发者不需要为未使用的资源付费;没有使用,没有付款,没有数据请求
幂简集成 幂简集成
1年前
股票API接口:技术评估与市场趋势
在程序化交易领域,股票API是自动化交易系统的核心组成部分,允许交易策略在毫秒级别进行执行,极大提高了交易效率和精度。随着功能的不断扩展,股票API不仅提供传统的交易数据,还涵盖了财经指标、情绪分析和预测模型等更复杂的数据类型。开发者可以利用这些API快速构建强大的交易和分析系统,提升市场竞争力。