Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions pkg/bsql/entitlements.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,46 @@ func (s *SQLSyncer) dynamicEntitlements(ctx context.Context, resource *v2.Resour
return nil, "", nil, err
}

npt, err := s.runQuery(ctx, pToken, s.config.Entitlements.Query, s.config.Entitlements.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) {
for _, mapping := range s.config.Entitlements.Map {
r, ok, err := s.mapEntitlement(ctx, resource, mapping, rowMap)
if err != nil {
return false, err
}

if ok {
r.Resource = resource
ret = append(ret, r)
// Use size-aware query execution to prevent exceeding gRPC message size limits
result, err := s.runQueryWithSizeLimit(ctx, pToken, s.config.Entitlements.Query, s.config.Entitlements.Pagination, queryVars,
func(ctx context.Context, rowMap map[string]any) (bool, int64, error) {
var itemSize int64
for _, mapping := range s.config.Entitlements.Map {
r, ok, err := s.mapEntitlement(ctx, resource, mapping, rowMap)
if err != nil {
return false, 0, err
}

if ok {
r.Resource = resource
ret = append(ret, r)
itemSize += estimateEntitlementSize(r)
}
}
}
return true, nil
})
return true, itemSize, nil
})
if err != nil {
return nil, "", nil, err
}

return ret, npt, nil, nil
return ret, result.NextPageToken, nil, nil
}

// estimateEntitlementSize approximates the serialized size of an entitlement
// in bytes. It sums the lengths of the string fields, charges a fixed cost per
// annotation and per grantable-to entry, and adds a flat protobuf-encoding
// overhead. A nil entitlement contributes nothing.
func estimateEntitlementSize(e *v2.Entitlement) int64 {
	if e == nil {
		return 0
	}
	strBytes := len(e.Id) + len(e.DisplayName) + len(e.Description) + len(e.Slug)
	// Annotations, the resource reference, and protobuf framing are not
	// measured directly; charge fixed per-element costs plus a flat overhead.
	return int64(strBytes) +
		int64(len(e.Annotations)*50) +
		int64(len(e.GrantableTo)*50) +
		sizeEstimateOverhead
}

func (s *SQLSyncer) mapEntitlement(ctx context.Context, resource *v2.Resource, mappings *EntitlementMapping, rowMap map[string]any) (*v2.Entitlement, bool, error) {
Expand Down
48 changes: 36 additions & 12 deletions pkg/bsql/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,48 @@ func (s *SQLSyncer) listGrants(ctx context.Context, resource *v2.Resource, pToke
return nil, "", err
}

npt, err := s.runQuery(ctx, pToken, grantConfig.Query, grantConfig.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) {
for _, mapping := range grantConfig.Map {
g, ok, err := s.mapGrant(ctx, resource, mapping, rowMap)
if err != nil {
return false, err
}
// Use size-aware query execution to prevent exceeding gRPC message size limits
result, err := s.runQueryWithSizeLimit(ctx, pToken, grantConfig.Query, grantConfig.Pagination, queryVars,
func(ctx context.Context, rowMap map[string]any) (bool, int64, error) {
var itemSize int64
for _, mapping := range grantConfig.Map {
g, ok, err := s.mapGrant(ctx, resource, mapping, rowMap)
if err != nil {
return false, 0, err
}

if ok {
ret = append(ret, g)
if ok {
ret = append(ret, g)
itemSize += estimateGrantSize(g)
}
}
}
return true, nil
})
return true, itemSize, nil
})
if err != nil {
return nil, "", err
}

return ret, npt, nil
return ret, result.NextPageToken, nil
}

// estimateGrantSize approximates the serialized size of a grant in bytes. It
// counts the grant ID, the referenced entitlement ID, and the principal's
// resource ID fields, then charges a fixed cost per annotation and a flat
// protobuf-encoding overhead. A nil grant contributes nothing.
func estimateGrantSize(g *v2.Grant) int64 {
	if g == nil {
		return 0
	}
	total := int64(len(g.Id)) + int64(len(g.Annotations)*50) + sizeEstimateOverhead
	if ent := g.Entitlement; ent != nil {
		total += int64(len(ent.Id))
	}
	if p := g.Principal; p != nil && p.Id != nil {
		total += int64(len(p.Id.Resource) + len(p.Id.ResourceType))
	}
	return total
}

