Skip to content

Logging: virtual table schema #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 12, 2025
4 changes: 2 additions & 2 deletions app/server/datasource/rdbms/clickhouse/connection_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type rows struct {
*sql.Rows
}

func (r *rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) {
func (r *rows) MakeTransformer(ydbColumns []*Ydb.Column, cc conversion.Collection) (paging.RowTransformer[any], error) {
columns, err := r.ColumnTypes()
if err != nil {
return nil, fmt.Errorf("column types: %w", err)
Expand All @@ -37,7 +37,7 @@ func (r *rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (
typeNames = append(typeNames, column.DatabaseTypeName())
}

transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc)
transformer, err := transformerFromSQLTypes(typeNames, common.YDBColumnsToYDBTypes(ydbColumns), cc)
if err != nil {
return nil, fmt.Errorf("transformer from sql types: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions app/server/datasource/rdbms/clickhouse/connection_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func (rowsNative) NextResultSet() bool {
return false
}

func (r *rowsNative) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) {
func (r *rowsNative) MakeTransformer(ydbColumns []*Ydb.Column, cc conversion.Collection) (paging.RowTransformer[any], error) {
columns := r.ColumnTypes()

typeNames := make([]string, 0, len(columns))
for _, column := range columns {
typeNames = append(typeNames, column.DatabaseTypeName())
}

transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc)
transformer, err := transformerFromSQLTypes(typeNames, common.YDBColumnsToYDBTypes(ydbColumns), cc)
if err != nil {
return nil, fmt.Errorf("transformer from sql types: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func TestMakeSelectQuery(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText)
require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values())
require.Equal(t, tc.outputYdbTypes, readSplitsQuery.YdbTypes)
require.Equal(t, tc.outputYdbTypes, common.YDBColumnsToYDBTypes(readSplitsQuery.YdbColumns))
})
}
}
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(

defer common.LogCloserError(logger, rows, "close rows")

transformer, err := rows.MakeTransformer(query.YdbTypes, ds.converterCollection)
transformer, err := rows.MakeTransformer(query.YdbColumns, ds.converterCollection)
if err != nil {
return 0, fmt.Errorf("make transformer: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions app/server/datasource/rdbms/datasource_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func NewDataSourceFactory(
dsf.logging = Preset{
SQLFormatter: logging.NewSQLFormatter(ydb.NewSQLFormatter(cfg.Logging.Ydb.Mode, cfg.Logging.Ydb.Pushdown)),
ConnectionManager: logging.NewConnectionManager(cfg.Logging, connManagerBase, dsf.loggingResolver),
TypeMapper: ydbTypeMapper,
SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper),
TypeMapper: nil,
SchemaProvider: logging.NewSchemaProvider(),
SplitProvider: logging.NewSplitProvider(dsf.loggingResolver, ydb.NewSplitProvider(cfg.Logging.Ydb.Splitting)),
RetrierSet: &retry.RetrierSet{
MakeConnection: retry.NewRetrierFromConfig(cfg.Ydb.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon),
Expand Down
23 changes: 19 additions & 4 deletions app/server/datasource/rdbms/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,16 @@ func TestReadSplit(t *testing.T) {
}

rows.On("MakeTransformer",
[]*Ydb.Type{common.MakePrimitiveType(Ydb.Type_INT32), common.MakePrimitiveType(Ydb.Type_UTF8)},
[]*Ydb.Column{
{
Name: "col1",
Type: common.MakePrimitiveType(Ydb.Type_INT32),
},
{
Name: "col2",
Type: common.MakePrimitiveType(Ydb.Type_UTF8),
},
},
).Return(transformer, nil).Once()
rows.On("Next").Return(true).Times(2)
rows.On("Next").Return(false).Once()
Expand Down Expand Up @@ -158,9 +167,15 @@ func TestReadSplit(t *testing.T) {
scanErr := fmt.Errorf("scan failed")

rows.On("MakeTransformer",
[]*Ydb.Type{
common.MakePrimitiveType(Ydb.Type_INT32),
common.MakePrimitiveType(Ydb.Type_UTF8),
[]*Ydb.Column{
{
Name: "col1",
Type: common.MakePrimitiveType(Ydb.Type_INT32),
},
{
Name: "col2",
Type: common.MakePrimitiveType(Ydb.Type_UTF8),
},
},
).Return(transformer, nil).Once()
rows.On("Next").Return(true).Times(2)
Expand Down
37 changes: 37 additions & 0 deletions app/server/datasource/rdbms/logging/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package logging

import (
"fmt"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

"github.com/ydb-platform/fq-connector-go/app/server/conversion"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
)

var _ rdbms_utils.Rows = (*rowsImpl)(nil)

type rowsImpl struct {
rdbms_utils.Rows
}

func (rowsImpl) MakeTransformer(ydbColumns []*Ydb.Column, cc conversion.Collection) (paging.RowTransformer[any], error) {
return makeRowTransformer(ydbColumns, cc)

Check warning on line 20 in app/server/datasource/rdbms/logging/connection.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/connection.go#L19-L20

Added lines #L19 - L20 were not covered by tests
}

var _ rdbms_utils.Connection = (*connectionImpl)(nil)

type connectionImpl struct {
rdbms_utils.Connection
}

func (c *connectionImpl) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
ydbRows, err := c.Connection.Query(params)
if err != nil {
return nil, fmt.Errorf("ydb connection query: %w", err)
}

Check warning on line 33 in app/server/datasource/rdbms/logging/connection.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/connection.go#L29-L33

Added lines #L29 - L33 were not covered by tests

// Wrap YDB row iterator with new implementation that uses custom transformer
return &rowsImpl{Rows: ydbRows}, nil

Check warning on line 36 in app/server/datasource/rdbms/logging/connection.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/connection.go#L36

Added line #L36 was not covered by tests
}
11 changes: 8 additions & 3 deletions app/server/datasource/rdbms/logging/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
)

type connectionManager struct {
rdbms_utils.Connection
ydbConnectionManager rdbms_utils.ConnectionManager
resolver Resolver
}
Expand Down Expand Up @@ -121,12 +120,18 @@
UseTls: true,
}

cs, err := cm.makeConnectionFromDataSourceInstance(params.Ctx, params.Logger, dsi, src.TableName)
ydbConns, err := cm.makeConnectionFromDataSourceInstance(params.Ctx, params.Logger, dsi, src.TableName)

Check warning on line 123 in app/server/datasource/rdbms/logging/connection_manager.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/connection_manager.go#L123

Added line #L123 was not covered by tests
if err != nil {
return nil, fmt.Errorf("make connection from data source instance: %w", err)
}

return cs, nil
// Wrap YDB connections with Logging decorator to override data types
loggingCons := make([]rdbms_utils.Connection, 0, len(ydbConns))
for _, ydbConn := range ydbConns {
loggingCons = append(loggingCons, &connectionImpl{Connection: ydbConn})
}

Check warning on line 132 in app/server/datasource/rdbms/logging/connection_manager.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/connection_manager.go#L129-L132

Added lines #L129 - L132 were not covered by tests

return loggingCons, nil

Check warning on line 134 in app/server/datasource/rdbms/logging/connection_manager.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/connection_manager.go#L134

Added line #L134 was not covered by tests
}

func (cm *connectionManager) makeConnectionFromDataSourceInstance(
Expand Down
15 changes: 15 additions & 0 deletions app/server/datasource/rdbms/logging/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package logging

const (
levelColumnName = "level"
timestampColumnName = "timestamp"
messageColumnName = "message"
metaColumnName = "meta"

levelTraceValue = "TRACE"
levelDebugValue = "DEBUG"
levelInfoValue = "INFO"
levelWarnValue = "WARN"
levelErrorValue = "ERROR"
levelFatalValue = "FATAL"
)
40 changes: 40 additions & 0 deletions app/server/datasource/rdbms/logging/schema_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package logging

import (
"context"

"go.uber.org/zap"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/common"
)

var loggingVirtualSchema = &api_service_protos.TSchema{
Columns: []*Ydb.Column{
{Name: levelColumnName, Type: common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_UTF8))},
{Name: timestampColumnName, Type: common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_TIMESTAMP))},
{Name: messageColumnName, Type: common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_UTF8))},
{Name: metaColumnName, Type: common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_JSON))},
},
}

