| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.springblade;
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
- import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springblade.sink.MongoSink;
- /**
- * Skeleton for a Flink Streaming Job.
- *
- * <p>For a tutorial how to write a Flink streaming application, check the
- * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
- *
- * <p>To package your application into a JAR file for execution, run
- * 'mvn clean package' on the command line.
- *
- * <p>If you change the name of the main class (with the public static void main(String[] args))
- * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
- */
- public class StreamingJob {
- // 创建Logger对象
- private static final Logger log = LoggerFactory.getLogger(StreamingJob.class);
- public static void main(String[] args) throws Exception {
- // set up the streaming execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- /*
- * Here, you can start creating your execution plan for Flink.
- *
- * Start with getting some data from the environment, like
- * env.readTextFile(textPath);
- *
- * then, transform the resulting DataStream<String> using operations
- * like
- * .filter()
- * .flatMap()
- * .join()
- * .coGroup()
- *
- * and many more.
- * Have a look at the programming guide for the Java API:
- *
- * http://flink.apache.org/docs/latest/apis/streaming/index.html
- *
- */
- // execute program
- final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("192.168.1.218")
- .setPort(5672)
- .setUserName("guest")
- .setPassword("guest")
- .setVirtualHost("ldt")
- .build();
- final DataStream<String> stream = env
- .addSource(new RMQSource<String>(
- connectionConfig, // config for the RabbitMQ connection
- "LogDirectQueue", // name of the RabbitMQ queue to consume
- true, // use correlation ids; can be false if only at-least-once is required
- new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
- .setParallelism(1);
- stream.addSink(new MongoSink());
- stream.print("demo");
- log.error("ceshi===========================================================================");
- env.execute("Flink Streaming Java API Skeleton");
- }
- }
|