@@ -67,6 +67,7 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
6767 private static final String timestampColumnsProp = "Timestamp Columns" ;
6868 private static final String readDelayProp = "Read Delay" ;
6969 private static final String fullLoadIntervalProp = "Full Load Interval" ;
70+ private static final String loadIntervalProp = "Incremental Load Interval" ;
7071 private static final String userNameProp = "User Name" ;
7172 private static final String passwordProp = "Password" ;
7273 private static final String keepSourceTypes = "Keep JDBC source types" ;
@@ -83,10 +84,12 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
8384 new SimplePropertyDescription (timestampColumnsProp , "Comma separated list of timestamp columns to use for loading new rows. The fist non-null value will be used. At least one of the values must not be null for each row" , true ),
8485 new SimplePropertyDescription (readDelayProp , "How long (in seconds) to wait before reading rows based on their timestamp. This allows waiting for all transactions of a certain timestamp to complete to avoid loading partial data. Default value is 0" , true ),
8586 new SimplePropertyDescription (fullLoadIntervalProp , "If set the full table will be read every configured interval (in minutes). When this is configured the update time and incrementing columns are not used." , true ),
86- new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )));
87+ new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )),
88+ new SimplePropertyDescription (loadIntervalProp , "Configures how often (in minutes) the data source will poll the database for new changes" , true ));
8789
8890 private long readDelay ;
8991 private long fullLoadIntervalMinutes ;
92+ private long loadIntervalMinutes ;
9093 private TableInfo tableInfo ;
9194 private QueryDialect queryDialect ;
9295 private long dbTimezoneOffset ;
@@ -103,6 +106,10 @@ private boolean isFullLoad() {
103106 return fullLoadIntervalMinutes > 0 ;
104107 }
105108
109+ private boolean hasCustomLoadInterval () {
110+ return loadIntervalMinutes > 1 ;
111+ }
112+
106113 @ Override
107114 public DataSourceDescription getDataSourceDescription () {
108115 return new JDBCDataSourceDescription ();
@@ -146,6 +153,7 @@ public void setProperties(Map<String, String> properties) {
146153 try (Connection con = getConnection ()) {
147154 readDelay = Long .parseLong (properties .getOrDefault (readDelayProp , "0" ));
148155 fullLoadIntervalMinutes = Long .parseLong (properties .getOrDefault (fullLoadIntervalProp , "0" ));
156+ loadIntervalMinutes = Long .parseLong (properties .getOrDefault (loadIntervalProp , "1" ));
149157 DatabaseMetaData metadata = con .getMetaData ();
150158 String userProvidedIncColumn = properties .get (incrementingColumnNameProp );
151159 tableInfo = loadTableInfo (metadata , properties .getOrDefault (schemaPatternProp , null ), properties .get (tableNameProp ));
@@ -403,9 +411,10 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
403411 ShardDefinition shardDefinition ) {
404412 var taskCount = completedRanges .size () + wantedRanges .size ();
405413 var itemsPerTask = (taskInfo .getMetadata ().itemsPerTask (taskCount ));
406- var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
414+ var skipAll = hasCustomLoadInterval () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
415+ var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesFullLoadInterval );
407416 var noDataToLoad = !isFullLoad () && !tableInfo .hasTimeColumns () && itemsPerTask == 0 ;
408- if (emptyFullLoad || noDataToLoad ) {
417+ if (skipAll || emptyFullLoad || noDataToLoad ) {
409418 List <DataLoader <JDBCTaskMetadata >> result =
410419 wantedRanges .stream ().map (t -> new NoDataLoader (t , taskInfo .getMetadata ())).collect (Collectors .toList ());
411420 return CompletableFuture .completedFuture (result .iterator ());
@@ -422,8 +431,16 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
422431 }
423432 }
424433
434+ private boolean matchesFullLoadInterval (TaskRange x ) {
435+ return getTimeInMinutes (x .getInclusiveStartTime ()) % fullLoadIntervalMinutes == 0 ;
436+ }
437+
425438 private boolean matchesLoadInterval (TaskRange x ) {
426- return x .getInclusiveStartTime ().getEpochSecond () / 60 % fullLoadIntervalMinutes == 0 ;
439+ return getTimeInMinutes (x .getInclusiveStartTime ()) % loadIntervalMinutes == 0 ;
440+ }
441+
442+ private Long getTimeInMinutes (Instant time ) {
443+ return time .getEpochSecond () / 60L ;
427444 }
428445
429446 private List <JDBCTaskMetadata > getRunMetadatas (TaskInformation <JDBCTaskMetadata > taskInfo ,
@@ -444,10 +461,18 @@ private List<JDBCTaskMetadata> getRunMetadatas(TaskInformation<JDBCTaskMetadata>
444461 // First task does not have lower bound to ensure we don't skip data from the last point we stopped at
445462 var startTime =
446463 firstInBatch ? taskInfo .getMetadata ().getStartTime () : wantedRange .getInclusiveStartTime ().minusSeconds (readDelay );
464+ var lowerBound = taskInfo .getMetadata ().getStartTime ().getEpochSecond ();
465+ var truncatedStartTime = Math .max (lowerBound , hasCustomLoadInterval () ?
466+ (getTimeInMinutes (startTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes )
467+ : startTime .getEpochSecond ());
468+ var endTime = wantedRange .getExclusiveEndTime ().minusSeconds (readDelay );
469+ var truncatedEndTime = Math .max (lowerBound ,hasCustomLoadInterval () ?
470+ getTimeInMinutes (endTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes
471+ : endTime .getEpochSecond ());
447472 var metadata = new JDBCTaskMetadata (taskInfo .getMetadata ().getInclusiveStart (),
448473 taskInfo .getMetadata ().getExclusiveEnd (),
449- startTime ,
450- wantedRange . getExclusiveEndTime (). minusSeconds ( readDelay ));
474+ Instant . ofEpochSecond ( truncatedStartTime ) ,
475+ Instant . ofEpochSecond ( truncatedEndTime ));
451476 result .add (metadata );
452477 }
453478 } else {
@@ -485,43 +510,58 @@ private CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> splitData(Result
485510 final var isLast = i == wantedRanges .size () - 1 ;
486511 final var taskRange = wantedRanges .get (i );
487512 final var metadata = runMetadatas .get (i );
488- var loader = new DataLoader <JDBCTaskMetadata >() {
489- @ Override
490- public TaskRange getTaskRange () {
491- return taskRange ;
492- }
493-
494- private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesLoadInterval (taskRange ));
495-
496- @ Override
497- public Iterator <LoadedData > loadData () {
498- ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
499- var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
500- return Collections .singleton (result ).iterator ();
501- }
502-
503- @ Override
504- public JDBCTaskMetadata getCompletedMetadata () {
505- if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
506- if (rowReader .readValues ()) {
507- // If some data was successfully read then that's our next start point
508- lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
509- lastReadIncValue .set (rowReader .getLastIncValue ());
510- }
511- metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
512- metadata .setEndTime (lastReadTime .get ());
513+ DataLoader <JDBCTaskMetadata > loader = null ;
514+ if (matchesLoadInterval (taskRange )){
515+ loader = getLoader (connection ,
516+ lastReadIncValue ,
517+ lastReadTime ,
518+ valueGetter ,
519+ isLast ,
520+ taskRange ,
521+ metadata );
522+ } else {
523+ loader = new NoDataLoader (taskRange , metadata );
513524 }
514-
515- return metadata ;
516- }
517- };
518525 result .add (loader );
519526
520527 }
521528 return CompletableFuture .completedFuture (result .iterator ());
522529
523530 }
524531
532+ private DataLoader <JDBCTaskMetadata > getLoader (Connection connection , AtomicReference <Long > lastReadIncValue , AtomicReference <Instant > lastReadTime , ResultSetValuesGetter valueGetter , boolean isLast , TaskRange taskRange , JDBCTaskMetadata metadata ) {
533+ return new DataLoader <JDBCTaskMetadata >() {
534+ @ Override
535+ public TaskRange getTaskRange () {
536+ return taskRange ;
537+ }
538+
539+ private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesFullLoadInterval (taskRange ));
540+
541+ @ Override
542+ public Iterator <LoadedData > loadData () {
543+ ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
544+ var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
545+ return Collections .singleton (result ).iterator ();
546+ }
547+
548+ @ Override
549+ public JDBCTaskMetadata getCompletedMetadata () {
550+ if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
551+ if (rowReader .readValues ()) {
552+ // If some data was successfully read then that's our next start point
553+ lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
554+ lastReadIncValue .set (rowReader .getLastIncValue ());
555+ }
556+ metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
557+ metadata .setEndTime (lastReadTime .get ());
558+ }
559+
560+ return metadata ;
561+ }
562+ };
563+ }
564+
525565
526566 @ Override
527567 public CompletionStage <TaskInformation <JDBCTaskMetadata >> getTaskInfo (JDBCTaskMetadata previousTaskMetadata ,
0 commit comments