var _ rdbms_utils.SchemaProvider = (*schemaProviderImpl)(nil)

type schemaProviderImpl struct {
}

func (schemaProviderImpl) GetSchema(
_ context.Context,
_ *zap.Logger,
_ rdbms_utils.Connection,
_ *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TSchema, error) {
return loggingVirtualSchema, nil

Check warning on line 35 in app/server/datasource/rdbms/logging/schema_provider.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/schema_provider.go#L34-L35

Added lines #L34 - L35 were not covered by tests
}

func NewSchemaProvider() rdbms_utils.SchemaProvider {
return &schemaProviderImpl{}
}
93 changes: 93 additions & 0 deletions app/server/datasource/rdbms/logging/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
"fmt"

"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/ydb"
"github.com/ydb-platform/fq-connector-go/common"
)

var _ rdbms_utils.SQLFormatter = (*sqlFormatter)(nil)
Expand All @@ -29,6 +33,95 @@
return s.RenderSelectQueryTextForColumnShard(parts, dst.GetYdb().TabletIds)
}

func (sqlFormatter) TransformSelectWhat(src *api_service_protos.TSelect_TWhat) *api_service_protos.TSelect_TWhat {
dst := &api_service_protos.TSelect_TWhat{}

for _, item := range src.GetItems() {
name := item.GetColumn().Name
switch name {
case metaColumnName:
dst.Items = append(dst.Items, &api_service_protos.TSelect_TWhat_TItem{
Payload: &api_service_protos.TSelect_TWhat_TItem_Column{
Column: &Ydb.Column{
Name: "json_payload",
Type: common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_JSON)),
},
},
})
case levelColumnName:
dst.Items = append(dst.Items, &api_service_protos.TSelect_TWhat_TItem{
Payload: &api_service_protos.TSelect_TWhat_TItem_Column{
Column: &Ydb.Column{
Name: "level",
Type: common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_INT32)),
},
},
})
default:
dst.Items = append(dst.Items, item)
}
}

