diff --git a/tablestore/model.go b/tablestore/model.go index 457c721..be8b514 100644 --- a/tablestore/model.go +++ b/tablestore/model.go @@ -10,11 +10,13 @@ import ( "strings" "time" - "github.com/aliyun/aliyun-tablestore-go-sdk/common" - "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol" + "sync" + "github.com/golang/protobuf/proto" lruCache "github.com/hashicorp/golang-lru" - "sync" + + "github.com/aliyun/aliyun-tablestore-go-sdk/common" + "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol" ) type internalClient struct { @@ -368,7 +370,7 @@ type VariantType int32 const ( Variant_INTEGER VariantType = 0 Variant_DOUBLE VariantType = 1 - //VT_BOOLEAN = 2; + // VT_BOOLEAN = 2; Variant_STRING VariantType = 3 ) @@ -380,7 +382,7 @@ type ValueTransferRule struct { type SingleColumnCondition struct { Comparator *ComparatorType ColumnName *string - ColumnValue interface{} //[]byte + ColumnValue interface{} // []byte FilterIfMissing bool LatestVersionOnly bool TransferRule *ValueTransferRule @@ -555,6 +557,24 @@ type GetRowRequest struct { ExtraRequestInfo } +func (response GetRowResponse) Marshal() (primaryKeys map[string]interface{}, rows map[string]interface{}) { + primaryKeys = make(map[string]interface{}) + rows = make(map[string]interface{}) + PrimaryKeyRaws := response.PrimaryKey.PrimaryKeys + index := uint(0) + for range PrimaryKeyRaws { + primaryKeys[PrimaryKeyRaws[index].ColumnName] = PrimaryKeyRaws[index].Value + index++ + } + index = 0 + RowRaws := response.Columns + for range RowRaws { + rows[RowRaws[index].ColumnName] = RowRaws[index].Value + index++ + } + return +} + type MultiRowQueryCriteria struct { PrimaryKey []*PrimaryKey ColumnsToGet []string @@ -599,6 +619,24 @@ type RowResult struct { Index int32 } +func (rowResponse RowResult) Marshal() (PrimaryKeys map[string]interface{}, Rows map[string]interface{}) { + index := uint(0) + PrimaryKeys = make(map[string]interface{}) + primaryKeysRaw := rowResponse.PrimaryKey.PrimaryKeys + for range primaryKeysRaw { + PrimaryKeys[primaryKeysRaw[index].ColumnName] = primaryKeysRaw[index].Value + index++ + } + index = 0 // reset index + Rows = make(map[string]interface{}) + rowsRaw := rowResponse.Columns + for range rowsRaw { + Rows[rowsRaw[index].ColumnName] = rowsRaw[index].Value + index++ + } + return +} + type RowChange interface { Serialize() []byte getOperationType() otsprotocol.OperationType @@ -610,8 +648,50 @@ type BatchGetRowResponse struct { TableToRowsResult map[string][]RowResult ResponseInfo } +type SingleRow struct { + PrimaryKey map[string]interface{} // 主键 + Row map[string]interface{} // 行数据 + Err Error // 错误信息 +} +type Rows struct { + mu sync.Mutex + Data RowData +} +type RowData map[int32]*SingleRow + +func (response *GetRangeResponse) Marshal() (*RowData, error) { + rows := &Rows{Data: make(map[int32]*SingleRow)} + var wg sync.WaitGroup + if len(response.Rows) <= 256 { + rowData := make(RowData) + + for index, data := range response.Rows { + pk, row := data.Marshal() + rowData[int32(index)] = &SingleRow{PrimaryKey: pk, Row: row} + } + return &rowData, nil + } + for index, data := range response.Rows { // 假设response.Rows是Row类型的切片,且Row实现了Marshal方法 + wg.Add(1) + go func(index int32, data *Row) { + defer wg.Done() + + singleRow := new(SingleRow) + singleRow.PrimaryKey, singleRow.Row = data.Marshal() // 假设Row类型有Marshal方法可以直接使用 + + // 在写入之前锁定mutex + rows.mu.Lock() + rows.Data[index] = singleRow + rows.mu.Unlock() + }(int32(index), data) + } + wg.Wait() + // 返回指向Rows.Data的指针,满足函数签名的要求 + return &rows.Data, nil +} -// IsAtomic设置是否为批量原子写 +// BatchWriteRowRequest +// IsAtomic 设置是否为批量原子写 // 如果设置了批量原子写,需要保证写入到同一张表格中的分区键相同,否则会写入失败 type BatchWriteRowRequest struct { RowChangesGroupByTable map[string][]RowChange @@ -708,6 +788,24 @@ type GetRangeResponse struct { ResponseInfo } +func (response Row) Marshal() (PrimaryKeys map[string]interface{}, rows map[string]interface{}) { + PrimaryKeysRaw := response.PrimaryKey.PrimaryKeys + PrimaryKeys = make(map[string]interface{}) + index := uint(0) + for range PrimaryKeysRaw { + PrimaryKeys[PrimaryKeysRaw[index].ColumnName] = PrimaryKeysRaw[index].Value + index++ + } + index = 0 + rows = make(map[string]interface{}) + rowsRaw := response.Columns + for range rowsRaw { + rows[rowsRaw[index].ColumnName] = rowsRaw[index].Value + index++ + } + return +} + type SQLQueryRequest struct { Query string ExtraRequestInfo @@ -756,7 +854,7 @@ type ListStreamResponse struct { type StreamSpecification struct { EnableStream bool ExpirationTime int32 // must be positive. in hours - OriginColumnsToGet []string //origin columns to get for stream data + OriginColumnsToGet []string // origin columns to get for stream data } type StreamDetails struct { @@ -764,7 +862,7 @@ type StreamDetails struct { StreamId *StreamId // nil when stream is disabled. ExpirationTime int32 // in hours LastEnableTime int64 // the last time stream is enabled, in usec - OriginColumnsToGet []string //origin columns to get for stream data + OriginColumnsToGet []string // origin columns to get for stream data } type DescribeStreamRequest struct {