# Removing Flink Table API legacy An incomplete plan to get rid of the legacy code in the Table API, including: * Old `Factory` stack * Old String expression stack * Old functions stack * Old type system * Legacy rules ## A couple of notes first * When removing legacy code in tests, make sure the test still makes sense after the removal. We have a lot of test utilities that are used only by tests effectively testing the old stack, and tests checking the new stack might already have a similar utility implemented. * When porting test code, make sure to: * Not rely on planner internals * Use JUnit 5/AssertJ * Move it to the `flink-table-tests` module whenever possible ## Peeling the [onion’s layers](https://www.youtube.com/watch?v=aJQmVZSAqlc) ### 1. Remove all implementations of `TableSink` and `TableSource` ``` Implementations of TableSink in (25 usages found) Production (11 usages found) flink-connector-cassandra_2.12 (1 usage found) org.apache.flink.streaming.connectors.cassandra (1 usage found) CassandraAppendTableSink.java (1 usage found) 33 public class CassandraAppendTableSink implements AppendStreamTableSink<Row> { flink-table-api-java-bridge (6 usages found) org.apache.flink.table.sinks (6 usages found) AppendStreamTableSink.java (1 usage found) 39 public interface AppendStreamTableSink<T> extends StreamTableSink<T> {} CsvTableSink.java (1 usage found) 44 public class CsvTableSink implements AppendStreamTableSink<Row> { OutputFormatTableSink.java (1 usage found) 38 public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> { RetractStreamTableSink.java (1 usage found) 47 public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> { StreamTableSink.java (1 usage found) 33 public interface StreamTableSink<T> extends TableSink<T> { UpsertStreamTableSink.java (1 usage found) 58 public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> { flink-table-common (1 usage found) org.apache.flink.table.sinks (1 
usage found) TableSinkBase.java (1 usage found) 36 public abstract class TableSinkBase<T> implements TableSink<T> { flink-table-planner_2.12 (3 usages found) org.apache.flink.table.planner.sinks (3 usages found) CollectTableSink.scala (2 usages found) 34 class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T])) 80 class CollectRowTableSink extends CollectTableSink[Row](new RowTypeInfo(_: _*)) DataStreamTableSink.scala (1 usage found) 36 class DataStreamTableSink[T]( Test (14 usages found) flink-python_2.12 (4 usages found) org.apache.flink.table.legacyutils (4 usages found) legacyTestingSinks.scala (3 usages found) 39 private[flink] class TestAppendSink extends AppendStreamTableSink[Row] { 69 private[flink] class TestRetractSink extends RetractStreamTableSink[Row] { 95 private[flink] class TestUpsertSink( TestCollectionTableFactory.scala (1 usage found) 168 class CollectionTableSink(val outputType: RowTypeInfo) flink-table-common (1 usage found) org.apache.flink.table.utils (1 usage found) TypeMappingUtilsTest.java (1 usage found) 419 private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> { flink-table-planner_2.12 (9 usages found) org.apache.flink.table.planner.factories.utils (1 usage found) TestCollectionTableFactory.scala (1 usage found) 152 class CollectionTableSink(val schema: TableSchema) org.apache.flink.table.planner.runtime.batch.sql (1 usage found) PartitionableSinkITCase.scala (1 usage found) 331 private class TestSink( org.apache.flink.table.planner.runtime.utils (3 usages found) StreamTestSink.scala (3 usages found) 265 final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone) 344 final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] { 499 final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink[Row] { org.apache.flink.table.planner.utils (4 usages found) MemoryTableSourceSinkUtil.scala (3 usages found) 80 final class 
UnsafeMemoryAppendTableSink 146 final class DataTypeOutputFormatTableSink( 181 final class DataTypeAppendStreamTableSink( testTableSourceSinks.scala (1 usage found) 1317 class OptionsTableSink( ``` ``` Implementations of TableSource in (31 usages found) Production (8 usages found) flink-batch-sql-test (1 usage found) org.apache.flink.sql.tests (1 usage found) BatchSQLTestProgram.java (1 usage found) 79 public static class GeneratorTableSource extends InputFormatTableSource<Row> { flink-python_2.12 (2 usages found) org.apache.flink.table.runtime.arrow.sources (1 usage found) ArrowTableSource.java (1 usage found) 32 public class ArrowTableSource implements StreamTableSource<RowData> { org.apache.flink.table.utils.python (1 usage found) PythonInputFormatTableSource.java (1 usage found) 35 public class PythonInputFormatTableSource extends InputFormatTableSource<Row> { flink-stream-sql-test_2.12 (1 usage found) org.apache.flink.sql.tests (1 usage found) StreamSQLTestProgram.java (1 usage found) 192 public static class GeneratorTableSource flink-table-api-java-bridge (3 usages found) org.apache.flink.table.sources (3 usages found) CsvTableSource.java (1 usage found) 62 public class CsvTableSource InputFormatTableSource.java (1 usage found) 39 public abstract class InputFormatTableSource<T> implements StreamTableSource<T> { StreamTableSource.java (1 usage found) 33 public interface StreamTableSource<T> extends TableSource<T> { flink-table-common (1 usage found) org.apache.flink.table.sources (1 usage found) LookupableTableSource.java (1 usage found) 38 public interface LookupableTableSource<T> extends TableSource<T> { Test (23 usages found) flink-python_2.12 (1 usage found) org.apache.flink.table.legacyutils (1 usage found) TestCollectionTableFactory.scala (1 usage found) 127 class CollectionTableSource( flink-sql-client_2.12 (1 usage found) org.apache.flink.table.client.gateway.utils (1 usage found) SimpleCatalogFactory.java (1 usage found) 95 new 
StreamTableSource<Row>() { flink-table-api-java (1 usage found) org.apache.flink.table.utils (1 usage found) TableSourceMock.java (1 usage found) 27 public class TableSourceMock implements TableSource<Row> { flink-table-common (1 usage found) org.apache.flink.table.utils (1 usage found) TypeMappingUtilsTest.java (1 usage found) 375 private static class TestTableSource flink-table-planner_2.12 (19 usages found) org.apache.flink.table.planner.factories.utils (1 usage found) TestCollectionTableFactory.scala (1 usage found) 115 class CollectionTableSource( org.apache.flink.table.planner.plan.stream.sql.join (2 usages found) LookupJoinTest.scala (2 usages found) 601 class TestTemporalTable(bounded: Boolean = false) 667 class TestInvalidTemporalTable private( org.apache.flink.table.planner.runtime.utils (1 usage found) InMemoryLookupableTableSource.scala (1 usage found) 50 class InMemoryLookupableTableSource( org.apache.flink.table.planner.utils (15 usages found) TableTestBase.scala (1 usage found) 1372 class TestTableSource(override val isBounded: Boolean, schema: TableSchema) TestLegacyLimitableTableSource.scala (1 usage found) 44 class TestLegacyLimitableTableSource( testTableSourceSinks.scala (13 usages found) 205 class TestTableSourceWithTime[T]( 316 class TestPreserveWMTableSource[T]( 344 class TestLegacyProjectableTableSource( 407 class TestNestedProjectableTableSource( 504 class TestLegacyFilterableTableSource( 731 class TestInputFormatTableSource[T]( 786 class TestDataTypeTableSource( 847 class TestDataTypeTableSourceWithTime( 926 class TestStreamTableSource( 977 class TestFileInputFormatTableSource( 1038 class TestPartitionableTableSource( 1209 class WithoutTimeAttributesTableSource(bounded: Boolean) extends StreamTableSource[Row] { 1297 class OptionsTableSource( ``` This task can be further split into: 1. Remove all usages of production implementations of `TableSink` and `TableSource`. 
There should already be implementations replacing the legacy ones, except for: 1. `CassandraAppendTableSink`. Discuss with Cassandra maintainers. 1. `ArrowTableSource` and related `PythonInputFormatTableSource`. Discuss with Python maintainers. 1. Identify which test implementations are still required, and port them to `DynamicTableSourceFactory` and `DynamicTableSinkFactory`. Note that some of the test utilities are already provided in the new stack, e.g. `CollectionTableSource` and `GeneratorTableSource` look like overcomplicated mocks of connectors we already support natively in the new stack. 1. Remove all usages of test implementations of `TableSink` and `TableSource`. This will involve removing entire tests, e.g. `LegacyTableSinkITCase`, which is merely testing the old stack. 1. Clean up the remaining implementations of `TableSink` and `TableSource`. I suggest in step 2 porting the old test implementations of `TableSink`/`TableSource` without removing the old ones, in order to simplify the execution of step 3. Then, in step 4, all the code that is no longer referenced can be removed. ### 2. Remove `TableSink` and `TableSource` Remove `TableSink`, `TableSource` and `TableFormatFactory` and clean up all the code relying on these interfaces until the project compiles. Along the way, you should hit the following things to clean up: * Some user-facing APIs, notably: * `Table#insertInto` * `TableEnvironment#insertInto` * `TableEnvironment#fromTableSource` * Old `Operation` nodes, e.g. `TableSourceQueryOperation`, `UnregisteredSinkModifyOperation` * Some code here and there in the sql-to-rel conversion * Old conversion rules, e.g. `BatchPhysicalLegacySinkRule`, `PushPartitionIntoLegacyTableSourceScanRule`. ### 3. Remove `TableFactory` Remove `TableFactory` and clean up all the code relying on it until the project compiles. Some interfaces implement `TableFactory` for compatibility reasons but should not be cleaned up themselves, e.g. `CatalogFactory`, `ModuleFactory`. 
This also involves cleaning up `TableFactoryUtil` and `TableFactoryService`. ### 4. Old functions stack ~~* Update Scala table and aggregate functions implicits to new stack.~~ * Update Python table and aggregate functions to the new stack. Discuss with Python maintainers. * Update Hive functions to the new stack. Discuss with Hive maintainers. * Remove the old stack, i.e. `TableFunctionDefinition`, `TableEnvironment.registerFunction` ### 5. ~~Remove String Scala Expression stack~~ This includes all the Table API methods that take strings, e.g. `Table.select(String)` or `fromDataStream(DataStream<T>, String)`. This task includes: * Reimplement the codegen tests with the new expression stack * Python API methods that take expression strings. Discuss with Python maintainers. * Remove the `ExpressionParser` entry point interface, its usages and its implementation. ### 6. Remove `PlannerExpression` tree List of classes: ``` Subclasses of PlannerExpression in (56 usages found) Production (56 usages found) Unclassified (56 usages found) flink-table-planner_2.12 (56 usages found) org.apache.flink.table.planner.expressions (56 usages found) aggregations.scala (5 usages found) 32 abstract sealed class Aggregation extends PlannerExpression { 43 case class ApiResolvedAggregateCallExpression( 55 case class DistinctAgg(child: PlannerExpression) extends Aggregation { 79 case class Collect(child: PlannerExpression) extends Aggregation { 92 case class AggFunctionCall( call.scala (6 usages found) 38 case class ApiResolvedExpression(resolvedDataType: DataType) 51 case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression) 71 case class OverCall( 166 case class PlannerScalarFunctionCall( 206 case class PlannerTableFunctionCall( 234 case class ThrowException(msg: PlannerExpression, tp: TypeInformation[_]) extends UnaryExpression { collection.scala (3 usages found) 27 case class ArrayElement(array: PlannerExpression) extends PlannerExpression { 47 case class 
Cardinality(container: PlannerExpression) extends PlannerExpression { 64 case class ItemAt(container: PlannerExpression, key: PlannerExpression) extends PlannerExpression { fieldExpression.scala (12 usages found) 32 trait NamedExpression extends PlannerExpression { 37 abstract class Attribute extends LeafExpression with NamedExpression { 46 case class RexPlannerExpression( 54 case class UnresolvedFieldReference(name: String) extends Attribute { 68 case class PlannerResolvedFieldReference( 83 case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute { 99 case class TableReference(name: String, tableOperation: QueryOperation) 112 abstract class TimeAttribute(val expression: PlannerExpression) 119 case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) { 159 case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) { 184 case class StreamRecordTimestamp() extends LeafExpression { 194 case class PlannerLocalReference( InputTypeSpec.scala (1 usage found) 29 trait InputTypeSpec extends PlannerExpression { literals.scala (2 usages found) 51 case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression { 80 case class Null(resultType: TypeInformation[_]) extends LeafExpression { ordering.scala (3 usages found) 23 abstract class Ordering extends UnaryExpression { 33 case class Asc(child: PlannerExpression) extends Ordering { 39 case class Desc(child: PlannerExpression) extends Ordering { overOffsets.scala (4 usages found) 24 case class CurrentRow() extends PlannerExpression { 32 case class CurrentRange() extends PlannerExpression { 40 case class UnboundedRow() extends PlannerExpression { 48 case class UnboundedRange() extends PlannerExpression { PlannerExpression.scala (3 usages found) 74 abstract class BinaryExpression extends PlannerExpression { 80 abstract class UnaryExpression extends PlannerExpression { 85 abstract class LeafExpression extends 
PlannerExpression { Reinterpret.scala (1 usage found) 26 case class Reinterpret(child: PlannerExpression, resultType: TypeInformation[_], subquery.scala (1 usage found) 29 case class In(expression: PlannerExpression, elements: Seq[PlannerExpression]) symbols.scala (1 usage found) 31 case class SymbolPlannerExpression(symbol: PlannerSymbol) extends LeafExpression { time.scala (11 usages found) 32 case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpression) 80 abstract class CurrentTimePoint( 106 case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false) 108 case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false) 110 case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false) 112 case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true) 114 case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true) 119 case class TemporalOverlaps( 214 case class DateFormat(timestamp: PlannerExpression, format: PlannerExpression) 223 case class TimestampDiff( 275 case class ToTimestampLtz( windowProperties.scala (3 usages found) 33 abstract class AbstractWindowProperty(child: PlannerExpression) 49 case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) { 56 case class WindowEnd(child: PlannerExpression) extends AbstractWindowProperty(child) { ``` This task includes: * Convert functions still creating these `PlannerExpression` nodes to new type inference, like `Extract`, `CurrentTimestamp`, `ArrayElement`, ... * Remove `PlannerExpressionConverter`. ### 7. Pre-FLIP-136 DataStream conversions * toAppendStream/toRetractStream/fromDataStream(..., Expression)