return dst
}

func (sqlFormatter) TransformPredicateComparison(
src *api_service_protos.TPredicate_TComparison,
) (*api_service_protos.TPredicate_TComparison, error) {
dst := proto.Clone(src).(*api_service_protos.TPredicate_TComparison)

// For the comparison related to `level` field
if src.LeftValue.GetColumn() == levelColumnName && src.RightValue.GetTypedValue() != nil {
if src.Operation != api_service_protos.TPredicate_TComparison_EQ {
return nil, fmt.Errorf("unsupported operation %v for `level` column comparison", src.Operation)
}

Check warning on line 77 in app/server/datasource/rdbms/logging/sql_formatter.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/sql_formatter.go#L76-L77

Added lines #L76 - L77 were not covered by tests

// Extract filter value of a string type
var levelValue string

switch src.RightValue.GetTypedValue().GetType().GetTypeId() {
case Ydb.Type_UTF8:
levelValue = src.RightValue.GetTypedValue().GetValue().GetTextValue()

Check warning on line 84 in app/server/datasource/rdbms/logging/sql_formatter.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/sql_formatter.go#L83-L84

Added lines #L83 - L84 were not covered by tests
case Ydb.Type_STRING:
levelValue = string(src.RightValue.GetTypedValue().GetValue().GetBytesValue())
default:
return nil, fmt.Errorf(
"unsupported typed value of type %v for `level` column comparison",
src.RightValue.GetTypedValue().GetType(),
)

Check warning on line 91 in app/server/datasource/rdbms/logging/sql_formatter.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/sql_formatter.go#L87-L91

Added lines #L87 - L91 were not covered by tests
}

// Replace it with number representation
switch levelValue {
case levelTraceValue:
dst.RightValue.Payload = makeTypedValueForLevel(1)
case levelDebugValue:
dst.RightValue.Payload = makeTypedValueForLevel(2)
case levelInfoValue:
dst.RightValue.Payload = makeTypedValueForLevel(3)
case levelWarnValue:
dst.RightValue.Payload = makeTypedValueForLevel(4)

Check warning on line 103 in app/server/datasource/rdbms/logging/sql_formatter.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/sql_formatter.go#L96-L103

Added lines #L96 - L103 were not covered by tests
case levelErrorValue:
dst.RightValue.Payload = makeTypedValueForLevel(5)
case levelFatalValue:
dst.RightValue.Payload = makeTypedValueForLevel(6)
default:
return nil, fmt.Errorf("unsupported `level` value %s", levelValue)

Check warning on line 109 in app/server/datasource/rdbms/logging/sql_formatter.go

View check run for this annotation

Codecov / codecov/patch

app/server/datasource/rdbms/logging/sql_formatter.go#L106-L109

Added lines #L106 - L109 were not covered by tests
}
}

return dst, nil
}

func makeTypedValueForLevel(level int32) *api_service_protos.TExpression_TypedValue {
return &api_service_protos.TExpression_TypedValue{
TypedValue: common.MakeTypedValue(
common.MakePrimitiveType(Ydb.Type_INT32),
level,
),
}
}

func NewSQLFormatter(ydbSQLFormatter ydb.SQLFormatter) rdbms_utils.SQLFormatter {
return &sqlFormatter{
SQLFormatter: ydbSQLFormatter,
Expand Down
Loading
Loading