func (s *SQLSyncer) mapGrant(ctx context.Context, resource *v2.Resource, mapping *GrantMapping, rowMap map[string]any) (*v2.Grant, bool, error) {
Expand Down
175 changes: 175 additions & 0 deletions pkg/bsql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,51 @@ const (
cursorKey = "cursor"
limitKey = "limit"
unquotedKey = "unquoted"

// gRPC message size limits - we use a conservative threshold to avoid hitting the 4MB limit.
// The actual limit is 4MB, but we use 3MB to leave room for overhead and metadata.
maxResponseSizeBytes = 3 * 1024 * 1024 // 3MB conservative limit
sizeEstimateOverhead = 500 // Overhead per item for protobuf encoding
)

var ErrQueryAffectedZeroRows = errors.New("query affected 0 rows, ending and rolling back")
var ErrQueryAffectedMoreThanOneRow = errors.New("query affected more than one row, ending and rolling back")

// queryResult contains the result of a query execution along with size
// tracking information used to keep responses under gRPC message size limits.
type queryResult struct {
	// NextPageToken is the token for the next page of results. It is empty
	// when the query was exhausted without hitting a page or size limit.
	NextPageToken string
	// TotalSize is the approximate total size of all results in bytes,
	// accumulated from callback-reported item sizes or row estimates.
	TotalSize int64
	// ItemCount is the number of items returned (rows actually processed).
	ItemCount int
	// HitSizeLimit indicates whether the query stopped early due to size limits.
	HitSizeLimit bool
}

// estimateRowSize provides a rough estimate of the serialized size of a row
// map, in bytes. It is used to stop a page early before accumulated results
// exceed gRPC message size limits.
//
// Strings and byte slices are counted exactly; nil values are free; every
// other scalar type (ints, floats, times, ...) is charged a flat 16 bytes.
// A fixed per-row overhead is added for protobuf encoding, traits, and other
// derived fields.
func estimateRowSize(rowMap map[string]any) int64 {
	var size int64
	for k, v := range rowMap {
		size += int64(len(k))
		switch val := v.(type) {
		case string:
			size += int64(len(val))
		case []byte:
			size += int64(len(val))
		case nil:
			// nil values have minimal overhead
		default:
			// For other types, a flat mid-range estimate is close enough.
			size += 16
		}
	}
	// Add overhead for protobuf encoding, traits, and other fields.
	size += sizeEstimateOverhead
	return size
}

type executor interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
Expand Down Expand Up @@ -490,6 +530,141 @@ func (s *SQLSyncer) normalizeValue(val any) any {
return val
}

// runQueryWithSizeLimit executes a query with size-based early termination.
// It returns a queryResult containing pagination info and size tracking.
// The sizeEstimator function should return the estimated serialized size of the processed item.
func (s *SQLSyncer) runQueryWithSizeLimit(
ctx context.Context,
pToken *pagination.Token,
query string,
pOpts *Pagination,
vars map[string]any,
rowCallback func(context.Context, map[string]interface{}) (bool, int64, error),
) (*queryResult, error) {
l := ctxzap.Extract(ctx)

q, qArgs, pCtx, err := s.prepareQuery(pToken, query, pOpts, vars)
if err != nil {
return nil, err
}

l.Debug("running query with size limit", zap.String("query", q), zap.Any("args", qArgs))

rows, err := s.db.QueryContext(ctx, q, qArgs...)
if err != nil {
l.Error("failed to run query", zap.String("query", q), zap.Any("args", qArgs), zap.Error(err))
return nil, err
}
defer rows.Close()

columns, err := rows.Columns()
if err != nil {
return nil, err
}

values := make([]interface{}, len(columns))
scanArgs := make([]interface{}, len(values))
for i := range values {
scanArgs[i] = &values[i]
}

result := &queryResult{}
var lastRowID any
rowCount := 0
hitPageLimit := false

for rows.Next() {
rowCount++

// Check if we've exceeded the page limit (standard pagination).
if pCtx != nil && rowCount > int(pCtx.Limit) {
hitPageLimit = true
break
}

if err := rows.Scan(scanArgs...); err != nil {
return nil, err
}

foundPaginationKey := false
var currentRowID any
rowMap := make(map[string]interface{})
for i, colName := range columns {
rowMap[colName] = values[i]
if pCtx != nil && pCtx.PrimaryKey == colName {
currentRowID = values[i]
foundPaginationKey = true
}
}

Comment on lines +586 to +599
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (Critical): Data loss on size-limit-triggered pagination with cursor strategy

When the size limit is hit (line 611), lastRowID has already been updated to the current (unprocessed) row's primary key here at line 595. The current row is then skipped (we break without calling the callback).

For cursor-based pagination, nextPageToken returns lastRowID, and the next query uses WHERE id > lastRowID. This means the unprocessed row is permanently skipped — data loss.

For offset-based pagination, nextPageToken computes offset + limit, but we processed fewer rows than limit. This also skips rows.

Fix: track lastProcessedRowID separately and only update it after the callback succeeds.

Suggested change
return nil, err
}
foundPaginationKey := false
rowMap := make(map[string]interface{})
for i, colName := range columns {
rowMap[colName] = values[i]
if pCtx != nil && pCtx.PrimaryKey == colName {
lastRowID = values[i]
foundPaginationKey = true
}
}
if err := rows.Scan(scanArgs...); err != nil {
return nil, err
}
foundPaginationKey := false
var currentRowID any
rowMap := make(map[string]interface{})
for i, colName := range columns {
rowMap[colName] = values[i]
if pCtx != nil && pCtx.PrimaryKey == colName {
currentRowID = values[i]
foundPaginationKey = true
}
}

Then after the size check passes and the callback succeeds, add lastRowID = currentRowID. And for offset-based pagination, you'll need to track actual items processed to compute the correct next offset.

if pCtx != nil && !foundPaginationKey {
return nil, errors.New("primary key not found in query results")
}

// Check if adding this item would exceed size limits.
// We check BEFORE processing so that we can stop without data loss.
// The unprocessed row will be picked up by the next page.
if result.TotalSize > 0 && result.TotalSize+estimateRowSize(rowMap) > maxResponseSizeBytes {
result.HitSizeLimit = true
l.Info("stopping query early due to response size limit",
zap.Int64("current_size", result.TotalSize),
zap.Int("items_returned", result.ItemCount))
break
}

ok, itemSize, err := rowCallback(ctx, rowMap)
if err != nil {
return nil, err
}

// Use the actual item size if provided, otherwise use the row estimate.
if itemSize > 0 {
result.TotalSize += itemSize
} else {
result.TotalSize += estimateRowSize(rowMap)
}
result.ItemCount++

// Only update lastRowID after the callback succeeds, so that
// the next page token points to the last *processed* row.
lastRowID = currentRowID

// Post-callback size check: if accumulated size now exceeds the
// limit, stop before processing the next row.
if result.TotalSize > maxResponseSizeBytes {
result.HitSizeLimit = true
l.Info("stopping query after callback due to response size limit",
zap.Int64("current_size", result.TotalSize),
zap.Int("items_returned", result.ItemCount))
break
}

if !ok {
break
}
}

if err := rows.Err(); err != nil {
return nil, err
}

// Determine if we need a next page token.
// We need one if: we hit the page limit, OR we hit the size limit.
if pCtx != nil && (hitPageLimit || result.HitSizeLimit) {
// For offset strategy when hitting size limits, adjust the limit
// to reflect actual items processed so the next offset is correct.
if result.HitSizeLimit && pCtx.Strategy == offsetKey {
pCtx.Limit = int64(result.ItemCount)
}
result.NextPageToken, err = s.nextPageToken(pCtx, lastRowID)
if err != nil {
return nil, err
}
}

return result, nil
}

func (s *SQLSyncer) runQuery(
ctx context.Context,
pToken *pagination.Token,
Expand Down
40 changes: 31 additions & 9 deletions pkg/bsql/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,41 @@ func (s *SQLSyncer) List(ctx context.Context, parentResourceID *v2.ResourceId, p
return nil, "", nil, err
}

npt, err := s.runQuery(ctx, pToken, s.config.List.Query, s.config.List.Pagination, queryVars, func(ctx context.Context, rowMap map[string]any) (bool, error) {
r, err := s.mapResource(ctx, rowMap)
if err != nil {
return false, err
}
ret = append(ret, r)
return true, nil
})
// Use size-aware query execution to prevent exceeding gRPC message size limits
result, err := s.runQueryWithSizeLimit(ctx, pToken, s.config.List.Query, s.config.List.Pagination, queryVars,
func(ctx context.Context, rowMap map[string]any) (bool, int64, error) {
r, err := s.mapResource(ctx, rowMap)
if err != nil {
return false, 0, err
}
ret = append(ret, r)
// Estimate size of the serialized resource
size := estimateResourceSize(r)
return true, size, nil
})
if err != nil {
return nil, "", nil, err
}

return ret, npt, nil, nil
return ret, result.NextPageToken, nil, nil
}

// estimateResourceSize approximates the serialized size of a resource in
// bytes. It sums the display name, description, and resource ID fields, then
// charges a fixed per-annotation cost and a flat overhead for traits and
// protobuf encoding. A nil resource contributes nothing.
func estimateResourceSize(r *v2.Resource) int64 {
	if r == nil {
		return 0
	}
	total := int64(len(r.DisplayName)+len(r.Description)) + sizeEstimateOverhead
	if id := r.Id; id != nil {
		total += int64(len(id.Resource) + len(id.ResourceType))
	}
	// Annotations on resources tend to be larger (traits), hence the bigger
	// per-element charge.
	total += int64(len(r.Annotations) * 100)
	return total
}

func (s *SQLSyncer) fetchTraits() map[string]bool {
Expand Down
Loading