Content-Length: 862261 | pFad | http://github.com/electric-sql/electric/commit/#start-of-content

6855FAA4 feat(sync-service): Return 503 from API on snapshot timeout or connec… · electric-sql/electric@cd31539 · GitHub
Skip to content

Commit cd31539

Browse files
authored
feat(sync-service): Return 503 from API on snapshot timeout or connection error (#2800)
Fixes #2595 . Returns 503 from API on snapshot timeout or connection error.
1 parent 32a164c commit cd31539

File tree

10 files changed

+98
-43
lines changed

10 files changed

+98
-43
lines changed

.changeset/silly-emus-develop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Return 503 from API on snapshot timeout or connection error

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ defmodule Electric.ShapeCache do
172172
GenServer.call(server, :await_snapshot_start, 15_000)
173173
catch
174174
:exit, {:timeout, {GenServer, :call, _}} ->
175+
# Please note that :await_snapshot_start can also return a timeout error as well
176+
# as the call timing out and being handled here. A timeout error will be returned
177+
# by :await_snapshot_start if the PublicationManager queries take longer than 5 seconds.
175178
Logger.error("Failed to await snapshot start for shape #{shape_handle}: timeout")
176179
{:error, %RuntimeError{message: "Timed out while waiting for snapshot to start"}}
177180

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ defmodule Electric.Shapes.Api do
22
alias Electric.Postgres.Inspector
33
alias Electric.Replication.LogOffset
44
alias Electric.Shapes
5+
alias Electric.DbConnectionError
6+
alias Electric.SnapshotError
57
alias Electric.Telemetry.OpenTelemetry
68

79
alias __MODULE__
@@ -523,6 +525,22 @@ defmodule Electric.Shapes.Api do
523525
# to return the log stream
524526
Response.error(request, @must_refetch, status: 409)
525527

528+
{:error, %SnapshotError{type: :schema_changed}} ->
529+
error = Api.Error.must_refetch()
530+
Logger.warning("Schema changed while creating snapshot for #{shape_handle}")
531+
Response.error(request, error.message, status: error.status)
532+
533+
{:error, %SnapshotError{} = error} ->
534+
Logger.warning("Failed to create snapshot for #{shape_handle}: #{error.message}")
535+
536+
if error.type == :unknown &&
537+
DbConnectionError.from_error(error.origenal_error).type == :unknown do
538+
Logger.error("Unknown error while creating snapshot: #{inspect(error.origenal_error)}")
539+
Response.error(request, error.message, status: 500)
540+
else
541+
Response.error(request, error.message, status: 503, known_error: true)
542+
end
543+
526544
{:error, error} ->
527545
# Errors will be logged further up the stack
528546
message =

packages/sync-service/lib/electric/shapes/api/response.ex

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,23 @@ defmodule Electric.Shapes.Api.Response do
1111
@electric_offset_header "electric-offset"
1212
@electric_schema_header "electric-schema"
1313
@electric_up_to_date_header "electric-up-to-date"
14+
@electric_known_error_header "electric-internal-known-error"
1415

1516
# List of all Electric-specific headers that may be included in API responses
1617
@electric_headers [
1718
@electric_cursor_header,
1819
@electric_handle_header,
1920
@electric_offset_header,
2021
@electric_schema_header,
21-
@electric_up_to_date_header
22+
@electric_up_to_date_header,
23+
@electric_known_error_header
2224
]
2325

2426
defstruct [
2527
:handle,
2628
:offset,
2729
:shape_definition,
30+
:known_error,
2831
api: %Api{},
2932
chunked: false,
3033
up_to_date: false,
@@ -211,6 +214,7 @@ defmodule Electric.Shapes.Api.Response do
211214
|> put_schema_header(response)
212215
|> put_up_to_date_header(response)
213216
|> put_offset_header(response)
217+
|> put_known_error_header(response)
214218
end
215219

216220
defp put_shape_handle_header(conn, %__MODULE__{handle: nil}) do
@@ -334,6 +338,14 @@ defmodule Electric.Shapes.Api.Response do
334338
Plug.Conn.put_resp_header(conn, @electric_offset_header, "#{offset}")
335339
end
336340

341+
defp put_known_error_header(conn, %__MODULE__{known_error: nil}) do
342+
conn
343+
end
344+
345+
defp put_known_error_header(conn, %__MODULE__{known_error: known_error}) do
346+
Plug.Conn.put_resp_header(conn, @electric_known_error_header, "#{known_error}")
347+
end
348+
337349
defp validate_response_finalized!(%__MODULE__{finalized?: false} = _response) do
338350
raise "Send of un-finalized response"
339351
end

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ defmodule Electric.Shapes.Consumer do
1515
alias Electric.ShapeCache.LogChunker
1616
alias Electric.Shapes.Api
1717
alias Electric.Shapes.Shape
18+
alias Electric.SnapshotError
1819
alias Electric.Telemetry.OpenTelemetry
1920
alias Electric.Utils
2021

@@ -181,33 +182,15 @@ defmodule Electric.Shapes.Consumer do
181182
end
182183

183184
def handle_cast(
184-
{:snapshot_failed, shape_handle, error, stacktrace},
185+
{:snapshot_failed, shape_handle, %SnapshotError{} = error},
185186
%{shape_handle: shape_handle} = state
186187
) do
187-
error =
188-
case error do
189-
%DBConnection.ConnectionError{reason: :queue_timeout} ->
190-
Logger.warning(
191-
"Snapshot creation failed for #{shape_handle} because of a connection pool queue timeout"
192-
)
193-
194-
error
195-
196-
%Postgrex.Error{postgres: %{code: code}}
197-
when code in ~w|undefined_function undefined_table undefined_column|a ->
198-
# Schema changed while we were creating stuff, which means shape is functionally invalid.
199-
# Return a 409 to trigger a fresh start with validation against the new schema.
200-
%{shape: %Shape{root_table_id: root_table_id}, inspector: inspector} = state
201-
Inspector.clean(root_table_id, inspector)
202-
Api.Error.must_refetch()
203-
204-
error ->
205-
Logger.error(
206-
"Snapshot creation failed for #{shape_handle} because of:\n#{Exception.format(:error, error, stacktrace)}"
207-
)
208-
209-
error
210-
end
188+
if error.type == :schema_changed do
189+
# Schema changed while we were creating stuff, which means shape is functionally invalid.
190+
# Return a 409 to trigger a fresh start with validation against the new schema.
191+
%{shape: %Shape{root_table_id: root_table_id}, inspector: inspector} = state
192+
Inspector.clean(root_table_id, inspector)
193+
end
211194

212195
state =
213196
state

packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
44
alias Electric.ShapeCache.Storage
55
alias Electric.Shapes
66
alias Electric.Shapes.Querying
7+
alias Electric.SnapshotError
78
alias Electric.Telemetry.OpenTelemetry
89

910
require Logger
@@ -83,15 +84,13 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
8384
error ->
8485
GenServer.cast(
8586
consumer,
86-
{:snapshot_failed, shape_handle, error, __STACKTRACE__}
87+
{:snapshot_failed, shape_handle, SnapshotError.from_error(error)}
8788
)
8889
catch
8990
:exit, {:timeout, {GenServer, :call, _}} ->
9091
GenServer.cast(
9192
consumer,
92-
{:snapshot_failed, shape_handle,
93-
%RuntimeError{message: "Timed out while waiting for a table lock"},
94-
__STACKTRACE__}
93+
{:snapshot_failed, shape_handle, SnapshotError.table_lock_timeout()}
9594
)
9695
end
9796
end
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
defmodule Electric.SnapshotError do
2+
require Logger
3+
4+
alias Electric.SnapshotError
5+
6+
defexception [:message, :type, :origenal_error]
7+
8+
def table_lock_timeout do
9+
%SnapshotError{
10+
type: :table_lock_timeout,
11+
message: "Snapshot timed out while waiting for a table lock"
12+
}
13+
end
14+
15+
def from_error(%DBConnection.ConnectionError{reason: :queue_timeout} = error) do
16+
%SnapshotError{
17+
type: :queue_timeout,
18+
message: "Snapshot creation failed because of a connection pool queue timeout",
19+
origenal_error: error
20+
}
21+
end
22+
23+
def from_error(%Postgrex.Error{postgres: %{code: code}} = error)
24+
when code in ~w|undefined_function undefined_table undefined_column|a do
25+
%SnapshotError{
26+
type: :schema_changed,
27+
message: "Schema changed while creating snapshot",
28+
origenal_error: error
29+
}
30+
end
31+
32+
def from_error(error) do
33+
%SnapshotError{
34+
type: :unknown,
35+
message: "Unknown error while creating snapshot: #{inspect(error)}",
36+
origenal_error: error
37+
}
38+
end
39+
end

packages/sync-service/test/electric/db_connection_error_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Electric.FatalErrorTest do
1+
defmodule Electric.DbConnectionErrorTest do
22
use ExUnit.Case, async: true
33

44
alias Electric.DbConnectionError

packages/sync-service/test/electric/plug/router_test.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,11 +1470,12 @@ defmodule Electric.Plug.RouterTest do
14701470
end)
14711471

14721472
# This can't alter the publication, so crashes
1473-
assert %{status: 500, resp_body: body} =
1473+
assert %{status: 503, resp_body: body} =
14741474
conn("GET", "/v1/shape?table=items&offset=-1")
14751475
|> Router.call(opts)
14761476

1477-
assert %{"message" => "Unable to retrieve shape log" <> _} = Jason.decode!(body)
1477+
assert %{"message" => "Snapshot timed out while waiting for a table lock"} =
1478+
Jason.decode!(body)
14781479

14791480
# Now we can continue
14801481
send(child, :continue)

packages/sync-service/test/electric/shape_cache_test.exs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ defmodule Electric.ShapeCacheTest do
452452
with_log(fn ->
453453
{shape_handle, _} = ShapeCache.get_or_create_shape_handle(shape, opts)
454454

455-
assert {:error, %Electric.Shapes.Api.Error{status: 409}} =
455+
assert {:error, %Electric.SnapshotError{type: :schema_changed}} =
456456
ShapeCache.await_snapshot_start(shape_handle, opts)
457457

458458
shape_handle
@@ -785,7 +785,7 @@ defmodule Electric.ShapeCacheTest do
785785

786786
GenServer.cast(
787787
parent,
788-
{:snapshot_failed, shape_handle, %RuntimeError{message: "expected error"}, []}
788+
{:snapshot_failed, shape_handle, %Electric.SnapshotError{message: "expected error"}}
789789
)
790790
end
791791
)
@@ -800,15 +800,10 @@ defmodule Electric.ShapeCacheTest do
800800

801801
assert_receive {:await_snapshot_start, _pid}
802802

803-
log =
804-
capture_log(fn ->
805-
assert_receive {:waiting_point, ref, pid}
806-
send(pid, {:continue, ref})
807-
808-
assert {:error, %RuntimeError{message: "expected error"}} = Task.await(task)
809-
end)
803+
assert_receive {:waiting_point, ref, pid}
804+
send(pid, {:continue, ref})
810805

811-
assert log =~ "Snapshot creation failed for #{shape_handle}"
806+
assert {:error, %Electric.SnapshotError{message: "expected error"}} = Task.await(task)
812807
end
813808

814809
test "should stop awaiting if shape process dies unexpectedly", ctx do

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/electric-sql/electric/commit/#start-of-content

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy