Skip to content

Commit c2e2cb6

Browse files
committed
[+] add pg_timetable.current_chain_id and pg_timetable.current_task_id run-time parameters, closes #438
1 parent b4b35b1 commit c2e2cb6

File tree

5 files changed

+56
-20
lines changed

5 files changed

+56
-20
lines changed

.vscode/launch.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
"mode": "auto",
1212
"program": "${workspaceFolder}",
1313
"env": {},
14-
"args": ["-v"]
14+
"args": ["postgresql://scheduler@localhost:5432/timetable",
15+
"--clientname=loader",
16+
"--log-level=debug"]
1517
}
1618
]
1719
}

internal/pgengine/pgengine_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -144,17 +144,19 @@ func TestSchedulerFunctions(t *testing.T) {
144144

145145
t.Run("Check GetChainElements funсtion", func(t *testing.T) {
146146
var chains []pgengine.ChainTask
147-
tx, err := pge.StartTransaction(ctx)
147+
tx, txid, err := pge.StartTransaction(ctx, 0)
148148
assert.NoError(t, err, "Should start transaction")
149+
assert.Greater(t, txid, 0, "Should return transaction id")
149150
assert.True(t, pge.GetChainElements(ctx, tx, &chains, 0), "Should no error in clean database")
150151
assert.Empty(t, chains, "Should be empty in clean database")
151152
pge.CommitTransaction(ctx, tx)
152153
})
153154

154155
t.Run("Check GetChainParamValues funсtion", func(t *testing.T) {
155156
var paramVals []string
156-
tx, err := pge.StartTransaction(ctx)
157+
tx, txid, err := pge.StartTransaction(ctx, 0)
157158
assert.NoError(t, err, "Should start transaction")
159+
assert.Greater(t, txid, 0, "Should return transaction id")
158160
assert.True(t, pge.GetChainParamValues(ctx, tx, &paramVals, &pgengine.ChainTask{
159161
TaskID: 0,
160162
ChainID: 0}), "Should no error in clean database")
@@ -170,8 +172,9 @@ func TestSchedulerFunctions(t *testing.T) {
170172
})
171173

172174
t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {
173-
tx, err := pge.StartTransaction(ctx)
175+
tx, txid, err := pge.StartTransaction(ctx, 0)
174176
assert.NoError(t, err, "Should start transaction")
177+
assert.Greater(t, txid, 0, "Should return transaction id")
175178
f := func(sql string, params []string) error {
176179
_, err := pge.ExecuteSQLCommand(ctx, tx, sql, params)
177180
return err

internal/pgengine/transaction.go

+29-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"strconv"
89
"strings"
910
"time"
1011

@@ -31,9 +32,15 @@ type ChainTask struct {
3132
Txid int
3233
}
3334

34-
// StartTransaction return transaction object and panic in the case of error
35-
func (pge *PgEngine) StartTransaction(ctx context.Context) (pgx.Tx, error) {
36-
return pge.ConfigDb.Begin(ctx)
35+
// StartTransaction returns transaction object, transaction id and error
36+
func (pge *PgEngine) StartTransaction(ctx context.Context, chainID int) (tx pgx.Tx, txid int, err error) {
37+
tx, err = pge.ConfigDb.Begin(ctx)
38+
if err != nil {
39+
return
40+
}
41+
err = pgxscan.Get(ctx, tx, &txid, "SELECT txid_current()")
42+
_, err = tx.Exec(ctx, `SELECT set_config('pg_timetable.current_chain_id', $1, true)`, strconv.Itoa(chainID))
43+
return
3744
}
3845

3946
// CommitTransaction commits transaction and log error in the case of error
@@ -128,15 +135,14 @@ func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainT
128135
defer pge.FinalizeRemoteDBConnection(ctx, remoteDb)
129136
}
130137

131-
// Set Role
132-
if task.RunAs.Status != pgtype.Null && !task.Autonomous {
138+
if !task.Autonomous {
133139
pge.SetRole(ctx, execTx, task.RunAs)
140+
if task.IgnoreError {
141+
pge.MustSavepoint(ctx, execTx, fmt.Sprintf("task_%d", task.TaskID))
142+
}
134143
}
135144

136-
if task.IgnoreError && !task.Autonomous {
137-
pge.MustSavepoint(ctx, execTx, fmt.Sprintf("task_%d", task.TaskID))
138-
}
139-
145+
pge.SetCurrentTaskContext(ctx, execTx, task.TaskID)
140146
out, err = pge.ExecuteSQLCommand(ctx, executor, task.Script, paramValues)
141147

142148
if err != nil && task.IgnoreError && !task.Autonomous {
@@ -223,6 +229,9 @@ func (pge *PgEngine) FinalizeRemoteDBConnection(ctx context.Context, remoteDb Pg
223229

224230
// SetRole - set the current user identifier of the current session
225231
func (pge *PgEngine) SetRole(ctx context.Context, tx pgx.Tx, runUID pgtype.Varchar) {
232+
if runUID.Status == pgtype.Null {
233+
return
234+
}
226235
l := log.GetLogger(ctx)
227236
l.Info("Setting Role to ", runUID.String)
228237
_, err := tx.Exec(ctx, fmt.Sprintf("SET ROLE %v", runUID.String))
@@ -238,6 +247,16 @@ func (pge *PgEngine) ResetRole(ctx context.Context, tx pgx.Tx) {
238247
const sqlResetRole = `RESET ROLE`
239248
_, err := tx.Exec(ctx, sqlResetRole)
240249
if err != nil {
241-
l.WithError(err).Error("Error in ReSetting role", err)
250+
l.WithError(err).Error("Failed to set a role", err)
251+
}
252+
}
253+
254+
// SetCurrentTaskContext - set the working transaction "pg_timetable.current_task_id" run-time parameter
255+
func (pge *PgEngine) SetCurrentTaskContext(ctx context.Context, tx pgx.Tx, taskID int) {
256+
l := log.GetLogger(ctx)
257+
l.Debug("Setting current task context to ", taskID)
258+
_, err := tx.Exec(ctx, "SELECT set_config('pg_timetable.current_task_id', $1, true)", strconv.Itoa(taskID))
259+
if err != nil {
260+
l.WithError(err).Error("Failed to set current task context", err)
242261
}
243262
}

internal/scheduler/chain.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/cybertec-postgresql/pg_timetable/internal/log"
99
"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
10-
"github.com/georgysavva/scany/pgxscan"
1110
pgx "github.com/jackc/pgx/v4"
1211
)
1312

@@ -178,10 +177,7 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
178177

179178
chainL := sch.l.WithField("chain", chain.ChainID)
180179

181-
tx, err := sch.pgengine.StartTransaction(ctx)
182-
if err == nil {
183-
err = pgxscan.Get(ctx, tx, &txid, "SELECT txid_current()")
184-
}
180+
tx, txid, err := sch.pgengine.StartTransaction(ctx, chain.ChainID)
185181
if err != nil {
186182
chainL.WithError(err).Error("Cannot start transaction")
187183
return

samples/Chain.sql

+17-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ BEGIN
1818
PRIMARY KEY (chain_log)
1919
);
2020

21+
-- Let's create a new chain and add tasks to it later
2122
INSERT INTO timetable.chain (
2223
chain_id,
2324
chain_name,
@@ -46,12 +47,27 @@ BEGIN
4647
VALUES (v_chain_id, 2, 'INSERT INTO timetable.chain_log (EVENT, time) VALUES ($1, CURRENT_TIMESTAMP)', TRUE)
4748
RETURNING task_id INTO v_task_id;
4849

49-
5050
INSERT INTO timetable.parameter(task_id, order_id, value)
5151
VALUES
5252
-- Parameter for HEAD (parent) task
5353
(v_parent_id, 1, '["Added"]' :: jsonb),
5454
-- Parameter for the next task
5555
(v_task_id, 1, '["Updated"]' :: jsonb);
56+
57+
-- Add one more task swowing IDs for all tasks within the chain
58+
INSERT INTO timetable.task (chain_id, task_order, command, ignore_error)
59+
VALUES (v_chain_id, 3,
60+
$CMD$
61+
DO $BODY$
62+
DECLARE tasks TEXT;
63+
BEGIN
64+
SELECT array_agg(task_id ORDER BY task_order) FROM timetable.task
65+
INTO tasks
66+
WHERE chain_id = current_setting('pg_timetable.current_chain_id')::bigint;
67+
RAISE NOTICE 'Task IDs in chain: %', tasks;
68+
END;
69+
$BODY$
70+
$CMD$, TRUE);
71+
5672
END;
5773
$$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)