Skip to content

Commit

Permalink
make it work with Java
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 25, 2024
1 parent cd4a310 commit 7e7a862
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 16 deletions.
33 changes: 18 additions & 15 deletions internal/pgproxy/pgproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,27 @@ func HandleConnection(ctx context.Context, conn net.Conn, connectionFn DSNConstr
logger.Infof("client disconnected without startup message: %s", conn.RemoteAddr())
return
}
logger.Debugf("startup message: %+v", startup)
logger.Debugf("backend connected: %s", conn.RemoteAddr())
logger.Infof("startup message: %+v", startup)
logger.Infof("backend connected: %s", conn.RemoteAddr())

frontend, err := connectFrontend(ctx, connectionFn, startup)
frontend, hijacked, err := connectFrontend(ctx, connectionFn, startup)
if err != nil {
// try again, in case there was a credential rotation
logger.Warnf("failed to connect frontend: %s, trying again", err)

frontend, err = connectFrontend(ctx, connectionFn, startup)
frontend, hijacked, err = connectFrontend(ctx, connectionFn, startup)
if err != nil {
handleBackendError(ctx, backend, err)
return
}
}
logger.Debugf("frontend connected")

backend.Send(&pgproto3.AuthenticationOk{})
backend.Send(&pgproto3.ReadyForQuery{})
logger.Infof("frontend connected")
for key, value := range hijacked.ParameterStatuses {
backend.Send(&pgproto3.ParameterStatus{Name: key, Value: value})
}

backend.Send(&pgproto3.ReadyForQuery{TxStatus: 'I'})
if err := backend.Flush(); err != nil {
logger.Errorf(err, "failed to flush backend authentication ok")
return
Expand Down Expand Up @@ -173,19 +176,19 @@ func connectBackend(ctx context.Context, conn net.Conn) (*pgproto3.Backend, *pgp
}
}

func connectFrontend(ctx context.Context, connectionFn DSNConstructor, startup *pgproto3.StartupMessage) (*pgproto3.Frontend, error) {
func connectFrontend(ctx context.Context, connectionFn DSNConstructor, startup *pgproto3.StartupMessage) (*pgproto3.Frontend, *pgconn.HijackedConn, error) {
dsn, err := connectionFn(ctx, startup.Parameters)
if err != nil {
return nil, fmt.Errorf("failed to construct dsn: %w", err)
return nil, nil, fmt.Errorf("failed to construct dsn: %w", err)
}

conn, err := pgconn.Connect(ctx, dsn)
if err != nil {
return nil, fmt.Errorf("failed to connect to backend: %w", err)
return nil, nil, fmt.Errorf("failed to connect to backend: %w", err)
}
frontend := pgproto3.NewFrontend(conn.Conn(), conn.Conn())

return frontend, nil
hijacked, err := conn.Hijack()

Check failure on line 189 in internal/pgproxy/pgproxy.go

View workflow job for this annotation

GitHub Actions / Lint

ineffectual assignment to err (ineffassign)
frontend := hijacked.Frontend
return frontend, hijacked, nil
}

func proxy(ctx context.Context, backend *pgproto3.Backend, frontend *pgproto3.Frontend) error {
Expand All @@ -204,7 +207,7 @@ func proxy(ctx context.Context, backend *pgproto3.Backend, frontend *pgproto3.Fr
errors <- fmt.Errorf("failed to receive backend message: %w", err)
return
}
logger.Tracef("backend message: %T", msg)
logger.Infof("backend message: %T", msg)
frontend.Send(msg)
err = frontend.Flush()
if err != nil {
Expand All @@ -229,7 +232,7 @@ func proxy(ctx context.Context, backend *pgproto3.Backend, frontend *pgproto3.Fr
errors <- fmt.Errorf("failed to receive frontend message: %w", err)
return
}
logger.Tracef("frontend message: %T", msg)
logger.Infof("frontend message: %T", msg)
backend.Send(msg)
err = backend.Flush()
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import io.quarkus.agroal.spi.JdbcDataSourceBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
import io.quarkus.deployment.builditem.SystemPropertyBuildItem;
import xyz.block.ftl.runtime.FTLDatasourceCredentials;
import xyz.block.ftl.runtime.FTLRecorder;
import xyz.block.ftl.runtime.config.FTLConfigSource;
import xyz.block.ftl.v1.ModuleContextResponse;
import xyz.block.ftl.v1.schema.Database;
import xyz.block.ftl.v1.schema.Decl;

Expand All @@ -21,10 +25,12 @@ public class DatasourceProcessor {
private static final Logger log = Logger.getLogger(DatasourceProcessor.class);

@BuildStep
@Record(ExecutionTime.STATIC_INIT)
public SchemaContributorBuildItem registerDatasources(
List<JdbcDataSourceBuildItem> datasources,
BuildProducer<SystemPropertyBuildItem> systemPropProducer,
BuildProducer<GeneratedResourceBuildItem> generatedResourceBuildItemBuildProducer) {
BuildProducer<GeneratedResourceBuildItem> generatedResourceBuildItemBuildProducer,
FTLRecorder recorder) {
log.infof("Processing %d datasource annotations into decls", datasources.size());
List<Decl> decls = new ArrayList<>();
List<String> namedDatasources = new ArrayList<>();
Expand All @@ -37,6 +43,11 @@ public SchemaContributorBuildItem registerDatasources(
// FTL and quarkus use slightly different names
dbKind = "postgres";
}
if (dbKind.equals("mysql")) {
recorder.registerDatabase(ds.getName(), ModuleContextResponse.DBType.MYSQL);
} else {
recorder.registerDatabase(ds.getName(), ModuleContextResponse.DBType.POSTGRES);
}
//default name is <default> which is not a valid name
String sanitisedName = ds.getName().replace("<", "").replace(">", "");
//we use a dynamic credentials provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -36,6 +38,8 @@ public class FTLController implements LeaseClient {

private static volatile FTLController controller;

private final Map<String, ModuleContextResponse.DBType> databases = new ConcurrentHashMap<>();

/**
* TODO: look at how init should work, this is terrible and will break dev mode
*/
Expand Down Expand Up @@ -71,6 +75,10 @@ public static FTLController instance() {
verbService = VerbServiceGrpc.newStub(channel);
}

public void registerDatabase(String name, ModuleContextResponse.DBType type) {
databases.put(name, type);
}

public byte[] getSecret(String secretName) {
var context = getModuleContext();
if (context.containsSecrets(secretName)) {
Expand All @@ -88,6 +96,10 @@ public byte[] getConfig(String secretName) {
}

public Datasource getDatasource(String name) {
if (databases.get(name) == ModuleContextResponse.DBType.POSTGRES) {
var proxyAddress = System.getenv("FTL_PROXY_POSTGRES_ADDRESS");
return new Datasource("jdbc:postgresql://" + proxyAddress + "/" + name, "ftl", "ftl");
}
List<ModuleContextResponse.DSN> databasesList = getModuleContext().getDatabasesList();
for (var i : databasesList) {
if (i.getName().equals(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import xyz.block.ftl.runtime.http.FTLHttpHandler;
import xyz.block.ftl.runtime.http.HTTPVerbInvoker;
import xyz.block.ftl.v1.CallRequest;
import xyz.block.ftl.v1.ModuleContextResponse;

@Recorder
public class FTLRecorder {
Expand Down Expand Up @@ -171,4 +172,8 @@ public void run() {
}
});
}

public void registerDatabase(String dbKind, ModuleContextResponse.DBType name) {
FTLController.instance().registerDatabase(dbKind, name);
}
}

0 comments on commit 7e7a862

Please sign in to comment.