Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove sleep logic when the queue is full from CDC #5600

Merged
merged 3 commits into from
Aug 30, 2021

Conversation

subodh1810
Copy link
Contributor

@subodh1810 subodh1810 commented Aug 24, 2021

Issue : #5516
Ref thread : https://airbytehq.slack.com/archives/C01MFR03D5W/p1629636715470600
A user reported an issue with MySQL CDC where the sync was failing with the following exception.
This was happening when the queue is at max capacity, we sleep for 10 milliseconds so that the consumer can consume records and the queue gets empty. Now while sleeping, the consumer signals the producer to shutdown (if we have already reached the target position in binlog) but if we are sleeping, the sleep would be interrupted causing this error. Sleeping in such design doesn't seem right and its better to just avoid sleeping and continue to try in the loop.

2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:150)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.computeNext(DebeziumRecordIterator.java:107)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.computeNext(DebeziumRecordIterator.java:52)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:42)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:58)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.commons.util.CompositeIterator.computeNext(CompositeIterator.java:83)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.commons.util.CompositeIterator.computeNext(CompositeIterator.java:83)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.commons.util.DefaultAutoCloseableIterator.computeNext(DefaultAutoCloseableIterator.java:58)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:139)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.source.mysql.MySqlSource.main(MySqlSource.java:250)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher.close(DebeziumRecordPublisher.java:144)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:35)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordIterator.requestClose(DebeziumRecordIterator.java:148)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	... 20 more
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - Caused by: java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher.lambda$start$0(DebeziumRecordPublisher.java:105)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$0(ConvertingEngineBuilder.java:71)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.debezium.embedded.EmbeddedEngine$1.handleBatch(EmbeddedEngine.java:472)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.lang.Thread.run(Thread.java:832)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - Caused by: java.lang.InterruptedException: sleep interrupted
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at java.base/java.lang.Thread.sleep(Native Method)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	at io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher.lambda$start$0(DebeziumRecordPublisher.java:103)
2021-08-22 06:13:41 ERROR () LineGobbler(voidCall):85 - 	... 7 more

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions
  • Connector added to connector index like described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions
  • Connector version bumped like described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here

Connector Generator

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed.

@subodh1810 subodh1810 requested a review from davinchia August 24, 2021 19:25
@subodh1810 subodh1810 self-assigned this Aug 24, 2021
@davinchia
Copy link
Contributor

Would we have to run tests for the various CDC sources?

@subodh1810
Copy link
Contributor Author

subodh1810 commented Aug 25, 2021

/test connector=source-mysql

🕑 source-mysql https://github.com/airbytehq/airbyte/actions/runs/1165660696
✅ source-mysql https://github.com/airbytehq/airbyte/actions/runs/1165660696

@subodh1810
Copy link
Contributor Author

subodh1810 commented Aug 25, 2021

/test connector=source-postgres

🕑 source-postgres https://github.com/airbytehq/airbyte/actions/runs/1165660977
✅ source-postgres https://github.com/airbytehq/airbyte/actions/runs/1165660977

@subodh1810
Copy link
Contributor Author

subodh1810 commented Aug 25, 2021

/test connector=source-mssql

🕑 source-mssql https://github.com/airbytehq/airbyte/actions/runs/1165661162
✅ source-mssql https://github.com/airbytehq/airbyte/actions/runs/1165661162

@jrhizor jrhizor temporarily deployed to more-secrets August 25, 2021 07:55 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets August 25, 2021 07:55 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets August 25, 2021 07:55 Inactive
@github-actions github-actions bot added the area/connectors Connector related issues label Aug 26, 2021
@sherifnada
Copy link
Contributor

@subodh1810 should this close #5516?

@subodh1810
Copy link
Contributor Author

@sherifnada yes it should
#5600 (comment)

@subodh1810
Copy link
Contributor Author

subodh1810 commented Aug 30, 2021

/publish connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/1181712345
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/1181712345

@subodh1810
Copy link
Contributor Author

subodh1810 commented Aug 30, 2021

/publish connector=connectors/source-mssql

🕑 connectors/source-mssql https://github.com/airbytehq/airbyte/actions/runs/1181712502
✅ connectors/source-mssql https://github.com/airbytehq/airbyte/actions/runs/1181712502

@subodh1810
Copy link
Contributor Author

subodh1810 commented Aug 30, 2021

/publish connector=connectors/source-postgres

🕑 connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/1181712690
✅ connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/1181712690

@jrhizor jrhizor temporarily deployed to more-secrets August 30, 2021 09:59 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets August 30, 2021 09:59 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets August 30, 2021 09:59 Inactive
@subodh1810 subodh1810 merged commit a53dd7e into master Aug 30, 2021
@subodh1810 subodh1810 deleted the fix-interrupted-exception branch August 30, 2021 12:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants